Skip to main content Link Menu Expand (external link) Document Search Copy Copied
Dark theme

Going Back to Synchronous Types


If you’re working with Mono or Flux but you need to return, at the end of a chain of operators, a synchronous type like a List or an Integer, you can use one of the block* methods that Reactor provides.

These methods subscribe to a Publisher and, as the name implies, block the execution of the program to get the result.

Block

Here are the methods for Mono.

block subscribes to the Mono and blocks indefinitely until the element is received, returning that element:

T block()

Another version subscribes to the Mono and blocks until the element is received or the timeout expires, returning the element:

T block(Duration timeout)

blockOptional subscribes to the Mono and blocks indefinitely until the element is received, or the Mono completes empty, returning either the element wrapped in an Optional or an empty Optional:

Optional<T> blockOptional()

Another version subscribes to the Mono and blocks until the element is received, the Mono completes empty, or the timeout expires, returning either the element wrapped in an Optional or an empty Optional:

Optional<T> blockOptional(Duration timeout)

And here are the methods for Flux.

blockFirst subscribes to the Flux and blocks indefinitely until the first value is sent or the Flux completes, returning the first value or null:

T blockFirst()

Another version subscribes to the Flux and blocks until the first value is sent, the Flux completes, or the timeout expires, returning the first value or null:

T blockFirst(Duration timeout)

blockLast subscribes to the Flux and blocks indefinitely until the last value is sent or the Flux completes, returning the last value or null:

T blockLast()

While another version subscribes to the Flux and blocks until the upstream signals its last value, completes or a timeout expires:

T blockLast(Duration timeout)

We have four options for each Publisher. Two of them take as an argument a timeout as a Duration object. If the timeout expires, a RuntimeException is thrown.

Here’s a simple example that returns the Integer value inside of a Mono (remember, sometimes it’s useful to think of Mono or Flux as containers):

Mono<Integer> myMono = Mono.just(1);
Integer valueMono = myMono.block();
System.out.println(valueMono);

Notice there’s no subscribe method.

This is the result:

1

For the versions that take a timeout, the following example forces a timeout expiration by delaying the publishing of elements with the delayElements operator:

Integer valueFlux = 
    Flux.just(1, 2, 3)
        .delayElements(Duration.ofSeconds(1))
        .blockLast(Duration.ofMillis(1));

In this case, this piece of code throws an exception:

java.lang.IllegalStateException: Timeout on blocking read for 1000000 NANOSECONDS

	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:123)
	at reactor.core.publisher.Flux.blockLast(Flux.java:2669)
	at net.eherrera.reactor.m8.Test_01_BlockMethods.example_02_BlockWithDuration(Test_01_BlockMethods.java:24)
	...

These methods work in a synchronous way, not in an asynchronous way. Blocking defeats the purpose of reactive programming, so you’ll rarely have to use these methods. Besides, blocking the execution in a reactive web server that works with a limited amount of threads, can bring it down easily.

The only two uses cases I can think of for these block* methods are:

  • When you need the result of a sequence to call a third-party library that doesn’t have a reactive version.
  • In unit tests, if you’re not using a library like StepVerifier, and you need to assert the result of a sequence.

However, you must know that in the case of Flux, the following methods block the execution every time you get an element from the transformed Iterable or Stream:

// Transform this Flux into a lazy Iterable,
// blocking on Iterator.next() calls
Iterable<T> toIterable()
    
// Transform this Flux into a lazy Iterable,
// blocking on Iterator.next() calls
Iterable<T> toIterable(int batchSize)

// Transform this Flux into a lazy Iterable,
// blocking on Iterator.next() calls
Iterable<T> toIterable(
    int batchSize, 
    Supplier<Queue<T>> queueProvider
)
    
// Transform this Flux into a lazy Stream,
// blocking for each source onNext call
Stream<T> toStream()
    
// Transform this Flux into a lazy Stream,
// blocking for each source onNext call
Stream<T> toStream(int batchSize)

Of course, this shouldn’t be a surprise, since Iterable and Stream are not reactive.

On the other hand, Mono can only be transformed into a CompletableFuture:

// To transform the Mono into a CompletableFuture,
// completing on onNext or onComplete,
// and failing on onError.
CompletableFuture<T> toFuture()

The result (or null) is passed asynchronously to CompletableFuture’s complete method, but to get it, you have to call the get() method.

Also, here’s a note from the documentation to keep in mind:

All of these methods except Mono#toFuture will throw an UnsupportedOperatorException if called from within a Scheduler marked as “non-blocking only” (by default parallel() and single()).

But remember, neven block the execution.