Aggregating a Flux


Do you remember the map-filter-reduce pattern/model?

Well, we have talked about the map, filter, and other related operators. Now it’s time to talk about reduce.

Reduce is an aggregation operation. An aggregation operation groups a collection of values to compute a single value.

In this lesson, we’ll review the following aggregation operators:

  • reduce
  • scan
  • collect
  • hasElement(s)/all/any
  • count

With the exception of hasElement, these operators are only available for Flux. Think about it, do they make sense for Mono?

reduce

As the name implies, this operator reduces the values of a Flux into a single object.

There are three versions of reduce:

Mono<A> reduce(
    A initial, 
    BiFunction<A,? super T,A> accumulator
)

Mono<T> reduce(
    BiFunction<T,T,T> aggregator
)

Mono<A> reduceWith(
    Supplier<A> initial, 
    BiFunction<A,? super T,A> accumulator
)

The first version takes an initial value. That value is passed to the BiFunction (the function in charge of computing the single value) along with the first element of the Flux. The function returns an intermediate value, which is passed back to the BiFunction along with the next element of the Flux, and so on.

In any case, notice two things:

  • The value returned by the reduce operator is wrapped in a Mono.
  • The type of the initial value (for the versions that take an initial value) is represented by A. The returned Mono also has a type A, which means that the reduce function can convert the elements from type T to type A.
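To make the second point concrete, here is a minimal sketch where the elements are of type String (T) and the accumulated value is of type Integer (A). The class name, the totalLength helper, and the sample words are made up for illustration, and block() is used only to get the value synchronously in a demo:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ReduceTypeConversion {
    // Hypothetical helper: sums the lengths of all the given words.
    static Mono<Integer> totalLength(String... words) {
        return Flux.fromArray(words)
            // T is String, A is Integer: the initial value 0 fixes the result type.
            .reduce(0, (length, word) -> length + word.length());
    }

    public static void main(String[] args) {
        // block() waits for the result; fine for a demo, not for reactive code.
        System.out.println(totalLength("map", "filter", "reduce").block()); // 3 + 6 + 6 = 15
    }
}
```

The version without an initial value can’t do this, because its BiFunction<T,T,T> has to return the same type as the elements.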

For instance, this is the marble diagram for the version that provides an initial value:

reduce marble diagram

Here’s an example where the reducing function adds all the elements of a Flux of integers:

Flux<Integer> integerFlux = 
    Flux.just(1, 2, 3);

integerFlux
    .reduce(10, (a, b) -> {
        int result = a + b;
        System.out.format(
            "[%d + %d] = %d\n", a, b, result
        );
        return result;
    })
    .subscribe(System.out::println);

Of course, the reducing function could be as simple as (a, b) -> a + b, but in the example, I’ve added a statement to print the values of the arguments so you can see all the operations performed by the function.

Here’s the result:

[10 + 1] = 11
[11 + 2] = 13
[13 + 3] = 16
16

Our initial value is 10, so the first operation is 10 + 1. The result, 11, is carried over to the next iteration.

This way, the function receives 11 and 2, returning 13 as the result.

Finally, the function receives 13 and 3, which gives us the final result, 16.

To try the second version of the reduce operator, we just need to remove the first argument:

Flux<Integer> integerFlux = 
    Flux.just(1, 2, 3);

integerFlux
    .reduce((a, b) -> {
        int result = a + b;
        System.out.format(
            "[%d + %d] = %d\n", a, b, result
        );
        return result;
    })
    .subscribe(System.out::println);

Since we don’t have an initial value to work with, the first two elements of the Flux will be passed to the function.

Here’s the result:

[1 + 2] = 3
[3 + 3] = 6
6

Finally, the third version, reduceWith, takes the initial value wrapped in a Supplier. The Supplier is called on subscription, and the value it returns is passed to the BiFunction.

Here’s the first example modified to use reduceWith:

Flux<Integer> integerFlux = 
    Flux.just(1, 2, 3);

integerFlux
    .reduceWith(() -> 10, (a, b) -> {
        int result = a + b;
        System.out.format(
            "[%d + %d] = %d\n", a, b, result
        );
        return result;
    })
    .subscribe(System.out::println);

The result will be the same:

[10 + 1] = 11
[11 + 2] = 13
[13 + 3] = 16
16
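With an immutable value like 10, the Supplier makes no visible difference. It starts to matter when the initial value is a mutable container. The following sketch (the class and method names are hypothetical) accumulates the elements into an ArrayList and subscribes twice. Because the Supplier is called on every subscription, each subscriber should get its own list:

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ReduceWithFreshSeed {
    // Hypothetical helper: accumulates the elements into a mutable List.
    static Mono<List<Integer>> toList(Flux<Integer> source) {
        // ArrayList::new runs once per subscription, so every
        // subscriber accumulates into a new, empty list.
        return source.<List<Integer>>reduceWith(ArrayList::new, (list, value) -> {
            list.add(value);
            return list;
        });
    }

    // Subscribes twice to the same Mono and joins both results.
    static String subscribeTwice() {
        Mono<List<Integer>> mono = toList(Flux.just(1, 2, 3));
        return mono.block() + " " + mono.block();
    }

    public static void main(String[] args) {
        System.out.println(subscribeTwice()); // [1, 2, 3] [1, 2, 3]
    }
}
```

In real code, collectList (covered later in this lesson) is the simpler way to get a List. The point here is only to show when the Supplier is invoked.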

scan

This operator works much like reduce. In fact, the versions of scan mirror the versions of reduce, except that they return a Flux instead of a Mono:

Flux<A> scan(
    A initial, 
    BiFunction<A,? super T,A> accumulator
)

Flux<T> scan(
    BiFunction<T,T,T> accumulator
)

Flux<A> scanWith(
    Supplier<A> initial, 
    BiFunction<A,? super T,A> accumulator
)

The only difference is that scan emits each intermediate value, not just the final one.

For instance, this is the marble diagram for the version that provides an initial value:

scan marble diagram

Here’s an example for the version that takes an initial value:

Flux<Integer> integerFlux = 
    Flux.just(1, 2, 3);

integerFlux
    .scan(10, (a, b) -> a + b)
    .subscribe(System.out::println);

This is the result:

10
11
13
16

Here’s the example for the second version:

Flux<Integer> integerFlux = 
    Flux.just(1, 2, 3);

integerFlux
    .scan((a, b) -> a + b)
    .subscribe(System.out::println);

This is the result:

1
3
6

And for scanWith:

Flux<Integer> integerFlux = 
    Flux.just(1, 2, 3);

integerFlux
    .scanWith(() -> 10, (a, b) -> a + b)
    .subscribe(System.out::println);

This is the result:

10
11
13
16
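To see the relationship between the two operators side by side, here is a small sketch that runs scan and reduce with the same initial value and the same accumulator. The class and method names are made up for this example, and collectList (covered in the next section) is used only to capture everything scan emits:

```java
import java.util.List;
import reactor.core.publisher.Flux;

class ScanVersusReduce {
    // Every value emitted by scan, starting with the initial value.
    static List<Integer> runningTotals() {
        return Flux.just(1, 2, 3)
            .scan(10, Integer::sum)
            .collectList()
            .block();
    }

    // The single value emitted by reduce.
    static Integer total() {
        return Flux.just(1, 2, 3)
            .reduce(10, Integer::sum)
            .block();
    }

    public static void main(String[] args) {
        System.out.println(runningTotals()); // [10, 11, 13, 16]
        System.out.println(total());         // 16
    }
}
```

The last value emitted by scan is the same value that reduce emits.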

collect

The collect* operators allow you to collect all the elements of a Flux into a container, which can be a List, a Map, or any other container produced by a java.util.stream.Collector<T,A,R>.

Here are some examples of the available collect* operators.

This one collects all elements of the Flux into a container using a Collector from the Java 8 Stream API:

Mono<R> collect(
    Collector<? super T,A,? extends R> collector
)
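As a quick illustration, any Collector from java.util.stream.Collectors can be passed to this operator. The sketch below uses Collectors.joining to build a single String. The class name and the joinWithCommas helper are hypothetical:

```java
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class CollectWithCollector {
    // Hypothetical helper: joins all the elements into one comma-separated String.
    static Mono<String> joinWithCommas(String... values) {
        return Flux.fromArray(values)
            .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        // block() is used only to print the value in a demo.
        System.out.println(joinWithCommas("a", "b", "c").block()); // a, b, c
    }
}
```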

This one collects all elements of the Flux into a List:

Mono<List<T>> collectList()

The following collects all elements of the Flux into a Map. The key is extracted from each element by applying the keyExtractor function, and the value is extracted by the valueExtractor function. If several elements map to the same key, the associated value will be the one extracted from the most recently emitted element:

Mono<Map<K,V>> collectMap(
    Function<? super T,? extends K> keyExtractor, 
    Function<? super T,? extends V> valueExtractor
)

The following collects all elements of the Flux into a Map whose values are collections. The key is extracted from each element by applying the keyExtractor function, the value is extracted by the valueExtractor function, and the values of all the elements that map to the same key are stored in the collection associated with that key:

Mono<Map<K,Collection<V>>> collectMultimap(
    Function<? super T,? extends K> keyExtractor,
    Function<? super T,? extends V> valueExtractor
)

And this one collects all elements of the Flux until it completes, and then sorts them into a List using the provided Comparator:

Mono<List<T>> collectSortedList(
    @Nullable Comparator<? super T> comparator
)

All these operators generate an empty container if the source Flux is empty.
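Here is a minimal sketch of that behavior, using Flux.empty() as the source. The class and method names are made up for this example:

```java
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;

class CollectOnEmptyFlux {
    // collectList on an empty Flux still emits a List, with no elements.
    static List<Integer> emptyList() {
        return Flux.<Integer>empty()
            .collectList()
            .block();
    }

    // collectMap on an empty Flux still emits a Map, with no entries.
    static Map<Integer, Integer> emptyMap() {
        return Flux.<Integer>empty()
            .collectMap(i -> i / 10, i -> i % 10)
            .block();
    }

    public static void main(String[] args) {
        System.out.println(emptyList()); // []
        System.out.println(emptyMap());  // {}
    }
}
```

In other words, the Mono returned by these operators emits a value even when the source emits nothing.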

For instance, this is the marble diagram for the version collect(Collector<? super T,A,? extends R> collector):

collect marble diagram

Let’s review a few examples, so I can show you how to use collectMap, collectMultimap, and collectSortedList.

For collectMap, the next example uses the version that takes a key and a value extractor:

Flux<Integer> integerFlux = 
    Flux.just(11, 22, 33, 34);

Mono<Map<Integer, Integer>> monoMap = 
    integerFlux.collectMap(
                i -> i / 10,
                i -> i % 10
    );

monoMap.subscribe(System.out::println);

As the key, collectMap uses the tens digit of each element. As the value, it uses the remainder of dividing the element by ten.

This is the result:

{1=1, 2=2, 3=4}

Notice that the value from the third element (33) was replaced by the value from the last one (34), because both elements map to the key 3. Remember, if many elements map to the same key, the associated value will be the one from the most recently emitted element.

If we don’t want this behavior, we have to use collectMultimap:

Flux<Integer> integerFlux = 
    Flux.just(11, 22, 33, 34);

Mono<Map<Integer, Collection<Integer>>> monoMap =
    integerFlux.collectMultimap(
            i -> i / 10,
            i -> i % 10
    );

monoMap.subscribe(System.out::println);

In this case, the map contains a collection to preserve all the elements that map to the same key:

{1=[1], 2=[2], 3=[3, 4]}

We can use collectSortedList to sort the elements of the Flux.

The following example sorts the elements in reverse order:

Flux<Integer> integerFlux = 
    Flux.just(1, 2, 3);

Mono<List<Integer>> monoSortedList =
    integerFlux.collectSortedList(
            Comparator.reverseOrder()
    );

monoSortedList.subscribe(System.out::println);

This is the result:

[3, 2, 1]

Finally, notice that all these operators return the container inside of a Mono, meaning that, even though List or Map are not asynchronous types, the collection process is done asynchronously.
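One practical consequence is that any further processing of the container can be chained on the Mono instead of waiting for the result. The following sketch (the class name and the largest helper are hypothetical) sorts the elements in reverse order and then maps the resulting List to its first element:

```java
import java.util.Comparator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ComposeOnCollectedList {
    // Hypothetical helper: emits the largest of the given values.
    // Assumes at least one value; get(0) would fail on an empty list.
    static Mono<Integer> largest(Integer... values) {
        return Flux.fromArray(values)
            .collectSortedList(Comparator.reverseOrder())
            // map runs only when the sorted List is ready.
            .map(sorted -> sorted.get(0));
    }

    public static void main(String[] args) {
        largest(1, 2, 3).subscribe(System.out::println); // 3
    }
}
```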

hasElement(s)/all/any

These operators can be considered aggregation operations because they reduce the values of a sequence to a single boolean value.

Here are the definitions of these operators.

For Mono, hasElement emits a single boolean value that indicates if the Mono has an element:

Mono<Boolean> hasElement()

It’s the only operator in this lesson that is available for Mono.

For Flux, we have a hasElement operator that emits a single boolean value that indicates if any of the elements of the Flux is equal to the provided value:

Mono<Boolean> hasElement(T value)

And a hasElements operator that emits a single boolean value that indicates if the Flux has, at least, one element:

Mono<Boolean> hasElements()

As well as an all operator that emits a single boolean value that indicates whether all the values of the Flux match the predicate:

Mono<Boolean> all(Predicate<? super T> predicate)

And an any operator that emits a single boolean value that indicates whether any of the values of the Flux matches the predicate:

Mono<Boolean> any(Predicate<? super T> predicate)

Notice that all these operators return the boolean value wrapped in a Mono.

This is the marble diagram of the hasElement operator (for Mono):

hasElement Mono marble diagram

Here’s an example that prints false:

Mono<Integer> integerMono = 
    Mono.just(1);

integerMono
    .filter(i -> i > 1)
    .hasElement()
    .subscribe(System.out::println);

The Flux equivalent takes the value to test if any of the elements is equal to it:

Flux<Integer> integerFlux = 
    Flux.just(1, 2, 3);

integerFlux
    .filter(i -> i > 2)
    .hasElement(3)
    .subscribe(System.out::println);

The filter in the example above only removes the first two elements, so hasElement(3) emits true.

On the other hand, hasElements (only available for Flux) emits true if the Flux has at least one element. The following example prints true:

Flux<Integer> integerFlux = 
    Flux.just(1, 2, 3);

integerFlux
    .filter(i -> i > 2)
    .hasElements()
    .subscribe(System.out::println);

Now, if you want to test, for example, if all the elements of a Flux are even, you can use the all operator:

Flux<Integer> integerFlux = 
    Flux.just(1, 2, 3);

integerFlux
    .all(i -> i % 2 == 0)
    .subscribe(System.out::println);

In this case, the result is false.

And if you want to test, for example, if at least one value is even, use the any operator:

Flux<Integer> integerFlux = 
    Flux.just(1, 2, 3);

integerFlux
    .any(i -> i % 2 == 0)
    .subscribe(System.out::println);

The result will be true.
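According to the Reactor documentation, any uses short-circuit logic: it emits true as soon as one value matches, without waiting for the rest of the sequence. The sketch below tries to make that visible by counting, with doOnNext, how many elements the source emits. The class name, the method name, and the counter are made up for this example:

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

class AnyShortCircuit {
    // Reports the result of any() and how many elements were emitted to get it.
    static String findFirstEven() {
        AtomicInteger seen = new AtomicInteger();
        Boolean found = Flux.just(1, 2, 3, 4, 5)
            .doOnNext(i -> seen.incrementAndGet()) // counts every emitted element
            .any(i -> i % 2 == 0)                  // matches on the second element
            .block();
        return found + " after " + seen.get() + " elements";
    }

    public static void main(String[] args) {
        System.out.println(findFirstEven()); // true after 2 elements
    }
}
```

The elements 3, 4, and 5 are never emitted, because any cancels the subscription once it has its answer.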

count

Finally, one of the easiest operators for Flux you’ll ever find:

Mono<Long> count()

This operator counts the number of values in a Flux.

This is the marble diagram for this operator:

count marble diagram

Here’s an example:

Flux<Integer> integerFlux = 
    Flux.just(1, 2, 3, 4, 5);

Mono<Long> integerMono = integerFlux.count();
integerMono.subscribe(System.out::println);

The value is wrapped in a Mono<Long>. Still, the output shouldn’t be a surprise:

5
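Since count is just another operator, it can be combined with the ones from previous lessons. As a small sketch (the class name and the countEven helper are hypothetical), this counts only the even numbers of a range:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class CountAfterFilter {
    // Hypothetical helper: counts the even numbers among
    // howMany consecutive integers starting at start.
    static Mono<Long> countEven(int start, int howMany) {
        return Flux.range(start, howMany)
            .filter(i -> i % 2 == 0)
            .count();
    }

    public static void main(String[] args) {
        countEven(1, 10).subscribe(System.out::println); // 5
    }
}
```

If no element passes the filter, count emits 0 rather than completing empty.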