Skip to main content Link Menu Expand (external link) Document Search Copy Copied
Dark theme

Life Cycle Hooks


We’ve been using the version of the subscribe method that takes a Consumer that is invoked for each element that a Mono or Flux emit.

This Consumer is invoked for every onNext signal.

However, the log operator (available for Mono and Flux) allows us to log all the Reactive Streams signals using Logger:

Flux<T> log()

This operator has more versions.

There’s one to observe Reactive Streams signals matching the passed filter options, and trace them using a specific user-provided Logger (instead of resolving one) at Level.INFO level:

Flux<T> log(Logger logger)

Another one that also takes a boolean to capture the current stack to display operator class/line number (default in other versions is false), and a varargs SignalType option to filter log messages from particular signals:

Flux<T> log(Logger logger, Level level, boolean showOperatorLine, SignalType... options)

By default, this operator will use Level.INFO and java.util.logging. However, SLF4J will be used if it’s available.

There’s also a version that takes the category to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix will be added (e.g. "reactor.Flux.Map"):

Flux<T> log(String category)

Another that takes the level to enforce for the sequence (only FINEST, FINE, INFO, WARNING, and SEVERE), and a varargs SignalType option to filter log messages:

Flux<T> log(String category, Level level, SignalType... options)

And another that in addition to the category, level, and SignalType options, also takes a boolean to capture the current stack to display operator class/line number:

Flux<T> log(String category, Level level, boolean showOperatorLine, SignalType... options)

Consider this example:

Flux.just(1, 2, 3)
    .log()
    .subscribe(System.out::println);

It will print information about the signals onSubscribe, onNext, and onComplete (in addition to the values of the Flux):

[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
1
[ INFO] (main) | onNext(2)
2
[ INFO] (main) | onNext(3)
3
[ INFO] (main) | onComplete()

However, if we want to add a customized behavior to these signals, Reactor provides some hooks to do it.

First, the hooks for Mono<T>.

This hook adds behavior after the Mono terminates, either successfully or with an error:

Mono<T> doAfterTerminate(Runnable afterTerminate)

This one adds behavior after the Mono terminates for any reason, including cancellation:

Mono<T> doFinally(Consumer<SignalType> onFinally)

This one adds behavior triggered before the Mono is subscribed to, which should be the first event:

Mono<T> doFirst(Runnable onFirst)

This one adds behavior triggered when the Mono is cancelled:

Mono<T> doOnCancel(Runnable onCancel)

The following hook potentially modifies the behavior of the whole chain of operators upstream (before) of this one, conditionally cleaning up elements that get discarded by these operators. discardHook must be idempotent and safe to use on any instance of the desired type. Also, calls to this method are additive, and the order of invocation is the same as the order of declaration:

Mono<T> doOnDiscard(Class<R> type, Consumer<? super R> discardHook)

This one adds behavior triggered when the Mono emits an item, fails with an error or completes successfully:

Mono<T> doOnEach(Consumer<? super Signal<T>> signalConsumer)

This one adds behavior when the Mono completes with an error matching the given exception type (the Consumer is executed first, then the onError signal is propagated):

<E extends Throwable> Mono<T> doOnError(Class<E> exceptionType, Consumer<? super E> onError)

The following version adds behavior triggered when the Mono completes with an error:

Mono<T> doOnError(Consumer<? super Throwable> onError)

And this version adds behavior when the Mono completes with an error matching the given predicate:

Mono<T> doOnError(Predicate<? super Throwable> predicate, Consumer<? super Throwable> onError)

Moving on, this hook adds behavior triggered when the Mono emits an item successfully:

Mono<T> doOnNext(Consumer<? super T> onNext)

This one adds behavior triggering a LongConsumer when the Mono receives any request:

Mono<T> doOnRequest(LongConsumer consumer)

This one adds behavior when the Mono is being subscribed, that is to say when a Subscription has been produced by the Publisher and is being passed to Subscriber.onSubscribe(Subscription):

Mono<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe)

This one adds behavior as soon as the Mono can be considered to have completed successfully:

Mono<T> doOnSuccess(Consumer<? super T> onSuccess)

And this adds behavior when the Mono terminates, either by completing with a value, completing empty or failing with an error:

Mono<T> doOnTerminate(Runnable onTerminate)

For Flux<T>, this hook adds behavior after the Flux terminates, either successfully or with an error:

Flux<T> doAfterTerminate(Runnable afterTerminate)

This one adds behavior after the Flux terminates for any reason, including cancellation:

Flux<T> doFinally(Consumer<SignalType> onFinally)

This one adds behavior triggered before the Flux is subscribed to, which should be the first event:

Flux<T> doFirst(Runnable onFirst)

This one adds behavior triggered when the Flux is cancelled:

Flux<T> doOnCancel(Runnable onCancel)

This one adds behavior when the Flux completes successfully:

Flux<T> doOnComplete(Runnable onComplete)

The following hook potentially modifies the behavior of the whole chain of operators upstream (before) of this one, conditionally cleaning up elements that get discarded by these operators. discardHook must be idempotent and safe to use on any instance of the desired type, and calls to this method are additive, and the order of invocation is the same as the order of declaration:

Flux<T> doOnDiscard(Class<R> type, Consumer<? super R> discardHook)

This one adds behavior when the Flux emits an item, fails with an error or completes successfully:

Flux<T> doOnEach(Consumer<? super Signal<T>> signalConsumer)

This one adds behavior when the Flux completes with an error matching the given exception type (the Consumer is executed first, then the onError signal is propagated):

<E extends Throwable> Flux<T> doOnError(Class<E> exceptionType, Consumer<? super E> onError)

This version adds behavior triggered when the Flux completes with an error:

Flux<T> doOnError(Consumer<? super Throwable> onError)

This other version adds behavior when the Flux completes with an error matching the given predicate:

Flux<T> doOnError(Predicate<? super Throwable> predicate, Consumer<? super Throwable> onError)

Moving on, this hook adds behavior triggered when the Flux emits an item successfully:

Flux<T> doOnNext(Consumer<? super T> onNext)

This one adds behavior triggering a LongConsumer when the Flux receives any request:

Flux<T> doOnRequest(LongConsumer consumer)

This adds behavior when the Flux is being subscribed, that is to say when a Subscription has been produced by the Publisher and is being passed to the Subscriber.onSubscribe(Subscription):

Flux<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe)

And this adds behavior when the Flux terminates, either by completing with a value, completing empty or failing with an error:

Flux<T> doOnTerminate(Runnable onTerminate)

These hooks do not change the sequence’s data, so most of the time, they are used to isolate side effects like logging.

Also, notice that, except for doFirst and doFinally, the names of all these hooks start with doOn.

However, Mono and Flux have basically the same hooks. The only exception is the hook to add behavior when the sequence completes successfully. For Mono is doOnSuccess, which takes the element emitted, whereas for Flux is doOnComplete, which doesn’t take the elements emitted.

Here’s an example for Mono:

Mono.just(1)
    .log()
    .doOnSuccess(
        i -> System.out.println(
                "Mono completed successfully: " + i)
    )
    .subscribe();

This is the result:

[ INFO] (main) | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
Mono completed successfully: 1
[ INFO] (main) | onComplete()

And here’s an example for Flux:

Flux.just(1, 2, 3)
    .log()
    .doOnComplete(
        () -> System.out.println(
                "Flux completed successfully")
    )
    .subscribe();

And this is the result:

[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
[ INFO] (main) | onNext(2)
[ INFO] (main) | onNext(3)
[ INFO] (main) | onComplete()
Flux completed successfully 

Notice that we still need to subscribe, otherwise, nothing will be executed.

Another thing to notice is that although some methods seem to be the same, some of them take a Runnable while others take either the value or the signal emitted.

The following example adds print statements to some hooks so you can see at what point they are executed:

Flux.just(1, 2, 3)
    .log()
    .doOnSubscribe(
        subscription -> 
            System.out.println("doOnSubscribe: " 
                                  + subscription)
    )
    .doOnRequest(
        l -> 
            System.out.println("doOnRequest: " 
                                  + l)
    )
    .doFirst(
        () -> 
            System.out.println("doFirst")
    )
    .doOnNext(
        i ->
            System.out.println("doOnNext: " 
                                  + i)
    )
    .doOnEach(
        integerSignal -> 
            System.out.println("doOnEach: " 
                                  + integerSignal)
    )
    .doFinally(
        signalType ->
            System.out.println("doFinally: " 
                                  + signalType)
    )
    .doAfterTerminate(
        () -> 
            System.out.println("doAfterTerminate")
    )
    .doOnComplete(
        () -> 
            System.out.println("doOnComplete")
    )
    .doOnTerminate(
        () -> 
            System.out.println("doOnTerminate")
    )
    .subscribe();

This is the result:

doFirst
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
doOnSubscribe: reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber@67c27493
doOnRequest: 9223372036854775807
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
doOnNext: 1
doOnEach: doOnEach_onNext(1)
[ INFO] (main) | onNext(2)
doOnNext: 2
doOnEach: doOnEach_onNext(2)
[ INFO] (main) | onNext(3)
doOnNext: 3
doOnEach: doOnEach_onNext(3)
[ INFO] (main) | onComplete()
doOnEach: onComplete()
doOnComplete
doOnTerminate
doAfterTerminate
doFinally: onComplete

The first hook that is executed is doFirst. It takes a Runnable so use this hook if you want to execute something before subscribing.

After the subscription is made, doOnSubscribe is executed, receiving the Subscription object.

Next, doOnRequest, receiving a very big number (the value of Long.MAX_VALUE). That’s why, in practical terms, the log shows the request as unbounded.

Then, the Flux starts emitting values, calling onNext, doOnNext, and doOnEach (in that order) for each of these values.

After the last value, there’s a last call to doOnEach showing the onComplete signal.

Finally, doOnComplete, doOnTerminate, doAfterTerminate, and doFinally are executed in that order.