网站首页 > java教程 正文
前言
很快我们就迎来了第二期,上一期我们主要讲解了 RxJava 1.x 到 2.x 的变化概览,相信各位熟练掌握RxJava 1.x的老司机们随便看一下变化概览就可以上手RxJava 2.x了,但为了满足更广大的年轻一代司机(未来也是老司机),在本节中,我们将学习RxJava 2.x 强大的操作符章节。
正题
Create
create
操作符应该是最常见的操作符了,主要用于产生一个Obserable
被观察者对象,为了方便大家的认知,以后的教程中统一把被观察者Observable
称为发射器(上游事件),观察者Observer
称为接收器(下游事件)。
Observable.create(new ObservableOnSubscribe<Integer> { @Override
public void subscribe(@Non ObservableEmitter<Integer> e) throws Exception {
mRxOperatorsText.append("Observable emit 1" + "\n");
Log.e(TAG, "Observable emit 1" + "\n");
e.onNext(1);
mRxOperatorsText.append("Observable emit 2" + "\n");
Log.e(TAG, "Observable emit 2" + "\n");
e.onNext(2);
mRxOperatorsText.append("Observable emit 3" + "\n");
Log.e(TAG, "Observable emit 3" + "\n");
e.onNext(3);
e.onComplete;
mRxOperatorsText.append("Observable emit 4" + "\n");
Log.e(TAG, "Observable emit 4" + "\n" );
e.onNext(4);
}
}).subscribe(new Observer<Integer> { private int i; private Disposable mDisposable; @Override
public void onSubscribe(@Non Disposable d) {
mRxOperatorsText.append("onSubscribe : " + d.isDisposed + "\n");
Log.e(TAG, "onSubscribe : " + d.isDisposed + "\n" );
mDisposable = d;
} @Override
public void onNext(@Non Integer integer) {
mRxOperatorsText.append("onNext : value : " + integer + "\n");
Log.e(TAG, "onNext : value : " + integer + "\n" );
i++; if (i == 2) { // 在RxJava 2.x 中,新增的Disposable可以做到切断的操作,让Observer观察者不再接收上游事件
mDisposable.dispose;
mRxOperatorsText.append("onNext : isDisposable : " + mDisposable.isDisposed + "\n");
Log.e(TAG, "onNext : isDisposable : " + mDisposable.isDisposed + "\n");
}
} @Override
public void onError(@Non Throwable e) {
mRxOperatorsText.append("onError : value : " + e.getMessage + "\n");
Log.e(TAG, "onError : value : " + e.getMessage + "\n" );
} @Override
public void onComplete {
mRxOperatorsText.append("onComplete" + "\n");
Log.e(TAG, "onComplete" + "\n" );
}
});
输出:
需要注意的几点是:
在发射事件中,我们在发射了数值 3 之后,直接调用了
e.onComlete
,虽然无法接收事件,但发送事件还是继续的。另外一个值得注意的点是,在 RxJava 2.x 中,可以看到发射事件方法相比 1.x 多了一个 throws Excetion,意味着我们做一些特定操作再也不用 try-catch 了。
并且 2.x 中有一个
Disposable
概念,这个东西可以直接调用切断,可以看到,当它的isDisposed
返回为 false 的时候,接收器能正常接收事件,但当其为 true 的时候,接收器停止了接收。所以可以通过此参数动态控制接收事件了。
Map
Map
基本算是 RxJava 中一个最简单的操作符了,熟悉 RxJava 1.x 的知道,它的作用是对发射时间发送的每一个事件应用一个函数,是的每一个事件都按照指定的函数去变化,而在 2.x 中它的作用几乎一致。
Observable.create(new ObservableOnSubscribe<Integer> { @Override
public void subscribe(@Non ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, String> { @Override
public String apply(@Non Integer integer) throws Exception { return "This is result " + integer;
}
}).subscribe(new Consumer<String> { @Override
public void accept(@Non String s) throws Exception {
mRxOperatorsText.append("accept : " + s +"\n");
Log.e(TAG, "accept : " + s +"\n" );
}
});
输出:
是的,map
基本作用就是将一个Observable
通过某种函数关系,转换为另一种Observable
,上面例子中就是把我们的Integer
数据变成了String
类型。从Log日志显而易见。
Zip
zip
专用于合并事件,该合并不是连接(连接操作符后面会说),而是两两配对,也就意味着,最终配对出的Observable
发射事件数目只和少的那个相同。
Observable.zip(getStringObservable, getIntegerObservable, new BiFunction<String, Integer, String> { @Override
public String apply(@Non String s, @Non Integer integer) throws Exception { return s + integer;
}
}).subscribe(new Consumer<String> { @Override
public void accept(@Non String s) throws Exception {
mRxOperatorsText.append("zip : accept : " + s + "\n");
Log.e(TAG, "zip : accept : " + s + "\n");
}
});
private Observable<String> getStringObservable { return Observable.create(new ObservableOnSubscribe<String> { @Override
public void subscribe(@Non ObservableEmitter<String> e) throws Exception { if (!e.isDisposed) {
e.onNext("A");
mRxOperatorsText.append("String emit : A \n");
Log.e(TAG, "String emit : A \n");
e.onNext("B");
mRxOperatorsText.append("String emit : B \n");
Log.e(TAG, "String emit : B \n");
e.onNext("C");
mRxOperatorsText.append("String emit : C \n");
Log.e(TAG, "String emit : C \n");
}
}
});
} private Observable<Integer> getIntegerObservable { return Observable.create(new ObservableOnSubscribe<Integer> { @Override
public void subscribe(@Non ObservableEmitter<Integer> e) throws Exception { if (!e.isDisposed) {
e.onNext(1);
mRxOperatorsText.append("Integer emit : 1 \n");
Log.e(TAG, "Integer emit : 1 \n");
e.onNext(2);
mRxOperatorsText.append("Integer emit : 2 \n");
Log.e(TAG, "Integer emit : 2 \n");
e.onNext(3);
mRxOperatorsText.append("Integer emit : 3 \n");
Log.e(TAG, "Integer emit : 3 \n");
e.onNext(4);
mRxOperatorsText.append("Integer emit : 4 \n");
Log.e(TAG, "Integer emit : 4 \n");
e.onNext(5);
mRxOperatorsText.append("Integer emit : 5 \n");
Log.e(TAG, "Integer emit : 5 \n");
}
}
});
}
输出:
需要注意的是:
zip
组合事件的过程就是分别从发射器 A 和发射器 B 各取出一个事件来组合,并且一个事件只能被使用一次,组合的顺序是严格按照事件发送的顺序来进行的,所以上面截图中,可以看到,1 永远是和 A 结合的,2 永远是和 B 结合的。最终接收器收到的事件数量是和发送器发送事件最少的那个发送器的发送事件数目相同,所以如截图中,5 很孤单,没有人愿意和它交往,孤独终老的单身狗。
Concat
对于单一的把两个发射器连接成一个发射器,虽然 zip
不能完成,但我们还是可以自力更生,官方提供的concat
让我们的问题得到了完美解决。
Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))
.subscribe(new Consumer<Integer> { @Override
public void accept(@Non Integer integer) throws Exception {
mRxOperatorsText.append("concat : "+ integer + "\n");
Log.e(TAG, "concat : "+ integer + "\n" );
}
});
输出:
FlatMap
FlatMap
是一个很有趣的东西,我坚信你在实际开发中会经常用到。它可以把一个发射器Observable
通过某种方法转换为多个Observables
,然后再把这些分散的Observables
装进一个单一的发射器Observable
。但有个需要注意的是,flatMap
并不能保证事件的顺序,如果需要保证,需要用到我们下面要讲的ConcatMap
。
Observable.create(new ObservableOnSubscribe<Integer> { @Override
public void subscribe(@Non ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>> { @Override
public ObservableSource<String> apply(@Non Integer integer) throws Exception {
List<String> list = new ArrayList<>; for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
} int delayTime = (int) (1 + Math.random * 10); return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
}
}).subscribeOn(Schedulers.newThread)
.observeOn(AndroidSchedulers.mainThread)
.subscribe(new Consumer<String> { @Override
public void accept(@Non String s) throws Exception {
Log.e(TAG, "flatMap : accept : " + s + "\n");
mRxOperatorsText.append("flatMap : accept : " + s + "\n");
}
});
输出:
一切都如我们预期中的有意思,为了区分 concatMap
(下一个会讲),我在代码中特意动了一点小手脚,我采用一个随机数,生成一个时间,然后通过delay
(后面会讲)操作符,做一个小延时操作,而查看 Log 日志也确认验证了我们上面的说法,它是无序的。
concatMap
上面其实就说了,concatMap
与FlatMap
的唯一区别就是concatMap
保证了顺序,所以,我们就直接把flatMap
替换为concatMap
验证吧。
Observable.create(new ObservableOnSubscribe<Integer> { @Override
public void subscribe(@Non ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).concatMap(new Function<Integer, ObservableSource<String>> { @Override
public ObservableSource<String> apply(@Non Integer integer) throws Exception {
List<String> list = new ArrayList<>; for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
} int delayTime = (int) (1 + Math.random * 10); return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
}
}).subscribeOn(Schedulers.newThread)
.observeOn(AndroidSchedulers.mainThread)
.subscribe(new Consumer<String> { @Override
public void accept(@Non String s) throws Exception {
Log.e(TAG, "flatMap : accept : " + s + "\n");
mRxOperatorsText.append("flatMap : accept : " + s + "\n");
}
});
输出:
写在最后
好了,这一节就先介绍到这里,下一节我们将学习其它的一些操作符,在操作符讲完后再带大家进入实际情景,希望持续关注
结尾:
文章中的操作符的连接:http://reactivex.io/documentation/operators.html
GitHub 上的 Demo 专为大家倾心打造。
传送门:https://github.com/nanchen2251/RxJava2Examples
近期文章:
作为Android开发者,你想知道的都在这了!
作为Android开发者,你想知道的都在这了!
你熟悉又陌生的网络加密
等等,先别走!「码个蛋」又有活动了!参与活动不但可以培养自己的好习惯,还能拿到「码个蛋」IP系列专属奖品,速度要快...
今日问题:
大家项目中都用了哪些新的技术呢?
留言格式:
打卡x 天,答:xxx。
告诉你一个小技巧:
只需3步,你将不会错过任何一篇文章!
猜你喜欢
- 2024-10-13 深度分析ClassLoader机制,不可错过这一篇
- 2024-10-13 RxJava2.X 源码解析(一):探索RxJava2分发订阅流程
- 2024-10-13 与其他语言相比,Java有多安全?(java语言有哪些缺点)
- 2024-10-13 JVM 配置参数 -D,-X,-XX 的区别
- 2024-10-13 mysql-connector-java与MySQL 8.X版本建立连接
- 2024-10-13 浙江大学终于把java整理成漫画书了,动画教学更生动,允许白嫖
- 2024-10-13 java 核心技术-12版 卷Ⅰ- 4.1 面向对象程序设计概述
- 2024-10-13 SpringBoot2.x配置多数据源(springboot如何配置多数据源)
- 2024-10-13 JAVA 中获取比X大1位数中最小的数
- 2024-10-13 玩大了!Log4j 2.x 再爆雷(log4j最新版本)
你 发表评论:
欢迎- 最近发表
-
- Java常量定义防暴指南:从"杀马特"到"高富帅"的华丽转身
- Java接口设计原则与实践:优雅编程的艺术
- java 包管理、访问修饰符、static/final关键字
- Java工程师的代码规范与最佳实践:优雅代码的艺术
- 编写一个java程序(编写一个Java程序计算并输出1到n的阶乘)
- Mycat的搭建以及配置与启动(mycat部署)
- Weblogic 安装 -“不是有效的 JDK Java 主目录”解决办法
- SpringBoot打包部署解析:jar包的生成和结构
- 《Servlet》第05节:创建第一个Servlet程序(HelloSevlet)
- 你认为最简单的单例模式,东西还挺多
- 标签列表
-
- java反编译工具 (77)
- java反射 (57)
- java接口 (61)
- java随机数 (63)
- java7下载 (59)
- java数据结构 (61)
- java 三目运算符 (65)
- java对象转map (63)
- Java继承 (69)
- java字符串替换 (60)
- 快速排序java (59)
- java并发编程 (58)
- java api文档 (60)
- centos安装java (57)
- java调用webservice接口 (61)
- java深拷贝 (61)
- 工厂模式java (59)
- java代理模式 (59)
- java.lang (57)
- java连接mysql数据库 (67)
- java重载 (68)
- java 循环语句 (66)
- java反序列化 (58)
- java时间函数 (60)
- java是值传递还是引用传递 (62)
本文暂时没有评论,来添加一个吧(●'◡'●)