其实ReativeX关于Observable或者Subscriber并没有什么好说的,无非就是观察者模式的拓展。
真正让ReactiveX变得牛逼的是Operator,或者叫”reactive extensions”这也是为什么这套库要叫ReactiveX。

当然,除此之外,还有其它一些ReactiveX的重要概念。理解他们能够让我们对ReactiveX的使用更加得心应手。

总览

Creating Observables
Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, and Timer

Transforming Observable Items
Buffer, FlatMap, GroupBy, Map, Scan, and Window

Filtering Observables
Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, and TakeLast

Combining Observables
And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, and Zip

Error Handling Operators
Catch and Retry
Utility Operators
Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, and Using
Conditional and Boolean Operators
All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, and TakeWhile

Mathematical and Aggregate Operators
Average, Concat, Count, Max, Min, Reduce, and Sum

Converting Observables
To

Connectable Observable Operators
Connect, Publish, RefCount, and Replay

Backpressure Operators
a variety of operators that enforce particular flow-control policies

Creating Observable

用来创建新的Observable对象的操作符

  • Create 通过自己的代码调用Subscriber的方法,来手动控制事件的产生。

  • Defer 定义一个构建Observable的工厂函数,每当subscribe的时候,就创建一个全新的Subscriber一个全新的Observable,并且不订阅的话就不创建。

  • Empty / Never / Throw 非常简单的Observable,Empty只调用一次onComplete Throw只调用一次onError Never则什么也不调用。

  • From 通过一些常用的数据结构来构建Observable,包括但不限于Future,Callback,数组

  • Interval 每过一定时间,就发送一个递增的整数

  • Just 给这个函数传入什么,生成的Observable就产出什么。一般是一个或多个同一类型的对象。

  • Range 发送一系列连续的整数。

  • Repeat 重复发送一系列对象

  • Timer 定时一个时间间隔,然后发送一个值。

以上 Repeat 在 RxJava中不是Observable创建函数,而是对于已有的Observable的修饰。其它都是静态的构造函数,可以方便的构造Observable

Transforming Observables

用来对Observable所发送的数据进行处理转化的操作符。

  • Buffer 用来周期性的将事件收集在一起,打包成数组发送给下一级。可以是以数量为周期,时间为周期,也可以自己定义停止Buffer的条件。
  • FlatMap 把发送 Observable<T> 的 Observable 转化成发送T的Observable,即将两层的Observable压缩成了一层。
  • GroupBy 通过定义key函数,将一个Observable拆分成多个Observable。一个发送Observable的Observable
  • Map 最常用的转换操作符,一个口进,一个口出。中间进行一些转化。
  • Scan 重在两个相邻的事件的关系,将每两个事件之间进行处理和转化
  • Window window与Buffer很像,都是对于事件进行收集打包发送,但是与Buffer不同的是,Buffer是返回一个数组,而Window则是返回一个Observable

Filter Observables

用来选择判断哪些可以通过的操作符。

  • Debounce 每隔一定的时间间隔,才会接收一次事件,其它的事件都会被过滤掉。与throttle很像。

  • Sample 也叫throttle。在一定的事件内,只会接收一次事件。当到达这个时间点后,找这个时间段内第一个/或者最后一个数据,throttleFirst throttleLast。

  • Distinct 去除重复的单元。

  • ElementAt 顾名思义,只接收第n个数据。

  • Filter 只有经过预先定义的函数,返回true的,才接收。

  • First 只接收第一个数据。或者第一个符合要求,返回为true的数据。

  • IgnoreElements 不接收任何事件,只在所有事件都完成后通知我。

  • Last 只接受最后一个数据。

  • Skip 跳过前n个元素

  • SkipLast 跳过最后n个元素

  • Take 只接收前n个元素

  • TakeLast 只接收最后n个元素

Combining Observable

用来把多个事件流合成一个事件流的操作符

  • And/Then/When RxJava中没有这一函数只在RxJavaJoins库中有。

  • StartWith 在发送数据之前,先发送startWith传入的数据。

  • CombineLatest 传入两个Observable参数,共同发送信号。每接收到一个数据,都与另一个Observable的最后一个数据一起交给一个预设的函数处理,并继续发送这个函数的结果

  • JoinCombineLast有些相似,都是选取两个Observable中的各一个值,进行合成运算。只不过join需要为每个数据规定特有的有效期,只有在有效期之内出现对应的另一个Observable的数据,才能进行合成。

  • Zip Zip与上面两个相同,也是一个combine操作符。只不过它combine的必须是两个index相同的元素。

  • Merge 把多个Observable的数据统一发向一个接收器,同一个线程的时候,则深度优先。

  • Switch 类似于merge。merge是把所有收到的数据,全部发出去,而Switch则是把,每一次有一个Observable通知说,我要开始发信息了,那么Switch就会扔掉前面的Observable,开始只关注这个已经开始的Observable。

Error Handling Operators

决定当发生错误时该怎么办的操作符。

  • Catch 在RxJava中叫onErrorResumeNext 如果发生了按这个预定的数据,继续发送下去。

  • Retry 再订阅一次,从而重新发送所有的数据。

Observable Utility Operators

一些非常有用的工具操作符。

  • Delay 过三秒以后再开始按原样发送数据。

  • Do 对Observable生命周期的监听,doOnCompleted doOnEach doOnError 等等的呢过,都是生命周期的回调。

  • Materialize/Dematerialize 有一个类叫做Notification,onNext onCompleted这些都算是Notification,Materialize的意思就是当接收到数据,或者onCompleted这样的信号时,通通包装成Notification,交给下一级Subscriber。Dematerialize相反,把Notification拆箱。

  • ObserveOn 为观察者的执行指定Scheduler

  • Serialize 当Observable的行为比较复杂是,尤其是在使用多个线程,共同向Subscriber中发送信息时,很有可能一个线程执行了onComplete而另一个线程正在执行的onNext还没有返回,或者两个线程同步的执行onNext,这都存在问题。Serialize就是为了解决这样的问题,通过一定的等待,避免这一问题出现。

  • Subscribe 订阅的关键函数。

  • SubscribeOn 为事件源的执行指定Scheduler

  • TimeInterval 传递时间源之间的时间间隔

  • Timeout 两个事件的间隔超过预设时间就报错。

  • TimeStamp 对每一个事件进行包装,打上时间戳

  • Using 在开始监听的时候,创建Observable同时创建一个资源,同时在Observable完成,或者手动结束订阅的时候,释放资源。

Conditional and Boolean Operators

对Observable发送的数据进行评估挑选的操作符。

  • All 判断Observable的所有数据,是否都符合某个条件,向下返回boolean。
  • Amb 多个Observable进行竞争,第一个开始返回数据的订阅源,以后发送数据,其它停止发送。注意,如果第一个发送的订阅源发送的是onComplete这样的Notification那么就停止。
  • Contains 判断一个Observable是否发送了特殊的数据。
  • DefaultIfEmpty 原样发送上级数据源传递过来的数据,如果数据源为空(即直接Complete了),那么就返回默认数据
  • SequenceEqual 传给这个操作符两个Observable,如果每一个元素都相同,并且顺序也相同,结束的方式也相同(onError onCompleted),那么就返回true。
  • SkipUntil 作为一个Observable的实例函数,接收另一个Observable参数,当参数Observable发送信息的时候,开始发送本身数据源的数据
  • SkipWhile 传入一个函数作为判断条件,如果条件成立,就跳过这个数据(不往下发送),当条件不成立的时候,后面都发送,包括不成立的这个。
  • TakeUntilSkipUntil 相对,在另一个Observable没有发数据时,接收,如果开始发数据了,就不接收了。
  • TakeWhileSkipWhile 相对,当条件成立的时候,接收数据。

Mathematical and Aggregate Operators

对Observable返回的所有数据进行处理的操作符。

  • Concat 顺次连接多个Observable发送的数据,而不是插入
  • Count 计算Observable发送数据的数量
  • Reduce 累积计算。传入一个函数,对每两个数据都使用这一函数进行拼接。注意,上面的例子,如果函数换成 x, y -> x * 10 + y 最后返回的就是 12345

  • Average 不包含在RxJava基本包中,而包含在RxJava-Math中,取平均值

  • Max 取最大值。在RxJava-Math中。
  • Min 取最小值。同上
  • Sum 求和,同上

Backpressure Operators

不是某一个具体的函数。当Observable产生数据的速度,明显大于Subscriber消耗数据的素的的时候,就需要定义缓存。有一些策略。

可以缓存下来,等到Subscriber请求的时候再发送,请求一次发送一次。

也存在其他的策略,比如把过快的扔掉。

每次请求的时候,只返回最新的数据。

Connectable Observable Operators

  • connect 可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这个方法,你可以等待所有的观察者都订阅了Observable之后再开始发射数据。
  • Publish 将普通的Observable换换为可连接的Observable
  • RefCount 让一个可连接的Observable行为像普通的Observable
  • Replay 保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅

Operators to Convert Observables

  • To BlockingObservable.from BlockingObservable.getIterator nest toBlocking BlockingObservable.toFuture BlockingObservable.toIterable toList toMap toMultiMap toSortedList
    都是对与Observable进行简单的变化。

Single

不返回一个一个数据,而返回onSuccess 或 onError。

Subject

Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。