Combining Publishers
Sometimes, you’ll want to combine the values of two or more publishers or sequences. For this, there are three operators (and some of their variations) that you must learn:
merge
As the name implies, this operator will merge two or more publishers into a Flux with the elements interleaved.
Here’s the marble diagram for this operator:
merge eagerly subscribes to the streams it receives as parameters, which is why the order of the output elements is not guaranteed.
It’s only available for Flux and it takes a variable number of arguments of type Publisher:
static <I> Flux<I> merge(Publisher<? extends I>... sources)
Here’s an example of how to use it:
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux<Integer> mergedFlux =
Flux.merge(flux1, flux2);
mergedFlux.subscribe(System.out::println);
Notice that merge is a static method. If you call it through an instance, as in flux1.merge(flux2), Java ignores the instance, so flux1 won’t be part of the merged sequence. To merge from an instance, you’ll have to use mergeWith:
Flux<T> mergeWith(Publisher<? extends T> other)
Here’s the previous example using mergeWith:
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux<Integer> mergedFlux = flux1.mergeWith(flux2);
mergedFlux.subscribe(System.out::println);
This is the result for both examples:
1
2
3
4
5
6
But remember, like flatMap, these operators don’t guarantee the order of the output elements.
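The examples above print 1 to 6 in order only because Flux.just emits synchronously, so flux1 has emitted all its elements before flux2 gets a chance to. Here is a minimal sketch of the interleaving, assuming reactor-core is on the classpath. The class name, element values, and delays are made up for illustration:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

class MergeInterleavingExample {

    // Merges two timer-driven sources so their elements can interleave.
    static List<String> mergeDelayed() {
        // Hypothetical sources: "A" emits every 100 ms, "B" every 60 ms.
        Flux<String> a = Flux.just("A1", "A2", "A3")
                .delayElements(Duration.ofMillis(100));
        Flux<String> b = Flux.just("B1", "B2", "B3")
                .delayElements(Duration.ofMillis(60));

        // merge subscribes to both sources right away and forwards
        // each element as soon as it arrives.
        return Flux.merge(a, b).collectList().block();
    }

    public static void main(String[] args) {
        // The overall order depends on timing; only the relative
        // order within each source is preserved.
        System.out.println(mergeDelayed());
    }
}
```

With these delays the "A" and "B" elements will typically be mixed, but the exact sequence is timing-dependent, so don't rely on it.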
mergeWith is also available for Mono:
Flux<T> mergeWith(Publisher<? extends T> other)
As you can see, it accepts a Publisher and returns a Flux, so it’s used just like the Flux version.
Back to merge, there are versions that take a concurrency and a prefetch argument:
static <I> Flux<I> merge(
int prefetch,
Publisher<? extends I>... sources
)
static <T> Flux<T> merge(
Publisher<? extends Publisher<? extends T>> source,
int concurrency
)
static <T> Flux<T> merge(
Publisher<? extends Publisher<? extends T>> source,
int concurrency,
int prefetch
)
Remember:
- concurrency indicates the maximum number of inner sources the operator subscribes to at the same time.
- prefetch indicates the inner source request size.
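To see what concurrency does, here is a small sketch (assuming reactor-core on the classpath; the class name and delays are hypothetical). With a concurrency of 1, merge subscribes to only one inner source at a time, so the second source has to wait even though it would emit faster:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

class MergeConcurrencyExample {

    static List<Integer> mergeOneAtATime() {
        Flux<Integer> slow = Flux.just(1, 2, 3)
                .delayElements(Duration.ofMillis(50));
        Flux<Integer> fast = Flux.just(4, 5, 6)
                .delayElements(Duration.ofMillis(10));

        // This overload takes a Publisher of Publishers.
        Flux<Flux<Integer>> sources = Flux.just(slow, fast);

        // concurrency = 1: "fast" is not subscribed until "slow" completes.
        return Flux.merge(sources, 1).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(mergeOneAtATime()); // [1, 2, 3, 4, 5, 6]
    }
}
```

Raising the concurrency to 2 would let both sources run at once, and the faster one would usually get its elements out first.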
There’s also a version that takes elements from an Iterable:
static <I> Flux<I> merge(
Iterable<? extends Publisher<? extends I>> sources
)
And even one that takes elements from a Publisher of Publisher:
static <T> Flux<T> merge(
Publisher<? extends Publisher<? extends T>> source
)
Then we have mergeComparing, which orders the sequence by picking the smallest values from each source either as defined by their natural order or as defined by the provided Comparator:
// Generic type "I" must be a Comparable type that has a natural order
static <I extends Comparable<? super I>> Flux<I>
mergeComparing(
Publisher<? extends I>... sources
)
static <T> Flux<T> mergeComparing(
Comparator<? super T> comparator,
Publisher<? extends T>... sources
)
static <T> Flux<T> mergeComparing(
int prefetch,
Comparator<? super T> comparator,
Publisher<? extends T>... sources
)
However, this operator doesn’t order all the elements of all the sequences. From the documentation:
Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by their natural order). This is not a sort(), as it doesn’t consider the whole of each sequences. Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
Here’s an example:
Flux<Integer> flux1 = Flux.just(48, 45, 9);
Flux<Integer> flux2 = Flux.just(26, 58, 2);
Flux<Integer> mergedFlux =
Flux.mergeComparing(flux1, flux2);
mergedFlux.subscribe(System.out::println);
And this is the result when you run it:
26
48
45
9
58
2
Let me explain what mergeComparing is doing here:
- First, it compares 48 and 26. The smallest value is 26, so this one is added to the output Flux.
- Next, it compares 48 and 58. Since 48 is the smallest value, this one is added to the output.
- Next, it compares 45 and 58. The value 45 is added to the output.
- Next, it compares 9 and 58. The value 9 is added to the output.
- Finally, the only elements left are 58 and 2. Since both are from flux2, they are added to the output without being compared.
So this may not be what you need if you want an ordered output.
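On the other hand, when every source is already sorted, picking the smallest head value each time does yield a fully ordered result. A minimal sketch, assuming reactor-core on the classpath (the class name and values are illustrative):

```java
import java.util.List;
import reactor.core.publisher.Flux;

class MergeComparingSortedExample {

    static List<Integer> mergeSorted() {
        // Each source is sorted on its own.
        Flux<Integer> sortedA = Flux.just(1, 4, 7);
        Flux<Integer> sortedB = Flux.just(2, 3, 9);

        // Compares the current head of each source and emits the smaller one.
        return Flux.mergeComparing(sortedA, sortedB).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(mergeSorted()); // [1, 2, 3, 4, 7, 9]
    }
}
```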
There’s also a mergeSequential operator, which subscribes eagerly to the sequences provided but, unlike merge, merges the emitted values into the final sequence in subscription order.
Here’s an example:
Flux<Integer> flux1 = Flux.just(10, 20, 30);
Flux<Integer> flux2 = Flux.just(40, 50, 60);
Flux<Integer> mergedFlux = Flux.mergeSequential(flux1, flux2);
mergedFlux.subscribe(System.out::println);
That will print the elements in subscription order:
10
20
30
40
50
60
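Since Flux.just emits synchronously, the example above would print the same thing with merge. The difference shows up with asynchronous sources. Here is a sketch under the assumption that reactor-core is available (the class name and delays are hypothetical), where the second source is faster than the first:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

class MergeSequentialExample {

    static List<Integer> mergeInSubscriptionOrder() {
        Flux<Integer> slow = Flux.just(10, 20, 30)
                .delayElements(Duration.ofMillis(50));
        Flux<Integer> fast = Flux.just(40, 50, 60)
                .delayElements(Duration.ofMillis(10));

        // Both sources are subscribed eagerly, but the elements of "fast"
        // are held back until "slow" has completed.
        return Flux.mergeSequential(slow, fast).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(mergeInSubscriptionOrder()); // [10, 20, 30, 40, 50, 60]
    }
}
```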
mergeSequential also has versions that get all the source sequences from a Publisher or an Iterable:
static <T> Flux<T> mergeSequential(
Publisher<? extends Publisher<? extends T>> sources
)
static <I> Flux<I> mergeSequential(
Iterable<? extends Publisher<? extends I>> sources
)
There are versions that take a concurrency and a prefetch argument:
static <I> Flux<I> mergeSequential(
int prefetch, Publisher<? extends I>... sources
)
static <T> Flux<T> mergeSequential(
Publisher<? extends Publisher<? extends T>> sources,
int maxConcurrency,
int prefetch
)
static <I> Flux<I> mergeSequential(
Iterable<? extends Publisher<? extends I>> sources,
int maxConcurrency,
int prefetch
)
And versions that delay errors:
static <I> Flux<I> mergeSequentialDelayError(
int prefetch,
Publisher<? extends I>... sources
)
static <T> Flux<T> mergeSequentialDelayError(
Publisher<? extends Publisher<? extends T>> sources,
int maxConcurrency,
int prefetch
)
static <I> Flux<I> mergeSequentialDelayError(
Iterable<? extends Publisher<? extends I>> sources,
int maxConcurrency,
int prefetch
)
concat
As the name implies, this operator concatenates all the sequences passed as arguments. It does this sequentially:
- First, it subscribes to the first sequence.
- Then, it waits for the sequence to complete before subscribing to the next.
- And it repeats these steps until the last sequence completes.
concat is only available for Flux:
static <T> Flux<T> concat(Publisher<? extends T>... sources)
Take a look at its marble diagram:
Here’s an example of this operator:
Flux<Integer> flux1 = Flux.just(10, 20, 30);
Flux<Integer> flux2 = Flux.just(40, 50, 60);
Flux<Integer> concatFlux = Flux.concat(flux1, flux2);
concatFlux.subscribe(System.out::println);
This is the result:
10
20
30
40
50
60
But we can also use the operator concatWith (available for both Flux and Mono, and returning a Flux in both cases) to do the same:
Flux<T> concatWith(Publisher<? extends T> other)
The difference, just like with mergeWith, is that this operator is used as an instance method:
Flux<Integer> flux1 = Flux.just(10, 20, 30);
Flux<Integer> flux2 = Flux.just(40, 50, 60);
Flux<Integer> concatFlux = flux1.concatWith(flux2);
concatFlux.subscribe(System.out::println);
The result is basically the same as the result of the mergeSequential operator.
That’s because mergeSequential and concat do the same thing but in a slightly different way. mergeSequential subscribes to the source sequences eagerly, while concat does it sequentially.
However, this means that if any error occurs during the execution of concat, it will interrupt the sequence immediately, producing a different output than mergeSequential.
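The eager versus sequential subscription can be made visible by recording when each source is subscribed to. This is only a sketch, assuming reactor-core on the classpath. The helper source(...), the event labels, and the delays are hypothetical:

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;

class ConcatVsMergeSequentialExample {

    // Hypothetical helper: a delayed source that logs its own subscription.
    static Flux<Integer> source(String name, List<String> events, Integer... values) {
        return Flux.fromArray(values)
                .delayElements(Duration.ofMillis(50))
                .doOnSubscribe(s -> events.add("subscribe " + name));
    }

    static List<String> withConcat() {
        List<String> events = new CopyOnWriteArrayList<>();
        Flux.concat(source("A", events, 1, 2), source("B", events, 3, 4))
                .doOnNext(v -> events.add(String.valueOf(v)))
                .blockLast();
        return events;
    }

    static List<String> withMergeSequential() {
        List<String> events = new CopyOnWriteArrayList<>();
        Flux.mergeSequential(source("A", events, 1, 2), source("B", events, 3, 4))
                .doOnNext(v -> events.add(String.valueOf(v)))
                .blockLast();
        return events;
    }

    public static void main(String[] args) {
        // concat subscribes to B only after A has completed.
        System.out.println(withConcat());
        // mergeSequential subscribes to A and B up front.
        System.out.println(withMergeSequential());
    }
}
```

The emitted values are the same in both cases. Only the position of the "subscribe B" event changes.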
You can compare the relationship between mergeSequential and concat with the relationship between flatMapSequential and concatMap.
concat also has versions that get all the source sequences from a Publisher or an Iterable:
static <T> Flux<T> concat(Publisher<? extends Publisher<? extends T>> sources)
static <T> Flux<T> concat(Iterable<? extends Publisher<? extends T>> sources)
There’s a version that takes a prefetch argument:
static <T> Flux<T> concat(
Publisher<? extends Publisher<? extends T>> sources,
int prefetch
)
And versions that delay errors:
static <T> Flux<T> concatDelayError(
Publisher<? extends T>... sources
)
static <T> Flux<T> concatDelayError(
Publisher<? extends Publisher<? extends T>> sources
)
static <T> Flux<T> concatDelayError(
Publisher<? extends Publisher<? extends T>> sources,
int prefetch
)
static <T> Flux<T> concatDelayError(
Publisher<? extends Publisher<? extends T>> sources,
boolean delayUntilEnd,
int prefetch
)
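Here is a short sketch of what delaying errors means in practice, assuming reactor-core on the classpath. The class name, the failing() helper, and the values are hypothetical. The first source emits 1 and then fails, and the error is swallowed with onErrorResume only so that the emitted elements can be collected:

```java
import java.util.List;
import reactor.core.publisher.Flux;

class ConcatDelayErrorExample {

    // Hypothetical source that emits one element and then fails.
    static Flux<Integer> failing() {
        return Flux.just(1)
                .concatWith(Flux.error(new IllegalStateException("boom")));
    }

    static List<Integer> withConcat() {
        // The error stops the sequence before the second source is used.
        return Flux.concat(failing(), Flux.just(2, 3))
                .onErrorResume(e -> Flux.empty())
                .collectList()
                .block();
    }

    static List<Integer> withConcatDelayError() {
        // The error is held back until the remaining sources have been processed.
        return Flux.concatDelayError(failing(), Flux.just(2, 3))
                .onErrorResume(e -> Flux.empty())
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(withConcat());           // [1]
        System.out.println(withConcatDelayError()); // [1, 2, 3]
    }
}
```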
zip
This operator waits for two or more sources to emit an element to combine these elements into an object that acts as a container, a tuple.
A tuple is an immutable data structure that groups non-null values. In Reactor, we have tuples that can group from two (Tuple2<T1, T2>) to eight values (Tuple8<T1, T2, T3, T4, T5, T6, T7, T8>).
Taking Tuple2<T1,T2> as an example:
- T1 is the type of the first non-null value held by the tuple.
- T2 is the type of the second non-null value held by the tuple.
If you take a look at the javadoc for Tuple2<T1,T2>, you’ll find methods to get the values the tuple holds:
// Get the object at the given index
Object get(int index)
// Get the first object of the tuple
T1 getT1()
// Get the second object of the tuple
T2 getT2()
// Return an immutable Iterator<Object>
// for the content of the tuple
Iterator<Object> iterator()
// Return the number of elements in the tuple
int size()
To map a function to either the first or the second value:
// Map the T1 value of the tuple into a different
// value and type, keeping the T2 value intact
Tuple2<R,T2> mapT1(Function<T1,R> mapper)
// Map the T2 value of the tuple into a different
// value and type, keeping the T1 value intact
Tuple2<T1,R> mapT2(Function<T2,R> mapper)
And to turn the tuple into an array or list:
Object[] toArray()
List<Object> toList()
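A quick sketch of these methods in use, assuming reactor-core on the classpath. The tuple is built with the Tuples.of factory from reactor.util.function, and the class name and values are made up:

```java
import java.util.List;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

class TupleAccessExample {

    // Builds a sample tuple holding a String and an Integer.
    static Tuple2<String, Integer> pair() {
        return Tuples.of("reactor", 3);
    }

    public static void main(String[] args) {
        Tuple2<String, Integer> pair = pair();

        // Typed accessors and index-based access (indexes start at 0).
        String first = pair.getT1();
        Object second = pair.get(1);
        System.out.println(first + " / " + second);

        // mapT1 returns a new tuple; the original is left untouched.
        Tuple2<Integer, Integer> mapped = pair.mapT1(String::length);
        System.out.println(mapped.getT1() + " / " + mapped.getT2());

        // Conversion to a list of Objects.
        List<Object> asList = pair.toList();
        System.out.println(asList.size());
    }
}
```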
The zip operator is available for Mono and Flux.
Take a look at its marble diagram for Mono:
This operator has a lot of versions. We’ll review the most important ones, starting with the version for Tuple2:
// For Mono
static <T1,T2> Mono<Tuple2<T1,T2>> zip(
Mono<? extends T1> p1,
Mono<? extends T2> p2
)
// For Flux
static <T1,T2> Flux<Tuple2<T1,T2>> zip(
Publisher<? extends T1> source1,
Publisher<? extends T2> source2
)
This version works the same for Mono and Flux. So, assuming we have two sequences of type Flux<Integer>, the zip operator will return a Flux of type Flux<Tuple2<Integer, Integer>>:
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux<Tuple2<Integer, Integer>> zippedFlux =
Flux.zip(flux1, flux2);
zippedFlux.subscribe(System.out::println);
Here’s the output for the above example, which shows the string representation of Tuple2<Integer, Integer>:
[1,4]
[2,5]
[3,6]
Now, you might be wondering what happens if the sequences have a different number of elements, for example:
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5);
Flux<Tuple2<Integer, Integer>> zippedFlux =
Flux.zip(flux1, flux2);
zippedFlux.subscribe(System.out::println);
Well, zip will combine the elements into a Tuple2 until any of the sequences completes, so the unpaired 3 from flux1 is dropped. Here’s the result of the above example:
[1,4]
[2,5]
Another useful version is one that takes a BiFunction that specifies how the elements should be combined:
// For Mono
static <T1,T2,O> Mono<O> zip(
Mono<? extends T1> p1,
Mono<? extends T2> p2,
BiFunction<? super T1,? super T2,? extends O> combinator
)
// For Flux
static <T1,T2,O> Flux<O> zip(
Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
BiFunction<? super T1,? super T2,? extends O> combinator
)
Remember that BiFunction is a functional interface that defines a method that takes two arguments of type T and U to produce a result of type R:
public interface BiFunction<T, U, R> {
R apply(T var1, U var2);
}
Here’s an example:
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux<Integer> zippedFlux =
Flux.zip(flux1, flux2, (i1, i2) -> i1 + i2);
zippedFlux.subscribe(System.out::println);
And this is the result:
5
7
9
Notice that this version of zip doesn’t produce a tuple; it combines the elements of both sequences into one according to the provided function. In the above example, 1 + 4, 2 + 5, and 3 + 6.
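The Mono version with a BiFunction works the same way, except that there is at most one pair to combine. A minimal sketch, assuming reactor-core on the classpath (the class name and values are illustrative):

```java
import reactor.core.publisher.Mono;

class MonoZipExample {

    static String describeUser() {
        Mono<String> name = Mono.just("Ada");
        Mono<Integer> age = Mono.just(36);

        // Waits for both Monos and combines their values with the function.
        return Mono.zip(name, age, (n, a) -> n + " is " + a).block();
    }

    public static void main(String[] args) {
        System.out.println(describeUser()); // Ada is 36
    }
}
```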
Then we have the versions of zip that take from three to eight sequences to produce tuples of three to eight elements.
Here are some examples for Mono:
// For Mono
static <T1,T2,T3>
Mono<Tuple3<T1,T2,T3>> zip(
Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3
)
static <T1,T2,T3,T4,T5>
Mono<Tuple5<T1,T2,T3,T4,T5>> zip(
Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4,
Mono<? extends T5> p5
)
static <T1,T2,T3,T4,T5,T6,T7>
Mono<Tuple7<T1,T2,T3,T4,T5,T6,T7>> zip(
Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4,
Mono<? extends T5> p5,
Mono<? extends T6> p6,
Mono<? extends T7> p7
)
And some for Flux:
static <T1,T2,T3,T4>
Flux<Tuple4<T1,T2,T3,T4>> zip(
Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4
)
static <T1,T2,T3,T4,T5,T6>
Flux<Tuple6<T1,T2,T3,T4,T5,T6>> zip(
Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6
)
static <T1,T2,T3,T4,T5,T6,T7,T8>
Flux<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> zip(
Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
Publisher<? extends T3> source3,
Publisher<? extends T4> source4,
Publisher<? extends T5> source5,
Publisher<? extends T6> source6,
Publisher<? extends T7> source7,
Publisher<? extends T8> source8
)
And here’s an example for Tuple4:
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux<Integer> flux3 = Flux.just(7, 8, 9);
Flux<Integer> flux4 = Flux.just(10, 11, 12);
Flux<Tuple4<Integer, Integer, Integer, Integer>> zippedFlux =
Flux.zip(flux1, flux2, flux3, flux4);
zippedFlux.subscribe(System.out::println);
In this case, each tuple contains four elements:
[1,4,7,10]
[2,5,8,11]
[3,6,9,12]
There’s also a version of zip that takes a variable number of sequences and uses a combination function that receives the elements as an array of objects:
// For Mono
static <R> Mono<R> zip(
Function<? super Object[],? extends R> combinator,
Mono<?>... monos
)
// For Flux
static <I,O> Flux<O> zip(
Function<? super Object[],? extends O> combinator,
Publisher<? extends I>... sources
)
// This version also takes a prefetch argument
// to specify each source request size
static <I,O> Flux<O> zip(
Function<? super Object[],? extends O> combinator,
int prefetch,
Publisher<? extends I>... sources
)
Here’s an example that adds up the pairs of elements from the sources passed as arguments:
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux<Integer> zippedFlux = Flux.zip(
(Object[] elements) ->
// Turn the array into a stream
Arrays.stream(elements)
// Cast each element to create an IntStream
.mapToInt(e -> (Integer) e)
// Add up all the elements in the stream
.sum(),
flux1, flux2);
zippedFlux.subscribe(System.out::println);
This is the result:
5
7
9
There are also versions that can receive the sequences from an Iterable or as a Publisher of Publisher objects (the latter option only in the case of Flux):
// For Mono
static <R> Mono<R> zip(
Iterable<? extends Mono<?>> monos,
Function<? super Object[],? extends R> combinator
)
// For Flux
static <O> Flux<O> zip(
Iterable<? extends Publisher<?>> sources,
Function<? super Object[],? extends O> combinator
)
static <O> Flux<O> zip(
Iterable<? extends Publisher<?>> sources,
int prefetch,
Function<? super Object[],? extends O> combinator
)
static <TUPLE extends Tuple2,V> Flux<V> zip(
Publisher<? extends Publisher<?>> sources,
Function<? super TUPLE,? extends V> combinator
)
Notice there’s a version that takes a function that receives as argument a tuple instead of receiving an array of objects. Since all tuple classes from three to eight elements extend from Tuple2, the function can receive any tuple it requires.
For example, with a Flux of three Flux sequences, the function will receive an argument of type Tuple3. However, it won’t always be necessary to cast the argument:
Flux<Flux<Integer>> fluxOfFlux = Flux.just(
Flux.just(1, 2, 3),
Flux.just(4, 5, 6),
Flux.just(7, 8, 9)
);
Flux<Integer> zippedFlux = Flux.zip(
fluxOfFlux,
(Tuple2 tuple) -> {
int total = 0;
for (int i = 0; i < tuple.size(); i++)
total += (Integer)tuple.get(i);
return total;
}
);
zippedFlux.subscribe(System.out::println);
This will be the result:
12
15
18
Finally, we have a zipWith operator that can be used as an instance method:
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux<Tuple2<Integer, Integer>> zippedFlux =
flux1.zipWith(flux2);
zippedFlux.subscribe(System.out::println);
The zipWith operator offers fewer versions than zip: versions that take a BiFunction to combine the elements, versions that take a prefetch argument, and versions that take the elements to combine from an Iterable (the last two only for Flux):
// For Mono:
<T2,O> Mono<O> zipWith(
Mono<? extends T2> other,
BiFunction<? super T,? super T2,? extends O> combinator
)
// For Flux:
<T2,V> Flux<V> zipWith(
Publisher<? extends T2> source2,
BiFunction<? super T,? super T2,? extends V> combinator
)
<T2> Flux<Tuple2<T,T2>> zipWith(
Publisher<? extends T2> source2,
int prefetch
)
<T2,V> Flux<V> zipWith(
Publisher<? extends T2> source2,
int prefetch,
BiFunction<? super T,? super T2,? extends V> combinator
)
<T2> Flux<Tuple2<T,T2>> zipWithIterable(
Iterable<? extends T2> iterable
)
<T2,V> Flux<V> zipWithIterable(
Iterable<? extends T2> iterable,
BiFunction<? super T,? super T2,? extends V> zipper
)
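As an illustration of the Iterable versions, here is a small sketch that pairs the elements of a Flux with the items of a plain list. It assumes reactor-core on the classpath, and the class name and values are hypothetical:

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

class ZipWithIterableExample {

    static List<String> label() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);

        // Pairs each letter with the next item of the list and
        // combines them with the provided function.
        return Flux.just("a", "b", "c")
                .zipWithIterable(numbers, (letter, number) -> letter + number)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(label()); // [a1, b2, c3]
    }
}
```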