LinkedBlockingQueue,Juc中提供的线程安全的阻塞队列,是一个基于链表的数据结构,
LinkedBlockingQueue维护一个Node数组:
static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; } }维护头尾两个指针:head和last,注意,head只作为一个头指针,不存储数据,即:head.item == null。再次也就是说,head指向的是一个虚拟节点。
维护2把锁,2个条件变量:
put锁,对应notFull条件变量。take锁,对应notEmpty条件变量。LinkedBlockingQueue是基于FIFO的队列,head元素(准确的说是head指针指向的元素)是队列中存活时间最久的元素,插入的元素一个个插入到尾部,而取元素则从head指针开始。
如果让我们自己实现一个阻塞队列,可能大多数人会使用一把锁控制,实现起来相对简单很多,就是一个很简单的生产者-消费者队列,然而LinkedBlockingQueue的实现基于了一种叫做:A Two-Lock Concurrent Queue的算法,对应论文地址是http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf。
假如使用一把锁,那么enqueue和dequeue操作都会互相阻塞,使用两把锁,enqueue/dequeue各自使用自己的锁,性能显然会好很多。当然,不可避免的是需要一些额外的保证,譬如,count字段,如果同时进行入队/出队操作,那么count字段的数据一致性如何保证:使用原子类AtomicInteger即可。
现在开始看入队出队的代码,上文提到,维护2个指针:head和last,是在哪里被初始化的呢:
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }构造函数中,就将head和last指向一个空的节点。 对应的入队代码是:
/** * Links node at end of queue. * * @param node the node */ private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; }注意,入队只操作last节点,将原来的last的后置节点指向新入队的节点,然后将该新节点赋值为last。 对应的出队代码:
/** * Removes a node from head of queue. * * @return the node */ private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; //h指向head Node<E> h = head; //first指向head的第一个节点,该节点就是要 //出队的节点 Node<E> first = h.next; //这里,将原来的head节点的next指向了自己 //如同注释所说,帮助gc更快的回收自己 h.next = h; // help GC //接着,要出队的这个节点,变成了新的head head = first; //辅助,返回出队节点的item E x = first.item; //该节点item置空,此时他已经成为了新的head节点 first.item = null; return x; }只操作head节点。过程注释已经写了。 所以,如果有2个线程,同时进行入队出队操作,是不会相互影响的,这也是我们能够使用2把锁的保证。
接下来,再抽取2个方法讲一下流程,先讲take方法: 该方法取出队列的第一个元素,如果没有,则阻塞,直到有,代码:
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock;//拿到take lock takeLock.lockInterruptibly();//上锁 try { //注意,使用while,而不是if //确保被唤醒之后再进行判断,而不是立马进行取元素 //防止过程中被别的线程取走 while (count.get() == 0) { notEmpty.await();//如果没有元素,wait } //确保有元素之后,进行出队操作 x = dequeue(); //count-- c = count.getAndDecrement(); //如果还有元素,接着通知别的take线程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } //如果,发生take的时候,已经满了,注意这个变量c是count.getAndDecrement();得到的 //那么发出notfull的通知,我刚刚消费了一个元素 //生产者可以过来接着生产了 if (c == capacity) signalNotFull(); return x; }注意一点,notEmpty.signal();是在该出队操作发现,我拿走了一个,还有其他多余的元素可以接着被take,此时调用:notEmpty.signal();通知更多的消费者,那么,这个通知的源头是在哪里?假如说现在有100个消费者被卡着了,谁负责通知,类似于take方法的最后一个逻辑,看offer方法:
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } //假如,原来一个元素都没有,那么此时通知 //消费者,我刚放进去了一个元素,你可以来取了 //注意,这个c是count.getAndIncrement();,别搞混了 if (c == 0) signalNotEmpty(); return c >= 0; }举个例子来讲,假如,初始化一个队列,此时,线程1,2先后执行take,线程1抢到了take锁,由于队列没有元素,将自己挂起,放入notEmpty队列,接着线程2也抢到了take锁,同样将自己挂起,放入notEmpty队列,此时来了一个线程3,放入一个元素,触发:
if (c == 0) signalNotEmpty();该逻辑,此时,线程1被唤醒(因为线程1等待时间>线程2,notEmpty的条件队列里面的阻塞时间最长的线程被唤醒的优先级最高),拿到对应元素。 注意到一点,源码使用的都是signal而不是signalAll,唤醒了一个生产者/消费者之后,再由对应的生产者/消费者去判断是否需要去唤醒其他的生产者/消费者,避免了锁竞争,对应的代码就是:
if (c + 1 < capacity) notFull.signal(); if (c > 1) notEmpty.signal();ArrayBlockingQueue,基于数组,但是,这是定长的,所以不存在数组扩容,复制元素这一套。 和LinkedBlockingQueue最大的不同就是其使用一把锁来控制并发,吞吐量相对来说低于LinkedBlockingQueue.不过正如JDK注释所说:
Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.在大多数并发程序中,性能的可预测性较差。 和一般的生产者消费者一样,ArrayBlockingQueue使用2个condition变量notEmpty¬Full:
/** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;put方法:
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly();//上锁 try { while (count == items.length) notFull.await();//满了则等待 enqueue(e);//入队列 } finally { lock.unlock(); } }take:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) //空了则等待 notEmpty.await(); return dequeue();//出队列 } finally { lock.unlock(); } }其实和LinkedBlockingQueue很像,核心就是在加锁的前提下,进行入队出队操作,ArrayBlockingQueue维护着读写2根指针:putIndex和takeIndex,看入队操作:
private void enqueue(E x) { final Object[] items = this.items; //写指针位置赋值 items[putIndex] = x; //写指针+1,如果满了,那么写指针需要回到数组头 if (++putIndex == items.length) putIndex = 0; count++; //唤醒等待队列 notEmpty.signal(); }注意,ArrayBlockingQueue同样是FIFO的,当数组满了,写指针需要回到数组头,从head开始写入。 看出队操作:
private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") //取出元素 E x = (E) items[takeIndex]; items[takeIndex] = null; //读指针到头了,同样回到数组头开始取 if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); //唤醒等待队列 notFull.signal(); return x; }LinkedBlockingQueue和ArrayBlockingQueue其实还是挺像的,用锁控制并发性,核心就是出对入队两个操作,不同的就是锁算法以及底层用于存储元素的数据结构。当然,还有一点就是后者必须要求传入一个初始容量,这是其底层设计所要求的。
