40

Android开发之从零开始学RxJava 2.x(一)认识Rxjava

 5 years ago
source link: https://blog.csdn.net/dmk877/article/details/80772129?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
欢迎转载,转载请注明出处:https://mp.csdn.net/mdeditor/80772129

落地98k,没有倍镜怪谁,让你扶我你却丢个手雷。

QrAN3eQ.jpg!web

哈哈,大家好,喜欢装逼的我又出现了,今天给大家带来的是RxJava的相关讨论,RxJava已经出来很久了,也是一直在用,但是总感觉没有完全掌握它,所以花了点时间也阅读了很多文章以及官方的文档,决定对其好好总结一番分享点有价值的文章,写一个系列,对于RxJava1.x就不做讨论了过去的就让它过去吧0.0,在写本博客时GitHub上最新的RxJava版本是2.1.16, 地址:https://github.com/ReactiveX/RxJava ,那么本系列也就是从0-1带着大家一起学习RxJava2.x的用法。如有疑问欢迎留言,如有谬误欢迎批评指正。

通过本篇博客你将学到以下知识点

①什么是RxJava

②RxJava的优势是什么

③RxJava如何使用

④主要用来做什么?

一、认识RxJava及RxJava的优势

首先来看第一个问题,RxJava到底是个什么东东?

官网给出的描述是这样的:RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

翻译过来就是:RxJava是一个为使用可观测的序列来组成异步的、基于事件的程序而产生的库。说白了RxJava就是用来解决异步的,解决异步的方法有很多为什么RxJava这么流行呢?肯定它有它强于其它方法的优势。那咱们来看看它的优势是什么?

1.函数式风格:对可观察数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态

2.代码书写逻辑清晰:Rx的操作符通通常可以将复杂的难题简化为逻辑非常简单的代码,可读性非常强,随着程序逻辑的复杂,依然保持简洁,解耦了各个模块操作,单一化,不嵌套。

3.异步错误处理:传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制

4.轻松使用并发:Rx的Observables和Schedulers让开发者可以摆脱底层的线程同步和各种并发问题

5.轻量级框架、支持Java 8 lambda、支持Java 6+和Android 2.3+

有没有感觉很厉害,有木有被RxJava的气场震慑到?

QRnMVru.jpg!web

二、RxJava的使用

清楚了RxJava的定义以及优点之后,接着我们来看下RxJava如何使用,对于RxJava的使用首先必须明白以下三个概念

1.Observable(被观察者)

2.Observer(观察者)

3.Subscribe(订阅)

为了更好的理解这三者之间的协作可以看下图

yUNVBnb.png!web

其中上游是Observable,下游是Observer,上游与下游建立连接是通过Subscribe方法建立连接的,这样理解是不是更好理解?

Vba6zmu.jpg!web

对于使用RxJava总共有三个步骤,可以参考上图

①创建上游的Observable

②创建下游的Observer

③建立连接

说了这么多,来搞一波代码尝尝咸淡

首先我们在Gradle中做如下配置

implementation 'io.reactivex.rxjava2:rxjava:2.1.16'

接着就可以肆无忌惮的装逼了,按照上述说的三个步骤代码如下

//创建上游的Observable
Observable observable=Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {

emitter.onNext("Hello");
emitter.onNext("Word");
emitter.onNext("!!!");

emitter.onComplete();

}
});

//创建下游的Observer
Observer observer=new Observer<String>(){

@Override
public void onSubscribe(Disposable d) {

Log.d(TAG,"onSubscribe");
}

@Override
public void onNext(String s) {

Log.d(TAG,"onNext:"+s);
}

@Override
public void onError(Throwable e) {

Log.d(TAG,"onError");
}

@Override
public void onComplete() {

Log.d(TAG,"onComplete");
}
};
//连接上游和下游
observable.subscribe(observer);

打印日志如下:

QFfqieN.png!web

我们来分析下执行的过程首先我们分别用代码创建了上游的Observable对象和下游的Observer对象,创建好这两个对象之后通过Observable对象的subscribe()方法将上游与下游建立了连接。建立连接成功之后会调用下游observer的onSubscribe回调走onSubscribe方法预示着上游与下游建立了连接,此时打印了“onSubscribe”,之后上游会依次发送三个事件,代码如下

emitter.onNext("Hello");
emitter.onNext("Word");
emitter.onNext("!!!");
emitter.onComplete();

因为上游与下游已经建立了连接所以下游会收到这三个事件,对应的会走下游observer的onNext方法的回调,所以会依次打印”Hello”、”Word”、”!!!”,之后上游发送发送emitter.onComplete();预示着上游发送事件结束,此时会调用下游的onComplete方法的回调,此时会打印onComplete。以上就是一个简单的RxJava的小案例。

当然对于刚接触到RxJava的同学可能还是有点陌生,那接下来咱们就来把这个案例理解透彻,首先来看看两个对象ObservableEmitter和Disposable这两个对象,其中Emitter是发射的意思,ObservableEmitter的源码实现中共有三个方法:

onNext:用来发送数据,可多次调用,每调用一次发送一条数据

onError:用来发送异常通知,只发送一次,若多次调用则第二次调用时报错

onComplete:用来发送完成通知,只发送一次,若多次调用只发送第一条

在一个正确运行的事件序列中,onCompleted() 和 onError() 必须唯一并且互斥,数据在发送时,出现异常可以调用onError发送异常通知也可以不调用,因为其所在的方法subscribe会抛出异常,若数据在全部发送完之后均正常可以调用onComplete发送完成通知;其中,onError与onComplete不做强制性调用,并且两者是事件序列中的最后一个。Observable可以发送无限个onNext, 观察者也可以接收无限个onNext。Observable发送了一个onComplete(或者onError)后,可以继续发送onComplete(或者onError)后续事件,但观察者收到onComplete(或者onError)后不再接收事件。Observable可以不发送onComplete或onError。

对象Observer中的三个方法(onNext,onError,onComplete)正好与Emitter中的三个方法相对应,分别对Emitter中对应方法的行为作出响应。

Emitter调用onNext发送数据时,Observer会通过onNext接收数据。

Emitter调用onError发送异常通知时,Observer会通过onError接收异常通知,此时不再接收上游发送的数据(此时上游是可以发送数据的)

Emitter调用onComplete发送完成通知时,Observer会通过onComplete接收完成通知,并且不再接收上游发送的数据(此时上游是可以发送数据的)

eUzayaV.png!web

1.中间发送一个oncomplete事件,原理图如下:

vymYjiu.png!web

代码:

//创建上游的Observable
Observable observable=Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {

Log.d(TAG,"上游 发射---->a");
emitter.onNext("发射---->a");
Log.d(TAG,"上游 发射---->b");
emitter.onNext("发射---->b");

emitter.onComplete();

Log.d(TAG,"上游 发射---->c");
emitter.onNext("发射---->c");
Log.d(TAG,"上游 发射---->d");
emitter.onNext("发射---->d");

}
});

//创建下游的Observer
Observer observer=new Observer<String>(){

@Override
public void onSubscribe(Disposable d) {

Log.d(TAG,"onSubscribe");
}

@Override
public void onNext(String s) {

Log.d(TAG,"下游onNext接收:"+s);
}

@Override
public void onError(Throwable e) {

Log.d(TAG,"onError");
}

@Override
public void onComplete() {

Log.d(TAG,"onComplete");
}
};
//连接上游和下游
observable.subscribe(observer);

打印日志如下:

nEvmMvn.png!web

从日志中可以看出在上游发送onComplete方法之后,上游还是正常的发送事件,但是下游却没有接收到上游发送的事件。

2.中间发送一个onerror,原理图如下:

QrYjIri.png!web

代码只需修改上游的发送事件的代码:

//创建上游的Observable
Observable observable=Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {

Log.d(TAG,"上游 发射---->a");
emitter.onNext("发射---->a");
Log.d(TAG,"上游 发射---->b");
emitter.onNext("发射---->b");

emitter.onError(new NullPointerException());

Log.d(TAG,"上游 发射---->c");
emitter.onNext("发射---->c");
Log.d(TAG,"上游 发射---->d");
emitter.onNext("发射---->d");

}
});

日志:

jIn2uye.png!web

从日志中可以看出在发送了onError事件后,上游是可以发送数据的,但是下游将不再接收上游发送的数据。

3.发送多个onError或onComple

前面我们说到onComplete和onError必须唯一并且互斥若发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃,这一点留给大家去验证。

了解完ObservableEmitter之后我们再来了解一下Disposable 这个对象,Disposable这个对象是用来切断上游与下游的连接的一个对象,切断之后上游可以继续发送事件,但是下游将不会再收到上游发送的事件,废话不多说咱们来看一段代码:

//创建上游的Observable
Observable observable=Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {

Log.d(TAG,"上游 发射---->a");
emitter.onNext("发射---->a");
Log.d(TAG,"上游 发射---->b");
emitter.onNext("发射---->b");
Log.d(TAG,"上游 发射---->c");
emitter.onNext("发射---->c");
Log.d(TAG,"上游 发射---->d");
emitter.onNext("发射---->d");

}
});

//创建下游的Observer
Observer observer=new Observer<String>(){

@Override
public void onSubscribe(Disposable d) {

disposable=d;
Log.d(TAG,"onSubscribe");
}

@Override
public void onNext(String s) {

if(s.equals("发射---->b")){

disposable.dispose();
}

Log.d(TAG,"下游onNext接收:"+s);
}

@Override
public void onError(Throwable e) {

Log.d(TAG,"onError");
}

@Override
public void onComplete() {

Log.d(TAG,"onComplete");
}
};
//连接上游和下游
observable.subscribe(observer);

日志:

BRjEruN.png!web

从日志中我们可以看出,在调用了dispose之后,上游的发送是没有受影响的,但是下游收不到上游发送的数据。

对于之前所述的代码写熟练之后就是经常说的链式调用

//RxJava链式调用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {

}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(String s) {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
});

有木有感觉高大上?

四、真实项目中使用RxJava

到这里可能部分同学对于RxJava可以用在什么地方可能还不是特别清楚,其实RxJava的用处是非常非常大的,比如网络请求、读写文件等等,多处可以用到,因为本系列是从零开始学,所以这里我就先举个简单的例子比如网络请求RxJava可以与RxAndroid结合在一起使用对线程进行控制。代码如下

//RxJava真实开发中的作用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
//通过设置此方法的回调运行在子线程中,可以进行网络请求等一些耗时的操作
//比如请求网络拿到数据通过调用emitter.onNext(response);将请求的数据发送到下游
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
//通过设置Observer运行在主线程,拿到网络请求的数据进行解析使用
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(String s) {
//在此接收上游异步获取的数据,比如网络请求过来的数据进行处理
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
});

对于这个使用不懂没关系先有个大致的印象RxJava到底可以用在什么地方,后续会有一系列关于RxJava的文章来讲解具体的用法,记得关注和点赞,好了,相信通过本篇博客大家会对RxJava有了一定的认识。锁定本台敬请关注接下来的系列文章。

欢迎转载,转载请注明出处:https://mp.csdn.net/mdeditor/80772129


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK