doOn Callbacks in Project Reactor

Project Reactor has doOn Callback operators that we can use to perform custom actions without modifying the elements in the sequence. The doOn Callbacks allow us to peek into the events emitted by the Publisher (Mono or Flux).

We call them the side effect operators since they don’t change the original sequence.

Exploring the doOn Callbacks in Project Reactor

There are many useful doOn Callback operators available in Project Reactor.
In this post, we will explore the most used ones, such as:

  • doOnSubscribe() – gets invoked for every new subscription from the Subscriber
  • doOnNext() – gets invoked for every element that gets emitted from the Publisher
  • doOnComplete() – gets invoked when the Completion signal gets sent from the Publisher
  • doOnError() – gets invoked when the Error signal is sent from the Publisher
  • doFinally() – gets executed at the end in both scenarios successful and failed.

doOnSubscribe() method

The below example shows how we can peek into the onSubscribe event, which happens when the Subscriber subscribes to the Publisher. In this way, we can get the Subscription object that is being sent from the Publisher.

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux.just("data1", "data2", "data3")
            .doOnSubscribe(subscription -> System.out.println("Subscription: " + subscription))
            .subscribe();
  }
}
Output: Subscription: reactor.core.publisher.FluxArray$ArraySubscription@72bca894

doOnNext() method

With this method, we can peek into every data that is being sent from the Publisher.

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux.just("data1", "data2", "data3")
            .doOnNext(data -> System.out.println("Data: " + data))
            .subscribe();
  }
}
Output: Data: data1 Data: data2 Data: data3

doOnComplete() method

We can set some code that will be executed as soon as the Completion signal is sent from the Publisher, using the doOnComplete() method.

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux.just("data1", "data2", "data3")
            .doOnNext(data -> System.out.println("Data: " + data))
            .doOnComplete(() -> System.out.println("Publisher sent Completion signal!"))
            .subscribe();
  }
}
Output: Data: data1 Data: data2 Data: data3 Publisher sent Completion signal!

doOnError() method

The doOnError() method will be executed if some exception occurs and Publisher sends the onError signal to the Subscriber.

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Mono mono = Mono.fromSupplier(() -> {
      throw new RuntimeException(" an error occurred!"); // invoking the onError signal from the Publisher (Mono)
    });

    mono.doOnError(err -> System.out.println("Error: " + err)).subscribe();

  }
}
Output: Error: java.lang.RuntimeException: an error occurred!

doFinally() method

If we want to get notified in both successful and failed scenarios, we will use the doOnFinally() method.

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    // successful scenario
    Mono.just("data")
            .doFinally(signal -> System.out.println(signal + " signal received."))
            .subscribe();

    // failed scenario
    Mono mono = Mono.fromSupplier(() -> {
      throw new RuntimeException(" an error occurred!"); // invoking the onError signal from the Publisher (Mono)
    });

    mono.doFinally(signal -> System.out.println(signal + " signal received.")).subscribe();

  }
}
Output: onComplete signal received. onError signal received.
 
The doOn Callbacks are very useful for debugging in the local environment.
 
That’s it!

Leave a Reply

Your email address will not be published. Required fields are marked *