

blog/ThreadPoolExecutor源码剖析.md at master · c-rainstorm/blog · GitHub
source link: https://github.com/c-rainstorm/blog/blob/master/java/ThreadPoolExecutor%E6%BA%90%E7%A0%81%E5%89%96%E6%9E%90.md?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

ThreadPoolExecutor 源码剖析
源码基于 JDK9
ThreadPoolExecutor
为每个提交的任务分配一个线程处理,是一种 ExecutorService
实现。通常使用 Executors
的工厂方法来进行配置。
因为减少了每个任务调度的开销,所以它能在执行大量异步任务的场景中提供更好的性能。并且它提供了一种限定和管理资源(比如线程)的方式。他也会保存一些基本的统计信息,比如已完成的任务数量。
一般情况下我们使用 Executors
的工厂方法来创建相应的实例。
Executors.newCachedThreadPool()
,线程数量没有上界(Integer.MAX_VALUE
),有新任务提交并且没有空闲线程时,创建一个新线程执行该任务,每个线程空闲时间为 60s, 60s 空闲后线程会被移出缓存。使用SynchronousQueue
作为任务队列的实现类。适用于执行大量生命周期短的异步任务。Executors.newFixedThreadPool(int)
,固定容量的线程池。使用LinkedBlockingQueue
作为任务队列的实现类。当新任务到达时,创建新线程,当线程数达到上限时,将任务放到队列中,任务队列中任务数量没有上界。当线程创建之后就一直存在直至显式的调用shutdown()
方法。Executors.newSingleThreadExecutor()
,单个 Worker 的线程池。和newFixedThreadPool(1)
类似,区别在于这个实例经过了一次封装,不能对该实例的参数进行重配置,并且实现了finalize()
方法,能够在 GC 时调用shutdown()
方法关闭该线程池。
ThreadPoolExecutor
实现了 Executor
和 ExecutorService
两个接口。
Executor
是执行已提交任务的对象。这个接口提供了一种分离任务提交和任务执行细节的机制。用户只需要通过 execute()
方法提交任务即可,不用显式的创建线程。但使用该接口并不意味着就是异步执行,比如我们实现一个 Executor
类,在 execute(Runnable r)
中直接调用任务的 run()
方法。
ExecutorService
提供了一些管理终止和能够输出 Future
(用来跟踪异步任务进度)的方法。提供了两个用来 shutdown 的方法:
shutdown()
。允许之前已提交的任务执行完毕。shutdownNow()
。不允许任务队列中的任务再执行并且试图去中断正在执行的任务。
ExecutorService
在不用时应该 shutdown 来允许回收其占用的资源。
线程池的状态在 ThreadPoolExecutor
中的实际表示方式是一个 AtomicInteger
类型的成员变量的高三位(因为有 5 种状态),名称为 ctl
。
ctl
是 32 位的整型变量,他封装了两个变量:
- 线程池运行状态。高三位。
- RUNNING: 111
- SHUTDOWN: 000
- STOP: 001
- TIDYING: 010
- TERMINATED: 011
- 有效工作线程数。低 29 位。
因为 ctl
是 AtomicInteger
的实例,其上的操作基于 CAS,是线程安全的。
shutdown()
advanceRunState(SHUTDOWN);
首先将运行状态修改为SHUTDOWN
此时当有新任务提交时直接抛出RejectedExecutionException
来拒绝服务。interruptIdleWorkers()
通过调用 Worker 线程的interrupt()
方法试图中断空闲 worker。// todot.interrupt()
方法会对那些线程状态有效?成功调用会产生什么影响?Java 的线程状态和操作系统内部线程状态之间有什么关系?此处涉及的知识面稍广,浪费了一些时间依旧没能理解,把 ThreadPoolExecutor 过完以后再系统解决。tryTerminate()
只对于运行状态为STOP
或SHUTDOWN
+任务队列为空两种情况, 当 Worker 数量未减到 0 之前,每次调用会尝试中断一个 Worker 线程。当任务队列为不为空时,Worker 线程处理完正在处理的任务,会从工作队列中取出未处理的任务继续工作,循环这个过程至任务队列为空,Worker 获取不到要处理的任务时会将其从 Worker 集中移除,worker 数量减一,然后在processWorkerExit()
方法中再次调用tryTerminate()
。当 Worker 线程数量减到 0 以后再调用该方法时,会将运行状态修改为TIDYING
并调用terminated()
方法。ThreadPoolExextor
中该方法为空函数所以TIDYING
和TERMINATED
两个状态基本没有区别。
shutdownNow()
advanceRunState(STOP)
首先将运行状态修改为STOP
此时当有新任务提交时直接抛出RejectedExecutionException
来拒绝服务。interruptWorkers();
drainQueue()
,直接清空任务队列tryTerminate()
Core and maximum pool sizes
ThreadPoolExecutor
会依照 corePoolSize
和 maximumPoolSize
两个字段来动态调整线程池的大小。新任务提交过来时,如果当前活动的线程数少于 corePoolSize
会创建一个新线程来处理这个新任务即使当前有空闲线程。如果当前线程数大于 corePoolSize
小于 maximumPoolSize
且任务队列已满时也会创建新线程。
- 配置两个属性相等时可以获得固定容量的线程池。
- 将
maximumPoolSize
设为一个很大的数(比如Integer.MAX_VAlUE
)时,可以获得一个无上界的线程池,可以用来处理任意数量的并发任务。(Tips: 线程过多并不合适,因为物理机的 CPU 数量有限,无法同时处理那么多线程,只会白白占用资源,所以这个属性可以根据实际物理机 CPU 数量来配置。)
通常来说这两个属性,只通过构造器来配置,但是 ThreadPoolExecutor
也提供了 setter
方法可以在运行时配置。
On-demand construction
如果在构造 ThreadPoolExecutor
时,任务队列中已经有要处理的任务了,那么在创建好以后,可用通过直接调用 prestartCoreThread()
或 prestartAllCoreThreads()
来创建核心线程去处理这些任务。否则这些任务就只能在有新任务提交过来以后才能开始处理。
Creating new threads
新线程是通过 ThreadFactory
来创建的,如果在构造时未指定,就使用默认的 java.util.concurrent.Executors.DefaultThreadFactory
。该 ThreadFactory
创建的线程都属于同一个线程组、Thread.NORM_PRIORITY
优先级、用户线程。
我们可以通过指定一个不同的线程工厂来修改线程名、线程组、优先级、线程守护状态等等。
Keep-alive times
如果当前线程数量超出了 corePoolSize
,超出的那部分非核心线程会在空闲超出 keepAliveTime
时被终止。这能够线程池活跃状态不足时及时回收占用的资源。该参数也可以使用 setKeepAliveTime(long, TimeUnit)
来运行时动态修改。可以通过使用 Long.MAX_VALUE TimeUnit.NANOSECONDS
两个参数来禁用线程回收。默认情况下核心线程超时不回收,可以通过配置 keepAliveTime
和 allowCoreThreadTimeOut
来允许核心线程超时回收。
Queuing
任意的 BlockingQueue
实现都可以作为任务队列。任务队列的使用对线程池收缩会有一定影响:
- 如果当前线程数少于
corePoolSize
,新提交的任务会直接提交给新创建的线程。 - 如果当前线程数不少于
corePoolSize
,新提交的任务会提交到任务队列中。 - 如果新任务无法提交到任务队列(队列已满),会尝试创建一个新线程,如果线程数达到了
maximumPoolSize
而导致新线程无法创建则该任务会被拒绝。
一般来说,任务队列有三种使用策略:
- 直接交付。直接将到达的任务交付给线程,而不是将任务暂存起来。当没有空闲线程可用时直接新建线程。这种方式通常需要无上界的
maximumPoolSize
来防止拒绝服务。当然这种方式也有缺点,新任务到达的速度超过任务处理的速度时,新建的线程数量会越来越多。耗费内存。常常使用SynchronousQueue
作为任务队列的实现类。 - 无界队列。使用无界队列的话,执行任务的线程数不会超过
corePoolSize
的大小,但核心线程都无空闲时,新到的任务会添加到任务队列。当新任务到达的速度超过了任务处理的速度时,任务会积累的越来越多。常常使用LinkedBlockingQueue
作为任务队列的实现类。 - 有界队列。和有限的
maximumPoolSize
结合使用能够防止资源耗尽。但是队列的大小和maximumPoolSize
的大小配置权衡起来会更难一些。大队列加小容量线程池可以最小化 CPU使用率、OS 资源和上下文切换的开销。但是有可能吞吐量会比较低,如果任务频繁阻塞(I/O操作)的话无法最优使用 CPU 资源。如果使用小队列加大容量的线程池,可以保证 CPU 的使用率,但是上下文调度的开销可能会过大,这也会降低吞吐量。常使用ArrayBlockingQueue
作为任务队列的实现类。
Rejected tasks
Executor
状态不再是RUNNING
(已经被SHUTDOWN
)- 任务队列已满并且线程数量达到最大值,已达到饱和状态。
Hook methods
ThreadPoolExecutor
也提供了一些其他方法,子类可以重写这些方法来提供额外的支持:重新初始化 ThreadLocals
,收集统计信息,添加日志等等。
beforeExecute(Thread, Runnable)
, //任务执行前调用afterExecute(Runnable, Throwable)
//任务执行后调用terminated()
//Executor
状态转为TIDYING
后调用
Queue maintenance
getQueue()
可以访问任务队列,但是只鼓励用于监控与调试。remove(Runnable)
和purge()
方法可以用于取消尚未执行的任务。remove(Runnable)
直接从任务队列删除,purge()
从任务队列批量删除已取消的Future
Finalization
当线程池没有到 GC Roots 的引用并且 Worker 数为 0 时会被自动回收。
如果想要在忘记调用 shutdown()
时也能确保未被引用的线程池被回收的话,需要确保未使用的线程最终都被能终止。可以设置合适的 keepAliveTime
以及 allowCoreThreadTimeOut
。
任务处理流程
我们以一个简单例子来剖析一下整个过程。
package me.rainstorm;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
*
* @author baochen.zhang
* @date 2017.12.4
*/
public class ThreadPoolExecutorDemo {
private static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() + 2;
private static ExecutorService exe = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE,
60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100));
{
((ThreadPoolExecutor)exe).allowCoreThreadTimeOut(true);
}
public static void main(String[] args) {
exe.execute(() -> System.out.println("Hello world"));
exe.shutdown();
exe = null;
}
}
ctl
变量之前已经提到了,作为一个控制变量用来控制线程池的状态和工作线程数。默认值是RUNNING | 0
,即状态为RUNNING
,Worker 线程数为 0;- 在 Demo 中并未指定线程工厂,
ThreadPoolExecutor
使用Executors
提供的默认线程工厂。 - 因为只有一个任务,所以
main
方法中在提交完这个任务后,直接调用了shutdown()
方法,并将其赋为null
便于在任务执行完毕后回收资源,一般情况下推荐在所有任务都提交到线程池以后再调用shutdown
,否则之后的任务直接会被拒绝掉。 - 因为只有一个任务,且允许核心线程超时,所以该线程在
getTask()
过程中会超时,然后返回null
,进入processWorkerExit()
流程。 - 线程池在进入
TERMINATED
状态后就可以被 GC 了。
一般情况下使用 Executors
的工厂方法来创建即可适用于大多数场景。需要配置的话参考 个性定制 来配置更合适自己项目的 ThreadPoolExecutor
。
Recommend
-
86
什么是空指针异常,如何修正它 原文链接:What is a NullPointerException, and how do I fix it?
-
78
-
88
以下所有内容以及源码分析都是基于JDK1.8的,请知悉。我写博客就真的比较没有顺序了,这可能跟我的学习方式有关,我自己也觉得这样挺不好的,但是没办法说服自己去改变,所以也只能这样想到什么学什么了。池化技术真的是一门在我看来非常牛逼的技术,因为它做到了...
-
51
-
54
联系我1.Q群【Java开发技术交流】:https://jq.qq.com/?_wv=1027&k=5UB4P1T2.完整博客链接:www.shishusheng.com3.知乎:http://www.zhihu.com/people/shi-shu-sheng-4.gayhub:https://github.com/Wasabi12341线程池的好处线程使应用能够更加充分合理地协调利用C...
-
47
很早之前就打算看一次JUC线程池 ThreadPoolExecutor 的源码实现,由于近段时间比较忙,一直没有时间整理出源码分析的文章。之前...
-
16
阿粉万字长文带你解析 ThreadPoolExecutor 为什么要用线程池 你有没有这样的疑惑,为什么要用线程池呢?可能你会说,我可以复用已经创建的线程呀;线程是个重量级对象,为了避免频繁创建和销毁,使用线程池来管理最好...
-
5
线程池执行任务的流程 如果线程池工作线程数<corePoolSize,创建新线程执行task,并不断轮训t等待队列处理task。 如果线程池工作线程数>=corePoolSize并且等待队列未满,将tas...
-
9
(juc系列)threadpoolexecutor源码学习 - 呼延十的博客 | HuYan Blog本文源码基于: JDK13 其实早在19年,就简单的写过ThreadPoolExecutor. 但是只涉及到了其中两个参数,理解也不深刻,今天重新看一下代码。 这个类是Java中常...
-
6
1ThreadPoolExecutor 线程销毁源码分析2021 年 1...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK