AQS与Condition

2019-02-28 21:47:32

使用了Condition对象后,除了AQS内部维护的一个同步队列之外,还有一个在Condition对象中的等待队列,调用await方法而等待的线程处于等待队列中,被signal的线程会从等待队列移入同步队列,等待获取锁。

一. Condition接口

Condition是一个接口,它的实现类是在AQS中定义的一个内部类ConditionObject

  1. public interface Condition {
  2. void await() throws InterruptedException;
  3. void awaitUninterruptibly();
  4. long awaitNanos(long nanosTimeout) throws InterruptedException;
  5. boolean await(long time, TimeUnit unit) throws InterruptedException;
  6. boolean awaitUntil(Date deadline) throws InterruptedException;
  7. void signal();
  8. void signalAll();
  9. }

ConditionObject内部维护了一个等待队列,是一个单向链表,firstWaiter是队头,lastWaiter是队尾。

二. await()

  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted())
  3. throw new InterruptedException();
  4. Node node = addConditionWaiter();
  5. // 记录锁释放之前的状态
  6. int savedState = fullyRelease(node);
  7. int interruptMode = 0;
  8. // 当节点不在同步队列中,即在等待队列中时,在这里循环
  9. // 直到节点进入同步队列竞争锁
  10. while (!isOnSyncQueue(node)) {
  11. // 如果节点还在等待队列中,park
  12. LockSupport.park(this);
  13. // 线程被中断后立即跳出循环
  14. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  15. break;
  16. }
  17. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  18. interruptMode = REINTERRUPT;
  19. if (node.nextWaiter != null) // clean up if cancelled
  20. unlinkCancelledWaiters();
  21. if (interruptMode != 0)
  22. reportInterruptAfterWait(interruptMode);
  23. }

先调用addConditionWaiter()方法将当前线程加入等待队列:

  1. private Node addConditionWaiter() {
  2. Node t = lastWaiter;
  3. // 清除等待队列中已取消的节点
  4. if (t != null && t.waitStatus != Node.CONDITION) {
  5. unlinkCancelledWaiters();
  6. t = lastWaiter;
  7. }
  8. // 入队列
  9. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  10. if (t == null)
  11. firstWaiter = node;
  12. else
  13. t.nextWaiter = node;
  14. lastWaiter = node;
  15. return node;
  16. }

然后调用fullyRelease方法将锁释放,以便让其他线程能够获取锁,然后满足条件时来唤醒此线程:

  1. final int fullyRelease(Node node) {
  2. boolean failed = true;
  3. try {
  4. int savedState = getState();
  5. if (release(savedState)) {
  6. failed = false;
  7. // 返回锁释放之前的状态
  8. return savedState;
  9. } else {
  10. // 释放失败说明该线程没有获取锁就释放锁,即没获取锁就调用await
  11. throw new IllegalMonitorStateException();
  12. }
  13. } finally {
  14. if (failed) // 释放失败,节点设为取消
  15. node.waitStatus = Node.CANCELLED;
  16. }
  17. }

然后调用isOnSyncQueue方法不断检查节点是否在同步队列中,如果不在,那就是在等待队列中,这时应一直在这里循环挂起,直到节点进入同步队列竞争锁。

  1. // 如果节点在同步队列中,返回true,在等待队列中返回false
  2. final boolean isOnSyncQueue(Node node) {
  3. // 节点的ws为CONDITION,肯定是在等待队列中
  4. // 由于node节点是在await方法中新建的,所以next,prev最初都为null
  5. // 而另一个线程调用signal方法唤醒此线程时,会将此node放入同步队列,
  6. // 此时node一定会有前继节点(enq方法),所以prev == null一定是在等待队列中
  7. // 但如果prev != null,并不能说明节点就是在同步队列中(下面注释)
  8. if (node.waitStatus == Node.CONDITION || node.prev == null)
  9. return false;
  10. // next == null不能说明是在等待队列中,因为可能是同步队列中的尾节点
  11. // 所以,反过来,如果next != null,那一定是在同步队列中
  12. if (node.next != null) // If has successor, it must be on queue
  13. return true;
  14. // node.prev != null时,节点也有可能不在同步队列中,因为在enq方法中,
  15. // 是先node.prev = t,然后用CAS更改尾指针tail,如果这个CAS成功了,才能算是node进入同步队列
  16. // 但如果失败了,就不在同步队列中,这个方法返回true后,是要继续竞争锁的,但如果这时候节点还没完全入队,是不可以的
  17. // 由于节点是插入到队列尾部的,所以从尾部向前遍历查找会效率高些
  18. return findNodeFromTail(node);
  19. }
  20. private boolean findNodeFromTail(Node node) {
  21. Node t = tail;
  22. for (;;) {
  23. if (t == node)
  24. return true;
  25. if (t == null)
  26. return false;
  27. t = t.prev;
  28. }
  29. }

检查线程在等待的时候是否被中断,如果没有被中断返回0;
如果被中断,如果是在被signal之前中断的,返回THROW_IE,如果之后中断的,返回REINTERRUPT
REINTERRUPT:从wait退出后应恢复中断;
THROW_IE:从wait退出后应抛出InterruptedException

  1. private int checkInterruptWhileWaiting(Node node) {
  2. return Thread.interrupted() ?
  3. (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
  4. 0;
  5. }
  6. // 如果线程在被signal之前取消,返回true
  7. final boolean transferAfterCancelledWait(Node node) {
  8. // 这个CAS只有可能在两个地方出现,一个是这里,另一个是signal此线程的时候transferForSignal方法内
  9. // 如果这里的CAS成功了,说明此线程还没有被signal,将节点加入同步队列,返回true(THROW_IE)
  10. if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  11. enq(node);
  12. return true;
  13. }
  14. // 此节点被signal后,还未完全进入同步队列,让出cpu时间
  15. while (!isOnSyncQueue(node))
  16. Thread.yield();
  17. // 确保节点已经进入同步队列,返回false(REINTERRUPT)
  18. return false;
  19. }

到这里,在await()方法中,while()循环部分已经执行完了,不管是被signal还是线程被中断而退出循环,执行到这里,此线程都在同步队列中,等待获取状态(对acquireQueued方法的调用),acquireQueued方法会返回在同步队列中等待时线程是否被中断,如果在等待获取状态时被中断,而且之前被中断而得到的interruptMode不为THROW_IE时(不需要抛中断异常),将interruptMode设为REINTERRUPT,说明待会要恢复中断状态。
在方法的最后会有对中断的统一处理

  1. if (interruptMode != 0)
  2. reportInterruptAfterWait(interruptMode);
  3. private void reportInterruptAfterWait(int interruptMode)
  4. throws InterruptedException {
  5. if (interruptMode == THROW_IE)
  6. throw new InterruptedException();
  7. else if (interruptMode == REINTERRUPT)
  8. selfInterrupt();
  9. }

在处理中断之前,还有这么一段代码来清除已经取消节点:

  1. if (node.nextWaiter != null) // clean up if cancelled
  2. unlinkCancelledWaiters();
  3. // 遍历一遍链表,删除ws不为CONDITION的节点
  4. private void unlinkCancelledWaiters() {
  5. Node t = firstWaiter;
  6. Node trail = null;
  7. while (t != null) {
  8. Node next = t.nextWaiter;
  9. if (t.waitStatus != Node.CONDITION) {
  10. t.nextWaiter = null;
  11. if (trail == null) // 删的是头节点
  12. firstWaiter = next;
  13. else
  14. // 前一个CONDITION节点的next指向下一个节点
  15. trail.nextWaiter = next;
  16. if (next == null)
  17. lastWaiter = trail;
  18. }
  19. else
  20. // CONDITION的节点不需要删除
  21. trail = t;
  22. t = next;
  23. }
  24. }

三. signal()

  1. public final void signal() {
  2. // 如果当前线程没上锁,抛异常
  3. if (!isHeldExclusively())
  4. throw new IllegalMonitorStateException();
  5. Node first = firstWaiter;
  6. if (first != null)
  7. doSignal(first);
  8. }

将等待队列的队头节点放入同步队列去等待获取锁

  1. private void doSignal(Node first) {
  2. do {
  3. // 从等待队列中删除头节点并将其移动到同步队列,成功则结束
  4. if ( (firstWaiter = first.nextWaiter) == null)
  5. lastWaiter = null;
  6. first.nextWaiter = null;
  7. } while (!transferForSignal(first) &&
  8. (first = firstWaiter) != null);
  9. }
  1. // 将节点从等待队列放入同步队列中
  2. final boolean transferForSignal(Node node) {
  3. // 如果这个CAS失败了,说明线程被中断了
  4. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  5. return false;
  6. // 上面的CAS成功,节点入同步队列,并返回在它同步队列中的前继节点
  7. Node p = enq(node);
  8. int ws = p.waitStatus;
  9. // 如果它的前继节点被取消了,唤醒node中的线程
  10. // 否则如果CAS设置尾SIGNAL失败,也唤醒node中的线程,设置成功的话则不唤醒,由同步队列自己去解决
  11. // 总之,这里保证了线程会被unpark
  12. // 由这里唤醒后会进入acquireQueued方法
  13. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  14. LockSupport.unpark(node.thread);
  15. return true;
  16. }

0
0
0

添加评论

正在回复:
取消
0
0
0
0