RxJava入门
ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持Java、.NET、JavaScript和C++,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io。
在ReactiveX中,一个观察者(Observer)订阅一个可观察对象(Observable)。观察者对Observable发射的数据或数据序列作出响应。这种模式可以极大地简化并发操作,因为它创建了一个处于待命状态的观察者哨兵,在未来某个时刻响应Observable的通知,不需要阻塞等待Observable发射数据。
本文主要聚焦于RxJava
先来简单介绍 观察者模式里一些概念:
Observable:发射源,英文释义“可观察的”,在观察者模式中称为“被观察者”或“可观察对象”;
Observer:接收源,英文释义“观察者”,就是观察者模式中的“观察者”,可接收Observable、Subject发射的数据;
Subject:Subject是一个比较特殊的对象,既可充当发射源,也可充当接收源。它是消息收发的桥梁。
Subscriber:“订阅者”,也是接收源,Subscriber实现了Observer接口,比Observer多了一个方法unsubscribe(),用来取消订阅,当你不再想接收数据了,可以调用unsubscribe()方法停止接收,Observer 在 subscribe() 过程中,最终也会被转换成 Subscriber 对象,一般情况下,建议使用Subscriber作为接收源;
Subscription :Observable调用subscribe( )方法返回的对象,同样有unsubscribe( )方法,可以用来取消订阅事件;
Action~:RxJava中的一个接口,
Action~
是用来代替并简化Subscriber
,它只有无返回值的call()
方法,对应 ~ 为09时,它们有09个入参;Func~:与
Action~
非常相似,也有call()方法。但是它是有返回值的,同样也有Func0、Func1…Func9;
首先来看RxJva最简单的使用方式:
1 | //消息发送者 |
这样就形成RxJava一个简单的用法,sender发射”Hello World!”,将会被receiver的onNext的接收,通过这个例子,这就是 “观察者模式”。RxJava让观察者模式变得更简单和简洁,而RxJava所有的一切都将围绕这两个点展开,一个是发射数据,一个是接收数据。
1. Observable的创建:
just( )
— 将一个或多个对象转换成发射这些对象的Observable,并触发onNext( )
操作from( )
— 将一个Iterable, 一个Future, 或者一个数组转换成一个Observablerepeat( )
— 创建一个重复发射指定数据或数据序列的ObservablerepeatWhen( )
— 创建一个重复发射指定数据或数据序列的Observable,它依赖于另一个Observable发射的数据create( )
— 使用一个函数从头创建一个Observabledefer( )
— 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observablerange( )
— 创建一个发射指定范围的整数序列的Observableinterval( )
— 创建一个按照给定的时间间隔发射整数序列的Observabletimer( )
— 创建一个在给定的延时之后发射单个数据的Observableempty( )
— 创建一个什么都不做直接通知完成的Observableerror( )
— 创建一个什么都不做直接通知错误的Observablenever( )
— 创建一个不发射任何数据的Observable
1 | Observer<Integer> receiver = new Observer<Integer>() { |
2. 转换操作:
- map( ) — 对序列的每一项都应用一个函数来进行变换
- flatMap( ) 将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable
switchMap( )
— 将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据- scan( ) — 对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值
groupBy( )
— 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据buffer( )
— 它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个- window( ) — 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是在同一个Obserable中发送
cast( )
— 在发射之前强制将Observable发射的所有数据转换为指定类型
1 | Observable.just(1, 2, 3, 4, 5) |
3. 过滤操作
filter( )
— 过滤数据takeLast( )
— 只发射最后的N项数据last( )
— 只发射最后的一项数据lastOrDefault( )
— 只发射最后的一项数据,如果Observable为空就发射默认值takeLastBuffer( )
— 将最后的N项数据当做单个数据发射skip( )
— 跳过开始的N项数据skipLast( )
— 跳过最后的N项数据take( )
— 只发射开始的N项数据first( )
and takeFirst( ) — 只发射第一项数据,或者满足某种条件的第一项数据firstOrDefault( )
— 只发射第一项数据,如果Observable为空就发射默认值elementAt( )
— 发射第N项数据elementAtOrDefault( )
— 发射第N项数据,如果Observable数据少于N项就发射默认值sample( )
or throttleLast( ) — 定期发射Observable最近的数据throttleFirst( )
— 定期发射Observable发射的第一项数据throttleWithTimeout( )
or debounce( ) — 只有当Observable在指定的时间后还没有发射数据时,才发射一个数据timeout( )
— 如果在一个指定的时间段后还没发射数据,就发射一个异常distinct( )
— 过滤掉重复数据distinctUntilChanged( )
— 过滤掉连续重复的数据ofType( )
— 只发射指定类型的数据ignoreElements( )
— 丢弃所有的正常数据,只发射错误或完成通知
聚合操作
concat( )
— 顺序连接多个Observablescount( )
and countLong( ) — 计算数据项的个数并发射结果reduce( )
— 对序列使用reduce()函数并发射最终的结果collect( )
— 将原始Observable发射的数据放到一个单一的可变的数据结构中,然后返回一个发射这个数据结构的ObservabletoList( )
— 收集原始Observable发射的所有数据到一个列表,然后返回这个列表toSortedList( )
— 收集原始Observable发射的所有数据到一个有序列表,然后返回这个列表toMap( )
— 将序列数据转换为一个Map,Map的key是根据一个函数计算的toMultiMap( )
— 将序列数据转换为一个列表,同时也是一个Map,Map的key是根据一个函数计算的
4. Subject
上文中介绍Subject
是桥梁或者代理,同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发送新的数据。
Subject的种类:
- AsyncSubject:一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。(如果原始Observable没有发射任何值,AsyncObject也不发射任何值)它会把这最后一个值发射给任何后续的观察者:
1 | AsyncSubject<String> asyncSubject = AsyncSubject.create(); |
- BehaviorSubject:发射被订阅前的最后一条数据(如果被订阅时还没有发射数据,则发射一个默认值),并继续发射被订阅后接收到到的数据:
1 | BehaviorSubject<String> behaviorSubject = BehaviorSubject.create("default"); |
- PublishSubject:其订阅者只会收到订阅时间后,
PublishSubject
发射的数据:
1 | PublishSubject<String> publishSubject = PublishSubject.create(); |
- ReplaySubject:
ReplaySubject
会发射所有数据给所有订阅者,无论订阅者什么时间点订阅消息。
1 | ReplaySubject<String>replaySubject = ReplaySubject.create(); |
- SerializedSubject:当多线程往
Subject
中写数据时,能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。为解决此类问题,只需要将相应的Subject 包装在SerializedSubject
里。
5. Scheduler 调度器
RxJava提供了便捷的线程调度工具Scheduler,可以很简便地指定发射源或者订阅者在哪个特定的调度器(Scheduler)上执行。
RxJava提供ObserveOn
和SubscribeOn
两种调度方法,ObserveOn指示Observable
在哪个特定的调度器上调用观察者的onNext
, onError
和onCompleted
方法(指定观察者在哪个调度器上执行逻辑)。SubscribeOn
用于指定Observable
将全部的处理过程(包括发射数据和通知)放在特定的调度器上执行(Observer
内容执行的地方)。
调度器的种类:
调度器类型 | 效果 |
---|---|
Schedulers.computation( ) | 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量 |
Schedulers.from(executor) | 使用指定的Executor作为调度器 |
Schedulers.immediate( ) | 在当前线程立即开始执行任务 |
Schedulers.io( ) | 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 为每个任务创建一个新线程 |
Schedulers.trampoline( ) | 当其它排队的任务完成后,在当前线程排队开始执行 |
5.1 SubscribeOn and ObserveOn
subscribeOn()
方法来用于指定Observable
自身在哪个调度器上执行。ObserveOn
指定一个观察者在哪个调度器上观察找个Observable
1 | Observable.create(((Observable.OnSubscribe<Integer>) subscriber -> { |
参考: