RxJava系列(一) 入门

RxJava 入门

  1. 入门(概念、基础使用)
  2. 进阶(操作符(map、flatmap、zip、defer、contatMap等等),背压等)
  3. 实战(网络请求等)
  4. 源码解析(变换、线程调度,源码探讨)

概念

下面是摘自 ReactiveX 官网 的一段话。

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)

其中提到了几个概念,观察者模式、响应式编程。

响应式编程

概念:响应式编程是一种通过异步和数据流来构建事物关系的编程模型。

一般的编码模式中,“人”在其中扮演了过重的角色,关心程序中的每一部分。某种意义上这是一种顺序性思维的编程,我要做什么,然后做什么,最后做什么,按部就班编写就好了。具体如下图:

顺序性思维

而响应式编程,全都是事物与事物之间的关系,解脱了”人”,之后一个事物发生变化另一个事物就自动响应。如下:

响应式编程

个人感觉响应式编程就是用异步数据流进行编程。流是响应式的核心,可以基于任何东西创建数据流,响应式编程就是根据数据流的流向进行一系列的操作。

观察者模式

观察者模式(Observer Pattern):定义了对象间的一种一对多的依赖关系,当一个对象状态发生改变时,其相关依赖对象皆得到通知并被自动更新。观察者模式又叫做发布-订阅(Publish/Subscribe)模式、模型-视图(Model/View)模式、源-监听器(Source/Listener)模式或从属者(Dependents)模式。

观察者模式面向的需求是:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。举个例子,新闻里喜闻乐见的警察抓小偷,警察需要在小偷伸手作案的时候实施抓捕。在这个例子里,警察是观察者,小偷是被观察者,警察需要时刻盯着小偷的一举一动,才能保证不会漏过任何瞬间。程序的观察者模式和这种真正的『观察』略有不同,观察者不需要时刻盯着被观察者(例如 A 不需要每过 2ms 就检查一次 B 的状态),而是采用 注册(Register) 或者称为 订阅(Subscribe) 的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我。 Android 开发中一个比较典型的例子是点击监听器 OnClickListener 。对设置 OnClickListener 来说, View 是被观察者, OnClickListener 是观察者,二者通过 setOnClickListener() 方法达成订阅关系。订阅之后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的 OnClickListener 。采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到最高的反馈速度。当然,这也得益于我们可以随意定制自己程序中的观察者和被观察者,而警察叔叔明显无法要求小偷『你在作案的时候务必通知我』。

RxJava 简介

我所理解的RxJava的核心优势应该是它可以对复杂逻辑进行拆分成为一个一个的Observable后,RxJava的各种操作符予这些解耦的Observable能够合理的进行再组织的能力,并且它给予了你足够丰富的再组织能力。这种分拆再组织的能力是十分强大的,只有运用好RxJava这种强大的能力,才能真正意义上使你原来非常复杂的揉在一团的逻辑代码变得清晰、简洁,本质上是因为RxJava给你提供了这种强大方便的组织能力,我觉得有点像一种编程模式,你可以放心的将复杂的逻辑拆块,最后RxJava给你提供了丰富的组织、变换、串联、控制这些块的能力,只有这个时候你才会真正觉得这是个好东西,而不应该是跟风使用,但是心里也说不清楚为什么要使用。

RxJava 有三个基本概念: Observable (被观察者),Observer (观察者),subscribe (订阅)。ObservableObserver 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer

Github 链接:

RxJava https://github.com/ReactiveX/RxJava

RxAndroid https://github.com/ReactiveX/RxAndroid

gradle 依赖:

implementation 'io.reactivex.rxjava2:rxjava:2.x.y'

implementation 'io.reactivex.rxjava2:rxandroid:2.x.y'

RxJava 使用

创建被观察者

1
2
3
4
5
6
7
8
9
Observable<String> observable = Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
emitter.onNext("Hello");
emitter.onNext("World");
emitter.onComplete();
}
});

可以看到,这里传入了一个 ObservableOnSubscribe 对象作为参数。ObservableOnSubscribe 会被存储在返回的 Observable 对象中,当 Observable 被订阅的时候,subscribe 方法就会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者 observer 将会被调用三次 onNext() 和一次 onComplete())。

此外,还可以通过其他方法来创建被观察者。

  • just(T...): 将传入的参数依次发送出来。

    1
    Observable observable = Observable.just("Hello", "World");
  • fromArray(T...items) : 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。

    1
    2
    String[] words = {"Hello", "Hi", "Aloha"};
    Observable<String> observable = Observable.fromArray(words);

Observable 外还有 Flowable 等被观察者类型。

创建观察者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(String s) {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
};

在观察者中进行响应事件对应的相关操作。

订阅

1
observable.subscribe(observer);

这里的写法是被观察者订阅了观察者,而不是观察者订阅被观察者,是为了保证流式API调用风格。

1
2
3
4
5
observable
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.filter(s -> s != null)
.subscribe(observer);

上面就是一个非常简易的RxJava流式API的调用:同一个调用主体一路调用下来,一气呵成。

RxJava 的这个实现,是一条从上到下的链式调用,没有任何嵌套,这在逻辑的简洁性上是具有优势的。

整个流程如下图所示:

流程图

结合流程图的相应代码实例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//创建被观察者,是事件传递的起点
Observable.just("On","Off","On","On")
//这就是在传递过程中对事件进行过滤操作
.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s!=null;
}
})
//实现订阅
.subscribe(
//创建观察者,作为事件传递的终点处理事件
new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d("DDDDDD","结束观察...\n");
}

@Override
public void onError(Throwable e) {
//出现错误会调用这个方法
}
@Override
public void onNext(String s) {
//处理事件
Log.d("DDDDD","handle this---"+s)
}
);

注意:当调用订阅操作(即调用Observable.subscribe()方法)的时候,被观察者才真正开始发出事件。

线程调度

至此,在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的。

在 RxJava 中,通过 Scheduler 来指定每一段代码应该运行在什么样的线程。下表展示了RxJava中可用的调度器种类:

调度器类型 效果
Schedulers.computation( ) 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.immediate( ) 在当前线程立即开始执行任务
Schedulers.io( ) 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.newThread( ) 为每个任务创建一个新线程
Schedulers.trampoline( ) 当其它排队的任务完成后,在当前线程排队开始执行
AndroidSchedulers.mainThread() Android 主线程

subscribeOn(): 指定 subscribe() 所发生的线程,或者叫做事件产生的线程。

observeOn(): 指定 Observer 所运行在的线程。或者叫做事件消费的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//new Observable.just()执行在新线程
Observable.just(getFilePath())
//指定在新线程中创建被观察者
.subscribeOn(Schedulers.newThread())
//将接下来执行的线程环境指定为io线程
.observeOn(Schedulers.io())
//map就处在io线程
.map(mMapOperater)
//将后面执行的线程环境切换为主线程,
//但是这一句依然执行在io线程
.observeOn(AndroidSchedulers.mainThread())
//指定线程无效,但这句代码本身执行在主线程
.subscribeOn(Schedulers.io())
//执行在主线程
.subscribe(mSubscriber);

注意:

  • subscribeOn() 它指示 Observable 在一个指定的调度器上创建(只作用于被观察者创建阶段)。只能指定一次,如果指定多次则以第一次为准
  • observeOn() 指定在事件传递(加工变换)和最终被处理(观察者)的发生在哪一个调度器。可指定多次,每次指定完都在下一步生效。

=======================================================

进阶(操作符(map、flatmap、zip、defer、contatMap等等),背压等)

操作符 说明
Create 使用一个函数从头开始创建一个Observable
Defer 直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable
Empty 创建一个不发射任何数据但是正常终止的Observable
Never 创建一个不发射数据也不终止的Observable
Throw 创建一个不发射数据以一个错误终止的Observable
From 将其它种类的对象和数据类型转换为Observable
Interval 创建一个按固定时间间隔发射整数序列的Observable
Just 创建一个发射指定值的Observable
Range 创建一个发射特定整数序列的Observable
Repeat 创建一个发射特定数据重复多次的Observable
Start 返回一个Observable,它发射一个类似于函数声明的值
Timer 创建一个Observable,它在一个给定的延迟后发射一个特殊的值
操作符 说明
操作符 说明

=======================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private void testRxJava() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {

}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(String s) {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
})

// 伪代码
Observable.create( ObservableOnSubscribe(ObservableEmitter -> *) ){
ObservableCreate<T>(observableOnSubscribe)
}
.subscribe(Observer){
subscribeActual(observer){
parent = new CreateEmitter<T>(observer)
observer.onSubscribe(parent);
observableOnSubscribe.subscribe(parent);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

// subscribeOn 是改变上流的线程调度,所以只有第一个 subscribeOn 是有用的
// observeOn 是改变下流的线程调度,所以每一个 observeOn 对它下流的 操作都是有用的
ObservableCreate subscribeActual(SubscribeOnObserver){ ObservableOnSubscribe.subscribe(observableEmitter) }
ObservableSubscribeOn subscribeActual(ObserveOnObserver)
ObservableObserveOn subscribeActual(MapObserver)
ObservableMap subscribeActual(SubscribeOnObserver)
ObservableSubscribeOn subscribeActual(ObserveOnObserver)
ObservableObserveOn subscribeActual(MergeObserver)
ObservableFlatMap subscribeActual(Observer)

subscribe(Observer)



Observable.create(...)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.map(s -> s + "!")
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(Function<String, ObservableSource<String>>) s -> Observable.just(s + "!"))
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(String s) {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
});

参考资料:

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×

keyboard_arrow_up 回到顶端