RxJava2.0 学习笔记 - 常见操作符的使用

Author Avatar
dev.liang 12月 20, 2017
  • 在其它设备中阅读本文章

RxJava2.0 操作符

本文并非原创,通过各位博主博客学习后总结而得,以方便自己学习,在此感谢各位前辈,并在下面注明出处。

操作符目录

map

map 是 RxJava 中最简单的一个变换操作符, 它的作用就是对上游发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化。用下面图表示:

map

图中 map 中的函数作用是将圆形事件转换为矩形事件, 从而导致下游接收到的事件就变为了矩形。用代码来表示为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("234");
emitter.onNext("678");
emitter.onNext("456");
}
}).map(new Function<String, Float>() {
@Override
public Float apply(String string) throws Exception {
return Float.valueOf(string);
}
}).subscribe(new Consumer<Float>() {
@Override
public void accept(Float aFloat) throws Exception {
mRxOperatorsText.append("accept : " + aFloat + "\n");
Log.e(TAG, "accept : " + aFloat + "\n");
}
});

在上游我们发送的是 String 类型, 而在下游我们接收的是 Float 类型, 中间起转换作用的就是 map 操作符, 运行结果为:

1
2
3
E/RxUsage: accept : 234.0
E/RxUsage: accept : 678.0
E/RxUsage: accept : 456.0

通过 map, 可以将上游发来的事件转换为任意的类型, 可以是一个Object, 也可以是一个集合。

flatMap

flatMap 将一个发送事件的上游 Observable 变换为多个发送事件的 Observables,然后将它们发射的事件合并后放进一个单独的 Observable 里。

但有个需要注意的是,flatMap 并不能保证事件的顺序,如果需要保证顺序,需要用到下面提到的 ConcatMap。

flatMap.png

中间flatMap的作用是将圆形的事件转换为一个发送矩形事件和三角形事件的新的上游Observable.

看看分解动作:

flatMap.png

上游每发送一个事件,flatMap 都将创建一个新的水管, 然后发送转换之后的新的事件,下游接收到的就是这些新的水管发送的数据。这里需要注意的是,flatMap 并不保证事件的顺序,也就是图中所看到的,并不是事件 1 就固定在事件 2 的前面。如果需要保证顺序则需要使用 concatMap。

代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
int delayTime = (int) (1 + Math.random() * 10);
return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
// 这个打印出来是不完整的,偶尔会丢失数据!!!
// 有人说 Android8.0和Android9.0 兼容性有问题了,使用 flatmap,下游无法接收到全部的消息
// 还未验证 !!!
Log.e(TAG, "flatMap : accept : " + s + "\n");

//这个打印出来是完整的,下游不会丢失数据
mRxOperatorsText.append("stringBuffer flatMap : accept : " + s + "\n");
Log.e(TAG, mRxOperatorsText.toString());
}
});

多次打印结果:

1
2
3
4
5
6
7
8
E/RxUsage: flatMap : accept : I am value 1
E/RxUsage: flatMap : accept : I am value 1
E/RxUsage: flatMap : accept : I am value 3
E/RxUsage: flatMap : accept : I am value 3
E/RxUsage: flatMap : accept : I am value 2
E/RxUsage: flatMap : accept : I am value 3
E/RxUsage: flatMap : accept : I am value 2
E/RxUsage: flatMap : accept : I am value 2
1
2
3
4
5
6
7
E/RxUsage: flatMap : accept : I am value 1
E/RxUsage: flatMap : accept : I am value 1
E/RxUsage: flatMap : accept : I am value 2
E/RxUsage: flatMap : accept : I am value 2
E/RxUsage: flatMap : accept : I am value 3
E/RxUsage: flatMap : accept : I am value 3
E/RxUsage: flatMap : accept : I am value 3

看了很多博客,显示的结果 1-3 都是完整循环多次后打印出来的,如这样:

1
2
3
4
5
6
7
8
9
D/TAG: I am value 1
D/TAG: I am value 1
D/TAG: I am value 1
D/TAG: I am value 3
D/TAG: I am value 3
D/TAG: I am value 3
D/TAG: I am value 2
D/TAG: I am value 2
D/TAG: I am value 2

不过当使用 StringBuffer append 后结果是正常的,详细可查看代码 RxUsage/testFlatMap 有网友评论说是和手机版本有关系,还待验证…,回头验证一下!!!???

flatMap 在项目中的实践

主要用于解决 嵌套请求 问题。

举一个是实际的例子,如果是一个新用户, 必须先注册, 等注册成功之后再去自动登录;这是一个嵌套的网络请求, 首先需要去请求注册, 待注册成功回调了再去请求登录的接口。

接口:

1
2
3
4
5
6
7
public interface Api {
@GET
Observable<LoginResponse> login(@Body LoginRequest request);

@GET
Observable<RegisterResponse> register(@Body RegisterRequest request);
}

可以看到登录和注册返回的都是一个上游 Observable, 而 flatMap 操作符的作用就是把一个 Observable 转换为另一个 Observable。

链式代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
api.register(new RegisterRequest())            //发起注册请求
.subscribeOn(Schedulers.io()) //在IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求注册结果
.doOnNext(new Consumer<RegisterResponse>() {
@Override
public void accept(RegisterResponse registerResponse) throws Exception {
/*先根据注册的响应结果去做一些操作*/
}
})
.observeOn(Schedulers.io()) //回到IO线程去发起登录请求
.flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {
@Override
public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
return api.login(new LoginRequest());
}
})
.observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求登录的结果
.subscribe(new Consumer<LoginResponse>() {
@Override
public void accept(LoginResponse loginResponse) throws Exception {
Toast.makeText(MainActivity.this, "登录成功", Toast.LENGTH_SHORT).show();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Toast.makeText(MainActivity.this, "登录失败", Toast.LENGTH_SHORT).show();
}
});

再次补充:

subscribeOn() 指定的是上游发送事件的线程;
observeOn() 指定的是下游接收事件的线程;
多次指定 上游 的线程只有第一次指定的有效, 也就是说多次调用 subscribeOn() 只有第一次的有效, 其余的会被忽略;
多次指定 下游 的线程是可以的, 也就是说每调用一次 observeOn() , 下游的线程就会切换一次

concatMap

它和 flatMap 的作用几乎一模一样, 只是它的结果是严格按照上游发送的顺序来发送的, 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
int delayTime = (int) (1 + Math.random() * 10);
return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "concatMap : accept : " + s + "\n");
mRxOperatorsText.append("concatMap : accept : " + s + "\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {

}
});

打印结果:

1
2
3
4
5
6
E/RxUsage: concatMap : accept : I am value 1
E/RxUsage: concatMap : accept : I am value 1
E/RxUsage: concatMap : accept : I am value 2
E/RxUsage: concatMap : accept : I am value 2
E/RxUsage: concatMap : accept : I am value 3
E/RxUsage: concatMap : accept : I am value 3

结果是有序的,但是没有打印循环三次的结果???需要再查一下原因,更新到博客里。

zip

zip 通过一个函数将多个 Observable 发送的事件结合到一起,然后发送这些组合到一起的事件。
它按照严格的顺序应用这个函数。
它只发射与发射数据项最少的那个 Observable 一样多的数据。

map

从这个图中可以看见, 这次上游和以往不同的是, 有两根水管啦。
其中一根水管负责发送圆形事件, 另外一根水管负责发送三角形事件, 通过 zip 操作符, 使得圆形事件和三角形事件合并为了一个矩形事件。

分解动作:

map

分解动作解析:
组合的过程是分别从 两根水管里各取出一个事件 来进行组合, 并且一个事件只能被使用一次, 组合的顺序是严格按照事件发送的顺利 来进行的, 也就是说不会出现圆形 1 事件和三角形 B 事件进行合并, 也不可能出现圆形 2 和三角形 A 进行合并的情况。

最终下游收到的事件数量是和上游中发送事件最少的那一根水管的事件数量相同。这个也很好理解, 因为是从每一根水管里取一个事件来进行合并, 最少的那个肯定就最先取完, 这个时候其他的水管尽管还有事件 , 但是已经没有足够的事件来组合了, 因此下游就不会收到剩余的事件了。

代码演示下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* zip 组合事件的过程就是分别从发射器 A 和发射器 B 各取出一个事件来组合,并且一个事件只能被使用一次,
* 组合的顺序是严格按照事件发送的顺序来进行的,所以上面截图中,可以看到,1 永远是和 A 结合的,2 永远是和 B 结合的。
* <p>
* 最终接收器收到的事件数量是和发送器发送事件最少的那个发送器的发送事件数目相同, getStringObservable()发送器发的事件数目最少.
*/
public static void testZip() {
Observable.zip(getIntegerObservable(), getStringObservable(), new BiFunction<Integer, String, String>() {
@Override
public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
Log.e(TAG, "s + integer : " + s + integer + "\n");
return s + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
mRxOperatorsText.append("zip : accept : " + s + "\n");
Log.e(TAG, "zip : accept : " + s + "\n");
}
});
}

private static Observable<String> getStringObservable() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
if (!e.isDisposed()) {
e.onNext("A");
mRxOperatorsText.append("String emit : A \n");
Log.e(TAG, "getStringObservable --> String emit : A \n");
e.onNext("B");
mRxOperatorsText.append("String emit : B \n");
Log.e(TAG, "getStringObservable --> String emit : B \n");
e.onNext("C");
mRxOperatorsText.append("String emit : C \n");
Log.e(TAG, "getStringObservable --> String emit : C \n");
}
}
});
}

private static Observable<Integer> getIntegerObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(1);
mRxOperatorsText.append("Integer emit : 1 \n");
Log.e(TAG, "getIntegerObservable --> Integer emit : 1 \n");
e.onNext(2);
mRxOperatorsText.append("Integer emit : 2 \n");
Log.e(TAG, "getIntegerObservable --> Integer emit : 2 \n");
e.onNext(3);
mRxOperatorsText.append("Integer emit : 3 \n");
Log.e(TAG, "getIntegerObservable --> Integer emit : 3 \n");
e.onNext(4);
mRxOperatorsText.append("Integer emit : 4 \n");
Log.e(TAG, "getIntegerObservable --> Integer emit : 4 \n");
e.onNext(5);
mRxOperatorsText.append("Integer emit : 5 \n");
Log.e(TAG, "getIntegerObservable --> Integer emit : 5 \n");
}
}
});
}

运行结果,log 日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
E/RxUsage: getIntegerObservable --> Integer emit : 1 
E/RxUsage: getIntegerObservable --> Integer emit : 2
E/RxUsage: getIntegerObservable --> Integer emit : 3
E/RxUsage: getIntegerObservable --> Integer emit : 4
E/RxUsage: getIntegerObservable --> Integer emit : 5
E/RxUsage: s + integer : A1
E/RxUsage: zip : accept : A1
E/RxUsage: getStringObservable --> String emit : A
E/RxUsage: s + integer : B2
E/RxUsage: zip : accept : B2
E/RxUsage: getStringObservable --> String emit : B
E/RxUsage: s + integer : C3
E/RxUsage: zip : accept : C3
E/RxUsage: getStringObservable --> String emit : C

一开始看到这个日志,我自己也有很多疑问,比如为什么感觉是水管一(getIntegerObservable) 发送完了之后, 水管二(getStringObservable)才开始发送呢??

因为我们两根水管都是运行在同一个线程里, 同一个线程里执行代码肯定有先后顺序的。因此我们来稍微改一下, 不让他们在同一个线程,为了效果更明显,需要添加延迟 Thread.sleep(1000); , 试试效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public static void testZip() {
Observable.zip(getIntegerObservable(), getStringObservable(), new BiFunction<Integer, String, String>() {
@Override
public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
Log.e(TAG, "s + integer : " + s + integer + "\n");
return s + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
mRxOperatorsText.append("zip : accept : " + s + "\n");
Log.e(TAG, "zip : accept : " + s + "\n");
}
});
}

private static Observable<String> getStringObservable() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
if (!e.isDisposed()) {
Log.e(TAG, "getStringObservable --> String emit : A \n");
mRxOperatorsText.append("String emit : A \n");
e.onNext("A");
Thread.sleep(1000);

Log.e(TAG, "getStringObservable --> String emit : B \n");
mRxOperatorsText.append("String emit : B \n");
e.onNext("B");
Thread.sleep(1000);

Log.e(TAG, "getStringObservable --> String emit : C \n");
mRxOperatorsText.append("String emit : C \n");
e.onNext("C");
Thread.sleep(1000);

Log.e(TAG, "e getStringObservable onComplete");
e.onComplete();
}
}
}).subscribeOn(Schedulers.io());
}

private static Observable<Integer> getIntegerObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
if (!e.isDisposed()) {

Log.e(TAG, "getIntegerObservable --> Integer emit : 1 \n");
mRxOperatorsText.append("Integer emit : 1 \n");
e.onNext(1);
Thread.sleep(1000);

Log.e(TAG, "getIntegerObservable --> Integer emit : 2 \n");
mRxOperatorsText.append("Integer emit : 2 \n");
e.onNext(2);
Thread.sleep(1000);

Log.e(TAG, "getIntegerObservable --> Integer emit : 3 \n");
mRxOperatorsText.append("Integer emit : 3 \n");
e.onNext(3);
Thread.sleep(1000);

Log.e(TAG, "getIntegerObservable --> Integer emit : 4 \n");
mRxOperatorsText.append("Integer emit : 4 \n");
e.onNext(4);
Thread.sleep(1000);

Log.e(TAG, "getIntegerObservable --> Integer emit : 5 \n");
mRxOperatorsText.append("Integer emit : 5 \n");
e.onNext(5);
Thread.sleep(1000);

Log.e(TAG, "e getIntegerObservable onComplete");
e.onComplete();
}
}
}).subscribeOn(Schedulers.io());
}

在展示打印结果之前,上面的操作会报下面这个异常。

异常打印:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
W/System.err: io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.InterruptedException
W/System.err: at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
W/System.err: at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:73)
W/System.err: at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:43)
W/System.err: at io.reactivex.Observable.subscribe(Observable.java:12267)
W/System.err: at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
W/System.err: at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
W/System.err: at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
W/System.err: at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
W/System.err: at java.util.concurrent.FutureTask.run(FutureTask.java:266)
W/System.err: at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:301)
W/System.err: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
W/System.err: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
W/System.err: at java.lang.Thread.run(Thread.java:764)
W/System.err: Caused by: java.lang.InterruptedException
W/System.err: at java.lang.Thread.sleep(Native Method)
W/System.err: at java.lang.Thread.sleep(Thread.java:373)
W/System.err: at java.lang.Thread.sleep(Thread.java:314)
W/System.err: at com.android.rxjavaproject.RxUsage$28.subscribe(RxUsage.java:458)
W/System.err: at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
W/System.err: ... 10 more

解决方法是把 Thread.sleep() 替换为 SystemClock.sleep(),下面展示打印结果。

修改后的 gif 打印:(gif 更新与 2019 年…)

zip

使用 Thread.sleep() 时候,可能细心点的朋友又看出端倪了, getIntegerObservable 这个水管明明发送了五条个数据 + 一个 Complete, 之前不加 Thread.sleep() 明明还有的, 为啥到这里没了呢?

这是因为 zip 发送的事件数量跟上游中发送事件最少的那一根水管的事件数量是有关的, 在这个例子里我们getStringObservable 这根水管只发送了三个事件然后就发送了Complete, 这个时候尽管另外一根水管还有事件4、5 onComplete 没有发送, 但是它们发不发送还有什么意义呢?下游的打印输出里也不会输出的!

另外使用 SystemClock.sleep() 的时候,getIntegerObservable 水管中的事件 4、5 和 onComplete 也都打印了,但是下游的 accept 中是不会接受的。

详细 demo 代码 RxUsage -> testZip()

zip 在项目实践

场景:
比如一个界面需要展示用户的一些信息, 而这些信息分别要从两个服务器接口中获取, 而只有当两个都获取到了之后才能进行展示, 这个时候就可以用Zip了。

首先分别定义这两个请求接口:

1
2
3
4
5
6
7
8
public interface Api {
@GET
Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request);

@GET
Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request);

}

接着用Zip来打包请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Observable<UserBaseInfoResponse> observable1 = api.getUserBaseInfo(new UserBaseInfoRequest())
.subscribeOn(Schedulers.io());

Observable<UserExtraInfoResponse> observable2 = api.getUserExtraInfo(new UserExtraInfoRequest())
.subscribeOn(Schedulers.io());

Observable.zip(observable1, observable2, new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {
@Override
public UserInfo apply(UserBaseInfoResponse baseInfo,
UserExtraInfoResponse extraInfo) throws Exception {
return new UserInfo(baseInfo, extraInfo);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<UserInfo>() {
@Override
public void accept(UserInfo userInfo) throws Exception {
//do something;
}
});

filter

在这段代码中增加了一个filter,过滤一些条件,只允许被过滤后的事件通过
举栗:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io())
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 10 == 0;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "" + integer);
}
});

在这段代码中我们增加了一个 filter, 只允许能被 10 整除的事件通过。

sample 操作符

sample 操作符, 这个操作符每隔指定的时间就从上游中取出一个事件发送给下游。
下面代码每隔 2 秒取一个事件给下游:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io())
/*sample取样*/
.sample(2, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "" + integer);
}
});

说明

本文并非原创,通过各位博主博客学习后总结而得,以方便自己学习,在此感谢各位前辈,并在下面注明出处。