概述
提供ExecutorService执行方法的默认实现。该类使用由newTaskFor返回的RunnableFuture实现submit、invokeAny和invokeAll方法,默认为此包中提供的FutureTask类。例如,submit(Runnable)的实现创建并返回一个关联的RunnableFuture。子类可以重写newTaskFor方法,返回除FutureTask之外的RunnableFuture实现。
源码解析
invokeAny方法允许您执行可调用任务的集合,并返回第一个完成任务的结果。当您希望在并行中执行多个任务并返回第一个完成的结果时,该方法很有用,同时丢弃其他结果。也可以用它在一个任务完成后取消剩余任务。
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.
try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
// Start one task for sure; the rest incrementally
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
for (;;) {
Future<T> f = ecs.poll();
if (f == null) {
//如果没有获取到,重新提交
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
//等待特定时间
else if (timed) {
f = ecs.poll(nanos, NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take();
}
if (f != null) {
--active;
try {
//如果有一个执行完了,就返回结果
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
//取消剩余任务
cancelAll(futures);
}
}
invokeAll方法允许您执行可调用任务的集合,并返回表示每个任务结果的Future对象的列表。当你想并行执行多个任务并等待它们全部完成之前处理结果时,这种方法很有用。它还可以用来在一个任务完成后取消剩余任务。返回的Future对象列表可用于检查每个任务的状态并在可用时检索结果。
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
try {
//执行每个任务
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
//如果没有完成,一直阻塞获取
if (!f.isDone()) {
try { f.get(); }
catch (CancellationException | ExecutionException ignore) {}
}
}
//返回所有的执行结果
return futures;
} catch (Throwable t) {
cancelAll(futures);
throw t;
}
}
分享到: