RxJava 入门
- 入门(概念、基础使用)
- 进阶(操作符(map、flatmap、zip、defer、contatMap等等),背压等)
- 实战(网络请求等)
- 源码解析(变换、线程调度,源码探讨)
概念
下面是摘自 ReactiveX 官网 的一段话。
ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)
其中提到了几个概念,观察者模式、响应式编程。
响应式编程
概念:响应式编程是一种通过异步和数据流来构建事物关系的编程模型。
一般的编码模式中,“人”在其中扮演了过重的角色,关心程序中的每一部分。某种意义上这是一种顺序性思维的编程,我要做什么,然后做什么,最后做什么,按部就班编写就好了。具体如下图:
而响应式编程,全都是事物与事物之间的关系,解脱了”人”,之后一个事物发生变化另一个事物就自动响应。如下:
个人感觉响应式编程就是用异步数据流进行编程。流是响应式的核心,可以基于任何东西创建数据流,响应式编程就是根据数据流的流向进行一系列的操作。
观察者模式
观察者模式(Observer Pattern):定义了对象间的一种一对多的依赖关系,当一个对象状态发生改变时,其相关依赖对象皆得到通知并被自动更新。观察者模式又叫做发布-订阅(Publish/Subscribe)模式、模型-视图(Model/View)模式、源-监听器(Source/Listener)模式或从属者(Dependents)模式。
观察者模式面向的需求是:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。举个例子,新闻里喜闻乐见的警察抓小偷,警察需要在小偷伸手作案的时候实施抓捕。在这个例子里,警察是观察者,小偷是被观察者,警察需要时刻盯着小偷的一举一动,才能保证不会漏过任何瞬间。程序的观察者模式和这种真正的『观察』略有不同,观察者不需要时刻盯着被观察者(例如 A 不需要每过 2ms 就检查一次 B 的状态),而是采用 注册(Register) 或者称为 订阅(Subscribe) 的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我。 Android 开发中一个比较典型的例子是点击监听器 OnClickListener
。对设置 OnClickListener
来说, View
是被观察者, OnClickListener
是观察者,二者通过 setOnClickListener()
方法达成订阅关系。订阅之后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的 OnClickListener
。采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到最高的反馈速度。当然,这也得益于我们可以随意定制自己程序中的观察者和被观察者,而警察叔叔明显无法要求小偷『你在作案的时候务必通知我』。
RxJava 简介
我所理解的RxJava的核心优势应该是它可以对复杂逻辑进行拆分成为一个一个的Observable后,RxJava的各种操作符予这些解耦的Observable能够合理的进行再组织的能力,并且它给予了你足够丰富的再组织能力。这种分拆再组织的能力是十分强大的,只有运用好RxJava这种强大的能力,才能真正意义上使你原来非常复杂的揉在一团的逻辑代码变得清晰、简洁,本质上是因为RxJava给你提供了这种强大方便的组织能力,我觉得有点像一种编程模式,你可以放心的将复杂的逻辑拆块,最后RxJava给你提供了丰富的组织、变换、串联、控制这些块的能力,只有这个时候你才会真正觉得这是个好东西,而不应该是跟风使用,但是心里也说不清楚为什么要使用。
RxJava 有三个基本概念: Observable
(被观察者),Observer
(观察者),subscribe
(订阅)。Observable
和 Observer
通过 subscribe()
方法实现订阅关系,从而 Observable
可以在需要的时候发出事件来通知 Observer
。
Github 链接:
RxJava https://github.com/ReactiveX/RxJava
RxAndroid https://github.com/ReactiveX/RxAndroid
gradle 依赖:
implementation 'io.reactivex.rxjava2:rxjava:2.x.y'
implementation 'io.reactivex.rxjava2:rxandroid:2.x.y'
RxJava 使用
创建被观察者
1 | Observable<String> observable = Observable |
可以看到,这里传入了一个 ObservableOnSubscribe
对象作为参数。ObservableOnSubscribe
会被存储在返回的 Observable
对象中,当 Observable
被订阅的时候,subscribe
方法就会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者 observer
将会被调用三次 onNext()
和一次 onComplete()
)。
此外,还可以通过其他方法来创建被观察者。
just(T...)
: 将传入的参数依次发送出来。1
Observable observable = Observable.just("Hello", "World");
fromArray(T...items)
: 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。1
2String[] words = {"Hello", "Hi", "Aloha"};
Observable<String> observable = Observable.fromArray(words);
除 Observable
外还有 Flowable
等被观察者类型。
创建观察者
1 | Observer<String> observer = new Observer<String>() { |
在观察者中进行响应事件对应的相关操作。
订阅
1 | observable.subscribe(observer); |
这里的写法是被观察者订阅了观察者,而不是观察者订阅被观察者,是为了保证流式API调用风格。
1 | observable |
上面就是一个非常简易的RxJava流式API的调用:同一个调用主体一路调用下来,一气呵成。
RxJava 的这个实现,是一条从上到下的链式调用,没有任何嵌套,这在逻辑的简洁性上是具有优势的。
整个流程如下图所示:
结合流程图的相应代码实例如下:
1 | //创建被观察者,是事件传递的起点 |
注意:当调用订阅操作(即调用Observable.subscribe()方法)的时候,被观察者才真正开始发出事件。
线程调度
至此,在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的。
在 RxJava 中,通过 Scheduler
来指定每一段代码应该运行在什么样的线程。下表展示了RxJava中可用的调度器种类:
调度器类型 | 效果 |
---|---|
Schedulers.computation( ) | 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量 |
Schedulers.from(executor) | 使用指定的Executor作为调度器 |
Schedulers.immediate( ) | 在当前线程立即开始执行任务 |
Schedulers.io( ) | 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 为每个任务创建一个新线程 |
Schedulers.trampoline( ) | 当其它排队的任务完成后,在当前线程排队开始执行 |
AndroidSchedulers.mainThread() | Android 主线程 |
subscribeOn()
: 指定 subscribe()
所发生的线程,或者叫做事件产生的线程。
observeOn()
: 指定 Observer
所运行在的线程。或者叫做事件消费的线程。
1 | //new Observable.just()执行在新线程 |
注意:
subscribeOn()
它指示Observable
在一个指定的调度器上创建(只作用于被观察者创建阶段)。只能指定一次,如果指定多次则以第一次为准observeOn()
指定在事件传递(加工变换)和最终被处理(观察者)的发生在哪一个调度器。可指定多次,每次指定完都在下一步生效。
=======================================================
进阶(操作符(map、flatmap、zip、defer、contatMap等等),背压等)
操作符 | 说明 |
---|---|
Create | 使用一个函数从头开始创建一个Observable |
Defer | 直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable |
Empty | 创建一个不发射任何数据但是正常终止的Observable |
Never | 创建一个不发射数据也不终止的Observable |
Throw | 创建一个不发射数据以一个错误终止的Observable |
From | 将其它种类的对象和数据类型转换为Observable |
Interval | 创建一个按固定时间间隔发射整数序列的Observable |
Just | 创建一个发射指定值的Observable |
Range | 创建一个发射特定整数序列的Observable |
Repeat | 创建一个发射特定数据重复多次的Observable |
Start | 返回一个Observable,它发射一个类似于函数声明的值 |
Timer | 创建一个Observable,它在一个给定的延迟后发射一个特殊的值 |
操作符 | 说明 |
---|---|
操作符 | 说明 |
---|---|
=======================================================
1 | private void testRxJava() { |
1 |
|
参考资料: