入坑rxjava:rxjava2源码剖析(二)

入坑rxjava系列第四章

Map

首先从map()方法出发:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

其中传入了Function对象作为参数:

public interface Function<T, R> {
    @NonNull
    R apply(@NonNull T t) throws Exception;
}

作用主要就是接受一个值(T)然后返回另一个值(R)

再来创建了一个ObservableMap对象:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {

    ...

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}

它继承自AbstractObservableWithUpstream,而AbstractObservableWithUpstream又继承自Observable,所以这个对象其实就是将上游的Observable和变换函数Function保存起来。

在订阅后具体的实现方法subscribeActual()中,采用MapObserver订阅上游的Observable,而其自身通过装饰者模式,对下游(终点)的Observer进行修饰。

actual 代表下游的 Observermapper 为传入的 Function,可以看到在接下来的onNext ()方法中,通过调用mapper.apply(t)进行相应的转化,然后继续调用下游的onNext()将转换结果暴露出去,至此完成了本次操作符的操作。

FlatMap

查看flatMap()方法,可以看到有多个方法重载:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return flatMap(mapper, false);
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
    return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
    return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
        boolean delayErrors, int maxConcurrency, int bufferSize) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    //这里为false跳过
      if (this instanceof ScalarCallable) {
        @SuppressWarnings("unchecked")
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        }
        return ObservableScalarXMap.scalarXMap(v, mapper);
    }
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}

其最终方法将会传入4个参数:

  • mapperFunction对象
  • maxConcurrency:最大并发数
  • delayErrors:表示异常时,是否需要延迟到事件被观察结束后才抛出
  • bufferSize:缓存的事件总数大小

然后返回了一个ObservableFlatMap对象,与上述的map类似,该对象同样继承自Observable,采用了装饰模式进行了包装,具体实现看到方法subscribeActual

public void subscribeActual(Observer<? super U> t) {
        //传入t为callable对象才执行,这里为false跳过
        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }
        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }

这里采用MergeObserver进行订阅,继续看看它的onNext()方法:

public void onNext(T t) {
            // safeguard against misbehaving sources
            if (done) {
                return;
            }
            ObservableSource<? extends U> p;
            try {
                p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                s.dispose();
                onError(e);
                return;
            }

            //maxConcurrency 默认为Integer.MAX_VALUE,这里为false跳过
            if (maxConcurrency != Integer.MAX_VALUE) {
                synchronized (this) {
                    if (wip == maxConcurrency) {
                        sources.offer(p);
                        return;
                    }
                    wip++;
                }
            }

            subscribeInner(p);
        }

在接收到发射的数据后,通过mapper.apply(t)将数据转化为ObservableSource对象

void subscribeInner(ObservableSource<? extends U> p) {
            for (;;) {
               //false
                if (p instanceof Callable) {
                    tryEmitScalar(((Callable<? extends U>)p));

                    if (maxConcurrency != Integer.MAX_VALUE) {
                        synchronized (this) {
                            p = sources.poll();
                            if (p == null) {
                                wip--;
                                break;
                            }
                        }
                    } else {
                        break;
                    }
                } else {
                    InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                    if (addInner(inner)) {
                        p.subscribe(inner);
                    }
                    break;
                }
            }
        }

最终进行订阅的是InnerObserver,再看它的onNext()方法:

public void onNext(U t) {
            if (fusionMode == QueueDisposable.NONE) {
                parent.tryEmit(t, this);
            } else {
                parent.drain();
            }
        }

void tryEmit(U value, InnerObserver<T, U> inner) {
            if (get() == 0 && compareAndSet(0, 1)) {
                actual.onNext(value);
                if (decrementAndGet() == 0) {
                    return;
                }
            } else {
                SimpleQueue<U> q = inner.queue;
                if (q == null) {
                    q = new SpscLinkedArrayQueue<U>(bufferSize);
                    inner.queue = q;
                }
                q.offer(value);
                if (getAndIncrement() != 0) {
                    return;
                }
            }
            drainLoop();
        }

fusionMode默认为None,所以走方法tryEmit(),由于MergeObserver继承于AtomicInteger,所以这里就用到了它的同步机制,同时只有一个value通过actual进行发射。但是AtomicInteger CAS锁只保证操作的原子性,所有发射的顺序并不是固定的。而没获取到锁的时候数据将放入队列中,并通过drainLoop ()方法循环获取,然后发射。

flatmapmap最大的不同在于,它是将一个发射的数据源Observable变换为多个Observables,然后再将他们合并放入一个单独的Observable

版权声明:

本文标题:入坑rxjava:rxjava2源码剖析(二)

作者:Rabtman

原始链接:https://blog.rabtman.com/2017/08/12/zero_rx_source_2/

本文采用署名-非商业性使用-禁止演绎4.0进行许可。

非商业转载请保留以上信息。商业转载请联系作者本人。