RxJava2.0 同步、异步
本文并非原创,通过各位博主博客学习后总结而得,以方便自己学习,在此感谢各位前辈,并在下面注明出处。
RxJava 同步
当上下游工作在同一个线程中时, 这时候是一个同步的订阅关系, 也就是说上游每发送一个事件必须等到下游接收处理完了以后才能接着发送下一个事件。
RxJava 异步
当上下游工作在不同的线程中时, 这时候是一个异步的订阅关系, 这个时候上游发送数据不需要等待下游接收, 为什么呢, 因为两个线程并不能直接进行通信, 因此上游发送的事件并不能直接到下游里去, 这个时候就需要一个田螺姑娘来帮助它们俩, 这个田螺姑娘就是水缸 ! 上游把事件发送到水缸里去, 下游从水缸里取出事件来处理, 因此, 当上游发事件的速度太快, 下游取事件的速度太慢, 水缸就会迅速装满, 然后溢出来, 最后就 OOM 了。
举例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { for (int i = 0; ; i++) { //无限循环发事件 emitter.onNext(i); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Thread.sleep(2000); Log.d(TAG, "" + integer); } });
|
这个就是上面解释的,上游发送的所有事件都放到水缸里了, 所以瞬间水缸就满了,就直接 Throwing OutOfMemoryError 掉了。
同步、异步区别
从上面两图中我们可以看出, 同步和异步的区别仅仅在于是否有 水缸
。源头找到了, 只要有 水缸
, 就会出现上下游发送事件速度不平衡的情况, 因此当我们以后遇到这种情况时, 仔细思考一下水缸在哪里, 找到水缸, 你就找到了解决问题的办法。
避免 OOM 的几种有效果方案
查看效率可以在 studio 中执行代码,然后打开 Android Monitor 或 Profiler,查看内存曲线,查看差异
上游不限制,下游控制延迟
这段代码很简单, 上游同样无限循环的发送事件, 在下游每次接收事件前延时2秒. 上下游工作在同一个线程里
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { for (int i = 0; ; i++) { //无限循环发事件 emitter.onNext(i); } } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Thread.sleep(2000); Log.d(TAG, "" + integer); } });
|
上游控制数据量
那我们可以只放我们需要的事件到水缸里呀, 只放一部分数据到水缸里, 这样不就不会溢出来了么
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); } } }).subscribeOn(Schedulers.io()) .filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer % 10 == 0; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "" + integer); } });
|
在这段代码中我们增加了一个filter, 只允许能被10整除的事件通过。
可以看到, 通过减少进入水缸的事件数量的确可以有效缓解
上下游流速不均衡的问题, 但是力度还不是特别够。
通过 sample 操作符
sample 操作符, 这个操作符每隔指定的时间就从上游中取出一个事件发送给下游。
下面修改代码每隔 2 秒取一个事件给下游:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); } } }).subscribeOn(Schedulers.io()) /*sample取样*/ .sample(2, TimeUnit.SECONDS) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "" + integer); } });
|
通过查看曲线变化可以看到, 虽然上游仍然一直在不停的发事件, 但是我们只是每隔一定时间取一个放进水缸里, 并没有全部放进水缸里, 因此这种操作内存占有率也是很低的。
第二、三这两种方法归根到底其实就是减少放进水缸的事件的数量, 是以数量取胜的。但是这个方法有个缺点, 就是丢失了大部分的事件。
减慢上游发送事件的速度
既然上游发送事件的速度太快, 那我们就适当减慢发送事件的速度, 从速度上取胜。
举栗:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); Thread.sleep(2000); //每次发送完事件延时2秒 } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "" + integer); } });
|
上游每次发送完事件后都延时了 2 秒。查看 Profiler 内存曲线是平滑的,而且事件也没有丢失,上游通过适当的延时,不但减缓了事件进入水缸的速度,也可以让下游有充足的时间从水缸里取出事件来处理,这样一来, 就不至于导致大量的事件涌进水缸,也就不会OOM啦。
到目前为止, 我们没有依靠任何其他的工具, 就轻易解决了上下游流速不均衡的问题。
总结一下, 上面提到的治理办法就两种:
- 从数量上进行治理, 减少发送进水缸里的事件。
- 从速度上进行治理, 减缓事件发送进水缸的速度。
以上的学习,让我对如何处理上下游流速不均衡已经有了基本的认识了。
Flowable 虽然也可以实现,但是这里并没有使用 Flowable, 所以很多时候仔细去分析问题, 找到问题的原因, 从源头去解决才是最根本的办法。
Flowable,其实没什么神秘的, 它用到的办法和上面所讲的基本上是一样的, 只是它稍微做了点封装。