线程池ThreadPoolExecutor

2019-04-05 23:59:23

一. 为什么要使用线程池

  1. 当程序中创建了大量生命周期较短的线程时,比如服务器应用,每接收一个请求就创建一个新的线程去处理,由于每个线程都会经历创建,销毁的阶段,由于这两个阶段涉及到与操作系统交互,为每个线程创建和销毁所消耗的时间与系统资源很有可能比实际处理请求的时间和资源更多。如果采用线程池,当一个请求到达的时候,线程已经存在,可以立即处理请求,消除了创建线程所带来的延迟,使程序响应更快。
  2. 限制线程数量,创建大量线程会大大降低性能,甚至使虚拟机崩溃,而且,一些操作系统是有最大线程数量限制的。当运行的线程数量逼近这个值的时候,操作系统会变得不稳定。

设计线程池的大概思路就是一个容器能够缓存多个线程,另一个容器能够缓存提交的任务,线程池中的线程被创建后开始执行一个循环,每次循环获取一个缓存的任务来执行,获取不到就阻塞成为空闲线程。当然,实际的线程池远远要复杂得多。

二. ThreadPoolExecutor的继承层次

  1. Execotur接口定义了一个执行被提交任务的方法execute,被提交的任务可以在一个新建的线程中执行,在一个池化的线程中执行,或者在调用execute方法的线程中同步执行,这取决于具体的实现类
  1. public interface Executor {
  2. void execute(Runnable command);
  3. }
  1. ExecutorService接口增加了可以返回Future的方法submit,让线程池停止接受新任务的方法shutdownshutdownNow,以及提交批量任务的invokeAllinvokeAny方法
  1. public interface ExecutorService extends Executor {
  2. void shutdown();
  3. List<Runnable> shutdownNow();
  4. boolean isShutdown();
  5. boolean isTerminated();
  6. boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
  7. <T> Future<T> submit(Callable<T> task);
  8. <T> Future<T> submit(Runnable task, T result);
  9. Future<?> submit(Runnable task);
  10. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
  11. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
  12. <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
  13. <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
  14. }
  1. AbstractExecutorService是一个实现了ExecutorService接口的抽象类,实现了submitinvokeAnyinvokeAll方法,在这些方法的实现中,对任务的执行最终还是调用Execotur接口中定义的execute方法。在这个类中有个newTaskFor方法,用于将提交的任务包装成一个RunnableFuture对象作为Future对象从submit方法中返回,默认返回的是juc下的FutureTask类,可以自己重写这个方法,返回自己定义的Future对象。
  1. public <T> Future<T> submit(Callable<T> task) {
  2. if (task == null) throw new NullPointerException();
  3. RunnableFuture<T> ftask = newTaskFor(task);
  4. execute(ftask);
  5. return ftask;
  6. }
  7. protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  8. return new FutureTask<T>(callable);
  9. }
  1. ThreadPoolExecutor是线程池大部分逻辑的实现。

三. 线程池的状态

  1. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  2. private static final int COUNT_BITS = Integer.SIZE - 3; // 29
  3. private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
  4. // runState is stored in the high-order bits
  5. private static final int RUNNING = -1 << COUNT_BITS;
  6. private static final int SHUTDOWN = 0 << COUNT_BITS;
  7. private static final int STOP = 1 << COUNT_BITS;
  8. private static final int TIDYING = 2 << COUNT_BITS;
  9. private static final int TERMINATED = 3 << COUNT_BITS;
  • 线程池有五种状态:
    1. RUNNING:接受新的任务,并且可以处理队列中等待的任务
    2. SHUTDOWN: 不接受新任务,会等待所有任务处理完(包括阻塞队列中的任务)
    3. STOP:不接受新的任务,不处理阻塞队列中等待的任务,并且中断正在进行的任务
    4. TIDYING:所有任务已经终止,workerCount为0,进入此状态的线程池会运行terminated()钩子方法进入TERMINATED状态
    5. TERMINATEDterminated()方法已经完成
  • 线程池状态变化:
    • RUNNING -> SHUTDOWN:调用shutdown()方法
    • RUNNINGSHUTDOWN -> STOPshutdownNow()
    • SHUTDOWN -> TIDYING:当等待队列和线程池都为空的时候
    • STOP -> TIDYING:当线程池为空
    • TIDYING -> TERMINATEDterminated()方法完成时
  • 原子int变量ctl的高3位是runState即上面的五种状态,低29位是workerCount,活跃线程数量。
  • 有几个和ctl有关的方法:
  1. // Packing and unpacking ctl
  2. // 获取线程池状态
  3. private static int runStateOf(int c) { return c & ~COUNT_MASK; }
  4. // 获取线程池活跃线程数
  5. private static int workerCountOf(int c) { return c & COUNT_MASK; }
  6. // runState与workerCount包装成一个ctl
  7. private static int ctlOf(int rs, int wc) { return rs | wc; }

四. 主要的成员变量

  1. //任务缓存队列,用来存放等待执行的任务
  2. private final BlockingQueue<Runnable> workQueue;
  3. // 线程池的主要锁,对线程池状态的改变都要使用这个锁
  4. private final ReentrantLock mainLock = new ReentrantLock();
  5. // 存放所有的工作线程,只有获取了mainLock的时候才能访问此集合
  6. private final HashSet<Worker> workers = new HashSet<>();
  7. /**
  8. * Wait condition to support awaitTermination.
  9. */
  10. private final Condition termination = mainLock.newCondition();
  11. // 记录线程池中曾经出现过的最大线程数,只有获取了mainLock的时候才能访问
  12. private int largestPoolSize;
  13. // 记录已经执行完毕的任务个数
  14. private long completedTaskCount;
  15. // 线程工厂对象,用来创建线程
  16. private volatile ThreadFactory threadFactory;
  17. // 任务拒绝策略
  18. private volatile RejectedExecutionHandler handler;
  19. // 空闲线程存活时间(纳秒)
  20. private volatile long keepAliveTime;
  21. // 是否允许核心线程在空闲超时后消亡
  22. private volatile boolean allowCoreThreadTimeOut;
  23. // 核心线程数
  24. private volatile int corePoolSize;
  25. // 最大线程数
  26. private volatile int maximumPoolSize;
  • corePoolSize,核心线程数:当一个任务被execute方法提交时,如果正在运行的线程数小于corePoolSize,就创建一个新的线程来运行任务,即使有其他工作线程是空闲的;否则如果正在运行的线程数目大于等于corePoolSize,但是小于maximumPoolSize,将任务放入等待队列等待执行,如果等待队列已满,就创建一个新的线程用于执行任务,如果队列已满,且线程数已经达到maximumPoolSize,拒绝任务。默认情况下,线程池创建后,只有当任务到达时才会开始创建核心线程,但prestartCoreThread方法会创建一个空闲的核心线程等待任务到达,prestartAllCoreThreads方法会创建corePoolSize个空闲线程来等待任务到达。
  • workQueue,等待队列,当任务提交时,如果线程数大于corePoolSize,就将任务放入等待队列。一般有三种阻塞队列策略可以使用:
    1. Direct handoffsSynchronousQueue,提交一个任务时不排队,而是直接交给线程池,当没有线程可用时会排队失败从而新创建一个线程取执行任务。这种策略通常需要一个无界的maximumPoolSize来避免线程池拒绝执行任务,当任务提交速度大于任务处理速度时,可能会造成线程数无限制增长。
    2. Unbounded queuesLinkedBlockingQueue,无界队列,此时maximumPoolSize参数是无效的,而且线程池的最大线程数就是corePoolSize
    3. Bounded queuesArrayBlockingQueue,有界队列,可以限制线程数目,但是参数值不好确定。
  • keepAliveTime,如果线程池的线程数量超过corePoolSize,核心线程之外的线程在空闲超过一定时间就会被销毁。默认情况下,这只应用于超出核心线程之外的线程,但如果调用了allowCoreThreadTimeOut方法,这也会用于核心线程。
  • 任务拒绝策略:当线程池shutdown后,或者阻塞队列满了,且线程数达到maximumPoolSize后,再提交任务会执行拒绝策略:
    1. 默认为ThreadPoolExecutor.AbortPolicy,直接抛RejectedExecutionException
    2. ThreadPoolExecutor.CallerRunsPolicy, 用调用execute方法的线程去执行任务
    3. ThreadPoolExecutor.DiscardPolicy,直接丢弃新提交的任务
    4. ThreadPoolExecutor.DiscardOldestPolicy,如果线程池没有shutdown,队列头的任务会被丢弃,然后重新尝试提交任务,如果失败,会重复上面过程。

五. 构造器

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. if (corePoolSize < 0 ||
  9. maximumPoolSize <= 0 ||
  10. maximumPoolSize < corePoolSize ||
  11. keepAliveTime < 0)
  12. throw new IllegalArgumentException();
  13. if (workQueue == null || threadFactory == null || handler == null)
  14. throw new NullPointerException();
  15. this.corePoolSize = corePoolSize;
  16. this.maximumPoolSize = maximumPoolSize;
  17. this.workQueue = workQueue;
  18. this.keepAliveTime = unit.toNanos(keepAliveTime);
  19. this.threadFactory = threadFactory;
  20. this.handler = handler;
  21. }

剩下的三个构造器最终都是调用这个构造器,它们对线程工厂或任务拒绝策略采用了默认值。

六. execute

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. // 线程数小于核心线程数,创建新的核心线程
  6. if (workerCountOf(c) < corePoolSize) {
  7. if (addWorker(command, true))
  8. return;
  9. c = ctl.get();
  10. }
  11. // 线程数已达到核心线程数
  12. // 线程池处于RUNNING状态,并且任务向队列添加成功
  13. if (isRunning(c) && workQueue.offer(command)) {
  14. int recheck = ctl.get();
  15. // 如果线程池已经不是RUNNING状态,移除刚才添加到队列的任务,并执行拒绝策略
  16. if (! isRunning(recheck) && remove(command))
  17. reject(command);
  18. // 如果工作线程数量为0,创建一个线程什么也不做,之后会从队列中取出任务执行
  19. else if (workerCountOf(recheck) == 0)
  20. addWorker(null, false);
  21. }
  22. // 1. 线程池不是RUNNING状态
  23. // 2. 排队失败,要加一个新的线程
  24. else if (!addWorker(command, false))
  25. reject(command);
  26. }
  1. // 添加线程,core=true是核心线程
  2. private boolean addWorker(Runnable firstTask, boolean core) {
  3. retry:
  4. for (int c = ctl.get();;) {
  5. // Check if queue empty only if necessary.
  6. // private static boolean runStateAtLeast(int c, int s) {
  7. // return c >= s;
  8. // }
  9. if (runStateAtLeast(c, SHUTDOWN)
  10. && (runStateAtLeast(c, STOP)
  11. || firstTask != null
  12. || workQueue.isEmpty()))
  13. return false;
  14. for (;;) {
  15. if (workerCountOf(c) // 线程数已经达到限制了,返回false
  16. >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
  17. return false;
  18. // 尝试CAS增加线程数,如果成功,跳出retry循环
  19. if (compareAndIncrementWorkerCount(c))
  20. break retry;
  21. c = ctl.get(); // Re-read ctl
  22. if (runStateAtLeast(c, SHUTDOWN))
  23. continue retry; // 如果线程池状态发生了变化,重新进入外层循环
  24. // 否则由于线程数目发生变化导致CAS失败,重新执行内层循环,尝试增加线程数目
  25. }
  26. }
  27. boolean workerStarted = false;
  28. boolean workerAdded = false;
  29. Worker w = null;
  30. try {
  31. w = new Worker(firstTask);
  32. final Thread t = w.thread;
  33. if (t != null) {
  34. final ReentrantLock mainLock = this.mainLock;
  35. mainLock.lock();
  36. try {
  37. int c = ctl.get();
  38. // 如果线程池处于RUNNING状态
  39. // 或处于SHUTDOWN状态但是firstTask为null(执行阻塞队列中的任务)
  40. // 向线程池中添加线程
  41. if (isRunning(c) ||
  42. (runStateLessThan(c, STOP) && firstTask == null)) {
  43. if (t.isAlive()) // precheck that t is startable
  44. throw new IllegalThreadStateException();
  45. // 添加到线程池
  46. workers.add(w);
  47. int s = workers.size();
  48. if (s > largestPoolSize)
  49. largestPoolSize = s;
  50. workerAdded = true;
  51. }
  52. } finally {
  53. mainLock.unlock();
  54. }
  55. // 启动线程
  56. if (workerAdded) {
  57. t.start();
  58. workerStarted = true;
  59. }
  60. }
  61. } finally {
  62. if (! workerStarted)
  63. addWorkerFailed(w);
  64. }
  65. return workerStarted;
  66. }

Worker

  1. private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  2. private static final long serialVersionUID = 6138294804551838833L;
  3. // 用户执行任务的线程
  4. final Thread thread;
  5. // 构造Worker对象时传入的Runnable对象,作为线程的第一个任务
  6. // 可以为null
  7. Runnable firstTask;
  8. // 每个线程完成任务数
  9. volatile long completedTasks;
  10. Worker(Runnable firstTask) {
  11. setState(-1); // inhibit interrupts until runWorker
  12. this.firstTask = firstTask;
  13. this.thread = getThreadFactory().newThread(this);
  14. }
  15. /** Delegates main run loop to outer runWorker. */
  16. public void run() {
  17. runWorker(this);
  18. }
  19. // Lock methods
  20. //
  21. // The value 0 represents the unlocked state.
  22. // The value 1 represents the locked state.
  23. protected boolean isHeldExclusively() {
  24. return getState() != 0;
  25. }
  26. protected boolean tryAcquire(int unused) {
  27. if (compareAndSetState(0, 1)) {
  28. setExclusiveOwnerThread(Thread.currentThread());
  29. return true;
  30. }
  31. return false;
  32. }
  33. protected boolean tryRelease(int unused) {
  34. setExclusiveOwnerThread(null);
  35. setState(0);
  36. return true;
  37. }
  38. public void lock() { acquire(1); }
  39. public boolean tryLock() { return tryAcquire(1); }
  40. public void unlock() { release(1); }
  41. public boolean isLocked() { return isHeldExclusively(); }
  42. void interruptIfStarted() {
  43. Thread t;
  44. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  45. try {
  46. t.interrupt();
  47. } catch (SecurityException ignore) {
  48. }
  49. }
  50. }
  51. }

Worker类实现了Runnable接口,包装了一个线程对象,在构造线程对象时,把自身作为Runnable对象传入。
同时Worker也继承自AQS,实现了一个不可重入的独占锁,在执行一个任务的前后获取锁和释放锁。
启动线程后,开始执行Worker对象的run方法,run方法在内部调用线程池的runWorker方法:

  1. // 执行这个方法的线程时Worker里面的线程
  2. final void runWorker(Worker w) {
  3. Thread wt = Thread.currentThread();
  4. Runnable task = w.firstTask;
  5. w.firstTask = null;
  6. w.unlock(); // 允许中断
  7. boolean completedAbruptly = true; // 是否是因为任务执行异常而退出
  8. try {
  9. // 如果Worker对象的firstTask不为null 或 从同步队列中获取到任务
  10. while (task != null || (task = getTask()) != null) { // 如果这里获取不到任务,
  11. w.lock(); // 获取锁,防止其他线程中断该线程 // 就要在下面执行processWorkerExit来销毁这个Worker
  12. // 如果线程池处于STOP,保证当前线程是中断状态(可能是此线程执行到这里时另一个线程执行了shutdownNow)
  13. // 否则保证当前线程不是被中断状态,因为线程是复用的,之前的任务的中断不能影响之后的任务
  14. if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
  15. && !wt.isInterrupted())
  16. wt.interrupt();
  17. try {
  18. // beforeExecute与afterExecute是两个空的钩子方法,可以自己继承线程池去实现
  19. beforeExecute(wt, task);
  20. try {
  21. task.run(); // 执行任务
  22. afterExecute(task, null);
  23. } catch (Throwable ex) {
  24. afterExecute(task, ex);
  25. throw ex;
  26. }
  27. } finally {
  28. task = null;
  29. w.completedTasks++;
  30. w.unlock();
  31. }
  32. }
  33. completedAbruptly = false;
  34. } finally {
  35. processWorkerExit(w, completedAbruptly);
  36. }
  37. }
  1. // 从阻塞队列中获取任务
  2. private Runnable getTask() {
  3. boolean timedOut = false; // 从阻塞队列中取任务是否超时
  4. for (;;) {
  5. int c = ctl.get();
  6. // 如果状态>=SHUTDOWN,也就是不在RUNNING状态下
  7. // 线程池STOP 或 阻塞队列为空
  8. // 减少workerCount并返回null
  9. // 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务
  10. // 如果此时为SHUTDOWN且阻塞队列为空,也就没必要继续获取任务了
  11. // 而如果为STOP,不会去执行阻塞队列中剩下的任务,也没必要去获取任务
  12. if (runStateAtLeast(c, SHUTDOWN)
  13. && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
  14. decrementWorkerCount();
  15. return null;
  16. }
  17. int wc = workerCountOf(c);
  18. // 允许核心线程超时 或 线程数大于核心线程数
  19. // timed用于判断是否需要超时控制
  20. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  21. // 线程数量超过最大限制(可能执行了setMaximumPoolSize) 或 需要超时控制并且超时
  22. // 且 线程池线程数量大于1 或 阻塞队列为空
  23. // 减少线程数量 并 返回null
  24. if ((wc > maximumPoolSize || (timed && timedOut))
  25. && (wc > 1 || workQueue.isEmpty())) {
  26. if (compareAndDecrementWorkerCount(c))
  27. return null;
  28. continue;
  29. }
  30. try {
  31. // 根据是否需要超时控制,采取不同策略从阻塞队列中获取任务
  32. Runnable r = timed ?
  33. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  34. workQueue.take();
  35. if (r != null)
  36. return r;
  37. timedOut = true;
  38. } catch (InterruptedException retry) {
  39. timedOut = false;
  40. }
  41. }
  42. }

线程池中的每个线程最终都会执行processWorkerExit这个方法退出,所以会在这里检查是否有必要将线程池状态转化为TIDYING以及TERMINATED

  1. // 执行到这个方法有以下几种可能:
  2. // 1. 执行的任务抛异常
  3. // 2. 阻塞队列中获取不到任务,这个还分为以下两种情形:
  4. // (1) 获取任务超时
  5. // (2) 被shutdown或shutdownNow方法中断
  6. private void processWorkerExit(Worker w, boolean completedAbruptly) {
  7. if (completedAbruptly) // 如果线程因为异常而退出,线程数减一
  8. decrementWorkerCount(); // 正常退出的话在getTask方法中已经减过了,这里就不用了
  9. final ReentrantLock mainLock = this.mainLock;
  10. mainLock.lock();
  11. try {
  12. completedTaskCount += w.completedTasks;
  13. workers.remove(w); // 移除线程
  14. } finally {
  15. mainLock.unlock();
  16. }
  17. // 根据线程池状态进行判断是否结束线程池
  18. tryTerminate();
  19. int c = ctl.get();
  20. if (runStateLessThan(c, STOP)) {
  21. // 当线程池状态是RUNNING 或 SHUTDOWN时
  22. // 如果线程不是因为异常而退出的,直接addWorker添加非核心线程去执行队列中的任务
  23. // 否则,算出一个最小的线程数量,如果当前线程数量比这个最小值大,直接返回,否则也addWorker
  24. if (!completedAbruptly) {
  25. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  26. if (min == 0 && ! workQueue.isEmpty())
  27. min = 1;
  28. if (workerCountOf(c) >= min)
  29. return; // replacement not needed
  30. }
  31. addWorker(null, false);
  32. }
  33. }
  1. // 根据线程池状态进行判断是否结束线程池
  2. final void tryTerminate() {
  3. for (;;) {
  4. int c = ctl.get();
  5. if (isRunning(c) || // 正在运行,不能停止线程池
  6. runStateAtLeast(c, TIDYING) || // TIDYING 或 TERMINATED
  7. (runStateLessThan(c, STOP) && ! workQueue.isEmpty())) // SHUTDOWN并且队列不为空,此时要执行完队列中的任务,所以也不能停止线程池
  8. return;
  9. // 如果线程数量不为0,随机中断一个空闲线程并返回
  10. if (workerCountOf(c) != 0) { // Eligible to terminate
  11. interruptIdleWorkers(ONLY_ONE);
  12. return;
  13. }
  14. // 运行到这里说明线程数量为0,进入TIDYING状态
  15. final ReentrantLock mainLock = this.mainLock;
  16. mainLock.lock();
  17. try {
  18. // 尝试设置状态为TIDYING,如果设置成功,则调用terminated方法
  19. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
  20. try {
  21. terminated(); // 空的钩子方法
  22. } finally {
  23. // 设置状态为TERMINATED
  24. ctl.set(ctlOf(TERMINATED, 0));
  25. termination.signalAll();
  26. }
  27. return;
  28. }
  29. } finally {
  30. mainLock.unlock();
  31. }
  32. // else retry on failed CAS
  33. }
  34. }

七. shutdown

不再接受新提交的任务,但是会完成之前提交的任务,包括阻塞队列中的。

  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. // 安全策略判断
  6. checkShutdownAccess();
  7. // 线程池状态设置为SHUTDOWN
  8. advanceRunState(SHUTDOWN);
  9. // 中断空闲线程
  10. interruptIdleWorkers();
  11. onShutdown(); // hook for ScheduledThreadPoolExecutor
  12. } finally {
  13. mainLock.unlock();
  14. }
  15. tryTerminate();
  16. }
  1. private void interruptIdleWorkers() {
  2. interruptIdleWorkers(false);
  3. }
  4. private void interruptIdleWorkers(boolean onlyOne) {
  5. final ReentrantLock mainLock = this.mainLock;
  6. mainLock.lock();
  7. try {
  8. // 遍历所有线程
  9. for (Worker w : workers) {
  10. Thread t = w.thread;
  11. // 获取到Worker对象锁说明线程是空闲的
  12. if (!t.isInterrupted() && w.tryLock()) {
  13. try {
  14. t.interrupt();
  15. } catch (SecurityException ignore) {
  16. } finally {
  17. w.unlock();
  18. }
  19. }
  20. if (onlyOne)
  21. break;
  22. }
  23. } finally {
  24. mainLock.unlock();
  25. }
  26. }

Worker继承AQS实现了一个不可重入的独占锁,在执行任务前,会加锁,此时其他线程去尝试获取此Worker对象锁会失败,这样就可以判定线程不是空闲的。

八. shutdownNow

不再接受新提交的任务,同时也会尝试中断当前正在执行的任务,阻塞队列中剩下的任务也会被丢弃。返回尚未执行的任务。

  1. public List<Runnable> shutdownNow() {
  2. List<Runnable> tasks;
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. // 安全策略判断
  7. checkShutdownAccess();
  8. // 线程池状态设置为STOP
  9. advanceRunState(STOP);
  10. // 中断所有线程,无论是否空闲
  11. interruptWorkers();
  12. // 取出队列中没有被执行的任务返回
  13. tasks = drainQueue();
  14. } finally {
  15. mainLock.unlock();
  16. }
  17. tryTerminate();
  18. return tasks;
  19. }
  1. private void interruptWorkers() {
  2. // assert mainLock.isHeldByCurrentThread();
  3. for (Worker w : workers)
  4. w.interruptIfStarted();
  5. }
  6. // ThreadPoolExecutor.Worker
  7. void interruptIfStarted() {
  8. Thread t;
  9. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  10. try {
  11. t.interrupt();
  12. } catch (SecurityException ignore) {
  13. }
  14. }
  15. }

0
1
0

添加评论

正在回复:
取消
0
0
1
0