

Android Rxjava :最简单&全面背压讲解 (Flowable)
source link: https://blog.51cto.com/u_16163480/6990190
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Android Rxjava :最简单&全面背压讲解 (Flowable)
精选 原创阅读本文需要对Rxjava了解,如果还没有了解或者使用过Rxjava的兄die们,推荐观看 Android Rxjava:图解不一样的诠释 进行学习。
Rxjava背压
:被观察者发送事件的速度大于观察者接收事件的速度时,观察者内会创建一个无限制大少的缓冲池存储未接收的事件,因此当存储的事件越来越多时就会导致OOM的出现。(注:当subscribeOn与observeOn不为同一个线程时,被观察者与观察者内存在不同时长耗时任务,就会使发送与接收速度存在差异。)
public void backpressureSample(){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
int i = 0;
while(true){
Thread.sleep(500);
i++;
e.onNext(i);
Log.i(TAG,"每500ms发送一次数据:"+i);
}
}
}).subscribeOn(Schedulers.newThread())//使被观察者存在独立的线程执行
.observeOn(Schedulers.newThread())//使观察者存在独立的线程执行
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(5000);
Log.e(TAG,"每5000m接收一次数据:"+integer);
}
});
}
例子执行效果


通过上述例子可以大概了解背压是如何产生,因此Rxjava2.0版本提供了 Flowable 解决背压问题。 本文章就是使用与分析 Flowable 是如何解决背压问题。 文章中实例 linhaojian的Github


4.使用与原理详解
4.1 Flowable 与 Observable 的区别

上图可以很清楚看出二者的区别,其实Flowable
出来以上的区别之外,它其他所有使用与Observable完全一样。
Flowable
的create例子
public void flowable(){
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for(int j = 0;j<=150;j++){
e.onNext(j);
Log.i(TAG," 发送数据:"+j);
try{
Thread.sleep(50);
}catch (Exception ex){
}
}
}
},BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE); //观察者设置接收事件的数量,如果不设置接收不到事件
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e(TAG,"onNext : "+(integer));
}
@Override
public void onError(Throwable t) {
Log.e(TAG,"onError : "+t.toString());
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
});
}
4.2 BackpressureStrategy媒体类
从Flowable源码查看,缓存池默认大少为:128
public abstract class Flowable<T> implements Publisher<T> {
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
}
.....
}
通过上面的例子,我们可以看到create方法中的包含了一个BackpressureStrategy媒体类,其包含5种类型:
4.2.1. ERROR
把上面例子改为ERROR类型,执行结果如下:

总结 :当被观察者发送事件大于128时,观察者抛出异常并终止接收事件,但不会影响被观察者继续发送事件。
4.2.2. BUFFER
把上面例子改为BUFFER类型,执行结果如下:

总结 :与Observable一样存在背压问题,但是接收性能比Observable低,因为BUFFER类型通过BufferAsyncEmitter添加了额外的逻辑处理,再发送至观察者。
4.2.3. DROP
把上面例子改为DROP类型,执行结果如下:

总结 :每当观察者接收128事件之后,就会丢弃部分事件。
4.2.4. LATEST
把上面例子改为LATEST类型,执行结果如下:

总结 :LATEST与DROP使用效果一样,但LATEST会保证能接收最后一个事件,而DROP则不会保证。
4.2.5. MISSING
把上面例子改为MISSING类型,执行结果如下:

总结 :MISSING就是没有采取背压策略的类型,效果跟Obserable一样。
在设置MISSING类型时,可以配合onBackPressure相关操作符使用,也可以到达上述其他类型的处理效果。
4.3 onBackPressure相关操作符
使用例子:
Flowable.interval(50,TimeUnit.MILLISECONDS)
.onBackpressureDrop()//效果与Drop类型一样
.subscribeOn(Schedulers.newThread()) .observeOn(Schedulers.newThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e(TAG,"onNext : "+(aLong));
}
});
onBackpressureBuffer :与BUFFER类型一样效果。 onBackpressureDrop :与DROP类型一样效果。 onBackpressureLaster :与LASTER类型一样效果。
4.4 request()
4.4.1 request(int count):设置接收事件的数量.
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for(int j = 0;j<50;j++){
e.onNext(j);
Log.i(TAG," 发送数据:"+j);
try{
Thread.sleep(50);
}catch (Exception ex){
}
}
}
},BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(10); //观察者设置接收事件的数量,如果不设置接收不到事件
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e(TAG,"onNext : "+(integer));
}
@Override
public void onError(Throwable t) {
Log.e(TAG,"onError : "+t.toString());
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
});

4.4.2 request扩展使用
request还可进行扩展使用,当遇到在接收事件时想追加接收数量(如:通信数据通过几次接收,验证准确性的应用场景),可以通过以下方式进行扩展:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for(int j = 0;j<50;j++){
e.onNext(j);
Log.i(TAG," 发送数据:"+j);
try{
Thread.sleep(50);
}catch (Exception ex){
}
}
}
},BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread()) .subscribe(new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
s.request(10); //观察者设置接收事件的数量,如果不设置接收不到事件
}
@Override
public void onNext(Integer integer) {
if(integer==5){
subscription.request(3);
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e(TAG,"onNext : "+(integer));
}
@Override
public void onError(Throwable t) {
Log.e(TAG,"onError : "+t.toString());
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
});

总结:可以动态设置观察者接收事件的数量,但不影响被观察者继续发送事件。
4.5 requested
requested 与 request不是同一的函数,但它们都是属于FlowableEmitter类里的方法,那么requested()是有什么作用呢,看看以下例子:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for(int j = 0;j<15;j++){
e.onNext(j);
Log.i(TAG,e.requested()+" 发送数据:"+j);
try{
Thread.sleep(50);
}catch (Exception ex){
}
}
}
},BackpressureStrategy.BUFFER)//
.subscribeOn(Schedulers.newThread())//
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
s.request(10); //观察者设置接收事件的数量,如果不设置接收不到事件
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e(TAG,"onNext : "+(integer));
}
@Override
public void onError(Throwable t) {
Log.e(TAG,"onError : "+t.toString());
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
});

从图中我们可以发现,requested打印的结果就是 剩余可接收的数量 ,它的作用就是可以检测剩余可接收的事件数量。
到此,Flowable
讲解完毕。
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK