CAS与AQS源码简析

    xiaoxiao2022-07-14  154

    什么是CAS? CAS(Compare And Swap),顾名思义就是比较并交换。用于解决多线程使用锁带来的性能损耗的问题,是一种非阻塞算法,其交换原理如下图:  

        CAS用法: - 数据库中的乐观锁:即表字段+version字段,然后每次更新时就比较当前version版本是否一致,一直才更新并且升级version=version+1。   - unsafe的用法:https://blog.csdn.net/qq_28666081/article/details/83119734   - java中用到CAS的类如:java.util.concurrent.atomic.*  
    什么是AQS? AQS(AbstractQueuedSynchronizer),顾名思义就是抽象队列同步器。由FIFO(先进先出)的阻塞队列和相关同步器组成。这是在concurrent包(并发处理)下。  

        AbstractQueuedSynchronizer为锁机制维护了一个队列,需要获取锁的线程们排在队列中,只有排在队首的线程才有资格获取锁。

        首先看张图,取自《Java并发编程的艺术》:  

        然后看如下,AbstractQueuedSynchronizer源码及其分析如下: /** * 提供一个阻塞锁和相关依赖FIFO等待队列同步器的实现。 * 这个类支持排他和共享模式。排他模式下当一个已获取到了,其他线程尝试获取不可能成功。共享模式可以被多个线程获取。通常子类实现仅支持其中一种,但是也有两种的支持的如ReadWriteLock。 * 这个类定义了一个实现了Condition的内部类ConditionObject,用于排他模式。     * 使用一个基础的同步器需要重新定义以下方法: * <li> {@link #tryAcquire} * <li> {@link #tryRelease} * <li> {@link #tryAcquireShared} * <li> {@link #tryReleaseShared} * <li> {@link #isHeldExclusively} * 以上的每个方法均默认抛出{UnsupportedOperationException}错误,所以以上的几个方法没有提供默认实现,需要子类重写。 * 这个类提供了一个有效的、可伸缩的基础给同步器如状态、acquire获取和的同步器释放参数、内部FIFO等待队。当这些不够用时,可使用atomic、Queue、LockSupport。 */ public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {     private static final long serialVersionUID = 7373984972572414691L;       protected AbstractQueuedSynchronizer() { }       /**      * 等待队列的node class。Node作为等待队列的节点      * 这个等待队列是CLH的变体,CLH一般用于自旋锁。使用其代替一般的同步器,但也用了相同的策略来控制。       */     static final class Node {         /** 共享模式 */         static final Node SHARED = new Node();         /** 排他模式 */         static final Node EXCLUSIVE = null;         /** 当前线程被取消 */         static final int CANCELLED =  1;         /** 当前节点的后继节点包含的线程需要运行*/         static final int SIGNAL    = -1;         /** 当前结点在condition队列中 */         static final int CONDITION = -2;         /** 当前场景下后续的acquireShared能够得以执行  */         static final int PROPAGATE = -3;           /** 当前节点的状态。*/         volatile int waitStatus;         /** 前驱结点 */         volatile Node prev;         /** 后继节点 */         volatile Node next;         /** 入队线程 */         volatile Thread thread;         /** 存储condition队列中的后继节点 */         Node nextWaiter;           final boolean isShared() { return nextWaiter == SHARED;}         /**          * 返回前驱节点          */         final Node predecessor() throws NullPointerException {             Node p = prev;             if (p == null)                 throw new NullPointerException();             else                 return p;         }         ..............................     }     /**      * 仅用于初始化等待队列的head。只能通过setHead修改,当这个head还存在时不能将waitStatus=>cancelled      */     private transient volatile Node head;     /** Tail节点初始化,仅能通过enq追加新的wait node*/     private transient volatile Node tail;     /** synchronization state. */     private volatile int state;       /** CAS原子性的修改 synchronization state ,拉到代码最下面可见其值的设置*/     protected final boolean compareAndSetState(int expect, int update) {         return unsafe.compareAndSwapInt(this, stateOffset, expect, update);     }     static final long spinForTimeoutThreshold = 1000L;       /**      * 为队列追加node节点      */     private Node enq(final Node node) {         for (;;) {//一直循环入队,直到成功             Node t = tail;             if (t == null) { //同样获取尾节点,并且如果为空就将尾节点初始化为头结点head一样                 if (compareAndSetHead(new Node()))                     tail = head;             } else { //尾节点不为空就执行addWaiter一样的过程把新的node加到最后                 node.prev = t;                 if (compareAndSetTail(t, node)) {                     t.next = node;                     return t;                 }             }         }     }       /**      * 新建Node并入队      */     private Node addWaiter(Node mode) {         //新建一个Node         Node node = new Node(Thread.currentThread(), mode);         // 存储当前尾节点(当作旧的尾节点)         Node pred = tail;         if (pred != null) {  //如果当前尾节点不为空             node.prev = pred;  //将新建的节点的前驱节点执行旧的为节点             if (compareAndSetTail(pred, node)) {//CAS原子替换当前尾节点从旧的替换到新建node的位置                 pred.next = node;//将旧的尾节点位置的后置节点执行新建的节点                 return node;             }         }         //如果上面入队失败则调用enq方法入队         enq(node);         return node;     }     private void setHead(Node node) {         head = node; //将头结点指向node         node.thread = null;  //线程置空         node.prev = null;//因为是头节点了,不用需要前驱结点     }       /**      * 唤醒后续节点      */     private void unparkSuccessor(Node node) {         int ws = node.waitStatus;         if (ws < 0) compareAndSetWaitStatus(node, ws, 0);         Node s = node.next;         //如果后置节点是尾节点或Cancelled状态         if (s == null || s.waitStatus > 0) {             s = null;  //将当前后置节点置为null             for (Node t = tail; t != null && t != node; t = t.prev)                 if (t.waitStatus <= 0)                     s = t;         }         if (s != null)             LockSupport.unpark(s.thread);     }       /**      * 共享模式下      */     private void doReleaseShared() {         for (;;) {             Node h = head;             if (h != null && h != tail) {                 int ws = h.waitStatus;                 if (ws == Node.SIGNAL) {                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                         continue;            // loop to recheck cases                     unparkSuccessor(h);                 }                 else if (ws == 0 &&                          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                     continue;                // loop on failed CAS             }             if (h == head)                   // loop if head changed                 break;         }     }       private void setHeadAndPropagate(Node node, int propagate) {         Node h = head; // Record old head for check below         setHead(node);         if (propagate > 0 || h == null || h.waitStatus < 0 ||             (h = head) == null || h.waitStatus < 0) {             Node s = node.next;             if (s == null || s.isShared())                 doReleaseShared();         }     }       /**      * 取消正在尝试获取锁的节点      */     private void cancelAcquire(Node node) {         if (node == null) return;         //cancel一个节点时会将当前节点thread置为null         node.thread = null;         // 循环跳过已设置了cancelled状态的节点         Node pred = node.prev;         while (pred.waitStatus > 0) node.prev = pred = pred.prev;         //存储上面得到的节点前驱节点         Node predNext = pred.next;         //将当前要cancel的节点状态设置CANCELLED         node.waitStatus = Node.CANCELLED;           //1 如果当前节点node是尾节点。更新尾节点为pred.next指向null,相当于删除了node(和pred到node间为cancel的节点)         if (node == tail && compareAndSetTail(node, pred)) {             compareAndSetNext(pred, predNext, null);         } else {             int ws;             //2 当前既不是尾节点,也不是head后继节点。设置node的前驱节点waitStatus为SIGNAL,node前驱节点指向后继节点,相当于删除node             if (pred != head &&                 ((ws = pred.waitStatus) == Node.SIGNAL ||                  (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&                 pred.thread != null) {                 Node next = node.next;                 if (next != null && next.waitStatus <= 0)                     compareAndSetNext(pred, predNext, next);             } else {                 //3 如果node是head的后继节点。则直接唤醒node的后继节点。在head后面的节点有资格尝试获取锁,但是当前node放齐了当前资格,所以会唤醒其后续的节点                 unparkSuccessor(node);             }               node.next = node; // help GC         }     }       /**      * 判断当前节点是否挂起      */     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {         int ws = pred.waitStatus;         if (ws == Node.SIGNAL)  //当前状态下挂起             return true;         if (ws > 0) {             do {//跳过已被设置了cancelled的前驱节点                 node.prev = pred = pred.prev;             } while (pred.waitStatus > 0);             pred.next = node;         } else {             /** 将上级的等待状态设为SIGNAL */             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);         }         return false;     }       static void selfInterrupt() {         Thread.currentThread().interrupt();     }       private final boolean parkAndCheckInterrupt() {         LockSupport.park(this);         return Thread.interrupted();     }       /**      * 尝试获取锁      */     final boolean acquireQueued(final Node node, int arg) {         boolean failed = true;         try {             boolean interrupted = false;             for (;;) {  //进入循环后会不断的尝试获取                 final Node p = node.predecessor();//获取当前节点的头结点!!只有head头结点才持有锁!!                 //如果当前的前驱节点是头结点则尝试获取锁。                 //如果尝试成功则将当前node设为头结点,并将旧的head设为null便于回收                 //获取失败看是否需要挂起,如果需要挂起则挂起线程等待下一次被唤醒时继续尝试获取锁。                 if (p == head && tryAcquire(arg)) {                     setHead(node);                     p.next = null; // 帮助GC                     failed = false;                     return interrupted;                 }                 //判断是否挂起(根据Node的状态=-3就会挂起),然后调用刮起的方法(里面调了Thread.interrupted();)                 if (shouldParkAfterFailedAcquire(p, node) &&                     parkAndCheckInterrupt())                       interrupted = true;             }         } finally {             if (failed)                 cancelAcquire(node);         }     }     。。。。。。。。。。。。。。。     // Main exported methods     /** 子类实现的尝试获取锁的方法 */     protected boolean tryAcquire(int arg) {         throw new UnsupportedOperationException();     }       /** 子类实现尝试释放锁的方法 */     protected boolean tryRelease(int arg) {         throw new UnsupportedOperationException();     }       /** 子类实现尝试获取共享锁的方法  */     protected int tryAcquireShared(int arg) {         throw new UnsupportedOperationException();     }       /** 子类实现尝试释放共享锁的方法  */     protected boolean tryReleaseShared(int arg) {         throw new UnsupportedOperationException();     }        /** 子类实现排他模式下状态是否占用  */     protected boolean isHeldExclusively() {         throw new UnsupportedOperationException();     }       /**      * 排他模式,获取互斥锁      */     public final void acquire(int arg) {         //尝试获取锁(tryAcquire在此类中是抛异常的,应在子类实现),         //如果尝试获取失败就调用acquireQueued再次尝试获取锁,addWaiter适用于新建一个新node         if (!tryAcquire(arg) &&             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))             selfInterrupt();     }       public final void acquireInterruptibly(int arg)              throws InterruptedException {}     public final boolean tryAcquireNanos(int arg, long nanosTimeout)             throws InterruptedException {}       public final boolean release(int arg) {         if (tryRelease(arg)) {//尝试释放锁成功             Node h = head;//获取当前被释放了锁的head头节点             //如果头节点不为空且当前节点状态正常就唤醒当前节点的后续节点             if (h != null && h.waitStatus != 0)                 unparkSuccessor(h);             return true;         }         return false;     }       public final void acquireShared(int arg) {         if (tryAcquireShared(arg) < 0)             doAcquireShared(arg);     }       public final void acquireSharedInterruptibly(int arg)             throws InterruptedException {... }       public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)             throws InterruptedException {........ }       public final boolean releaseShared(int arg) {         if (tryReleaseShared(arg)) {             doReleaseShared();             return true;         }         return false;     }       // 以下是队列检查方法     public final boolean hasQueuedThreads() {         return head != tail;//队列是否存在     }     public final boolean hasContended() {         return head != null;  //头结点是否为空     }     public final Thread getFirstQueuedThread() {         return (head == tail) ? null : fullGetFirstQueuedThread();     }      。。。。。。。。。。     //工具和监控方法     public final int getQueueLength() {         int n = 0;//拿到队列长度         for (Node p = tail; p != null; p = p.prev) {             if (p.thread != null)                 ++n;         }         return n;     }     //获取当前node queue的所有线程     public final Collection<Thread> getQueuedThreads() {         ArrayList<Thread> list = new ArrayList<Thread>();         for (Node p = tail; p != null; p = p.prev) {             Thread t = p.thread;             if (t != null)                 list.add(t);         }         return list;     }     ......的工具类..............................       /**      * condition queue, 单向队列。线程拿到锁,但条件不足时,会放到这个队列等待被唤醒      */     public class ConditionObject implements Condition, java.io.Serializable {         private static final long serialVersionUID = 1173984872572414699L;         private transient Node firstWaiter; //头结点         private transient Node lastWaiter;//尾节点           public ConditionObject() { }           private Node addConditionWaiter() {             Node t = lastWaiter;             // If lastWaiter is cancelled, clean out.             if (t != null && t.waitStatus != Node.CONDITION) {                 unlinkCancelledWaiters();                 t = lastWaiter;             }             Node node = new Node(Thread.currentThread(), Node.CONDITION);             if (t == null)                 firstWaiter = node;             else                 t.nextWaiter = node;             lastWaiter = node;             return node;         }           private void doSignal(Node first) {             do {                 if ( (firstWaiter = first.nextWaiter) == null)                     lastWaiter = null;                 first.nextWaiter = null;             } while (!transferForSignal(first) &&                      (first = firstWaiter) != null);         }           private void doSignalAll(Node first) {             lastWaiter = firstWaiter = null;             do {                 Node next = first.nextWaiter;                 first.nextWaiter = null;                 transferForSignal(first);                 first = next;             } while (first != null);         }           private void unlinkCancelledWaiters() {             Node t = firstWaiter;             Node trail = null;             while (t != null) {                 Node next = t.nextWaiter;                 if (t.waitStatus != Node.CONDITION) {                     t.nextWaiter = null;                     if (trail == null)                         firstWaiter = next;                     else                         trail.nextWaiter = next;                     if (next == null)                         lastWaiter = trail;                 }                 else                     trail = t;                 t = next;             }         }           public final void signal() {             if (!isHeldExclusively())                 throw new IllegalMonitorStateException();             Node first = firstWaiter;             if (first != null)                 doSignal(first);         }           public final void signalAll() {             if (!isHeldExclusively())                 throw new IllegalMonitorStateException();             Node first = firstWaiter;             if (first != null)                 doSignalAll(first);         }           public final void awaitUninterruptibly() {             Node node = addConditionWaiter();             int savedState = fullyRelease(node);             boolean interrupted = false;             while (!isOnSyncQueue(node)) {                 LockSupport.park(this);                 if (Thread.interrupted())                     interrupted = true;             }             if (acquireQueued(node, savedState) || interrupted)                 selfInterrupt();         }           private static final int REINTERRUPT =  1;         private static final int THROW_IE    = -1;           private int checkInterruptWhileWaiting(Node node) {             return Thread.interrupted() ?                 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :                 0;         }           private void reportInterruptAfterWait(int interruptMode)             throws InterruptedException {             if (interruptMode == THROW_IE)                 throw new InterruptedException();             else if (interruptMode == REINTERRUPT)                 selfInterrupt();         }           public final void await() throws InterruptedException {             if (Thread.interrupted())                 throw new InterruptedException();             Node node = addConditionWaiter();             int savedState = fullyRelease(node);             int interruptMode = 0;             while (!isOnSyncQueue(node)) {                 LockSupport.park(this);                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                     break;             }             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)                 interruptMode = REINTERRUPT;             if (node.nextWaiter != null) // clean up if cancelled                 unlinkCancelledWaiters();             if (interruptMode != 0)                 reportInterruptAfterWait(interruptMode);         }           public final long awaitNanos(long nanosTimeout)                 throws InterruptedException {             if (Thread.interrupted())                 throw new InterruptedException();             Node node = addConditionWaiter();             int savedState = fullyRelease(node);             final long deadline = System.nanoTime() + nanosTimeout;             int interruptMode = 0;             while (!isOnSyncQueue(node)) {                 if (nanosTimeout <= 0L) {                     transferAfterCancelledWait(node);                     break;                 }                 if (nanosTimeout >= spinForTimeoutThreshold)                     LockSupport.parkNanos(this, nanosTimeout);                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                     break;                 nanosTimeout = deadline - System.nanoTime();             }             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)                 interruptMode = REINTERRUPT;             if (node.nextWaiter != null)                 unlinkCancelledWaiters();             if (interruptMode != 0)                 reportInterruptAfterWait(interruptMode);             return deadline - System.nanoTime();         }           public final boolean awaitUntil(Date deadline)                 throws InterruptedException {             long abstime = deadline.getTime();             if (Thread.interrupted())                 throw new InterruptedException();             Node node = addConditionWaiter();             int savedState = fullyRelease(node);             boolean timedout = false;             int interruptMode = 0;             while (!isOnSyncQueue(node)) {                 if (System.currentTimeMillis() > abstime) {                     timedout = transferAfterCancelledWait(node);                     break;                 }                 LockSupport.parkUntil(this, abstime);                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                     break;             }             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)                 interruptMode = REINTERRUPT;             if (node.nextWaiter != null)                 unlinkCancelledWaiters();             if (interruptMode != 0)                 reportInterruptAfterWait(interruptMode);             return !timedout;         }           public final boolean await(long time, TimeUnit unit)                 throws InterruptedException {             long nanosTimeout = unit.toNanos(time);             if (Thread.interrupted())                 throw new InterruptedException();             Node node = addConditionWaiter();             int savedState = fullyRelease(node);             final long deadline = System.nanoTime() + nanosTimeout;             boolean timedout = false;             int interruptMode = 0;             while (!isOnSyncQueue(node)) {                 if (nanosTimeout <= 0L) {                     timedout = transferAfterCancelledWait(node);                     break;                 }                 if (nanosTimeout >= spinForTimeoutThreshold)                     LockSupport.parkNanos(this, nanosTimeout);                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                     break;                 nanosTimeout = deadline - System.nanoTime();             }             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)                 interruptMode = REINTERRUPT;             if (node.nextWaiter != null)                 unlinkCancelledWaiters();             if (interruptMode != 0)                 reportInterruptAfterWait(interruptMode);             return !timedout;         }     。。。。。。。。。。之类的工具类。。。。。。。。     }       private static final Unsafe unsafe = Unsafe.getUnsafe();     private static final long stateOffset;     private static final long headOffset;     private static final long tailOffset;     private static final long waitStatusOffset;     private static final long nextOffset;       static {         try {             stateOffset = unsafe.objectFieldOffset                 (AbstractQueuedSynchronizer.class.getDeclaredField("state"));             headOffset = unsafe.objectFieldOffset                 (AbstractQueuedSynchronizer.class.getDeclaredField("head"));             tailOffset = unsafe.objectFieldOffset                 (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));             waitStatusOffset = unsafe.objectFieldOffset                 (Node.class.getDeclaredField("waitStatus"));             nextOffset = unsafe.objectFieldOffset                 (Node.class.getDeclaredField("next"));           } catch (Exception ex) { throw new Error(ex); }     }       private final boolean compareAndSetHead(Node update) {         return unsafe.compareAndSwapObject(this, headOffset, null, update);     }     。。。。。一堆CAS方法。。。。。。。。 }                         参考: http://ifeve.com/introduce-abstractqueuedsynchronizer/ 以下四篇: https://www.jianshu.com/p/9ebca222513b https://www.jianshu.com/p/c806dd7f60bc https://www.jianshu.com/p/01f2046aab64 https://www.jianshu.com/p/134f494d76d8                                              
    最新回复(0)