入坑rxjava:什么是背压(Backpressure)?

入坑rxjava系列第二章

什么是背压

关于背压,来自官方wiki如此说道:

In RxJava it is not difficult to get into a situation in which an Observable is emitting items more rapidly than an operator or subscriber can consume them. This presents the problem of what to do with such a growing backlog of unconsumed items.

翻译一下就是这样的:

被观察者发送的消息速度大于它的操作符或者订阅者的处理速度,从而导致消息处理不过来,不断地堆积。

举个通俗的例子,小学时候大伙总能遇到这么一个数学题:

一个水池,我们一边往里注水,又一边往外放水,如果注水的速度大于放水的速度时,那么过一段时间水池就会被注满并且溢出。

再结合代码来看看:

Observable.interval(1, TimeUnit.MILLISECONDS)
        .observeOn(Schedulers.newThread())
        .subscribe(new Action1<Long>() {
          @Override
          public void call(Long aLong) {
            SystemClock.sleep(1000);
            Log.d(MainActivity.TAG, "<--------" + aLong + "--------->");
          }
        }, new Action1<Throwable>() {
          @Override
          public void call(Throwable throwable) {
            throwable.printStackTrace();
          }
        });

运行结果:

missBackpressureException

为了避免上述情况的发生,我们就需要对从上游到下游的流速进行控制。

所以简单来说,背压是一种流速控制的策略

如何才能进行背压?

在Rx1中,并不是所有的Observable都支持背压的。

首先我们需要了解2个Observable的形象描述:

  • Cold Observables:在Observable创建后,只有在被订阅之后才会开始发送事件。
  • Hot Observables:在Observable创建后就开始发送事件,就算不存在被订阅。

其中Hot Observables是不支持背压的,而Cold Observables中也有一部分不支持背压,但我们可以通过一些operator来转化成支持Backpressure的Observable。这些operator包括:

  • onBackpressureBuffer
  • onBackpressureDrop
  • onBackpressureLatest
  • onBackpressureBlock(已过期)

而在RxJava 2.x中,Observable不再支持Backpressure(即使阻塞崩溃也不会抛出MissingBackpressureException

),而是改用Flowable来专门支持Backpressure。上面提到的四种operator的前三种分别对应Flowable的三种Backpressure策略:

  • BackpressureStrategy.BUFFER
  • BackpressureStrategy.DROP
  • BackpressureStrategy.LATEST

流速控制的手段

背压(Backpressure)

Backpressure,也称为Reactive Pull(响应式拉取)。在RxJava的观察者模型中,被观察者是主动的推送数据给观察者,观察者是被动接收的。而响应式拉取则反过来,观察者主动从被观察者那里去拉取数据,而被观察者变成被动的等待通知再发送数据。即下游需要多少(通过request(long n)指定拉取个数),上游就发送多少,以此来达到流速的控制。

Observable.range(0, 10000)
        .observeOn(Schedulers.newThread())
        .subscribe(new Subscriber<Integer>() {

          @Override
          public void onStart() {
            request(1);
          }

          @Override
          public void onCompleted() {

          }

          @Override
          public void onError(Throwable throwable) {

          }

          @Override
          public void onNext(Integer integer) {
            Log.d(MainActivity.TAG, "<--------" + integer + "--------->");
            request(1);
          }
        });

节流(Throttling)

节流(Throttling),对于消费不过来的事件进行丢弃。主要有以下三种策略:

  • sample (throttleLast):每隔一段时间从上游中取一个值,如果这段时间来内从上游过来了多个值,则选择最后一个值,所以sample也叫作throttleLast。
  • throttleFirst:与sample类似,每隔一段时间从上游中取一个值,如果这段时间来内从上游过来了多个值,此时选择最开始的一个值。
  • debounce (throttleWithTimeout):能将连续发送的事件之间较大的间隙找出来。即预设了一个时间段,如果超过这段时间都没有收到上游发送的事件,则就认为是超时,此时就会把超时前从上游收到的最后一个事件发送到下游。

打包处理(Buffers and windows)

打包处理(Buffers and windows),就是将上游发送的多个事件打包成一个再分发到下游,这样就可以减少下游需要处理的事件个数。主要有以下两种方式:

  • buffer:将指定的一段时间内的事件打包成list再发送到下游。
  • window:与buffer基本一致,只是打包后仍然是一个Observable。

总结

  • 背压的出现是为了解决上下游速度不一致的问题。
  • 在RxJava2.0中相比1.0在背压的策略上有了很大的提升,但是使用背压不一定就能得到我们期待的结果。
  • 从需求出发,切合实际情况制定出属于自己的策略。

结合demo食用,效果更佳,请点传送门

版权声明:

本文标题:入坑rxjava:什么是背压(Backpressure)?

作者:Rabtman

原始链接:https://blog.rabtman.com/2017/03/02/zero_rx_backpressure/

本文采用署名-非商业性使用-禁止演绎4.0进行许可。

非商业转载请保留以上信息。商业转载请联系作者本人。