What is a Scheduler?
The concept of a scheduler is closely related to the concept of a thread.
A thread is usually defined as a lightweight process, but it can also be seen as a path of execution in a program.
Every Java application runs on at least, one thread, usually the main application thread.
However, you don’t always have to worry about this. In the case of Reactor, it doesn’t enforce a particular threading model.
Remember, everything starts when you subscribe. This way, by default, most operators run on the thread on which the subscription is made.
Consider this example:
Flux.just(1, 2, 3, 4, 5)
.map(i -> {
System.out.format(
"map(%d) - %s\n",
i,
Thread.currentThread().getName()
);
return i * 10;
})
.flatMap(i -> {
System.out.format(
"flatMap(%d) - %s\n",
i,
Thread.currentThread().getName()
);
return Mono.just(i * 10);
})
.subscribe(i ->
System.out.format(
"subscribe(%d) - %s\n",
i,
Thread.currentThread().getName()
)
);
At every step of the sequence, it prints the name of the thread on which the operator or method is executing.
This is the result:
map(1) - main
flatMap(10) - main
subscribe(100) - main
map(2) - main
flatMap(20) - main
subscribe(200) - main
map(3) - main
flatMap(30) - main
subscribe(300) - main
map(4) - main
flatMap(40) - main
subscribe(400) - main
map(5) - main
flatMap(50) - main
subscribe(500) - main
As you can see, everything is executed on the main thread, the thread where the subscription is made.
If we modify the example to start the subscription in another thread:
Flux<Integer> integerFlux =
Flux.just(1, 2, 3, 4, 5)
.map(i -> {
System.out.format(
"map(%d) - %s\n",
i,
Thread.currentThread().getName()
);
return i * 10;
})
.flatMap(i -> {
System.out.format(
"flatMap(%d) - %s\n",
i,
Thread.currentThread().getName()
);
return Mono.just(i * 10);
});
Thread myThread = new Thread(() ->
integerFlux
.subscribe(i ->
System.out.format(
"subscribe(%d) - %s\n",
i,
Thread.currentThread().getName()
)
)
);
myThread.start();
myThread.join(); // So the program can wait for this thread to finish
This will be the result:
map(1) - Thread-0
flatMap(10) - Thread-0
subscribe(100) - Thread-0
map(2) - Thread-0
flatMap(20) - Thread-0
subscribe(200) - Thread-0
map(3) - Thread-0
flatMap(30) - Thread-0
subscribe(300) - Thread-0
map(4) - Thread-0
flatMap(40) - Thread-0
subscribe(400) - Thread-0
map(5) - Thread-0
flatMap(50) - Thread-0
subscribe(500) - Thread-0
Now everything is executed on Thread-0
instead of on the main
thread.
However, some operators can change the thread on which an operator or set of operators are executed.
One of these operators is delayElements
, we’ve used it before to delay the emission of a sequence of elements by a given duration.
If we modify the first example to add this operator between map
and flatMap
, as well as a Thread.sleep(1000)
statement to give time to the program to execute:
Flux.just(1, 2, 3, 4, 5)
.map(i -> {
System.out.format(
"map(%d) - %s\n",
i,
Thread.currentThread().getName()
);
return i * 10;
})
.delayElements(Duration.ofMillis(10))
.flatMap(i -> {
System.out.format(
"flatMap(%d) - %s\n",
i,
Thread.currentThread().getName()
);
return Mono.just(i * 10);
})
.subscribe(i -> System.out.format(
"subscribe(%d) - %s\n",
i,
Thread.currentThread().getName()
)
);
Thread.sleep(1000);
This will be the result:
map(1) - main
map(2) - main
map(3) - main
map(4) - main
map(5) - main
flatMap(10) - parallel-1
subscribe(100) - parallel-1
flatMap(20) - parallel-2
subscribe(200) - parallel-2
flatMap(30) - parallel-3
subscribe(300) - parallel-3
flatMap(40) - parallel-4
subscribe(400) - parallel-4
flatMap(50) - parallel-5
subscribe(500) - parallel-5
As you can see, after delayElements
, flatMap
, and subscribe
were executed on a different thread for each element: parallel-1
, parallel-2
, parallel-3
, parallel-4
, and parallel-5
.
In the documentation of delayElements, we can read:
Delay each of these Flux elements (Subscriber.onNext(T) signals) by a given Duration. Signals are delayed and continue on the parallel default Scheduler, but empty sequences or immediate error signals are not delayed.
With a Scheduler
, you can change the thread on which the sequence is executed.
Actually, delayElement
(for Mono
) and delayElements
have a version that takes the Scheduler
on which the delayed sequence will run:
// For Mono
Mono<T> delayElement(
Duration delay,
Scheduler timer
)
// For Flux
Flux<T> delayElements(
Duration delay,
Scheduler timer
)
Other operators that can take a Scheduler
, and therefore, can be executed on a different thread are delaySequence, delaySubscription, and interval, just to mention a few.
But how do you create a Scheduler
?
Well, first of all, a Scheduler is an abstraction similar to ExecutorService, which automatically manages a pool of threads and provides an API for running asynchronous tasks, but to execute operators. In fact, some implementations use internally ExecutorService
.
The Schedulers class provides many factory methods to create different types of Scheduler
implementations:
Scheduler.parallel()
returns aScheduler
suited for parallel work that works with a fixed pool of single-threadedExecutorService
-based workers.Scheduler.immediate()
returns aScheduler
that executes tasks immediately, running them on the thread that submitted them (the thread on which an operator is currently processing). ThisScheduler
is used as a null object when you require aScheduler
that doesn’t change the thread on which the operator is executing.Scheduler.single()
returns aScheduler
that works with a single-threadedExecutorService
-based worker. In other words, thisScheduler
works with a single, reusable thread.Scheduler.boundedElastic()
returns aScheduler
that dynamically creates a bounded number ofExecutorService
-based workers, reusing them once they have been shut down. There’s also aScheduler.elastic()
method that does something similar but it’s now deprecated. The difference is that withelastic()
the maximum number of created thread pools is unbounded, while withboundedElastic()
the maximum number of created threads is bounded by a cap (by default ten times the number of available CPU cores, see DEFAULT_BOUNDED_ELASTIC_SIZE). Besides, withboundedElastic()
, the maximum number of task submissions that can be enqueued and deferred on each of these backing threads is bounded (by default 100K additional tasks, see DEFAULT_BOUNDED_ELASTIC_QUEUESIZE). After those limits, a RejectedExecutionException is thrown.Scheduler.fromExecutorService(ExecutorService)
returns aScheduler
which uses the givenExecutorService
to scheduleRunnable
implementations.
In general, a method prefixed with new
(like newSingle) returns a new instance of that particular Scheduler
implementation, while other methods like parallel()
return an instance that will be created on the first call and cached for subsequent calls until it is disposed of.
This way, instead of relying on the default Scheduler
that some operators use, we can create a custom Scheduler
and pass it to the operators that give you the option to provide one.
For example, we can pass a bounded elastic Scheduler
to the delaySubscription operator:
Flux.just(1, 2, 3, 4, 5)
.map(i -> {
System.out.format(
"map(%d) - %s\n",
i,
Thread.currentThread().getName());
return i * 10;
})
.flatMap(i -> {
System.out.format(
"flatMap(%d) - %s\n",
i,
Thread.currentThread().getName());
return Mono.just(i * 10);
})
.delaySubscription(
Duration.ofMillis(10), Schedulers.boundedElastic()
)
.subscribe(i -> System.out.format(
"subscribe(%d) - %s\n",
i,
Thread.currentThread().getName()
)
);
Thread.sleep(1000);
Here’s the result:
map(1) - boundedElastic-1
flatMap(10) - boundedElastic-1
subscribe(100) - boundedElastic-1
map(2) - boundedElastic-1
flatMap(20) - boundedElastic-1
subscribe(200) - boundedElastic-1
map(3) - boundedElastic-1
flatMap(30) - boundedElastic-1
subscribe(300) - boundedElastic-1
map(4) - boundedElastic-1
flatMap(40) - boundedElastic-1
subscribe(400) - boundedElastic-1
map(5) - boundedElastic-1
flatMap(50) - boundedElastic-1
subscribe(500) - boundedElastic-1
By default, the delay is introduced through a parallel shared Scheduler
(parallel-X
). But as you can see, now is executed on a bounded elastic Scheduler
(boundedElastic-1
).
However, for the operators that don’t take a Scheduler
as a parameter, Reactor provides two methods to modify the Scheduler
a reactive sequence is executed on: publishOn
and subscribeOn
.
Let’s review publishOn
first.