上一篇介绍了ThreadPoolExecutor
,提供了一个execute
方法执行任务,但execute
方法没有返回值,不能批量提交任务,不能设置超时时间。AbstractExecutorService
是ThreadPoolExecutor
的抽象父类,提供了一些额外方法,使得执行任务的功能得到增强。方法可分为三类:有返回的提交(submit),执行全部(invokeAll),执行任意一个(invokeAny)。这三类方法也是基于execute
。
submit方法
作用
submit
用来提交一个任务,并异步返回任务的执行结果。
重载方法
submit
有三种重载方法,可以提交一个Runnable
或Callable
类型对象作为参数。
方法 | 参数 | 返回 | 说明 |
---|---|---|---|
submit | Callable |
Future |
提交一个Callable对象,返回对应类型Future |
submit | Runnable task | Future<?> | 提交一个Runnable 对象,返回Void类型Future |
submit | Runnable task, T result | Future |
提交一个Runnable 对象,并指定返回Future类型 |
源码分析
submit
方法传入一个Callable
类型对象task
,接着通过newTaskFor
方法将task
转换成RunnableFuture
类型对象ftask
,然后调用execute(ftask)
,最后返回ftask
。
Callable接口
其中,j.u.c.Callable
是一个函数式接口,只包含一个call
方法。与Runnable.run
相比,call
方法拥有返回值,类型即为泛型V。
Future接口
Future
接口提供了一些方法,其中get
方法可以在任务执行完毕后获得返回值。
RunnableFuture接口
由于RunnableFuture
实现了Runnable
和Future
,因此这个ftask
既可以是execute
的方法参数,又可以是submit
的返回值。
newTaskFor方法
newTaskFor
有两种重载方法,这两种方法都是将一个Callable
或Runnable
对象封装成了FutureTask
对象。而FutureTask
实现了RunnableFuture
接口。
方法 | 参数 | 返回 | 说明 |
---|---|---|---|
newTaskFor | Callable |
RunnableFuture |
将callable封装进RunnableFuture |
newTaskFor | Runnable runnable, T value | RunnableFuture |
将runnable封装进RunnableFuture,并通过value指定泛型 |
分别调用了两个FutureTask
的构造方法
由之前对execute(Runnable command)
方法的分析可知,线程最终执行的是command.run()
方法。而这里传入的command
是FutureTask
类型。
FutureTask
FutureTask
中起关键作用的是run
、get
和report
方法
run方法
在run
方法中,先执行callable.call()
方法获得结果result
,再将result
保存到成员变量outcome
中
get方法
get
方法用来获取结果,包括两个重载方法
无超时时间
任务结束前会一直阻塞
有超时时间
如果超时了任务还没完成,则抛出异常
report方法
任务执行完成,会调用report
方法,返回outcome
invokeAny方法
作用
invokeAny
方法用来提交多个任务,只要这些任务中有一个成功,就返回执行结果,其它任务都会被取消。如果所有任务都执行失败,则抛出异常。
invokeAny
方法适合执行某些时效性要求高的任务。比如向多台机器发出同样的计算请求,结果都一样,但性能高或网络好的机器会最先返回,那其他任务可以取消。
重载方法
invokeAny
包含两个重载方法,提交一个Callable<T>
类型任务集合。区别是是否允许超时。
方法 | 参数 | 返回 | 说明 |
---|---|---|---|
invokeAny | Collection<? extends Callable |
T | 提交任务集合,只要有任意一个成功则返回,其余任务取消 |
invokeAny | Collection<? extends Callable |
T | 超时则抛出异常 |
这两个方法内部都是调用的doInvokeAny
方法。
doInvokeAny方法
doInvokeAny
是invokeAny
的具体实现。
逻辑
1.首先构建一个ECS实例,提交第一个任务,然后进入for循环,循环体包含两个if语句;
2.进入循环后,开始尝试从ecs的队列中获取一个future
,此时存在以下几种情况:
2.1还没有任务执行完成,future=null
,进入第一个if语句,判断剩余任务是否都已提交
2.1.1如果还没提交完,则提交一个,进入下一轮循环;
2.1.2如果已经提交完,判断是否设置了超时时间
2.1.2.1如果设置了,则等待超时时间,没有结果就抛出异常,有结果就执行第二个if语句;
2.1.2.2如果没有设置,则阻塞等待。
2.2已有任务已执行完成,future!=null
,执行后续的f.get
操作,
2.2.1如果get获取失败,则记录异常,进入下一轮循环;
2.2.2如果get获取成功,则结束循环,返回执行结果
3.如果循环结束所有任务都执行失败,则抛出最后记录的异常;
4.最后中断所有已提交的任务。
源码分析
1 | private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, |
ExecutorCompletionService类
在doInvokeAny
方法中,创建了一个j.u.c.ExecutorCompletionService
对象ecs
。
ecs
包含三个成员变量,其中
executor
就是调用方ThreadPoolExecutor
对象aes
为向上转型的AbstractExecutorService
对象completionQueue
是BlockingQueue<Future>
类型,用来保存执行结果
QueueingFuture类
ExecutorCompletionService
提供了submit
,poll
,take
方法。
其中submit
方法创建了一个ExecutorCompletionService.QueueingFuture
对象,QueueingFuture
继承自FutureTask
,重写了父类的done
方法,用于将task
添加到completionQueue
中。
而done
方法会在FutureTask.run
方法执行后被调用。因此completionQueue
中保存有任务执行结果。
因此AbstractExecutorService
可以通过ECS
的poll
,take
方法从completionQueue
中获取执行结果。
invokeAll方法
作用
与invokeAny
不同,invokeAll
方法用来提交多个任务,并等待所有任务返回。
重载方法
invokeAll
包含两个重载方法,都是提交一个Callable<T>
类型任务集合。区别是是否允许超时。
方法 | 参数 | 返回 | 说明 |
---|---|---|---|
invokeAll | Collection<? extends Callable |
List |
提交任务集合,阻塞直到所以任务都完成 |
invokeAll | Collection<? extends Callable |
List |
超时则直接返回 |
源码分析
不设置超时
会等待所有任务执行完毕,并同步返回执行结果。一旦有任务提交失败,或者执行失败,则取消所有任务。
1 | public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) |
设置超时
会等待所有任务执行完毕,并同步返回执行结果。一旦有任务提交失败,或者超时,则取消所有任务。
1 | public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, |