我在前面提到了几个问题,下面将尽可能的试着把它们说明白。

  1. map函数的Func是以什么形式,存储在哪里,在何时,由谁操控着,来处理订阅和观察的结果(参数)的?如果有n层map,那么这种关系将较为复杂。
  2. flatMap是怎样的原理?和map有什么区别
  3. 线程是如何生效的?
  4. subscription在unsubscribe的时候,仅仅取消了订阅者的关注,还是停下了整个调用链?

从map函数说起

如果我们自己构造map函数的时候,我们会怎么构造?
我是这样想的,我会定义一个Subscriber来接收,同时穿件一个新的Observable来发送新的信号,当接收到一个信号的时候,处理,然后就由新的Observable发送一个。
在这中间,或是在Subscriber,或是在Observable中用Func来处理Event。即可达到map的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Subscriber mFatherSubscriber = null;
Observable mChildObservable = null;
Observable Father = this;

public Observable<Event> map(Func) {
mChildObservable = Observable.create(new OnSubscriber{
call(Subscriber s) {
s.onNext(Event);
}
});
this.mFatherSubscriber = new Subscriber() {
onNext(Event) {
mChildObservable.emit(Func(Event));
}
};
return mChildObservable;
}

这样确实可以实现map的功能,但是对比RxJava的实现,有以下几点不足,或者说没有考虑到。

第一,打破了完全的函数式编程结构。这样的话需要给Observable留出转发的接口,而Observable对应着的却是产生事件的行为,没有接收信息的行为。而给他加上这个接口,整个思想就会造成混乱,无法让用户很好的体验到事件流的魅力。

第二,各个级别拆的太散了,想要控制后面的行为,比如控制后面的map执行的线程,就需要把这样那样的信息也传递下去。这需要预留大量的接收入口,不太好。

第三,更加复杂的结构,比如flatMap相对来说,就不好兼容。而RxJava却用Operator类将所有的行为统一到了一起。

那么RxJava是怎么实现map的呢?

用自然语言描述。

我先用语言简单描述一下,然后再深入到代码中。

在研究map之前,我想再啰嗦一遍subscribe()函数运行的过程。

在subscribe函数中,有两个操作对象,一个是被调用的Observable事件源,一个是传进来的Subscriber接收者。
当subscribe调用的时候,就会把Subscriber交给Observable,由Observable来进行调用。
代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
public void static subscribe(Subscriber s, Observable o) {
s.onStart();
try {
o.onSubscriber.call(s);
} catch (Throwable e) {
try {
s.onError(e);
} catch (Throwable e) {
throw_it(e);
}
}
}

总之,会有一个动作,就是把Subscriber交给Observable里的OnSubscriber,并进行进一步处理。

OK,让我们回到map的问题上。

当执行map(Func)的时候,会构建一个新的Observable(这个构建的过程叫做lift())。如果紧跟着map后面调用subscribe的话,那么一个Subscriber就会被交给这个Observable。

那么这个新的夹在数据源Observable和订阅者Subscriber之间的Observable都做了哪些事呢?

这个Observable会把接收到的Subscriber首先包装成一个新的Subscriber,新的Subscriber会把传来的信号先经过Func处理,然后再交给被包装的那个Subscriber。

然后介于中间的Observable会把这个包装过的Subscriber交给上一级的OnSubscriber来处理。上一级的OnSubscriber中也可能有自己的处理方式,比如再把Subscriber包装一层,直到遇到真正的数据源,开始给这包装了一层又一层的Subscriber传递数据,这一层又一层的包装,就是一层又一层的中间函数处理。

用代码描述

我们依旧从map函数看进去,然后了解Operator类,以及lift函数。

Observable#map方法

1
2
3
4
5
6
//T是当前所在的Observable实例的T泛型,代表这一级传递过来的参数的类型,
//而R则是在这个Func1中传入的,能够规定新产生的Observable类型。
public <R> Observable<R> map(Func1<T, R> func) {
//首先把func包装成了一个OperatorMap,再经过lift生成新的Observable
return lift(new OperatorMap<T, R>(func));
}

OperatorMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public final class OperatorMap<T, R> implements Operator<R, T> {
//记录传进来的map函数
private final Func1<? super T, ? extends R> transformer;
//构造函数没有什么好说的
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}

//这个函数有一点意思了,会把传进来的Subscriber包装成新的Subscriber。
//但是有趣的是,新的Subscriber只有在这个函数被调用的时候才会被创建,
//或者说,只有在拿到Subscriber的时候,才会包装它创建这个新的Subscriber。
//而在此之前呢,函数Func1被保存在这个叫做Operator的对象中,
//而这个Operator则会保存在Obervable中,通过闭包的方式,非常的奇妙。
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {

@Override
public void onCompleted() {
o.onCompleted();
}

@Override
public void onError(Throwable e) {
o.onError(e);
}

@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}

};
}

}

所以,我们归纳一下OperatorMap的作用。暂时存储Func1,需要的时候,就会把传进来的Subscriber包装上Func形成新的Subscriber。我们可以说,这个Operator生成了一个新函数,返回了出来。有点函数式编程的意思了吧。

Observable#lift函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//注意参数这个final,这个Operator将被保存在内部类中,其中的函数也得到了保存,
//并在以后这个新生成的Observable被调用的时候,生成一个新的包装过的Subscriber
public <R> Observable<R> lift(final Operator<R, T> o) {
//lift函数新建了一个Observable
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> s) {
try {
//包装出来的新函数。
Subscriber<? super T> st = o.call(s);
try {
st.onStart();
//将交给st交给上一级Observable处理。
//上一级Observable也是保存在这一内部类实例中。
onSubscribe.call(st);
} catch (Throwable e) {
st.onError(e);
}
} catch (Throwable e) {
s.onError(e);
}
}
});
}

所以我们就可以解答第一个问题了。

map函数的Func是以什么形式,存储在哪里,在何时,由谁操控着,来处理订阅和观察的结果(参数)的?如果有n层map,那么这种关系将较为复杂。

map函数的func储存在Operator中,Operator储存在map函数返回的新的Observable中,以闭包的形式保存在实例中。

在调用subscribe的时候才会被使用,首先由Operator包装到下级传上来的Subscriber上面。然后再统一传给上级,最后,传到源头的时候,就会被调用。

先溯流而上,不断包装,不断保存下设置的信息,不管是函数也好,线程控制也好通通都会被包装被记录下来。溯流到源头以后,数据源传递过来的数据,就会沿着各层包装制定好的路线一次运行。

控制数据的流向——flatMap

前面已经说了,RxJava优势的第二点是明晰的事件流模型,这个所谓的流反应在时间上,一段时间内持续不断的有事件传来,形成一个流,如何控制这种流,流量,流向,ReactiveX都提供了相当优雅的解决方案。

flatMap的作用

我首先还是用自然语言来简单介绍一下flatMap

flatMap接收一个事件的数据,返回一个Observable。这是什么意思呢,函数内部会首先执行这个新返回的Observable,把这个Observable的事件全部都发给Subscriber,再接着返回源头的Observable,继续给flatMap发事件,以此类推。

flatMap原理

1
2
3
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
return merge(map(func));
}

首先Fuc经过了一层map,这意味着map返回的是一个Observable。前面也说了,map相当于对Subscriber进行了一层处理。现在最底端的这个Observable返回的是一个Observable。

然后再看merge函数,merge函数将对这个Observable再向下处理一层。(现在Rxjava所谓的Observable串的结构体系已经初现端倪了)

merge()发生了什么呢?

1
2
3
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
return source.lift(OperatorMerge.<T>instance(false));
}

source对应着map返回的一个Observable,这个map返回的Observable传递的是Observable信号。对它进行lift,意味着用OperatorMerge接收下级Subscriber进行包装生成新的Subscriber。再交给上级的Observable。这个新的Subscriber接收的是Observable信号,交给下一级的Subscriber时(通过参数传入),交的是T类型的信号。

所以答案就呼之欲出了,就是把Observable中的信号通通发送给下一级的Subscriber。这一个信号就算是处理完了。

我们来归纳下,merge函数的作用,能够把Observable> 处理成 Observable,即把Observable类型的输入,通过emitLoop(),把这个Observable里的数据全部发出,交给下一级的Subscriber,从而变成输出T类型的Observable

线程是如何生效的

1
2
3
4
public final Observable<T> observeOn(Scheduler scheduler) {
// 在事件源执行时,通过OperatorObserveOn对于Subscriber进行包装。
return lift(new OperatorObserveOn<T>(scheduler));
}

在事件源执行时,通过OperatorObserveOn对于Subscriber进行包装。

其实这层包装,就是一层线程切换,决定了,接下来的任务(下级的Subscriber,或者说内层的Subscriber)在哪个线程执行。

1
2
3
4
5
6
7
public final Observable<T> subscribeOn(Scheduler scheduler) {
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}

public final Observable<Observable<T>> nest() {
return just(this);
}

nest其实就是把现在的Observable进行返回。我们姑且把这个叫做obs_A,对它进行lift意味着,把传给它的Subscriber先用OperatorSubscribeOn进行包装,然后再交给上级Observable进行执行。

也就是说obj_A返回的Observable,会交给OperatorSubscribeOn生成的新Subscriber,而新Subscriber保存着内层的Subscribe。再进行进一步传递。于是,就有了机会在这中间进行线程的切换。

Subscription#unsubscribe方法

OK,现在我们已经很清楚,每一个map,flatMap,subscribeOn,observeOn等等等,其实都是构建了一个新的Observable,也就是说构成了一个Observable串。

那么,当调用unsubscribe的时候,发生了什么呢?是整个Observable链条每一节都会断掉吗?

unsubscribe方法是Subscriber里的一个方法

1
2
3
4
5
6
7
8
public abstract class Subscriber<T> implements Observer<T>, Subscription {
private final SubscriptionList subscriptions;

@Override
public final void unsubscribe() {
subscriptions.unsubscribe();
}
}

看起来,所有的Subscriber都被存储在一起了,我们再看一下SubscriptionList。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public final class SubscriptionList implements Subscription {

private LinkedList<Subscription> subscriptions;

public void add(final Subscription s) {
// add in linkedList
}

public void remove(final Subscription s) {
// remove from linkedList
}
//经过一些列的同步判断,对于Subscriptions中保存的所有Subscriber,调用unsubscribe()
@Override
public void unsubscribe() {
if (!unsubscribed) {
List<Subscription> list;
synchronized (this) {
if (unsubscribed) {
return;
}
unsubscribed = true;
list = subscriptions;
subscriptions = null;
}
// we will only get here once
unsubscribeFromAll(list);
}
}

private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
if (subscriptions == null) {
return;
}
List<Throwable> es = null;
for (Subscription s : subscriptions) {
try {
s.unsubscribe();
} catch (Throwable e) {
if (es == null) {
es = new ArrayList<Throwable>();
}
es.add(e);
}
}
Exceptions.throwIfAny(es);
}

从上面代码中看,其实就是调用一个Subscriber的unsubscribe的时候,就是对于在SubscriptionList中的所有Subscriber调用unsubscribe。

那么SubscriptionList中都有哪些Subscriber呢?

我们知道Operator的实质是对Subscriber进行包装。

1
2
3
4
5
6
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
// implement
};
}

我们看到,包装过的外层的Subscriber,在构造的时候,会把内层的订阅者传进来。
我们来看一下Subscriber的构造函数。

1
2
3
4
5
6
7
8
protected Subscriber(Subscriber<?> subscriber) {
this(subscriber, true);
}

protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
this.subscriber = subscriber;
this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
}

于是所有的Subscriber其实都共享着同一个SubscriptionList,换句话说,记录了所有的Subscriber。同时SubscriptionList还有一个boolean型的标记,标记着是否已经停止。在需要时,就会针对这个进行类型进行判断,看是否该终止这一发送行为。

以下是Observable.from(Iterable)对应的OnSubscriber的call函数。

1
2
3
4
5
6
7
8
@Override
public void call(final Subscriber<? super T> o) {
final Iterator<? extends T> it = is.iterator();
if (!it.hasNext() && !o.isUnsubscribed())
o.onCompleted();
else
o.setProducer(new IterableProducer<T>(o, it));
}

我们可以看到在这个函数中,就对于是否已经停止订阅进行了判断。

所以,当我们调用unsubscribe的时候,整个Observable调用链,不一定全部断裂,而是看需求,有需求(持续发送信号)的Observable,就会判断Subscriber是否停止接受。而Subscriber之所以可以看到最底端,或者最内层的Subscriber是否接收信号,是因为他们共享着同一个SubscriptionList作为判断标识。

所以,要注意,unsubscribe只是标明Subscriber对于这个事件源不感兴趣了,但是不意味着,Observable以及中间层的各个Observable会立刻停止发送信息,有时,即便没有了订阅者,Observable也还是会继续发送信息。