反应式编程实战 - RxJava 2.x 基本模式
source link: http://blog.720ui.com/2018/rxjava2_02_observable/?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
RxJava 2.x 提供了五种模式,如下所示。
模式/类型 描述 接口 消费者 Observable 支持 0…N个数据,不支持背压 io.reactivex.Observable Observer Flowable 支持 0…N个数据 支持背压 io.reactivex.Flowable Subscriber Single 只支持1个数据 io.reactivex.Single SingleObserver Completable 不支持数据 io.reactivex.Completable CompletableObserver Maybe 只支持0或1个数据 io.reactivex.Maybe MaybeObserverObservable
创建 Observable 非常容易,我们首先需要创建一个 Observable 作为被观察者,然后在创建一个 Observer 作为观察者,然后通过 subscribe() 进行订阅。
public class ObservableDemo { public static void main(String[] args) { Observable<String> observable = Observable.create(emitter -> { emitter.onNext("Hello World"); emitter.onNext("Hello World"); emitter.onComplete(); emitter.onNext("Hello World"); }); Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("Observer.onSubscribe"); } @Override public void onNext(String s) { System.out.println("Observer.onNext: " + s); } @Override public void onError(Throwable e) { System.out.println("Observer.onError"); } @Override public void onComplete() { System.out.println("Observer.onComplete"); } }; observable.subscribe(observer); } }
我们可以使用 create 创建一个 Observable,它拥有 onNext, onError, onCompleted 方法。其中,onNext用于发射数据项,可以多次调用,每调用一次发射一条数据, onError 或 onCompleted 只能调用一次,onError发射错误事件,除非使用 retry() 操作符来截获错误,否则事件流通常会终止。onCompleted 传递一个完成事件,表示不会再发生onNext调用。两者之间互斥,此后就不能再调用该 Observable 的其他方法。
这里,我们也可以改造成链式调用。
public class ObservableDemo2 { public static void main(String[] args) { Observable.<String>create(emitter -> { emitter.onNext("Hello World"); emitter.onNext("Hello World"); emitter.onComplete(); emitter.onNext("Hello World"); }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("Observer.onSubscribe"); } @Override public void onNext(String s) { System.out.println("Observer.onNext: " + s); } @Override public void onError(Throwable e) { System.out.println("Observer.onError"); } @Override public void onComplete() { System.out.println("Observer.onComplete"); } }); } }
阅读 RxJava 2.x 源码 io.reactivex.Observable,我们可以知道 subscribe 具有很多重载的方法。有兴趣的读者,可以深入了解下。
我们可以省略 onComplete(),只实现 onNext() 和 onError()。这将不再对 onComplete() 执行任何操作。我们甚至可以忽略 onError(),只指定 onNext()。但是,不实现 onError() 是在生产环境中应该避免的事情。在事件流的任何地方发生的错误都将传播到 onError() 进行处理,然后终止事件流。如果我们没有为 onError() 指定一个操作,那么该错误将不会处理。当然,如果出现错误,我们可以先尝试使用 retry() 操作符恢复并重新订阅可观察到的数据项。
public final Disposable subscribe() public final Disposable subscribe(Consumer<? super T> onNext) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) public final void subscribe(Observer<? super T> observer)
这里,我们简单来了解一下 subscribe(Consumer<? super T> onNext)
的使用吧。
public class ObservableDemo3 { public static void main(String[] args) { Observable.<String>create(emitter -> { emitter.onNext("Hello World"); emitter.onNext("Hello World"); emitter.onComplete(); emitter.onNext("Hello World"); }).subscribe(System.out::println); } }
注意, onNext, onError, onCompleted 方法不需要直接推送到最终的观察者,它们可以通过 map() 和 filter() 等操作符创建新的 Observable 然后继续发送。
Flowable 是唯一支持背压的模式,它的用法与 Observable 非常相似。(关于背压,笔者会在之后的文章中进行讲解。)
public class FlowableDemo { public static void main(String[] args) { Flowable.<String>create(e -> { e.onNext("Hello world!"); e.onNext("Hello World"); e.onComplete(); e.onNext("Hello World"); }, BackpressureStrategy.MISSING).subscribe(new Subscriber<String>(){ @Override public void onSubscribe(Subscription subscription) { System.out.println("Subscriber.onSubscribe"); } @Override public void onNext(String s) { System.out.println("Subscriber.onNext: " + s); } @Override public void onError(Throwable throwable) { System.out.println("Subscriber.onError"); } @Override public void onComplete() { System.out.println("Subscriber.onComplete"); } }); } }
阅读 RxJava 2.x 源码 io.reactivex.Flowable
,我们可以知道 subscribe 也具有很多重载的方法。
public final Disposable subscribe() public final Disposable subscribe(Consumer<? super T> onNext) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Subscription> onSubscribe) public final void subscribe(FlowableSubscriber<? super T> s) public final void subscribe(Subscriber<? super T> s)
Single
Single 的工作就像 Observable 一样,但是它只有 onSuccess
?事件和 onError
事件,并且它有自己的 SingleObserver
接口。 onSuccess
整合了 onNext
和 onComplete
事件,因此,这里 onSuccess
只能发送一个数据,换句话说,即使多次发送也不会产生效果。
public class SingleDemo { public static void main(String[] args) { Single.<String>create(e -> { e.onSuccess("success"); e.onSuccess("success"); }).subscribe(new SingleObserver<String>(){ @Override public void onSubscribe(Disposable d) { System.out.println("SingleObserver.onSubscribe"); } @Override public void onSuccess(String s) { System.out.println("SingleObserver.onSuccess:"+s); } @Override public void onError(Throwable e) { System.out.println("SingleObserver.onError"); } }); } }
从控制台的打印结果可以看出,即使多次发送“success”,但是只会消费一次。
阅读 RxJava 2.x 源码 io.reactivex.Single
,我们可以知道 subscribe 也具有很多重载的方法。
public final Disposable subscribe() public final Disposable subscribe(final BiConsumer<? super T, ? super Throwable> onCallback) public final Disposable subscribe(Consumer<? super T> onSuccess) public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) public final void subscribe(SingleObserver<? super T> subscriber)
这里,我们简单来了解一下 subscribe(Consumer<? super T> onSuccess)
的使用吧。
public class SingleDemo2 { public static void main(String[] args) { Single.<String>create(e -> { e.onSuccess("success"); }).subscribe(System.out::println); } }
我们可以通过 toObservable 转换成一个 Observable 对象。
Single.just("success").toObservable().subscribe(System.out::println);
Completable
Completable 不发送数据,只有 onComplete
事件和 onError
事件。
public class CompletableDemo { public static void main(String[] args) { Completable.create(e -> { e.onComplete(); }) .subscribe(System.out::println); } }
此外,我们可以通过 complete()
快速创建一个 Completable 对象,它会立即调用 onComplete
事件。
Completable.complete().subscribe(System.out::println);
或者,也可以通过 fromAction()
或 fromRunnable()
在调用 onComplete
事件之前执行指定的操作。
Completable.fromAction(System.out::println).subscribe();
Maybe
Maybe 结合了 Single 和 Completable 特性。Maybe 包含 onSuccess
、 onError
、 onComplete
事件。 这里, onSuccess
可以发送 0 ~ 1 个数据,换句话说,即使多次发送也不会产生效果。如果调用 onComplete
事件,就会停止发送数据。
public class MaybeDemo { public static void main(String[] args) { Maybe.<String>create(e -> { e.onComplete(); e.onSuccess("success"); e.onSuccess("success"); }).subscribe(System.out::println); } }
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK