读写锁

8天前

读写锁也是基于AQS实现的,内部有一个Sync的抽象类,是AQS的子类,然后Sync的子类NonfairSyncFairSync分别实现了非公平与公平的策略,然后内部类WriteLockReadLock在其内部通过Sync的子类实现锁的接口。读写锁是AQS独占模式与共享模式的组合。

一. 构造器

  1. public ReentrantReadWriteLock() {
  2. this(false);
  3. }
  4. public ReentrantReadWriteLock(boolean fair) {
  5. sync = fair ? new FairSync() : new NonfairSync();
  6. readerLock = new ReadLock(this);
  7. writerLock = new WriteLock(this);
  8. }

默认是用非公平锁策略的,在读锁与写锁对象中分别保存了一份Sync对象的引用。

二. Sync

  1. static final int SHARED_SHIFT = 16;
  2. static final int SHARED_UNIT = (1 << SHARED_SHIFT);
  3. static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
  4. static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
  5. /** Returns the number of shared holds represented in count */
  6. static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
  7. /** Returns the number of exclusive holds represented in count */
  8. static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

AQS的子类Sync中,状态state的高16位用来存放获取的读锁的获取次数(包括重入),低16位用来存放获取的写锁的重入次数。

  1. //用来记录最后一个获取读锁的线程获取读锁的重入次数
  2. private transient HoldCounter cachedHoldCounter;
  3. //用来记录第一个获取到读锁的线程
  4. private transient Thread firstReader
  5. //用来记录第一个获取到读锁的线程获取读锁的重入次数
  6. private transient int firstReadHoldCount;
  7. //用来存放除去第一个获取读锁线程外的其他线程获取读锁的重入次数
  8. private transient ThreadLocalHoldCounter readHolds = new ThreadLocalHoldCounter();
  1. // 用来记录每个线程读锁重入的次数
  2. static final class HoldCounter {
  3. int count = 0;
  4. // Use id, not reference, to avoid garbage retention
  5. final long tid = Thread.currentThread().getId();
  6. }
  7. static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
  8. public HoldCounter initialValue() {
  9. return new HoldCounter();
  10. }
  11. }

firstReadercachedHoldCounter是用来提高性能的,获取线程读锁重入次数时,先从这两个缓存中获取,如果这两个缓存不是当前线程的,再从ThreadLocal中获取。

三. 读锁

读锁是利用AQS的共享模式
获取读锁:

  1. // ReentrantReadWriteLock.ReadLock
  2. public void lock() {
  3. sync.acquireShared(1);
  4. }

AQS中,acquireShared方法中会调用子类中对tryAcquireShared方法的实现

  1. // ReentrantReadWriteLock.Sync
  2. protected final int tryAcquireShared(int unused) {
  3. Thread current = Thread.currentThread();
  4. int c = getState();
  5. // 1. 写锁已被其他线程占有,获取失败
  6. if (exclusiveCount(c) != 0 &&
  7. getExclusiveOwnerThread() != current)
  8. return -1;
  9. int r = sharedCount(c);
  10. // 多个线程同时获取读锁,最多只会有一个线程获取成功
  11. // 获取不成功的线程进入fullTryAcquireShared继续尝试获取
  12. if (!readerShouldBlock() && // 2. 如果是公平锁策略要看前面有没有线程在排队
  13. r < MAX_COUNT && // 3. 锁被获取数量没达到上限
  14. compareAndSetState(c, c + SHARED_UNIT)) { // 4. CAS是否成功
  15. if (r == 0) {
  16. // 读锁第一次被获取
  17. // 记录下第一个获取读锁的线程和此线程重入次数
  18. firstReader = current;
  19. firstReaderHoldCount = 1;
  20. } else if (firstReader == current) {
  21. // 第一次获取读锁的线程重入读锁,更新重入次数
  22. firstReaderHoldCount++;
  23. } else {
  24. // 记录下最后一个获取读锁的线程及其重入次数
  25. HoldCounter rh = cachedHoldCounter;
  26. if (rh == null || rh.tid != current.getId()) // cachedHoldCounter为空或不是当前线程的
  27. cachedHoldCounter = rh = readHolds.get();// 设置为缓存当前线程
  28. else if (rh.count == 0) // cachedHoldCounter是当前线程的,并且重入次数为0
  29. readHolds.set(rh);
  30. rh.count++;
  31. }
  32. return 1;
  33. }
  34. return fullTryAcquireShared(current);
  35. }
  36. // 到这里是因为:1. 公平锁 2. 上面的CAS失败
  37. // 这个方法和上面的方法逻辑差不多,但是在一个循环中自旋获取锁
  38. final int fullTryAcquireShared(Thread current) {
  39. HoldCounter rh = null;
  40. for (;;) {
  41. int c = getState();
  42. // 其他线程占有写锁,当前线程只能去排队
  43. if (exclusiveCount(c) != 0) {
  44. if (getExclusiveOwnerThread() != current)
  45. return -1;
  46. // else we hold the exclusive lock; blocking here
  47. // would cause deadlock.
  48. } else if (readerShouldBlock()) {
  49. // 写锁没有被占用 且 同步队列前面有其他线程在等待
  50. // 这里来确保没有重入读锁的线程都会去排队,重入的进入下面的CAS去处理
  51. if (firstReader == current) {
  52. // firstReader 线程重入读锁,直接到下面的 CAS
  53. // 这里count肯定不为0,因为在释放锁的时候会清空firstReader
  54. } else {
  55. if (rh == null) {
  56. rh = cachedHoldCounter;
  57. if (rh == null || rh.tid != current.getId()) {
  58. // cachedHoldCounter为空或不是缓存当前线程的
  59. // 从ThreadLocal中获取,如果之前没获取过,get会初始化一个
  60. rh = readHolds.get();
  61. // rh.count==0说明上面的get仅仅是初始化,说明不是重入读锁
  62. // 先remove,然后直接在下面返回-1去排队
  63. if (rh.count == 0)
  64. readHolds.remove();
  65. }
  66. }
  67. if (rh.count == 0)
  68. return -1;
  69. }
  70. }
  71. if (sharedCount(c) == MAX_COUNT)
  72. throw new Error("Maximum lock count exceeded");
  73. // 下面处理重入
  74. if (compareAndSetState(c, c + SHARED_UNIT)) {
  75. if (sharedCount(c) == 0) {
  76. firstReader = current;
  77. firstReaderHoldCount = 1;
  78. } else if (firstReader == current) {
  79. firstReaderHoldCount++;
  80. } else {
  81. if (rh == null)
  82. rh = cachedHoldCounter;
  83. if (rh == null || rh.tid != current.getId())
  84. rh = readHolds.get();
  85. else if (rh.count == 0)
  86. readHolds.set(rh);
  87. rh.count++;
  88. cachedHoldCounter = rh; // cache for release
  89. }
  90. return 1;
  91. }
  92. }
  93. }

释放读锁

  1. // ReentrantReadWriteLock.ReadLock
  2. public void unlock() {
  3. sync.releaseShared(1);
  4. }
  5. // AQS
  6. public final boolean releaseShared(int arg) {
  7. if (tryReleaseShared(arg)) {
  8. doReleaseShared();
  9. return true;
  10. }
  11. return false;
  12. }
  1. // ReentrantReadWriteLock.Sync
  2. protected final boolean tryReleaseShared(int unused) {
  3. Thread current = Thread.currentThread();
  4. // 当前线程是firstReader
  5. if (firstReader == current) {
  6. // assert firstReaderHoldCount > 0;
  7. // 当前线程完全释放读锁,清空firstReader
  8. if (firstReaderHoldCount == 1)
  9. firstReader = null;
  10. else
  11. // 不是完全释放,就将重入次数减1
  12. firstReaderHoldCount--;
  13. } else {
  14. // 当前线程不是firstReader
  15. // 获取当前线程的HoldCounter
  16. HoldCounter rh = cachedHoldCounter;
  17. if (rh == null || rh.tid != current.getId())
  18. rh = readHolds.get();
  19. int count = rh.count;
  20. // 如果是完全释放,从ThreadLocal中移除
  21. if (count <= 1) {
  22. readHolds.remove();
  23. if (count <= 0)
  24. throw unmatchedUnlockException();
  25. }
  26. // 重入次数减1
  27. --rh.count;
  28. }
  29. // CAS修改状态
  30. for (;;) {
  31. int c = getState();
  32. int nextc = c - SHARED_UNIT;
  33. if (compareAndSetState(c, nextc))
  34. // 写锁和读锁全都释放光了,返回true帮助唤醒后继节点中的获取写锁的线程
  35. return nextc == 0;
  36. }
  37. }

四. 写锁

获取写锁

  1. // ReentrantReadWriteLock.WriteLock
  2. public void lock() {
  3. sync.acquire(1);
  4. }
  1. // ReentrantReadWriteLock.Sync
  2. protected final boolean tryAcquire(int acquires) {
  3. Thread current = Thread.currentThread();
  4. int c = getState();
  5. int w = exclusiveCount(c);
  6. if (c != 0) {
  7. // 有线程持有读锁(包括自己)或 写锁被其他线程占有
  8. if (w == 0 || current != getExclusiveOwnerThread())
  9. return false;
  10. if (w + exclusiveCount(acquires) > MAX_COUNT)
  11. throw new Error("Maximum lock count exceeded");
  12. // 当前线程重入写锁
  13. setState(c + acquires);
  14. return true;
  15. }
  16. // 由于公平锁策略需要排队 或 CAS失败
  17. if (writerShouldBlock() ||
  18. !compareAndSetState(c, c + acquires))
  19. return false;
  20. // 设置独占线程为当前线程
  21. setExclusiveOwnerThread(current);
  22. return true;
  23. }

释放写锁

  1. // ReentrantReadWriteLock.WriteLock
  2. public void unlock() {
  3. sync.release(1);
  4. }
  5. // AQS
  6. public final boolean release(int arg) {
  7. if (tryRelease(arg))
  8. // 如果写锁被完全释放,去唤醒队列后面的节点
  9. Node h = head;
  10. if (h != null && h.waitStatus != 0)
  11. unparkSuccessor(h);
  12. return true;
  13. }
  14. return false;
  15. }
  1. protected final boolean tryRelease(int releases) {
  2. // 不是由上锁的线程释放锁
  3. if (!isHeldExclusively())
  4. throw new IllegalMonitorStateException();
  5. int nextc = getState() - releases;
  6. // 写锁是否被完全释放,如果是清空独占线程
  7. boolean free = exclusiveCount(nextc) == 0;
  8. if (free)
  9. setExclusiveOwnerThread(null);
  10. setState(nextc);
  11. return free;
  12. }

五. 锁降级

一个线程获取写锁后,在写锁未释放之前可以继续获取读锁,这被称为锁降级。
但是一个线程获取读锁后,在读锁未释放之前不能获取写锁,因为在获取写锁的代码里,如果当前线程已获取读锁,会进入队列休眠,自己把自己弄休眠了,可能之后就没线程去唤醒它了,导致死锁。


0
2
0

添加评论

正在回复:
取消
3
0
2
0