Skip to main content Link Menu Expand (external link) Document Search Copy Copied
Dark theme

Using publishOn


This operator allows you to modify the threading context (Scheduler) of the downstream operators in a reactive sequence, meaning that the signals onNext, onComplete, and onError will run on the provided Scheduler.

Here’s the definition of this operator for Mono:

Mono<T> publishOn(Scheduler scheduler)

It runs onNext, onComplete and onError on the supplied Scheduler.

For Flux, there are three versions of this operator that run onNext, onComplete and onError on the supplied Scheduler, discarding elements internally queued for backpressure upon cancellation or error triggered by a data signal.

The first version only takes the Scheduler:

Flux<T> publishOn(
    Scheduler scheduler
)

The seconds one takes an additional prefetch argument that defines the asynchronous boundary capacity:

Flux<T> publishOn(
    Scheduler scheduler, 
    int prefetch
)

And the third version takes and additional delayError argument that indicates if the buffer is consumed before forwarding any error:

Flux<T> publishOn(
    Scheduler scheduler, 
    boolean delayError, 
    int prefetch
)

The operators after publishOn will keep using the specified threading context up to a new occurrence of this operator. So the place of publishOn matters.

Let’s review an example:

Flux.just(1, 2, 3, 4, 5)
    .map(i -> {
        System.out.format("map(%d) - %s\n",
                i,
                Thread.currentThread().getName());
        return i * 10;
    })
    .publishOn(
        Schedulers.newSingle("singleScheduler")
    )
    .flatMap(i -> {
        System.out.format("flatMap(%d) - %s\n",
                i,
                Thread.currentThread().getName());
        return Mono.just(i * 10);
    })
    .subscribe(i -> System.out.format(
                        "subscribe(%d) - %s\n",
                        i,
                        Thread.currentThread().getName()
                    )
    );
Thread.sleep(1000);

Between the map and flatMap operators, there’s a call to publishOn that will change the threading context to a new single Scheduler.

You can see this when running the program:

map(1) - main
map(2) - main
flatMap(10) - singleScheduler-1
map(3) - main
map(4) - main
map(5) - main
subscribe(100) - singleScheduler-1
flatMap(20) - singleScheduler-1
subscribe(200) - singleScheduler-1
flatMap(30) - singleScheduler-1
subscribe(300) - singleScheduler-1
flatMap(40) - singleScheduler-1
subscribe(400) - singleScheduler-1
flatMap(50) - singleScheduler-1
subscribe(500) - singleScheduler-1

Now, if we add another call to publishOn before the map operator:

Flux.just(1, 2, 3, 4, 5)
    .publishOn(
        Schedulers.newParallel("parallelScheduler")
    )
    .map(i -> {
        System.out.format(
                "map(%d) - %s\n",
                i,
                Thread.currentThread().getName()
        );
        return i * 10;
    })
    .publishOn(
        Schedulers.newSingle("singleScheduler")
    )
    .flatMap(i -> {
        System.out.format(
                "flatMap(%d) - %s\n",
                i,
                Thread.currentThread().getName()
        );
        return Mono.just(i * 10);
    })
    .subscribe(i -> System.out.format(
            "subscribe(%d) - %s\n",
            i,
            Thread.currentThread().getName())
    );
Thread.sleep(1000);

map will be executed on a parallel Scheduler, while flatMap and the subscription will be executed on the context specified by the latest call to publishOn:

map(1) - parallelScheduler-1
map(2) - parallelScheduler-1
flatMap(10) - singleScheduler-2
subscribe(100) - singleScheduler-2
map(3) - parallelScheduler-1
map(4) - parallelScheduler-1
map(5) - parallelScheduler-1
flatMap(20) - singleScheduler-2
subscribe(200) - singleScheduler-2
flatMap(30) - singleScheduler-2
subscribe(300) - singleScheduler-2
flatMap(40) - singleScheduler-2
subscribe(400) - singleScheduler-2
flatMap(50) - singleScheduler-2
subscribe(500) - singleScheduler-2

Similarly, if we move the second call to publishOn after the flatMap operator:

Flux.just(1, 2, 3, 4, 5)
    .publishOn(
        Schedulers.newParallel("parallelScheduler")
    )
    .map(i -> {
        System.out.format(
                "map(%d) - %s\n",
                i,
                Thread.currentThread().getName()
        );
        return i * 10;
    })
    .flatMap(i -> {
        System.out.format(
                "flatMap(%d) - %s\n",
                i,
                Thread.currentThread().getName()
        );
        return Mono.just(i * 10);
    })
    .publishOn(
        Schedulers.newSingle("singleScheduler")
    )
    .subscribe(i -> System.out.format(
            "subscribe(%d) - %s\n",
            i,
            Thread.currentThread().getName())
    );
Thread.sleep(1000);

Only the subscription will happen on the single Scheduler:

map(1) - parallelScheduler-2
flatMap(10) - parallelScheduler-2
map(2) - parallelScheduler-2
flatMap(20) - parallelScheduler-2
subscribe(100) - singleScheduler-3
subscribe(200) - singleScheduler-3
map(3) - parallelScheduler-2
flatMap(30) - parallelScheduler-2
map(4) - parallelScheduler-2
flatMap(40) - parallelScheduler-2
subscribe(300) - singleScheduler-3
subscribe(400) - singleScheduler-3
map(5) - parallelScheduler-2
flatMap(50) - parallelScheduler-2
subscribe(500) - singleScheduler-3

A final note. Switching between threads hurts performance, it’s a heavy operation. For that reason, you should only use publishOn when you really need to move the execution of a sequence to another Scheduler (thread).