AbstractQueuedSynchronizer为锁机制维护了一个队列,需要获取锁的线程们排在队列中,只有排在队首的线程才有资格获取锁。
首先看张图,取自《Java并发编程的艺术》: 然后看如下,AbstractQueuedSynchronizer源码及其分析如下: /** * 提供一个阻塞锁和相关依赖FIFO等待队列同步器的实现。 * 这个类支持排他和共享模式。排他模式下当一个已获取到了,其他线程尝试获取不可能成功。共享模式可以被多个线程获取。通常子类实现仅支持其中一种,但是也有两种的支持的如ReadWriteLock。 * 这个类定义了一个实现了Condition的内部类ConditionObject,用于排他模式。 * 使用一个基础的同步器需要重新定义以下方法: * <li> {@link #tryAcquire} * <li> {@link #tryRelease} * <li> {@link #tryAcquireShared} * <li> {@link #tryReleaseShared} * <li> {@link #isHeldExclusively} * 以上的每个方法均默认抛出{UnsupportedOperationException}错误,所以以上的几个方法没有提供默认实现,需要子类重写。 * 这个类提供了一个有效的、可伸缩的基础给同步器如状态、acquire获取和的同步器释放参数、内部FIFO等待队。当这些不够用时,可使用atomic、Queue、LockSupport。 */ public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 7373984972572414691L; protected AbstractQueuedSynchronizer() { } /** * 等待队列的node class。Node作为等待队列的节点 * 这个等待队列是CLH的变体,CLH一般用于自旋锁。使用其代替一般的同步器,但也用了相同的策略来控制。 */ static final class Node { /** 共享模式 */ static final Node SHARED = new Node(); /** 排他模式 */ static final Node EXCLUSIVE = null; /** 当前线程被取消 */ static final int CANCELLED = 1; /** 当前节点的后继节点包含的线程需要运行*/ static final int SIGNAL = -1; /** 当前结点在condition队列中 */ static final int CONDITION = -2; /** 当前场景下后续的acquireShared能够得以执行 */ static final int PROPAGATE = -3; /** 当前节点的状态。*/ volatile int waitStatus; /** 前驱结点 */ volatile Node prev; /** 后继节点 */ volatile Node next; /** 入队线程 */ volatile Thread thread; /** 存储condition队列中的后继节点 */ Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED;} /** * 返回前驱节点 */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } .............................. } /** * 仅用于初始化等待队列的head。只能通过setHead修改,当这个head还存在时不能将waitStatus=>cancelled */ private transient volatile Node head; /** Tail节点初始化,仅能通过enq追加新的wait node*/ private transient volatile Node tail; /** synchronization state. */ private volatile int state; /** CAS原子性的修改 synchronization state ,拉到代码最下面可见其值的设置*/ protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } static final long spinForTimeoutThreshold = 1000L; /** * 为队列追加node节点 */ private Node enq(final Node node) { for (;;) {//一直循环入队,直到成功 Node t = tail; if (t == null) { //同样获取尾节点,并且如果为空就将尾节点初始化为头结点head一样 if (compareAndSetHead(new Node())) tail = head; } else { //尾节点不为空就执行addWaiter一样的过程把新的node加到最后 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } /** * 新建Node并入队 */ private Node addWaiter(Node mode) { //新建一个Node Node node = new Node(Thread.currentThread(), mode); // 存储当前尾节点(当作旧的尾节点) Node pred = tail; if (pred != null) { //如果当前尾节点不为空 node.prev = pred; //将新建的节点的前驱节点执行旧的为节点 if (compareAndSetTail(pred, node)) {//CAS原子替换当前尾节点从旧的替换到新建node的位置 pred.next = node;//将旧的尾节点位置的后置节点执行新建的节点 return node; } } //如果上面入队失败则调用enq方法入队 enq(node); return node; } private void setHead(Node node) { head = node; //将头结点指向node node.thread = null; //线程置空 node.prev = null;//因为是头节点了,不用需要前驱结点 } /** * 唤醒后续节点 */ private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; //如果后置节点是尾节点或Cancelled状态 if (s == null || s.waitStatus > 0) { s = null; //将当前后置节点置为null for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } /** * 共享模式下 */ private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } /** * 取消正在尝试获取锁的节点 */ private void cancelAcquire(Node node) { if (node == null) return; //cancel一个节点时会将当前节点thread置为null node.thread = null; // 循环跳过已设置了cancelled状态的节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; //存储上面得到的节点前驱节点 Node predNext = pred.next; //将当前要cancel的节点状态设置CANCELLED node.waitStatus = Node.CANCELLED; //1 如果当前节点node是尾节点。更新尾节点为pred.next指向null,相当于删除了node(和pred到node间为cancel的节点) if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; //2 当前既不是尾节点,也不是head后继节点。设置node的前驱节点waitStatus为SIGNAL,node前驱节点指向后继节点,相当于删除node if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { //3 如果node是head的后继节点。则直接唤醒node的后继节点。在head后面的节点有资格尝试获取锁,但是当前node放齐了当前资格,所以会唤醒其后续的节点 unparkSuccessor(node); } node.next = node; // help GC } } /** * 判断当前节点是否挂起 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //当前状态下挂起 return true; if (ws > 0) { do {//跳过已被设置了cancelled的前驱节点 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /** 将上级的等待状态设为SIGNAL */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } static void selfInterrupt() { Thread.currentThread().interrupt(); } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } /** * 尝试获取锁 */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //进入循环后会不断的尝试获取 final Node p = node.predecessor();//获取当前节点的头结点!!只有head头结点才持有锁!! //如果当前的前驱节点是头结点则尝试获取锁。 //如果尝试成功则将当前node设为头结点,并将旧的head设为null便于回收 //获取失败看是否需要挂起,如果需要挂起则挂起线程等待下一次被唤醒时继续尝试获取锁。 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // 帮助GC failed = false; return interrupted; } //判断是否挂起(根据Node的状态=-3就会挂起),然后调用刮起的方法(里面调了Thread.interrupted();) if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 。。。。。。。。。。。。。。。 // Main exported methods /** 子类实现的尝试获取锁的方法 */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } /** 子类实现尝试释放锁的方法 */ protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } /** 子类实现尝试获取共享锁的方法 */ protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } /** 子类实现尝试释放共享锁的方法 */ protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } /** 子类实现排他模式下状态是否占用 */ protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); } /** * 排他模式,获取互斥锁 */ public final void acquire(int arg) { //尝试获取锁(tryAcquire在此类中是抛异常的,应在子类实现), //如果尝试获取失败就调用acquireQueued再次尝试获取锁,addWaiter适用于新建一个新node if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } public final void acquireInterruptibly(int arg) throws InterruptedException {} public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {} public final boolean release(int arg) { if (tryRelease(arg)) {//尝试释放锁成功 Node h = head;//获取当前被释放了锁的head头节点 //如果头节点不为空且当前节点状态正常就唤醒当前节点的后续节点 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException {... } public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {........ } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } // 以下是队列检查方法 public final boolean hasQueuedThreads() { return head != tail;//队列是否存在 } public final boolean hasContended() { return head != null; //头结点是否为空 } public final Thread getFirstQueuedThread() { return (head == tail) ? null : fullGetFirstQueuedThread(); } 。。。。。。。。。。 //工具和监控方法 public final int getQueueLength() { int n = 0;//拿到队列长度 for (Node p = tail; p != null; p = p.prev) { if (p.thread != null) ++n; } return n; } //获取当前node queue的所有线程 public final Collection<Thread> getQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { Thread t = p.thread; if (t != null) list.add(t); } return list; } ......的工具类.............................. /** * condition queue, 单向队列。线程拿到锁,但条件不足时,会放到这个队列等待被唤醒 */ public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; private transient Node firstWaiter; //头结点 private transient Node lastWaiter;//尾节点 public ConditionObject() { } private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); } private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } } public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } private static final int REINTERRUPT = 1; private static final int THROW_IE = -1; private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } 。。。。。。。。。。之类的工具类。。。。。。。。 } private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } 。。。。。一堆CAS方法。。。。。。。。 } 参考: http://ifeve.com/introduce-abstractqueuedsynchronizer/ 以下四篇: https://www.jianshu.com/p/9ebca222513b https://www.jianshu.com/p/c806dd7f60bc https://www.jianshu.com/p/01f2046aab64 https://www.jianshu.com/p/134f494d76d8