Combine Flux and Mono Publishers

We often need to combine two Publishers into one.

For example, suppose we have an application that accepts a login request and calls two external services to validate the user credentials. One service returns a Flux, and the other returns a Mono. To return a correct response to the user, we need to combine the Flux and the Mono and extract the required user info.

How to combine Flux and Mono?

We can combine Flux and Mono with the following methods:

  • concat() and concatWith()
  • merge() and mergeWith()
  • mergeSequential()
  • zip() and zipWith()


Combine Publishers using the concat() and concatWith() methods

The concat() and concatWith() methods combine two Reactive Streams Publishers into one. Concatenation happens sequentially: the first Publisher is subscribed to first, the operator waits for its completion signal, and only then does it subscribe to the second Publisher.

The concat() method is a static method declared in the Flux class. The concatWith() method is an instance method, and it is available in both the Flux and Mono classes.
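The sequential subscription described above can be made visible with doOnSubscribe(). The following is a minimal sketch rather than part of the original examples; the class name ConcatSubscriptionOrder and the run() helper are made up for illustration. It records every subscription and every emitted element in a list:

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical helper class, used only for this illustration.
class ConcatSubscriptionOrder {

  // Records subscriptions and emitted elements in the order they happen.
  static List<String> run() {
    List<String> events = new ArrayList<>();

    Flux<String> first = Flux.just("a", "b")
        .doOnSubscribe(s -> events.add("subscribed to first"));
    Flux<String> second = Flux.just("c", "d")
        .doOnSubscribe(s -> events.add("subscribed to second"));

    // Both sources are synchronous, so everything runs on the calling thread.
    Flux.concat(first, second).subscribe(element -> events.add(element));
    return events;
  }

  public static void main(String[] args) {
    run().forEach(System.out::println);
  }
}
```

The recorded events are: subscribed to first, a, b, subscribed to second, c, d. The second Flux is subscribed to only after the first one has completed.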

Example 1

Combine two Publishers using the concat() method:

import reactor.core.publisher.Flux;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux<String> firstFlux = Flux.fromArray(new String[]{"a", "b", "c"});
    Flux<String> secondFlux = Flux.fromArray(new String[]{"d", "e", "f"});

    // combine two Flux Publishers; secondFlux is subscribed to only after firstFlux completes
    Flux<String> combinedFlux = Flux.concat(firstFlux, secondFlux);

    // subscribe and print the elements of the combined Flux
    combinedFlux.subscribe(element -> System.out.print(element + " "));

  }
}
Output: a b c d e f

Example 2

Combine two Publishers using the concatWith() method:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux<String> flux = Flux.fromArray(new String[]{"a", "b", "c"});
    Mono<String> mono = Mono.just("f");

    // combine Flux and Mono; the Mono's element is appended after the Flux completes
    Flux<String> combinedFlux = flux.concatWith(mono);

    // subscribe and print the elements of the combined Flux
    combinedFlux.subscribe(element -> System.out.print(element + " "));

  }
}
Output: a b c f
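Since concatWith() is also available on Mono, the combination can start from the Mono as well. The sketch below assumes the same Reactor dependency as the examples above; the class name MonoConcatWithExample and the run() helper are hypothetical. Note that the result is a Flux, because the combined stream can hold more than one element:

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical helper class, used only for this illustration.
class MonoConcatWithExample {

  // Starts from a Mono and appends a Flux; the result type is Flux.
  static List<String> run() {
    Mono<String> mono = Mono.just("a");
    Flux<String> flux = Flux.just("b", "c");

    Flux<String> combined = mono.concatWith(flux);

    // collectList() gathers all elements; block() waits for completion.
    return combined.collectList().block();
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [a, b, c]
  }
}
```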

Combine Publishers using the merge() and mergeWith() methods

With the merge() and mergeWith() methods, both Publishers are subscribed to at the same time (eagerly). This means that the combined Flux can contain interleaved data from both Publishers.

The merge() method is a static method of the Flux class, and mergeWith() is an instance method available in both the Mono and Flux classes.

For this example, we will use the StepVerifier class to assert the order of the elements in the combined Flux. The assertions show that the elements of the two Publishers are interleaved.

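Add this dependency to your project (with the test scope, StepVerifier is only visible to code under the test sources, so either place the examples there or remove the scope element):

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.1.4.RELEASE</version>
    <scope>test</scope>
</dependency>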


Example

import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    // the elements arrive interleaved: a, c, b, d
    StepVerifier.create(returnMerged())
            .expectNext("a")
            .expectNext("c")
            .expectNext("b")
            .expectNext("d")
            .expectComplete()
            .verify();
  }

  private static Flux<String> returnMerged() {

    // different delays make the two sources emit at different times
    Flux<String> firstFlux = Flux.fromArray(new String[]{"a", "b"})
            .delayElements(Duration.ofMillis(100));
    Flux<String> secondFlux = Flux.fromArray(new String[]{"c", "d"})
            .delayElements(Duration.ofMillis(125));

    return Flux.merge(firstFlux, secondFlux);
  }

}


Here, the elements of the two Publishers are interleaved (a, c, b, d) because we introduced a different delay for each Publisher. The delayElements(Duration delay) method delays each element of a Flux (each Subscriber.onNext(T) signal) by the given Duration, so a arrives after about 100 ms, c after 125 ms, b after 200 ms, and d after 250 ms.

The same outcome with the mergeWith() method:

import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    // same interleaved order as with Flux.merge(): a, c, b, d
    StepVerifier.create(returnMerged())
            .expectNext("a")
            .expectNext("c")
            .expectNext("b")
            .expectNext("d")
            .expectComplete()
            .verify();
  }

  private static Flux<String> returnMerged() {

    Flux<String> firstFlux = Flux.fromArray(new String[]{"a", "b"})
            .delayElements(Duration.ofMillis(100));
    Flux<String> secondFlux = Flux.fromArray(new String[]{"c", "d"})
            .delayElements(Duration.ofMillis(125));

    return firstFlux.mergeWith(secondFlux);
  }

}
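
To see the eager subscription itself, and not only the interleaved result, the subscriptions can be recorded with doOnSubscribe(). This is a rough sketch under a few assumptions: the class name MergeSubscriptionOrder and the run() helper are invented for illustration, and the delays are larger than in the examples above (200 ms and 300 ms) only to make the timing less sensitive to scheduling jitter.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;

// Hypothetical helper class, used only for this illustration.
class MergeSubscriptionOrder {

  static List<String> run() {
    // Thread-safe list: delayed elements arrive on Reactor's scheduler threads.
    List<String> events = new CopyOnWriteArrayList<>();

    Flux<String> first = Flux.just("a", "b")
        .delayElements(Duration.ofMillis(200))
        .doOnSubscribe(s -> events.add("subscribed to first"));
    Flux<String> second = Flux.just("c", "d")
        .delayElements(Duration.ofMillis(300))
        .doOnSubscribe(s -> events.add("subscribed to second"));

    // blockLast() subscribes and waits until the merged Flux completes.
    Flux.merge(first, second)
        .doOnNext(element -> events.add(element))
        .blockLast();
    return events;
  }

  public static void main(String[] args) {
    run().forEach(System.out::println);
  }
}
```

Both subscriptions are recorded before any element arrives. With these delays the elements should then follow as a, c, b, d, although the exact interleaving depends on timing.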


Combine Publishers using the mergeSequential() method

The mergeSequential() method is a static method of the Flux class. It subscribes to the Publishers eagerly, just like the merge() and mergeWith() methods, but it emits the elements in the order of the sources: all elements of the first Publisher, then all elements of the second.

Example

import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    // the elements keep the order of the sources: a, b, c, d
    StepVerifier.create(returnMerged())
            .expectNext("a")
            .expectNext("b")
            .expectNext("c")
            .expectNext("d")
            .expectComplete()
            .verify();
  }

  private static Flux<String> returnMerged() {

    Flux<String> firstFlux = Flux.fromArray(new String[]{"a", "b"})
            .delayElements(Duration.ofMillis(100));
    Flux<String> secondFlux = Flux.fromArray(new String[]{"c", "d"})
            .delayElements(Duration.ofMillis(125));

    return Flux.mergeSequential(firstFlux, secondFlux);
  }
}
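
The difference between merge() and mergeSequential() is easiest to see when the second Publisher is much faster than the first. The following sketch is an illustration only: the class MergeVsMergeSequential and its helper methods are hypothetical, and the delays (200 ms and 20 ms) are arbitrary values chosen to keep the two sources clearly apart.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical helper class, used only for this illustration.
class MergeVsMergeSequential {

  // Slow source: emits "a" and "b", 200 ms apart.
  private static Flux<String> slow() {
    return Flux.just("a", "b").delayElements(Duration.ofMillis(200));
  }

  // Fast source: emits "c" and "d", 20 ms apart.
  private static Flux<String> fast() {
    return Flux.just("c", "d").delayElements(Duration.ofMillis(20));
  }

  // merge() passes elements on as they arrive.
  static List<String> merged() {
    return Flux.merge(slow(), fast()).collectList().block();
  }

  // mergeSequential() holds back the fast source until the slow one completes.
  static List<String> mergedSequentially() {
    return Flux.mergeSequential(slow(), fast()).collectList().block();
  }

  public static void main(String[] args) {
    System.out.println("merge:           " + merged());
    System.out.println("mergeSequential: " + mergedSequentially());
  }
}
```

With these delays, merge() should yield c, d, a, b, because the fast source finishes before the slow one emits anything, while mergeSequential() yields a, b, c, d regardless of timing.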


Note: If you do not care about the order of elements, you can use merge() or mergeWith(). If the order matters, use concat() and concatWith(), or mergeSequential() when you also want the Publishers to be subscribed to eagerly.

Combine Flux and Mono using the zip() and zipWith() methods

The zip() method is a static method of the Flux class. It combines the elements of several Publishers position by position; the overloads with individually typed sources accept up to 8 Publishers.
There are multiple overloaded versions of the zip() method. In this example, we will use the following one:

  public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1,
                                        Publisher<? extends T2> source2,
                                        BiFunction<? super T1, ? super T2, ? extends O> combinator)


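This method accepts three arguments. The first two are the Publishers that we want to combine, and the third one is a BiFunction (the combinator), usually written as a lambda, that turns one element from each Publisher into an output value.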

Example

import reactor.core.publisher.Flux;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux<String> flux1 = Flux.just("A", "B", "C");
    Flux<String> flux2 = Flux.just("D", "E", "F");

    // pair the elements by position and join each pair into one String
    Flux.zip(flux1, flux2, (first, second) -> first + second).subscribe(System.out::println);

  }
}
Output (each value on its own line): AD BE CF
 
Each output value combines the elements at the same position in both sources (A with D, B with E, C with F). This is because zip() waits for all the Publishers to emit one element and then combines these elements into an output value, as specified in the combinator.
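
When no combinator is passed, another overload of zip() emits the pairs as Tuple2 objects, and the elements of each pair can be read with getT1() and getT2(). A small sketch, assuming the same Reactor dependency as the examples above (the class name ZipTupleExample and the run() helper are made up for illustration):

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;

// Hypothetical helper class, used only for this illustration.
class ZipTupleExample {

  static List<String> run() {
    Flux<String> letters = Flux.just("A", "B", "C");
    Flux<Integer> numbers = Flux.just(1, 2, 3);

    // Without a combinator, zip() emits one Tuple2 per pair of elements.
    Flux<Tuple2<String, Integer>> zipped = Flux.zip(letters, numbers);

    return zipped
        .map(pair -> pair.getT1() + pair.getT2())
        .collectList()
        .block();
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [A1, B2, C3]
  }
}
```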

The zipWith() method works the same way, but it is an instance method and is limited to two Publishers: the one it is called on and the one passed as an argument.

import reactor.core.publisher.Flux;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux<Integer> flux1 = Flux.just(1, 2, 3);
    Flux<Integer> flux2 = Flux.just(4, 5, 6);

    // multiply the elements at the same position: 1 * 4, 2 * 5, 3 * 6
    flux1.zipWith(flux2, (firstNum, secondNum) -> firstNum * secondNum).subscribe(System.out::println);

  }
}
Output (each value on its own line): 4 10 18
 
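If one Flux has more elements than the other one, the extra elements are ignored, because the combined Flux completes as soon as the shorter source completes:

import reactor.core.publisher.Flux;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux<Integer> flux1 = Flux.just(1, 2, 3, 4, 5);
    Flux<Integer> flux2 = Flux.just(4, 5, 6);

    // 4 and 5 from flux1 have no partner in flux2 and are dropped
    flux1.zipWith(flux2, (firstNum, secondNum) -> firstNum * secondNum).subscribe(System.out::println);

  }
}
Output (each value on its own line): 4 10 18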
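
The same rule matters when zipping a Flux with a Mono, as in the login scenario from the introduction: a Mono emits at most one element, so the zipped result contains at most one element too. The sketch below illustrates this and shows one possible alternative, Mono.flatMapMany(), for pairing every Flux element with the Mono's value. The class ZipFluxWithMono, its methods, and the sample user and role values are all invented for illustration.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical helper class with made-up sample data.
class ZipFluxWithMono {

  // zipWith() stops after the first pair, because the Mono completes.
  static List<String> zipped() {
    Flux<String> roles = Flux.just("admin", "editor", "viewer");
    Mono<String> userName = Mono.just("alice");

    return roles.zipWith(userName, (role, name) -> name + ":" + role)
        .collectList()
        .block();
  }

  // flatMapMany() reuses the Mono's value for every element of the Flux.
  static List<String> pairedWithEveryElement() {
    Flux<String> roles = Flux.just("admin", "editor", "viewer");
    Mono<String> userName = Mono.just("alice");

    return userName.flatMapMany(name -> roles.map(role -> name + ":" + role))
        .collectList()
        .block();
  }

  public static void main(String[] args) {
    System.out.println(zipped());                 // prints [alice:admin]
    System.out.println(pairedWithEveryElement()); // prints [alice:admin, alice:editor, alice:viewer]
  }
}
```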
 
Both zip() and zipWith() subscribe to their source Publishers eagerly.
 
That’s it!
