AsyncTask是常见的异步任务的处理类。短小精悍,只有不带注释估计只有500行左右。通过学习它的实现,能够更加清楚多线程任务应该注意的地方。

重点

  • 消息机制在多线程中的应用

  • 线程池的应用以及利弊分析

  • Java原生异步工具类:Future || Callable || Runnable

目录

[toc]

##工作流程


###综述


AsyncTask短小精悍,结构也十分简单。初始化的时候创建一个FutureTask类,Task为一个包含了doInBackground的Callable类。

在执行的时候,就将这个FutureTask交给线程池去运行就ok了。

###初始化


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

/**

* Creates a new asynchronous task. This constructor must be invoked on the UI thread.

*/


public AsyncTask() {

mWorker = new WorkerRunnable<Params, Result>() {

public Result call() throws Exception {

mTaskInvoked.set(true);



Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);

//noinspection unchecked

return postResult(doInBackground(mParams));

}

};



mFuture = new FutureTask<Result>(mWorker) {

@Override

protected void done() {

try {

postResultIfNotInvoked(get());

} catch (InterruptedException e) {

android.util.Log.w(LOG_TAG, e);

} catch (ExecutionException e) {

throw new RuntimeException("An error occured while executing doInBackground()",

e.getCause());

} catch (CancellationException e) {

postResultIfNotInvoked(null);

}

}

};

}

这里初始化的两个类分别是:

  • 继承自Callable方法的WorkerRunnable。泛型Params为类中的一个域的类型

  • FutureTask类。该类拓展自接口RunnabelFuture<V> extends Runnable, Future<V>该类既作为Future也可以作为Runnable方法。
    FutureTask由刚刚的Worker::Callable来创建。也就是说一会儿会执行的其实就是这个Callable
    这里牵涉到FutureTask<V>的实现和运用,我们会在后面介绍,这里先放一放。

下面我们跟踪一下,mWorker = new WorkerRunnable<Params, Result>()内部类中究竟发生了什么。

  • 首先,看一下这个WorkerRunnable类。继承自Callable,同时包含一个泛型参数数组
1
2
3
4
5
6
7
8

private static abstract class WorkerRunnable<Params, Result>

implements Callable<Result> {


Params[] mParams;

}
  • 其次,当它被调用时,会做哪些工作?见上面代码,运行了两个函数,一个为mTaskInvoked.set(True),另一个为,postResult(doInBackground(mParams))其结果会作为该Callable的返回值返回

1.mTaskInvoked是一个bool量,具体标志着什么,一会儿再说。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

private final AtomicBoolean mTaskInvoked = new AtomicBoolean();

...

/** 在上文中被set(Ture) */

...

private void postResultIfNotInvoked(Result result) {

final boolean wasTaskInvoked = mTaskInvoked.get();

if (!wasTaskInvoked) {

postResult(result);

}

}

2.postResult(Result result)实际就是把一个Message插入了消息队列。
getHandler()是单例模式的获取方法, 绑定的是主线程的消息队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14

private Result postResult(Result result) {

@SuppressWarnings("unchecked")

Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,

new AsyncTaskResult<Result>(this, result));

message.sendToTarget();

return result;

}

这里的AsyncTaskResult是一个结构体,里面持有的一个是AsyncTask本身,另一个是Result[] Data数组。这里其实没有什么神奇的,就是把结果(MESSAGE_POST_RESULT)插入队列最终由Handler处理。
值得注意的是,这个Handler实例除了处理结果,还处理一种消息,MESSAGE_POST_PROGRESS

###执行 execute(Params)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

...



/**

* Executes the task with the specified parameters. The task returns

* itself (this) so that the caller can keep a reference to it.

*/


public final AsyncTask<Params, Progress, Result> execute(Params... params) {

return executeOnExecutor(sDefaultExecutor, params);

}

...

public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,

Params... params)
{


if (mStatus != Status.PENDING) {

switch (mStatus) {

case RUNNING:

throw new IllegalStateException("Cannot execute task:"

+ " the task is already running.");

case FINISHED:

throw new IllegalStateException("Cannot execute task:"

+ " the task has already been executed "

+ "(a task can be executed only once)");

}

}



mStatus = Status.RUNNING;



onPreExecute();



mWorker.mParams = params;

exec.execute(mFuture);



return this;

}

前几行就是判断一下状态,状态不对会报错,这没什么好讲的。判断完以后准备onPreExecute(),赋参数,然后将初始化时创建的FutureTask实例交给特定的线程池来执行。

FutureTask会被传入一个Callable实例,这就是主要需要计算的任务。在这里这个CallablemWorker,在这里面执行了doInBackground(),并且最终Callable返回的也将是doInBackground()方法的返回值。另外这个结果还会被handler发往消息队列进行处理。

我们接着说,这个FutureTask里除了执行worker主要计算功能以外还做了一些调整。当计算任务之行结束后,会调用重写的done()方法。这个方法中抛开各种try...catch...其实就只有一句

1
2

postResultIfNotInvoked(get());

跟踪这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14

private void postResultIfNotInvoked(Result result) {

final boolean wasTaskInvoked = mTaskInvoked.get();

if (!wasTaskInvoked) {

postResult(result);

}

}

...

这个方法是考虑到任务被Cancel掉,依旧要把这个结果给post到消息队列中。这个结果是get()产生的,由于是在done()里执行,所以get()函数无需挂起等待,直接就拿到了结果,本来这里应该是Worker::Callable的结果。但是由于被Cancel掉了,于是会触发CancelltionException,这是RuntimeException,程序并不会终止,会返回什么,请看下一篇博客。

##线程池的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92

private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();

private static final int CORE_POOL_SIZE = CPU_COUNT + 1;

private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;

private static final int KEEP_ALIVE = 1;



/**

* An {@link Executor} that can be used to execute tasks in parallel.

*/


public static final Executor THREAD_POOL_EXECUTOR

= new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,

TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);

...



/**

* An {@link Executor} that executes tasks one at a time in serial

* order. This serialization is global to a particular process.

*/


public static final Executor SERIAL_EXECUTOR = new SerialExecutor();



...



private static class SerialExecutor implements Executor {

final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();

Runnable mActive;



public synchronized void execute(final Runnable r) {

mTasks.offer(new Runnable() { //offer means enqueue at last

public void run() {

try {

r.run();

} finally {

scheduleNext();

}

}

});

if (mActive == null) {

scheduleNext();

}

}



protected synchronized void scheduleNext() {

if ((mActive = mTasks.poll()) != null) {

THREAD_POOL_EXECUTOR.execute(mActive);

}

}

}

所有的线程池都拓展自Executor接口

java.util.concurrent提供了一系列已经搞好的线程池,在类Executors中。

线程池的存在是为了现成资源的复用。

而在AsyncTask里则是自己实现的线程池。在THREAD_POOL_EXECUTOR定义了池的大小、配置,动态的由机子来决定。

同时也自己写了SerialExecutor作为线程池的管理器。

管理思路很简单,类中维护了一个双端队列,先进先出。当有一个任务要执行的时候,就将该任务放入队列尾部,同时从头部取出一个任务放入线程池执行(如果没有就忽略)。

注意,这里将任务(Runnable)外面又包裹了一层Runnable来保证每执行完一个就执行下一个。

参考资料:Java ThreadPool

##消息处理

###InternalHandler

  • 该类为继承自Handler的子类,并有一个对应的全局变量,持着引用。

  • 绑定线程为主线程

  • 这个真的没什么好说的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

private static class InternalHandler extends Handler {

public InternalHandler(){

super(Looper.getMainLooper());

}



@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})

@Override

public void handleMessage(Message msg) {

AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;

switch (msg.what) {

case MESSAGE_POST_RESULT:

// There is only one result

result.mTask.finish(result.mData[0]);

break;

case MESSAGE_POST_PROGRESS:

result.mTask.onProgressUpdate(result.mData);

break;

}

}

}





@SuppressWarnings({"RawUseOfParameterizedType"})

private static class AsyncTaskResult<Data> {

final AsyncTask mTask;

final Data[] mData;



AsyncTaskResult(AsyncTask task, Data... data) {

mTask = task;

mData = data;

}

}

##写法借鉴与知识点梳理


  1. 我们看到InternalHandler类为static的,这是为了防止内存泄漏。
  • 这是因为static的类就没办法引用外部的变量了

  • 就是怕Handler引用了Activity变量。这样的话,很可能出现一种情况就是,Activityfinish()了,但却依然被持有引用导致无法回收,造成泄露。

  • 如果非要用Activity引用,应该被WeakReference持有,这样就可以被回收了。

  • Activity中的内部类,都应该想想是否可以控制其生命周期,否则的话最好声明成静态类。

  • 参考资料:Handler引起内存泄漏

  1. WorkRunable实际上继承自Callable,那么CallableRunnable之间有什么区别?
  1. Future其实很简单,就是一种Callable方法的包装。管理了线程的睡眠唤醒,取消,状态,etc…
  • 这里用了FutureTask<V>::RunnableFuture<V>同时具有Runnable(可以放入线程池)和Future(控制任务状态)的特性。

  • 值得注意的是,Future不是都是和Callable结合吗?Runnable也没有结果啊?作者考虑到了这一点,传入的Runnable都会被包装Executors.callable(runnable,result);包装方法在Executors类中,可见就是为了兼容线程池操作。

  • Runnable方法可是没有返回值的。所以计算结果还是得通过V get()获得。

  1. 多线程