JDK源码分析系列--ThreadPoolExecutor

线程池是众多池化思想的一种。在Java中,与多线程相关的是java.util.concurrent包,而其中最核心的便是线程池类java.util.concurrent.ThreadPoolExecutor。该类包含了线程池的维护与任务执行的过程。本文主要围绕ThreadPoolExecutor,既是源码分析,也是经验总结。如有理解不当,望不吝赐教。


整体结构

继承关系

J.U.C包下,线程池主要包括四个类(或接口)。从上到下依次是ExecutorExecutorServiceAbstractExecutorServiceThreadPoolExecutor

image/image-20220127112305054.png

分工

  • Executor作为顶层父接口,将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器 (Executor) 中,由 Executor 框架完成线程的调配和任务的执行部分。

  • ExecutorService接口增加了一些能力:

    • 扩充执行任务的能力,比如可以为一个或一批异步任务生成Future的方法(invokeAll);
    • 提供了管控线程池的方法,比如停止线程池运行的方法(shutdown);
    • 拓展了有返回值的提交任务方法(submit)。
  • AbstractExecutorService是抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。

  • ThreadPoolExecutor实现最复杂的运行部分,一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

构造方法

ThreadPoolExecutor有四个构造方法,除全参构造外,其它三个都是全参构造的子集。

全参构造

共7个构造参数,赋值给6个成员变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
// 调用重写的toNanos方法将超时时间转换成纳秒
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

构造参数

含义分别如下:

参数 含义
corePoolSize 核心线程数。核心线程即使空闲也不会被清除
maximumPoolSize 最大线程数。超出核心线程数的线程是非核心线程,空闲一定时间会被清除
keepAliveTime 空闲线程的最大存活时间
unit 存活时间单位
workQueue 任务队列。当核心线程没有空闲时,会将任务丢到等待队列
threadFactory 线程工厂。创建线程的工厂
handler 任务拒绝策略。当核心线程、任务队列、非核心线程都满了,会执行拒绝策略

线程池运行流程

  1. 线程池创建时,并未创建线程,而是在第一次提交任务时创建;
  2. 在线程数未大于核心线程数corePoolSize时,每提交一个任务会创建一个线程(通过threadFactory);
  3. 在线程数达到核心线程数时,会将任务丢到工作队列workQueue里;
  4. 在工作队列满了时,会继续创建非核心线程;
  5. 当线程数达到最大线程数maximumPoolSize时,会执行拒绝策略handler
  6. 当非核心线程空闲达到存活时间(keepAliveTime unit)时,会被自动移除。

参数设置建议

  1. 如果是计算密集型,由于CPU几乎没有空闲时间,因此线程数不应大于CPU核数,对于核数为N的CPU,建议线程数=N-1(留一个核给主线程)
  2. 如果是IO密集型,可以根据单个线程CPU使用率来确定。对于使用率为k(0<k<1)的任务,建议线程数=N/k
  3. 队列长度应与核心线程数相当。队列太长,不能及时创建新线程,导致处理任务效率低;队列太短,可能会频繁创建非核心线程

成员变量

除构造参数中指定的6个参数外,还有以下成员变量

ctl

ctl是原子int类型,共32位。其中高3位表示线程池状态,低29位表示线程数。

1
2
3
4
5
6
7
// 原子类型,线程池初始状态是RUNNING,初始线程数是0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 或运算
private static int ctlOf(int rs, int wc) {
return rs | wc;
}

ctl初始值为ctlOf(RUNNING, 0),其中RUNNING值为-1<<29,即1110 00…00(29个0)。ctlOf方法中,与0做|运算后,结果不变。

1110 00…00表示线程池状态为111,线程数为0。

-1的二进制表示

源码:1000 00…01

反码:1111 11..10(符号位不变,其它取反)

补码:1111 11…11(反码加1)

largestPoolSize

曾经出现的最大线程数

1
private int largestPoolSize;

completedTaskCount

已完成的任务数(所有线程)

1
private long completedTaskCount;

allowCoreThreadTimeOut

是否允许核心线程超时,默认false

1
private volatile boolean allowCoreThreadTimeOut;

常量

COUNT_BITS

COUNT_BITS 表示线程数占ctl的位数,即32-3=29。

1
private static final int COUNT_BITS = Integer.SIZE - 3;

CAPACITY

CAPACITY用作拆分ctl。值为1 << 29 - 1,二进制表示为00011111 11111111 11111111 11111111,即3个0和29个1。

同时,CAPACITY也表示线程池中允许的最大线程数,这样线程数就被限制在ctl的低29位。

1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

线程池的五种状态

状态分别用-1~3的左移29位表示,结果按从小到大排序。

1
2
3
4
5
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

下表列出了线程池的五种状态的含义和触发条件。

状态 名称 二进制表示 说明 触发条件
RUNNING 运行 1110 00…00 允许提交任务 初始化时
SHUTDOWN 关闭 0000 00…00 不允许提交任务,允许执行已提交的任务 调用shutdown()后
STOP 停止 0010 00…00 不允许提交任务,丢弃已提交的任务,中断正在执行的任务 调用shutdownNow()后
TIDYING 整理 0100 00…00 线程数清零,任务清零 调用tryTerminate()后
TERMINATED 终止 0110 00…00 线程池终止 调用terminated()后

二进制下,各状态都只有高3位有效,分别为111000001010011。这样线程池状态就被限制在ctl的高3位。

内部类

Worker

WorkerThreadPoolExecutor的内部类,继承了AbstractQueuedSynchronizerRunnable接口。其中,实现Runnable接口表明Worker就是一个线程,承担着处理任务的工人角色。继承AQS使Worker具备了有锁/无锁两个状态,分别用来表示线程的空闲/忙碌。方便后续销毁。

1
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {...}

成员变量

Worker有三个成员变量,分别是:

  • Thread thread 实际的工作线程
  • Runnable firstTask 待处理的初始任务
  • long completedTasks 任务计数器,统计单个worker执行的任务数

构造方法

Worker只有一个构造方法。在调用构造方法时,会通过线程工厂创建线程,当前worker对象作为Runnable实例传入newThread方法中。这样,当调用thread.start()时,实际调用的是worker.run方法。

1
2
3
4
5
6
7
8
// 构造方法
Worker(Runnable firstTask) {
// 设置state为-1,防止线程被中断。只有大于等于0可以被中断
setState(-1);
this.firstTask = firstTask;
// 使用线程工厂方法创建线程,传入当前worker对象
this.thread = getThreadFactory().newThread(this);
}

这里有点绕,threadworker的成员变量,但worker又是threadtarget。也就Doug Lea敢这么写啊。

成员方法

run方法

run内部调用了runWorker方法,传入的是当前worker对象。

1
2
3
public void run() {
runWorker(this);
}
interruptIfStarted方法

中断所有运行中的线程。此方法只在shutdownNow中有调用。

1
2
3
4
5
6
7
8
9
10
void interruptIfStarted() {
Thread t;
// 运行中断且未中断的线程,执行中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}

拒绝策略

j.u.c.RejectedExecutionHandler接口提供了一个方法rejectedExecution,当线程池和队列都满的时候,会调用rejectedExecution方法,执行拒绝策略。

1
2
3
4
// RejectedExecutionHandler只有一个方法
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

ThreadPoolExecutor内置了4个RejectedExecutionHandler实现类,分别表示4种拒绝策略。

CallerRunsPolicy

任务交由主线程运行。由于主线程被占用,此策略会暂时阻止任务继续提交。

1
2
3
4
5
6
7
8
9
10
11
public static class CallerRunsPolicy implements RejectedExecutionHandler {

public CallerRunsPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 线程池如果没关闭,则交由主线程执行
if (!e.isShutdown()) {
r.run();
}
}
}

AbortPolicy

抛出异常

1
2
3
4
5
6
7
8
9
10
11
public static class AbortPolicy implements RejectedExecutionHandler {

public AbortPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 抛出异常
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

DiscardPolicy

丢弃且静默处理

1
2
3
4
5
6
7
8
public static class DiscardPolicy implements RejectedExecutionHandler {

public DiscardPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 什么也不做
}
}

DiscardOldestPolicy

丢弃最老的任务,提交当前任务

1
2
3
4
5
6
7
8
9
10
11
12
public static class DiscardOldestPolicy implements RejectedExecutionHandler {

public DiscardOldestPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 如果线程池没关闭,从队列头取出一个任务丢掉,将r放入队尾
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

对比

分别对四种拒绝策略构建的线程池(queueSize=5,corePoolSize=1,maxPoolSize=5),for循环提交100个任务,执行如下:

拒绝策略 描述 作用 任务处理结果
CallerRunsPolicy 交给主线程 队列和最大线程都满了,则会把新任务交给调用线程处理。 处理1-100
AbortPolicy 拒绝 队列和最大线程都满了,则抛出异常RejectedExecutionException 处理1-10
DiscardPolicy 丢弃 队列和最大线程都满了,直接丢弃 处理1-10
DiscardOldestPolicy 丢弃最早 队列和最大线程都满了,丢弃旧任务,处理新任务 处理1,7-10,96-100

分析如下

  • CallerRunsPolicy:当线程和队列都满了,则由主线程处理第11号任务。等主线程处理完,才会重新提交12-21号任务,然后主线程继续处理22号任务。最终处理1-100。

    注意:由于主线程的阻塞会影响任务提交,所以队列不应该设置的太小,否则有可能造成主线程繁忙、其它线程空闲的情况。建议queueSize=maxPoolSize

  • AbortPolicy:当线程和队列都满了,直接抛出异常,程序中断,最终处理1-10。业务中可以捕获这个异常做后续处理。

  • DiscardPolicy:当线程和队列都满了,没有任何提示,程序中断,最终处理1-10。此策略应少用。

  • DiscardOldestPolicy:当线程和队列都满了的时候,此时线程持有1、7、8、9、10,队列持有2、3、4、5、6。此策略会丢弃队列头的任务,将新任务放入队列尾。因此就有:

    ​ 丢弃2,放入11

    ​ 丢弃3,放入12

    ​ ……

    ​ 丢弃94,放入99

    ​ 丢弃95,放入100

    等任务全部提交完毕,队列中还剩下96-100号任务。等线程空闲,就会来执行这5个,所以最后的执行结果是1、7、8、9、10、96、97、98、99、100。

    当新任务对旧任务有替代作用时,可以使用此策略。比如业务需求是定时获取节目信息,那么新任务肯定比旧任务信息更准确,新任务到来时如果旧任务还没返回,可以丢弃旧任务。

    注意,此策略下队列不应设置太小,否则可能导致任务被大量丢弃,如示例中所示。

成员方法

ThreadPoolExecutor功能主要包含线程池维护和任务执行两部分,其中任务执行最重要的就是execute方法

execute方法

execute方法作用是将任务提交给线程池。具体可以分四种情况:

  1. 核心线程未满,创建核心线程,并将任务交给核心线程;
  2. 核心线程已满,队列未满,则将任务添加到队列;
  3. 队列已满,则创建非核心线程,将任务交给新线程;
  4. 最大线程已满,或线程池关闭,则执行拒绝策略。

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void execute(Runnable command) {
// 任务不能为空
if (command == null)
throw new NullPointerException();

// ctl是原子类型,获得保存的值c。
// c是一个32位数,其中高3位保存线程池状态,低29位保存线程数
int c = ctl.get();

// 当前线程数小于核心线程数(对应核心数不为0的情况)
if (workerCountOf(c) < corePoolSize) {
// 添加核心线程,并将当前任务交给新线程
if (addWorker(command, true))
// 添加成功,则方法结束
return;
// 到这里说明添加失败,重新获得c
c = ctl.get();
}
// 如果线程池还在运行,则尝试将任务添加到队列中
if (isRunning(c) && workQueue.offer(command)) {
// 再次获得ctl的值
int recheck = ctl.get();
// 如果线程池不在运行,则将任务从队列中移除
if (!isRunning(recheck) && remove(command))
// 执行拒绝策略
reject(command);
// 如果线程池还在运行且线程数为0(对应核心数为0的情况)
else if (workerCountOf(recheck) == 0)
// 添加新线程,由于任务已经放到队列了,这里不再指定
addWorker(null, false);
}
// 如果添加任务到队列失败,尝试创建新线程
else if (!addWorker(command, false))
// 创建新线程失败,执行拒绝策略
reject(command);
}

isRunning方法

c < SHUTDOWN,则只有RUNNING一种情况。

1
2
3
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

addWorker方法

顾名思义,addWorker方法主要作用是创建工人对象。方法有两个参数

  • Runnable firstTask,表示该线程需要处理的第一个任务
  • boolean core,表示该线程是否为核心线程

execute方法中,有三个地方会调用addWorker方法。分别是:

  • 核心线程未满,创建核心线程;
  • 核心线程数为0,且任务已加到队列里,创建非核心线程;
  • 核心线程已满且队列已满,创建非核心线程

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// 返回true,表示线程添加成功
private boolean addWorker(Runnable firstTask, boolean core) {
// 两层循环,更新线程数成功则跳出
retry:
for (;;) {
int c = ctl.get();
// 线程池状态
int rs = runStateOf(c);
/*
if条件可以改写成
rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
即 rs > SHUTDOWN || (rs >= SHUTDOWN && firstTask != null) || (rs >= SHUTDOWN && workQueue.isEmpty())
1. rs > SHUTDOWN,线程池已停止,不允许创建线程
2. rs >= SHUTDOWN && firstTask != null,线程池已关闭,不允许提交任务,此条件不支持
3. rs >= SHUTDOWN && workQueue.isEmpty(),线程池已关闭,且队列已空,不需要创建线程
综上,三种情况都会返回false,表示未创建线程
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;

for (;;) {
// 线程数
int wc = workerCountOf(c);
// 线程数超过上限,或者线程数超过核心数或最大数,都返回false
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 线程数加1
if (compareAndIncrementWorkerCount(c))
// 结束外层循环
break retry;
c = ctl.get();
// 走到这里说明ctl更新失败,判断线程池状态是否发生变化
if (runStateOf(c) != rs)
// 如果变化,重新进入外层循环
continue retry;
// 否则,说明是线程数更新失败,重新进入内存循环
}
}

// 线程是否启动
boolean workerStarted = false;
// 线程是否添加
boolean workerAdded = false;
Worker w = null;
try {
// 创建新线程
w = new Worker(firstTask);
// 实际的执行线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 线程池是运行中或者已关闭但第一个任务为空(第二点不明白)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 线程还未启动但已经是存活状态了,说明有异常
if (t.isAlive())
throw new IllegalThreadStateException();
// 加到线程集合中
workers.add(w);
// 线程数
int s = workers.size();
// 如果当前线程数超过了历史最大线程数,则更新历史最大线程数
if (s > largestPoolSize)
largestPoolSize = s;
// 表示添加成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加成功则启动线程
if (workerAdded) {
t.start();
// 线程启动成功
workerStarted = true;
}
}
} finally {
// 线程启动失败,执行添加失败逻辑,将线程数减1
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

workerCountOf方法

从ctl中获取线程数

1
2
3
private static int workerCountOf(int c) { 
return c & CAPACITY;
}

CAPACITY = (1 << 29) - 1, 1<<29是 0010 00...11,再减1是 0001 11...11,即3个1和29个0。c & CAPACITY取的就是c的低29位。

runStateOf方法

从ctl中获取线程池状态

1
2
3
private static int runStateOf(int c) {
return c & ~CAPACITY;
}

CAPACITY 是 0001 11...11,取反就是1110 00...00,c & ~CAPACITY取的就是c的高3位。

compareAndIncrementWorkerCount方法

CAS操作,使ctl的值加1。跟线程数加1等效。

1
2
3
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}

addWorkerFailed方法

线程启动失败,会调用addWorkerFailed方法,有三个作用:

  1. 从集合中移除当前Worker对象
  2. ctl值减1,因为在addWorker中先加了1
  3. 尝试终止线程池

此方法只在addWorker中调用。

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 从集合中移除
if (w != null)
workers.remove(w);
// 线程数减1
decrementWorkerCount();
// 尝试终止线程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
decrementWorkerCount方法

循环调用compareAndDecrementWorkerCount使ctl减1,直到成功

1
2
3
4
5
private void decrementWorkerCount() {
do {} while (
!compareAndDecrementWorkerCount(ctl.get())
);
}
compareAndDecrementWorkerCount方法

CAS操作,使ctl的值减1。跟线程数减1等效。

1
2
3
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}

runWorker方法

由上面分析可知:在addWorker方法中,调用 t.start() 启动线程,t = worker.thread,而 thread 持有的是worker对象,因此会调用到workerrun方法,最终会调用到runWorker

runWorker方法主要作用是:持续从队列中获取任务并消费,直到获取到的任务为null,则会结束循环并移除当前线程。

runWorker方法在run方法中唯一调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
final void runWorker(Worker w) {
// 当前线程
Thread wt = Thread.currentThread();
// 获取第一个任务
Runnable task = w.firstTask;
// 置空
w.firstTask = null;
// 设置状态位为0,表示允许中断
w.unlock();
// true表示突然结束,是异常情况
boolean completedAbruptly = true;
try {
// 初始任务为空,则从队列中获取,直到取不到则结束循环
while (task != null || (task = getTask()) != null) {
// 设置状态位为1,在shutdown方法调用时不会中断
w.lock();

// 线程状态在STOP及以上,需要中断正在执行的任务
if ((runStateAtLeast(ctl.get(), STOP) ||
// 只有||之前的条件不成立才会进入||后,但此时||后一定也不成立,不清楚为啥要这么写?
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
// 当前线程非中断状态
!wt.isInterrupted())

// 中断当前线程
wt.interrupt();
try {
// 空方法,按需实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行具体任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 空方法,按需实现
afterExecute(task, thrown);
}
} finally {
// 置空task,便于进入下一个循环
task = null;
// 计数器加1
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

注意一下Thread的三个方法:

方法 性质 返回值类型 作用
interrupt 成员方法 void 设置线程的中断状态,作用于调用线程
isInterrupted 成员方法 boolean 返回线程的中断状态,作用于调用线程
interrupted 静态方法 boolean 返回并清除线程的中断状态,只能作用于当前线程

getTask方法

runWorker方法中,如果 task==null,则会调用getTask方法从队列获取任务。

具体逻辑是:

  • 队列不为空,从队列尾取出一个任务,并返回;
  • 队列为空
    • 如果线程数未超出核心线程,则workQueue.take()会一直阻塞,直到有新的任务入队列;
    • 如果线程数超出了核心线程,或者允许核心线程超时(核心线程默认不超时,可通过allowCoreThreadTimeOut()方法设置),则在从队列获取任务时设置一个超时时间(即keepAliveTime),超时则返回null,跳出循环,进入后续清理逻辑。

这样就实现了设置线程存活时间的逻辑。非常巧妙。

getTask方法在runWorker方法中唯一调用。

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private Runnable getTask() {
// 获取task是否超时
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// if条件等价于rs >= STOP || (rs >= SHUTDOWN && workQueue.isEmpty())
// 前者要丢弃已经提交的任务,后者允许执行已提交的任务但任务队列为空。这两种情况都没必要再获取
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// ctl减1
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);

// 允许线程超时检测(允许核心线程超时或存在非核心线程)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 线程数大于最大线程数 或者 开启超时检测且获取任务超时
// 要求队列不为空且线程数大于1,则将线程数减1
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 执行成功,则返回null,否则重试
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
// 开启超时检测的,则设置一个超时时间去获取
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 未开启超时检测的,阻塞式获取
workQueue.take();
// 获取到则返回,方法结束
if (r != null)
return r;
// 否则认为超时,在下一个循环时将线程数减1
timedOut = true;
} catch (InterruptedException retry) {
// 下一个循环重试
timedOut = false;
}
}
}

processWorkerExit方法

runWorker方法中,如果getTask方法有结果,则while循环得以持续;

如果结果为null,循环正常结束,执行processWorkerExit(w, false),将当前线程从集合中移除;

如果循环异常结束,则执行processWorkerExit(w, true),将当前线程移除并补充一个线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 循环异常结束,在这里将线程数减1;正常则不执行
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 统计执行完的任务数
completedTaskCount += w.completedTasks;
// 将线程从集合中移除,GC会自动回收
workers.remove(w);
} finally {
mainLock.unlock();
}

// 尝试关闭线程池
tryTerminate();
int c = ctl.get();

// 如果线程池未关闭,确保线程数不低于最小值
if (runStateLessThan(c, STOP)) {
// 循环正常结束
if (!completedAbruptly) {
// 需保留的最小线程数,0或corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果队列中还有任务,确保至少有一个线程
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 线程数大于最小线程数,方法结束
if (workerCountOf(c) >= min)
return;
}
// 异常情况 或 正常情况但线程池不满足最小线程数,则需要添加一个线程
addWorker(null, false);
}
}

shutdown方法

shutdown方法会使线程池进入SHUTDOWN状态,不再接收新任务,但可以处理已经提交的任务。

此方法主要分三步进行:

  1. 校验执行线程是否有关闭线程池的权限;
  2. 修改线程池状态为SHUTDOWN
  3. 中断空闲的线程;
  4. 最后尝试关闭线程池。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 权限校验
checkShutdownAccess();
// 将线程池改为SHUTDOWN状态
advanceRunState(SHUTDOWN);
// 中断空闲的线程
interruptIdleWorkers();
// 空方法,在ScheduledThreadPoolExecutor中有实现
onShutdown();
} finally {
mainLock.unlock();
}
// 最后尝试关闭线程池
tryTerminate();
}

怎么实现“不再接收新任务”?

execute方法中,共有三处调用addWorker方法。

execute中三处调用addWorker

在执行advanceRunState(SHUTDOWN)后,线程池状态改为SHUTDOWN,因此只会进入1或3两个addWorker中。而这两处调用,都有command!=null。因此addWorker方法必然返回false(满足rs >= SHUTDOWN && firstTask != null)。最后会进入reject

返回false

checkShutdownAccess方法

权限校验方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
// 校验调用线程是否有修改线程池的权限
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
// 校验调用线程是否有中断每个线程的权限
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}

advanceRunState方法

advanceRunState方法会将ctl的线程状态设为targetState,循环直到成功为止

1
2
3
4
5
6
7
8
9
10
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
// 如果线程状态在targetState及之上,则结束循环
if (runStateAtLeast(c, targetState) ||
// 否则设置为targetState
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}

interruptIdleWorkers方法

内部调用重载方法,中断所有空闲的线程

1
2
3
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
interruptIdleWorkers(boolean onlyOne)方法

中断空闲的线程。如果onlyOne=true,则最多只中断一个,否则中断所有。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 线程未中断且worker处于空闲状态(说明线程没有要执行的任务)
if (!t.isInterrupted() && w.tryLock()) {
try {
// 中断线程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 只中断一个线程就结束
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

怎么实现“中断空闲的线程,但不影响已提交的任务”?

在执行t.interrupt前,首先判断if (!t.isInterrupted() && w.tryLock()),要求t线程未中断且worker处于空闲状态(未上锁)。

另一方面,在runWorker方法中,一旦task != null || (task = getTask()) != null成立,就会执行w.lock()使worker上锁。

因此只有阻塞在getTask()t线程才会进入interruptIdleWorkers方法的if条件。

又由于t线程只会阻塞在BlockingQueue.pollBlockingQueue.take方法处,而这两个方法都会抛出InterruptedException异常,因此t.interrupt()会使t线程中断运行。

tryTerminate方法

尝试终止线程池。只有在线程池状态为STOPSHUTDOWN(且队列为空),且没有工作线程时,线程池才会终止,最后线程池进入TERMINATED状态。如果有线程在运行,则尝试中断一个,方法结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 判断线程池是RUNNING或TIDYING或TERMINATED状态 或 线程池是SHUTDOWN状态但队列不为空,
// 则线程池不能终止,方法结束
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;

// 走到这里,线程池状态只能是STOP或SHUTDOWN状态(且队列为空)。如果线程数不为0
if (workerCountOf(c) != 0) {
// 则中断一个线程,方法结束
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 设置状态为TIDYING,线程数为0。设置不成功则继续循环,直到成功
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 空方法,按需实现
terminated();
} finally {
// 状态改为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒mainLock对象上等待的所有线程
termination.signalAll();
}
// 成功则方法结束
return;
}
} finally {
mainLock.unlock();
}
}
}
runStateAtLeast方法

判断状态。五种状态由小到大排序,比较大小即可。

1
2
3
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}

shutdownNow方法

shutdown方法不同的是,shutdownNow方法调用advanceRunState方法使线程池进入STOP状态,不再接收新任务,通过interruptWorkers方法中断正在执行的任务,通过drainQueue方法丢弃并返回队列中的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 权限校验
checkShutdownAccess();
// 将线程池改为STOP状态
advanceRunState(STOP);
// 中断正在运行的线程
interruptWorkers();
// 清空队列,将任务转移到tasks并返回
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试关闭线程池
tryTerminate();
return tasks;
}

interruptWorkers方法

interruptWorkers方法通过调用w.interruptIfStarted()中断所有运行中的线程,来中断正在执行的任务。

但是注意:interruptWorkers并不一定真的会中断正在执行中的任务。因为interrupt方法实际上只是设置了一个中断标记,只有当线程执行到sleep, wait, join方法,抛出InterruptedException异常时,任务才会中断。否则当前任务会继续执行完毕。

1
2
3
4
5
6
7
8
9
10
11
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
// 中断所有运行中的线程
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}

drainQueue方法

drainQueue方法将阻塞队列中的元素转移到集合中,并清空阻塞队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
// 各个队列的实现大同小异,都是将元素转移到集合中
q.drainTo(taskList);
// 转移失败,则一个一个按顺序转移
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
// 返回集合
return taskList;
}

参考资料

  1. Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理)
  2. ThreadPoolExecutor源码解析
  3. ThreadPoolExecutor关闭线程池的原理
文章作者: SongGT
文章链接: http://www.songguangtao.xyz/2022/11/01/21.JDK源码分析系列--ThreadPoolExecutor/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 SongGuangtao's Blog
大哥大嫂[微信打赏]
过年好[支付宝打赏]