Hystrix-RxJava入门

RxJava入门

ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持Java、.NET、JavaScript和C++,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io。

在ReactiveX中,一个观察者(Observer)订阅一个可观察对象(Observable)。观察者对Observable发射的数据或数据序列作出响应。这种模式可以极大地简化并发操作,因为它创建了一个处于待命状态的观察者哨兵,在未来某个时刻响应Observable的通知,不需要阻塞等待Observable发射数据。

本文主要聚焦于RxJava
先来简单介绍 观察者模式里一些概念:

Observable:发射源,英文释义“可观察的”,在观察者模式中称为“被观察者”或“可观察对象”;

Observer:接收源,英文释义“观察者”,就是观察者模式中的“观察者”,可接收Observable、Subject发射的数据;

Subject:Subject是一个比较特殊的对象,既可充当发射源,也可充当接收源。它是消息收发的桥梁。

Subscriber:“订阅者”,也是接收源,Subscriber实现了Observer接口,比Observer多了一个方法unsubscribe(),用来取消订阅,当你不再想接收数据了,可以调用unsubscribe()方法停止接收,Observer 在 subscribe() 过程中,最终也会被转换成 Subscriber 对象,一般情况下,建议使用Subscriber作为接收源;

Subscription :Observable调用subscribe( )方法返回的对象,同样有unsubscribe( )方法,可以用来取消订阅事件;

Action~:RxJava中的一个接口,Action~是用来代替并简化Subscriber,它只有无返回值的call()方法,对应 ~ 为09时,它们有09个入参;

Func~:与Action~非常相似,也有call()方法。但是它是有返回值的,同样也有Func0、Func1…Func9;

首先来看RxJva最简单的使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//消息发送者
Observable<String> sender = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
while(下载进度<100%) {
subscriber.onNext("当前已下载" + 下载进度);
}

subscriber.onNext("Hello World");
}
});
//消息订阅者
Observer<String> receiver = new Observer<String>() {
@Override
public void onCompleted() {
//数据接收完成时调用
}
@Override
public void onError(Throwable e) {
//发生错误调用
}
@Override
public void onNext(String s) {
// 获取页面句柄,渲染后展示给用户看当前下载进度
viewHander.show(s);
}
};
//消息订阅
sender.subscribe(receiver);

这样就形成RxJava一个简单的用法,sender发射”Hello World!”,将会被receiver的onNext的接收,通过这个例子,这就是 “观察者模式”。RxJava让观察者模式变得更简单和简洁,而RxJava所有的一切都将围绕这两个点展开,一个是发射数据,一个是接收数据。

1. Observable的创建:

  • just( ) — 将一个或多个对象转换成发射这些对象的Observable,并触发onNext( )操作
  • from( ) — 将一个Iterable, 一个Future, 或者一个数组转换成一个Observable
  • repeat( ) — 创建一个重复发射指定数据或数据序列的Observable
  • repeatWhen( ) — 创建一个重复发射指定数据或数据序列的Observable,它依赖于另一个Observable发射的数据
  • create( ) — 使用一个函数从头创建一个Observable
  • defer( ) — 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observable
  • range( ) — 创建一个发射指定范围的整数序列的Observable
  • interval( ) — 创建一个按照给定的时间间隔发射整数序列的Observable
  • timer( ) — 创建一个在给定的延时之后发射单个数据的Observable
  • empty( ) — 创建一个什么都不做直接通知完成的Observable
  • error( ) — 创建一个什么都不做直接通知错误的Observable
  • never( ) — 创建一个不发射任何数据的Observable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observer<Integer> receiver = new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer s) {
System.out.print(s);
}
};
observable1 = Observable.just(1, 2, 3);
observable2 = Observable.from(new ArrayList(1, 2, 3));
observable3 = Observable.just(1, 2, 3).repeat(2);//输出1,2,3,1,2,3
deferObservable4 = Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
return Observable.just(1, 2, 3);
}});
observable5 = Observable.range(1, 5);//输出1,2,3,4,5

2. 转换操作:

  • map( ) — 对序列的每一项都应用一个函数来进行变换
  • flatMap( ) 将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable
  • switchMap( ) — 将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据
  • scan( ) — 对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值
  • groupBy( ) — 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据
  • buffer( ) — 它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
  • window( ) — 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是在同一个Obserable中发送
  • cast( ) — 在发射之前强制将Observable发射的所有数据转换为指定类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
Observable.just(1, 2, 3, 4, 5)
//执行换换
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return String.valueOf(integer);
}
})

//flatMap
Observable.just("南昌", "深圳", "天津", "北京").flatMap(new Func1<String, Observable<WeatherInfo>>() {
@Override
public Observable<WeatherInfo> call(String s) {
return getWeather(s);
}
}).subscribe(new Subscriber<WeatherInfo>() {
@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(WeatherInfo weatherInfo) {
}
});
// scan方法 输出 1, 2, 6
Observable.from(Lists.newArrayList(1,2,3)).scan((x, y) -> x * y).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});
//window 结果:1,2,3,||4,5,6,||7,8,9,||10,||
Observable.range(1, 10).window(3).subscribe(new Action1<Observable<Integer>>() {
@Override
public void call(Observable<Integer> integerObservable) {
integerObservable.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.print("||");
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(Integer integer) {
System.out.print(integer + ",");
}
});
}
});
//window还有更多操作:
window(count,skip)、window(timeSpan, TimeUnit)...

3. 过滤操作

  • filter( ) — 过滤数据
  • takeLast( ) — 只发射最后的N项数据
  • last( ) — 只发射最后的一项数据
  • lastOrDefault( ) — 只发射最后的一项数据,如果Observable为空就发射默认值
  • takeLastBuffer( ) — 将最后的N项数据当做单个数据发射
  • skip( ) — 跳过开始的N项数据
  • skipLast( ) — 跳过最后的N项数据
  • take( ) — 只发射开始的N项数据
  • first( ) and takeFirst( ) — 只发射第一项数据,或者满足某种条件的第一项数据
  • firstOrDefault( ) — 只发射第一项数据,如果Observable为空就发射默认值
  • elementAt( ) — 发射第N项数据
  • elementAtOrDefault( ) — 发射第N项数据,如果Observable数据少于N项就发射默认值
  • sample( ) or throttleLast( ) — 定期发射Observable最近的数据
  • throttleFirst( ) — 定期发射Observable发射的第一项数据
  • throttleWithTimeout( ) or debounce( ) — 只有当Observable在指定的时间后还没有发射数据时,才发射一个数据
  • timeout( ) — 如果在一个指定的时间段后还没发射数据,就发射一个异常
  • distinct( ) — 过滤掉重复数据
  • distinctUntilChanged( ) — 过滤掉连续重复的数据
  • ofType( ) — 只发射指定类型的数据
  • ignoreElements( ) — 丢弃所有的正常数据,只发射错误或完成通知

聚合操作

  • concat( ) — 顺序连接多个Observables
  • count( ) and countLong( ) — 计算数据项的个数并发射结果
  • reduce( ) — 对序列使用reduce()函数并发射最终的结果
  • collect( ) — 将原始Observable发射的数据放到一个单一的可变的数据结构中,然后返回一个发射这个数据结构的Observable
  • toList( ) — 收集原始Observable发射的所有数据到一个列表,然后返回这个列表
  • toSortedList( ) — 收集原始Observable发射的所有数据到一个有序列表,然后返回这个列表
  • toMap( ) — 将序列数据转换为一个Map,Map的key是根据一个函数计算的
  • toMultiMap( ) — 将序列数据转换为一个列表,同时也是一个Map,Map的key是根据一个函数计算的

4. Subject

上文中介绍Subject是桥梁或者代理,同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发送新的数据。

Subject的种类

  • AsyncSubject:一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。(如果原始Observable没有发射任何值,AsyncObject也不发射任何值)它会把这最后一个值发射给任何后续的观察者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
AsyncSubject<String> asyncSubject = AsyncSubject.create();
asyncSubject.onNext("asyncSubject1");
asyncSubject.onNext("asyncSubject2");
asyncSubject.onNext("asyncSubject3");
asyncSubject.onCompleted();
asyncSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);//只输出 asyncSubject3
}
});
  • BehaviorSubject:发射被订阅前的最后一条数据(如果被订阅时还没有发射数据,则发射一个默认值),并继续发射被订阅后接收到到的数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
 BehaviorSubject<String> behaviorSubject = BehaviorSubject.create("default");
behaviorSubject.onNext("behaviorSubject1");
behaviorSubject.onNext("behaviorSubject2");
behaviorSubject.subscribe(new Observer<String>(){
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);

}
});
behaviorSubject.onNext("behaviorSubject3");
behaviorSubject.onNext("behaviorSubject4");
/*
输出:
asyncSubject2
asyncSubject3
asyncSubject4
*/
  • PublishSubject:其订阅者只会收到订阅时间后,PublishSubject发射的数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
PublishSubject<String> publishSubject = PublishSubject.create();
publishSubject.onNext("publishSubject1");
publishSubject.onNext("publishSubject2");
publishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
publishSubject.onNext("publishSubject3");
publishSubject.onNext("publishSubject4");
/*
输出:
asyncSubject3
asyncSubject4
*/
  • ReplaySubjectReplaySubject会发射所有数据给所有订阅者,无论订阅者什么时间点订阅消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ReplaySubject<String>replaySubject = ReplaySubject.create();    
replaySubject.onNext("replaySubject:pre1");
replaySubject.onNext("replaySubject:pre2");
replaySubject.onNext("replaySubject:pre3");
replaySubject.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
replaySubject.onNext("replaySubject:after1");
replaySubject.onNext("replaySubject:after2");
/*
输出:
replaySubject:pre1
replaySubject:pre2
replaySubject:pre3
replaySubject:after1
replaySubject:after2
*/
  • SerializedSubject:当多线程往Subject中写数据时,能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。为解决此类问题,只需要将相应的Subject 包装在SerializedSubject里。

5. Scheduler 调度器

RxJava提供了便捷的线程调度工具Scheduler,可以很简便地指定发射源或者订阅者在哪个特定的调度器(Scheduler)上执行。

RxJava提供ObserveOnSubscribeOn两种调度方法,ObserveOn指示Observable在哪个特定的调度器上调用观察者的onNext, onErroronCompleted方法(指定观察者在哪个调度器上执行逻辑)。SubscribeOn用于指定Observable将全部的处理过程(包括发射数据和通知)放在特定的调度器上执行(Observer内容执行的地方)。

调度器的种类:

调度器类型 效果
Schedulers.computation( ) 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.immediate( ) 在当前线程立即开始执行任务
Schedulers.io( ) 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.newThread( ) 为每个任务创建一个新线程
Schedulers.trampoline( ) 当其它排队的任务完成后,在当前线程排队开始执行

5.1 SubscribeOn and ObserveOn

subscribeOn()方法来用于指定Observable自身在哪个调度器上执行。
ObserveOn指定一个观察者在哪个调度器上观察找个Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.create(((Observable.OnSubscribe<Integer>) subscriber -> {
System.out.println(Thread.currentThread().getName());
subscriber.onNext(1);
})).subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation()).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer + ", Thread: " + Thread.currentThread().getName());
}
});
//输出结果:
RxIoScheduler-2
1, Thread: RxComputationScheduler-1

参考:

  1. RxJava Home
    1. Creating Observables
    2. Transforming Obserables
    3. Filtering Observables
    4. Subject
    5. Scheduler
  2. ReactiveX Operators
  3. 我所理解的RxJava(一)
  4. 我所理解的RxJava(二)
  5. ReactiveX/RxJava中文版
您的支持是我创作源源不断的动力