Aggregating a Flux
Do you remember the map-filter-reduce pattern?
We’ve already talked about map, filter, and other related operators. Now it’s time to talk about reduce.
Reduce is an aggregation operation. An aggregation operation groups a collection of values to compute a single value.
In this lesson, we’ll review the following aggregation operators:
- reduce
- scan
- collect
- hasElement(s)/all/any
- count
With the exception of hasElement, these operators are only available for Flux. Think about it: do they make sense for Mono?
reduce
As the name implies, this operator reduces the values of a Flux into a single object.
There are three versions of reduce:
Mono<A> reduce(
A initial,
BiFunction<A,? super T,A> accumulator
)
Mono<T> reduce(
BiFunction<T,T,T> aggregator
)
Mono<A> reduceWith(
Supplier<A> initial,
BiFunction<A,? super T,A> accumulator
)
The first version takes an initial value. That value is passed to the BiFunction (the function in charge of computing the single value) along with the first element of the Flux. The function returns an intermediate value that is passed back to the BiFunction with the next element of the Flux, and so on.
In any case, notice two things:
- The value returned by the reduce operator is wrapped in a Mono.
- The type of the initial value (for the versions that take an initial value) is represented by A. The returned Mono also has type A, which means that the reduce function can convert the elements from type T to type A.
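To make the accumulator threading and the T-to-A conversion concrete, here is a minimal plain-Java sketch. It is not Reactor code: fold is a hypothetical helper that walks a List synchronously and only models what reduce with an initial value computes.

```java
import java.util.List;
import java.util.function.BiFunction;

class ReduceModel {
    // Hypothetical helper (not part of Reactor): threads an accumulator of
    // type A through every element of type T, one element at a time.
    static <T, A> A fold(List<T> source, A initial,
                         BiFunction<A, ? super T, A> accumulator) {
        A result = initial;
        for (T element : source) {
            // The previous result and the next element produce the new result.
            result = accumulator.apply(result, element);
        }
        return result;
    }

    public static void main(String[] args) {
        // The elements are Integers (T), but the result is a String (A).
        String joined = fold(List.of(1, 2, 3), "values:", (acc, i) -> acc + " " + i);
        System.out.println(joined); // values: 1 2 3
    }
}
```

Because the accumulator always returns an A, the element type never has to match the result type.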
For instance, this is the marble diagram for the version that provides an initial value:
Here’s an example where the reducing function adds all the elements of a Flux of integers:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3);
integerFlux
.reduce(10, (a, b) -> {
int result = a + b;
System.out.format(
"[%d + %d] = %d\n", a, b, result
);
return result;
})
.subscribe(System.out::println);
Of course, the reducing function could be as simple as (a, b) -> a + b, but in the example, I’ve added a statement to print the values of the arguments so you can see all the operations performed by the function.
Here’s the result:
[10 + 1] = 11
[11 + 2] = 13
[13 + 3] = 16
16
Our initial value is 10, so the first operation is 10 + 1. The result, 11, is carried over to the next iteration.
This way, the function receives 11 and 2, returning 13 as the result.
Finally, the function receives 13 and 3, which gives us the final result, 16.
To try the second version of the reduce operator, we just need to remove the first argument:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3);
integerFlux
.reduce((a, b) -> {
int result = a + b;
System.out.format(
"[%d + %d] = %d\n", a, b, result
);
return result;
})
.subscribe(System.out::println);
Since we don’t have an initial value to work with, the first two elements of the Flux will be passed to the function.
Here’s the result:
[1 + 2] = 3
[3 + 3] = 6
6
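Without an initial value, the first element effectively plays the role of the seed. The following plain-Java sketch models that behavior. It is not Reactor code: reduce here is a hypothetical helper, and Optional stands in for the returned Mono (an assumption made only for illustration). As far as I know, Reactor behaves the same way for short sequences: the function is not invoked for zero or one elements, a single element is emitted as is, and an empty Flux produces an empty Mono.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;

class ReduceNoSeedModel {
    // Hypothetical helper (not part of Reactor): Optional models the Mono result.
    static <T> Optional<T> reduce(List<T> source, BiFunction<T, T, T> aggregator) {
        Iterator<T> it = source.iterator();
        if (!it.hasNext()) {
            return Optional.empty(); // nothing to aggregate
        }
        T result = it.next(); // the first element acts as the seed
        while (it.hasNext()) {
            result = aggregator.apply(result, it.next());
        }
        return Optional.of(result);
    }

    public static void main(String[] args) {
        Optional<Integer> three = reduce(List.of(1, 2, 3), (a, b) -> a + b);
        Optional<Integer> one = reduce(List.of(7), (a, b) -> a + b);
        Optional<Integer> none = reduce(List.<Integer>of(), (a, b) -> a + b);
        System.out.println(three); // Optional[6]
        System.out.println(one);   // Optional[7]
        System.out.println(none);  // Optional.empty
    }
}
```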
Finally, for the third version, reduceWith, we can wrap the initial value in a Supplier, which will be called on subscription and passed to the BiFunction.
Here’s the first example modified to use reduceWith:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3);
integerFlux
.reduceWith(() -> 10, (a, b) -> {
int result = a + b;
System.out.format(
"[%d + %d] = %d\n", a, b, result
);
return result;
})
.subscribe(System.out::println);
The result will be the same:
[10 + 1] = 11
[11 + 2] = 13
[13 + 3] = 16
16
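With an immutable seed like 10, both versions behave the same. The Supplier starts to matter when the seed is mutable, because it is called once per subscription. Here is a plain-Java sketch of that idea. It is not Reactor code: foldWith and secondRun are hypothetical helpers, and each call to foldWith models one subscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

class ReduceWithModel {
    // Hypothetical helper (not part of Reactor): each call models one subscription.
    static <T, A> A foldWith(List<T> source, Supplier<A> initial,
                             BiFunction<A, ? super T, A> accumulator) {
        A result = initial.get(); // the seed is obtained when the run starts
        for (T element : source) {
            result = accumulator.apply(result, element);
        }
        return result;
    }

    // Runs the same fold twice with the given supplier and returns the second result.
    static List<Integer> secondRun(Supplier<List<Integer>> seed) {
        BiFunction<List<Integer>, Integer, List<Integer>> append = (list, i) -> {
            list.add(i);
            return list;
        };
        foldWith(List.of(1, 2, 3), seed, append);
        return foldWith(List.of(1, 2, 3), seed, append);
    }

    public static void main(String[] args) {
        // A fresh container per run: the second run is unaffected by the first.
        System.out.println(secondRun(ArrayList::new)); // [1, 2, 3]

        // One shared container: the second run also sees the first run's elements.
        List<Integer> shared = new ArrayList<>();
        System.out.println(secondRun(() -> shared)); // [1, 2, 3, 1, 2, 3]
    }
}
```

The second case is what happens when a single mutable object is reused as the seed, which is the situation a Supplier is meant to avoid.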
scan
This operator works like reduce. In fact, the versions of scan mirror the versions of reduce, except that they return a Flux instead of a Mono:
Flux<A> scan(
A initial,
BiFunction<A,? super T,A> accumulator
)
Flux<T> scan(
BiFunction<T,T,T> accumulator
)
Flux<A> scanWith(
Supplier<A> initial,
BiFunction<A,? super T,A> accumulator
)
The difference is that scan emits each intermediate value instead of only the final one.
For instance, this is the marble diagram for the version that provides an initial value:
Here’s an example for the version that takes an initial value:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3);
integerFlux
.scan(10, (a, b) -> a + b)
.subscribe(System.out::println);
This is the result:
10
11
13
16
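The output starts with 10 because the initial value itself is emitted before any element is accumulated. A plain-Java sketch of that behavior follows. It is not Reactor code: scan here is a hypothetical helper that returns a List of everything that would be emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

class ScanModel {
    // Hypothetical helper (not part of Reactor): records the seed and every
    // intermediate result instead of keeping only the last one.
    static <T, A> List<A> scan(List<T> source, A initial,
                               BiFunction<A, ? super T, A> accumulator) {
        List<A> emitted = new ArrayList<>();
        A current = initial;
        emitted.add(current); // the seed comes out first
        for (T element : source) {
            current = accumulator.apply(current, element);
            emitted.add(current); // each intermediate value is kept
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> runningTotals = scan(List.of(1, 2, 3), 10, (a, b) -> a + b);
        System.out.println(runningTotals); // [10, 11, 13, 16]
    }
}
```

The last entry of the list is exactly what reduce would have produced for the same arguments.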
Here’s the example for the second version:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3);
integerFlux
.scan((a, b) -> a + b)
.subscribe(System.out::println);
This is the result:
1
3
6
And for scanWith:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3);
integerFlux
.scanWith(() -> 10, (a, b) -> a + b)
.subscribe(System.out::println);
This is the result:
10
11
13
16
collect
The collect* operators allow you to collect all the elements of a Flux in a container, which can be a List, a Map, or any other container produced by an implementation of the Collector<T,A,R> interface.
Here are some examples of the available collect* operators.
This one collects all elements of the Flux into a container using a Collector from the Java 8 Stream API:
Mono<R> collect(
Collector<? super T,A,? extends R> collector
)
This one collects all elements of the Flux into a List:
Mono<List<T>> collectList()
The following collects all elements of the Flux into a Map. The key is extracted from each element by applying the keyExtractor function, the value is extracted by the valueExtractor function, and if several elements map to the same key, the associated value will be the one extracted from the most recently emitted element:
Mono<Map<K,V>> collectMap(
Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor
)
The following collects all elements of the Flux into a Map of key-List pairs (typed as Collection in the signature). The key is extracted from each element by applying the keyExtractor function, the value is extracted by the valueExtractor function, and the values of all elements mapping to the same key are stored in the List associated with that key:
Mono<Map<K,Collection<V>>> collectMultimap(
Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor
)
And this one collects all elements of the Flux until it completes, and then sorts them into a List using the provided Comparator:
Mono<List<T>> collectSortedList(
@Nullable Comparator<? super T> comparator
)
All these operators generate an empty container if the source Flux is empty.
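For the collect(Collector) version, any collector from java.util.stream.Collectors is a candidate. The sketch below builds one such collector and, so that it needs nothing beyond the JDK, runs it on a plain Stream rather than a Flux. The toSortedSet method is a hypothetical name used only for this illustration; passing the same collector to collect on a Flux should give the same container, wrapped in a Mono.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CollectorSketch {
    // A standard JDK collector that gathers elements into a sorted set,
    // dropping duplicates along the way.
    static Collector<Integer, ?, TreeSet<Integer>> toSortedSet() {
        return Collectors.toCollection(TreeSet::new);
    }

    public static void main(String[] args) {
        // A Stream is used here only as a stand-in for the Flux.
        Set<Integer> result = Stream.of(3, 1, 2, 3).collect(toSortedSet());
        System.out.println(result); // [1, 2, 3]
    }
}
```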
For instance, this is the marble diagram for the version collect(Collector<? super T,A,? extends R> collector):
Let’s review a few examples, so I can show you how to use collectMap, collectMultimap, and collectSortedList.
For collectMap, the next example uses the version that takes a key and a value extractor:
Flux<Integer> integerFlux =
Flux.just(11, 22, 33, 34);
Mono<Map<Integer, Integer>> monoMap =
integerFlux.collectMap(
i -> i / 10,
i -> i % 10
);
monoMap.subscribe(System.out::println);
As the key, the map uses the tens digit of each element. As the value, it uses the remainder of dividing the element by ten.
This is the result:
{1=1, 2=2, 3=4}
Notice that the value from the third element (33) was replaced by the value from the last one (34). Remember, if several elements map to the same key, the associated value comes from the most recently emitted element.
If we don’t want this behavior, we have to use collectMultimap:
Flux<Integer> integerFlux =
Flux.just(11, 22, 33, 34);
Mono<Map<Integer, Collection<Integer>>> monoMap =
integerFlux.collectMultimap(
i -> i / 10,
i -> i % 10
);
monoMap.subscribe(System.out::println);
In this case, the map contains a collection to preserve all the elements that map to the same key:
{1=[1], 2=[2], 3=[3, 4]}
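The difference between the two operators comes down to what happens when a key is seen again. Here is a plain-Java sketch of both strategies, using the same extractors as the examples above. It is not Reactor code: lastValueWins and keepAllValues are hypothetical helpers that only model the two behaviors with a HashMap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CollectMapModel {
    // Models collectMap: a later value for the same key overwrites the earlier one.
    static Map<Integer, Integer> lastValueWins(List<Integer> source) {
        Map<Integer, Integer> map = new HashMap<>();
        for (int i : source) {
            map.put(i / 10, i % 10);
        }
        return map;
    }

    // Models collectMultimap: every value for the same key is appended to a list.
    static Map<Integer, List<Integer>> keepAllValues(List<Integer> source) {
        Map<Integer, List<Integer>> map = new HashMap<>();
        for (int i : source) {
            map.computeIfAbsent(i / 10, key -> new ArrayList<>()).add(i % 10);
        }
        return map;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(11, 22, 33, 34);
        System.out.println(lastValueWins(source)); // {1=1, 2=2, 3=4}
        System.out.println(keepAllValues(source)); // {1=[1], 2=[2], 3=[3, 4]}
    }
}
```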
We can use collectSortedList to sort the elements of the Flux.
The following example sorts the elements in reverse order:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3);
Mono<List<Integer>> monoSortedList =
integerFlux.collectSortedList(
Comparator.reverseOrder()
);
monoSortedList.subscribe(System.out::println);
This is the result:
[3, 2, 1]
Finally, notice that all these operators return the container inside a Mono, meaning that, even though List and Map are not asynchronous types, the collection process is done asynchronously, and the container is emitted only after the source Flux completes.
hasElement(s)/all/any
These operators can be considered aggregation operations because they reduce the values of a sequence to a boolean value.
Here are the definitions of these operators.
For Mono, hasElement emits a single boolean value that indicates if the Mono has an element:
Mono<Boolean> hasElement()
It’s the only operator in this lesson that is available for Mono.
For Flux, we have a hasElement operator that emits a single boolean value that indicates if any of the elements of the Flux is equal to the provided value:
Mono<Boolean> hasElement(T value)
And a hasElements operator that emits a single boolean value that indicates if the Flux has at least one element:
Mono<Boolean> hasElements()
As well as an all operator that emits a single boolean value that indicates if all values of the Flux match the predicate:
Mono<Boolean> all(Predicate<? super T> predicate)
And an any operator that emits a single boolean value that indicates if any of the values of the Flux matches the predicate:
Mono<Boolean> any(Predicate<? super T> predicate)
Notice that all these operators return the boolean value wrapped in a Mono.
This is the marble diagram of the hasElement operator (for Mono):
Here’s an example that prints false:
Mono<Integer> integerMono =
Mono.just(1);
integerMono
.filter(i -> i > 1)
.hasElement()
.subscribe(System.out::println);
The Flux equivalent takes a value and tests whether any of the elements is equal to it:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3);
integerFlux
.filter(i -> i > 2)
.hasElement(3)
.subscribe(System.out::println);
The above example only filters out the first two elements, so 3 is still in the sequence and hasElement(3) returns true.
On the other hand, hasElements (only available for Flux) returns true if the Flux has at least one element, like in the following example:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3);
integerFlux
.filter(i -> i > 2)
.hasElements()
.subscribe(System.out::println);
Now, if you want to test, for example, if all the elements of a Flux are even, you can use the all operator:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3);
integerFlux
.all(i -> i % 2 == 0)
.subscribe(System.out::println);
In this case, the result is false.
And if you want to test, for example, if at least one value is even, use the any operator:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3);
integerFlux
.any(i -> i % 2 == 0)
.subscribe(System.out::println);
The result will be true.
count
Finally, one of the easiest operators for Flux you’ll ever find:
Mono<Long> count()
This operator counts the number of values in a Flux.
This is the marble diagram for this operator:
Here’s an example:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3, 4, 5);
Mono<Long> integerMono = integerFlux.count();
integerMono.subscribe(System.out::println);
The value is wrapped in a Mono<Long>, but the output shouldn’t be a surprise:
5