subscribeOn and publishOn operators in Project Reactor

We use the subscribeOn and publishOn operators for switching the execution context (Scheduler) in a reactive chain.

In the previous lesson, we mentioned that the default behavior is that the same thread that performs a subscription will be used for the whole pipeline execution.

Let’s see one simple example:

import reactor.core.publisher.Flux;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux<String> cities = Flux.just("New York", "London", "Paris", "Amsterdam")
            .map(String::toUpperCase)
            // "AMSTERDAM" has 9 characters, so it is filtered out
            .filter(cityName -> cityName.length() <= 8)
            .map(cityName -> cityName.concat(" City"))
            .log();

    // Nothing is executed until we subscribe
    cities.subscribe();

  }
}

Output:

INFO 14040 --- [main] reactor.Flux.MapFuseable.1 : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
INFO 14040 --- [main] reactor.Flux.MapFuseable.1 : | request(unbounded)
INFO 14040 --- [main] reactor.Flux.MapFuseable.1 : | onNext(NEW YORK City)
INFO 14040 --- [main] reactor.Flux.MapFuseable.1 : | onNext(LONDON City)
INFO 14040 --- [main] reactor.Flux.MapFuseable.1 : | onNext(PARIS City)
INFO 14040 --- [main] reactor.Flux.MapFuseable.1 : | onComplete()
 
Between the [ ] brackets is a thread name, which is, in this case, main. You can see that the main thread was used through the whole pipeline execution.
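
To make the default behavior more visible, here is a minimal sketch that subscribes from a thread other than main. The class name SubscribingThreadDemo, the helper mapThreadWhenSubscribedFrom and the thread name "my-subscriber" are made up for this illustration. Since no Scheduler is involved, the map callback should run on whichever thread calls subscribe().

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;

class SubscribingThreadDemo {

  // Subscribes from a freshly started thread with the given (hypothetical) name
  // and returns the name of the thread that executed the map callback.
  static String mapThreadWhenSubscribedFrom(String threadName) {
    AtomicReference<String> seen = new AtomicReference<>();
    Thread subscriber = new Thread(() ->
        Flux.just("New York", "London")
            .map(city -> {
              seen.set(Thread.currentThread().getName());
              return city.toUpperCase();
            })
            .subscribe(),
        threadName);
    subscriber.start();
    try {
      subscriber.join(); // wait until the synchronous pipeline has finished
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return seen.get();
  }

  public static void main(String[] args) {
    // prints my-subscriber
    System.out.println(mapThreadWhenSubscribedFrom("my-subscriber"));
  }
}
```

The pipeline follows the subscriber: change the name passed to the helper and the reported thread changes with it.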
 
Sometimes we want to tell Reactor not to use the thread that started the subscription for the whole pipeline. We can do that with the subscribeOn() and publishOn() methods.

The subscribeOn() method

The subscribeOn() method applies to the subscription process. We can place it anywhere in the reactive chain. It accepts a Scheduler and runs the subscription, and therefore the emissions of the source, on a thread picked from that Scheduler’s thread pool.

For this example, we use the bounded elastic thread pool (Schedulers.boundedElastic()). See the previous lesson for more on the Scheduler options.

Example

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux<String> cities = Flux.just("New York", "London", "Paris", "Amsterdam")
            // Run the subscription, and so the emissions, on a boundedElastic thread
            .subscribeOn(Schedulers.boundedElastic())
            .map(String::toUpperCase)
            .filter(cityName -> cityName.length() <= 8)
            .map(cityName -> cityName.concat(" City"))
            .log();

    cities.subscribe();

  }
}

Output:

INFO 7500 --- [main] reactor.Flux.Map.1 : onSubscribe(FluxMap.MapSubscriber)
INFO 7500 --- [main] reactor.Flux.Map.1 : request(unbounded)
INFO 7500 --- [boundedElastic-1] reactor.Flux.Map.1 : onNext(NEW YORK City)
INFO 7500 --- [boundedElastic-1] reactor.Flux.Map.1 : onNext(LONDON City)
INFO 7500 --- [boundedElastic-1] reactor.Flux.Map.1 : onNext(PARIS City)
INFO 7500 --- [boundedElastic-1] reactor.Flux.Map.1 : onComplete()
 
You can see that the main thread started the subscription, but the elements were then emitted and processed on the boundedElastic-1 thread. We provided the Scheduler (Schedulers.boundedElastic()), and one of the threads from its thread pool was picked to replace the main thread.
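
To illustrate that the position of subscribeOn() does not matter, the following sketch places it at the very end of the chain and records which thread runs the map callback. The class name SubscribeOnPlacement and the helper mapThreadNames are invented for this illustration, and block() is used only so that the calling thread waits for the result.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class SubscribeOnPlacement {

  // Replaces each city with the name of the thread that ran the map callback.
  static List<String> mapThreadNames() {
    return Flux.just("New York", "London", "Paris")
            .map(city -> Thread.currentThread().getName())
            // Placed last, yet the source and the map above run on the Scheduler
            .subscribeOn(Schedulers.boundedElastic())
            .collectList()
            .block(); // wait for the pipeline to finish before returning
  }

  public static void main(String[] args) {
    mapThreadNames().forEach(System.out::println);
  }
}
```

Each printed name should start with boundedElastic, even though the map operator appears before subscribeOn() in the chain. The exact worker number may differ from run to run.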

The publishOn() method

The publishOn() method also switches execution to a thread from the provided Scheduler, but there is one main difference from subscribeOn(): its position in the chain matters.

First, let’s see one example:

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux.just("New York", "London", "Paris", "Amsterdam")
            .map(ReactiveJavaTutorial::stringToUpperCase)
            // Operators below this line run on a boundedElastic thread
            .publishOn(Schedulers.boundedElastic())
            .map(ReactiveJavaTutorial::concat)
            .subscribe();
  }

  // Prints the current thread name, then upper-cases the city name
  private static String stringToUpperCase(String name) {
    System.out.println("stringToUpperCase: " + Thread.currentThread().getName());
    return name.toUpperCase();
  }

  // Prints the current thread name, then appends " City"
  private static String concat(String name) {
    System.out.println("concat: " + Thread.currentThread().getName());
    return name.concat(" City");
  }
}


Here, we placed the publishOn() between the two map operators. Let’s see the output:

stringToUpperCase: main
stringToUpperCase: main
stringToUpperCase: main
stringToUpperCase: main
concat: boundedElastic-1
concat: boundedElastic-1
concat: boundedElastic-1
concat: boundedElastic-1


Everything before the publishOn operator was executed by the main thread, and everything after it by the boundedElastic-1 thread. That is because publishOn acts like any other operator: it takes signals from upstream and replays them downstream, executing the downstream callbacks on a worker from the associated Scheduler.
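
Because publishOn only affects what comes after it, it can be applied more than once in the same chain. The sketch below is an illustration under a few assumptions: the class name PublishOnTwice and the helper tag are invented, and Schedulers.parallel() is chosen only to have a second Scheduler to switch to. Each map step appends the name of the thread it ran on.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class PublishOnTwice {

  // Appends the name of the current thread to the value.
  private static String tag(String value) {
    return value + " | " + Thread.currentThread().getName();
  }

  static List<String> trace() {
    return Flux.just("Paris", "London")
            .map(PublishOnTwice::tag) // runs on the subscribing thread
            .publishOn(Schedulers.parallel())
            .map(PublishOnTwice::tag) // runs on a parallel worker
            .publishOn(Schedulers.boundedElastic())
            .map(PublishOnTwice::tag) // runs on a boundedElastic worker
            .collectList()
            .block(); // wait for the pipeline to finish before returning
  }

  public static void main(String[] args) {
    trace().forEach(System.out::println);
  }
}
```

When run from main, each line should look something like Paris | main | parallel-1 | boundedElastic-1, with the worker numbers possibly differing between runs.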

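That is the main difference between the subscribeOn and publishOn operators: subscribeOn applies the provided Scheduler starting from the source, no matter where we place it in the chain, and that Scheduler stays in effect until a publishOn switches the context again. The publishOn operator only affects the operators placed after it.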
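
A short sketch of the two operators used together may help here. The class name SubscribeOnWithPublishOn and the helper tag are invented for this illustration, and Schedulers.parallel() is used only to tell the two thread pools apart. The expectation is that the first map runs on the Scheduler given to subscribeOn(), even though subscribeOn() is placed last, while the second map runs on the Scheduler given to publishOn().

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class SubscribeOnWithPublishOn {

  // Appends the name of the current thread to the value.
  private static String tag(String value) {
    return value + " | " + Thread.currentThread().getName();
  }

  static List<String> trace() {
    return Flux.just("New York", "London")
            .map(SubscribeOnWithPublishOn::tag) // thread chosen by subscribeOn
            .publishOn(Schedulers.boundedElastic())
            .map(SubscribeOnWithPublishOn::tag) // thread chosen by publishOn
            .subscribeOn(Schedulers.parallel())
            .collectList()
            .block(); // wait for the pipeline to finish before returning
  }

  public static void main(String[] args) {
    trace().forEach(System.out::println);
  }
}
```

Each line should show a parallel thread name first and a boundedElastic thread name second, for example New York | parallel-1 | boundedElastic-1 (worker numbers may vary).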

That was all about how to switch threads in Project Reactor. Proceed to the next lesson.

Happy coding!
