ReentrantReadWriteLock源码解析

    xiaoxiao2024-12-24  2

    概述

      ReentrantReadWriteLock是Lock的另一种实现方式,我们已经知道了ReentrantLock是一个排他锁,同一时间只允许一个线程访问,而ReentrantReadWriteLock允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。相对于排他锁,提高了并发性。在实际应用中,大部分情况下对共享数据(如缓存)的访问都是读操作远多于写操作,这时ReentrantReadWriteLock能够提供比排他锁更好的并发性和吞吐量。

      读写锁内部维护了两个锁,一个用于读操作,一个用于写操作。所有 ReadWriteLock实现都必须保证 writeLock操作的内存同步效果也要保持与相关 readLock的联系。也就是说,成功获取读锁的线程会看到写入锁之前版本所做的所有更新

      ReentrantReadWriteLock支持以下功能:

        1)支持公平和非公平的获取锁的方式;

        2)支持可重入。读线程在获取了读锁后还可以获取读锁;写线程在获取了写锁之后既可以再次获取写锁又可以获取读锁

        3)还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不允许的;

        4)读取锁和写入锁都支持锁获取期间的中断;

        5)Condition支持。仅写入锁提供了一个 Conditon 实现;读取锁不支持 Conditon ,readLock().newCondition() 会抛出 UnsupportedOperationException。 

    示例(JDK自带示例)

    示例1:利用重入来执行升级缓存后的锁降级

    class CachedData {     Object data;     volatile boolean cacheValid;    //缓存是否有效     ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

        void processCachedData() {         rwl.readLock().lock();    //获取读锁         //如果缓存无效,更新cache;否则直接使用data         if (!cacheValid) {             // Must release read lock before acquiring write lock             //获取写锁前须释放读锁             rwl.readLock().unlock();             rwl.writeLock().lock();                 // Recheck state because another thread might have acquired             //   write lock and changed state before we did.             if (!cacheValid) {                 data = ...                 cacheValid = true;             }             // Downgrade by acquiring read lock before releasing write lock             //锁降级,在释放写锁前获取读锁             rwl.readLock().lock();             rwl.writeLock().unlock(); // Unlock write, still hold read         }

            use(data);         rwl.readLock().unlock();    //释放读锁     } }

    示例2:使用 ReentrantReadWriteLock 来提高 Collection 的并发性

    class RWDictionary {     private final Map<String, Data> m = new TreeMap<String, Data>();     private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();     private final Lock r = rwl.readLock();    //读锁     private final Lock w = rwl.writeLock();    //写锁

        public Data get(String key) {         r.lock();         try { return m.get(key); }         finally { r.unlock(); }     }     public String[] allKeys() {         r.lock();         try { return m.keySet().toArray(); }         finally { r.unlock(); }     }     public Data put(String key, Data value) {         w.lock();         try { return m.put(key, value); }         finally { w.unlock(); }     }     public void clear() {         w.lock();         try { m.clear(); }         finally { w.unlock(); }     } }

     

    源码解析

    ReentrantReadWriteLock 是基于AQS实现的(参考AbstractQueuedSynchronizer源码解析),它的自定义同步器(继承AQS)需要在同步状态(一个整型变量state)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写。

    public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {} { /** Inner class providing readlock */ private final ReentrantReadWriteLock.ReadLock readerLock; /** Inner class providing writelock */ private final ReentrantReadWriteLock.WriteLock writerLock; /** Performs all synchronization mechanics */ final Sync sync; }

    ReadLock与WriteLock

    public static class ReadLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -5992448646407690164L; private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock() { sync.acquireShared(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean tryLock() { return sync.tryReadLock(); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void unlock() { sync.releaseShared(1); } public Condition newCondition() { throw new UnsupportedOperationException(); } } public static class WriteLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -4992448646407690164L; private final Sync sync; protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock() { sync.acquire(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock( ) { return sync.tryWriteLock(); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } public int getHoldCount() { return sync.getWriteHoldCount(); } }

     ReadLock与WriteLock比较:

    ReadLock使用共享模式,WriteLock使用独占模式ReadLock与WriteLock使用同一个Sync

    public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }

     

    Sync

    数据结构

       abstract static class Sync extends AbstractQueuedSynchronizer {         private static final long serialVersionUID = 6317671515068378041L;

            /*         SHARED_SHIFT相关变量与方法为锁状态访问逻辑。          */         //低16位为写锁状态,高16位为读锁状态。         static final int SHARED_SHIFT   = 16;         //读锁每次增加的单位(因为在高16位,所以加1,即相当于整个int加 1个SHARED_UNIT    )         static final int SHARED_UNIT    = (1 << SHARED_SHIFT);         //读锁最大数(2^16 -1 )         static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;         //写锁状态掩码(2^16-1)         static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

            /**返回读锁数量 ,高16位(无符号右移16位) */         static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }         /** 返回写锁数量

            *EXCLUSIVE_MASK,高位全是0,与操作返回写状态数量

            */         static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

            /**          * A counter for per-thread read hold counts.          * Maintained as a ThreadLocal; cached in cachedHoldCounter          */

           //当前读线程的计数器         static final class HoldCounter {             int count = 0;    //当前读线程总的重入次数。             //线程ID             final long tid = getThreadId(Thread.currentThread());         }

            /**          * ThreadLocal 子类          */         static final class ThreadLocalHoldCounter             extends ThreadLocal<HoldCounter> {             public HoldCounter initialValue() {                 return new HoldCounter();             }         }

            /**          */         private transient ThreadLocalHoldCounter readHolds;

           //当前线程缓存的HoldCounter ,         private transient HoldCounter cachedHoldCounter;

            /**          */         private transient Thread firstReader = null;         private transient int firstReaderHoldCount;

            Sync() {             readHolds = new ThreadLocalHoldCounter();             setState(getState()); // ensures visibility of readHolds         } }

    state结构

    读锁的获取与释放

    读锁的lock和unlock的实际实现对应Sync的 tryAcquireShared 和 tryReleaseShared方法。

    protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); 如果写锁线程数 != 0 ,且独占锁不是当前线程则返回失败,因为存在锁降级(先write,后read), if (exclusiveCount(c) != 0 && //获取写锁数量 getExclusiveOwnerThread() != current ) return -1; int r = sharedCount(c); //获取读锁数量。 if ( !readerShouldBlock() && //读操作不被阻塞。 r < MAX_COUNT && //读锁数量未到最大值 compareAndSetState(c, c + SHARED_UNIT) //竞争成功 ) { //r == 0,表示第一个读锁线程,第一个读锁firstRead是不会加入到readHolds中 if (r == 0) { firstReader = current; // 设置第一个读线程 firstReaderHoldCount = 1; // 读线程占用的资源数为1 } else if (firstReader == current) { // 当前线程为第一个读线程,表示第一个读锁线程重入,则第一个线程占用资源+1 firstReaderHoldCount++; } else { // 获取计数器 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) // 计数器为空或者计数器的tid不为当前正在运行的线程的tid,则获取当前 线程对应的计数器 cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) //计数为0,则表示未加入到缓存,加入到readHolds中 readHolds.set(rh); //把当前线程的计数加1 rh.count++; } //返回1,则表示acquire成功, return 1; } return fullTryAcquireShared(current); } final int fullTryAcquireShared(Thread current) { /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */ HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { //写线程不是本线程,则返回-1 if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // 写线程数量为0并且读线程被阻塞 // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { //第一个读线程是本线程。 // assert firstReaderHoldCount > 0; } else { //第一个读线程不是本线程。 if (rh == null) { //获取缓存hold rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { //获取本线程的Hold rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); //如果为0,则需要移出。 } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) // 读锁数量为最大值,抛出异常 throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { // 比较并且设置成功 if (sharedCount(c) == 0) { // 读线程数量为0,设置第一个读线程 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { //第一个读线程计数加1. firstReaderHoldCount++; } else { //不是第一个线程,则获取计数器。 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }

     

    读锁acquire流程图

     

    注意:

    更新成功后会在firstReaderHoldCount中或readHolds(ThreadLocal类型的)的本线程副本中记录当前线程重入数(23行至43行代码),这是为了实现jdk1.6中加入的getReadHoldCount()方法的,这个方法能获取当前线程重入共享锁的次数(state中记录的是多个线程的总重入次数),加入了这个方法让代码复杂了不少,但是其原理还是很简单的:如果当前只有一个线程的话,还不需要动用ThreadLocal,直接往firstReaderHoldCount这个成员变量里存重入数,当有第二个线程来的时候,就要动用ThreadLocal变量readHolds了,每个线程拥有自己的副本,用来保存自己的重入数

     

    protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }

     读锁的释放工作,主要是包括更新计数器的值(重入次数),以及更新state(总的读锁重入次数)

    计数器:HoldCounter

    在读锁的获取、释放过程中,总是会有一个对象存在着,同时该对象在获取线程获取读锁是+1,释放读锁时-1,该对象就是HoldCounter。

    要明白HoldCounter就要先明白读锁。前面提过读锁的内在实现机制就是共享锁,对于共享锁其实我们可以稍微的认为它不是一个锁的概念,它更加像一个计数器的概念。一次共享锁操作就相当于一次计数器的操作,获取共享锁计数器+1,释放共享锁计数器-1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作。所以HoldCounter的作用就是当前线程持有共享锁的数量,这个数量必须要与线程绑定在一起,否则操作其他线程锁就会抛出异常。

    static final class HoldCounter { int count = 0; final long tid = Thread.currentThread().getId(); } static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } private transient ThreadLocalHoldCounter readHolds; private transient HoldCounter cachedHoldCounter;

    在HoldCounter中仅有count和tid两个变量,其中count代表着计数器,tid是线程的id。但是如果要将一个对象和线程绑定起来仅记录tid肯定不够的,而且HoldCounter根本不能起到绑定对象的作用,只是记录线程tid而已。

    诚然,在java中,我们知道如果要将一个线程和对象绑定在一起只有ThreadLocal才能实现 。ThreadLocalHoldCounter继承ThreadLocal,并且重写了initialValue方法。

    故而,HoldCounter应该就是绑定线程上的一个计数器,而ThradLocalHoldCounter则是线程绑定的ThreadLocal。从上面我们可以看到ThreadLocal将HoldCounter绑定到当前线程上,同时HoldCounter也持有线程Id,这样在释放锁的时候才能知道ReadWriteLock里面缓存的上一个读取线程(cachedHoldCounter)是否是当前线程。这样做的好处是可以减少ThreadLocal.get()的次数,因为这也是一个耗时操作。需要说明的是这样HoldCounter绑定线程id而不绑定线程对象的原因是避免HoldCounter和ThreadLocal互相绑定而GC难以释放它们(尽管GC能够智能的发现这种引用而回收它们,但是这需要一定的代价),所以其实这样做只是为了帮助GC快速回收对象而已。

    写锁的获取与释放

    写锁的lock和unlock调用的独占式同步状态的获取与释放,因此真实的实现就是Sync的 tryAcquire和 tryRelease。

    protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); //写线程数量(即获取独占锁的重入数) int w = exclusiveCount(c); //当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁 if (c != 0) { // 当前state不为0,此时:如果写锁状态为0说明读锁此时被占用返回false; // 如果写锁状态不为0且写锁没有被当前线程持有返回false if ( w == 0 // w==0 ,说明写锁为0,c!=0 说明读锁正在操作 || current != getExclusiveOwnerThread() // w != 0 说明有写锁在操作,但是不是本线程,则返回false。 ) return false; //判断同一线程获取写锁是否超过最大次数(65535),支持可重入 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //此时当前线程已持有写锁,现在是重入,所以只需要修改锁的数量即可。 setState(c + acquires); return true; } //到这里说明此时c=0,读锁和写锁都没有被获取 if ( writerShouldBlock() //判断写是否阻塞 || !compareAndSetState(c, c + acquires) //竞争失败, ) return false; //设置写锁为当前线程所有 setExclusiveOwnerThread(current); return true; }

    更新写锁重入次数以及state

    protected final boolean tryRelease(int releases) { //若锁的持有者不是当前线程,抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //写锁的新线程数 int nextc = getState() - releases; //如果独占模式重入数为0了,说明独占模式被释放 boolean free = exclusiveCount(nextc) == 0; if (free) //若写锁的新线程数为0,则将锁的持有者设置为null setExclusiveOwnerThread(null); //设置写锁的新线程数 //不管独占模式是否被释放,更新独占重入数 setState(nextc); return free; }

    公平锁与非公平锁

    NonfairSync

    static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; final boolean writerShouldBlock() { return false; // writers can always barge } final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } }

    FairSync

    static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } }

    NonFairSync与FairSync的区别:writerShouldBlock()与readerShouldBlock()方法的实现不同

    readerShouldBlock

    NonfairSync.readerShouldBlock

           调用:apparentlyFirstQueuedIsExclusive,如果队列中第一个节点是独占式,则返回true,堵塞读锁。

    //判断是否:队列中第一个等待节点是独占模式的 final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && //head节点不为空。 (s = h.next) != null && //head的后置节点不为空 !s.isShared() && //head的后置节点不是共享模式 s.thread != null; //head的后置节点关联了线程。 }

    调用apparentlyFirstQueuedIsExclusive方法, 为了防止写线程饥饿等待,如果同步队列中的第一个线程是以独占模式获取锁(写锁),那么当前获取读锁的线程需要阻塞,让队列中的第一个线程先执行。

    FairSync.readerShouldBlock

           调用:hasQueuedPredecessors,判断是否是等待队列中是否有前置节点,有则返回true

    writerShouldBlock

    NonfairSync.writerShouldBlock

           直接返回false,说明非公平锁不堵塞写锁。

    FairSync.writerShouldBlock

           调用:hasQueuedPredecessors,判断是否是等待队列中是否有前置节点,有则返回true

     

     

     

    最新回复(0)