入坑rxjava:rxjava2源码剖析(一)

入坑rxjava系列第三章

前言

通过下面例子,我们将开始对rxjava进行源码剖析:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("onNext");
        e.onComplete();
      }
    });
    Observer observer = new Observer<String>() {
      @Override
      public void onSubscribe(Disposable d) {
        Log.d(TAG, "onSubscribe:" + d.isDisposed());
      }

      @Override
      public void onNext(String o) {
        Log.d(TAG, "onNext:" + o);
      }

      @Override
      public void onError(Throwable e) {
        Log.d(TAG, "onError:" + e.getMessage());
      }

      @Override
      public void onComplete() {
        Log.d(TAG, "onComplete:");
      }
    };
    observable.subscribe(observer);

让我们带着以下疑问出发:

  1. observable如何生产事件
  2. observable何时生产事件
  3. observer如何接收上游事件
  4. observableobserver的关联

Observable

首先看看入口类observable:

public abstract class Observable<T> implements ObservableSource<T> {
    @Override
    protected abstract void subscribeActual(Observer observer) ;

    @Override  //实现了ObservableSource的方法
    public final void subscribe(Observer<? super T> observer) {
        //省略一堆判空等处理 
        subscribeActual(observer);
}

public interface ObservableSource<T> {
    void subscribe(@NonNull Observer<? super T> observer);
}

由此可以看出被观察者在被订阅后,通过实现observable中的subscribeActual来完成具体事件的生产。

Observable.create

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        //RxJavaPlugins.onAssembly,提供了一系列hook的方法,没配置的时候默认直接返回对象,这里不做深入探讨
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

ObservableOnSubscribe:

public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}

ObservableEmitter:

public interface ObservableEmitter<T> extends Emitter<T> {

    void setDisposable(@Nullable Disposable d);

    void setCancellable(@Nullable Cancellable c);

    boolean isDisposed();

    @NonNull
    ObservableEmitter<T> serialize();
}

public interface Emitter<T> {

    void onNext(@NonNull T value);

    void onError(@NonNull Throwable error);

    void onComplete();
}

通过接口ObservableOnSubscribe将被订阅事件的具体事件传入,参数ObservableEmitter则是用于建立被观察者观察者之间的桥梁,里面描述了这个事件进行状态的结果。

ObservableCreate:


...

   public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //发射器
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //将发射器回调给观察者
        observer.onSubscribe(parent);

        try {
            //调用事件远的subscribe方法生产事件,同时将发射器传给事件源
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

...

ObservableCreate继承于Observable,是方法subscribeActual的具体实现。由上面我们可以看出,事件源只有在observable.subscribe(observer)被执行的时候才会生产事件,所以observable被创建出来的时候并不生产事件,也不发射事件。

CreateEmitter:

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        ...

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }

CreateEmitter从名字就可以看出这个是发射器,用于将上游生产的事件发射到下游。实现了接口ObservableEmitterDisposable,而Disposable是用于判断事件是否中断发射。

总结

  1. Observable.create创建事件源后,并不会生产也不会发射事件
  2. CreateEmitter中,只有observableobserver没被dispose才会进行方法onXXX()的回调,并且方法onComplete()onError() 互斥只能执行一次,在执行任意一个后都会自动dispose()
  3. 当调用observable.subscribe(observer)进行订阅后,会调用Observable实现类中的subscribeActual方法,此时事件源才会生产事件,并且通过创建发射器将时间发射给下游的observer进行消费从而完成整个事件流的处理。

版权声明:

本文标题:入坑rxjava:rxjava2源码剖析(一)

作者:Rabtman

原始链接:https://blog.rabtman.com/2017/07/15/zero_rx_source_1/

本文采用署名-非商业性使用-禁止演绎4.0进行许可。

非商业转载请保留以上信息。商业转载请联系作者本人。