ReactiveX Operator 总结 及其它重要概念
- 1. 总览
- 2. Creating Observable
- 3. Transforming Observables
- 4. Filter Observables
- 5. Combining Observable
- 6. Error Handling Operators
- 7. Observable Utility Operators
- 8. Conditional and Boolean Operators
- 9. Mathematical and Aggregate Operators
- 10. Backpressure Operators
- 11. Connectable Observable Operators
- 12. Operators to Convert Observables
- 13. Single
- 14. Subject
其实ReativeX关于Observable或者Subscriber并没有什么好说的,无非就是观察者模式的拓展。
真正让ReactiveX变得牛逼的是Operator,或者叫”reactive extensions”这也是为什么这套库要叫ReactiveX。
当然,除此之外,还有其它一些ReactiveX的重要概念。理解他们能够让我们对ReactiveX的使用更加得心应手。
总览
Creating ObservablesCreate
, Defer
, Empty
/Never
/Throw
, From
, Interval
, Just
, Range
, Repeat
, Start
, and Timer
Transforming Observable ItemsBuffer
, FlatMap
, GroupBy
, Map
, Scan
, and Window
Filtering ObservablesDebounce
, Distinct
, ElementAt
, Filter
, First
, IgnoreElements
, Last
, Sample
, Skip
, SkipLast
, Take
, and TakeLast
Combining ObservablesAnd
/Then
/When
, CombineLatest
, Join
, Merge
, StartWith
, Switch
, and Zip
Error Handling OperatorsCatch
and Retry
Utility OperatorsDelay
, Do
, Materialize
/Dematerialize
, ObserveOn
, Serialize
, Subscribe
, SubscribeOn
, TimeInterval
, Timeout
, Timestamp
, and Using
Conditional and Boolean OperatorsAll
, Amb
, Contains
, DefaultIfEmpty
, SequenceEqual
, SkipUntil
, SkipWhile
, TakeUntil
, and TakeWhile
Mathematical and Aggregate OperatorsAverage
, Concat
, Count
, Max
, Min
, Reduce
, and Sum
Converting ObservablesTo
Connectable Observable OperatorsConnect
, Publish
, RefCount
, and Replay
Backpressure Operators
a variety of operators that enforce particular flow-control policies
Creating Observable
用来创建新的Observable对象的操作符
Create
通过自己的代码调用Subscriber的方法,来手动控制事件的产生。Defer
定义一个构建Observable的工厂函数,每当subscribe的时候,就创建一个全新的Subscriber一个全新的Observable,并且不订阅的话就不创建。Empty
/Never
/Throw
非常简单的Observable,Empty只调用一次onComplete Throw只调用一次onError Never则什么也不调用。From
通过一些常用的数据结构来构建Observable,包括但不限于Future,Callback,数组Interval
每过一定时间,就发送一个递增的整数Just
给这个函数传入什么,生成的Observable就产出什么。一般是一个或多个同一类型的对象。Range
发送一系列连续的整数。Repeat
重复发送一系列对象Timer
定时一个时间间隔,然后发送一个值。
以上 Repeat 在 RxJava中不是Observable创建函数,而是对于已有的Observable的修饰。其它都是静态的构造函数,可以方便的构造Observable
Transforming Observables
用来对Observable所发送的数据进行处理转化的操作符。
Buffer
用来周期性的将事件收集在一起,打包成数组发送给下一级。可以是以数量为周期,时间为周期,也可以自己定义停止Buffer的条件。FlatMap
把发送Observable<T>
的 Observable 转化成发送T
的Observable,即将两层的Observable压缩成了一层。GroupBy
通过定义key函数,将一个Observable拆分成多个Observable。一个发送Observable的ObservableMap
最常用的转换操作符,一个口进,一个口出。中间进行一些转化。Scan
重在两个相邻的事件的关系,将每两个事件之间进行处理和转化Window
window与Buffer很像,都是对于事件进行收集打包发送,但是与Buffer不同的是,Buffer是返回一个数组,而Window则是返回一个Observable
Filter Observables
用来选择判断哪些可以通过的操作符。
Debounce
每隔一定的时间间隔,才会接收一次事件,其它的事件都会被过滤掉。与throttle很像。Sample
也叫throttle。在一定的事件内,只会接收一次事件。当到达这个时间点后,找这个时间段内第一个/或者最后一个数据,throttleFirst throttleLast。Distinct
去除重复的单元。ElementAt
顾名思义,只接收第n个数据。Filter
只有经过预先定义的函数,返回true的,才接收。First
只接收第一个数据。或者第一个符合要求,返回为true的数据。IgnoreElements
不接收任何事件,只在所有事件都完成后通知我。Last
只接受最后一个数据。Skip
跳过前n个元素SkipLast
跳过最后n个元素Take
只接收前n个元素TakeLast
只接收最后n个元素
Combining Observable
用来把多个事件流合成一个事件流的操作符
And
/Then
/When
RxJava中没有这一函数只在RxJavaJoins库中有。StartWith
在发送数据之前,先发送startWith传入的数据。CombineLatest
传入两个Observable参数,共同发送信号。每接收到一个数据,都与另一个Observable的最后一个数据一起交给一个预设的函数处理,并继续发送这个函数的结果Join
与CombineLast
有些相似,都是选取两个Observable中的各一个值,进行合成运算。只不过join需要为每个数据规定特有的有效期,只有在有效期之内出现对应的另一个Observable的数据,才能进行合成。Zip
Zip与上面两个相同,也是一个combine操作符。只不过它combine的必须是两个index相同的元素。Merge
把多个Observable的数据统一发向一个接收器,同一个线程的时候,则深度优先。Switch
类似于merge。merge是把所有收到的数据,全部发出去,而Switch则是把,每一次有一个Observable通知说,我要开始发信息了,那么Switch就会扔掉前面的Observable,开始只关注这个已经开始的Observable。
Error Handling Operators
决定当发生错误时该怎么办的操作符。
Catch
在RxJava中叫onErrorResumeNext 如果发生了按这个预定的数据,继续发送下去。Retry
再订阅一次,从而重新发送所有的数据。
Observable Utility Operators
一些非常有用的工具操作符。
Delay
过三秒以后再开始按原样发送数据。Do
对Observable生命周期的监听,doOnCompleted doOnEach doOnError 等等的呢过,都是生命周期的回调。Materialize
/Dematerialize
有一个类叫做Notification,onNext onCompleted这些都算是Notification,Materialize的意思就是当接收到数据,或者onCompleted这样的信号时,通通包装成Notification,交给下一级Subscriber。Dematerialize
相反,把Notification拆箱。ObserveOn
为观察者的执行指定SchedulerSerialize
当Observable的行为比较复杂是,尤其是在使用多个线程,共同向Subscriber中发送信息时,很有可能一个线程执行了onComplete而另一个线程正在执行的onNext还没有返回,或者两个线程同步的执行onNext,这都存在问题。Serialize就是为了解决这样的问题,通过一定的等待,避免这一问题出现。Subscribe
订阅的关键函数。SubscribeOn
为事件源的执行指定SchedulerTimeInterval
传递时间源之间的时间间隔Timeout
两个事件的间隔超过预设时间就报错。TimeStamp
对每一个事件进行包装,打上时间戳Using
在开始监听的时候,创建Observable同时创建一个资源,同时在Observable完成,或者手动结束订阅的时候,释放资源。
Conditional and Boolean Operators
对Observable发送的数据进行评估挑选的操作符。
All
判断Observable的所有数据,是否都符合某个条件,向下返回boolean。Amb
多个Observable进行竞争,第一个开始返回数据的订阅源,以后发送数据,其它停止发送。注意,如果第一个发送的订阅源发送的是onComplete这样的Notification那么就停止。Contains
判断一个Observable是否发送了特殊的数据。DefaultIfEmpty
原样发送上级数据源传递过来的数据,如果数据源为空(即直接Complete了),那么就返回默认数据SequenceEqual
传给这个操作符两个Observable,如果每一个元素都相同,并且顺序也相同,结束的方式也相同(onError onCompleted),那么就返回true。SkipUntil
作为一个Observable的实例函数,接收另一个Observable参数,当参数Observable发送信息的时候,开始发送本身数据源的数据SkipWhile
传入一个函数作为判断条件,如果条件成立,就跳过这个数据(不往下发送),当条件不成立的时候,后面都发送,包括不成立的这个。TakeUntil
与SkipUntil
相对,在另一个Observable没有发数据时,接收,如果开始发数据了,就不接收了。TakeWhile
与SkipWhile
相对,当条件成立的时候,接收数据。
Mathematical and Aggregate Operators
对Observable返回的所有数据进行处理的操作符。
Concat
顺次连接多个Observable发送的数据,而不是插入Count
计算Observable发送数据的数量Reduce
累积计算。传入一个函数,对每两个数据都使用这一函数进行拼接。注意,上面的例子,如果函数换成x, y -> x * 10 + y
最后返回的就是12345
Average
不包含在RxJava基本包中,而包含在RxJava-Math中,取平均值Max
取最大值。在RxJava-Math中。Min
取最小值。同上Sum
求和,同上
Backpressure Operators
不是某一个具体的函数。当Observable产生数据的速度,明显大于Subscriber消耗数据的素的的时候,就需要定义缓存。有一些策略。
可以缓存下来,等到Subscriber请求的时候再发送,请求一次发送一次。
也存在其他的策略,比如把过快的扔掉。
每次请求的时候,只返回最新的数据。
Connectable Observable Operators
connect
可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这个方法,你可以等待所有的观察者都订阅了Observable之后再开始发射数据。Publish
将普通的Observable换换为可连接的ObservableRefCount
让一个可连接的Observable行为像普通的ObservableReplay
保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅
Operators to Convert Observables
To
BlockingObservable.from BlockingObservable.getIterator nest toBlocking BlockingObservable.toFuture BlockingObservable.toIterable toList toMap toMultiMap toSortedList
都是对与Observable进行简单的变化。
Single
不返回一个一个数据,而返回onSuccess 或 onError。
Subject
Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。