Getting started with Reactive Programming and Reactor Core

Before getting into this tutorial, I recommend that you read our “Understanding reactive programming in Java” post, as it lays the foundation for what we will do here and will make things a lot easier to understand.

In this post, we will go through a sequence of steps that will help you get to know Reactor Core and the basics of reactive programming if you are just getting started.

What is Reactor Core?

Reactor Core (AKA Project Reactor) is a reactive programming framework which follows the Reactive Streams specification. Reactor Core is fully non-blocking and integrates directly with the Java 8 functional APIs.
What we like about Reactor Core is that it acts as the basis of the Reactive Spring framework. Therefore, knowledge gained of Reactor Core is directly applicable to Reactive Spring, and it will also flatten the learning curve when you move on to Reactive Spring.

It is important to point out that Reactor is a different library from RxJava, and that Reactive Streams is a specification rather than a library. Thanks to the Java community, we have many reactive frameworks which can be used to create reactive systems in Java. So, it is important not to confuse or mix up these frameworks, as they have both similarities and differences.

The best way to learn something is by example, so let us start by building a basic reactive program.

Adding the required dependencies

In order to use Reactor in your Java application, you will need to add the required dependencies. If you are using Maven, you can add the Reactor Core dependency as follows.

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.2.12.RELEASE</version>
        </dependency>


Please make sure to pick up the latest version of the dependency by checking the Maven Repository.

For demonstration purposes, we will also add a dependency to Logback. This will allow us to log statements from different threads and track how our program executes.

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.3</version>
        </dependency>



Configure Logback

As mentioned before, we would like to track our program execution using Logback. For that, we need to configure it. To keep our example as simple as possible, we will use a plain standard output appender.

Let us start by creating a logback.xml file in the resources folder.

<?xml version="1.0" encoding="UTF-8"?>
<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
   
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>


The most important part of this configuration is the %thread conversion word. It adds the thread name to each logging statement, allowing us to track which thread executes each part of the program.

Getting started with Reactor Core in Java

In our Understanding reactive programming in Java post, we mentioned that there are four main tools to use when writing a reactive program. These are a Publisher, a Subscriber, a Subscription and a Processor.

Since Reactor Core follows the Reactive Streams specification, its publishers implement the Publisher interface. It offers two types of publishers.

  • The first is a Mono, which is a publisher that emits zero or one item.
  • The second is a Flux, which is a publisher that emits zero to n items.
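To make the difference between the two publishers concrete, here is a minimal sketch. The class and method names are ours and purely illustrative, and block() is used only to pull the results out for printing; it is not something you would normally call in reactive code.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Illustrative class name, not part of Reactor.
final class PublisherKinds {

    // A Mono completes after emitting at most one item.
    static String oneItem() {
        return Mono.just("Word 1").block();
    }

    // An empty Mono completes without emitting anything; block() then returns null.
    static String noItem() {
        return Mono.<String>empty().block();
    }

    // A Flux can emit any number of items before it completes.
    static List<String> manyItems() {
        return Flux.just("Word 1", "Word 2", "Word 3").collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(oneItem());   // Word 1
        System.out.println(noItem());    // null
        System.out.println(manyItems()); // [Word 1, Word 2, Word 3]
    }
}
```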


In order to observe how a reactive program works differently from a traditional sequential execution one, we will start by writing a simple program that logs a list of strings, one by one.

The first step is to create a publisher which emits strings. For this, we will use the Flux.just(T... data) function.

Flux.just("Word 1", "Word 2", "Word 3");


This creates a Flux that publishes its strings on demand: each time a subscriber requests n items through its subscription, the Flux emits up to n of them, until it has no more items to publish and completes.

The next step is to define a subscriber and a subscription to request and consume the published items. The advantage of using Reactor Core is that it comes with a lot of built-in functionality. Therefore, we will not need to implement everything from scratch.

Again, for the sake of simplicity, we will subscribe to our publisher using the Flux.subscribe method with plain callbacks. Subscribing this way requests an unbounded number of items, so no backpressure is applied: the subscriber does not dictate the rate at which the items are published, and all items are published as fast as the publisher can emit them.
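For contrast, here is a rough sketch of a subscriber that does dictate the rate. It is not what the rest of this tutorial uses. It extends Reactor's BaseSubscriber, and the CollectingSubscriber name and its fields are our own, hypothetical choices. The subscriber asks for two items up front and receives the third only once it requests it.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

final class RequestDemo {

    // Hypothetical subscriber that only receives what it explicitly requests.
    static final class CollectingSubscriber extends BaseSubscriber<String> {
        final List<String> received = new ArrayList<>();
        boolean completed = false;

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(2); // ask for the first two items only
        }

        @Override
        protected void hookOnNext(String value) {
            received.add(value);
        }

        @Override
        protected void hookOnComplete() {
            completed = true;
        }
    }

    public static void main(String[] args) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        Flux.just("Word 1", "Word 2", "Word 3").subscribe(subscriber);
        System.out.println(subscriber.received); // only the two requested items so far

        subscriber.request(1); // ask for one more item
        System.out.println(subscriber.received + " completed=" + subscriber.completed);
    }
}
```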

To subscribe to a publisher, the following functions need to be implemented:

  • onNext: This function will be called when an item is published by the publisher.
  • onError: This function will be called when an error occurs on the publisher’s side.
  • onComplete: This function will be called when the publisher is done publishing all items.
  • onSubscribe: This function is called once the subscriber has subscribed to the publisher.


These functions can be handed to the subscribe method either by passing it a subscriber, or by passing consumers that handle each of the callback events as they occur.
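As an illustration of the first option, the sketch below passes a full Subscriber to subscribe and records the callbacks in the order they fire. The EventRecorder class is a hypothetical name of ours. It requests Long.MAX_VALUE items in onSubscribe, which amounts to asking for everything at once.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

// Hypothetical subscriber that records every callback it receives.
final class EventRecorder implements Subscriber<String> {
    final List<String> events = new ArrayList<>();

    @Override
    public void onSubscribe(Subscription subscription) {
        events.add("onSubscribe");
        subscription.request(Long.MAX_VALUE); // no backpressure: ask for everything
    }

    @Override
    public void onNext(String item) {
        events.add("onNext " + item);
    }

    @Override
    public void onError(Throwable error) {
        events.add("onError " + error.getMessage());
    }

    @Override
    public void onComplete() {
        events.add("onComplete");
    }

    public static void main(String[] args) {
        EventRecorder recorder = new EventRecorder();
        Flux.just("Word 1", "Word 2", "Word 3").subscribe(recorder);
        // [onSubscribe, onNext Word 1, onNext Word 2, onNext Word 3, onComplete]
        System.out.println(recorder.events);
    }
}
```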

Consumer is an interface that was introduced in Java 8. The interface has a single abstract function which accepts a single argument of a specific type. The function has no return value and its implementation depends on what the programmer defines. In other words, it just “consumes” the given data. The Consumer interface was introduced to facilitate writing more parts of Java programs in a functional way.

The most basic approach is to provide callbacks to the subscribe function. For this, we will need to pass three arguments. The first is a Consumer that implements the onNext function. The second is a Consumer that implements onError; we will leave this one empty for now. The third implements the onComplete function and, since it takes no arguments, it is a Runnable rather than a Consumer.
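If you have not used Consumer before, the following small sketch uses only the JDK. The ConsumerDemo class and its consumeAll method are names we made up for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Illustrative class, JDK only.
final class ConsumerDemo {

    // Feeds every item to a Consumer and returns what the consumer recorded.
    static List<String> consumeAll(List<String> items) {
        List<String> seen = new ArrayList<>();
        // A Consumer takes one argument and returns nothing.
        Consumer<String> remember = item -> seen.add("Got data " + item);
        items.forEach(remember);
        return seen;
    }

    public static void main(String[] args) {
        // [Got data Word 1, Got data Word 2]
        System.out.println(consumeAll(Arrays.asList("Word 1", "Word 2")));
    }
}
```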

You can find the complete example below.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;


public class ReactiveExample {

    private static final Logger log = LoggerFactory.getLogger(ReactiveExample.class);

    public static void main(String[] args) {
        Flux<String> myStrings = Flux.just("Word 1", "Word 2", "Word 3");
        log.info("Some log message");
        myStrings.subscribe(
                s -> log.info("Got data {}", s), //onNext function implementation
                err -> {}, //onError function implementation
                () -> log.info("Completed") //onComplete function implementation
        );
        log.info("Some other log message");
    }

}


One of the advantages of using Reactor Core is that it helps us set up a simple publisher/subscriber in a few lines of code. We did not need to implement the onSubscribe function, as a default one is already built in.

If we run this program, we will get the following results.

16:59:32.800 [main] INFO  ReactiveExample - Some log message
16:59:32.810 [main] INFO  ReactiveExample - Got data Word 1
16:59:32.812 [main] INFO  ReactiveExample - Got data Word 2
16:59:32.812 [main] INFO  ReactiveExample - Got data Word 3
16:59:32.812 [main] INFO  ReactiveExample - Completed
16:59:32.812 [main] INFO  ReactiveExample - Some other log message

Process finished with exit code 0


There are a few things to notice here:

  • The whole program ran on the main thread.
  • All data items were consumed before the complete function was called.
  • The Flux we created respected the order of the items when it was publishing the items in a reactive stream.
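The onError consumer stayed empty in ReactiveExample because Flux.just never fails. As a rough sketch of what that consumer is for, the snippet below appends a deliberate failure to the stream. The ErrorHandlingDemo class and the "boom" exception are invented for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Illustrative class showing the onError callback in action.
final class ErrorHandlingDemo {

    static List<String> run() {
        List<String> events = new ArrayList<>();
        // Two regular items followed by an artificial error.
        Flux<String> failing = Flux.just("Word 1", "Word 2")
                .concatWith(Flux.error(new IllegalStateException("boom")));

        failing.subscribe(
                s -> events.add("Got data " + s),                     // onNext
                err -> events.add("Failed with " + err.getMessage()), // onError
                () -> events.add("Completed")                         // onComplete
        );
        return events;
    }

    public static void main(String[] args) {
        // [Got data Word 1, Got data Word 2, Failed with boom]
        System.out.println(run());
    }
}
```

Note that onComplete is not called here: a stream ends with either an error or a completion signal, never both.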


As you have noticed, the execution of our ReactiveExample program does not differ from that of a regular Java program, as all operations were executed on the main thread. In order to start taking advantage of the Reactor Core library, we will have to define a thread pool or a scheduler.

For this, we can use the Schedulers class from the Reactor Core library. In the next implementation, we will use the Schedulers class to create a thread pool whose threads will execute the onNext, onError and onComplete functions.

We can do this by simply using the Schedulers.newParallel function. The function takes the thread name prefix as an argument and returns a Scheduler that hosts a fixed pool of single-threaded, executor-based workers.

We can then use Flux’s subscribeOn function to indicate that the subscriber functions need to be executed on the given scheduler. Our example will now look as follows.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;


public class ReactiveExample {

    private static final Logger log = LoggerFactory.getLogger(ReactiveExample.class);

    public static void main(String[] args) {
        Flux<String> myStrings = Flux.just("Word 1", "Word 2", "Word 3");
        log.info("Some log message");
        myStrings.subscribeOn(Schedulers.newParallel("SomeParallelThread"))
                .subscribe(
                s -> log.info("Got data {}", s), //onNext function implementation
                err -> {}, //onError function implementation
                () -> log.info("Completed") //onComplete function implementation
        );

        log.info("Some other log message");
    }

}

If we run the program now, we will get the following output.

17:12:59.669 [main] INFO  ReactiveExample - Some log message
17:12:59.697 [main] INFO  ReactiveExample - Some other log message
17:12:59.700 [SomeParallelThread-1] INFO  ReactiveExample - Got data Word 1
17:12:59.702 [SomeParallelThread-1] INFO  ReactiveExample - Got data Word 2
17:12:59.702 [SomeParallelThread-1] INFO  ReactiveExample - Got data Word 3
17:12:59.702 [SomeParallelThread-1] INFO  ReactiveExample - Completed


Notice from the new results that:

  • The subscriber functions were executed on a different thread.
  • The subscribe function returns instantaneously. 

While this example might be a simple one, the advantage is significant. Since the subscribe function returns instantaneously, the caller does not need to wait for all the items to be processed. This means that your application remains responsive, as the main thread is free.

In a user-facing application, this can improve the user experience. For example, the main thread can immediately notify the user that their request is being processed while another thread computes the results (this is only an illustration; please do not send notifications from the main thread in real life).

One last thing to note is that even with a multi-threaded scheduler, such as the one returned by Schedulers.newParallel, the items emitted by the publisher are still processed by a single thread. This is because subscribeOn picks one worker from the provided scheduler and runs the whole subscription on it. If you would like to introduce parallelism there, you would need to convert your Flux to a ParallelFlux. We will explore parallelism in a future post.
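One way to check this for yourself is to record which threads run the onNext callback. In the sketch below, the SubscribeOnDemo class is a hypothetical name of ours. The CountDownLatch is there only so that the demo can wait for the result before inspecting it, and the scheduler is disposed at the end to release its threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Illustrative class: collects the names of the threads that ran onNext.
final class SubscribeOnDemo {

    static Set<String> callbackThreads() {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(1);
        Scheduler scheduler = Schedulers.newParallel("SomeParallelThread");
        try {
            Flux.just("Word 1", "Word 2", "Word 3")
                    .subscribeOn(scheduler)
                    .subscribe(
                            s -> threads.add(Thread.currentThread().getName()), // onNext
                            err -> done.countDown(),                            // onError
                            done::countDown                                     // onComplete
                    );
            // subscribe() has already returned; we wait only to inspect the result.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.dispose();
        }
        return threads;
    }

    public static void main(String[] args) {
        // Expect a single thread name starting with "SomeParallelThread-".
        System.out.println(callbackThreads());
    }
}
```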

Why learn Reactor Core First?

We have previously mentioned that Reactor Core is one of various solutions that exist in the reactive Java programming sphere. So why would one prefer to start learning Reactor Core before other frameworks?

Well, it all depends on your application stack and environment. For example, if you plan to introduce reactive programming to an existing Spring-based application, or if you intend to create a new Spring-based microservice, then I would recommend learning Reactor Core first. This is because Reactive Spring and WebFlux are based on the Reactor Core library. Therefore, whatever you learn from Reactor Core is directly applicable to your Spring applications.

However, if you are an Android developer, then you might prefer to go down the RxJava route. For example, RxAndroid is a reactive Android framework which is based on RxJava. Therefore, it would make sense to put your initial learning effort there.

Nonetheless, most reactive frameworks are based on the same principles. Therefore, when you work on one framework and then switch to working on another project with the other framework, you will find the transition to be mostly smooth. 

Summary

In this tutorial, we discussed what Reactor Core is, how to use it and why you might prefer it over other reactive libraries in Java. However, we have only covered the tip of the iceberg here.

There are still plenty of topics left to discuss, such as reactive processors, parallel execution, blocking vs. non-blocking APIs and backpressure, to name a few.

We intend to keep publishing more tutorials related to Reactor Core, so stay tuned!