Error-Related Operators
In this lesson, we’re going to review the following operators:
doOnError
If you remember from the previous module, doOnError
is one of the life cycle hooks Reactor provides.
There are three versions of doOnError
for Mono<T>
. For all of them, the Consumer
is executed first, and then the onError
signal is propagated.
The first one adds behavior when the Mono
completes with an error matching the given exception type:
<E extends Throwable> Mono<T> doOnError(
Class<E> exceptionType,
Consumer<? super E> onError
)
The second one adds behavior triggered when the Mono
completes with an error:
Mono<T> doOnError(
Consumer<? super Throwable> onError
)
And the third one adds behavior when the Mono
completes with an error matching the given predicate:
Mono<T> doOnError(
Predicate<? super Throwable> predicate,
Consumer<? super Throwable> onError
)
Flux
also has three versions. Once again, the Consumer
is executed first, and then the onError
signal is propagated.
The first one adds behavior when the Flux
completes with an error matching the given exception type:
<E extends Throwable> Flux<T> doOnError(
Class<E> exceptionType, Consumer<? super E> onError
)
The second one adds behavior triggered when the Flux
completes with an error:
Flux<T> doOnError(
Consumer<? super Throwable> onError
)
And the third one adds behavior when the Flux
completes with an error matching the given predicate:
Flux<T> doOnError(
Predicate<? super Throwable> predicate,
Consumer<? super Throwable> onError
)
Remember that lifecycle hooks add behavior at certain points of the execution of a reactive sequence. With doOnError
, you can catch the exception, do something with it, and then let it propagated downstream.
In other words, using doOnError
like this:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3, 4, 5);
integerFlux
.map(i -> i/(i-3))
.doOnError(ArithmeticException.class,
e ->
System.out.println(
"ArithmeticException: " + e.getMessage()
)
)
.subscribe(System.out::println);
Would be the equivalent of this:
try {
i = i/(i-3);
} catch(ArithmeticException e) {
System.out.println(
"ArithmeticException: " + e.getMessage()
);
throw e;
}
This is the result of running the example:
0
-2
ArithmeticException: / by zero
[ERROR] (main) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
Caused by: java.lang.ArithmeticException: / by zero
at net.eherrera.reactor.m5.Test_02_ErrorOperators.lambda$example_01_doOnError$0(Test_02_ErrorOperators.java:14)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:172)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:97)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
at reactor.core.publisher.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:119)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
at reactor.core.publisher.Flux.subscribe(Flux.java:8303)
...
As you can see, the error is propagated to the subscribe
method. We can add a consumer of the error to avoid the stack trace:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3, 4, 5);
integerFlux
.map(i -> i/(i-3))
.doOnError(ArithmeticException.class,
e ->
System.out.println(
"doOnError: " + e.getMessage()
)
)
.subscribe(System.out::println,
System.out::println
);
This is the result:
0
-2
doOnError: / by zero
java.lang.ArithmeticException: / by zero
By the way, another lifecycle method that can be useful when handling exceptions is doFinally
:
// For Mono
Mono<T> doFinally(
Consumer<SignalType> onFinally
)
// For Flux
Flux<T> doFinally(Consumer<SignalType> onFinally)
Remember, this method adds behavior after the sequence terminates for any reason, including cancellation and errors. This way, it can work similarly to a finally
clause in a try-catch
statement.
Consider the following example:
Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4, 5);
integerFlux
.map(i -> i/(i-3))
.doOnError(ArithmeticException.class,
e -> System.out.println(
"doOnError: " + e.getMessage()
)
)
.doFinally(signalType ->
System.out.println("doFinally: " + signalType)
)
.subscribe(System.out::println,
System.out::println
);
It’s the equivalent of the following try-catch-finally
statement:
try {
i = i/(i-3);
} catch(ArithmeticException e) {
System.out.println(
"ArithmeticException: " + e.getMessage()
);
throw e;
} finally {
System.out.println("doFinally: ...");
}
And this will be the result:
0
-2
doOnError: / by zero
java.lang.ArithmeticException: / by zero
doFinally: onError
Notice the print statement from doFinally
was the last one to be executed and the signal type is onError
.
onErrorReturn
Now, what if we want to return a default or fallback value when catching an exception?
We can do this with the operator onErrorReturn
.
There are three versions of this operator for Mono<T>
.
The first one emits a captured fallback value when an error is observed on the Mono
.
Mono<T> onErrorReturn(T fallback)
The second one emits a captured fallback value when an error of the specified type is observed on the Mono
:
<E extends Throwable> Mono<T> onErrorReturn(
Class<E> type,
T fallbackValue
)
And the third one emits a captured fallback value when an error matching the given predicate is observed on the Mono
:
Mono<T> onErrorReturn(
Predicate<? super Throwable> predicate,
T fallbackValue
)
Flux
also has three versions of this operator.
The first one emits a captured fallback value when an error is observed on the Flux
:
Flux<T> onErrorReturn(T fallbackValue)
The second one emits a captured fallback value when an error of the specified type is observed on the Flux
:
<E extends Throwable> Flux<T> onErrorReturn(
Class<E> type,
T fallbackValue
)
And the third one emits a captured fallback value when an error matching the given predicate is observed on the Flux
:
Flux<T> onErrorReturn(
Predicate<? super Throwable> predicate,
T fallbackValue
)
For example, when dividing by zero, instead of letting the ArithmeticException
propagate, we can provide a fallback value, let’s say 0
:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3, 4, 5);
integerFlux
.map(i -> i/(i-3))
.onErrorReturn(
ArithmeticException.class,
0
)
.subscribe(
System.out::println,
System.out::println
);
This is the result:
0
-2
0
Notice that after the error, the sequence stopped. Remember, all errors in Reactor are terminal, onErrorReturn
only provides a fallback value.
Finally, you can also apply a Predicate
on the exception to decide whether or not to return a fallback value.
For example, we can test if the exception message contains a particular string:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3, 4, 5);
integerFlux
.map(i -> i/(i-3))
.onErrorReturn(
e -> e.getMessage().contains("3"),
0
)
.subscribe(
System.out::println,
System.out::println
);
In this case, since the exception message doesn’t contain the string 3
, the exception is caught by the consumer of the subscribe
method:
0
-2
java.lang.ArithmeticException: / by zero
onErrorResume
Instead of providing a simple value, you might want to provide a fallback sequence. You can do this with onErrorResume
.
There are three versions of this operator for Mono<T>
. All of them use a function to choose the fallback depending on the error.
The first one subscribes to a fallback publisher when any error occurs:
Mono<T> onErrorResume(
Function<? super Throwable,? extends Mono<? extends T>> fallback
)
The second one subscribes to a fallback publisher when an error matching the given type occurs:
<E extends Throwable> Mono<T> onErrorResume(
Class<E> type,
Function<? super E, ? extends Mono<? extends T>> fallback
)
The third one subscribes to a fallback publisher when an error matching a given predicate occurs:
Mono<T> onErrorResume(
Predicate<? super Throwable> predicate,
Function<? super Throwable, ? extends Mono<? extends T>> fallback
)
Flux
also has three versions for this operator that use a function to choose the fallback depending on the error.
The first one subscribes to a returned fallback publisher when any error occurs:
Flux<T> onErrorResume(
Function<? super Throwable, ? extends Publisher<? extends T>> fallback
)
The second one subscribes to a fallback publisher when an error matching the given type occurs:
<E extends Throwable> Flux<T> onErrorResume(
Class<E> type,
Function<? super E, ? extends Publisher<? extends T>> fallback
)
And the third one subscribes to a fallback publisher when an error matching a given predicate occurs:
Flux<T> onErrorResume(
Predicate<? super Throwable> predicate,
Function<? super Throwable, ? extends Publisher<? extends T>> fallback
)
Just like onErrorReturn
, onErrorResume
has versions that take the class of the exception that you want to handle or a Predicate
to conditionally handle an exception.
However, unlike onErrorReturn
, you can pass to onErrorResume
a function that takes the exception thrown and returns a fallback publisher. This way, the original sequence will be canceled, and onErrorResume
will subscribe to the fallback publisher to keep emitting elements to the subscriber.
For example, as we know that the value 3
prevents the emission of the values 4
and 5
, we can define a fallback publisher with these elements:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3, 4, 5);
integerFlux
.map(i -> i/(i-3))
.onErrorResume(e -> Flux.just(4, 5))
.subscribe(System.out::println,
System.out::println
);
This will be the result:
0
-2
4
5
However, notice that the map
operator that divides the value by i-3
is not executed against the fallback sequence, this is emitted just as it is. That’s something to take into account.
All right, but you might be wondering, what if we place onErrorResume
before the map
operator? Something like this:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3, 4, 5);
integerFlux
.onErrorResume(e -> Flux.just(4, 5))
.map(i -> i/(i-3))
.subscribe(System.out::println,
System.out::println
);
What do you think it will happen?
Will the elements from the fallback publisher go through the map
operator?
If you run the above example, this will be the result:
0
-2
java.lang.ArithmeticException: / by zero
As you can see, the exception thrown by the map
operator was not caught, so we can infer that onErrorResume
can only catch the exceptions thrown upstream of it.
onErrorResume
is a very useful operator because given the exception thrown, you can dynamically generate a fallback Publisher
or value wrapped in Publisher
.
Taking this into account, and with the help of the error
method from Mono
or Flux
, you can replicate the behavior of catching an exception, wrapping it into a different exception, and re-throw it.
For example, the following imperative code:
try {
i = i/(i-3);
} catch (ArithmeticException e) {
throw new RuntimeException(
"Unexpected exception", e
);
}
Would be the equivalent of the following reactive code:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3, 4, 5);
integerFlux
.map(i -> i/(i-3))
.onErrorResume(e ->
Flux.error(
new RuntimeException("Unexpected exception", e)
)
)
.subscribe(System.out::println,
System.out::println
);
And this would be the result:
0
-2
java.lang.RuntimeException: Unexpected exception
onErrorMap
Another option for replicating the functionality of “catching, wrapping into a different exception, and re-throwing it” is the onErrorMap
operator.
There are three versions of this operator for Mono<T>
.
The first version transforms any error emitted by the Mono
by synchronously applying a function to it:
Mono<T> onErrorMap(
Function<? super Throwable, ? extends Throwable> mapper
)
The second version transforms an error emitted by the Mono
by synchronously applying a function to it if the error matches the given type:
<E extends Throwable> Mono<T> onErrorMap(
Class<E> type,
Function<? super E,? extends Throwable> mapper
)
And the third version transforms an error emitted by the Mono
by synchronously applying a function to it if the error matches the given predicate:
Mono<T> onErrorMap(
Predicate<? super Throwable> predicate,
Function<? super Throwable, ? extends Throwable> mapper
)
Flux
also provides three versions. The first one transforms any error emitted by the Flux
by synchronously applying a function to it:
Flux<T> onErrorMap(
Function<? super Throwable, ? extends Throwable> mapper
)
The second one transforms an error emitted by the Flux
by synchronously applying a function to it if the error matches the given type:
<E extends Throwable> Flux<T> onErrorMap(
Class<E> type,
Function<? super E,? extends Throwable> mapper
)
And the third one transforms an error emitted by the Flux
by synchronously applying a function to it if the error matches the given predicate:
Flux<T> onErrorMap(
Predicate<? super Throwable> predicate,
Function<? super Throwable,? extends Throwable> mapper
)
For example, the previous imperative code:
try {
i = i/(i-3);
} catch (ArithmeticException e) {
throw new RuntimeException(
"Unexpected exception", e
);
}
Would look like this using onErrorMap
:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3, 4, 5);
integerFlux
.map(i -> i/(i-3))
.onErrorMap(e -> new RuntimeException(
"Unexpected exception", e)
)
.subscribe(System.out::println,
System.out::println
);
The result is the same as before:
0
-2
java.lang.RuntimeException: Unexpected exception
The only difference with onErrorResume
is that we didn’t have to use the error
method to wrap the exception in a Publisher
, the function onErrorMap
takes returns the exception directly.
onErrorContinue
So all errors are terminal, but what if we need to keep processing the stream after an exception is thrown? For this, we can use the onErrorContinue
operator.
There are three versions of this operator for Mono<T>
. All of them let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements:
Mono<T> onErrorContinue(
BiConsumer<Throwable,Object> errorConsumer
)
// Only errors matching the specified type
// are recovered from.
<E extends Throwable> Mono<T> onErrorContinue(
Class<E> type,
BiConsumer<Throwable,Object> errorConsumer
)
// Takes a Predicate used to filter which errors
// should be resumed from.
<E extends Throwable> Mono<T> onErrorContinue(
Predicate<E> errorPredicate,
BiConsumer<Throwable,Object> errorConsumer
)
And here are the versions for Flux<T>
:
Flux<T> onErrorContinue(
BiConsumer<Throwable,Object> errorConsumer
)
// Only errors matching the specified type
// are recovered from.
<E extends Throwable> Flux<T> onErrorContinue(
Class<E> type,
BiConsumer<Throwable,Object> errorConsumer
)
// Takes a Predicate used to filter which errors
// should be resumed from.
<E extends Throwable> Flux<T> onErrorContinue(
Predicate<E> errorPredicate,
BiConsumer<Throwable,Object> errorConsumer
)
As you can see from the comments, onErrorContinue
allows upstream operators (this is a key concept) to drop the element that caused the exception and continue processing the subsequent elements.
The exception and the value that caused that exception are passed to the BiConsumer
all versions take.
Here’s an example:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3, 4, 5);
integerFlux
.map(i -> i/(i-3))
.onErrorContinue((e, i) -> {
System.out.format(
"The value %d caused the exception: %s\n", i, e
);
})
.subscribe(System.out::println,
System.out::println
);
This is the result:
0
-2
The value 3 caused the exception: java.lang.ArithmeticException: / by zero
4
2
As you can see, after the error, the sequence continued processing the elements 4
and 5
.
However, throwing an exception inside the BiConsumer
will propagate it downstream in place of the original one.
Here’s an example:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3, 4, 5);
integerFlux
.map(i -> i/(i-3))
.onErrorContinue((e, i) -> {
System.out.format(
"The value %d caused the exception: %s\n", i, e
);
throw new RuntimeException(e);
})
.subscribe(System.out::println,
System.out::println
);
This is the result:
0
-2
The value 3 caused the exception: java.lang.ArithmeticException: / by zero
java.lang.RuntimeException: java.lang.ArithmeticException: / by zero
Despite all this, the use of onErrorContinue
is not recommended.
Consider this example:
Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4, 5);
integerFlux
.map(i -> i/(i-3))
.onErrorResume(e -> Mono.just(99))
.onErrorContinue((e, i) -> {
System.out.format(
"The value %d caused the exception: %s\n", i, e
);
throw new RuntimeException(e);
})
.subscribe(System.out::println,
System.out::println
);
What do you think will happen?
Will the error be caught by onErrorResume
?
By onErrorContinue
?
Both?
This is the result:
0
-2
The value 3 caused the exception: java.lang.ArithmeticException: / by zero
99
Well, the error was caught by both, onErrorResume
and onErrorContinue
. However, onErrorContinue
caught the error first and didn’t continue processing elements 4
and 5
, the fallback Mono
was emitted instead.
On the other hand, if we switch the order of onErrorResume
and onErrorContinue
:
Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4, 5);
integerFlux
.map(i -> i/(i-3))
.onErrorContinue((e, i) -> {
System.out.format(
"The value %d caused the exception: %s\n", i, e
);
})
.onErrorResume(e -> Mono.just(99))
.subscribe(System.out::println,
System.out::println
);
This will be the result:
0
-2
The value 3 caused the exception: java.lang.ArithmeticException: / by zero
4
2
onErrorResume
will not work, it will do nothing.
So you have to be careful when mixing onErrorResume
and onErrorContinue
. In particular, because sometimes a caller might add onErrorContinue
to the chain of operators, unexpectedly changing the behavior.
According to the documentation:
Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream operators, it requires specific operator support to work, and the scope can easily propagate upstream into library code that didn’t anticipate it (resulting in unintended behavior.)
Next, the documentation goes on to provide a solution that will avoid leaking upstream:
In most cases, you should instead handle the error inside the specific function which may cause it. Specifically, on each inner publisher you can use doOnError to log the error, and onErrorResume(e -> Mono.empty()) to drop erroneous elements.
Here’s an example that encapsulates the code that throws an exception in a flatMap
operator:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3, 4, 5);
integerFlux
.flatMap(val ->
Mono.just(val)
.map(i -> i/(i-3))
.doOnError(
e -> System.out.println(
"Inside exception: " + e
)
)
.onErrorResume(e -> Mono.empty())
)
.subscribe(System.out::println,
System.out::println
);
This is the result:
0
-2
Inside exception: java.lang.ArithmeticException: / by zero
4
2
It has the same effect as using onErrorContinue
, but you won’t have unexpected side effects if this operator appears before flatMap
:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3, 4, 5);
integerFlux
.onErrorContinue((e, i) -> {
System.out.format(
"The value %d caused the exception: %s\n", i, e
);
})
.flatMap(val ->
Mono.just(val)
.map(i -> i/(i-3))
.doOnError(
e -> System.out.println(
"Inside exception: " + e
)
)
.onErrorResume(e -> Mono.empty())
)
.subscribe(System.out::println,
System.out::println
);
For the above example, onErrorContinue
won’t have any effect:
0
-2
Inside exception: java.lang.ArithmeticException: / by zero
4
2
onErrorStop
If we modify the previous example to place onErrorContinue
after the flatMap
operator:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3, 4, 5);
integerFlux
.flatMap(val ->
Mono.just(val)
.map(i -> i/(i-3))
.doOnError(e ->
System.out.println(
"Inside exception: " + e
)
)
.onErrorResume(e -> Mono.empty())
)
.onErrorContinue((e, i) -> {
System.out.format(
"The value %d caused the exception: %s\n", i, e
);
})
.subscribe(System.out::println,
System.out::println
);
This time, onErrorContinue
will be executed:
0
-2
The value 3 caused the exception: java.lang.ArithmeticException: / by zero
4
2
But if you don’t want this behavior, you can use onErrorStop
:
// For Mono
Mono<T> onErrorStop()
// For Flux
Flux<T> onErrorStop()
If an onErrorContinue(BiConsumer)
variant has been used downstream, onErrorStop
reverts to the default mode where errors are terminal events upstream. In other words, after onErrorStop
, onErrorContinue
will have no effect.
This way, if we add to the above example the onErrorStop
operator (after onErrorResume
and before onErrorContinue
):
Flux<Integer> integerFlux =
Flux.just(1, 2, 3, 4, 5);
integerFlux
.flatMap(val ->
Mono.just(val)
.map(i -> i/(i-3))
.doOnError(e ->
System.out.println(
"Inside exception: " + e
)
)
.onErrorResume(e -> Mono.empty())
.onErrorStop()
)
.onErrorContinue((e, i) -> {
System.out.format(
"The value %d caused the exception: %s\n", i, e
);
})
.subscribe(System.out::println,
System.out::println
);
This time, onErrorContinue
will have no effect:
0
-2
Inside exception: java.lang.ArithmeticException: / by zero
4
2