Subscribe to a Mono in Java Reactor

To consume from the Reactive Stream, in this case, a Mono, we need to subscribe to it. In this post, you will learn how to subscribe to a Mono in Java Reactor.

How to Subscribe to a Mono?

When we subscribe to a Publisher(Mono), it starts emitting signals like:

  • onNext 
    When the Publisher receives a request from the Consumer, it starts emitting data in the form of a stream of events by invoking the onNext() method of the Subscriber interface.
  • onError
    In case of an error, the exception will be sent in the form of an event to the Subscriber.
    Using the onError() method.
  • onComplete
    When the Publisher finishes with sending data, it will notify the Consumer via the onComplete() method.

Take a look at Introduction to Reactive Streams for more on Stream events.

Nothing will happen with the Publisher if we don’t start consuming it.

When we call one of the overloaded subscribe() methods from the Subscriber interface, we are requesting the Publisher to start emitting data.

Let’s see an example:

import reactor.core.publisher.Mono;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    // create a Mono
    Mono<String> mono = Mono.just("Hello");

    // subscribe to a Mono
    mono.subscribe();

  }
}

Here, we started consuming the data, but we are not doing anything with it. 

Let’s print what we get from Mono:

import reactor.core.publisher.Mono;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    // create a Mono
    Mono<String> mono = Mono.just("Hello");

    // subscribe to a Mono
    mono.subscribe(data -> System.out.println(data));

  }
}
Output: Hello
 
This was one simple example of how to subscribe to a Mono using the subscribe(Consumer<? super T> consumer) method that subscribes a Consumer to the Mono.

Let’s try something else. Let’s subscribe and define what should be triggered for each of the 3 events.
import reactor.core.publisher.Mono;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    // create a Mono
    Mono<String> mono = Mono.just("Hello");

    // subscribe to a Mono
    mono.subscribe(
            data -> System.out.println(data), // onNext
            err -> System.out.println(err),  // onError
            () -> System.out.println("Completed!") // onComplete
    );

  }
}
Output: Hello Completed!
 
We used here the subscribe(Consumer<? super T> consumer, Consumer <? super Throwable> errorConsumer, Runnable completeConsumer) method that subscribes a Consumer to this Mono that will respectively consume all the elements in the sequence, handle errors and react to completion.

Now, let’s invoke the onError signal:

import reactor.core.publisher.Mono;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    // create a Mono
    Mono<String> mono = Mono.fromSupplier(() -> {
      throw new RuntimeException("Exception occurred!");
    });

    // subscribe to a Mono
    mono.subscribe(
            data -> System.out.println(data), // onNext
            err -> System.out.println("ERROR: " + err),  // onError
            () -> System.out.println("Completed!") // onComplete
    );

  }
}
Output: ERROR: java.lang.RuntimeException: Exception occurred!
 
That was all about how to subscribe to a Mono in Java Reactor. Proceed to the next lesson.

Happy coding!

Leave a Reply

Your email address will not be published. Required fields are marked *