线程池是众多池化思想的一种。在Java中,与多线程相关的是java.util.concurrent
包,而其中最核心的便是线程池类java.util.concurrent.ThreadPoolExecutor
。该类包含了线程池的维护与任务执行的过程。本文主要围绕ThreadPoolExecutor
,既是源码分析,也是经验总结。如有理解不当,望不吝赐教。
整体结构
继承关系
在J.U.C
包下,线程池主要包括四个类(或接口)。从上到下依次是Executor
,ExecutorService
,AbstractExecutorService
和ThreadPoolExecutor
分工
Executor作为顶层父接口,将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器 (Executor) 中,由 Executor 框架完成线程的调配和任务的执行部分。
ExecutorService接口增加了一些能力:
- 扩充执行任务的能力,比如可以为一个或一批异步任务生成Future的方法(invokeAll);
- 提供了管控线程池的方法,比如停止线程池运行的方法(shutdown);
- 拓展了有返回值的提交任务方法(submit)。
AbstractExecutorService是抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。
ThreadPoolExecutor实现最复杂的运行部分,一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。
构造方法
ThreadPoolExecutor
有四个构造方法,除全参构造外,其它三个都是全参构造的子集。
全参构造
共7个构造参数,赋值给6个成员变量。
1 | public ThreadPoolExecutor(int corePoolSize, |
构造参数
含义分别如下:
参数 | 含义 |
---|---|
corePoolSize | 核心线程数。核心线程即使空闲也不会被清除 |
maximumPoolSize | 最大线程数。超出核心线程数的线程是非核心线程,空闲一定时间会被清除 |
keepAliveTime | 空闲线程的最大存活时间 |
unit | 存活时间单位 |
workQueue | 任务队列。当核心线程没有空闲时,会将任务丢到等待队列 |
threadFactory | 线程工厂。创建线程的工厂 |
handler | 任务拒绝策略。当核心线程、任务队列、非核心线程都满了,会执行拒绝策略 |
线程池运行流程
- 线程池创建时,并未创建线程,而是在第一次提交任务时创建;
- 在线程数未大于核心线程数
corePoolSize
时,每提交一个任务会创建一个线程(通过threadFactory
); - 在线程数达到核心线程数时,会将任务丢到工作队列
workQueue
里; - 在工作队列满了时,会继续创建非核心线程;
- 当线程数达到最大线程数
maximumPoolSize
时,会执行拒绝策略handler
; - 当非核心线程空闲达到存活时间(
keepAliveTime
unit
)时,会被自动移除。
参数设置建议
- 如果是计算密集型,由于CPU几乎没有空闲时间,因此线程数不应大于CPU核数,对于核数为N的CPU,建议线程数=N-1(留一个核给主线程)
- 如果是IO密集型,可以根据单个线程CPU使用率来确定。对于使用率为k(0<k<1)的任务,建议线程数=N/k
- 队列长度应与核心线程数相当。队列太长,不能及时创建新线程,导致处理任务效率低;队列太短,可能会频繁创建非核心线程
成员变量
除构造参数中指定的6个参数外,还有以下成员变量
ctl
ctl是原子int类型,共32位。其中高3位表示线程池状态,低29位表示线程数。
1 | // 原子类型,线程池初始状态是RUNNING,初始线程数是0 |
ctl初始值为ctlOf(RUNNING, 0)
,其中RUNNING值为-1<<29,即1110 00…00(29个0)。ctlOf
方法中,与0做|
运算后,结果不变。
1110 00…00表示线程池状态为111,线程数为0。
-1的二进制表示
源码:1000 00…01
反码:1111 11..10(符号位不变,其它取反)
补码:1111 11…11(反码加1)
largestPoolSize
曾经出现的最大线程数
1 | private int largestPoolSize; |
completedTaskCount
已完成的任务数(所有线程)
1 | private long completedTaskCount; |
allowCoreThreadTimeOut
是否允许核心线程超时,默认false
1 | private volatile boolean allowCoreThreadTimeOut; |
常量
COUNT_BITS
COUNT_BITS
表示线程数占ctl
的位数,即32-3=29。
1 | private static final int COUNT_BITS = Integer.SIZE - 3; |
CAPACITY
CAPACITY
用作拆分ctl
。值为1 << 29 - 1,二进制表示为00011111 11111111 11111111 11111111
,即3个0和29个1。
同时,CAPACITY
也表示线程池中允许的最大线程数,这样线程数就被限制在ctl
的低29位。
1 | private static final int CAPACITY = (1 << COUNT_BITS) - 1; |
线程池的五种状态
状态分别用-1~3的左移29位表示,结果按从小到大排序。
1 | private static final int RUNNING = -1 << COUNT_BITS; |
下表列出了线程池的五种状态的含义和触发条件。
状态 | 名称 | 二进制表示 | 说明 | 触发条件 |
---|---|---|---|---|
RUNNING | 运行 | 1110 00…00 | 允许提交任务 | 初始化时 |
SHUTDOWN | 关闭 | 0000 00…00 | 不允许提交任务,允许执行已提交的任务 | 调用shutdown()后 |
STOP | 停止 | 0010 00…00 | 不允许提交任务,丢弃已提交的任务,中断正在执行的任务 | 调用shutdownNow()后 |
TIDYING | 整理 | 0100 00…00 | 线程数清零,任务清零 | 调用tryTerminate()后 |
TERMINATED | 终止 | 0110 00…00 | 线程池终止 | 调用terminated()后 |
二进制下,各状态都只有高3位有效,分别为111
,000
,001
,010
,011
。这样线程池状态就被限制在ctl
的高3位。
内部类
Worker
Worker
是ThreadPoolExecutor
的内部类,继承了AbstractQueuedSynchronizer
和Runnable
接口。其中,实现Runnable
接口表明Worker
就是一个线程,承担着处理任务的工人角色。继承AQS使Worker
具备了有锁/无锁两个状态,分别用来表示线程的空闲/忙碌。方便后续销毁。
1 | private final class Worker extends AbstractQueuedSynchronizer implements Runnable {...} |
成员变量
Worker
有三个成员变量,分别是:
Thread thread
实际的工作线程Runnable firstTask
待处理的初始任务long completedTasks
任务计数器,统计单个worker执行的任务数
构造方法
Worker
只有一个构造方法。在调用构造方法时,会通过线程工厂创建线程,当前worker
对象作为Runnable
实例传入newThread
方法中。这样,当调用thread.start()
时,实际调用的是worker.run
方法。
1 | // 构造方法 |
这里有点绕,thread
是worker
的成员变量,但worker
又是thread
的target
。也就Doug Lea敢这么写啊。
成员方法
run方法
run
内部调用了runWorker
方法,传入的是当前worker
对象。
1 | public void run() { |
interruptIfStarted方法
中断所有运行中的线程。此方法只在shutdownNow
中有调用。
1 | void interruptIfStarted() { |
拒绝策略
j.u.c.RejectedExecutionHandler
接口提供了一个方法rejectedExecution
,当线程池和队列都满的时候,会调用rejectedExecution
方法,执行拒绝策略。
1 | // RejectedExecutionHandler只有一个方法 |
ThreadPoolExecutor
内置了4个RejectedExecutionHandler
实现类,分别表示4种拒绝策略。
CallerRunsPolicy
任务交由主线程运行。由于主线程被占用,此策略会暂时阻止任务继续提交。
1 | public static class CallerRunsPolicy implements RejectedExecutionHandler { |
AbortPolicy
抛出异常
1 | public static class AbortPolicy implements RejectedExecutionHandler { |
DiscardPolicy
丢弃且静默处理
1 | public static class DiscardPolicy implements RejectedExecutionHandler { |
DiscardOldestPolicy
丢弃最老的任务,提交当前任务
1 | public static class DiscardOldestPolicy implements RejectedExecutionHandler { |
对比
分别对四种拒绝策略构建的线程池(queueSize
=5,corePoolSize
=1,maxPoolSize
=5),for循环提交100个任务,执行如下:
拒绝策略 | 描述 | 作用 | 任务处理结果 |
---|---|---|---|
CallerRunsPolicy | 交给主线程 | 队列和最大线程都满了,则会把新任务交给调用线程处理。 | 处理1-100 |
AbortPolicy | 拒绝 | 队列和最大线程都满了,则抛出异常RejectedExecutionException |
处理1-10 |
DiscardPolicy | 丢弃 | 队列和最大线程都满了,直接丢弃 | 处理1-10 |
DiscardOldestPolicy | 丢弃最早 | 队列和最大线程都满了,丢弃旧任务,处理新任务 | 处理1,7-10,96-100 |
分析如下
CallerRunsPolicy:当线程和队列都满了,则由主线程处理第11号任务。等主线程处理完,才会重新提交12-21号任务,然后主线程继续处理22号任务。最终处理1-100。
注意:由于主线程的阻塞会影响任务提交,所以队列不应该设置的太小,否则有可能造成主线程繁忙、其它线程空闲的情况。建议
queueSize
=maxPoolSize
。AbortPolicy:当线程和队列都满了,直接抛出异常,程序中断,最终处理1-10。业务中可以捕获这个异常做后续处理。
DiscardPolicy:当线程和队列都满了,没有任何提示,程序中断,最终处理1-10。此策略应少用。
DiscardOldestPolicy:当线程和队列都满了的时候,此时线程持有1、7、8、9、10,队列持有2、3、4、5、6。此策略会丢弃队列头的任务,将新任务放入队列尾。因此就有:
丢弃2,放入11
丢弃3,放入12
……
丢弃94,放入99
丢弃95,放入100
等任务全部提交完毕,队列中还剩下96-100号任务。等线程空闲,就会来执行这5个,所以最后的执行结果是1、7、8、9、10、96、97、98、99、100。
当新任务对旧任务有替代作用时,可以使用此策略。比如业务需求是定时获取节目信息,那么新任务肯定比旧任务信息更准确,新任务到来时如果旧任务还没返回,可以丢弃旧任务。
注意,此策略下队列不应设置太小,否则可能导致任务被大量丢弃,如示例中所示。
成员方法
ThreadPoolExecutor
功能主要包含线程池维护和任务执行两部分,其中任务执行最重要的就是execute
方法
execute方法
execute
方法作用是将任务提交给线程池。具体可以分四种情况:
- 核心线程未满,创建核心线程,并将任务交给核心线程;
- 核心线程已满,队列未满,则将任务添加到队列;
- 队列已满,则创建非核心线程,将任务交给新线程;
- 最大线程已满,或线程池关闭,则执行拒绝策略。
源码如下:
1 | public void execute(Runnable command) { |
isRunning方法
c
< SHUTDOWN
,则只有RUNNING
一种情况。
1 | private static boolean isRunning(int c) { |
addWorker方法
顾名思义,addWorker
方法主要作用是创建工人对象。方法有两个参数
Runnable firstTask
,表示该线程需要处理的第一个任务boolean core
,表示该线程是否为核心线程
在execute
方法中,有三个地方会调用addWorker
方法。分别是:
- 核心线程未满,创建核心线程;
- 核心线程数为0,且任务已加到队列里,创建非核心线程;
- 核心线程已满且队列已满,创建非核心线程
源码如下:
1 | // 返回true,表示线程添加成功 |
workerCountOf方法
从ctl中获取线程数
1 | private static int workerCountOf(int c) { |
CAPACITY = (1 << 29) - 1, 1<<29是 0010 00...11
,再减1是 0001 11...11
,即3个1和29个0。c & CAPACITY取的就是c的低29位。
runStateOf方法
从ctl中获取线程池状态
1 | private static int runStateOf(int c) { |
CAPACITY 是 0001 11...11
,取反就是1110 00...00
,c & ~CAPACITY取的就是c的高3位。
compareAndIncrementWorkerCount方法
CAS操作,使ctl的值加1。跟线程数加1等效。
1 | private boolean compareAndIncrementWorkerCount(int expect) { |
addWorkerFailed方法
线程启动失败,会调用addWorkerFailed
方法,有三个作用:
- 从集合中移除当前Worker对象
- ctl值减1,因为在
addWorker
中先加了1 - 尝试终止线程池
此方法只在addWorker
中调用。
源码如下:
1 | private void addWorkerFailed(Worker w) { |
decrementWorkerCount方法
循环调用compareAndDecrementWorkerCount
使ctl减1,直到成功
1 | private void decrementWorkerCount() { |
compareAndDecrementWorkerCount方法
CAS操作,使ctl的值减1。跟线程数减1等效。
1 | private boolean compareAndDecrementWorkerCount(int expect) { |
runWorker方法
由上面分析可知:在addWorker
方法中,调用 t.start()
启动线程,t = worker.thread
,而 thread
持有的是worker
对象,因此会调用到worker
的run
方法,最终会调用到runWorker
。
runWorker
方法主要作用是:持续从队列中获取任务并消费,直到获取到的任务为null,则会结束循环并移除当前线程。
runWorker
方法在run
方法中唯一调用。
1 | final void runWorker(Worker w) { |
注意一下Thread
的三个方法:
方法 | 性质 | 返回值类型 | 作用 |
---|---|---|---|
interrupt | 成员方法 | void | 设置线程的中断状态,作用于调用线程 |
isInterrupted | 成员方法 | boolean | 返回线程的中断状态,作用于调用线程 |
interrupted | 静态方法 | boolean | 返回并清除线程的中断状态,只能作用于当前线程 |
getTask方法
在runWorker
方法中,如果 task==null
,则会调用getTask
方法从队列获取任务。
具体逻辑是:
- 队列不为空,从队列尾取出一个任务,并返回;
- 队列为空
- 如果线程数未超出核心线程,则
workQueue.take()
会一直阻塞,直到有新的任务入队列; - 如果线程数超出了核心线程,或者允许核心线程超时(核心线程默认不超时,可通过
allowCoreThreadTimeOut()
方法设置),则在从队列获取任务时设置一个超时时间(即keepAliveTime
),超时则返回null,跳出循环,进入后续清理逻辑。
- 如果线程数未超出核心线程,则
这样就实现了设置线程存活时间的逻辑。非常巧妙。
getTask
方法在runWorker
方法中唯一调用。
源码如下:
1 | private Runnable getTask() { |
processWorkerExit方法
在runWorker
方法中,如果getTask
方法有结果,则while循环得以持续;
如果结果为null,循环正常结束,执行processWorkerExit(w, false)
,将当前线程从集合中移除;
如果循环异常结束,则执行processWorkerExit(w, true)
,将当前线程移除并补充一个线程。
1 | private void processWorkerExit(Worker w, boolean completedAbruptly) { |
shutdown方法
shutdown
方法会使线程池进入SHUTDOWN
状态,不再接收新任务,但可以处理已经提交的任务。
此方法主要分三步进行:
- 校验执行线程是否有关闭线程池的权限;
- 修改线程池状态为
SHUTDOWN
; - 中断空闲的线程;
- 最后尝试关闭线程池。
1 | public void shutdown() { |
怎么实现“不再接收新任务”?
在execute
方法中,共有三处调用addWorker
方法。
在执行advanceRunState(SHUTDOWN)
后,线程池状态改为SHUTDOWN
,因此只会进入1或3两个addWorker
中。而这两处调用,都有command!=null
。因此addWorker
方法必然返回false(满足rs >= SHUTDOWN && firstTask != null
)。最后会进入reject
。
checkShutdownAccess方法
权限校验方法
1 | private void checkShutdownAccess() { |
advanceRunState方法
advanceRunState
方法会将ctl
的线程状态设为targetState
,循环直到成功为止
1 | private void advanceRunState(int targetState) { |
interruptIdleWorkers方法
内部调用重载方法,中断所有空闲的线程
1 | private void interruptIdleWorkers() { |
interruptIdleWorkers(boolean onlyOne)方法
中断空闲的线程。如果onlyOne=true
,则最多只中断一个,否则中断所有。
1 | private void interruptIdleWorkers(boolean onlyOne) { |
怎么实现“中断空闲的线程,但不影响已提交的任务”?
在执行t.interrupt
前,首先判断if (!t.isInterrupted() && w.tryLock())
,要求t
线程未中断且worker
处于空闲状态(未上锁)。
另一方面,在runWorker
方法中,一旦task != null || (task = getTask()) != null
成立,就会执行w.lock()
使worker
上锁。
因此只有阻塞在getTask()
的t
线程才会进入interruptIdleWorkers
方法的if
条件。
又由于t
线程只会阻塞在BlockingQueue.poll
或BlockingQueue.take
方法处,而这两个方法都会抛出InterruptedException
异常,因此t.interrupt()
会使t
线程中断运行。
tryTerminate方法
尝试终止线程池。只有在线程池状态为STOP
或SHUTDOWN
(且队列为空),且没有工作线程时,线程池才会终止,最后线程池进入TERMINATED
状态。如果有线程在运行,则尝试中断一个,方法结束。
1 | final void tryTerminate() { |
runStateAtLeast方法
判断状态。五种状态由小到大排序,比较大小即可。
1 | private static boolean runStateAtLeast(int c, int s) { |
shutdownNow方法
与shutdown
方法不同的是,shutdownNow
方法调用advanceRunState
方法使线程池进入STOP
状态,不再接收新任务,通过interruptWorkers
方法中断正在执行的任务,通过drainQueue
方法丢弃并返回队列中的任务。
1 | public List<Runnable> shutdownNow() { |
interruptWorkers方法
interruptWorkers
方法通过调用w.interruptIfStarted()
中断所有运行中的线程,来中断正在执行的任务。
但是注意:interruptWorkers
并不一定真的会中断正在执行中的任务。因为interrupt
方法实际上只是设置了一个中断标记,只有当线程执行到sleep
, wait
, join
方法,抛出InterruptedException
异常时,任务才会中断。否则当前任务会继续执行完毕。
1 | private void interruptWorkers() { |
drainQueue方法
drainQueue
方法将阻塞队列中的元素转移到集合中,并清空阻塞队列。
1 | private List<Runnable> drainQueue() { |