ReentrantReadWriteLock是Lock的另一种实现方式,我们已经知道了ReentrantLock是一个排他锁,同一时间只允许一个线程访问,而ReentrantReadWriteLock允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。相对于排他锁,提高了并发性。在实际应用中,大部分情况下对共享数据(如缓存)的访问都是读操作远多于写操作,这时ReentrantReadWriteLock能够提供比排他锁更好的并发性和吞吐量。
读写锁内部维护了两个锁,一个用于读操作,一个用于写操作。所有 ReadWriteLock实现都必须保证 writeLock操作的内存同步效果也要保持与相关 readLock的联系。也就是说,成功获取读锁的线程会看到写入锁之前版本所做的所有更新。
ReentrantReadWriteLock支持以下功能:
1)支持公平和非公平的获取锁的方式;
2)支持可重入。读线程在获取了读锁后还可以获取读锁;写线程在获取了写锁之后既可以再次获取写锁又可以获取读锁;
3)还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不允许的;
4)读取锁和写入锁都支持锁获取期间的中断;
5)Condition支持。仅写入锁提供了一个 Conditon 实现;读取锁不支持 Conditon ,readLock().newCondition() 会抛出 UnsupportedOperationException。
class CachedData { Object data; volatile boolean cacheValid; //缓存是否有效 ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() { rwl.readLock().lock(); //获取读锁 //如果缓存无效,更新cache;否则直接使用data if (!cacheValid) { // Must release read lock before acquiring write lock //获取写锁前须释放读锁 rwl.readLock().unlock(); rwl.writeLock().lock(); // Recheck state because another thread might have acquired // write lock and changed state before we did. if (!cacheValid) { data = ... cacheValid = true; } // Downgrade by acquiring read lock before releasing write lock //锁降级,在释放写锁前获取读锁 rwl.readLock().lock(); rwl.writeLock().unlock(); // Unlock write, still hold read }
use(data); rwl.readLock().unlock(); //释放读锁 } }
class RWDictionary { private final Map<String, Data> m = new TreeMap<String, Data>(); private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); //读锁 private final Lock w = rwl.writeLock(); //写锁
public Data get(String key) { r.lock(); try { return m.get(key); } finally { r.unlock(); } } public String[] allKeys() { r.lock(); try { return m.keySet().toArray(); } finally { r.unlock(); } } public Data put(String key, Data value) { w.lock(); try { return m.put(key, value); } finally { w.unlock(); } } public void clear() { w.lock(); try { m.clear(); } finally { w.unlock(); } } }
ReentrantReadWriteLock 是基于AQS实现的(参考AbstractQueuedSynchronizer源码解析),它的自定义同步器(继承AQS)需要在同步状态(一个整型变量state)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写。
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {} { /** Inner class providing readlock */ private final ReentrantReadWriteLock.ReadLock readerLock; /** Inner class providing writelock */ private final ReentrantReadWriteLock.WriteLock writerLock; /** Performs all synchronization mechanics */ final Sync sync; }ReadLock与WriteLock比较:
ReadLock使用共享模式,WriteLock使用独占模式ReadLock与WriteLock使用同一个Sync public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
数据结构
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 6317671515068378041L;
/* SHARED_SHIFT相关变量与方法为锁状态访问逻辑。 */ //低16位为写锁状态,高16位为读锁状态。 static final int SHARED_SHIFT = 16; //读锁每次增加的单位(因为在高16位,所以加1,即相当于整个int加 1个SHARED_UNIT ) static final int SHARED_UNIT = (1 << SHARED_SHIFT); //读锁最大数(2^16 -1 ) static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //写锁状态掩码(2^16-1) static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/**返回读锁数量 ,高16位(无符号右移16位) */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** 返回写锁数量
*EXCLUSIVE_MASK,高位全是0,与操作返回写状态数量
*/ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
/** * A counter for per-thread read hold counts. * Maintained as a ThreadLocal; cached in cachedHoldCounter */
//当前读线程的计数器 static final class HoldCounter { int count = 0; //当前读线程总的重入次数。 //线程ID final long tid = getThreadId(Thread.currentThread()); }
/** * ThreadLocal 子类 */ static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } }
/** */ private transient ThreadLocalHoldCounter readHolds;
//当前线程缓存的HoldCounter , private transient HoldCounter cachedHoldCounter;
/** */ private transient Thread firstReader = null; private transient int firstReaderHoldCount;
Sync() { readHolds = new ThreadLocalHoldCounter(); setState(getState()); // ensures visibility of readHolds } }
state结构读锁的lock和unlock的实际实现对应Sync的 tryAcquireShared 和 tryReleaseShared方法。
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); 如果写锁线程数 != 0 ,且独占锁不是当前线程则返回失败,因为存在锁降级(先write,后read), if (exclusiveCount(c) != 0 && //获取写锁数量 getExclusiveOwnerThread() != current ) return -1; int r = sharedCount(c); //获取读锁数量。 if ( !readerShouldBlock() && //读操作不被阻塞。 r < MAX_COUNT && //读锁数量未到最大值 compareAndSetState(c, c + SHARED_UNIT) //竞争成功 ) { //r == 0,表示第一个读锁线程,第一个读锁firstRead是不会加入到readHolds中 if (r == 0) { firstReader = current; // 设置第一个读线程 firstReaderHoldCount = 1; // 读线程占用的资源数为1 } else if (firstReader == current) { // 当前线程为第一个读线程,表示第一个读锁线程重入,则第一个线程占用资源+1 firstReaderHoldCount++; } else { // 获取计数器 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) // 计数器为空或者计数器的tid不为当前正在运行的线程的tid,则获取当前 线程对应的计数器 cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) //计数为0,则表示未加入到缓存,加入到readHolds中 readHolds.set(rh); //把当前线程的计数加1 rh.count++; } //返回1,则表示acquire成功, return 1; } return fullTryAcquireShared(current); } final int fullTryAcquireShared(Thread current) { /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */ HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { //写线程不是本线程,则返回-1 if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // 写线程数量为0并且读线程被阻塞 // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { //第一个读线程是本线程。 // assert firstReaderHoldCount > 0; } else { //第一个读线程不是本线程。 if (rh == null) { //获取缓存hold rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { //获取本线程的Hold rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); //如果为0,则需要移出。 } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) // 读锁数量为最大值,抛出异常 throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { // 比较并且设置成功 if (sharedCount(c) == 0) { // 读线程数量为0,设置第一个读线程 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { //第一个读线程计数加1. firstReaderHoldCount++; } else { //不是第一个线程,则获取计数器。 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }读锁acquire流程图
注意:
更新成功后会在firstReaderHoldCount中或readHolds(ThreadLocal类型的)的本线程副本中记录当前线程重入数(23行至43行代码),这是为了实现jdk1.6中加入的getReadHoldCount()方法的,这个方法能获取当前线程重入共享锁的次数(state中记录的是多个线程的总重入次数),加入了这个方法让代码复杂了不少,但是其原理还是很简单的:如果当前只有一个线程的话,还不需要动用ThreadLocal,直接往firstReaderHoldCount这个成员变量里存重入数,当有第二个线程来的时候,就要动用ThreadLocal变量readHolds了,每个线程拥有自己的副本,用来保存自己的重入数。
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
读锁的释放工作,主要是包括更新计数器的值(重入次数),以及更新state(总的读锁重入次数)
在读锁的获取、释放过程中,总是会有一个对象存在着,同时该对象在获取线程获取读锁是+1,释放读锁时-1,该对象就是HoldCounter。
要明白HoldCounter就要先明白读锁。前面提过读锁的内在实现机制就是共享锁,对于共享锁其实我们可以稍微的认为它不是一个锁的概念,它更加像一个计数器的概念。一次共享锁操作就相当于一次计数器的操作,获取共享锁计数器+1,释放共享锁计数器-1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作。所以HoldCounter的作用就是当前线程持有共享锁的数量,这个数量必须要与线程绑定在一起,否则操作其他线程锁就会抛出异常。
static final class HoldCounter { int count = 0; final long tid = Thread.currentThread().getId(); } static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } private transient ThreadLocalHoldCounter readHolds; private transient HoldCounter cachedHoldCounter;在HoldCounter中仅有count和tid两个变量,其中count代表着计数器,tid是线程的id。但是如果要将一个对象和线程绑定起来仅记录tid肯定不够的,而且HoldCounter根本不能起到绑定对象的作用,只是记录线程tid而已。
诚然,在java中,我们知道如果要将一个线程和对象绑定在一起只有ThreadLocal才能实现 。ThreadLocalHoldCounter继承ThreadLocal,并且重写了initialValue方法。
故而,HoldCounter应该就是绑定线程上的一个计数器,而ThradLocalHoldCounter则是线程绑定的ThreadLocal。从上面我们可以看到ThreadLocal将HoldCounter绑定到当前线程上,同时HoldCounter也持有线程Id,这样在释放锁的时候才能知道ReadWriteLock里面缓存的上一个读取线程(cachedHoldCounter)是否是当前线程。这样做的好处是可以减少ThreadLocal.get()的次数,因为这也是一个耗时操作。需要说明的是这样HoldCounter绑定线程id而不绑定线程对象的原因是避免HoldCounter和ThreadLocal互相绑定而GC难以释放它们(尽管GC能够智能的发现这种引用而回收它们,但是这需要一定的代价),所以其实这样做只是为了帮助GC快速回收对象而已。
写锁的lock和unlock调用的独占式同步状态的获取与释放,因此真实的实现就是Sync的 tryAcquire和 tryRelease。
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); //写线程数量(即获取独占锁的重入数) int w = exclusiveCount(c); //当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁 if (c != 0) { // 当前state不为0,此时:如果写锁状态为0说明读锁此时被占用返回false; // 如果写锁状态不为0且写锁没有被当前线程持有返回false if ( w == 0 // w==0 ,说明写锁为0,c!=0 说明读锁正在操作 || current != getExclusiveOwnerThread() // w != 0 说明有写锁在操作,但是不是本线程,则返回false。 ) return false; //判断同一线程获取写锁是否超过最大次数(65535),支持可重入 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //此时当前线程已持有写锁,现在是重入,所以只需要修改锁的数量即可。 setState(c + acquires); return true; } //到这里说明此时c=0,读锁和写锁都没有被获取 if ( writerShouldBlock() //判断写是否阻塞 || !compareAndSetState(c, c + acquires) //竞争失败, ) return false; //设置写锁为当前线程所有 setExclusiveOwnerThread(current); return true; }更新写锁重入次数以及state
protected final boolean tryRelease(int releases) { //若锁的持有者不是当前线程,抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //写锁的新线程数 int nextc = getState() - releases; //如果独占模式重入数为0了,说明独占模式被释放 boolean free = exclusiveCount(nextc) == 0; if (free) //若写锁的新线程数为0,则将锁的持有者设置为null setExclusiveOwnerThread(null); //设置写锁的新线程数 //不管独占模式是否被释放,更新独占重入数 setState(nextc); return free; }NonFairSync与FairSync的区别:writerShouldBlock()与readerShouldBlock()方法的实现不同
调用:apparentlyFirstQueuedIsExclusive,如果队列中第一个节点是独占式,则返回true,堵塞读锁。
//判断是否:队列中第一个等待节点是独占模式的 final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && //head节点不为空。 (s = h.next) != null && //head的后置节点不为空 !s.isShared() && //head的后置节点不是共享模式 s.thread != null; //head的后置节点关联了线程。 }调用apparentlyFirstQueuedIsExclusive方法, 为了防止写线程饥饿等待,如果同步队列中的第一个线程是以独占模式获取锁(写锁),那么当前获取读锁的线程需要阻塞,让队列中的第一个线程先执行。
FairSync.readerShouldBlock调用:hasQueuedPredecessors,判断是否是等待队列中是否有前置节点,有则返回true
直接返回false,说明非公平锁不堵塞写锁。
FairSync.writerShouldBlock调用:hasQueuedPredecessors,判断是否是等待队列中是否有前置节点,有则返回true