12

Handling backpressue in Rxjava

 3 years ago
source link: https://ninadmg.medium.com/handling-backpressue-in-rxjava-4d6383113688
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Handling backpressue in Rxjava

RxJava 2 introduced the concept of backpressure. Backpressure is nothing but a way for the subscriber to limit the number of items emitted by the emitter. Here we will take a look at how we can handle backpressure in RXJava2

What is the concept of Backpressure?

The concept of backpressure is that if the subscriber cannot accept any more of new events then the emitter should not send more events.
Imagine this case where the subscriber is writing every item that it receives to a database. Now if the items are emitted at a very high frequency the subscriber will not be able to keep up with the emitted items. The subscriber will need a way to tell the emitter that it can accept only one item at a time. Backpressure is a mechanism where the emitter emits events only if the subscriber requests for an event.

This essentially makes the RX stream a pull-based stream.

Instead of the emitter emitting all the items in one stretch, the emitter will emit only when the subscriber requests for the data.

Flowables

Since observables do not have back pressure support, a new concept of “flowables” were introduced in Rxjava2. The flowable stream is just like the observable stream. The only difference is that observable is a push-based stream and flowable is a pull-based stream. i.e. an item is emitted only when the subscriber requests for it.

Image for post
Image for post

In the above code snippet, you can see that the subscription object of the flowable is used to request an item. Here only one item will be emitted as we are requesting only for a single item. The frequency of the item emitted is decided by the subscriber.
Here since the frequency items emitted is controlled by the subscriber, the subscriber can request events in its own pace. The subscriber can request for 5 items, once the 5 items are processed the subscriber can request for the next 5 items.

Note: you might have already used flowables and subscribed without explicitly calling request. This is because if the subscribe method is called without the on-subscription parameter, the flowable internally calls request with Long.MAX_VALUE as parameter this makes the flowable to emit just everything it has.

Backpressure Strategy

There are still some cases where we might not be able to control the rate of items emitted. In those cases, we can use a backpressure strategy. Backpressure strategy is like a bridge to the non-back pressure world.
When a flowable operator is not able to keep up with the number of items that are emitted, the operator stores it in a buffer. The backpressure strategy decides what to do when this buffer is full. The backpressure strategy decides if the events should be dropped or replaced when the buffer is full.
We can define any of the 5 back pressure strategies when creating a flowable. This is helpful in cases where we need to throttle the incoming events.

Image for post
Image for post

Creating Flowables that support backpressure

Now that we know how to request and consume events that support back pressure, let us talk about creating flowable that emit only when requested. Flowable can be created using 2 methods. Flowable.create() and Flowable.generate().

Create

Flowable.create() is used to create a flowable with a backpressure strategy. This means that the flowable created using this method will not be a pull-based. The flowable will emit item even if the subscriber did not request anything.

Image for post
Image for post

The emitter emits items when subscribed irrespective of the number of items requested. The remaining items are stored in the buffer of the operators.

Image for post
Image for post

In the above code snipped the flowable will emit 5000 items. But when the downstream is not able to keep up the operators will start to drop events that it cannot accommodate in its buffer.

Generate

Flowable.generate() is used to create a flowable that emits only when requested. The generate method has 2 parameters, the first one in the initial state, and the next one generator that is given the state and an emitter.
The generator function is called every time the subscriber requests for an item. This would make sense when you see the code.

Image for post
Image for post

The generate method emits an item only when it is requested.

Image for post
Image for post

In the above code snippet, the initial state is 1. The biFunction is the generator that emit events. when the subscriber requests for the first time the generator biFunction is called with the initial state (1) and an emitter. Here we emit the item using the on next method and return the new state. when the subscriber requests again, the biFunction will be called with the new state and emitter, and the next item is emitted.
The emitter’s onNext can be called only once, calling it multiple times will throw IllegalStateException. This makes sure that we are not emitting more items requested.

Winding-up

  • For all the cases you need to limit the number of items emitted, Flowable should be used instead of observable.
  • If you are in control of the emitter and the subscriber then use the generate method of the flowable.
  • Use the backpressure strategy if the emitter cannot be paused.

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK