More Operators For Transforming Sequences
In the last module, we reviewed map
and flatMap
, but there are more operators for transforming sequences.
In this lesson, we’re going to review:
cast
This operator is very simple but can be useful sometimes. It allows us to cast the type of the elements of a Publisher
to an assignable type according to the isInstance
operator. It works like the map
operator, but the only transformation it allows is casting the type.
Here’s the definition for Mono
and Flux
:
// For Mono
Mono<E> cast(Class<E> clazz)
// For Flux
Flux<E> cast(Class<E> clazz)
It works the same for both.
Take a look at the marble diagram for Mono
:
Here’s an example for Flux
that converts all the elements of a Flux<Number>
to a Flux<Long>
:
Flux<Number> fluxNumber = Flux.just(1L, 2L, 3L);
Flux<Long> fluxLong = fluxNumber.cast(Long.class);
fluxLong.subscribe(System.out::println);
In this example, we’re explicitly working with long
values (notice the L
). If we were casting to a Flux<Integer>
:
Flux<Number> fluxString = Flux.just(1L, 2L, 3L);
Flux<Integer> fluxInteger = fluxString.cast(Integer.class);
fluxInteger.subscribe(System.out::println);
The program will throw a ClassCastException
:
[ERROR] (main) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ClassCastException: Cannot cast java.lang.Long to java.lang.Integer
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ClassCastException: Cannot cast java.lang.Long to java.lang.Integer
Caused by: java.lang.ClassCastException: Cannot cast java.lang.Long to java.lang.Integer
at java.base/java.lang.Class.cast(Class.java:3889)
As you can see, under the hood, this operator uses the method Class.cast()
, which internally uses the isInstance
operator, so the above code will also throw an exception if we try to cast Flux<Number>
to Flux<String>
, for example.
concatMap
In the previous module, we talked about how flatMap
cannot guarantee that the order of the output elements will be preserved.
If you run the following example:
Flux<Integer> fluxInt = Flux.just(1, 2, 3, 4);
Flux<Integer> fluxFlat =
fluxInt
.flatMap(i -> transformAsyncPublisherDelay(i));
fluxFlat.subscribe(System.out::println);
// ...
public Publisher<Integer> transformAsyncPublisherDelay(int i) {
return i % 2 == 0
? Flux.just(i, i+1)
: Mono.just(i * 10);
}
This will be the result:
10
30
2
3
4
5
But if you use flatMapSequential
:
Flux<Integer> fluxInt = Flux.just(1, 2, 3, 4);
Flux<Integer> fluxFlat =
fluxInt
.flatMapSequential(i -> transformAsyncPublisherDelay(i));
fluxFlat.subscribe(System.out::println);
The order will be preserved:
10
2
3
30
4
5
concatMap
is similar to flatMapSequential
. It’s only available for Flux
:
Flux<V> concatMap(
Function<? super T,? extends Publisher<? extends V>> mapper
)
Here’s an example:
Flux<Integer> fluxInt = Flux.just(1, 2, 3, 4);
Flux<Integer> fluxConcat =
fluxInt
.concatMap(i -> transformAsyncPublisherDelay(i));
fluxConcat.subscribe(System.out::println);
This is the result:
10
2
3
30
4
5
As you can see, it also preserves the order of the elements.
However, under the hood, flatMapSequential
and concatMap
work differently:
flatMapSequential
subscribes to its inner publishers eagerly (likeflatMap
), queueing faster publishers until earlier publishers complete to maintain the order.concatMap
waits for one inner publisher to complete before subscribing to the next one to maintain the order. Hence the name, it concatenates the inner publishers from each source element sequentially.
Take a look at the marble diagram of concatMap
:
In practical terms, the result is the same, but be careful. In case of an error, for example, with concatMap
the concatenation will stop at that point, possibly resulting in a different output if you had used flatMapSequential
. So always take into account how these operators perform the transformation.
Like flapMap
, concatMap
also has versions that take a prefetch
argument, one that delays errors, and others to transform the elements into Iterable
objects:
Flux<V> concatMap(
Function<? super T,? extends Publisher<? extends V>> mapper,
int prefetch
)
Flux<V> concatMapDelayError(
Function<? super T,? extends Publisher<? extends V>> mapper
)
Flux<V> concatMapDelayError(
Function<? super T,? extends Publisher<? extends V>> mapper,
boolean delayUntilEnd,
int prefetch
)
Flux<V> concatMapDelayError(
Function<? super T,? extends Publisher<? extends V>> mapper,
int prefetch
)
Flux<R> concatMapIterable(
Function<? super T,? extends Iterable<? extends R>> mapper
)
Flux<R> concatMapIterable(
Function<? super T,? extends Iterable<? extends R>> mapper,
int prefetch
)
For a more detailed example, take a look at this Stack Overflow answer.
switchMap
In addition to concatMap
, there’s another operator that works like flatMapSequential
, switchMap
.
Take a look at the marble diagram of switchMap
:
- When the source
Flux
emits the first circle (dark gray),switchMap
subscribes to the inner publisher for that circle. - When the source
Flux
emits the next circle (light gray),switchMap
cancels the subscription to the publisher of the first circle (dark gray) and subscribes to the publisher for the next one (light gray). This means that no further elements from the first publisher will be included in the output. - Step two is repeated for the next elements of the
Flux
. This means that not all the transformed elements will be included in the output, most likely, only the ones produced for the last elements of the sourceFlux
.
Here’s the definition of switchMap
for Flux
:
Flux<V> switchMap(
Function<? super T,Publisher<? extends V>> fn
)
It’s used just like flapMap
and similar operators:
Flux<Integer> fluxInt = Flux.just(1, 2, 3, 4);
Flux<Integer> fluxSwitch = fluxInt
.switchMap(i -> transformAsyncPublisherDelay(i));
fluxSwitch.subscribe(System.out::println);
However, for the above example, this will be the result:
10
30
4
5
As you can see, the elements 2
and 3
were not emitted.
This operator is used when you’re interested only in the last results. For example, when you’re programming an autocomplete widget, you’re only interested in the last letters (or the complete word) typed by the user. If the user inputs Reactor, there’s no need to show the results for R or Re after the user types the complete word.
If you want to know more, this article compares all the *Map
operators we’ve talked about and reviews in more detail switchMap
.