本博客先简单看下模板模式,然后解析下java中的AQS是如何使用模板模式的,最后使用AQS自定义一种锁。
顾名思义,本模式旨在"套模板",跟写PPT时套模板道理一样,PPT模板事先给好布局、图片、配色等,用户添加自己的内容即可。模板模式则是父类事先准备好一些函数框架,子类(用户类)继承父类并实现自己的功能即可。UML图如下: 父类为虚类,其中execute方法伪代码如下:
private void execute(){ other logic..... methodOne(); other logic..... other logic..... methodTwo(); other logic..... methodThree(); other logic..... }即execute方法内容是固定的,子类不可复写,但是methodOne等方法是protected的,子类可以复写,这就达到一种目的:父类核心流程不能改,但是具体实现可以由子类实现。
全称:AbstractQueuedSynchronizer(队列同步器),以下基于jdk12分析,某些方法已不同于java7、8 它是一种机制,用数字表示资源个数,用一个队列存储想获取资源的线程信息,并通过各种三种方式使线程竞争资源。 这三种方式是
独占资源:一次仅能有一个线程访问资源共享资源:一次能有多个线程访问资源超时独占资源:与独占资源一样,只不过支持超时放弃首先来看资源控制:
/** * The synchronization state. */ private volatile int state;注释中说state为同步状态,AQS子类通过set和get这个变量控制同步状态,可以视为资源个数,下图可以看到getState方法被AQS子类调用。为volatile修饰,保证了内存可见性。 存储线程信息的队列,这里称作同步队列吧,是一个双向链表,节点信息如下: 关键信息图中已标注,节点类型分为SHARED共享型和EXCLUSIVE排他(独占)型,节点状态有CANCELLED、SIGNAL等,当然还有当前线程本身。 下面分三种情况分析线程竞争情况:
AQS中的模板方法为:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }此方法定义了一种流程,子类不可复写,子类需要做的是复写tryAcquire方法,即获取资源(设置state变量)的方式,如下可以看到这个方法默认抛出异常,即子类要想实现独占锁,这个方法必须复写。后文简单介绍可重入锁的实现,即基于此方法。
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }从acquire方法看到,如果tryAcquire失败了,即没有当前线程没有获取到资源,会先调用addWaiter方法,即向同步队列中添加一个包含当前线程的节点:
private Node addWaiter(Node mode) { Node node = new Node(mode); for (;;) { Node oldTail = tail; //1 if (oldTail != null) { //2 node.setPrevRelaxed(oldTail); //3 if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { initializeSyncQueue(); } } }可以看到是一个自旋添加的过程,tail为成员变量,即同步队列中的末尾,为volatile修饰,只有通过CAS机制判断当前tail符合预期值时才会将新建的node添加到末尾。这里的CAS是不断获取队列末尾,看这个末尾是否与经过123操作后的末尾是否一致,因为用的volatile修饰,内存可见性得到保证,便防止了其他线程对tail的修改,CAS本质上是一种忙则等待的调度策略。CAS一般称为CompareAndSwap,最终调用Unsafe的相关方法,其实jdk12已经不这么叫了,这是jdk8及以前的叫法,jdk12称为CompareAndSet,最终调用VarHandle的相关方法,但是道理是一样的,都是拿内存中最新值与预期值对比,如果一致则赋予变量另一个新值。
addWaiter成功后各个线程调用accquiredQueued方法进入自旋状态:
final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; } }这里需要注意两点,一是只有当前节点的前驱为头节点时才可尝试获取资源(tryAcquire),保证队列的先进先出嘛, 二是一个细节,p.next = null; 这个操作使得p的后继标记为空,帮助GC完成垃圾收集。对象标记为null来促进GC在大部分情况下是错误的!因为编译器会对这个操作进行优化,赋值为null其实大部分情况下是被干掉的,没有意义,但这里是正确的,后续可能更新博客详细解释。
至此,一个线程可以顺利获取资源,如果获取不到则添加到同步队列里,一旦自旋轮到自己就可以获取到资源。可是还没释放呢,总不能自己占着不然别人用吧。下边是释放资源相关逻辑:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }同样tryRelease是子类必须复写的方法,否则直接抛异常。释放资源(修改同步状态state)成功后,会执行下边的核心方法unparkSuccessor:
private void unparkSuccessor(Node node) { ……………其他逻辑…………… /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node p = tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) s = p; } if (s != null) LockSupport.unpark(s.thread); }大部分情况会执行LockSupport.unpark(s.thread);注释也说道如果s为空了,则从后向前遍历得到一个非空的继任者,但最终都是调用LockSupport.unpark(s.thread);这个方法作用是唤醒某个线程,在release这个情景下就是唤醒下一个等待的线程。注意这里release后没有直接删除相应节点,删除节点在上述acquireQueued方法中。
至此一个独占式锁完成。大体流程如下:
共享式与独占式核心区别在于,前者可以同时有多个线程访问同步状态state,后者仅能有一个线程访问,与独占锁类似,AQS提供如下模板:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }tryAcquireShared为子类复写,注意这里不同的是返回数值小于0后才执行doAcquireShared,意为,共享资源可以有多个线程同时访问资源。
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean interrupted = false; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC return; } } if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); throw t; } finally { if (interrupted) selfInterrupt(); } }与独占类似,也是添加节点,然后进入自旋,如果前驱是头节点则尝试获取资源,不在赘述。 释放资源最终也是调用unparkSuccessor方法,与独占锁一致。
与独占模型一样,只不过加上超时控制:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return true; } nanosTimeout = deadline - System.nanoTime(); //核心 if (nanosTimeout <= 0L) { cancelAcquire(node); return false; } if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }核心在于加入nanosTimeout字段用于判断自旋状态是否超时,超时则取消请求,其他逻辑与独占类似。
重入锁,顾名思义,可以重新进入的锁,通俗来讲就是说,当一个线程获取到锁以后,该线程又请求同样的锁,可重入就是允许这种情况发生,不可重入就是不允许同一个线程请求同一个锁。常见的锁基本都是可重入锁,比如synchronized关键字其实是可重入的。java中有ReentrantLock,即为基于AQS的可重入锁。 ReentrantLock分为公平锁和非公平锁,下边分别来看。 首先可重入的前提是同步状态的意义,这时state字段不是资源数目,而是当前线程获得此资源的次数,当一个线程获取到此资源时,state加一,释放一次时减一,当state为0时说明没有线程占用此资源。
非公平锁 所谓非公平锁,关键点在于当一个线程请求资源时先不必关心AQS中的同步队列,如下代码,当资源没有被占用时,当前线程直接执行CAS操作,如果成功,则此资源被当前线程占用,并把当前线程设为“owner thread”。如果state不是0时,先进行判断,当前线程是否是“owner thread”,不是则不往下进行,tryAcquire返回失败,进行AQS的addWaiter以及后续操作,这便实现了独占。如果当前是“owner thread”,则state加acquires,实现计数(一般是+1),这里没有必要使用CAS,因为是单线程。 随后可以执行释放资源,即进行state减,见下面代码,不在赘述。
@ReservedStackAccess final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } @ReservedStackAccess protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }公平锁 与非公平锁不同的地方在于获取资源部分,即tryAcquire,如下代码,比非公平多了一个hasQueuedPredecessors方法的判断,其余都一样。
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }继续看hasQueuedPredecessors这个方法干了什么:
public final boolean hasQueuedPredecessors() { Node h, s; if ((h = head) != null) { if ((s = h.next) == null || s.waitStatus > 0) { s = null; // traverse in case of concurrent cancellation for (Node p = tail; p != h && p != null; p = p.prev) { if (p.waitStatus <= 0) s = p; } } if (s != null && s.thread != Thread.currentThread()) return true; } return false; }这个方法是AQS类的方法,不可复写,目的在于查看同步队列中有无前驱节点,如果没有前驱,则说明当前线程前边没有别的线程尝试获取资源,当前线程才会进行CAS操作尝试获取资源。如果有前驱,当前线程不会尝试获取资源,而是执行addWaiter以及后续操作,主动进入同步队列。 所以,非公平锁和公平锁区别就在于,当某线程执行tryAcquire时,非公平锁不关心同步队列,直接尝试获取资源,成功了就获取到资源了,失败了才进入到同步队列,而公平锁首先会检查同步队列是不是有节点,如果没有才尝试获取资源,如果有节点则直接进入同步队列排队。通俗一点就是非公平锁没素质,直接插队,公平锁直接排队,到我了才获取资源。
Semaphore是用来控制并发线程数的一种机制。先看下java的注释:
* A counting semaphore. Conceptually, a semaphore maintains a set of * permits. Each {@link #acquire} blocks if necessary until a permit is * available, and then takes it. Each {@link #release} adds a permit, * potentially releasing a blocking acquirer. * However, no actual permit objects are used; the {@code Semaphore} just * keeps a count of the number available and acts accordingly. * * <p>Semaphores are often used to restrict the number of threads than can * access some (physical or logical) resource.大意是Semaphore持有一些许可,这些许可并不是真正的对象,而只是一个数字在记录,它可以用来限制并发的线程数。
比如有50个线程,但是同时并发执行的线程仅能有10个,其他线程处于阻塞状态。换一种方式解释,就是说,仅有10个资源,却有50个线程想获取资源,当然只能一部分一部分地获取,比如0~9号线程获取到了资源,开始执行各自的逻辑,假如3号线程执行完毕,会释放一个资源,同时唤醒阻塞队列第一个节点,使得该节点的线程得以访问资源。这就是共享锁啊!查看java源码发现,确实是基于AQS实现的共享锁。
final int getPermits() { return getState(); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }可以看到,permit(许可)其实就是AQS的state变量,在这里指代资源数量,与可重入锁一样,也区分公平和非公平,原理也一致。这里贴的是非公平锁的代码,可以看到tryAcquire方法试图获取资源,获取到则占用,获取不到则进入AQS的addWaiter逻辑。release为释放资源,都是对state这个数的操作,与前边描述一致,不再赘述。
自定义锁的实现大体流程:确定实现独占还是共享锁 --> 继承AQS -->实现acquire和release方法; 如下自定义了一个共享锁,其实是造了个semaphore的简单轮子,支持自定义资源个数:
public class NSourceLock implements Lock { private Sync sync; public NSourceLock() { sync = new Sync(1); } public NSourceLock(long sourceCount) { sync = new Sync(sourceCount); } /** * 写成静态内部类,继承AQS模板类 */ private static final class Sync extends AbstractQueuedLongSynchronizer { Sync(long sourceCount) { if (sourceCount < 1) { throw new IllegalStateException("source should be no less than 1"); } //调用AQS方法,设置资源数目 setState(sourceCount); } @Override protected long tryAcquireShared(long arg) { //自旋 while (true) { long cur = getState(); //当前资源数,注意state字段是volatile修饰,这里保证可见性 long newCount = cur - arg; //请求多少资源,arg为请求资源数 if (newCount < 0) { return newCount; //分配失败,返回负数,详见AQS注释 } if (compareAndSetState(cur, newCount)) { return newCount; //分配成功 } } } @Override protected boolean tryReleaseShared(long arg) { while (true) { long cur = getState(); long newCount = cur + arg; if (compareAndSetState(cur, newCount)) { return true; } } } } /** * Lock接口方法 */ @Override public void lock() { sync.acquireShared(1L); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public void unlock() { sync.releaseShared(1L); } @Override public Condition newCondition() { return null; } }调用:设置5个资源,8个线程共享这5个资源。
NSourceLock nSourceLock = new NSourceLock(5); for (int i = 0; i < 8; i++) { new Thread(() -> { nSourceLock.lock(); try { System.out.println(Thread.currentThread().getName() + " lock " + System.currentTimeMillis()); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(Thread.currentThread().getName() + " unlock " + System.currentTimeMillis()); nSourceLock.unlock(); } }, "th" + i).start(); }输出: 输出可看出,首先是0~4号线程获取到资源,大约一秒后0号释放一个资源,5号得以获取一个资源;随后2号释放一个,6号获取一个;随后3号释放一个,7号获取一个;随后相继释放资源。