并发编程-CompletionService异步非阻塞获取并行任务执行结果

代码示例

当我们需要多线程执行任务并获取任务的结果时,可以将Callable任务提交到线程池,提交后会返回一个Future对象,调用Futureget()方法就可以拿到线程执行的结果。如果将每个线程的Future对象保存到一个集合中,再依次获取每个Future的结果,最终我们就可以获取所以线程的结果集。

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Slf4j
public class CompleteServiceTest {

private static final ExecutorService executor = Executors.newFixedThreadPool(10);

static class TestCallable implements Callable<Integer> {

private int index;

TestCallable(int index) {
this.index = index;
}

@Override
public Integer call() throws Exception {
// 当index为3的倍数时睡眠3s
if (index % 3 == 0) {
TimeUnit.SECONDS.sleep(3L);
}
return index;
}
}


@Test
public void futureListTest() throws ExecutionException, InterruptedException {
List<Future<Integer>> futureList = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
TestCallable callable = new TestCallable(i);
Future<Integer> future = executor.submit(callable);
futureList.add(future);
}
for (Future<Integer> future : futureList) {
log.info("线程:{} 执行任务结束", future.get());
}
}


}

运行futureListTest()方法打印结果:

1
2
3
4
5
16:47:53.850 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:1 执行任务结束
16:47:53.865 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:2 执行任务结束
16:47:56.842 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:3 执行任务结束
16:47:56.842 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:4 执行任务结束
16:47:56.842 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:5 执行任务结束

但上面这种多线程执行任务的方式有个不好的地方,因为我们用遍历的方式调用future.get(),而future.get()方法又是阻塞的,所以会有上面例子中的情况,线程4和线程5已经执行完任务了,但还是要等线程3执行完后,才能拿到线程4和线程5的执行结果。

如果使用CompletionService则可以很好地解决这个问题,因为它可以异步非阻塞地获取并行任务的执行结果。基于上面已有代码,CompletionService测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void completionServiceTest() throws InterruptedException, ExecutionException {
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
int taskCount = 0;
for (int i = 1; i <= 5; i++) {
TestCallable callable = new TestCallable(i);
completionService.submit(callable);
taskCount++;
}
for (int i = 0; i < taskCount; i++) {
log.info("线程:{} 执行任务结束", completionService.take().get());
}
}

打印结果:

1
2
3
4
5
17:02:29.650 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:1 执行任务结束
17:02:29.660 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:2 执行任务结束
17:02:29.668 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:4 执行任务结束
17:02:29.670 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:5 执行任务结束
17:02:32.667 [main] INFO com.lzumetal.multithread.threadpool.CompleteServiceTest - 线程:3 执行任务结束

可以看到这种多线程执行的方式效率要更好,任务只要一执行完毕我们就可以先拿到结果,而不必按照一定的顺序去获取结果。

源码分析

CompletionService异步非阻塞获取执行结果的实现原理其实很简单,就是写一个回调函数,任务执行完毕后都调用这个回调函数,这个回调函数的操作是将执行结果放到队列里。最终的效果是每当有一个任务执行完毕,队里里面都会增加一个执行的结果,先执行完的先放,只要队列里有元素,我们就从队列里拉取到这个任务结果。

具体ExecutorCompletionService源码里是怎么写的呢?

首先看一下ExecutorCompletionService的成员变量:

1
2
3
4
5
6
7
//真正执行任务的线程池
private final Executor executor;

private final AbstractExecutorService aes;

//用来保存任务Future的队列
private final BlockingQueue<Future<V>> completionQueue;

再看下构造方法,初始化了一个阻塞队列,用来存储已完成的任务。

1
2
3
4
5
6
7
8
9
10
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;

//创建阻塞队列
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

然后看一下提交任务的submit()方法。

1
2
3
4
5
6
7
8
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);

//将我们的callable任务最终包装成QueueingFuture
executor.execute(new QueueingFuture(f));
return f;
}

可以看到,我们提交的Callable任务被包装成QueueingFuture,而QueueingFutureFutureTask的子类,而FutureTask又实现了Runnable接口,所以最终执行了FutureTask中的run()方法。来看一下该方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
 public void run() {
//判断执行状态,保证callable任务只被运行一次
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//这里回调我们创建的callable对象中的call方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//处理执行结果
set(result);
}
} finally {
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

可以看到在FutureTaskrun()方法中,最终回调我们提交的线程池的Callable任务的call()方法,执行结束之后,通过set(result)方法处理call()方法的返回结果:

1
2
3
4
5
6
7
8
9
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//设置执行结果v,并标记线程执行状态为:NORMAL
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//完成执行
finishCompletion();
}
}

继续跟进finishCompletion()方法,在该方法中会调用到done()方法,而QueueingFuture类是重写了这个方法的,就是将任务提交后的Future添加到了阻塞队列里。

1
2
3
4
5
6
7
8
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}

至此,相关源码梳理完毕。

------ 本文完 ------