47

RxJava 2.x 源码分析

 5 years ago
source link: https://ljuns.itscoder.com/2018/05/31/RxJava 2.x源码分析/?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

本次分析的 RxJava 版本信息如下:

implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.1.6'

先来个简单例子:

// 创建被观察者
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e)throws Exception {
        e.onNext(1);
        e.onError(new Throwable("error"));
        e.onComplete();
    }
});
// 创建观察者
Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d){
        Log.d(TAG, "onSubscribe: ");
    }
    @Override
    public void onNext(Integer integer){
        Log.d(TAG, "onNext: " + integer);
    }
    @Override
    public void onError(Throwable e){
        Log.d(TAG, "onError: " + e.getMessage());
    }
    @Override
    public void onComplete(){
        Log.d(TAG, "onComplete: ");
    }
};
// 订阅
observable.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);

以下的分析都基于这个例子,分析过程会分为三部分:

  • subscribe() 流程
  • subscribeOn() 流程
  • observeOn() 流程

subscribe()

把 subscribe() 放在前面是因为后续的 subscribeOn() 和 observeOn() 流程都它的支撑,但是单纯的分析 subscribe() 流程没多大意义,所以这个流程基于 observable.subscribe(observer)

create()

首先是被观察者的创建过程,即 Observable.create(new ObservableOnSubscribe<Integer>() {...}) 。ObservableOnSubscribe 是个接口,里面只有一个 subscribe() 方法,所以重点在 create() 方法:

public static <T> Observable<T>create(ObservableOnSubscribe<T> source){
    // 检查是否为 null, 如果 null 就抛出 NullPointerException
    ObjectHelper.requireNonNull(source, "source is null");
    // onAssembly() 是个 hook 操作,我也不知道 hook 是啥,反正直接返回传入值
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

看来 ObservableCreate 才是目标:

public final class ObservableCreate<T>extends Observable<T>{
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source){
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<?super T> observer){
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    // 暂时省略了两个内部类
}

很简单,只有两个方法,并且把 ObservableOnSubscribe 对象缓存为 source。

到这里 create() 方法算是执行完了,一句话总结: create() 方法实际上创建了一个 ObservableCreate 对象,并且持有了 ObservableOnSubscribe 对象的引用。

subscribe()

现在看看订阅方法 subscribe(),它在 Observable 抽象类中:

public final void subscribe(Observer<?super T> observer){
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        // 这也是一个 hook
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

protected abstract void subscribeActual(Observer<?super T> observer);

subscribe() 主要就是调用了抽象方法 subscribeActual(),根据刚刚在 create() 方法的分析,这里调用到了 ObservableCreate 的 subscribeActual() 方法:

@Override
protected void subscribeActual(Observer<?super T> observer){
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

先是通过 observer 创建出一个 CreateEmitter 对象,CreateEmitter 是 ObservableCreate 其中一个内部类,主要功能就是对 Observer 的四个方法(onSubscribe()、onNext()、onError()、onComplete())进行了包装,并且提供了 dispose 系列方法:

static final class CreateEmitter<T>extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;
        final Observer<? super T> observer;
        
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
        
        @Override
        public void onNext(T t){
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t){
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }
        
        @Override
        public boolean tryOnError(Throwable t){
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }
        
        @Override
        public void onComplete(){
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
        
        @Override
        public void setDisposable(Disposable d){
            DisposableHelper.set(this, d);
        }
        
        @Override
        public void setCancellable(Cancellable c){
            setDisposable(new CancellableDisposable(c));
        }
        
        @Override
        public ObservableEmitter<T> serialize(){
            return new SerializedEmitter<T>(this);
        }
        
        @Override
        public void dispose(){
            DisposableHelper.dispose(this);
        }
        
        @Override
        public boolean isDisposed(){
            return DisposableHelper.isDisposed(get());
        }
    }

回到 subscribeActual() 方法,然后执行了 onSubscribe() 方法,并把 CreateEmitter 对象引用传递出去,这时外部的 onSubscribe() 就得到了执行。

接下来执行 source.subscribe(parent) ,这个 source 就是 Observable.create() 方法传递的参数,所以这时就到了发送事件的地方。

调用 e.onNext()、e.onComplete()、e.onError() 其实都是调用了 CreateEmitter 中对应的方法,根据上面提供的 CreateEmitter 类源码可知最终调用的都是创建 Observer 时实现的方法。

observable.subscribe(observer) 流程算是完了,来个图总结下这个流程:

yMbAfiE.jpg!web

subscribeOn()

subscribeOn() 指定 Observable 在哪个调度器上执行,以下流程基于 subscribeOn(Schedulers.io()).subscribe()

Schedulers.io()

Schedulers 的源码并不多,这里只留下和 Schedulers.io() 相关的代码:

public final class Schedulers{
    ...
    @NonNull
    static final Scheduler IO;
    
    ...
    static final class IoHolder{
        static final Scheduler DEFAULT = new IoScheduler();
    }

    static {
        IO = RxJavaPlugins.initIoScheduler(new IOTask());
        ...
    }
    ...
    @NonNull
    public static Scheduler io(){
        // hook
        return RxJavaPlugins.onIoScheduler(IO);
    }
	...
    static final class IOTaskimplements Callable<Scheduler>{
        @Override
        public Scheduler call()throws Exception {
            return IoHolder.DEFAULT;
        }
    }
	...
}

/**
 * 下面是 RxJavaPlugins 类其中两个方法
 */
public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler){
    ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
    Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
    if (f == null) {
        return callRequireNonNull(defaultScheduler);
    }
    return applyRequireNonNull(f, defaultScheduler);
}

static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s){
    try {
        return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
    } catch (Throwable ex) {
        throw ExceptionHelper.wrapOrThrow(ex);
    }
}

Schedulers.io() 相关源码都放在一起了,这样更清晰。

所以 Schedulers.io() 就是创建了一个 IoScheduler 对象

subscribeOn()

public final Observable<T> subscribeOn(Scheduler scheduler){
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    // onAssembly() 是个 hook
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

subscribeOn() 方法主要是创建了一个 ObservableSubscribeOn 对象,需要两个参数:

Schedulers.io()

ObservableSubscribeOn 类和 ObservableCreate 很相似,都是只有两个方法:

public final class ObservableSubscribeOn<T>extends AbstractObservableWithUpstream<T,T>{
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler){
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s){
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
	// 暂时省略了两个内部类
}

调用了父类构造方法缓存 source ,还缓存了 scheduler。

小结:subscribeOn(Schedulers.io()) 过程就是创建了一个 ObservableSubscribeOn 对象,并且缓存了 ObservableCreate 和 IoScheduler 两个对象。

subscribe()

在 subscribe() 流程部分已经分析过 subscribe() 的内部执行逻辑了,但是这次的调用对象变成了 ObservableSubscribeOn,所以变成了调用 ObservableSubscribeOn 的 subscribeActual() 方法。

这次单独把 subscribeActual() 方法拿出来:

@Override
public void subscribeActual(final Observer<? super T> s){
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);
	// 重中之重
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

首先把 Observer 对象包装成 SubscribeOnObserver, SubscribeOnObserver 类是 ObservableSubscribeOn 的其中一个内部类:

static final class SubscribeOnObserver<T>extends AtomicReference<Disposable>implements Observer<T>,Disposable{
    private static final long serialVersionUID = 8094547886072529208L;
    final Observer<? super T> actual;

    final AtomicReference<Disposable> s;

    SubscribeOnObserver(Observer<? super T> actual) {
        this.actual = actual;
        this.s = new AtomicReference<Disposable>();
    }

    @Override
    public void onSubscribe(Disposable s){
        DisposableHelper.setOnce(this.s, s);
    }

    @Override
    public void onNext(T t){
        actual.onNext(t);
    }

    @Override
    public void onError(Throwable t){
        actual.onError(t);
    }

    @Override
    public void onComplete(){
        actual.onComplete();
    }

    @Override
    public void dispose(){
        DisposableHelper.dispose(s);
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed(){
        return DisposableHelper.isDisposed(get());
    }

    void setDisposable(Disposable d){
        DisposableHelper.setOnce(this, d);
    }
}

回到 subscribeActual() 方法,执行 s.onSubscribe(parent) 也就是调用了 Observer 的 onSubscribe() 方法,到现在还没有涉及到线程的东西, 所以 onSubscribe() 方法是在主线程回调的。

再看 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); ,这里嵌套了 3 层,一层一层来:

new SubscribeTask(parent)

SubscribeTask 实现了 Runnable 接口,是 ObservableSubscribeOn 另一个内部类:

final class SubscribeTaskimplements Runnable{
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run(){
        source.subscribe(parent);
    }
}

SubscribeTask 目前缓存了 Observer 的包装类 SubscribeOnObserver。

scheduler.scheduleDirect(new SubscribeTask(parent))

先调用 Scheduler 的 scheduleDirect() 方法:

public Disposable scheduleDirect(@NonNull Runnable run){
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

public Disposable scheduleDirect(@NonNull Runnable run,long delay, @NonNull TimeUnit unit){
    final Worker w = createWorker();
	// onSchedule(run) 直接返回了 run
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}

createWorker() 是个抽象方法,根据前面的分析,此处调用了 IoScheduler 的 createWorker() 方法,而 createWorker() 方法只是返回了一个 IoScheduler 的内部类 EventLoopWorker 对象。

DisposeTask 实现了 Runnable 和 Disposable 两个接口,并且在 run() 方法中调用了 decoratedRun.run()

所以 w.schedule(task, delay, unit) 才是一切的开始,别忘了此处 w 是 IoScheduler 的内部类 EventLoopWorker 对象。EventLoopWorker 的 schedule() 方法又调用了 ThreadWorker 的 scheduleActual() 方法,而 ThreadWorker 是继承自 NewThreadWorker 的。

在 NewThreadWorker 中创建了一个线程池,并且缓存为 executor:

executor = SchedulerPoolFactory.create(threadFactory);

public static ScheduledExecutorService create(ThreadFactory factory){
    final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
    if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
        ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
        POOLS.put(e, exec);
    }
    return exec;
}

所以接下来看看 NewThreadWorker 的 scheduleActual() 方法:

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent){
    // 直接返回 run
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

在上述代码第 16 行,把 sr 放到了线程池中执行,所以从这里开始就是在子线程中执行的操作。这个 sr 就是在第 5 行声明的 ScheduledRunnable 对象,ScheduledRunnable 实现了 Runnable 和 Callable 接口,所以根据线程池的尿性,最后肯定是执行 ScheduledRunnable 的 call() 方法:

@Override
public Object call(){
    // Being Callable saves an allocation in ThreadPoolExecutor
    run();
    return null;
}

@Override
public void run(){
    lazySet(THREAD_INDEX, Thread.currentThread());
    try {
        try {
            actual.run();
        } catch (Throwable e) {
            // Exceptions.throwIfFatal(e); nowhere to go
            RxJavaPlugins.onError(e);
        }
    } finally {
        ...
    }
}

此处的 actual 就是前面传递过来的 DisposeTask 对象,在 DisposeTask 的 run() 方法又调用了 decoratedRun.run() ,而 decoratedRun 又是个 SubscribeTask 对象,所以又到了 SubscribeTask 的 run() 方法执行 source.subscribe(parent)

由于 SubscribeTask 是 ObservableSubscribeOn 的内部类,所以此处的 source 是个 ObservableCreate 对象,这样就回到了第一部分:subscribe() 流程,但是不一样的是这次的执行都是在线程池中执行的。

subscribeOn(Schedulers.io()).subscribe() 流程也执行完了,上个图压压惊:

7JfUZjM.jpg!web

observeOn()

observeOn() 指定一个观察者在哪个调度器上观察这个 Observable,以下流程基于 observable.observeOn(AndroidSchedulers.mainThread()).subscribe()

AndroidSchedulers.mainThread()

AndroidSchedulers 的源码也不多,把 AndroidSchedulers 中涉及到 RxJavaPlugins 的两个方法也放在了一起,清晰:

public final class AndroidSchedulers{

    private static final class MainHolder{

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call()throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@linkScheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread(){
        // 直接返回 MAIN_THREAD
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    /** A {@linkScheduler} which executes actions on {@codelooper}. */
    public static Scheduler from(Looper looper){
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }

    private AndroidSchedulers(){
        throw new AssertionError("No instances.");
    }
}

/**
 * 下面是 RxJavaPlugins 类其中两个方法
 */
public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler){
    if (scheduler == null) {
        throw new NullPointerException("scheduler == null");
    }
    Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
    if (f == null) {
        return callRequireNonNull(scheduler);
    }
    return applyRequireNonNull(f, scheduler);
}

static Scheduler callRequireNonNull(Callable<Scheduler> s){
    try {
        Scheduler scheduler = s.call();
        if (scheduler == null) {
            throw new NullPointerException("Scheduler Callable returned null");
        }
        return scheduler;
    } catch (Throwable ex) {
        throw Exceptions.propagate(ex);
    }
}

所以 AndroidSchedulers.mainThread() 就是创建了一个 HandlerScheduler 对象,这个 HandlerScheduler 里面缓存了一个用 MainLooper 构造的 Handler 对象。

observeOn()

public final Observable<T> observeOn(Scheduler scheduler){
    return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler,boolean delayError, int bufferSize){
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    // onAssembly() 是个 hook
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

observeOn() 方法主要是创建了一个 ObservableObserveOn 对象,传入了四个参数,目前只需关注前面两个即可:

AndroidSchedulers.mainThread()

ObservableObserveOn 类也比较简单,调用了父类构造方法缓存 source ,还缓存了 scheduler:

public final class ObservableObserveOn<T>extends AbstractObservableWithUpstream<T,T>{
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler,boolean delayError, int bufferSize){
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<?super T> observer){
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
	// 暂时省略内部类
}

小结:observeOn(AndroidSchedulers.mainThread()) 过程就是创建了一个 ObservableObserveOn 对象,并且缓存了 ObservableCreate 和 HandlerScheduler 两个对象。

subscribe()

在 subscribe() 流程部分和 subscribeOn() 流程部分都已经分析过 subscribe() 的内部执行逻辑了,但是这次的调用对象变成了 ObservableObserveOn,所以变成了调用 ObservableObserveOn 的 subscribeActual() 方法。

这次单独把 subscribeActual() 方法拿出来:

@Override
protected void subscribeActual(Observer<?super T> observer){
    // 判断是否当前线程,忽略
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

直接看 else 部分,createWorker() 是个抽象方法,根据前面的分析,此处调用了 HandlerScheduler 的 createWorker() 方法,只是返回了一个 HandlerScheduler 的内部类 HandlerWorker 对象。

ObserveOnObserver 是 ObservableObserveOn 的内部类,实现了 Observer 和 Runnable 接口,这里只看一下它的构造方法:

ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
    this.actual = actual;
    this.worker = worker;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

接着看 source.subscribe() ,此处的 source 是个 ObservableCreate 对象,所以再来看看 ObservableCreate 的 subscribe() 方法:

@Override
protected void subscribeActual(Observer<?super T> observer){
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

这个方法其实前面已经了解过了,但是由于调用对象的不同导致里面的执行逻辑也不同。

这里的 observer 是经过前面封装的 ObserveOnObserver 对象,CreateEmitter 和前面是一样的,主要功能就是对 Observer 的四个方法(onSubscribe()、onNext()、onError()、onComplete())进行了包装,并且提供了 dispose 系列方法。

执行 observer.onSubscribe(parent) 就去到了 ObserveOnObserver 的 onSubscribe() 方法:

@Override
public void onSubscribe(Disposable s){
    if (DisposableHelper.validate(this.s, s)) {
        this.s = s;
        if (s instanceof QueueDisposable) {
            @SuppressWarnings("unchecked")
            QueueDisposable<T> qd = (QueueDisposable<T>) s;
            int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

            if (m == QueueDisposable.SYNC) {
                sourceMode = m;
                queue = qd;
                done = true;
                actual.onSubscribe(this);
                schedule();
                return;
            }
            if (m == QueueDisposable.ASYNC) {
                sourceMode = m;
                queue = qd;
                actual.onSubscribe(this);
                return;
            }
        }
        queue = new SpscLinkedArrayQueue<T>(bufferSize);

        actual.onSubscribe(this);
    }
}

里面都会执行到 actual.onSubscribe(this) ,根据前面提供的 ObserveOnObserver 构造方法可知这个 actual 就是外部传递的 observer,所以此时外部的 onSubscribe() 方法被回调。

回到 subscribeActual() 方法看 source.subscribe(parent) ,这个 source 就是 Observable.create() 方法传递的参数,所以这时就到了发送事件的地方。

调用 e.onNext()、e.onComplete()、e.onError() 其实都是调用了 CreateEmitter 中对应的方法,根据上面分析可知最终调用的都是 ObserveOnObserver 中对应的方法:

public void onNext(T t){
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

@Override
public void onError(Throwable t){
    if (done) {
        RxJavaPlugins.onError(t);
        return;
    }
    error = t;
    done = true;
    schedule();
}

@Override
public void onComplete(){
    if (done) {
        return;
    }
    done = true;
    schedule();
}

void schedule(){
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

在 ObserveOnObserver 中对应的方法又都调用了 schedule() 方法,然后在调用了 worker.schedule(this)

还记得这个 worker 是谁么?是个 HandlerScheduler 对象,所以看一下它的 schedule() 方法:

@Override
public Disposable schedule(Runnable run,long delay, TimeUnit unit){
    if (run == null) throw new NullPointerException("run == null");
    if (unit == null) throw new NullPointerException("unit == null");

    if (disposed) {
        return Disposables.disposed();
    }
	// 直接返回 run
    run = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.
    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }

    return scheduled;
}

ScheduledRunnable 是 HandlerScheduler 的内部类,实现了 Runnable 和 Disposable 接口。

下面就很熟悉了,通过 Handler 来发送消息,这个 Handler 是 AndroidSchedulers.mainThread() 中构建的,所以是运行在主线程的。

由于构建 Message 传递了 ScheduledRunnable 对象,所以最后回到了 ScheduledRunnable 的 run() 方法,这样就切换到了主线程:

@Override
public void run(){
    try {
        delegate.run();
    } catch (Throwable t) {
        IllegalStateException ie =
            new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
        RxJavaPlugins.onError(ie);
        Thread thread = Thread.currentThread();
        thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
    }
}

这个 delegate 其实就是创建 ScheduledRunnable 传递进来的 run,也就是 ObserveOnObserver(别忘了 ObserveOnObserver 实现了 Runnable 接口),所以就跳转到了 ObserveOnObserver 的 run() 方法:

@Override
public void run(){
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

drainFused()drainNormal() 里面就是关于 onNext()、onComplete()、onError() 的回调。

到这里 observable.observeOn(AndroidSchedulers.mainThread()).subscribe() 流程也执行完了,再来个图:

ii67fim.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK