源码分析AbstractQuenedSynchronized(一)

    xiaoxiao2025-04-23  17

    源码分析AbstractQuenedSynchronized(二) 源码分析AbstractQuenedSynchronized(三)

    文章目录

    AbstractQuenedSynchronized数据结构模板方法 ReentrantLockReentrantLock获得公平锁ReentrantLock释放公平锁 公平锁和非公平锁公平锁的争锁过程非公平锁的争锁过程总结

    从 ReentrantLock 的公平锁源码出发,分析AbstractQueuedSynchronizer 这个类如何工作

    AbstractQuenedSynchronized

    该类是一个抽象类,简称AQS,是Java并发包的基础工具类,是实现ReentrantLock、CountDownLatch、Semaphore、FutureTask等类的基础。

    数据结构

    双向队列,结点Node包含:pre+next+thread+waitStatus

    图片来源https://javadoop.com/post/AbstractQueuedSynchronizer

    static final class Node { //标识结点在共享模式下 static final Node SHARED = new Node(); //标识结点在独占模式下 static final Node EXCLUSIVE = null; //================WaitStatus的状态==================// static final int CANCELLED = 1;//取消争抢锁 static final int SIGNAL = -1;//当前node的后继结点对应的线程需要被唤醒 static final int CONDITION = -2;//结点在等待队列中,结点线程等待在Condition上 static final int PROPAGATE = -3;//下一次共享式同步状态获取将会无条件地被传播下去 volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter;//用于条件队列 }

    模板方法

    //这些模板方法的作用体现在下面对ReentrantLock的分析中 void acquire(int arg) ***->子类会覆写其中的boolean tryAcquire(arg)方法 boolean release(int arg) ***->子类会覆写其中的boolean tryRelease(arg)方法 Node addWaiter(Node mode)//循环CAS操作将结点加入同步队列尾部 Node enq(final Node node)//addWaiter中的方法 boolean acquireQueued(final Node node, int arg)//循环CAS操作更新同步队列 boolean shouldParkAfterFailedAcquire()//判断是否需要被阻塞 boolean parkAndCheckInterrupt()//阻塞当前线程然后判断线程等待状态时是否被中断 void unparkSuccessor(Node node)//唤醒后继结点

    ReentrantLock

    默认非公平锁,通过构造器参数中的布尔值来确定锁是否公平,通过内部类Sync来管理锁,真正的获取锁和释放锁由Sync两个实现类来实现,

    public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }

    ReentrantLock获得公平锁

    static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; //争锁 final void lock() { acquire(1); } /*========调用父类AQS的acquire方法,注意这里面涉及到3个方法:tryAcquire()【子类覆写父类AQS】、addWaiter()【父类AQS】、acquireQueued()【父类AQS】,下面一一讲解========== public final void acquire(int arg) { if (!tryAcquire(arg) &&//调用成功则不需要排队 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//否则将该线程包装成结点添加到同步队列中。Node.EXCLUSIVE=null; selfInterrupt(); } */ /** * 覆写了父类AQS的tryAcquire方法 * 两种情况下说明尝试获得锁成功,1.没有线程持有锁,也就是没有线程在等待锁 2.进行锁重入 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState();//获取等待状态 //1.state==0,没有线程持有锁 if (c == 0) { if (!hasQueuedPredecessors() &&//没有线程在等待锁 //尝试CAS操作获取锁 compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current);//获取成功则标记当前线程已持有锁 return true; } } //2.该分支说明锁可重入了,state=state+1; else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } /* ==================调用父类AQS的addWaiter方法,该方法的功能是通过循环CAS将新的结点加入到同步队列中,这里面又有一个enq()方法==================== private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) {//同步队列不为空 node.prev = pred; if (compareAndSetTail(pred, node)) {//如果CAS操作将新的结点加入到队列末尾则直接返回该结点 //进入到这里说明设置成功,tail==node pred.next = node; return node; } } //执行到这里说明:要么加入该结点前队列为空;要么CAS操作没有成功 enq(node); return node; } */ /*======================父类AQS的enq方法=============================== * 进入到这个方法只有两种可能:1.队列为空 2.有线程竞争入队 * 采用自旋方式入队:CAS设置tail过程中,竞争一次竞争不到,就多次竞争,总会排到的 * @return node's predecessor private Node enq(final Node node) { //循环CAS for (;;) { Node t = tail; if (t == null) { // Must initialize 初始时队列为空,head和tail都为空,因此第一个结点加入队列后肯定会进入到该方法进行初始化 if (compareAndSetHead(new Node())) tail = head;//新实例化一个Node对象作为头结点,这个时候head.waitStatus==0,表示初始状态, //!!!!!!!!!!注意没有return!!!!!!!!!!!!!! 继续for循环!让多线程从head结点后面开始排队 } else { //下一个线程进来向获取锁,发现同步队列中尾结点不为空,就会插入到队列尾进行排队。当多个线程同时进来竞争时,通过CAS循环,将排队操作串行化 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } */ /*========================父类AQS的方法acquireQueued,该方法的作用是循环CAS操作不断尝试去更新同步队列的head,即不断尝试去获得锁======================================= final boolean acquireQueued(final Node node, int arg) { boolean failed = true;//标记是否成功获取同步状态,初始时设置为failed==true,即未获取同步状态 try { boolean interrupted = false;//标记等待过程中是否被中断过 for (;;) { final Node p = node.predecessor();//p是node的前驱结点 //如果p是head,也就是说node是阻塞队列的第一个结点,则尝试获取同步状态 //注意:我们认为阻塞队列不包含head结点,head一般指的是占有同步状态的线程,head后面的才称为阻塞队列 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false;//成功获取同步状态 return interrupted; } //代码执行到这里,说明node尝试获取锁失败,原因有两个:1. node不是队头 2.与其他线程争取同步状态失败 if (shouldParkAfterFailedAcquire(p, node) //判断当前线程是否需要被挂起,需要挂起则返回true && parkAndCheckInterrupt())//执行当前线程的挂起,调用LockSupport的park方法,等待被唤醒 interrupted = true; } } finally { if (failed) cancelAcquire(node); } } */ /*============================父类AQS的shouldParkAfterFailedAcquire()方法========================== private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //前驱结点状态为signal,则当前结点可以被挂起,等待前驱结点释放同步状态或者被中断 if (ws == Node.SIGNAL) return true; //前驱结点状态>0,则说明前驱结点取消了排队,因此继续往前找,直到找到一个状态<=0的结点作为前驱结点 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 进入这个分支意味着等待状态为0,-2,-3 // 由于之前都没有设置waitStatus,所以每个新的node入队时,waitStatus都是0 // 正常情况下前驱结点是之前的tail,它的waitStatus应该是0 // CAS操作将前驱结点的waitStatus设置为Node.Signal(也就是-1) // 然后从该方法中返回false,进入上层方法的for循环,又重新进入该方法从从一个分支返回true compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } */ /*===================父类AQS的parkAndCheckInterrupt()方法============================= private final boolean parkAndCheckInterrupt() { LockSupport.park(this);//利用LockSupport工具阻塞当前线程,只有调用unpark()或者当前线程被中断才能够从park()方法返回 return Thread.interrupted(); } */ }

    ReentrantLock释放公平锁

    public void unlock() { sync.release(1); /*=================父类AQS的release()方法。里面又有tryRelease()方法【子类Sync覆写】和unparkSuccessor()方法===================== public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } */ /*====================Sync类的tryRelease()方法,重写了父类AQS的tryRelease方法================== protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //c==0说明完全释放锁 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } */ /*============================父类AQS的unparkSuccessor()方法========================== private void unparkSuccessor(Node node) {//此处node为头结点head int ws = node.waitStatus; if (ws < 0)//头结点waitStatus小于0则将其设为0 compareAndSetWaitStatus(node, ws, 0); Node s = node.next;//s为同步队列中的第一个结点 //如果s为空或者状态大于0即取消了等待,那么从队列尾部向前寻找waitStatus<0结点中排在最前面的 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //唤醒后继结点,后继结点在acquireQueued方法中通过for循环继续执行,发现前继结点是head从而获取锁 if (s != null) LockSupport.unpark(s.thread); } */ }

    公平锁和非公平锁

    公平锁的争锁过程

    static final class FairSync extends Sync { //争锁 final void lock() { acquire(1); } /*========调用父类AQS的acquire方法,注意这里面涉及到3个方法:tryAcquire()【子类覆写父类AQS】、addWaiter()【父类AQS】、acquireQueued()【父类AQS】========== public final void acquire(int arg) { if (!tryAcquire(arg) &&//调用成功则不需要排队 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//否则将该线程包装成结点添加到同步队列中。Node.EXCLUSIVE=null; selfInterrupt(); } */ /** * 覆写了父类AQS的tryAcquire方法 * 两种情况下说明尝试获得锁成功,1.没有线程持有锁,也就是没有线程在等待锁 2.进行锁重入 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState();//获取等待状态 //1.state==0,没有线程持有锁 if (c == 0) { 【对比】非公平锁在此处没有对等待线程的判断 if (!hasQueuedPredecessors() &&//没有线程在等待锁 //尝试CAS操作获取锁 compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current);//获取成功则标记当前线程已持有锁 return true; } } //2.该分支说明锁可重入了,state=state+1; else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }

    非公平锁的争锁过程

    static final class NonfairSync extends Sync { /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1))【对比】和公平锁相比,并没有对锁的等待状态进行判断,而是直接调用一次CAS,尝试去获得锁 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } /*========调用父类AQS的acquire方法,注意这里面涉及到3个方法:tryAcquire()【子类覆写父类AQS】、addWaiter()【父类AQS】、acquireQueued()【父类AQS】========== public final void acquire(int arg) { if (!tryAcquire(arg) &&//调用成功则不需要排队 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//否则将该线程包装成结点添加到同步队列中。Node.EXCLUSIVE=null; selfInterrupt(); } */ protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {【对比】和公平锁相比,这里没有对阻塞队列进行判断,即根本不管有没有线程在等待锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }

    总结

    非公平锁与公平锁的lock方法有以下两点不同:

    非公平锁: 进入lock方法后直接尝试CAS获得锁,成功则直接返回; 公平锁: 进入lock方法后首先要获取锁的等待状态state,判断是否有线程在等待锁(state是否为0) 非公平锁: CAS未成功后进入父类AQS的acquire方法,然后调用自己的nonfairTryAcquire方法,如果暂时没有线程持有锁或者当前锁可重入,则直接获得锁,也不会对阻塞队列有没有线程在等待锁作判断,如果没有获得锁则后续步骤和公平锁一样,将当前线程封装成结点插入到队列中自旋等待。 公平锁: 在步骤1的基础上,当锁的等待状态为0,即未有线程持有锁时,会进而判断阻塞队列中有没有线程在等待锁

    对比之下,非公平锁的吞吐量更大,但是也有可能使阻塞队列中的线程长期处于饥饿状态。

    参考:https://javadoop.com/post/AbstractQueuedSynchronizer

    最新回复(0)