E/RxUsage: flatMap : accept : I am value 1 E/RxUsage: flatMap : accept : I am value 1 E/RxUsage: flatMap : accept : I am value 3 E/RxUsage: flatMap : accept : I am value 3 E/RxUsage: flatMap : accept : I am value 2 E/RxUsage: flatMap : accept : I am value 3 E/RxUsage: flatMap : accept : I am value 2 E/RxUsage: flatMap : accept : I am value 2
1 2 3 4 5 6 7
E/RxUsage: flatMap : accept : I am value 1 E/RxUsage: flatMap : accept : I am value 1 E/RxUsage: flatMap : accept : I am value 2 E/RxUsage: flatMap : accept : I am value 2 E/RxUsage: flatMap : accept : I am value 3 E/RxUsage: flatMap : accept : I am value 3 E/RxUsage: flatMap : accept : I am value 3
看了很多博客,显示的结果 1-3 都是完整循环多次后打印出来的,如这样:
1 2 3 4 5 6 7 8 9
D/TAG: I am value 1 D/TAG: I am value 1 D/TAG: I am value 1 D/TAG: I am value 3 D/TAG: I am value 3 D/TAG: I am value 3 D/TAG: I am value 2 D/TAG: I am value 2 D/TAG: I am value 2
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); } }).concatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(@NonNull Integer integer) throws Exception { List<String> list = new ArrayList<>(); for (int i = 0; i < 3; i++) { list.add("I am value " + integer); } int delayTime = (int) (1 + Math.random() * 10); return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS); } }).subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { Log.e(TAG, "concatMap : accept : " + s + "\n"); mRxOperatorsText.append("concatMap : accept : " + s + "\n"); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception {
} });
打印结果:
1 2 3 4 5 6
E/RxUsage: concatMap : accept : I am value 1 E/RxUsage: concatMap : accept : I am value 1 E/RxUsage: concatMap : accept : I am value 2 E/RxUsage: concatMap : accept : I am value 2 E/RxUsage: concatMap : accept : I am value 3 E/RxUsage: concatMap : accept : I am value 3
结果是有序的,但是没有打印循环三次的结果???需要再查一下原因,更新到博客里。
zip
zip 通过一个函数将多个 Observable 发送的事件结合到一起,然后发送这些组合到一起的事件。 它按照严格的顺序应用这个函数。 它只发射与发射数据项最少的那个 Observable 一样多的数据。
从这个图中可以看见, 这次上游和以往不同的是, 有两根水管啦。 其中一根水管负责发送圆形事件, 另外一根水管负责发送三角形事件, 通过 zip 操作符, 使得圆形事件和三角形事件合并为了一个矩形事件。
分解动作:
分解动作解析: 组合的过程是分别从 两根水管里各取出一个事件 来进行组合, 并且一个事件只能被使用一次, 组合的顺序是严格按照事件发送的顺利 来进行的, 也就是说不会出现圆形 1 事件和三角形 B 事件进行合并, 也不可能出现圆形 2 和三角形 A 进行合并的情况。
W/System.err: io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.InterruptedException W/System.err: at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367) W/System.err: at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:73) W/System.err: at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:43) W/System.err: at io.reactivex.Observable.subscribe(Observable.java:12267) W/System.err: at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96) W/System.err: at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578) W/System.err: at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66) W/System.err: at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57) W/System.err: at java.util.concurrent.FutureTask.run(FutureTask.java:266) W/System.err: at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:301) W/System.err: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167) W/System.err: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) W/System.err: at java.lang.Thread.run(Thread.java:764) W/System.err: Caused by: java.lang.InterruptedException W/System.err: at java.lang.Thread.sleep(Native Method) W/System.err: at java.lang.Thread.sleep(Thread.java:373) W/System.err: at java.lang.Thread.sleep(Thread.java:314) W/System.err: at com.android.rxjavaproject.RxUsage$28.subscribe(RxUsage.java:458) W/System.err: at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40) W/System.err: ... 10 more