55

高并发编程:ExecutorCompletionService 深入解析

 4 years ago
source link: https://www.tuicool.com/articles/u2AnI3A
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

要点解说

假设现在有一大批需要进行计算的任务,为了提高整批任务的执行效率,你可能会使用线程池,向线程池中不断submit异步计算任务,同时你需要保留与每个任务关联的Future,最后遍历这些 Future,通过调用 Future接口实现类的 get方法获取整批计算任务的各个结果。

虽然使用了线程池提高了整体的执行效率,但遍历 这些Future, 调用 Future接口实现类的get方法是阻塞的,也就是和当前这个 Future关联的计算任务真正执行完成的时候,get方法才返回结果,如果当前计算任务没有执行完成,而有其它 Future关联的计算任务已经执行完成了,就会白白浪费很多等待的时间,所以最好是遍历的时候谁先执行完成就先获取哪个结果,这样就节省了很多持续等待的时间。

而ExecutorCompletionService可以实现这样的效果,它的内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的 Future,进而通过调用 Future接口实现类的get方法获取最终的结果。

实例演示

@Test

public void test() throws InterruptedException, ExecutionException {

Executor executor = Executors.newFixedThreadPool(3);

CompletionService<String> service = new ExecutorCompletionService<>(executor);

for (int i = 0 ; i < 5 ;i++) {

int seqNo = i;

service.submit(new Callable<String>() {

@Override

public String call() throws Exception {

return "HelloWorld-" + seqNo + "-" + Thread.currentThread().getName();

}

});

}

for (int j = 0 ; j < 5; j++) {

System.out.println(service.take().get());

}

}

执行结果:

HelloWorld-2-pool-1-thread-3

HelloWorld-1-pool-1-thread-2

HelloWorld-3-pool-1-thread-2

HelloWorld-4-pool-1-thread-3

HelloWorld-0-pool-1-thread-1

方法解析

ExecutorCompletionService实现了CompletionService接口,在CompletionService接口中定义了如下这些方法:

  • Future<V> submit(Callable<V> task):提交一个Callable类型任务,并返回该任务执行结果关联的Future;

  • Future<V> submit(Runnable task,V result):提交一个Runnable类型任务,并返回该任务执行结果关联的Future;

  • Future<V> take():从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞,直到有任务完成;

  • Future<V> poll():从内部阻塞队列中获取并移除第一个执行完成的任务,获取不到则返回null,不阻塞;

  • Future<V> poll(long timeout, TimeUnit unit):从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞时间为timeout,获取不到则返回null;

源码解析

根据上面的实例演示代码分析 ExecutorCompletionService内部的实现原理。

ExecutorCompletionService有三个私有属性,分别是 executor、aes和completionQueue,其中completionQueue就是存储已完成任务的队列, 具体代码如下图。

eUVnE37.png!web

进入它的构造方法,在方法内部给它的 三个属性赋值,可以看到在这里初始化了一个LinkedBlockingQueue类型的先进先出阻塞队列 ,具体代码如下图。

nQBbiuj.png!web

接着,进入ExecutorCompletionService的submit方法,这里我们分析参数类型是Callable的 submit方法,具体代码如下图。

uMN3uuF.png!web

跟踪代码进入newTaskFor方法,具体代码如下图。

aiyU3aZ.png!web

ExecutorCompletionService构造方法中已经给aes赋过值了,所以进入AbstractExecutorService的newTaskFor方法,具体代码如下图。

跟踪代码进入FutureTask构造方法,具体代码如下图。

Yj2EfuM.jpg!web

到这里构建的RunnableFuture实例对象完成了,回到上述的submit方法中,继续分析executor.execute(new QueueingFuture(f)),首先是 new QueueingFuture(f),QueueingFuture是 ExecutorCompletionService中的内部类,具体代码如下图。

VBVbeuA.png!web

从图中的代码可以看到,将 RunnableFuture实例对象赋值给了 QueueingFuture的task属性,注意上图红框中有一个done方法,它的内部是将一个task添加到已完成阻塞队列中,这个先记住后面会用到。接着,分析 executor.execute(new QueueingFuture(f)),因为我们的实例演示代码中使用到的是ThreadPoolExecutor,所以 executor.execute()方法执行到 ThreadPoolExecutor中,具体重点代码如下图。 2Yb6f2z.jpg!web

uYJfuea.jpg!web

这里我们不分析极端的情况,当工作线程数小于核心线程数的时候,执行addWorker方法,这个方法体的内容比较多,这里只关注重点代码,具体代码如下图。

RVJZvuE.jpg!web

第一个红框中的代码会构建一个Worker实例,具体代码如下图。

fUZrE3M.jpg!web

根据上图中的红框代码,继续跟踪代码,会发现t.start()方法会执行到上图的run方法中,而run方法的内部执行了runWorker方法,具体代码如下图。

Vneym2q.jpg!web

上图中代码继续跟踪可以发现,执行task.run()会进入前面构建的 RunnableFuture实例对象的run方法中,具体代码如下图。

6jY3Yre.jpg!web

第一个红框中的代码就是实际任务执行的代码,也就是submit提交的任务真正执行的地方。第二个红框中的代码是当发生异常时的处理,第三个红框中的代码是正常执行完成的处理,下面是它们的具体实现代码。

aEv2miY.png!web

fmeqUbQ.png!web

从上面两张图中的代码发现,都执行了finishCompletion()方法,下面来揭晓这个方法的作用,具体代码如下图。

BzA32mU.png!web

从上图红框中的代码可以看到,这里执行了done()方法,实际执行的是我们前面分析提到的 将一个task添加到已完成阻塞队列中的那个done方法。至此,当一个任务执行完成或异常的时候,都会被添加到已完成阻塞队列中,进而被取出处理。

下面再分析一下 ExecutorCompletionService中的take方法和poll方法,具体代码如下图。

Yjeeiur.jpg!web

从上图可以看到,都是操作已完成阻塞队列,那我们就看一下这个已完成阻塞列队中的代码,如下图。

Vr2URbq.png!web

上图清晰的展示了通过循环等待已完成的执行任务。

7vMrAzu.png!web

上图代码不阻塞,当没有已完成的执行任务时,直接返回null。

Yjmaqij.png!web

上图代码阻塞指定时间,当没有已完成的执行任务时,直接返回null。

往期精彩内容

觉得有收获,诚邀 关注、点赞、转发

I3AbYfq.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK