FutureTask

2019-04-13 19:17:23

一. FutureTask继承体系

RunnableFuture接口继承了RunnableFuture接口,FutureTask类实现了RunnableFuture接口

  1. public interface Future<V> {
  2. /**
  3. * 尝试取消正在执行的任务,如果任务已经完成,或者已经被取消了,或者由于其他原因而不能被取消,或取消失败。
  4. * 如果取消的时候,任务没有开始执行,那么任务之后不会被执行。
  5. * 如果任务正在执行,那么参数mayInterruptIfRunning决定了是否要中断执行任务的线程。
  6. *
  7. * 这个方法返回true后,后续对isDone 和 isCancelled 方法的调用都会返回true
  8. *
  9. * 取消成功返回true,否则返回false
  10. */
  11. boolean cancel(boolean mayInterruptIfRunning);
  12. /**
  13. * 任务在正常完成之前是否被取消
  14. */
  15. boolean isCancelled();
  16. /**
  17. * 返回任务是否完成
  18. * 任务完成包括 正常结束、由于异常结束、或者是任务被取消。
  19. */
  20. boolean isDone();
  21. /**
  22. * 等待任务完成,返回结果
  23. */
  24. V get() throws InterruptedException, ExecutionException;
  25. /**
  26. * get的超时版本,如果超时会抛TimeoutException
  27. */
  28. V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
  29. }
  1. public interface RunnableFuture<V> extends Runnable, Future<V> {
  2. void run();
  3. }

二. FutureTask的状态和几个实例变量

  1. private volatile int state;
  2. /**
  3. * 可能的状态变化
  4. * NEW -> COMPLETING -> NORMAL
  5. * NEW -> COMPLETING -> EXCEPTIONAL
  6. * NEW -> CANCELLED
  7. * NEW -> INTERRUPTING -> INTERRUPTED
  8. */
  9. private static final int NEW = 0; // 新建,初始化状态
  10. private static final int COMPLETING = 1; // 正在完成
  11. private static final int NORMAL = 2; // 任务正常执行完
  12. private static final int EXCEPTIONAL = 3; // 任务执行过程中抛异常了
  13. private static final int CANCELLED = 4; // 任务被取消了
  14. private static final int INTERRUPTING = 5; // 正在中断
  15. private static final int INTERRUPTED = 6; // 中断完成

只能通过setsetExceptioncancel这几个方法改变状态

  1. // 任务正常执行完成
  2. // 先将状态 NEW -> COMPLETING,然后设置任务返回值
  3. // 设置完返回值后再修改状态 COMPLETING -> NORMAL
  4. protected void set(V v) {
  5. if (STATE.compareAndSet(this, NEW, COMPLETING)) {
  6. outcome = v;
  7. STATE.setRelease(this, NORMAL); // final state
  8. finishCompletion();
  9. }
  10. }
  11. // 执行过程抛异常,则任务的返回值为异常对象
  12. // NEW -> COMPLETING -> EXCEPTIONAL
  13. // 由于调用cancel方法产生的InterruptedException并不会在这里被设置
  14. // 因为调用cancel后,在这里state不是NEW,CAS会失败
  15. protected void setException(Throwable t) {
  16. if (STATE.compareAndSet(this, NEW, COMPLETING)) {
  17. outcome = t;
  18. STATE.setRelease(this, EXCEPTIONAL); // final state
  19. finishCompletion();
  20. }
  21. }
  1. // 执行的任务
  2. private Callable<V> callable;
  3. // 返回的结果或从get()方法中抛出的异常
  4. private Object outcome; // non-volatile, protected by state reads/writes
  5. // 执行任务的线程
  6. private volatile Thread runner;
  7. // 等待队列的头节点
  8. private volatile WaitNode waiters;

三. 构造器

  1. public FutureTask(Callable<V> callable) {
  2. if (callable == null)
  3. throw new NullPointerException();
  4. this.callable = callable;
  5. this.state = NEW; // ensure visibility of callable
  6. }
  7. // 将Runnable对象包装成Callable对象
  8. public FutureTask(Runnable runnable, V result) {
  9. this.callable = Executors.callable(runnable, result);
  10. this.state = NEW; // ensure visibility of callable
  11. }

四. run方法

  1. public void run() {
  2. // 如果state不是NEW(可能被取消,或任务已经执行过了) 或 任务已经由其他线程执行
  3. // 直接返回,不执行任务
  4. if (state != NEW ||
  5. !RUNNER.compareAndSet(this, null, Thread.currentThread()))
  6. return;
  7. try {
  8. Callable<V> c = callable;
  9. if (c != null && state == NEW) {
  10. V result;
  11. boolean ran;
  12. try {
  13. result = c.call(); // 执行任务
  14. ran = true;
  15. } catch (Throwable ex) {
  16. result = null;
  17. ran = false;
  18. // 设置异常状态
  19. setException(ex);
  20. }
  21. // 设置正常执行的返回结果
  22. if (ran)
  23. set(result);
  24. }
  25. } finally {
  26. runner = null;
  27. int s = state;
  28. // 如果正在中断,等待至响应中断完成
  29. if (s >= INTERRUPTING)
  30. handlePossibleCancellationInterrupt(s);
  31. }
  32. }

五. get方法

因调用get()方法而阻塞的线程会被封装成WaitNode对象并放到链表里排队

  1. static final class WaitNode {
  2. volatile Thread thread;
  3. volatile WaitNode next;
  4. WaitNode() { thread = Thread.currentThread(); }
  5. }
  1. public V get() throws InterruptedException, ExecutionException {
  2. int s = state;
  3. // 如果任务还没完成,则等待至完成,并获取完成时的状态
  4. if (s <= COMPLETING)
  5. s = awaitDone(false, 0L);
  6. return report(s); // 返回结果
  7. }
  8. private int awaitDone(boolean timed, long nanos) throws InterruptedException {
  9. long startTime = 0L; // Special value 0L means not yet parked
  10. WaitNode q = null;
  11. boolean queued = false;
  12. for (;;) {
  13. int s = state;
  14. // 任务完成,返回完成时的状态
  15. if (s > COMPLETING) {
  16. if (q != null)
  17. q.thread = null;
  18. return s;
  19. }
  20. else if (s == COMPLETING)
  21. // 正在设置结果,让出cpu时间给其他线程
  22. Thread.yield();
  23. else if (Thread.interrupted()) {
  24. // 线程被中断,抛出InterruptedException
  25. removeWaiter(q);
  26. throw new InterruptedException();
  27. }
  28. else if (q == null) {
  29. // 如果需要计时并且超时了,直接返回状态
  30. if (timed && nanos <= 0L)
  31. return s;
  32. q = new WaitNode();
  33. }
  34. else if (!queued)
  35. // 如果还没排队,头插法将当前线程加入链表
  36. queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
  37. else if (timed) {
  38. // 需要计时
  39. final long parkNanos;
  40. if (startTime == 0L) { // 第一次,还没Park过
  41. startTime = System.nanoTime();
  42. if (startTime == 0L)
  43. startTime = 1L;
  44. parkNanos = nanos;
  45. } else {
  46. long elapsed = System.nanoTime() - startTime;
  47. // 从开始到现在经历过的时间大于超时时间
  48. // 队列中删除此线程,返回状态
  49. if (elapsed >= nanos) {
  50. removeWaiter(q);
  51. return state;
  52. }
  53. parkNanos = nanos - elapsed;
  54. }
  55. // 重新检查一下状态,确定需要Park
  56. if (state < COMPLETING)
  57. LockSupport.parkNanos(this, parkNanos);
  58. }
  59. else // 没有设置超时,park
  60. LockSupport.park(this);
  61. }
  62. }
  63. // 根据任务完成时的状态处理结果
  64. @SuppressWarnings("unchecked")
  65. private V report(int s) throws ExecutionException {
  66. Object x = outcome;
  67. if (s == NORMAL) // 正常执行完,返回结果
  68. return (V)x;
  69. if (s >= CANCELLED) // 任务取消后调用get方法,抛异常
  70. throw new CancellationException();
  71. throw new ExecutionException((Throwable)x); // 抛出执行任务过程中出现的异常
  72. }

六. cannel方法

  1. // 根据mayInterruptIfRunning设置值不同,
  2. // 状态变化可能是 NEW -> INTERRUPTING -> INTERRUPTED 或 NEW -> CANCELLED
  3. public boolean cancel(boolean mayInterruptIfRunning) {
  4. // 如果state不为NEW,或CAS修改状态失败(可能由其他线程在修改状态),返回false
  5. if (!(state == NEW && STATE.compareAndSet
  6. (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
  7. return false;
  8. try { // in case call to interrupt throws exception
  9. if (mayInterruptIfRunning) {
  10. try {
  11. Thread t = runner;
  12. if (t != null)
  13. t.interrupt(); // 中断线程
  14. } finally { // final state
  15. STATE.setRelease(this, INTERRUPTED);
  16. }
  17. }
  18. } finally {
  19. finishCompletion();
  20. }
  21. return true;
  22. }

七. finishCompletion

这个方法是在setsetExceptioncancel方法中最后被调用的,即任务完成之后被调用的。
主要作用就是将因调用get方法而阻塞在链表中的线程全部唤醒,并最后执行done()方法。

  1. private void finishCompletion() {
  2. // assert state > COMPLETING;
  3. for (WaitNode q; (q = waiters) != null;) {
  4. if (WAITERS.weakCompareAndSet(this, q, null)) {
  5. for (;;) {
  6. Thread t = q.thread;
  7. if (t != null) {
  8. q.thread = null;
  9. LockSupport.unpark(t); // 唤醒线程
  10. }
  11. WaitNode next = q.next;
  12. if (next == null)
  13. break;
  14. q.next = null; // unlink to help gc
  15. q = next;
  16. }
  17. break;
  18. }
  19. }
  20. // 一个空的钩子方法,子类可以重写它来实现自己的逻辑
  21. done();
  22. callable = null; // to reduce footprint
  23. }

八. 获取状态的方法

  1. // 根据cancel方法参数不同,状态有两种变化,但都是>=CANCELLED
  2. public boolean isCancelled() {
  3. return state >= CANCELLED;
  4. }
  5. // 只要不是NEW,就是done
  6. public boolean isDone() {
  7. return state != NEW;
  8. }

九. 总结

  • 当任务没完成时,调用get方法的线程会通过自旋加LockSupport.parkLockSupport.parkNanos方法等待至任务完成,同时该线程会被封装成WaitNode对象存放于一个链表中
  • 任务正常执行完后会调用set方法,执行过程抛异常会执行setException方法,取消这个任务时调用cancel方法,取消任务又分为只将状态设为CANCELLED和中断线程两种。只有这三个方法才能改变任务的状态,对应于这四种情况,状态变化分别是:
    1. NEW -> COMPLETING -> NORMAL
    2. NEW -> COMPLETING -> EXCEPTIONAL
    3. NEW -> CANCELLED
    4. NEW -> INTERRUPTING -> INTERRUPTED
  • NORMALEXCEPTIONALCANCELLEDINTERRUPTED这四种状态又意味着任务结束,所以上面三个方法最后都会调用finishCompletion方法来唤醒链表中所有的线程,让这些因调用get方法而阻塞的线程获取运行结果,继续执行
  • 执行任务过程中抛出的异常会在调用get方法时包装成ExecutionException被抛出(不包括由cancel方法造成的InterruptedException)
  • get方法返回结果之前任务已经被取消,get方法会抛CancellationException

0
1
0

添加评论

正在回复:
取消
0
0
1
0