逐步理解java同步队列AbstractQueuedSynchronizer

    xiaoxiao2022-07-13  151

    java并发工具类AbstractQueuedSynchronizer

    文章开头

    自己对读源码的一些感受: 读JDK的源码宛若一件艺术品,其中蕴含着令人沉醉的滋味,但同时也带给读者或多或少的困惑,想要理解大师们的思想并非易事,用于实践更是困难重重,在我看来读源码也有技巧和方法。

    先了解该类大概的作用以及相关的继承体系做到心中有数基于事件驱动,合理分配时间,而不是一字一句从头到尾遍历,这样对我个人来说时间上不允许并且效率低下,带着问题去看,遇见不懂得地方观察是否是首要解决的问题,不是就大概理解其意,以后回头来看。随后要是有乐趣刨根问底也不错

    进入主题

    接下来就分析一下java.util.concurrent包中AbstractQueuedSynchronizer的具体实现,AQS数据结构上是一个带头尾指针的双向链表实现FIFO的同步队列,是ReentrantLock,CountDownLatch等并发工具类的根基,下面我们以ReentrantLock为例从源码角度描述一下AQS的实现。

    AQS结构特征

    //队列的头结点 private transient volatile Node head; //队列的尾结点 private transient volatile Node tail; //可以当作共享资源,为0就是当前锁没有线程占有 // 大于0表示已经有线程占有锁资源,是实现CAS算法和互斥的关键 private volatile int state;

    Node节点是AQS中静态内部类,用来封装线程的相关信息,也是链表的组成部分,Node有多种状态

    SHARED :共享状态 在Semaphore和ReentrantReadWriteLock中有实现EXCLUSIVE :独占状态CANCELLED:删除状态,意思说当前结点无效了,线程不用抢占锁SIGNAL :当前结点释放锁资源后唤醒后续结点进行抢占锁资源CONDITION :作用于Condition队列中 static final class Node { //共享状态下默认空参实例化 static final Node SHARED = new Node(); //独占状态先不实例化 static final Node EXCLUSIVE = null; //删除状态的标识为1 static final int CANCELLED = 1; //通知后续结点进行资源抢占的标识为 -1 static final int SIGNAL = -1; //条件队列中标识为-2 static final int CONDITION = -2; //先掠过 (!= = !) static final int PROPAGATE = -3; //结点目前状态标识 volatile int waitStatus; //前置指针,指向前一个结点 volatile Node prev; //后置指针,指向后一个结点 volatile Node next; //当前线程 volatile Thread thread; //在共享状态和Condition中实现 Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } //指向当结点的前一个结点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; }

    相信到这里,大家对于AQS有一个基本的认识,就是一个双向链表实现的队列!

    注意!下面是加锁流程

    //我们稍后分析ReentrantLock中的实现,首先当前线程尝试获取锁,!tryAcquire(arg)若尝试获取锁成功就直接退出,失败就进入 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //protected修饰空实现,稍后再ReentranLock中介绍,这里说明一下,这是一种模板方法设计模式,具体的实现在子类中,目的是减少重复代码 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }

    addWaiter方法做了两件事

    将当前线程以某总状态封装成Node结点(若是Node.EXCLUSIVE即独占式的结点)加入到同步队列末尾CAS方式进行锁资源抢占(CAS无锁算法之后在说明) private Node addWaiter(Node mode) { //封装Node结点 Node node = new Node(Thread.currentThread(), mode); //临时引用指向尾结点 Node pred = tail; if (pred != null) { node.prev = pred; //先尝试以CAS方式将当前结点设置成尾结点 if (compareAndSetTail(pred, node)) { //当前线程成功设置成尾结点,就将原来Node的next域指向当前Node pred.next = node; return node; } } //说明发生竞争或者队列还是空的 enq(node); return node; }

    看一看compareAndSetTail的实现,其调用Unsafe类的compareAndSwapObject 方法,tailOffset是该域相对于对象头地址的偏移量,直接获取内存的数据和预期值比较,若相等则替换,避免线程挂起,唤醒进行上下文切换

    private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }

    enq方法的实现 总体来说就是一个自旋加入到队列尾的过程

    private Node enq(final Node node) { //死循环,不断自旋直到当前线程成功加入到队列尾部 for (;;) { Node t = tail; if (t == null) { //队列为空的情况下先初始化一个头节点 //注意空参构造waitStatus == 0 if (compareAndSetHead(new Node())) tail = head; } else { //cas方式加入到队列尾部 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; //成功就返回 return t; } } } }

    线程封装成Node结点加入到阻塞队列还要进行挂起操作

    //返回一下,回到acquire,避免迷路 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } final boolean acquireQueued(final Node node, int arg) { //一个标记,若该过程没有发生异常就置为false boolean failed = true; try { //线程中断标记 boolean interrupted = false; //又是一个自旋 for (;;) { //之前说明了,获取当前结点的前置结点 final Node p = node.predecessor(); //若前置结点是头节点并且尝试获取锁成功 //之前在enq方法里面若队列没有节点就new Node() //此时可以先去尝试抢占一下锁,在ReentrantLock中的setExclusiveOwnerThread(current); 方法设置独占的线程,但空参初始化没有设置线程 if (p == head && tryAcquire(arg)) { //CAS方式设置当前结点为头结点 setHead(node); p.next = null; //协助 GC //此过程没有异常 failed = false; return interrupted; } //也就是说当前结点没有条件获取锁资源,且往下看 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) //发生异常删除当前结点 cancelAcquire(node); } }

    简单的说 :就是判断一下当前线程是否可以进行挂起操作,因为一直得不到锁资源,避免一直占用cpu

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //之前已经说明了,获取前置结点的状态 int ws = pred.waitStatus; //若是SIGNAL即-1,说明当前线程可以安心的休眠,因为当前结点的前置结点若是SIGNAL状态,即释放了锁资源就能唤醒当前线程 if (ws == Node.SIGNAL) return true; //大于0就得向前遍历,找到最近一个状态是signal状态的结点 if (ws > 0) { do { //不错的方法,可以借鉴 node.prev = pred = pred.prev; } while (pred.waitStatus > 0) pred.next = node; } else { //如果是其他状态也要置为SIGNAL状态 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

    下面是真正的中断操作

    private final boolean parkAndCheckInterrupt() { //调用LockSupport的park,挂起当前线程,操作系统级别的实现 LockSupport.park(this); return Thread.interrupted(); }

    下面说说ReentrantLock的加锁实现

    首先ReentrantLock可实现公平和非公平锁,支持可重入 private final Sync sync; //内部抽象类Sync继承了AbstractQueuedSynchronizer abstract static class Sync extends AbstractQueuedSynchronizer { //加锁抽象方法 abstract void lock(); //非公平尝试获取锁的实现 final boolean nonfairTryAcquire(int acquires) { //获取当前线程 final Thread current = Thread.currentThread(); //获取当前state变量的值,当前变量就是互斥变量 int c = getState(); //等于0说明没有线程占有锁资源 if (c == 0) { //先用CAS设置state变量为acquires(一般为1),成功的话说明当前线程直接占有锁资源 if (compareAndSetState(0, acquires)) { //就设置独占锁资源的线程为当前线程 setExclusiveOwnerThread(current); return true; } }//当前state互斥变量不为0,说明有竞争,若当前线程是独占锁资源的线程 //这里就是一个可重入的操作,说明可重入是在尝试获取锁这个方法里,同理,公平锁的实现也一致 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } //解锁操作 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } }

    这里是非公平锁的实现 注意和公平锁实现的不同之处还模板方法设计模式的具体实现

    //非公平锁的实现,继承Sync static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; //这里是非公平锁实现Sync抽象方法lock final void lock() { //cas方式占有锁资源,显而易见非公平每个线程都能争取一下锁资源,不管先来后到 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //抢占失败就得调用AQS的acquire方法,将线程封装成Node节点加入阻塞队列末尾允许的化还要进行挂起操作 acquire(1); } //这里是模板方法设计模式的实现,思考一下,ReentrantLock有飞公平和公平锁实现,区别就是在尝试获取锁的时候,公平锁要先判断同步队列中有没有先进入的结点,要讲资历,但是其他的封装成Node节点加入阻塞队列以及挂起操作步骤一致,若是公平和非公平锁都重写一遍,造成代码重复,尝试获取锁这一步才用重写 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } //公平锁实现 static final class FairSync extends Sync { //对比非公平锁不同的是没有先进行CAS抢占锁,直接调用acquire方法 final void lock() { acquire(1); } //非公平锁尝试获取锁实现 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //前面的代码就不描述了,这一步是关键 !hasQueuedPredecessors()看看其实现 / * public final boolean hasQueuedPredecessors() { * Node t = tail; //指向尾结点 * Node h = head; //指向头结点 * Node s; * //若只有一个结点,头尾指针指向同一块内存区域,直接返回false * return h != t && * ((s = h.next) == null || s.thread != Thread.currentThread()); * } */ //即要先来先服务,也就是公平锁的实现 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //同理就是可重入的实现 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } //默认情况下是非公平的实现 public ReentrantLock() { sync = new NonfairSync(); } //传入参数为true为公平锁创建 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }

    在这里ReentrantLock的非公平和公平锁大致就讲完了,不过还有一些细节,在ReentrantLock可实现等待可中断,这个方法在LinkedBlockingQueue中有实现,这里不详细说BlockingQueue

    public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } // public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } // 与之前已经分析过的acquireQueued方法大致相同,不同的是 // final Node node = addWaiter(Node.EXCLUSIVE); private void doAcquireInterruptibly(int arg) throws InterruptedException { //当前节点是独占式的 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

    解锁流程

    经过加锁的洗礼相信解锁操作会更容易理解,在ReentrantLock 中具体实现如下

    // 通常在finally中调用解锁方法 public void unlock() { sync.release(1); } public final boolean release(int arg) { //尝试解锁成功 if (tryRelease(arg)) { //指向头结点 Node h = head; //头结点不为空并且结点状态不为0 if (h != null && h.waitStatus != 0) //进行唤醒操作 unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { //获取当前结点状态 int ws = node.waitStatus; if (ws < 0) //CAS方式将节点状态置为0 compareAndSetWaitStatus(node, ws, 0); Node s = node.next; //当前结点后一个结点为空或者状态是大于0的 if (s == null || s.waitStatus > 0) { s = null; //从尾结点向前开始遍历,找到离当前节点最近的状态为-1的结点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //后一个结点不为空,唤醒后一个结点 if (s != null) //唤醒当前线程 LockSupport.unpark(s.thread); }

    总结一下

    AQS数据结构上是一个带头尾指针的双向链表实现的同步队列,使用了模板方法设计模式,具体的尝试获取锁在不同的子类中实现,维护的一个变量state用于当作互斥变量加锁流程:对于非公平锁,先判断互斥变量state是不是为0,为0没有其他线程占用锁的情况下,先用CAS 方式尝试获取锁,失败就将当前线程以独占式封装成Node节点以CAS或者自旋的方法加入到同步队列末尾,判断当前结点前置结点是否是头结点,是头结点,头结点尝试获取锁成功,将当前结点设置成头结点,否则进行挂起操作,找到离当前结点最近的状态为-1的Node结点,就安心休眠,公平锁大致相同,可以参照以上代码解锁流程:释放时候主要是要进行后序结点的唤醒操作当然AQS的功能不仅仅是这些,还有Condition条件队列以及中断的实现,下文在说。
    最新回复(0)