RxJava2.0 学习笔记 - 常用关键字解释、线程调度的使用

Author Avatar
dev.liang 12月 17, 2017
  • 在其它设备中阅读本文章

RxJava2.0

本文并非原创,通过各位博主博客学习后总结而得,以方便自己学习,在此感谢各位前辈,并在下面注明出处。

ObservableEmitter、Disposable、subscribe() 解释

ObservableEmitter

Emitter 是发射器的意思,那就很好猜了,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用 emitter 的 onNext(T value)、onComplete() 和 onError(Throwable error) 就可以分别发出 next 事件、complete 事件和 error 事件。

注意,并不意味着可以随意发射乱七八糟的事件,需要满足一定的规则:

  • 上游可以发送无限个 onNext, 下游也可以接收无限个 onNext.
  • 当上游发送了一个 onComplete 后, 上游 onComplete 之后的事件将会继续发送, 而下游收到 onComplete 事件之后将不再继续接收事件.
  • 当上游发送了一个 onError 后, 上游 onError 之后的事件将继续发送, 而下游收到 onError 事件之后将不再继续接收事件.
  • 上游可以不发送 onComplete 或 onError.
  • 最为关键的是 onComplete 和 onError 必须唯一并且互斥, 即不能发多个 onComplete, 也不能发多个 onError, 也不能先发一个 onComplete, 然后再发一个 onError, 反之亦然

注: 关于 onComplete 和 onError 唯一并且互斥这一点, 是需要自行在代码中进行控制, 如果你的代码逻辑中违背了这个规则, 并不一定会导致程序崩溃. 比如发送多个onComplete是可以正常运行的, 依然是收到第一个 onComplete 就不再接收了, 但若是发送多个 onError, 则收到第二个 onError 事件会导致程序会崩溃, 代码仓库里 里都有写出。

Disposable

单词的字面意思是一次性用品,用完即可丢弃的. 那么在RxJava中怎么去理解它呢, 对应于上面的水管的例子, 我们可以把它理解成两根管道之间的一个机关, 当调用它的 dispose() 方法时, 它就会将两根管道切断, 从而导致下游收不到事件.

注意: 调用 dispose() 并不会导致上游不再继续发送事件, 上游会继续发送剩余的事件.

subscribe()

subscribe 有好几个重载方法

1
2
3
4
5
6
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}
  • 最后一个带有 Observer 参数的在 RxUsage 中的 commonUsageForChain()已使用过;
  • 不带任何参数的 subscribe() 表示下游不关心任何事件,你上游尽管发你的数据去吧, 下游可不关心你发什么;
  • 带有一个 Consumer 参数的方法表示下游只关心 onNext 事件, 其他的事件我假装没看见, 因此我们如果只需要 onNext 事件可以这么写;
  • 其他几个方法同理, 这里就不解释啦,下面示例已列出。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emitter 1");
emitter.onNext(1);
Log.d(TAG, "emitter 2");
emitter.onNext(2);
Log.d(TAG, "emitter 3");
emitter.onNext(3);
Log.d(TAG, "emitter complete");
emitter.onComplete();
Log.d(TAG, "emitter 4");
emitter.onNext(4);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "onNext: " + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {

}
}...);

RxJava 强大的线程控制之线程调度

上下游默认是在同一个线程工作

在主线程中分别创建上游和下游, 然后将他们连接在一起, 同时分别打印出它们所在的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "emit 1");
emitter.onNext(1);
}
});

Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "onNext: " + integer);
}
};

observable.subscribe(consumer);
}

运行结果为:

1
2
3
4
E/RxUsage: TAG: Observable thread is : main
E/RxUsage: TAG: emit 1
E/RxUsage: TAG: Observer thread is :main
E/RxUsage: TAG: onNext: 1

在子线程中做耗时的操作, 然后回到主线程中来操作 UI

thread

黄色水管表示子线程, 深蓝色水管表示主线程;
我们需要先改变上游发送事件的线程, 让它在子线程中发送事件, 然后再改变下游的线程, 让它去主线程接收事件. 通过 RxJava 内置的线程调度器可以轻松做到这一点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void testPrintThreadName(){
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "emit 1");
emitter.onNext(1);
}
});

Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "onNext: " + integer);
}
};

observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
}
1
2
3
4
E/RxUsage: Observable thread is : RxNewThreadScheduler-1
E/RxUsage: emit 1
E/RxUsage: Observer thread is :main
E/RxUsage: onNext: 1

从打印结果可以看出, 上游发送事件的线程的确改变了, 是在一个叫 RxNewThreadScheduler-1 的线程中发送的事件, 而下游仍然在主线程中接收事件,这就完美实现了在 Android 中在子线程中做完耗时操作,然后在主线程更新 UI 的操作。

切换线程主要通过以下两行操作来实现的

1
2
.subscribeOn(Schedulers.newThread())                                              
.observeOn(AndroidSchedulers.mainThread())

subscribeOn() 指定的是上游发送事件的线程;observeOn() 指定的是下游接收事件的线程;

补充:

多次指定 上游 的线程只有第一次指定的有效, 也就是说多次调用 subscribeOn() 只有第一次的有效, 其余的会被忽略;
多次指定 下游 的线程是可以的, 也就是说每调用一次 observeOn() , 下游的线程就会切换一次

为上面补充举个例子,方法见 RxUsage.java - testPrintThreadNameMoreChange()

1
2
3
4
5
observable.subscribeOn(Schedulers.newThread())     
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.subscribe(consumer);

上下游分别指定了两次发送事件的线程,上游分别是newThread和IO线程, 下游分别是main和IO线程。结果如下:

1
2
3
4
E/RxUsage: Observable thread is : RxNewThreadScheduler-1
E/RxUsage: emit 1
E/RxUsage: Observer thread is :RxCachedThreadScheduler-2
E/RxUsage: onNext: 1

可以看到, 上游虽然指定了两次线程, 但只有第一次指定的有效, 依然是在 RxNewThreadScheduler 线程中, 而下游则在 RxCachedThreadScheduler 中, 这个 CacheThread 就是IO线程池中的一个。

可以通过 doOnNext()清晰的看到下游的线程切换过程, 打印 log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
observable.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "After observeOn(mainThread), current thread is: " + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "After observeOn(io), current thread is: " + Thread.currentThread().getName());
}
})
.subscribe(consumer);

上下游线程切换之后, 把当前的线程打印出来, 运行结果:

1
2
3
4
5
6
7
8
E/RxUsage: Observable thread is : RxNewThreadScheduler-1
E/RxUsage: emit 1
E/RxUsage: After subscribeOn(newThread), current thread is: RxNewThreadScheduler-1
E/RxUsage: After subscribeOn(io), current thread is: RxNewThreadScheduler-1
E/RxUsage: After observeOn(mainThread), current thread is: main
E/RxUsage: After observeOn(io), current thread is: RxCachedThreadScheduler-2
E/RxUsage: Observer thread is :RxCachedThreadScheduler-2
E/RxUsage: onNext: 1

可以看到, 每调用一次 subscribeOnobserveOn() 线程都会切换一次, 项目中如果有有类似的需求时, 可灵活处理。

在 RxJava 中, 内置的线程:

  • Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
  • Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
  • Schedulers.newThread() 代表一个常规的新线程
  • AndroidSchedulers.mainThread() 代表Android的主线程

在 RxJava 内部使用的是线程池来维护这些线程, 效率相对比较高.

RxJava 在 Android 中的实践

对于 Android 开发来说, 经常会将一些耗时的操作放在后台, 比如网络请求或者读写文件,操作数据库等等,等到操作完成之后回到主线程去更新 UI 这样的场景经常用到。

要使用 Retrofit,先添加 Gradle 配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/*RxJava*/
implementation 'io.reactivex.rxjava2:rxjava:2.2.8'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
/*retrofit*/
implementation 'com.squareup.retrofit2:retrofit:2.5.0'
/*Gson converter*/
implementation 'com.squareup.retrofit2:converter-gson:2.5.0'
/*RxJava2 Adapter*/
implementation 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
/*okHttp*/
implementation 'com.squareup.okhttp3:okhttp:3.12.0'
implementation 'com.squareup.okhttp3:logging-interceptor:3.9.0'
/*fastJson*/
implementation 'com.alibaba:fastjson:1.2.56'

具体操作可点击查看代码,主要有这几个类,另外使用了 api.github 接口:

GithubService.java

Repo.java

RetrofitClient

都统一在 MainActivity 中进行调用。

看似很完美, 但我们忽略了一点, 如果在请求的过程中 Activity 已经退出了, 这个时候如果回到主线程去更新 UI, 那么APP肯定就崩溃了, 可以往上翻 Disposable , 说它是个开关, 调用它的dispose()方法时就会切断水管, 使得下游收不到事件, 既然收不到事件, 那么也就不会再去更新UI了. 因此我们可以在 Activity 中将这个 Disposable 保存起来, 当 Activity退出时, 切断它即可。

那如果有多个 Disposable 该怎么办呢, RxJava 中已经内置了一个容器 CompositeDisposable, 每当我们得到一个 Disposable 时就调用 CompositeDisposable.add() 将它添加到容器中, 在退出的时候, 调用 CompositeDisposable.clear() 即可切断所有的水管。

说明

本文并非原创,通过各位博主博客学习后总结而得,以方便自己学习,在此感谢各位前辈,并在下面注明出处。