Nothing Happens Until You Subscribe
If you run the examples we’ve reviewed so far, you’ll see that the elements contained in the publishers are never printed (only a few messages that show how eager creation works). We never did anything with those elements either.
We can chain many operators; there are plenty to choose from:
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .map(n -> n * 2)
    .skip(1)
    .count()
But the truth is that none of these operators will be executed until someone subscribes to the Flux.
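A quick way to check this claim is to count how often the map function runs before and after subscribing. The following is a minimal sketch, assuming reactor-core is on the classpath; the class name LazyPipelineDemo and the counter are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class; only the Flux/Mono calls come from Reactor.
class LazyPipelineDemo {

    // Returns {map calls before subscribe(), map calls after subscribe()}.
    static int[] run() {
        AtomicInteger mapCalls = new AtomicInteger();

        // Assembling the pipeline only describes the work; nothing runs yet.
        Mono<Long> pipeline = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
                .map(n -> {
                    mapCalls.incrementAndGet();
                    return n * 2;
                })
                .skip(1)
                .count();

        int before = mapCalls.get();

        // Subscribing triggers the whole chain, synchronously for Flux.just.
        pipeline.subscribe();

        int after = mapCalls.get();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] calls = run();
        System.out.println("map calls before subscribe: " + calls[0]); // 0
        System.out.println("map calls after subscribe: " + calls[1]);  // 9
    }
}
```

Note that count() turns the Flux into a Mono<Long>, so in this sketch the subscription is made on that Mono.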
Do you remember the diagram of how reactive streams work?
The call to the subscribe method in the Publisher interface is what triggers everything: the creation of the Subscription object so the client can request elements, the sending of those elements, and so on.
From the Reactor documentation:
Once you subscribe, a chain of Subscriber objects is created, backward (up the chain) to the first publisher. This is effectively hidden from you. All you can see is the outer layer of Flux (or Mono) and Subscription, but these intermediate operator-specific subscribers are where the real work happens.
So always remember:
Nothing happens until you subscribe.
But what are the options for subscribing to a Publisher?
In Reactor, we have many versions of the subscribe method for Mono and Flux.
For example, to subscribe to the publisher and request elements without limits:
Disposable subscribe()
To subscribe to the publisher with a Consumer that will receive all the published elements:
Disposable subscribe(Consumer<? super T> consumer)
To subscribe to the publisher with a Consumer that will receive all the published elements, as well as a Consumer that will handle errors:
Disposable subscribe(
    Consumer<? super T> consumer,
    Consumer<? super Throwable> errorConsumer
)
To subscribe to the publisher with a Consumer that will receive all the published elements, as well as a Consumer that will handle errors, and a Runnable that will be executed when the sequence completes:
Disposable subscribe(
    Consumer<? super T> consumer,
    Consumer<? super Throwable> errorConsumer,
    Runnable completeConsumer
)
To subscribe to the publisher with a Consumer that will receive all the published elements, as well as a Consumer that will handle errors, a Runnable that will be executed when the sequence completes, and a Context object that will be tied to the subscription:
Disposable subscribe(
    Consumer<? super T> consumer,
    Consumer<? super Throwable> errorConsumer,
    Runnable completeConsumer,
    Context initialContext
)
And to subscribe to the publisher with a Subscriber object to have more control over backpressure and the request:
void subscribe(Subscriber<? super T> actual)
Disposable is a functional interface that represents a task or resource that can be canceled or disposed:
@FunctionalInterface
public interface Disposable {
    // Cancel or dispose the underlying task or resource.
    void dispose();

    // Optionally return true when the resource or task is disposed.
    default boolean isDisposed() {
        return false;
    }
    // ...
}
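To make the role of the returned Disposable concrete, here is a minimal sketch that subscribes to a publisher that never emits and then cancels the subscription. It assumes reactor-core on the classpath; the class name DisposableDemo is hypothetical, and Flux.never() is used only so the sequence cannot terminate on its own before dispose() is called.

```java
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

// Hypothetical demo class showing how the Disposable from subscribe() behaves.
class DisposableDemo {

    // Returns {isDisposed() before dispose(), isDisposed() after dispose()}.
    static boolean[] run() {
        // Flux.never() emits no elements and no terminal signal,
        // so the subscription stays active until we cancel it.
        Disposable subscription = Flux.never().subscribe();

        boolean before = subscription.isDisposed();

        // Cancels the underlying subscription.
        subscription.dispose();

        boolean after = subscription.isDisposed();
        return new boolean[] {before, after};
    }

    public static void main(String[] args) {
        boolean[] state = run();
        System.out.println("disposed before dispose(): " + state[0]); // false
        System.out.println("disposed after dispose(): " + state[1]);  // true
    }
}
```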
If we subscribe to the following Mono this way:
Mono.just(1).subscribe();
The element it wraps will be published. However, this is not very useful, since nothing shows us that it happened.
Of course, there are ways to verify this. For example, in unit tests, we can use the interface StepVerifier (we’ll cover this topic later in this course).
In a web framework like Spring WebFlux, the framework itself calls the subscribe() method at the appropriate point in the execution flow; you don’t do it yourself. We know the code is executed because something is returned to the client or there are side effects, like saving something in a database.
But for the examples shown here, it’s better to use the version that takes a Consumer and do something with the value it receives, like printing it:
Mono.just(1).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) {
        System.out.println(integer);
    }
});
Of course, we can simplify the code with a lambda expression:
Mono.just(1).subscribe(
    integer -> System.out.println(integer)
);
Or with a method reference:
Mono.just(1).subscribe(System.out::println);
Other versions of subscribe allow you to get the exception thrown, if there’s an error, by passing another Consumer. Take the following code as an example:
private Integer getException() {
    return 1/0;
}
// ...
Mono.defer(() -> Mono.just(getException()))
    .subscribe(
        System.out::println,
        e -> System.out.println("Message: " + e.getMessage())
    );
I’m using Mono.defer so that the exception is thrown lazily, at subscription time, and reaches the error Consumer. Otherwise, getException() would be evaluated eagerly, while the Mono is being created, and the exception would be thrown before we had a chance to subscribe.
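The difference between the two cases can be sketched side by side. This is a minimal illustration, assuming reactor-core on the classpath; the class name DeferDemo and the methods eager() and deferred() are hypothetical, and the returned strings exist only to make the outcome visible.

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Mono;

// Hypothetical demo class contrasting eager and deferred evaluation.
class DeferDemo {

    static Integer getException() {
        return 1 / 0; // always throws ArithmeticException
    }

    // Without defer: the exception escapes while the Mono is being built.
    static String eager() {
        try {
            // getException() runs while the argument to Mono.just is evaluated,
            // so no Mono is ever created and subscribe() is never reached.
            Mono<Integer> mono = Mono.just(getException());
            mono.subscribe(System.out::println, e -> System.out.println("never reached"));
            return "no exception";
        } catch (ArithmeticException e) {
            return "thrown during assembly: " + e.getMessage();
        }
    }

    // With defer: the supplier runs at subscription time, and the
    // exception is delivered to the error consumer as an error signal.
    static String deferred() {
        AtomicReference<String> result = new AtomicReference<>("no error signal");
        Mono.defer(() -> Mono.just(getException()))
                .subscribe(
                        value -> result.set("value: " + value),
                        e -> result.set("error signal: " + e.getMessage()));
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(eager());    // thrown during assembly: / by zero
        System.out.println(deferred()); // error signal: / by zero
    }
}
```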
You can also execute an action when all elements have been published (notice that you can pass null for the consumers you don’t need):
Mono.just(1)
    .subscribe(
        null,
        null,
        () -> System.out.println("The end")
    );
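To see how the value consumer, the error consumer, and the completion Runnable relate to each other, here is a minimal sketch that records every signal a subscription receives. It assumes reactor-core on the classpath; the class name SubscribeCallbacksDemo and the helper signalsOf are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class that logs the signals seen by the three callbacks.
class SubscribeCallbacksDemo {

    // Subscribes with all three callbacks and records every signal received.
    static List<String> signalsOf(Flux<Integer> source) {
        List<String> signals = new ArrayList<>();
        source.subscribe(
                value -> signals.add("next:" + value),
                error -> signals.add("error:" + error.getMessage()),
                () -> signals.add("complete"));
        return signals;
    }

    public static void main(String[] args) {
        // Normal completion: three values, then the Runnable.
        System.out.println(signalsOf(Flux.just(1, 2, 3)));
        // prints [next:1, next:2, next:3, complete]

        // Failure on the third element: the error consumer runs instead.
        System.out.println(signalsOf(Flux.just(1, 2, 0).map(n -> 10 / n)));
        // prints [next:10, next:5, error:/ by zero]
    }
}
```

In this sketch the completion Runnable runs only when the sequence ends normally; when an error is signaled, the error consumer is called and "complete" is never recorded.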
However, be aware that the version that takes an argument of type Consumer<? super Subscription> is deprecated for Flux and removed in Reactor 3.5. This version was useful because, with the Subscription object, you can control how much data the publisher sends (backpressure). Here’s an example:
Flux.just(1, 2, 3)
    .subscribe( // Depending on your Reactor version, this is flagged as deprecated or fails to compile
        System.out::println,
        null,
        null,
        s -> s.request(1)
    );
A good alternative for this is the more verbose subscribe(Subscriber) version. This would be the equivalent of the previous example:
Flux.just(1, 2, 3)
    .subscribe(new BaseSubscriber<Integer>() {
        @Override
        protected void hookOnSubscribe(Subscription s) {
            s.request(1L);
        }
        @Override
        protected void hookOnNext(Integer value) {
            System.out.println(value);
        }
    });
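Because this subscriber requests a single element and never asks for more, only the first value is printed. A common variation is to request the next element from hookOnNext, so that elements are pulled one at a time. The following is a minimal sketch of that idea, assuming reactor-core on the classpath; the class name OneByOneSubscriberDemo and the requestLog parameter are hypothetical, and doOnRequest is added only to observe the request amounts.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Hypothetical demo class: pulls elements one at a time with BaseSubscriber.
class OneByOneSubscriberDemo {

    // Returns the received values; every upstream request amount is
    // appended to requestLog so the demand pattern can be inspected.
    static List<Integer> run(List<Long> requestLog) {
        List<Integer> received = new ArrayList<>();

        Flux.just(1, 2, 3)
                .doOnRequest(n -> requestLog.add(n))
                .subscribe(new BaseSubscriber<Integer>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        // Ask for the first element only.
                        request(1);
                    }

                    @Override
                    protected void hookOnNext(Integer value) {
                        received.add(value);
                        // Ask for one more element after handling this one.
                        request(1);
                    }
                });

        return received;
    }

    public static void main(String[] args) {
        List<Long> requestLog = new ArrayList<>();
        System.out.println("received: " + run(requestLog)); // received: [1, 2, 3]
        System.out.println("request amounts: " + requestLog);
    }
}
```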
The BaseSubscriber abstract class gives you more control by letting you override some hooks (some are optional).
For example, this optional hook is executed after any of the termination events (such as SignalType.ON_ERROR, SignalType.ON_COMPLETE, or SignalType.CANCEL):
protected void hookFinally(SignalType type)
There’s another optional hook executed when the subscription is cancelled by calling the Subscriber’s cancel() method:
protected void hookOnCancel()
An optional hook for processing something on completion:
protected void hookOnComplete()
Or this optional hook for error processing:
protected void hookOnError(Throwable throwable)
There’s also a hook for processing onNext values:
protected void hookOnNext(T value)
And this hook, for further processing of onSubscribe’s Subscription:
protected void hookOnSubscribe(Subscription subscription)
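To get a feel for when each hook runs, here is a minimal sketch of a subscriber that overrides all of them and records the order in which they are called, once for a sequence that completes and once for a subscription that is cancelled after the first value. It assumes reactor-core on the classpath; the names HookOrderDemo and RecordingSubscriber are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

// Hypothetical demo class that records the order of BaseSubscriber hooks.
class HookOrderDemo {

    static final class RecordingSubscriber extends BaseSubscriber<Integer> {
        final List<String> events = new ArrayList<>();
        private final boolean cancelAfterFirst;

        RecordingSubscriber(boolean cancelAfterFirst) {
            this.cancelAfterFirst = cancelAfterFirst;
        }

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            events.add("subscribe");
            subscription.request(Long.MAX_VALUE); // no backpressure in this demo
        }

        @Override
        protected void hookOnNext(Integer value) {
            events.add("next:" + value);
            if (cancelAfterFirst) {
                cancel(); // BaseSubscriber's own cancel() method
            }
        }

        @Override
        protected void hookOnComplete() {
            events.add("complete");
        }

        @Override
        protected void hookOnError(Throwable throwable) {
            events.add("error:" + throwable.getMessage());
        }

        @Override
        protected void hookOnCancel() {
            events.add("cancel");
        }

        @Override
        protected void hookFinally(SignalType type) {
            events.add("finally:" + type.name());
        }
    }

    // Subscribes to a two-element Flux and returns the recorded hook calls.
    static List<String> run(boolean cancelAfterFirst) {
        RecordingSubscriber subscriber = new RecordingSubscriber(cancelAfterFirst);
        Flux.just(1, 2).subscribe(subscriber);
        return subscriber.events;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        // prints [subscribe, next:1, next:2, complete, finally:ON_COMPLETE]
        System.out.println(run(true));
        // prints [subscribe, next:1, cancel, finally:CANCEL]
    }
}
```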
All right, now you’re ready to start learning about operators.