LinkedBlockingQueue源码解读

    xiaoxiao2022-07-09  72

    Node LinkedBlockingQueue链表节点,单向节点

    //Node节点类,单向节点 static class Node<E> { E item; /** * One of: * - the real successor Node 继任节点 * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) next为空意味当前节点为链尾 */ Node<E> next; Node(E x) { item = x; } }

    构造方法

    //默认无界队列 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }

    LinkedBlockingQueue指定容量,链头和链尾都非空对象但item为空

    public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }

    全局变量

    /** The capacity bound, or Integer.MAX_VALUE if none */ //链表容量,默认Integer.MAX_VALUE,即无界队列 private final int capacity; /** Current number of elements */ //当前队列元素总量 private final AtomicInteger count = new AtomicInteger(); /** * Head of linked list. 链头 * Invariant: head.item == null //不变形:链头元素永为空 */ transient Node<E> head; /** * Tail of linked list. 链尾 * Invariant: last.next == null //不变形:链尾元素后再无元素 */ private transient Node<E> last; /** Lock held by take, poll, etc */ //拿锁,在 take, poll等方法时会请求 private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ //队列非空条件,以便通知队列进行取元素 private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ //插入锁,在 put, offer等方法时会请求 private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ //队列非空条件,以便同意队列进行插入元素 private final Condition notFull = putLock.newCondition();

    signalNotEmpty链表非空,然后signal(通知) takeLock进行获取元素

    //仅在put/offer后调用 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock();//强制拿锁 try { notEmpty.signal();//触发signal } finally { takeLock.unlock(); } }

    signalNotFull链表非满,然后signal(通知) putLock进行插入元素

    //仅在take/poll后调用 private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }

    enqueue插入元素

    private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); //putLock必须获取当前锁 // assert last.next == null; //队列尾必须为空 //新node指向原队列尾元素的next(链下个对象),然后再指向last(链尾) last = last.next = node; }

    dequeue取元素

    private E dequeue() { // assert takeLock.isHeldByCurrentThread(); //当前线程必须持有takeLock // assert head.item == null; //当前链头元素必须为空 //备份链头 Node<E> h = head; //备份链次元素(实际要取出的元素,定义为first) Node<E> first = h.next; h.next = h; // help GC //原链头h已经无作用 head = first;//first指向head,first成为新链头 E x = first.item;//取出目标元素 first.item = null;//置空(LinkBlockingQueue规范) return x; }

    fullyLock&fullyUnlock全局拿锁和放锁

    void fullyLock() { putLock.lock(); takeLock.lock(); } void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }

    Collection插入到队列

    public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE);//默认无界队列 //获取pulLock,构造函数中pulLock从无竞争,但需要保证可见性 final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { //不支持空对象 if (e == null) throw new NullPointerException(); //不允许超过限度 if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n);//原子更新 } finally { putLock.unlock(); } }

    put放入对象

    public void put(E e) throws InterruptedException { //不支持空对象 if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. //预创建操作标记(-1即无操作) int c = -1; //创建节点 Node<E> node = new Node<E>(e); //备份(副本)putLock和当前总量 final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //请求putLock锁(可打断) putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ //容量已满则等待 while (count.get() == capacity) { notFull.await(); } //执行过插入至链尾 enqueue(node); //原子自增一位,返回旧值 c = count.getAndIncrement(); //链表还没有满,通知其他线程执行插入 if (c + 1 < capacity) notFull.signal(); } finally { //释放锁 putLock.unlock(); } if (c == 0) // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据 signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费 }

    offer带时间限制的插入,返回操作结果

    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { //严禁非空对象 if (e == null) throw new NullPointerException(); //获取等待时间 long nanos = unit.toNanos(timeout); //预创建操作标记(-1即无操作) int c = -1; //获取putLock锁和当前链表容量 final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //可打断的请求锁 putLock.lockInterruptibly(); try { //如果已满载,并且 while (count.get() == capacity) { if (nanos <= 0) return false; //等待nanos(毫秒)秒,期间收到signal则返回(nanos-等待时间),否则在等待结束后返回0或负数 //可打断并返回InterruptedException nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e));//插入元素 c = count.getAndIncrement();//获取最新容量高 if (c + 1 < capacity)//未满载则通知其他线程进行put/offer notFull.signal(); } finally { putLock.unlock(); } //存在takeLock和putlock,takeLock可能在消费,count会变化,c == 0表示队列有一条数据待消费 if (c == 0)//takeLock的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费 signalNotEmpty(); return true; }

    offer尝试插入(一旦尝试插入则一直等待直至成功)

    public boolean offer(E e) { //元素不能为空 if (e == null) throw new NullPointerException(); //当前链表容量 final AtomicInteger count = this.count; //满的话则返回false if (count.get() == capacity) return false; //预创建操作标记(-1即无操作) int c = -1; Node<E> node = new Node<E>(e); //获取putlock final ReentrantLock putLock = this.putLock; putLock.lock(); try { //未满载 if (count.get() < capacity) { enqueue(node);//插入元素 c = count.getAndIncrement(); //队列未满则继续通知插入 if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0)//存在takeLock和putlock,takeLock可能在消费,count会变化,c == 0表示队列有一条数据待消费 signalNotEmpty();//takeLock的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费 return c >= 0;//链表有元素则表示插入成功 }

    take取元素(可打断)

    public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //如果当前链表空,则等待 while (count.get() == 0) { notEmpty.await(); } //取出首元素(first) E x = dequeue(); c = count.getAndDecrement();//链表容量(原子)减一,并返回旧值 //链表非空则继续通知其他线程来取 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock();//放锁 } //如果链满则通知其他putLock等待的线程进行取元素 //意思是,takeLock和putLock同时进行时,putLock一直在放元素,true表示有一条线程在等待插入元素 if (c == capacity) signalNotFull(); return x; }

    poll取元素,有时间

    public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null;//定义待取得元素 int c = -1;//操作标记 long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly();//可打断请求 try { //如果链表无元素,则执行等待,直至耗时完毕 while (count.get() == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue();//取元素 c = count.getAndDecrement();//获取原来链表长度然后长度减一 if (c > 1)//如果链长度大于1,证明还有链还有元素,通知其他等待的takeLock执行取元素 notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity)//原链长度达至最大长度,即在取完元素后还可以放一个元素,所以执行通知putLock进行放元素 signalNotFull(); return x; }

    poll取元素

    public E poll() { final AtomicInteger count = this.count; if (count.get() == 0)//当前链无元素,直接返回 return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock();//获取锁,否则一直等待 try { if (count.get() > 0) {//获取锁成功,并链有元素 x = dequeue();//取元素 c = count.getAndDecrement();//获取原链表长度然后实际长度减一 if (c > 1)//原链表长度还有元素则通知继续进行通知其他线程取元素 notEmpty.signal(); } } finally { takeLock.unlock(); } //原链表长度达至满,则表示刚取完还可以放一个元素,所以执行通知 if (c == capacity) signalNotFull(); return x; }

    peek取元素,但不拆链

    public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }

    unlink 拆链,将trail的下个元素p从链中拆除

    void unlink(Node<E> p, Node<E> trail) { // assert isFullyLocked();//必须获takeLock和putLock,合称fullyLock(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. p.item = null;//p元素item置null trail.next = p.next;//trail和(p.next)建立链关系 //如果原p就是尾元素,则置trail为尾元素 if (last == p) last = trail; //获取原链长度并减一,原链长度等于限额则表示链未满,则通知进行插入 if (count.getAndDecrement() == capacity) notFull.signal(); }

    remove拆除item所在的链

    public boolean remove(Object o) { //LinkedBlockingQueue允许null的item,除了head和last if (o == null) return false; fullyLock();//全局锁 try { //迭代链表,发现首元素则拆除链 // for (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } }

    contains o是否存在链表中

    public boolean contains(Object o) { if (o == null) return false; fullyLock(); try { //p为空则表示到达链尾,否则原本就是空链 for (Node<E> p = head.next; p != null; p = p.next) if (o.equals(p.item)) return true; return false; } finally { fullyUnlock(); } }

    toArray迭代元素返回数组

    public Object[] toArray() { fullyLock();//获取全局锁 try { int size = count.get(); Object[] a = new Object[size]; int k = 0; for (Node<E> p = head.next; p != null; p = p.next) a[k++] = p.item; return a; } finally { fullyUnlock(); } }

    toArray迭代元素,然后插入数组a

    public <T> T[] toArray(T[] a) { fullyLock();//获取全锁 try { int size = count.get();//当前链表长度 if (a.length < size)//参数数组a长度小于当前链表长度,则进行扩容 a = (T[])java.lang.reflect.Array.newInstance (a.getClass().getComponentType(), size); int k = 0;//迭代下标 //迭代链表并且将item存入数组a for (Node<E> p = head.next; p != null; p = p.next) a[k++] = (T)p.item; //如果参数数组a长度长于链长度,则a下标的元素置空,但(k+1)往后的元素呢?又不置空? if (a.length > k) a[k] = null; return a; } finally { fullyUnlock(); } }

    toString

    public String toString() { fullyLock();//全局锁 try { Node<E> p = head.next;//获取首元素,为空则直接返回[] if (p == null) return "[]"; StringBuilder sb = new StringBuilder(); sb.append('['); for (;;) { E e = p.item;//获取元素,然后组装,如果item为当前链表则返回this Collection sb.append(e == this ? "(this Collection)" : e); p = p.next;//为空则到达链尾 if (p == null) return sb.append(']').toString(); sb.append(',').append(' '); } } finally { fullyUnlock(); } }

    clear清空链

    public void clear() { fullyLock();//全局锁 try { //1,取出实际头元素p,备份原head至h 4,将p定于为新head for (Node<E> p, h = head; (p = h.next) != null; h = p) { h.next = h;//2,原head的next指向原head,造成循坏链,并item为空 p.item = null;//3,实际头元素item置空,help gc } head = last; // assert head.item == null && head.next == null; if (count.getAndSet(0) == capacity) notFull.signal(); } finally { fullyUnlock(); } }

    drainTo转换item至Collection

    public int drainTo(Collection<? super E> c, int maxElements) { //集合不能为空并不能为当前元素 if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; boolean signalNotFull = false;//是否未满,以通知来插入 final ReentrantLock takeLock = this.takeLock; takeLock.lock();//请求tackLock try { //maxElements和count,取小的一方 int n = Math.min(maxElements, count.get()); // count.get provides visibility to first n Nodes Node<E> h = head; int i = 0; try { //迭代链 while (i < n) { //取出节点head次元素 Node<E> p = h.next; //添加至collections c.add(p.item); //然后置空 p.item = null; //原head自关联 h.next = h; //定义新head h = p; ++i; } return n; } finally { // Restore invariants even if c.add() threw //恢复链原有状态,即使c.add()抛异常 if (i > 0) {//如果已经迭代过元素将迭代中的h作为新head //assert h.item == null; head = h; //如果迭代完链的原长度的为capacity(链容量最大值)则表示此链没有满,通知putlock进行插入 signalNotFull = (count.getAndAdd(-i) == capacity); } } } finally { takeLock.unlock(); if (signalNotFull) signalNotFull(); } }

    Itr迭代器

    private class Itr implements Iterator<E> { /* * Basic weakly-consistent iterator. At all times hold the next * item to hand out so that if hasNext() reports true, we will * still have it to return even if lost race with a take etc. */ private Node<E> current; private Node<E> lastRet; private E currentElement; //构造函数 Itr() { fullyLock(); try {//下个元素 current = head.next; //下个元素item if (current != null) currentElement = current.item; } finally { fullyUnlock(); } } //下个月元素是否为空 public boolean hasNext() { return current != null; } /** * Returns the next live successor of p, or null if no such. * * Unlike other traversal methods, iterators need to handle both: * - dequeued nodes (p.next == p) * - (possibly multiple) interior removed nodes (p.item == null) */ private Node<E> nextNode(Node<E> p) { for (;;) { Node<E> s = p.next; //如果自连链,则证明链头(clear()和drainTo()回导致此情况) if (s == p) return head.next; //否则返回p.next(s != null && s.item == null仅在链头时发生) if (s == null || s.item != null) return s; p = s; } } public E next() { fullyLock(); try { //hasNext已经判断了不为空 if (current == null) throw new NoSuchElementException(); E x = currentElement; lastRet = current; current = nextNode(current); //current == null表示已经达到链尾 currentElement = (current == null) ? null : current.item; return x; } finally { fullyUnlock(); } } public void remove() { //hasNext已经判断了不为空 if (lastRet == null) throw new IllegalStateException(); fullyLock(); try { Node<E> node = lastRet; lastRet = null; for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { //迭代链,找到p然后拆链 if (p == node) { unlink(p, trail); break; } } } finally { fullyUnlock(); } } }
    最新回复(0)