今天是:
带着程序的旅程,每一行代码都是你前进的一步,每个错误都是你成长的机会,最终,你将抵达你的目的地。
title

AbstractExecutorService

概述

提供ExecutorService执行方法的默认实现。该类使用由newTaskFor返回的RunnableFuture实现submit、invokeAny和invokeAll方法,默认为此包中提供的FutureTask类。例如,submit(Runnable)的实现创建并返回一个关联的RunnableFuture。子类可以重写newTaskFor方法,返回除FutureTask之外的RunnableFuture实现。

源码解析

invokeAny方法允许您执行可调用任务的集合,并返回第一个完成任务的结果。当您希望在并行中执行多个任务并返回第一个完成的结果时,该方法很有用,同时丢弃其他结果。也可以用它在一个任务完成后取消剩余任务。

 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future<T> f = ecs.poll();
                if (f == null) {
                    //如果没有获取到,重新提交
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    //等待特定时间
                    else if (timed) {
                        f = ecs.poll(nanos, NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        f = ecs.take();
                }
                if (f != null) {
                    --active;
                    try {
                        //如果有一个执行完了,就返回结果
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            //取消剩余任务
            cancelAll(futures);
        }
    }

invokeAll方法允许您执行可调用任务的集合,并返回表示每个任务结果的Future对象的列表。当你想并行执行多个任务并等待它们全部完成之前处理结果时,这种方法很有用。它还可以用来在一个任务完成后取消剩余任务。返回的Future对象列表可用于检查每个任务的状态并在可用时检索结果。

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
        try {
            //执行每个任务
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                //如果没有完成,一直阻塞获取
                if (!f.isDone()) {
                    try { f.get(); }
                    catch (CancellationException | ExecutionException ignore) {}
                }
            }
            //返回所有的执行结果
            return futures;
        } catch (Throwable t) {
            cancelAll(futures);
            throw t;
        }
    }

 

分享到:

专栏

类型标签

网站访问总量