入坑rxjava:rxjava2源码剖析(三)

入坑rxjava系列第五章

rxjava线程调度

关于rxjava的线程调度,我们首先从最常用的代码看起:

Observable.create((ObservableOnSubscribe<Integer>) e -> {
            e.onNext(1);
            e.onNext(2);
            e.onComplete();
        }).subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(i -> System.out.println("onNext : " + i));

subscribeOn()

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

创建了ObservableSubscribeOn对象:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
}

与前面文章分析的一样,都是将传入对象采用装饰模式进行包装,我们可以看到具体实现方法subscribeActual先是创建了SubscribeOnObserver对象对Observer进行一次wrapper操作,然后就将其回调出去,所以接下来下游的onSubscribe所在线程将与其无关,最后通过执行parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));进行线程的调度:

public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        //这里创建的worker即一开始由Schdulers.io()所提供的
        final Worker w = createWorker();
        //hook相关
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);
       //执行任务
        w.schedule(task, delay, unit);

        return task;
}

createWorkerScheduler的抽象方法,由于我们调用的是subscribeOn(Schedulers.io()),所以具体实现代码如下:

public Worker createWorker() {
        return new EventLoopWorker(pool.get());
}

static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();

                // releasing the pool should be the last action
                pool.release(threadWorker);
            }
        }

        @Override
        public boolean isDisposed() {
            return once.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
}

具体执行了threadWorker.scheduleActual:

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

其实就是到线程池当中执行异步操作。

具体要执行的任务正是SubscribeTask这个实现了Runnable的类:

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
  • parent是通过SubscribeOnObserver包装后的下游的Observer
  • source是通过ObservableSubscribeOn包装后的上游的Observable
    而这里在run()方法就是简单的下游订阅了上游,然后触发了事件分发。

多次subscribeOn()

由于订阅的顺序是下游订阅上游,那么当最上游被订阅时,所在的线程必须是距离上游最近的subscribeOn()所提供的线程,所以即使代码中多次调用subscribeOn()最终Observable总是在第一个subscribeOn()所在的线程中执行。

observeOn()

public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

ObservableObserveOn

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
}

还是那个熟悉的配方,一套包装下来与subscribeOn()最大的不同在于它并不是立刻开始执行,而是实例化了ObserveOnObserver对象再次进行包装:

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

       ...

        //队列
        SimpleQueue<T> queue;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

       @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            //如果上一级不是一场,则将数据存入队列
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            } 
            //对应线程取出数据并交由下游的Observer
            schedule();
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

        ...

}

onNext()中通过schedule()方法进行调度,具体实现我们看run()方法:

@Override
public void run() {
      //outputFused一般是false
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

drainNormal()

void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

方法checkTerminated()是检查任务是否终止,主要做如下判断:

  1. 订阅是否被取消 cancelled==true
  2. 订阅是否调度完毕 done==true,并且onErroronComplete被调用

这里就是不断从队列中取出数据,然后分发给下游Observer

多次observerOn()

不同于subscribeOn()observerOn()每次操作都将切换到对应的线程然后交由下游的Observer来处理,所以如果进行多次调用,则线程的调度会持续到下一个observerOn()之前。

总结

subscribeOn()

  • 订阅顺序从下到上,上游被订阅时,先切换线程然后立刻执行任务。
  • 当存在多个subscribeOn()方法时,仅第一个有效。

observerOn()

  • 订阅顺序从下到上,上游被订阅时,会将对应的worker创建并进行一次包装储存起来,不会立刻切换线程。
  • 当数据从上游发送过来时,数据将储存在队列中,然后切换线程,在新线程中将数据发送给下游的Observer
  • 当存在多个observerOn()方法时,线程的调度仅持续到下一个observerOn()之前。

版权声明:

本文标题:入坑rxjava:rxjava2源码剖析(三)

作者:Rabtman

原始链接:https://blog.rabtman.com/2017/09/10/zero_rx_source_3/

本文采用署名-非商业性使用-禁止演绎4.0进行许可。

非商业转载请保留以上信息。商业转载请联系作者本人。