RxJava是今年最热门的Android技术,事实上它也确实有这个资格。从去年过年的时候,在AndroidWeekly上RxJava的消息就甚嚣尘上,这两天终于抽时间仔细研究了一下。

这篇文章,主要是RxJava的入门知识,对于RxJava的原理也有了一些涉及。

我所理解的RxJava的优势

RxJava有三方面的优势。

  1. 消除了所有的Callback-Hell。不仅是对于嵌套的Callback,对循环带来的嵌套也进行了消除。
  2. 明晰的事件流模型,使得对于事件的控制与现实世界更加的相似,控制起来更加的灵活。(这是由大量工具函数打造出来的)。比如控制事件流只开启3秒,只接收5次,都如同真的有一条时间线一样。
  3. 对线程进行了封装,能够非常方便的控制任何一个函数的线程切换。线程池,Android主线程的获取都非常简单。

关于RxJava的原理,干说没有什么意思,我想提出几个问题。

  1. map函数的Func是以什么形式,存储在哪里,在何时,由谁操控着,来处理订阅和观察的结果(参数)的?如果有n层map,那么这种关系将较为复杂。
  2. flatMap是怎样的原理?和map有什么区别
  3. 线程是如何生效的?
  4. map, flatMap 以及 filter 对应的函数会在observeOn还是subscribeOn的线程下执行?
  5. subscription在unsubscribe的时候,仅仅取消了订阅者的关注,还是停下了整个调用链?
  6. 理论上是否可以把所有的callback hell全部消除?

RxJava入门

给 Android 开发者的 RxJava 详解

RxJava是什么?

以下是ReactiveX官方定义:

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.

ReactiveX这套库,以Observable串为基本结构,基于消息,并被用来组织异步。

Reactive拓展了观察者模式,使得数据以流的模式,获得控制和处理。同时还提供了很多『操作符』函数,使我们能够方便地组织数据流,并且不必考虑底层线程,同步,线程安全,同步数据结构,以及非阻塞IO等问题。


如上所言
ReactiveX的目的是,将软件行为,定义为数据/事件流,并对之进行操控。比如,流在何时开始,流向何处(执行的顺序),执行线程,以及将多条流合为一条,等等。

ReactiveX的基本结构,是观察者模式的一种包装和拓展。每一个『操作符』函数,都会形成一个Observable,多个Observable连成一串,正是这多个Observable定义了对于数据流的控制。

需要申明的是,ReactiveX对于事件的控制是两个维度的。对于某一个事件,进行了一次又一次的转化操作,这是数据流。而随着时间,一个又一个的事件,不断的发出,ReactiveX可以控制何时处理处理哪些,这是事件流。

Observable与Subscriber

Observable是RxJava的主要接口类,定义了许多的静态工具方法,便于我们使用。

1
2
3
4
5
6
Observable.just(1).subscribe(new Action1<Integer>{
@Override
public void call(Integer anInt) {
// do something...
}
})

Observable.just(1)创建了一个Observable,即事件源。这个事件源会向监听者发送一个 『1』 (Integer)。更多更详细的创建方法会在下面介绍。

当调用subscribe方法的时候,事件源就会被触发。(这里说的是Cold Observable,另外还有Hot Observable,即不管有没有订阅者,只要创建,就开始发送信息)

Action1定义了接收函数,即监听者。
在java中一切皆对象,所以当我们想传递一个函数时,我们就要用类把函数包装起来。ReactiveX为我们提供了相当多的函数包装工具类。分别是Action和Func系列。Action代表有参数没有返回值的函数,Action1就是有一个参数,Action2就是有两个参数,ActionN就是有一个数组参数。同理,Func1有一个参数,Func2有两个参数。
在这里Action1是Subscriber类的简化,这里我们传Action,在使用时也会被包装成subscriber

Subscriber的结构

Subscriber是一个抽象类,有三个抽象函数。

分别是onComplete() onError(Throwable t) onNext(T result)

上文的Action1就会被包装成一个在onNext中执行Action1的Subscriber

Observable的创建

在这里我们只是说一下Observable实例的基本结构和原理,创建Observable有很多工具方法,将在后文中提到。

首先提出一个问题,是谁对监听者进行调用?在什么时候调用?
比如View的setOnClickListener函数。是View在调用这个Listener,是在View的performClick函数被调用的时候。

1
2
3
4
5
pubic void performClick() {
if (null != getOnClickListener()) {
getOnClickListner().onClick(this);
}
}

问题一,是一个OnSubscriber,在对Subscriber中的三个函数进行调用。
问题二,在二者绑定在一起的时候,即subscribe函数执行的时候进行调用。

每一个Observable实例都持有一个OnSubscriber实例,该实例负责向Subscriber发送信息。

我们先看一下OnSubscriber的结构。

1
2
3
4
5
6
7
8
9
10
11
new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
try {
subscriber.onNext(1);
subscriber.onComplete();
} catch (Exception e) {
subscriber.onError(e);
}
}
}

这只是一个大概,不过我们还是可以看到,其实subscriber的三个函数的功能,全在于OnSubscriber怎么调用它。

常见操作符

操作符是对于数据流进行处理,会经过两个基本的函数,map,flatMap。

map的功能很简单。就是对进行一个事件传递过来的信息进行parse,进行处理。处理成一个新的结果,继续传递下去。

1
2
3
4
5
6
7
8
Observable.just(1).map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "" + integer;
}
}).subscribe(new Action1<String>{
//implementing
})

这个map将事件源传递来的Integer转化成了String,并传递给了Subscriber。

flatMap的功能就稍微复杂一些了。flatMap接收一个事件的数据,返回一个Observable。这是什么意思呢,函数内部会首先执行这个新返回的Observable,把这个Observable的事件全部都发给Subscriber,再接着返回源头的Observable,继续给flatMap发事件,以此类推。

解决了Observable过程中,一对多的关系。试想没有这个函数,当我们遇到一对多的时候,即参数是一个数组的时候,我们需要遍历整个数组来出来,这势必无法保证Observable的流式结构,而是会再次陷入Callback-hell中。举个简单的例子:

1
2
3
4
5
query("Hello, world!")  
.subscribe(urls -> {
Observable.from(urls)
.subscribe(url -> System.out.println(url));
});

而当我们有了flatMap就不一样了,flatMap会返回一个Observable,系统会先把这个Observable执行完,把flatMap中返回的Observable里的信号全部发出,交给下一级接受者,然后才会继续接收上一级的信号(当然我说的是在同一线程,这个函数也有可能是异步的,看你怎么设置,如果是其他线程的话,当然也可能出现竞争的情况)

其实相当于一个函数返回了多次。

如图:

1
2
3
4
5
6
7
8
query("Hello, world!")  
.flatMap(new Func1<List<String>, Observable<String>>() {
@Override
public Observable<String> call(List<String> urls) {
return Observable.from(urls);
}
})
.subscribe(url -> System.out.println(url));

看,Observable重新回到了流式结构。

Scheduler与线程管理

前面已经提到了,ReactiveX能够方便的进行线程管理,并且不需要担心同步等问题。二者一特性是通过两个方法,和四个Scheduler组成的。

四个Scheduler

  1. Scheduler.immediate() 这是系统默认的线程管理器。在这个Scheduler控制下的行为,将立刻执行,即不对线程进行切换,(受制于上一级工作所执行的线程,以及调用ReactiveX的线程)
  2. Scheduler.newThread() 在这个Scheduler控制下的行为,将总是开启新的线程进行执行
  3. Scheduler.io() 用于进行io的线程管理器。在这个Scheduler控制下的行为,将在一个特有的线程池中进行,这个线程池中可容纳的线程没有上限。
  4. Scheduler.computation() 用于计算的线程管理器。在这个Scheduler控制下的行为,将在另一个特有的线程池中进行,这个线程池容量固定,容纳线程数与CPU核数相当,与Scheduler.io()相比减少了线程之间的切换,从而最大程度上加快了计算速度。

Scheduler的使用

  • SubscribeOn(Scheduler.newThread()) 决定事件源切换到新启动的线程中执行
  • ObserveOn(Scheduler.io()) 决定下一级以及以后的观察者(回调函数)切换到io()线程池中执行。

举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observable.just(...)
.subscribeOn(Scheduler.newThread())
.observeOn(Scheduler.io())
.subscribe(new Subscriber<String>{
@Override
public void onNext(String aString) {
//... save aString in file
}

@Override
public void onComplete() {
}

public void onError(Throwable t) {
}
})

just中想发送的数据,就会切换到一个新线程中启动并发送,而Subscriber将被切换到io线程池中运行。

Scheduler行为深入

在上面的介绍中,我多次提到了切换这个词。我们还是提出几个问题,来搞清楚Scheduler的行为。

1.如果只定义SubscribeOn(),不定义ObserveOn()。Subscriber会在哪里执行?

答,会在SubscribeOn定义的线程中执行。因为没有做另一次切换。

2.如果增加多个ObserveOn(),比如调用了两个ObserveOn()。Subscriber会在哪里执行?

答,会在后一个observeOn()执行。事实上线程进行了两次切换。

3.如果调用两次subscribeOn(),OnSubscriber,即事件源会在哪里执行?

答,会在更靠近OnSubscriber的那个subscribeOn决定。前面说了,Obervable中的OnSubscriber是真正控制事件源产生的类,而这个类会在subscribe()函数被调用的时候,开始发送。换句话说,在subscribe函数中,其实会一级一级的向上找事件源,并且调用事件源,遇到subscribeOn的时候,就切换一次线程,故而由更靠近的subscribeOn决定。

一个例子

1
2
3
4
5
6
7
8
Observable.just(...)                 //在io线程池执行
.subscribeOn(Scheduler.io())
.map() //在io线程池执行
.observeOn(Scheduler.newThread())
.subscribeOn(Scheduler.computation())
.map() //在新线程执行
.map() //在新线程执行
.subscribe() //在新线程执行

在这个例子中,从subscribe往上走,首先切换到computation,再切换到io,到达第一行,ok,现在找到了数据源,开始激发数据源,数据源在io线程池中执行。然后再从数据源往下走,在第一个map中没有进行切换,继续在io线程池。过了第一个map,切换到newThread,所以第二个map是在新线程执行,后面的第三个map以及subscribe里的回调,都是在新线程中执行。

RxJava原理

有关RxJava原理见下文。
RxJava原理

RxJava速查

有关更具体的RxJava的一些工具函数,以及RxJava特有概念,见下文。
RxJava概念及工具函数总结