Java Reactive Programming: Flux vs Mono

Publisher:

As mentioned in the previous article, Reactive Streams defines the specification for reactive libraries. According to it, a Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s). Reactor-core has a set of implementations of this Publisher interface. The two important implementations from which we create sequences are Flux and Mono.

Java 8 introduced Stream and Optional. Flux and Mono can be compared with Stream and Optional.

  • Stream is a pipeline of computational operations through which 0 . . . N elements are conveyed from a data source to produce the desired result.
  • Optional is similar to Stream, but it holds 0 or 1 element.
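
To make the 0 . . . N versus 0 or 1 distinction concrete, here is a minimal sketch using only the JDK. The class and method names (StreamVsOptional, evenNumbers, firstEven) are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

class StreamVsOptional {

    // 0..N elements: a stream may yield none, one, or many results
    static List<Integer> evenNumbers(List<Integer> source) {
        return source.stream()
                .filter(i -> i % 2 == 0)
                .collect(Collectors.toList());
    }

    // 0 or 1 element: an Optional holds at most one result
    static Optional<Integer> firstEven(List<Integer> source) {
        return source.stream()
                .filter(i -> i % 2 == 0)
                .findFirst();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
        System.out.println(evenNumbers(numbers));            // [2, 4]
        System.out.println(firstEven(numbers));              // Optional[2]
        System.out.println(firstEven(Arrays.asList(1, 3)));  // Optional.empty
    }
}
```

Flux and Mono follow the same split: Flux corresponds to the 0 . . . N case and Mono to the 0 or 1 case.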

Stream Pipeline:

Let's first see how a stream works.

//simple list
List<Integer> intList = Arrays.asList(1,2,3,4,5);
//creating stream from list
Stream<Integer> intStream = intList.stream()
    .filter(i -> i > 2)
    .map(i -> i * i);
List<Integer> list1 = intStream
    .collect(Collectors.toList());
//print
System.out.println(list1);

//output
[9, 16, 25]

Now let's add this statement to collect the calculated squares into list2 and see what happens.

List<Integer> list2 = intStream
    .collect(Collectors.toList());

It throws the following exception:

java.lang.IllegalStateException: stream has already been operated upon or closed

Note:

  • A stream pipeline can have any number of intermediate operations, but only one terminal operation. That's why the second collect failed with an exception: the elements had already been collected into a list, and a stream cannot be reused once a terminal operation has consumed it.
  • A stream pipeline is synchronous: the terminal operation returns only after the result has been computed.
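
If the same pipeline has to run more than once, one common workaround is to wrap it in a Supplier so that every call builds a fresh stream. The sketch below uses only the JDK; the class and method names (StreamReuse, squaresAbove, secondCollectFails) are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamReuse {

    // Each call to get() builds a brand-new pipeline over the same list
    static Supplier<Stream<Integer>> squaresAbove(List<Integer> source, int threshold) {
        return () -> source.stream()
                .filter(i -> i > threshold)
                .map(i -> i * i);
    }

    // Returns true if a second terminal operation on the same stream fails
    static boolean secondCollectFails(Stream<Integer> stream) {
        stream.collect(Collectors.toList());
        try {
            stream.collect(Collectors.toList());
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
        Supplier<Stream<Integer>> squares = squaresAbove(intList, 2);

        List<Integer> list1 = squares.get().collect(Collectors.toList());
        List<Integer> list2 = squares.get().collect(Collectors.toList());
        System.out.println(list1); // [9, 16, 25]
        System.out.println(list2); // [9, 16, 25]

        System.out.println(secondCollectFails(squares.get())); // true
    }
}
```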

Flux:

Flux is an implementation of Publisher. It emits 0 . . . N elements, optionally followed by a completion or an error signal. A Stream pipeline is synchronous, whereas a Flux pipeline can also run asynchronously.

empty → to emit 0 elements, i.e. to return an empty Flux<T>

Flux.empty()
    .subscribe(i -> System.out.println("Received : " + i));

//No output

just → the easiest way to emit an element is the just method.

  • The subscribe method accepts a Consumer<T> in which we define what to do with each emitted element.
Flux.just(1)
    .subscribe(i -> System.out.println("Received : " + i));

//Output
Received : 1

Unlike a stream, a Flux can have any number of observers (subscribers) connected to the pipeline. We can write it like this when we need to connect more than one observer to a source.

  • One observer might collect all elements into a list while another logs the element details.
Flux<Integer> flux = Flux.just(1);
//Observer 1
flux.subscribe(i -> System.out.println("Observer-1 : " + i));
//Observer 2
flux.subscribe(i -> System.out.println("Observer-2 : " + i));

//Output
Observer-1 : 1
Observer-2 : 1
  • just with multiple elements
Flux.just('a', 'b', 'c')
    .subscribe(i -> System.out.println("Received : " + i));

//Output
Received : a
Received : b
Received : c

We can have multiple observers, and each observer processes the emitted elements independently, at its own pace.

  • The output below shows both observers receiving elements asynchronously: delayElements delivers each element on a Reactor scheduler thread, so the main thread prints "Ends" before any element arrives.
System.out.println("Starts");

//flux emits one element per second
Flux<Character> flux = Flux.just('a', 'b', 'c', 'd')
    .delayElements(Duration.ofSeconds(1));
//Observer 1 - takes 500ms to process
flux
    .map(Character::toUpperCase)
    .subscribe(i -> {
        sleep(500);
        System.out.println("Observer-1 : " + i);
    });
//Observer 2 - processes immediately
flux.subscribe(i -> System.out.println("Observer-2 : " + i));

System.out.println("Ends");

//Just to block the execution - otherwise the program will end with only the start and end messages
sleep(10000); //assumed duration, long enough for both observers to finish

//Output
Starts
Ends
Observer-2 : a
Observer-1 : A
Observer-2 : b
Observer-1 : B
Observer-2 : c
Observer-2 : d
Observer-1 : C
Observer-1 : D
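
The snippet above calls a sleep helper that it does not define, and it relies on the main thread staying alive while the elements arrive. Here is a minimal JDK-only sketch of both ideas. It assumes the helper simply wraps Thread.sleep, and it uses a plain daemon thread as a stand-in for the thread that delivers the elements. The names KeepMainAlive and run are made up for illustration, and a CountDownLatch replaces a fixed-length sleep so that main waits exactly as long as needed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

class KeepMainAlive {

    // Assumed shape of the sleep helper: Thread.sleep without the checked exception
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
    }

    // Records events from the calling thread and from a background daemon thread
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        events.add("Starts");
        Thread worker = new Thread(() -> {
            sleep(300);               // simulate a delayed emission
            events.add("Worker : a");
            done.countDown();         // signal that the work has finished
        });
        worker.setDaemon(true);       // a daemon thread does not keep the JVM alive
        worker.start();
        events.add("Ends");

        try {
            done.await();             // block until the worker is done
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [Starts, Ends, Worker : a]
    }
}
```

Without the await call, main could return right after "Ends" and the JVM would exit before the daemon thread records anything, which is the same effect the comment in the snippet above warns about.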

To better understand the behavior, I added the log operator to the above code, as shown after these notes.

  • We have 2 observers subscribed to the source. This is why onSubscribe is logged twice.
  • request(32): the downstream delayElements operator requests up to 32 elements from the source at a time; 32 is its default prefetch (buffer) size.
  • Elements are emitted one by one.
  • Once all the elements have been emitted, the complete signal is sent to inform the observers not to expect any more elements.
Flux<Character> flux = Flux.just('a', 'b', 'c', 'd')
    .log()
    .delayElements(Duration.ofSeconds(1));

Output:

[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(32)
[ INFO] (main) | onNext(a)
[ INFO] (main) | onNext(b)
[ INFO] (main) | onNext(c)
[ INFO] (main) | onNext(d)
[ INFO] (main) | onComplete()
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(32)
[ INFO] (main) | onNext(a)
[ INFO] (main) | onNext(b)
[ INFO] (main) | onNext(c)
[ INFO] (main) | onNext(d)
[ INFO] (main) | onComplete()
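
The signal order in this log (onSubscribe, request, onNext . . ., onComplete) follows the Reactive Streams protocol. As a rough illustration, the sketch below reproduces that order with the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams interfaces. This is not how Reactor implements Flux.just: ListPublisher is a hypothetical, deliberately simplified publisher that skips parts of the specification (for example, it does not reject non-positive demand).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Flow;

class SignalSketch {

    // Hypothetical, simplified publisher: emits list items synchronously on request
    static final class ListPublisher<T> implements Flow.Publisher<T> {
        private final List<T> items;

        ListPublisher(List<T> items) {
            this.items = items;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            // every subscriber gets its own subscription, hence its own position
            subscriber.onSubscribe(new Flow.Subscription() {
                private int index = 0;
                private boolean done = false;

                @Override
                public void request(long n) {
                    // emit at most n items (the demand), never more
                    for (long i = 0; i < n && !done && index < items.size(); i++) {
                        subscriber.onNext(items.get(index++));
                    }
                    // once the source is exhausted, signal completion exactly once
                    if (!done && index == items.size()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Subscribes once and records every signal in the order it happens
    static List<String> run() {
        List<String> signals = new ArrayList<>();
        Flow.Publisher<Character> source =
                new ListPublisher<>(Arrays.asList('a', 'b', 'c', 'd'));
        source.subscribe(new Flow.Subscriber<Character>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                signals.add("onSubscribe");
                signals.add("request(32)");
                subscription.request(32); // demand: up to 32 elements
            }

            @Override
            public void onNext(Character item) {
                signals.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable error) {
                signals.add("onError(" + error + ")");
            }

            @Override
            public void onComplete() {
                signals.add("onComplete()");
            }
        });
        return signals;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because each call to subscribe creates a new subscription, a second subscriber would see the same sequence of signals again, which matches the two identical halves of the log.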
  • The subscribe method can accept additional parameters to handle the error and completion signals. So far we have only consumed the elements received via the pipeline, but the pipeline could also fail with an unhandled exception. We can pass the handlers as shown here.
subscribe(
    i -> System.out.println("Received :: " + i),
    err -> System.out.println("Error :: " + err),
    () -> System.out.println("Successfully completed"))
  • Let's take this example, which simply divides 10 by each element. We get the output below, as expected.
Flux.just(1,2,3)
    .map(i -> 10 / i)
    .subscribe(
        i -> System.out.println("Received :: " + i),
        err -> System.out.println("Error :: " + err),
        () -> System.out.println("Successfully completed"));

//Output
Received :: 10
Received :: 5
Received :: 3
Successfully completed
  • Now let's slightly modify the map operation as shown here. For the element 2 it divides by zero, which throws an ArithmeticException (a RuntimeException). The error handler receives it, without any ugly try/catch block, and no further elements are emitted.
Flux.just(1,2,3)
    .map(i -> i / (i-2))
    .subscribe(
        i -> System.out.println("Received :: " + i),
        err -> System.out.println("Error :: " + err),
        () -> System.out.println("Successfully completed"));

//Output
Received :: -1
Error :: java.lang.ArithmeticException: / by zero
  • fromArray → when you have an array. just also works here, since it accepts varargs.
String[] arr = {"Hi", "Hello", "How are you"};

Flux.fromArray(arr)
    .filter(s -> s.length() > 2)
    .subscribe(i -> System.out.println("Received : " + i));

//Output
Received : Hello
Received : How are you
  • fromIterable → when you have a collection of elements and would like to pass them through a Flux pipeline.
List<String> list = Arrays.asList("vins", "guru");
Flux<String> stringFlux = Flux.fromIterable(list)
    .map(String::toUpperCase);
  • fromStream → when you have a stream of elements.
List<String> list = Arrays.asList("vins", "guru");
Flux<String> stringFlux = Flux.fromStream(list.stream())
    .map(String::toUpperCase);
  • Be careful with streams! A Flux can have more than one observer, but a stream can be consumed only once. With the stringFlux created from a stream above, the code below fails for the second observer with an error saying that the stream has already been operated upon or closed.
//observer-1
stringFlux
    .map(String::length)
    .subscribe(i -> System.out.println("Observer-1 :: " + i));
//observer-2
stringFlux
    .subscribe(i -> System.out.println("Observer-2 :: " + i));
  • The above problem can be fixed by passing a Supplier<Stream>, so that each observer gets a fresh stream.
Flux.fromStream(() -> list.stream())
    .map(String::toUpperCase);
  • range → to emit a range of consecutive integers
//start = 3, count = 5: emits 3, 4, 5, 6, 7
Flux.range(3, 5)

In all the above options, the elements are already available before we start emitting. What if we need to keep finding and emitting elements programmatically? Flux has 2 additional methods for that. They need a separate article, as we need to understand what they are for and when to use which.

  • Flux.create
  • Flux.generate

Mono:

Mono is another implementation of Publisher. It emits at most one item and then (optionally) terminates with an onComplete signal or an onError signal. Like Flux, Mono can also run asynchronously.

  • just → to emit a single item
Mono.just(1)
    .subscribe(System.out::println);
  • Both Flux and Mono implement the Publisher<T> interface.
Publisher<Integer> publisher1 = Mono.just(1);
Publisher<Integer> publisher2 = Flux.just(1,2,3);
  • fromCallable / fromSupplier → to compute the item lazily, on subscription, from a Callable or a Supplier
Mono.fromCallable(() -> 1);
Mono.fromSupplier(() -> "a");
  • fromRunnable → we know that a Runnable does not accept any parameter and does not return anything either. So what do you think the code below will do?
Mono.fromRunnable(() -> System.out.println("Hello"))
    .subscribe(i -> System.out.println("Received :: " + i));
  • The above code just prints “Hello” and nothing else, because there is no item to emit. But if we add the error and completion handlers, we get the output below. This is helpful when we need to be notified once a runnable has completed.
Mono.fromRunnable(() -> System.out.println("Hello"))
    .subscribe(
        i -> System.out.println("Received :: " + i),
        err -> System.out.println("Error :: " + err),
        () -> System.out.println("Successfully completed"));

//Output
Hello
Successfully completed

Summary:

Hopefully this article gave you a high-level idea of how to create sequences using Flux and Mono. The methods discussed here are the very basic ones, and there are many other options. Other articles in this series discuss reactive programming, Reactor and Spring WebFlux in more detail.
