Link Search Menu Expand Document
Dark theme

Creating a Sequence


In the GitHub repository of this course, go to the folder 02 to find the Maven projects for this module in the before and after directories.

All right.

Until now, we’ve been talking about streams and sequences. But I want you to think of Mono and Flux as containers of values of a certain type (T). Zero or one in the case of Mono: [INSERT IMAGE]

And from zero to N (actually, to Long.MAX_VALUE) in the case of Flux: [INSERT IMAGE]

This will help you to grasp some concepts more easily later, so it’s better to think this way since the beginning.

This way, to create these containers and put elements into them, there are many static factory methods available.

Let’s start with the method empty(), it creates an empty container:

Mono<String> emptyMono = Mono.empty();
Flux<String> emptyFlux = Flux.empty();

The above example creates an empty Mono of type String and an empty Flux of type String. If we subscribe to these publisher at this time, they will only emit a completion signal because they contain no values. However, if we ever put values into them, these values must be of type String.

We can put individual elements at the time of creation of the publisher with the method just:

Mono<Integer> integerMono = Mono.just(1);
Flux<Integer> integerFlux = Flux.just(1, 2);

Since Mono can only contain one element, at most, trying to pass more than one argument to this method will result in a compiler error:

Mono<Integer> integerMono = Mono.just(1, 2); // Compiler error

And here’s where the differences start. Look at the definition of the just() method for Mono and Flux:

// Mono
static <T> Mono<T> just(T data)

// Flux
static <T> Flux<T> just(T... data)
static <T> Flux<T> just(T data)

For Flux, the overloaded version of just() allows us to create an empty container:

Flux<Integer> integerFlux = Flux.just(); // No error

But for Mono, the just() method always expects an argument:

Mono<Integer> integerMono = Mono.just(); // Error

Of course, for clarity, it’s better to use the empty() method. However, we don’t always know from the beginning if there are elements to stream. Besides, Mono is supposed to accept zero or one element, right?

Well, for the case when you’re not sure if there’s a element, Mono has two versions of the method justOrEmpty():

static <T> Mono<T> justOrEmpty(Optional<? extends T> data)
static <T> Mono<T> justOrEmpty(T data)

They create a new Mono with the specified element if the argument is not an empty Optional or non-null. Otherwise, it creates an empty Mono:

// Creates an empty Mono
Mono<Integer> emptyMono1 = Mono.justOrEmpty(Optional.empty());
// Also creates an empty Mono
Mono<Integer> emptyMono2 = Mono.justOrEmpty(null); 

Now, you can create create Mono and Flux objects from other objects with the methods from*(). Of course, due to the difference in cardinality, Mono and Flux have different from*() methods.

These are the from*() methods for Mono:

// To create a Mono from another Publisher
static <T> Mono<T> from(Publisher<? extends T> source)
static <I> Mono<I> fromDirect(Publisher<? extends I> source)

// To create a Mono from a Callable, Runnable, or a Supplier
static <T> Mono<T> fromCallable(Callable<? extends T> supplier)
static <T> Mono<T> fromRunnable(Runnable runnable)
static <T> Mono<T> fromSupplier(Supplier<? extends T> supplier)

// To create a Mono from a CompletableFuture, eagerly or lazyly (with a Supplier)
static <T> Mono<T> fromFuture(CompletableFuture<? extends T> future)
static <T> Mono<T> fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier)

// To create a Mono from a CompletionStage, eagerly or lazyly (with a Supplier)
static <T> Mono<T> fromCompletionStage(CompletionStage<? extends T> completionStage)
static <T> Mono<T> fromCompletionStage(
                     Supplier<? extends CompletionStage<? extends T>> stageSupplier)

I want to highlight three things here.

One. The difference between from and fromDirect is that the first one will cancel the publisher passed as an argument after the first element. Take example the following code:

Flux<Integer> integerFlux = Flux.just(1, 2);
Mono<Integer> mono1 = Mono.from(integerFlux);
Mono<Integer> mono2 = Mono.fromDirect(integerFlux);

Both, mono1 and mono2 will emit only the first element of integerFlux, 1. However, mono1 will cancel the integerFlux after emitting 1, while mono2 will allow integerFlux to emit 2 under the hood. This can cause unintended side effects, so only use fromDirect if you know the publisher passed as argument only emits one element.

Two. About the method fromRunnable, you may be wondering, how can I generate a value for the Mono if the Runnable interface defines a method with void as the return value?

public interface Runnable {
    public void run();
}

Well, most of the time, Mono and Flux are containers of values of a certain type. But with methods like fromRunnable, they can also contain actions. If there’s no value to publish, the return type can be Void.

This way, we can use a Runnable to modify a value (as a side effect) asynchronously:

private int myValue = 0;
// ...
Mono<Void> runnableMono = Mono.fromRunnable(new Runnable() {
    @Override
    public void run() {
        myValue++;
    }
});

Or execute some action (possibly causing a side effect too):

// Using a lambda expression to simplify the code
Mono<Void> runnableMono2 = Mono.fromRunnable(
        () -> System.out.println("Hello from Runnable!")
);

And three. When creating a Mono from a CompletableFuture, which is also an implementation of CompletionStage, we have the choice of creating the Mono either eagerly or lazily.

For example, the following code will print the string Eager because the CompletableFuture will be executed when the Mono is created:

Mono<String> futureMonoEager = Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
    System.out.println("Eager");
    return "Hello from eager future!";
}));

On the other hand, executing the following code (that uses a Supplier to provide the CompletableFuture) will not print anything because the CompletableFuture will be executed until we use the Mono:

Mono<String> futureMonoLazy = Mono.fromFuture(() -> CompletableFuture.supplyAsync(() -> {
    System.out.println("Lazy");
    return "Hello from lazy future!";
}));

About Flux, since it can emit more than one element, we can create one from the most common objects that represent or can contain many elements:

// To create a Flux from another Publisher
static <T> Flux<T> from(Publisher<? extends T> source)

// To create a Flux from an array
static <T> Flux<T> fromArray(T[] array)

// To create a Flux from an Iterable
static <T> Flux<T> fromIterable(Iterable<? extends T> it)

// To create a Flux from a Stream
static <T> Flux<T> fromStream(Stream<? extends T> s)
static <T> Flux<T> fromStream(Supplier<Stream<? extends T>> streamSupplier)

Two things to highlight here.

One. Iterable is the super interface of many other interfaces and classes, probably being List the most used:

List<Integer> myList = Arrays.asList(1, 2, 3);
Flux listFlux = Flux.fromIterable(myList);

And two. The difference between the two fromStream() methods is not about eager and lazy creation like in Mono’s methods fromFuture() and fromCompletionStage(). It’s about reusing the stream from which the publisher is created.

As you probably know, a Stream will throw an exception if you try to use it after it has been operated upon or closed. For example, if we subscribe to the following Flux more than one time, an exception will be thrown:

Stream stream = Stream.of(1, 2, 3);
Flux<Integer> streamFluxUseOneTime = Flux.fromStream(stream);

And that’s because we’ll be using the same stream, which will be closed automatically by the first subscription on cancellation, error or completion.

On the other hand, wrapping the stream in a Supplier will get us a new one each time we subscribe:

Flux<Integer> streamFluxUseMultipleTimes = Flux.fromStream(() -> Stream.of(1, 2, 3));

So it doesn’t matter if the stream is closed, each subscription gets a new stream.

In any case, with both methods, the stream will be executed lazily, until we subscribe to the Flux.

Sometimes, this lazy behavior is useful. For example, when generating the elements of a Monoor Fluxis the result of an expensive operation. Or if the elements can change depending on a certain condition.

For this, we have the defer() method:

// For Mono
static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier)

// For Flux
static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier)

These methods will execute the Supplier passed as an argument (to provide a Publisher) until a subscription is made. In other words, these methods will create the Publisher lazily, using any of the methods we’ve reviewed so far:

Flux<Integer> fluxDeferred = Flux.defer( () -> Flux.just(1, 2, 3));

In addition to this, the Supplier will return a new Publisher for each subscription (like in the stream example above).

Take, for instance, the following code:

Mono<Integer> monoDeferred = Mono.defer(() -> Mono.just(getValue()));
// ...
private Integer getValue() {
    System.out.println("getValue()");
    return 1;
}

Every time you subscribe to monoDeferred, the method getValue() will be executed. But if you define the mono without defer():

Mono<Integer> monoNotDeferred = Mono.just(getValue());

The method getValue() will be executed only once, no matter how many times you subscribe (even if you never do it).

For most advanced usages, we also have the method create():

// For Mono
static <T> Mono<T> create(Consumer<MonoSink<T>> callback)
    
// For Flux
static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)
static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, 
                          FluxSink.OverflowStrategy backpressure)

Instead of returning a Publisher with a Supplier (as with the defer() method), you use a MonoSink or a FluxSink object passed as the argument of a Consumer. These objects let you emit elements or generate signals through:

// To complete with the given value
void success(T value)

// To complete without any value
void success()
    
// To terminate with the given exception
void error(Throwable e)

In the case of MonoSink.

And:

// To emit a non-null element, generating an onNext signal
FluxSink<T> next(T t)
    
// To fail the sequence, generating an onError signal
void error(Throwable e)
    
// To terminate the sequence successfully, generating an onComplete signal
void complete()

For FluxSink.

Flux also has a version of the create() method that takes an argument to specify how to handle backpressure), but there’s also a generate() method for synchronous and one-by-one emissions:

// To programmatically create a Flux by generating signals one-by-one via a consumer callback
static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)
    
// To programmatically create a Flux by generating signals one-by-one via a consumer callback,
// supplying an initial state value
static <T,S> Flux<T> generate(Callable<S> stateSupplier, 
                              BiFunction<S,SynchronousSink<T>,S> generator)
    
// To programmatically create a Flux by generating signals one-by-one via a consumer callback,
// supplying an initial state value, and a final cleanup callback.
static <T,S> Flux<T> generate(Callable<S> stateSupplier, 
                              BiFunction<S,SynchronousSink<T>,S> generator, 
                              Consumer<? super S> stateConsumer)

We’ll leave it at that for now. Probably, I’ll write one or more articles later to explain in more detail the methods defer and generate.

The important thing is that you understand that:

  • just() methods are the simpler methods to wrap a value into a Publisher eagerly.
  • from*() methods allow you to create a Publisher from another object, in some cases lazily.
  • defer() allows you to create a Publisher lazily, with a Supplier expression that is evaluated until you subscribe.
  • create() gives you full control to create a Publisher lazily through a MonoSink or a FluxSink object.

In any case, all the action begins when you subscribe to the Publisher.

Let’s see how to do this.