Reactive Streams with JAVA 9

 3 years ago
source link: https://blog.knoldus.com/reactive-streams-with-java-9/

With the introduction of Java 9, the Java community has started to support Reactive Streams directly; earlier, they were available only through third-party libraries. Please visit my earlier blog on Reactive Streams to understand the basic ideas behind them, such as the push-pull model and backpressure. In this blog, we will explore how to use them in our code with Java 9.

Java 9 includes basic interfaces for each of the fundamental Reactive Streams concepts in the java.util.concurrent.Flow class. This allows all Java applications to depend on this one set of interfaces for Reactive Streams, rather than deciding on a specific implementation.

Java 9 Flow API:

It consists of interrelated interfaces and static methods for establishing flow-controlled components in which Publishers produce items that are consumed by one or more Subscribers, each managed by a Subscription.
These interfaces correspond to the reactive-streams specification. Communication relies on a simple form of flow control (method Flow.Subscription.request(long)) that can be used to avoid resource management problems that may otherwise occur in “push” based systems.

Modifier and Type    Interface              Description
static interface     Flow.Processor<T,R>    A component that acts as both a Subscriber and Publisher.
static interface     Flow.Publisher<T>      A producer of items (and related control messages) received by Subscribers.
static interface     Flow.Subscriber<T>     A receiver of messages.
static interface     Flow.Subscription      Message control linking a Flow.Publisher and Flow.Subscriber.
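
To see the request(long) flow control in isolation, here is a minimal sketch of a hand-written Publisher that emits only as many items as the subscriber has asked for. The names DemandDemo, RangePublisher and RecordingSubscriber are made up for this illustration and are not part of the JDK or of this blog's examples. The sketch is single-threaded and deliberately simplified, so it should not be read as a fully specification-compliant publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class DemandDemo {

    // Hypothetical publisher: emits 1..count, but never more items
    // than the subscriber has requested so far.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private boolean finished;

                @Override
                public void request(long n) {
                    if (finished) {
                        return;
                    }
                    if (n <= 0) {
                        // Non-positive demand is reported as an error.
                        finished = true;
                        subscriber.onError(new IllegalArgumentException("demand must be positive"));
                        return;
                    }
                    // Deliver at most n items, stopping at the end of the range.
                    for (long i = 0; i < n && !finished && next <= count; i++) {
                        subscriber.onNext(next++);
                    }
                    if (!finished && next > count) {
                        finished = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    finished = true;
                }
            });
        }
    }

    // Hypothetical subscriber that only records the signals it receives.
    static final class RecordingSubscriber implements Flow.Subscriber<Integer> {
        final List<String> events = new ArrayList<>();
        Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            events.add("subscribed");
        }

        @Override
        public void onNext(Integer item) {
            events.add("item " + item);
        }

        @Override
        public void onError(Throwable e) {
            events.add("error");
        }

        @Override
        public void onComplete() {
            events.add("complete");
        }
    }

    static List<String> run() {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        new RangePublisher(3).subscribe(subscriber);
        subscriber.subscription.request(2); // only two items may be delivered
        subscriber.events.add("paused");    // nothing arrives without new demand
        subscriber.subscription.request(5); // the remaining item, then completion
        return subscriber.events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Nothing is delivered between the two request calls, which is the point of the demand-based model: the subscriber, not the publisher, decides how fast items arrive.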

To understand the entire cycle of publishing-subscribing, let’s create a Student class that we will use to create the stream message to be sent from publisher to subscriber.

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;

@Builder
@Getter
@AllArgsConstructor
@ToString
public class Student {
    int id;
    String name;
}

We also have a utility class to create a list of students for our example.

import java.util.ArrayList;
import java.util.List;

public class StudentHelper {

    public static List<Student> getStudents() {

        Student student1 = Student.builder().id(1).name("Jaya").build();
        Student student2 = Student.builder().id(2).name("Rahul").build();
        Student student3 = Student.builder().id(3).name("Megha").build();
        Student student4 = Student.builder().id(4).name("Tapas").build();
        Student student5 = Student.builder().id(5).name("Raghav").build();
        List<Student> studentList = new ArrayList<>();
        studentList.add(student1);
        studentList.add(student2);
        studentList.add(student3);
        studentList.add(student4);
        studentList.add(student5);
        return studentList;
    }
}

Subscriber:

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class CustomStudentSubscriber implements Subscriber<Student> {

    private Subscription subscription;
    private volatile int counter = 0; // volatile: also read by the main thread while it waits

    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println("onSubscribe for CustomStudentSubscriber called");
        this.subscription = subscription;
        this.subscription.request(1); //requesting data from publisher
        System.out.println("onSubscribe for CustomStudentSubscriber requested 1 student");
    }

    @Override
    public void onNext(Student student) {
        System.out.println("Processing Student " + student);
        counter++;
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Some error happened");
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("All Processing Done");
    }

    public int getCounter() {
        return counter;
    }
}
  • The subscription variable keeps a reference to the Subscription so that further requests can be made from the onNext method.
  • The counter variable keeps count of the number of items processed. Its value is incremented in the onNext method, and our main method uses it to wait for processing to finish before the main thread exits.
  • The subscription request method is invoked in the onSubscribe method to start the processing. After an item is processed, it is called again in the onNext method to ask for the next item.
  • onError and onComplete should be used to take corrective measures when an error occurs, or to clean up resources when processing completes successfully.
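
As an alternative to counting items, a subscriber can signal the end of processing itself. The sketch below keeps the same request-one-at-a-time pattern but releases a CountDownLatch in onComplete and onError, so the caller can block until the stream has ended. LatchSubscriberDemo and LatchSubscriber are hypothetical names used only for this illustration, and plain strings stand in for Student objects so that the example has no Lombok dependency.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class LatchSubscriberDemo {

    // Hypothetical subscriber: requests one item at a time and
    // releases a latch once the stream has ended.
    static final class LatchSubscriber<T> implements Flow.Subscriber<T> {
        private final List<T> received = new CopyOnWriteArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // start the processing
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            subscription.request(1); // ask for the next item
        }

        @Override
        public void onError(Throwable e) {
            done.countDown(); // the stream ended with an error
        }

        @Override
        public void onComplete() {
            done.countDown(); // the stream ended normally
        }

        // Blocks until onComplete or onError has been signalled.
        List<T> awaitAll() {
            try {
                done.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            return received;
        }
    }

    static List<String> run() {
        LatchSubscriber<String> subscriber = new LatchSubscriber<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        List.of("Jaya", "Rahul", "Megha").forEach(publisher::submit);
        publisher.close(); // onComplete follows the items already submitted
        return subscriber.awaitAll();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With this design the publisher has to be closed before waiting, because the latch is only released by the completion signal.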

Test Program for Reactive Stream:

Let’s look at the test program for our reactive stream implementation. We will use SubmissionPublisher as Publisher here.

import java.util.List;
import java.util.concurrent.SubmissionPublisher;

public class StudentSubscriptionTestApp {

    public static void main(String[] args) throws InterruptedException {

        // Create Publisher
        SubmissionPublisher<Student> publisher = new SubmissionPublisher<>();

        // Register Subscriber
        CustomStudentSubscriber subs = new CustomStudentSubscriber();
        publisher.subscribe(subs);

        List<Student> students = StudentHelper.getStudents();

        // Publish items
        System.out.println("Publishing Items to Subscriber");
        students.forEach(publisher::submit);

        // Wait until all messages have been processed
        while (students.size() != subs.getCounter()) {
            Thread.sleep(10);
        }
        // close the Publisher
        publisher.close();

        System.out.println("Exiting the app");

    }

}

The most important parts of the above code are the calls to the publisher’s subscribe and submit methods. We should always close the publisher once we are done, both to avoid any memory leaks and because close() is what sends the onComplete signal to the subscribers.
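
If a full Subscriber class is not needed, SubmissionPublisher also offers a consume method that takes a plain Consumer and returns a CompletableFuture, which completes when the publisher signals onComplete. Since SubmissionPublisher implements AutoCloseable, it can be closed with try-with-resources. The following is a minimal sketch of that combination; ConsumeDemo is a hypothetical class name, and integers stand in for the student ids.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

class ConsumeDemo {

    static List<Integer> run() {
        List<Integer> seen = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> finished;

        // try-with-resources calls publisher.close() at the end of the block
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            // Register the consumer before publishing anything
            finished = publisher.consume(id -> seen.add(id));
            for (int id = 1; id <= 5; id++) {
                publisher.submit(id);
            }
        }

        // Completes once the consumer has received onComplete
        finished.join();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Waiting on the returned future replaces the sleep loop, at the cost of giving up explicit control over how many items are requested at a time.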

Output:

Publishing Items to Subscriber
onSubscribe for CustomStudentSubscriber called
onSubscribe for CustomStudentSubscriber requested 1 student
Processing Student Student(id=1, name=Jaya)
Processing Student Student(id=2, name=Rahul)
Processing Student Student(id=3, name=Megha)
Processing Student Student(id=4, name=Tapas)
Processing Student Student(id=5, name=Raghav)
Exiting the app
All Processing Done

Message Transformation:

We use a Processor to transform messages between a publisher and a subscriber. Suppose we have another subscriber that expects a different type of message to process, and that this new message type is EngineeringStudent.

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class EngineeringStudent extends Student {
    private int eid;
    public EngineeringStudent(int id, int eid, String name) {
        super(id, name);
        this.eid = eid;
    }

    @Override
    public String toString() {
        return "[id=" + super.getId() + ",name=" + super.getName() + ",eid=" + eid + "]";
    }
}

We have a new subscriber to consume EngineeringStudent stream data.

import java.util.concurrent.Flow;

public class CustomEngineeringStudentSubscriber implements Flow.Subscriber<EngineeringStudent> {

    private Flow.Subscription subscription;
    private volatile int counter = 0; // volatile: also read by the main thread while it waits
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("Subscribed for Engineering Student");
        this.subscription = subscription;
        this.subscription.request(1); //requesting data from publisher
        System.out.println("onSubscribe requested 1 item for Engineering Student");
    }

    @Override
    public void onNext(EngineeringStudent engineeringStudent) {
        System.out.println("Processing Engineering Student " + engineeringStudent);
        counter++;
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Some error happened in CustomEngineeringStudentSubscriber");
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("All Processing Done for CustomEngineeringStudentSubscriber");
    }

    public int getCounter() {
        return counter;
    }
}

Processor:

The important part is the implementation of the Processor interface. Since we want to utilize the SubmissionPublisher, we would extend it and use it wherever applicable.

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

public class CustomProcessor extends SubmissionPublisher<EngineeringStudent> implements Flow.Processor<Student, EngineeringStudent> {

    private Flow.Subscription subscription;
    private Function<Student, EngineeringStudent> function;
    public CustomProcessor(Function<Student, EngineeringStudent> function) {
        super();
        this.function = function;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Student std) {
        submit(function.apply(std));
        subscription.request(1);
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }

}
  • The Function is used to convert a Student object to an EngineeringStudent object.
  • We convert the incoming Student message to an EngineeringStudent message in the onNext method and then use the SubmissionPublisher submit method to send it to the subscriber.
  • Since a Processor works as both a subscriber and a publisher, we can create a chain of processors between the end publisher and the end subscriber.
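
To illustrate the chaining idea, here is a minimal sketch with two processors in a row. MappingProcessor is a hypothetical generic variant of the processor pattern, not the class from this blog. It differs in one respect: it forwards completion and errors downstream by calling close() and closeExceptionally(), so only the source publisher has to be closed by hand. Integers and strings stand in for the student types.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

class ProcessorChainDemo {

    // Hypothetical generic processor: maps each incoming item and republishes it.
    static final class MappingProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {

        private final Function<? super T, ? extends R> mapper;
        private Flow.Subscription subscription;

        MappingProcessor(Function<? super T, ? extends R> mapper) {
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            submit(mapper.apply(item)); // publish the transformed item downstream
            subscription.request(1);
        }

        @Override
        public void onError(Throwable e) {
            closeExceptionally(e); // pass the failure on to our own subscribers
        }

        @Override
        public void onComplete() {
            close(); // pass completion on to our own subscribers
        }
    }

    static List<String> run() {
        List<String> out = new CopyOnWriteArrayList<>();
        SubmissionPublisher<Integer> source = new SubmissionPublisher<>();
        MappingProcessor<Integer, Integer> addHundred =
                new MappingProcessor<Integer, Integer>(id -> id + 100);
        MappingProcessor<Integer, String> label =
                new MappingProcessor<Integer, String>(eid -> "eid=" + eid);

        // source -> addHundred -> label -> consumer
        source.subscribe(addHundred);
        addHundred.subscribe(label);
        CompletableFuture<Void> finished = label.consume(text -> out.add(text));

        for (int id = 1; id <= 3; id++) {
            source.submit(id);
        }
        source.close();  // completion travels through both processors
        finished.join(); // wait until the last stage has seen onComplete
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because each stage closes itself when its upstream completes, the end of the stream reaches the final consumer without any stage being closed explicitly from the main method.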

Message Transformation Test:

import java.util.List;
import java.util.concurrent.SubmissionPublisher;

public class ProcessorTestApp {

    public static void main(String[] args) throws InterruptedException {
        // Create End Publisher
        SubmissionPublisher<Student> publisher = new SubmissionPublisher<>();

        // Create Processor
        CustomProcessor transformProcessor = new CustomProcessor(student ->
                new EngineeringStudent(student.getId(), student.getId() + 100, student.getName()));

        //Create End Subscriber
        CustomEngineeringStudentSubscriber subs = new CustomEngineeringStudentSubscriber();

        //Create chain of publisher, processor and subscriber
        publisher.subscribe(transformProcessor); // publisher to processor
        transformProcessor.subscribe(subs); // processor to subscriber

        List<Student> students = StudentHelper.getStudents();

        // Publish items
        System.out.println("Publishing Items to Subscriber");
        students.forEach(publisher::submit);

        // Wait for message processing to finish
        while (students.size() != subs.getCounter()) {
            Thread.sleep(10);
        }

        // Closing publishers
        publisher.close();
        transformProcessor.close();

        System.out.println("Exiting the app");
    }
}

Output:

Publishing Items to Subscriber
Subscribed for Engineering Student
onSubscribe requested 1 item for Engineering Student
Processing Engineering Student [id=1,name=Jaya,eid=101]
Processing Engineering Student [id=2,name=Rahul,eid=102]
Processing Engineering Student [id=3,name=Megha,eid=103]
Processing Engineering Student [id=4,name=Tapas,eid=104]
Processing Engineering Student [id=5,name=Raghav,eid=105]
Exiting the app
All Processing Done for CustomEngineeringStudentSubscriber
Done

Cancel Subscription:

We can use the Subscription cancel method to stop receiving messages in the subscriber. Note that once we cancel the subscription, the subscriber will not receive an onComplete or onError signal.

Here is sample code where the subscriber consumes only 2 messages and then cancels the subscription.

@Override
public void onNext(Student student) {
    System.out.println("Processing Student " + student);
    counter++;
    if(counter==2) {
        this.subscription.cancel();
        return;
    }
    this.subscription.request(1);
}
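
For a version that can be run on its own, here is a minimal sketch of the same idea. Since the counter stops at 2 after the cancel, a wait loop that compares it with the full list size would never finish, so this sketch releases a latch at the point of cancellation instead. CancelDemo and FirstTwoSubscriber are hypothetical names, and strings stand in for Student objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class CancelDemo {

    // Hypothetical subscriber: takes two items, then cancels its subscription.
    static final class FirstTwoSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch stopped = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(String name) {
            received.add(name);
            if (received.size() == 2) {
                subscription.cancel(); // no further signals are expected after this
                stopped.countDown();
                return;
            }
            subscription.request(1);
        }

        @Override
        public void onError(Throwable e) {
            stopped.countDown();
        }

        @Override
        public void onComplete() {
            stopped.countDown(); // only reached if the stream ends before the cancel
        }
    }

    static List<String> run() {
        FirstTwoSubscriber subscriber = new FirstTwoSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            List.of("Jaya", "Rahul", "Megha", "Tapas", "Raghav").forEach(publisher::submit);
            subscriber.stopped.await(); // wait for the cancel, not for all five items
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return new ArrayList<>(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```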

Conclusion:

We learned how to create a processing flow consisting of a Publisher, a Subscriber and a Processor. The Java 9 Flow API is a good move towards reactive programming and towards creating asynchronous, non-blocking applications. However, creating a truly reactive application is possible only when the APIs of all the systems involved support it.
The above examples can be found at this GitHub repo.

I hope you liked my blog. If you have any doubts or suggestions, please drop a comment. Thanks!

References:
Java 9 Flow official doc