代码示例
当我们需要多线程执行任务并获取任务的结果时,可以将Callable
任务提交到线程池,提交后会返回一个Future
对象,调用Future
的get()
方法就可以拿到线程执行的结果。如果将每个线程的Future
对象保存到一个集合中,再依次获取每个Future
的结果,最终我们就可以获取所以线程的结果集。
示例代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39 4j
public class CompleteServiceTest {
private static final ExecutorService executor = Executors.newFixedThreadPool(10);
static class TestCallable implements Callable<Integer> {
private int index;
TestCallable(int index) {
this.index = index;
}
public Integer call() throws Exception {
// 当index为3的倍数时睡眠3s
if (index % 3 == 0) {
TimeUnit.SECONDS.sleep(3L);
}
return index;
}
}
public void futureListTest() throws ExecutionException, InterruptedException {
List<Future<Integer>> futureList = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
TestCallable callable = new TestCallable(i);
Future<Integer> future = executor.submit(callable);
futureList.add(future);
}
for (Future<Integer> future : futureList) {
log.info("线程:{} 执行任务结束", future.get());
}
}
}
运行futureListTest()
方法打印结果:1
2
3
4
516:47:53.850 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:1 执行任务结束
16:47:53.865 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:2 执行任务结束
16:47:56.842 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:3 执行任务结束
16:47:56.842 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:4 执行任务结束
16:47:56.842 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:5 执行任务结束
但上面这种多线程执行任务的方式有个不好的地方,因为我们用遍历的方式调用future.get()
,而future.get()
方法又是阻塞的,所以会有上面例子中的情况,线程4和线程5已经执行完任务了,但还是要等线程3执行完后,才能拿到线程4和线程5的执行结果。
如果使用CompletionService
则可以很好地解决这个问题,因为它可以异步非阻塞地获取并行任务的执行结果。基于上面已有代码,CompletionService
测试代码如下:
1 |
|
打印结果:1
2
3
4
517:02:29.650 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:1 执行任务结束
17:02:29.660 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:2 执行任务结束
17:02:29.668 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:4 执行任务结束
17:02:29.670 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:5 执行任务结束
17:02:32.667 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:3 执行任务结束
可以看到这种多线程执行的方式效率要更好,任务只要一执行完毕我们就可以先拿到结果,而不必按照一定的顺序去获取结果。
源码分析
CompletionService
异步非阻塞获取执行结果的实现原理其实很简单,就是写一个回调函数,任务执行完毕后都调用这个回调函数,这个回调函数的操作是将执行结果放到队列里。最终的效果是每当有一个任务执行完毕,队里里面都会增加一个执行的结果,先执行完的先放,只要队列里有元素,我们就从队列里拉取到这个任务结果。
具体ExecutorCompletionService
源码里是怎么写的呢?
首先看一下ExecutorCompletionService
的成员变量:1
2
3
4
5
6
7//真正执行任务的线程池
private final Executor executor;
private final AbstractExecutorService aes;
//用来保存任务Future的队列
private final BlockingQueue<Future<V>> completionQueue;
再看下构造方法,初始化了一个阻塞队列,用来存储已完成的任务。1
2
3
4
5
6
7
8
9
10public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
//创建阻塞队列
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
然后看一下提交任务的submit()
方法。1
2
3
4
5
6
7
8public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
//将我们的callable任务最终包装成QueueingFuture
executor.execute(new QueueingFuture(f));
return f;
}
可以看到,我们提交的Callable
任务被包装成QueueingFuture
,而QueueingFuture
是FutureTask
的子类,而FutureTask
又实现了Runnable
接口,所以最终执行了FutureTask
中的run()
方法。来看一下该方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 public void run() {
//判断执行状态,保证callable任务只被运行一次
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//这里回调我们创建的callable对象中的call方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//处理执行结果
set(result);
}
} finally {
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
可以看到在FutureTask
的run()
方法中,最终回调我们提交的线程池的Callable
任务的call()
方法,执行结束之后,通过set(result)
方法处理call()
方法的返回结果:1
2
3
4
5
6
7
8
9protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//设置执行结果v,并标记线程执行状态为:NORMAL
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//完成执行
finishCompletion();
}
}
继续跟进finishCompletion()
方法,在该方法中会调用到done()
方法,而QueueingFuture
类是重写了这个方法的,就是将任务提交后的Future
添加到了阻塞队列里。1
2
3
4
5
6
7
8private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
至此,相关源码梳理完毕。