129

反应式编程实战 - RxJava 2.x 基本模式

 5 years ago
source link: http://blog.720ui.com/2018/rxjava2_02_observable/?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
mIvYfmi.png!web

RxJava 2.x 提供了五种模式,如下所示。

模式/类型 描述 接口 消费者 Observable 支持 0…N个数据,不支持背压 io.reactivex.Observable Observer Flowable 支持 0…N个数据 支持背压 io.reactivex.Flowable Subscriber Single 只支持1个数据 io.reactivex.Single SingleObserver Completable 不支持数据 io.reactivex.Completable CompletableObserver Maybe 只支持0或1个数据 io.reactivex.Maybe MaybeObserver

Observable

创建 Observable 非常容易,我们首先需要创建一个 Observable 作为被观察者,然后在创建一个 Observer 作为观察者,然后通过 subscribe() 进行订阅。

public class ObservableDemo {
    public static void main(String[] args) {
        Observable<String> observable = Observable.create(emitter -> {
            emitter.onNext("Hello World");
            emitter.onNext("Hello World");
            emitter.onComplete();
            emitter.onNext("Hello World");
        });

        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("Observer.onSubscribe");
            }
            @Override
            public void onNext(String s) {
                System.out.println("Observer.onNext: " + s);
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("Observer.onError");
            }
            @Override
            public void onComplete() {
                System.out.println("Observer.onComplete");
            }
        };

        observable.subscribe(observer);
    }
}

我们可以使用 create 创建一个 Observable,它拥有 onNext, onError, onCompleted 方法。其中,onNext用于发射数据项,可以多次调用,每调用一次发射一条数据, onError 或 onCompleted 只能调用一次,onError发射错误事件,除非使用 retry() 操作符来截获错误,否则事件流通常会终止。onCompleted 传递一个完成事件,表示不会再发生onNext调用。两者之间互斥,此后就不能再调用该 Observable 的其他方法。

这里,我们也可以改造成链式调用。

public class ObservableDemo2 {
    public static void main(String[] args) {
        Observable.<String>create(emitter -> {
            emitter.onNext("Hello World");
            emitter.onNext("Hello World");
            emitter.onComplete();
            emitter.onNext("Hello World");
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("Observer.onSubscribe");
            }
            @Override
            public void onNext(String s) {
                System.out.println("Observer.onNext: " + s);
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("Observer.onError");
            }
            @Override
            public void onComplete() {
                System.out.println("Observer.onComplete");
            }
        });
    }
}

阅读 RxJava 2.x 源码 io.reactivex.Observable,我们可以知道 subscribe 具有很多重载的方法。有兴趣的读者,可以深入了解下。

我们可以省略 onComplete(),只实现 onNext() 和 onError()。这将不再对 onComplete() 执行任何操作。我们甚至可以忽略 onError(),只指定 onNext()。但是,不实现 onError() 是在生产环境中应该避免的事情。在事件流的任何地方发生的错误都将传播到 onError() 进行处理,然后终止事件流。如果我们没有为 onError() 指定一个操作,那么该错误将不会处理。当然,如果出现错误,我们可以先尝试使用 retry() 操作符恢复并重新订阅可观察到的数据项。

public final Disposable subscribe()
public final Disposable subscribe(Consumer<? super T> onNext)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) 
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) 
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe)
public final void subscribe(Observer<? super T> observer)

这里,我们简单来了解一下 subscribe(Consumer&lt;? super T> onNext) 的使用吧。

public class ObservableDemo3 {
    public static void main(String[] args) {
        Observable.<String>create(emitter -> {
            emitter.onNext("Hello World");
            emitter.onNext("Hello World");
            emitter.onComplete();
            emitter.onNext("Hello World");
        }).subscribe(System.out::println);
    }
}

注意, onNext, onError, onCompleted 方法不需要直接推送到最终的观察者,它们可以通过 map() 和 filter() 等操作符创建新的 Observable 然后继续发送。

ZfAn2mu.png!web

Flowable 是唯一支持背压的模式,它的用法与 Observable 非常相似。(关于背压,笔者会在之后的文章中进行讲解。)

public class FlowableDemo {
    public static void main(String[] args) {
        Flowable.<String>create(e -> {
            e.onNext("Hello world!");
            e.onNext("Hello World");
            e.onComplete();
            e.onNext("Hello World");
        }, BackpressureStrategy.MISSING).subscribe(new Subscriber<String>(){
            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println("Subscriber.onSubscribe");
            }
            @Override
            public void onNext(String s) {
                System.out.println("Subscriber.onNext: " + s);
            }
            @Override
            public void onError(Throwable throwable) {
                System.out.println("Subscriber.onError");
            }
            @Override
            public void onComplete() {
                System.out.println("Subscriber.onComplete");
            }
        });
    }
}

阅读 RxJava 2.x 源码 io.reactivex.Flowable ,我们可以知道 subscribe 也具有很多重载的方法。

public final Disposable subscribe() 
public final Disposable subscribe(Consumer<? super T> onNext)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Subscription> onSubscribe)
public final void subscribe(FlowableSubscriber<? super T> s)
public final void subscribe(Subscriber<? super T> s)

Single

Single 的工作就像 Observable 一样,但是它只有 onSuccess ?事件和 onError 事件,并且它有自己的 SingleObserver 接口。 onSuccess 整合了 onNextonComplete 事件,因此,这里 onSuccess 只能发送一个数据,换句话说,即使多次发送也不会产生效果。

public class SingleDemo {
    public static void main(String[] args) {
        Single.<String>create(e -> {
            e.onSuccess("success");
            e.onSuccess("success");
        }).subscribe(new SingleObserver<String>(){
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("SingleObserver.onSubscribe");
            }
            @Override
            public void onSuccess(String s) {
                System.out.println("SingleObserver.onSuccess:"+s);
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("SingleObserver.onError");
            }
        });
    }
}

从控制台的打印结果可以看出,即使多次发送“success”,但是只会消费一次。

阅读 RxJava 2.x 源码 io.reactivex.Single ,我们可以知道 subscribe 也具有很多重载的方法。

public final Disposable subscribe()
public final Disposable subscribe(final BiConsumer<? super T, ? super Throwable> onCallback)
public final Disposable subscribe(Consumer<? super T> onSuccess)
public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError)
public final void subscribe(SingleObserver<? super T> subscriber)

这里,我们简单来了解一下 subscribe(Consumer&lt;? super T> onSuccess) 的使用吧。

public class SingleDemo2 {
    public static void main(String[] args) {
        Single.<String>create(e -> {
            e.onSuccess("success");
        }).subscribe(System.out::println);
    }
}

我们可以通过 toObservable 转换成一个 Observable 对象。

Single.just("success").toObservable().subscribe(System.out::println);

Completable

Completable 不发送数据,只有 onComplete 事件和 onError 事件。

public class CompletableDemo {
    public static void main(String[] args) {
        Completable.create(e -> {
            e.onComplete();
        })
        .subscribe(System.out::println);
    }
}

此外,我们可以通过 complete() 快速创建一个 Completable 对象,它会立即调用 onComplete 事件。

Completable.complete().subscribe(System.out::println);

或者,也可以通过 fromAction()fromRunnable() 在调用 onComplete 事件之前执行指定的操作。

Completable.fromAction(System.out::println).subscribe();

Maybe

Maybe 结合了 Single 和 Completable 特性。Maybe 包含 onSuccessonErroronComplete 事件。 这里, onSuccess 可以发送 0 ~ 1 个数据,换句话说,即使多次发送也不会产生效果。如果调用 onComplete 事件,就会停止发送数据。

public class MaybeDemo {
    public static void main(String[] args) {
        Maybe.<String>create(e -> {
            e.onComplete();
            e.onSuccess("success");
            e.onSuccess("success");
        }).subscribe(System.out::println);
    }
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK