Parallelizing work with ParallelFlux
In Reactor, you can easily parallelize work with ParallelFlux
, which publishes elements to an array of Subscribers
in parallel rails or groups.
ParallelFlux
is another implementation of the Publisher
interface but is not a subclass of Flux
, it implements its own versions of operators such as concatMap
, filter
, map
, and flatMap
, just to mention a few. You can review all the operators it implements on its javadoc page (in particular, it does not have a doFinally
life cycle hook).
Having said that, you create a ParallelFlux
from an existing Flux
using the parallel
method:
Flux.just(1, 2, 3, 4, 5)
.parallel()
.subscribe();
However, this only divides the sequence into multiple sub-sequences (the rails), but does not change the execution model.
If we add a statement to print the name of the current thread to the above example:
Flux.just(1, 2, 3, 4, 5)
.parallel()
.subscribe(i -> System.out.format(
"subscribe(%d) - %s\n",
i,
Thread.currentThread().getName())
);
This will be the result:
subscribe(1) - main
subscribe(2) - main
subscribe(3) - main
subscribe(4) - main
subscribe(5) - main
As you can see, it executes the sequence on the main (default) thread.
If we want to parallelize the work, we have to tell ParallelFlux
where to run each rail with the runOn
operator, which takes the Scheduler
where the operators downstream will be executed.
There are two versions of runOn
. The first one specifies where each rail will observe its incoming values with possible work-stealing and default prefetch amount:
ParallelFlux<T> runOn(Scheduler scheduler)
While the second one allows you to specify the prefetch amount:
ParallelFlux<T> runOn(Scheduler scheduler, int prefetch)
The prefetch
parameter specifies the number of values to request on each rail from the source. The default prefetch value is defined by Queues.SMALL_BUFFER_SIZE:
public static final int SMALL_BUFFER_SIZE =
Math.max(16,
Integer.parseInt(
System.getProperty("reactor.bufferSize.small", "256")
)
);
About the Scheduler
, the best option is to pass a parallel Scheduler
.
Here’s an example:
Flux.just(1, 2, 3, 4, 5)
.parallel()
.runOn(
Schedulers.parallel()
)
.map(i -> {
System.out.format(
"map(%d) - %s\n",
i,
Thread.currentThread().getName());
return i * 10;
})
.flatMap(i -> {
System.out.format(
"flatMap(%d) - %s\n",
i,
Thread.currentThread().getName()
);
return Mono.just(i * 10);
})
.subscribe(i -> System.out.format(
"subscribe(%d) - %s\n",
i,
Thread.currentThread().getName())
);
Thread.sleep(1000);
This is the result:
map(1) - parallel-1
flatMap(10) - parallel-1
map(2) - parallel-2
flatMap(20) - parallel-2
map(4) - parallel-4
flatMap(40) - parallel-4
map(5) - parallel-5
flatMap(50) - parallel-5
map(3) - parallel-3
flatMap(30) - parallel-3
subscribe(300) - parallel-3
subscribe(100) - parallel-1
subscribe(500) - parallel-5
subscribe(200) - parallel-2
subscribe(400) - parallel-4
This time, everything is run in five parallel threads.
But just like publishOn
, the place of runOn
matters.
If we move runOn
after map
:
Flux.just(1, 2, 3, 4, 5)
.parallel()
.map(i -> {
System.out.format(
"map(%d) - %s\n",
i,
Thread.currentThread().getName()
);
return i * 10;
})
.runOn(
Schedulers.parallel()
)
.flatMap(i -> {
System.out.format(
"flatMap(%d) - %s\n",
i,
Thread.currentThread().getName()
);
return Mono.just(i * 10);
})
.subscribe(i -> System.out.format(
"subscribe(%d) - %s\n",
i,
Thread.currentThread().getName())
);
Thread.sleep(1000);
This will be the result:
map(1) - main
map(2) - main
flatMap(10) - parallel-1
map(3) - main
subscribe(100) - parallel-1
map(4) - main
flatMap(30) - parallel-3
subscribe(300) - parallel-3
flatMap(20) - parallel-2
flatMap(40) - parallel-4
subscribe(400) - parallel-4
map(5) - main
subscribe(200) - parallel-2
flatMap(50) - parallel-5
subscribe(500) - parallel-5
As you can see, map
is executed on the main thread, unlike flatMap
and subscribe
.
By default, ParallelFlux
splits the sequence into the total number of available CPU cores in a round-robin fashion. However, there are other two versions of parallel
to customize this. The first one prepares a Flux
by dividing data on a number of rails matching the provided parallelism parameter in a round-robin fashion:
ParallelFlux<T> parallel(
int parallelism
)
The second one uses a custom prefetch amount and queue for dealing with the source Flux
’s values:
ParallelFlux<T> parallel(
int parallelism,
int prefetch
)
Once again, the default prefetch value is defined by Queues.SMALL_BUFFER_SIZE.
This way, we can create only two rails to execute the sequence in two parallel threads by using parallel(2)
:
Flux.just(1, 2, 3, 4, 5)
.parallel(2)
.runOn(
Schedulers.parallel()
)
.map(i -> {
System.out.format(
"map(%d) - %s\n",
i,
Thread.currentThread().getName()
);
return i * 10;
})
.flatMap(i -> {
System.out.format(
"flatMap(%d) - %s\n",
i,
Thread.currentThread().getName()
);
return Mono.just(i * 10);
})
.subscribe(i -> System.out.format(
"subscribe(%d) - %s\n",
i,
Thread.currentThread().getName())
);
Thread.sleep(1000);
This is the result:
map(2) - parallel-2
flatMap(20) - parallel-2
subscribe(200) - parallel-2
map(4) - parallel-2
flatMap(40) - parallel-2
subscribe(400) - parallel-2
map(1) - parallel-1
flatMap(10) - parallel-1
subscribe(100) - parallel-1
map(3) - parallel-1
flatMap(30) - parallel-1
subscribe(300) - parallel-1
map(5) - parallel-1
flatMap(50) - parallel-1
subscribe(500) - parallel-1
As you can see, only two threads (parallel-1
and parallel-2
) are used this time.
In addition to runOn
, ParallelFlux
has other methods that you must know:
- from(org.reactivestreams.Publisher<? extends T>) to start processing a regular Publisher in rails, where each rail covers a subset of the original
Publisher
’s data. This is an alternative toFlux.parallel()
, which is a shortcut to achieve that on aFlux
. - sequential() to merge the sources back into a single
Flux
. - then() to emit an
onComplete
oronError
signal once all values across rails have been observed. It returns aMono<Void>
. - subscribe(Subscriber) if you simply want to subscribe to the merged sequence. Other variants like
subscribe(Consumer)
instead do multiple subscribes, one on each rail (this means that the lambdas should be as stateless and side-effect free as possible).
For example, you can use sequential()
to switch back to a normal Flux
that will sequentially process the stream:
Flux.just(1, 2, 3, 4, 5)
.parallel()
.runOn(
Schedulers.parallel()
)
.map(i -> {
System.out.format(
"map(%d) - %s\n",
i,
Thread.currentThread().getName()
);
return i * 10;
})
.sequential()
.publishOn(
Schedulers.boundedElastic()
)
.flatMap(i -> {
System.out.format(
"flatMap(%d) - %s\n",
i,
Thread.currentThread().getName()
);
return Mono.just(i * 10);
})
.subscribe(i -> System.out.format(
"subscribe(%d) - %s\n",
i,
Thread.currentThread().getName())
);
Thread.sleep(1000);
Notice that we also need to specify another Scheduler
to process the stream sequentially with publishOn
.
This way, flatMap
and subscribe
will be executed in a bounded elastic thread:
map(1) - parallel-3
map(3) - parallel-5
map(4) - parallel-6
map(5) - parallel-1
map(2) - parallel-4
flatMap(10) - boundedElastic-1
subscribe(100) - boundedElastic-1
flatMap(30) - boundedElastic-1
subscribe(300) - boundedElastic-1
flatMap(40) - boundedElastic-1
subscribe(400) - boundedElastic-1
flatMap(50) - boundedElastic-1
subscribe(500) - boundedElastic-1
flatMap(20) - boundedElastic-1
subscribe(200) - boundedElastic-1
Or with groups()
, you can group the elements based on the index of the processing thread.
Here’s an example:
Flux.just(1, 2, 3, 4, 5, 6)
.parallel(2)
//.runOn(Schedulers.parallel())
.groups()
.flatMap(group -> {
System.out.format(
"flatMap(%d) - %s\n",
group.key(),
Thread.currentThread().getName()
);
return group.collectList();
})
.subscribe(l -> System.out.format(
"subscribe(%s) - %s\n",
l.toString(),
Thread.currentThread().getName())
);
groups()
returns a normal Flux
of type Flux<GroupedFlux<Integer,T>>
, so runOn
is optional, it won’t have any effect here because we’re not longer working with a ParallelFlux
.
This is the result:
flatMap(0) - main
flatMap(1) - main
subscribe([1, 3, 5]) - main
subscribe([2, 4, 6]) - main
With parallel(2)
, we create two rails. Then, they are converted with groups()
into two groups (0
and 1
). One group will have the elements [1, 3, 5]
and the other one the elements [2, 4, 6]
.
Also, notice that flatMap
and subscribe
are executed in the main (default) thread, as there’s no publishOn
operator to change the threading context.