今天是:
带着程序的旅程,每一行代码都是你前进的一步,每个错误都是你成长的机会,最终,你将抵达你的目的地。
title

ThreadPoolExecutor

概述

是一种ExecutorService执行器,它使用可能有多个的池化线程之一来执行每个提交的任务,通常使用Executors工厂方法配置。 线程池解决了两个不同的问题:通常在执行大量异步任务时提供更好的性能,因为减少了每个任务调用开销,并提供了一种限制和管理资源(包括线程)的方法,在执行任务集合时消耗。每个ThreadPoolExecutor还维护一些基本统计信息,如完成任务的数量。 要在广泛的上下文中有用,这个类提供了许多可调整的参数和可扩展性钩子。但是,程序员被敦促使用更方便的Executors工厂方法Executors.newCachedThreadPool(无界线程池,自动线程回收),Executors.newFixedThreadPool(固定大小线程池)和Executors.newSingleThreadExecutor(单个后台线程),它们预配置了最常见用例的设置。否则,在手动配置和调整此类时使用以下指南。

  • 设置核心和最大池大小。当新任务被提交时,如果线程数小于核心池大小,则会创建新线程来处理请求,即使其他工作线程是空闲的。否则,如果线程数小于最大池大小,只有在队列满时才会创建新线程来处理请求。
  • 按需构建。默认情况下,即使核心线程也只有在新任务到达时才初始化和启动,但可以使用prestartCoreThread或prestartAllCoreThreads动态覆盖。如果您使用非空队列构造池,您可能希望预启动线程。
  • 创建新线程。使用ThreadFactory创建新线程。如果未指定,则使用Executors.defaultThreadFactory,它创建线程以在同一个ThreadGroup中并具有相同的NORM_PRIORITY优先级和非守护线程状态。通过提供不同的ThreadFactory,您可以更改线程的名称,线程组,优先级,守护线程状态等。如果ThreadFactory在newThread中返回null时无法创建线程,执行器将继续运行,但可能无法执行任何任务。线程应具有"modifyThread"RuntimePermission。如果工作线程或使用池的其他线程没有此权限,服务可能会降级:配置更改可能无法及时生效,关闭池可能仍处于可以终止但未完成的状态。
  • 保持活动时间。如果池当前有多于corePoolSize的线程,则如果它们空闲时间超过keepAliveTime(请参阅getKeepAliveTime(TimeUnit)),则会终止过剩线程。这提供了在池未被积极使用时减少资源消耗的方法。如果池后来变得更加活跃,将创建新线程。此参数也可以使用setKeepAliveTime(long,TimeUnit)动态更改。使用Long.MAX_VALUE TimeUnit.NANOSECONDS的值有效地禁用了在关闭之前终止的空闲线程。默认情况下,保持活动策略仅适用于corePoolSize线程以上,但可以使用allowCoreThreadTimeOut(boolean)将此时间限制策略应用于核心线程,只要keepAliveTime值为非零。
  • 排队。可以使用任何BlockingQueue来传输和保存提交的任务。使用此队列与池大小进行交互: 如果运行的线程少于corePoolSize,则Executor始终更喜欢添加新线程而不是排队。 如果corePoolSize或更多线程正在运行,则Executor始终更喜欢排队请求而不是添加新线程。 如果请求无法排队,则会创建新线程,除非这会超过maximumPoolSize,在这种情况下,任务将被拒绝。

有三种通用策略排队:

  • 直接交接。工作队列的一个很好的默认选择是SynchronousQueue,它将任务交给线程而不进行其他持有。在这里,尝试排队任务将在没有立即可用的线程来运行它时失败,因此将构造新线程。此策略避免在处理具有内部依赖关系的请求集时锁死。直接交接通常需要无限制的maximumPoolSizes来避免拒绝新提交的任务。这反过来允许命令继续到达时线程增长的可能性。
  • 固定大小的队列。使用固定大小的队列(例如,ArrayBlockingQueue)将有助于防止资源耗尽,但是可能导致新任务被拒绝。在这种情况下,通过使用ThreadPoolExecutor.CallerRunsPolicy更改拒绝策略,可以使调用者的线程来运行被拒绝的任务。
  • 无界队列。使用无界队列(例如,LinkedBlockingQueue)将允许无限制的任务增长,但是可能导致资源耗尽。在大多数生产环境中,更好的做法是使用有界队列并通过调整队列和池大小来预防资源耗尽。

 

  • 拒绝任务。当Executor已经关闭时,在execute(Runnable)方法中提交的新任务将被拒绝,并且当Executor对最大线程和工作队列容量都使用有限边界,且饱和时也会被拒绝。在任何情况下,execute方法都会调用其RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)方法。提供了四种预定义的处理程序策略:
  1. 默认的ThreadPoolExecutor.AbortPolicy,在拒绝时,处理程序会抛出运行时RejectedExecutionException。
  2. ThreadPoolExecutor.CallerRunsPolicy,调用execute的线程本身运行任务。这提供了一种简单的反馈控制机制,可以减缓新任务提交的速度。
  3. ThreadPoolExecutor.DiscardPolicy,无法执行的任务将简单地被丢弃。此策略仅适用于从不依赖任务完成的罕见情况。
  4. ThreadPoolExecutor.DiscardOldestPolicy,如果executor没有关闭,则工作队列头部的任务将被丢弃,然后重试执行(这可能会再次失败,导致重复)。这种策略很少被接受。在几乎所有情况下,您应该取消任务,以在等待完成的任何组件中引起异常,并/或记录失败,如ThreadPoolExecutor.DiscardOldestPolicy文档中所示。

也可以定义和使用其他类型的RejectedExecutionHandler类。这需要特别小心,特别是在策略设计只适用于特定容量或排队策略的情况下。

  • 钩子方法:这个类提供了受保护的可重写的beforeExecute(Thread, Runnable)和afterExecute(Runnable, Throwable)方法,在每个任务执行前后调用。它们可用于操作执行环境;例如,重新初始化ThreadLocals,收集统计信息或添加日志条目。此外,可以重写方法terminated以在Executor完全终止后执行任何特殊处理。 如果钩子,回调或BlockingQueue方法抛出异常,内部工作线程可能会失败,突然终止,并可能被替换。
  • 队列维护:getQueue()方法允许访问工作队列以进行监视和调试。强烈反对使用此方法进行其他目的。提供的两种方法remove(Runnable)和purge可用于帮助在取消大量排队任务时进行存储回收。
  • 回收:在程序中不再引用且没有剩余线程的池可能会被回收(垃圾回收),而无需显式关闭。您可以配置池以允许所有未使用的线程最终死亡,通过设置适当的保活时间,使用零基本线程数和/或设置allowCoreThreadTimeOut(boolean)。

源码解析

主池控制状态 ctl 是一个原子整数,它打包了两个概念字段 workerCount,表示有效线程数 runState,表示运行状态,关闭等为了将它们打包到一个 int 中,我们将 workerCount 限制为 (2^29)-1 (约 5 亿) 线程,而不是 (2^31)-1 (20 亿)。如果将来有问题,变量可以更改为 AtomicLong,并下面调整 shift/mask 常量。但是,在需要时之前,使用 int 的代码更快,更简单。workerCount 是已允许启动并且不允许停止的工作线程数。该值可能与实际存活线程数暂时不同,例如当线程工厂未能在请求时创建线程时,以及在退出线程在终止之前仍在进行账目记录时。用户可见的池大小被报告为 workers 集合的当前大小。runState 提供了主要的生命周期控制,具有以下值:

  • RUNNING: 接受新任务并处理排队的任务
  • SHUTDOWN: 不接受新任务,但处理排队的任务
  • STOP: 不接受新任务,不处理排队的任务,并中断正在进行的任务
  • TIDYING: 所有任务已终止,workerCount 为零,正在转换为状态 TIDYING 的线程将运行 terminated() 钩子方法
  • TERMINATED: terminated() 已完成

这些值之间的数字顺序很重要,允许有序比较。runState 随时间单调增长,但不需要达到每个状态。转换为:

  • RUNNING -> SHUTDOWN 在调用shutdown()时
  • (RUNNING or SHUTDOWN) -> STOP 在调用shutdownNow()时
  • SHUTDOWN -> TIDYING 队列和池都为空时
  • STOP -> TIDYING 池为空时
  • TIDYING -> TERMINATED 当 terminated() 钩子方法完成时

在 awaitTermination() 中等待的线程将在状态达到 TERMINATED 时返回。检测 SHUTDOWN 到 TIDYING 的转换不如你想象的那么简单,因为在 SHUTDOWN 状态下队列可能在非空后变为空,反之亦然,但我们只能在看到它是空的后,看到 workerCount 是 0 时终止(这有时需要重新检查 -- 请参见下面)

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }

 

这是一个用于保存任务和分配给工作线程的队列。我们不要求 workQueue.poll() 返回 null 一定意味着 workQueue.isEmpty(),因此仅依靠 isEmpty 来查看队列是否为空(例如,当决定是否从 SHUTDOWN 转换到 TIDYING 时,我们必须这样做)。这适用于特殊目的队列,例如 DelayQueues,其中 poll() 允许返回 null,即使在延迟过期后可能会返回非空值。

private final BlockingQueue<Runnable> workQueue;

 

拒绝处理程序,当 execute 中线程池饱和或关闭时,会调用此处理程序。

private volatile RejectedExecutionHandler handler;

 

在调用shutdown和shutdownNow时需要权限。 我们还需要(参见checkShutdownAccess)调用者有权实际中断工作线程集中的线程(由Thread.interrupt管理,它依赖于ThreadGroup.checkAccess,又依赖于SecurityManager.checkAccess)。 只有在通过这些检查之后,才尝试关闭。 Thread.interrupt的所有实际调用(参见interruptIdleWorkers和interruptWorkers)忽略SecurityExceptions,这意味着尝试中断将静默失败。 在关闭的情况下,除非SecurityManager具有不一致的策略,有时允许访问线程,有时不允许,否则它们不应该失败。 在这种情况下,无法实际中断线程可能会禁用或延迟完全终止。 使用interruptIdleWorkers的其他用途是建议性的,并且无法实际中断将仅延迟对配置更改的响

private static final RuntimePermission shutdownPerm =new RuntimePermission("modifyThread");

 

这个类 Worker 主要维护线程运行任务时的中断控制状态,以及其他的小的记录。这个类机会性地扩展了 AbstractQueuedSynchronizer ,以简化在每个任务执行时获取和释放锁。这可以保护那些企图唤醒等待任务的工作线程而不是中断正在运行的任务的中断。我们实现了一个简单的不可重入互斥锁,而不是使用 ReentrantLock ,因为我们不希望工作任务在调用 setCorePoolSize 等池控制方法时能够重新获取锁。此外,为了在线程实际开始运行任务之前阻止中断,我们将锁定状态初始化为负值,并在启动时清除它(在 runWorker 中)。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        @SuppressWarnings("serial") // Unlikely to be serializable
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        @SuppressWarnings("serial") // Not statically typed as Serializable
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        // TODO: switch to AbstractQueuedLongSynchronizer and move
        // completedTasks into the lock word.

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
//将运行状态转换到给定目标,如果已经至少是给定目标,则保持不变。
参数:
targetState - 所需状态,SHUTDOWN或STOP (但不是TIDYING或TERMINATED - 使用tryTerminate)
    private void advanceRunState(int targetState) {
        // assert targetState == SHUTDOWN || targetState == STOP;
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }
这个方法将状态转换为 TERMINATED,如果符合以下条件之一:(SHUTDOWN 并且线程池和队列为空) 或者 (STOP 并且线程池为空)。如果本来就有资格终止,但 workerCount 不为 0,则中断一个空闲的工作线程,以确保关闭信号传播。这个方法必须在任何可能导致终止的操作之后调用 - 在关闭期间减少工作线程数或从队列中删除任务。这个方法是非私有的,允许从 ScheduledThreadPoolExecutor 访问。

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            //如果再运行中或runState 是否至少为 TIDYING 状态,或状态小于停止且工作队列为空
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
                return;
             //工作数量不等于0
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        /调用模板方法,由子类具体实现
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }



如果有安全管理器,确保调用者有权限在一般情况下关闭线程(参见shutdownPerm)。如果通过了这个检查,还需要确保调用者被允许中断每个工作线程。即使第一个检查通过了,如果SecurityManager对某些线程有特殊处理,这可能是不成立的。
    private void checkShutdownAccess() {
        // assert mainLock.isHeldByCurrentThread();
        @SuppressWarnings("removal")
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(shutdownPerm);
            for (Worker w : workers)
                security.checkAccess(w.thread);
        }
    }

把任务队列中的任务排空到一个新的列表中,通常使用drainTo。但如果队列是DelayQueue或其他类型的队列,poll或drainTo可能会删除一些元素失败,它会一个一个地删除它们。
    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }



检查是否可以根据当前池状态和给定边界(核心或最大值)添加新工作线程。如果可以,则相应地调整工作线程计数,并在可能的情况下创建并启动新工作线程,将 firstTask 运行为其第一个任务。如果池已停止或有资格关闭,则此方法返回 false。如果线程工厂无法创建线程,也会返回 false。如果线程创建失败,要么是因为线程工厂返回 null,要么是因为异常(通常是 Thread.start() 中的 OutOfMemoryError),我们将清理回滚。
参数:
firstTask - 新线程应运行的第一个任务(如果没有则为 null)。通过在执行方法中使用初始 firstTask 创建工作线程,可以在线程数小于 corePoolSize 时(在这种情况下我们始终启动一个)或队列已满时(此时我们必须绕过队列)绕过排队。最初空闲线程通常是通过 prestartCore
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry: //goto 语句,避免死循环
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            /*如果线程处于非运行状态,并且 rs 不等于 SHUTDOWN 且 firstTask 不等于空且且
            workQueue 为空,直接返回 false(表示不可添加 work 状态)
            1. 线程池已经 shutdown 后,还要添加新的任务,拒绝
            2. (第二个判断)SHUTDOWN 状态不接受新任务,但仍然会执行已经加入任务队列的任
            务,所以当进入 SHUTDOWN 状态,而传进来的任务为空,并且任务队列不为空的时候,是允许添加
            新线程的,如果把这个条件取反,就表示不允许添加 worker*/
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;
            for (;;) { //自旋
                int wc = workerCountOf(c);//获得 Worker 工作线程数
                //如果工作线程数大于默认容量大小或者大于核心线程数大小,则直接返回 false 表示不
                能再添加 worker。
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))//通过 cas 来增加工作线程数,如果 cas 失败,则直接重试
                    break retry;
                c = ctl.get(); // Re-read ctl //再次获取 ctl 的值
                if (runStateOf(c) != rs) //这里如果不想等,说明线程的状态发生了变化,继续重试
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        
        //上面这段代码主要是对 worker 数量做原子+1 操作,下面的逻辑才是正式构建一个 worker


        boolean workerStarted = false; //工作线程是否启动的标识
        boolean workerAdded = false; //工作线程是否已经添加成功的标识
        Worker w = null;
        try {
            w = new Worker(firstTask); //构建一个 Worker,这个 worker 是什么呢?我们可以看到构造方法里面传入了一个 Runnable 对象
            final Thread t = w.thread; //从 worker 对象中取出线程
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock(); //这里有个重入锁,避免并发问题
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    //只有当前线程池是正在运行状态,[或是 SHUTDOWN 且 firstTask 为空],才能添加到 workers 集合中
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        //任务刚封装到 work 里面,还没 start,你封装的线程就是 alive,几个意思?肯定是要抛异常出去的
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w); //将新创建的 Worker 添加到 workers 集合中
                        int s = workers.size();
                        //如果集合中的工作线程数大于最大线程数,这个最大线程数表示线程池曾经出现过的最大线程数
                        if (s > largestPoolSize)
                            largestPoolSize = s; //更新线程池出现过的最大线程数
                        workerAdded = true;//表示工作线程创建成功了
                    }
                } finally {
                    mainLock.unlock(); //释放锁
                }
                if (workerAdded) {//如果 worker 添加成功
                    t.start();//启动线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w); //如果添加失败,就需要做一件事,就是递减实际工作线程数(还记得我们最开始的时候增加了工作线程数吗)
        }
        return workerStarted;//返回结果
    }


这个方法用于线程池中处理即将退出的工作线程的清理和记录工作。除非 completedAbruptly 设置为 true,否则假设 workerCount 已经调整以账户退出。该方法从工作线程集中删除线程,并可能终止池或替换工作线程,如果它由于用户任务异常退出或运行的工作线程少于 corePoolSize,或队列非空但没有工作线程。
参数:
w-工作线程
completedAbruptly-如果工作线程因用户异常死亡。

这个方法处理工作线程退出时的清理和记录工作。首先,如果completedAbruptly被设置为true,则表示工作线程因为用户异常退出,那么会调用decrementWorkerCount()减少工作线程计数。接着,使用ReentrantLock对象mainLock锁定,并将工作线程从workers集合中移除,并增加完成的任务数。
然后调用tryTerminate()尝试终止线程池。如果线程池的运行状态小于STOP,则尝试添加新的工作线程。如果completedAbruptly为false,则只有当运行的工作线程数量小于corePoolSize或者工作队列不为空时才会添加新的工作线程。

 private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        //线程池的运行状态小于STOP
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }



这是工作线程的主要运行循环。它会重复地从队列中获取任务并执行它们,同时处理一些问题:
1.我们可能会先执行初始任务,在这种情况下我们不需要获取第一个任务。否则,只要线程池正在运行,我们就会从getTask中获取任务。如果返回null,则工作线程会由于线程池状态或配置参数的更改而退出。其他退出是由于外部代码中的异常抛出导致的,在这种情况下completedAbruptly为true,通常会导致processWorkerExit替换此线程。
2.在运行任何任务之前,将获取锁以防止在任务执行期间其他线程池中断,然后确保除非线程池停止,否则此线程不会设置其中断。
3.在运行任务之前,会调用beforeExecute,这可能会导致异常,在这种情况下我们会导致线程死亡(使用completedAbruptly true终止循环)而不处理任务。
4.假设beforeExecute正常完成,我们会运行任务,并收集其抛出的任何异常以发送给afterExecute。我们分别处理RuntimeException,Error(规范保证我们处理这两种情况)和任意Throwables。因为我们无法在Runnable.run中重新抛出Throwables,我们在退出时将它们包装在Errors中(传递到线程的UncaughtExceptionHandler)。任何抛出的异常也会保守地导致线程死亡。
5.在任务.run完成后,我们调用afterExecute,它也可能会抛出异常,这也会导致线程死亡。根据JLS Sec 14.20,即使任务.run抛出异常,此异常也将生效。异常机制的净效果是afterExecute和线程的UncaughtExceptionHandler都拥有我们能提供的关于用户代码遇到的任何问题的准确信息。

这个方法运行工作线程w。首先,获取当前线程wt,并获取w的第一个任务。然后设置w的第一个任务为null,并解锁w以允许中断。completedAbruptly设置为true,表示工作线程因为异常而退出。之后,使用while循环获取任务并执行。在循环开始之前锁定w,确保线程池正在运行或线程未被中断。然后调用beforeExecute,执行任务,如果发生异常调用afterExecute并抛出异常。循环结束后,completedAbruptly设置为false,调用processWorkerExit处理工作线程退出。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }


这个方法用于执行给定的任务command。首先检查command是否为空,如果为空则抛出空指针异常。
接下来,方法执行三个步骤:

1.如果运行的工作线程数量小于corePoolSize,则尝试使用给定的命令作为其第一个任务启动新线程。调用addWorker会原子地检查runState和workerCount,从而防止在不应该添加线程时添加线程,返回false。

2.如果成功地排队了任务,则仍需要再次检查是否应该添加线程(因为自上次检查以来存在的线程已死亡)或线程池自入口处关闭。因此,我们重新检查状态,如果需要,则回滚入队(如果停止)或启动新线程(如果没有线程)。

3.如果无法排队任务,则尝试添加新线程。如果失败,则知道已关闭或饱和,因此拒绝任务。

总之,这个方法会检查当前线程池的状态,如果线程数量小于corePoolSize,则尝试添加新的工作线程来执行给定的任务。如果线程池正在运行并且任务队列未满,则将任务加入队列。如果无法将任务加入队列,则尝试添加新的工作线程,如果无法添加新的工作线程,则拒绝执行该任

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

 

分享到:

专栏

类型标签

网站访问总量