JDK源码分析系列--AbstractExecutorService

上一篇介绍了ThreadPoolExecutor,提供了一个execute方法执行任务,但execute方法没有返回值,不能批量提交任务,不能设置超时时间。AbstractExecutorServiceThreadPoolExecutor的抽象父类,提供了一些额外方法,使得执行任务的功能得到增强。方法可分为三类:有返回的提交(submit),执行全部(invokeAll),执行任意一个(invokeAny)。这三类方法也是基于execute


submit方法

作用

submit用来提交一个任务,并异步返回任务的执行结果。

重载方法

submit有三种重载方法,可以提交一个RunnableCallable类型对象作为参数。

方法 参数 返回 说明
submit Callable task Future 提交一个Callable对象,返回对应类型Future
submit Runnable task Future<?> 提交一个Runnable 对象,返回Void类型Future
submit Runnable task, T result Future 提交一个Runnable 对象,并指定返回Future类型

源码分析

submit方法传入一个Callable类型对象task,接着通过newTaskFor方法将task转换成RunnableFuture类型对象ftask,然后调用execute(ftask),最后返回ftask

submit(Callable<T> task)方法

Callable接口

其中,j.u.c.Callable是一个函数式接口,只包含一个call方法。与Runnable.run相比,call方法拥有返回值,类型即为泛型V。

只包含一个call方法

Future接口

Future接口提供了一些方法,其中get方法可以在任务执行完毕后获得返回值。

image-20221116113851219

RunnableFuture接口

由于RunnableFuture实现了RunnableFuture,因此这个ftask既可以是execute的方法参数,又可以是submit的返回值。

RunnableFuture实现了Runnable和Future

newTaskFor方法

newTaskFor有两种重载方法,这两种方法都是将一个CallableRunnable 对象封装成了FutureTask对象。而FutureTask实现了RunnableFuture接口。

方法 参数 返回 说明
newTaskFor Callable callable RunnableFuture 将callable封装进RunnableFuture
newTaskFor Runnable runnable, T value RunnableFuture 将runnable封装进RunnableFuture,并通过value指定泛型

分别调用了两个FutureTask的构造方法

参数类型为Runnable

参数类型为Callable

由之前对execute(Runnable command)方法的分析可知,线程最终执行的是command.run()方法。而这里传入的commandFutureTask类型。

FutureTask

FutureTask中起关键作用的是rungetreport方法

run方法

run方法中,先执行callable.call()方法获得结果result,再将result保存到成员变量outcome

执行callable.call()方法

将结果保存到outcome中

get方法

get方法用来获取结果,包括两个重载方法

无超时时间

任务结束前会一直阻塞

任务未完成,则一直阻塞

有超时时间

如果超时了任务还没完成,则抛出异常

任务未完成,阻塞指定时间

report方法

任务执行完成,会调用report方法,返回outcome

任务执行结束,返回outcome值

invokeAny方法

作用

invokeAny方法用来提交多个任务,只要这些任务中有一个成功,就返回执行结果,其它任务都会被取消。如果所有任务都执行失败,则抛出异常。

invokeAny方法适合执行某些时效性要求高的任务。比如向多台机器发出同样的计算请求,结果都一样,但性能高或网络好的机器会最先返回,那其他任务可以取消。

重载方法

invokeAny包含两个重载方法,提交一个Callable<T>类型任务集合。区别是是否允许超时。

方法 参数 返回 说明
invokeAny Collection<? extends Callable> tasks T 提交任务集合,只要有任意一个成功则返回,其余任务取消
invokeAny Collection<? extends Callable> tasks, long timeout, TimeUnit unit T 超时则抛出异常

这两个方法内部都是调用的doInvokeAny方法。

doInvokeAny方法

doInvokeAnyinvokeAny的具体实现。

逻辑

1.首先构建一个ECS实例,提交第一个任务,然后进入for循环,循环体包含两个if语句;

2.进入循环后,开始尝试从ecs的队列中获取一个future,此时存在以下几种情况:

​ 2.1还没有任务执行完成,future=null,进入第一个if语句,判断剩余任务是否都已提交

​ 2.1.1如果还没提交完,则提交一个,进入下一轮循环;

​ 2.1.2如果已经提交完,判断是否设置了超时时间

​ 2.1.2.1如果设置了,则等待超时时间,没有结果就抛出异常,有结果就执行第二个if语句;

​ 2.1.2.2如果没有设置,则阻塞等待。

​ 2.2已有任务已执行完成,future!=null,执行后续的f.get操作,

​ 2.2.1如果get获取失败,则记录异常,进入下一轮循环;

​ 2.2.2如果get获取成功,则结束循环,返回执行结果

3.如果循环结束所有任务都执行失败,则抛出最后记录的异常;

4.最后中断所有已提交的任务。

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
// 用来保存已提交的任务,方便执行成功后取消所有任务
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
// 构建一个ECS对象,用来获取任务的执行结果
ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);

try {
// 记录任务执行可能抛出的异常
ExecutionException ee = null;
// 超时时刻
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
// 先提交一个任务,并将任务加入到futures中
futures.add(ecs.submit(it.next()));
// 任务数-1
--ntasks;
// 正在执行的任务数
int active = 1;
for (;;) {
// 尝试从队列中取出一个future,非阻塞
Future<T> f = ecs.poll();
// 为null说明还没有任务执行完
if (f == null) {
// 队列不为空
if (ntasks > 0) {
// 任务数-1
--ntasks;
// 提交下一个任务,并将任务加入到futures中
futures.add(ecs.submit(it.next()));
// 正在执行的任务数+1
++active;
}
// 以下都是队列为空的情况
// 没有在执行的任务,说明所有的任务都抛了异常,则结束循环
else if (active == 0)
break;
// 设置了超时
else if (timed) {
// 尝试在限定时间内取出一个future
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
// 说明已经超时还没取到
if (f == null)
// 抛出超时异常
throw new TimeoutException();
// 说明已取到,但可能get失败,所以计算剩余的超时时间,供下次循环用
nanos = deadline - System.nanoTime();
}
// 未设置超时时间,阻塞式获取future
else
f = ecs.take();
}
// future不为null
if (f != null) {
// 正在执行的任务数-1
--active;
try {
// 阻塞式获取结果,成功则直接返回
return f.get();
} catch (ExecutionException eex) {
// 记录异常,但循环不结束
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
// 说明没有执行成功的任务,抛出最后一个异常
throw ee;
} finally {
// 取消所有任务,此操作不影响结果返回
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}

ExecutorCompletionService类

doInvokeAny方法中,创建了一个j.u.c.ExecutorCompletionService对象ecs

ExecutorCompletionService构造方法

ecs包含三个成员变量,其中

  • executor就是调用方ThreadPoolExecutor对象
  • aes为向上转型的AbstractExecutorService对象
  • completionQueueBlockingQueue<Future>类型,用来保存执行结果

QueueingFuture类

ExecutorCompletionService提供了submitpolltake方法。

其中submit方法创建了一个ExecutorCompletionService.QueueingFuture对象,QueueingFuture继承自FutureTask,重写了父类的done方法,用于将task添加到completionQueue中。

submit方法

QueueingFuture类

done方法会在FutureTask.run方法执行后被调用。因此completionQueue中保存有任务执行结果。

因此AbstractExecutorService可以通过ECSpolltake方法从completionQueue中获取执行结果。

ECS的poll,take方法

invokeAll方法

作用

invokeAny不同,invokeAll方法用来提交多个任务,并等待所有任务返回。

重载方法

invokeAll包含两个重载方法,都是提交一个Callable<T>类型任务集合。区别是是否允许超时。

方法 参数 返回 说明
invokeAll Collection<? extends Callable> tasks List> 提交任务集合,阻塞直到所以任务都完成
invokeAll Collection<? extends Callable> tasks, long timeout, TimeUnit unit List> 超时则直接返回

源码分析

不设置超时

会等待所有任务执行完毕,并同步返回执行结果。一旦有任务提交失败,或者执行失败,则取消所有任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
// 用来保存所有任务的执行结果
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
// 依次执行所有任务
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
// 等待所有任务完成,总时长取决于最后一个执行完成的任务
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
// 当前任务未完成,则阻塞在get方法
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
// 返回所有结果
return futures;
} finally {
if (!done)
// 取消所有任务(当任务提交被拒绝,或任务执行失败会进入到这里)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}

设置超时

会等待所有任务执行完毕,并同步返回执行结果。一旦有任务提交失败,或者超时,则取消所有任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
// 用来保存所有任务
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
// 添加所有任务
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
// 超时时刻
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
// 循环提交任务
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
// 每提交一个任务后检测是否超时
nanos = deadline - System.nanoTime();
// 超时则直接返回
if (nanos <= 0L)
return futures;
}
// 遍历futures
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
// 当前任务未执行完成
if (!f.isDone()) {
// 已到超时时间,直接返回
if (nanos <= 0L)
return futures;
try {
// 没到超时时间,则阻塞等待
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
// 只有超时异常才返回
return futures;
}
// 重新计算剩余时间
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
if (!done)
// 取消所有任务(提交失败、超时会进入到这里)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
文章作者: SongGT
文章链接: http://www.songguangtao.xyz/2022/11/18/22.JDK源码分析系列--AbstractExecutorService/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 SongGuangtao's Blog
大哥大嫂[微信打赏]
过年好[支付宝打赏]