Java并发学习之ArrayBlockingQueue 源码分析

    xiaoxiao2025-04-29  10

    Java并发之ArrayBlockingQueue 源码分析

    阻塞队列

    定义

    阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

    支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

    阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里获取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

    在阻塞队列不可用时,这两个附加操作提供了4种处理方式

    方法/处理方式抛出异常返回特殊值一直阻塞超时退出插入方法add(e)offer(e)put(e)offer(e,time,unit)移除方法remove()poll()take()poll(time,unit)检查方法element()peek()不可用不可用

    抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException异常。当队列空时,从队列里面获取元素会抛出NoSuchElementException异常。

    返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里去除一个元素,如果没有则返回null。

    一直阻塞:当阻塞队列满时,如果生产者线程往队列里面put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。

    超时退出:当阻塞队列满时,如果生产者线程往队列里面插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就退退出。

    JDK7提供了7个阻塞队列,如下

    ArrayBlockingQueue::一个由数组结构组成的有界队列。LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。DelayQueue:一个使用优先级队列实现的无界阻塞队列。SynchronousQueue:一个不存储元素的阻塞队列。LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

    本文选取ArrayBlockingQueue代表性的分析下阻塞队列的源码

    ArrayBlockingQueue

    ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。

    默认情况下不保证线程公平的访问队列,所谓公平访问 队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。为了保证公平性,通常会降低吞吐量。后面结合源码分析

    源码分析

    ArrayBlockingQueue 源码简介

    队列元素先进先出(FIFO)新元素插入队列的尾部,队列检索操作获取队列头部的元素经典的有界缓冲区,创建时指定固定大小,一旦创建,后面不能更改尝试将元素put进满元素队列将导致阻塞,尝试从空队列take一个元素同样会阻塞

    类继承关系图

    继承 java.util.AbstractQueue类,该类在 java.util.Queue 接口中扮演着非常重要的作用,该类提供了对queue 操作的骨干实现。继承 java.util.Queue 接口,为阻塞队列的核心接口,提供了在多线程环境下的出列、入列操作。

    初始化

    public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** 队列元素存放数组 */ final Object[] items; /** 元素下一个待取出索引 */ int takeIndex; /** 元素下一个待添加索引 */ int putIndex; /** 队列元素的数量 */ int count; /** 主锁对象 */ final ReentrantLock lock; /** 非空 condition */ private final Condition notEmpty; /** 不满 condition */ private final Condition notFull; /** 迭代器共享状态 */ transient Itrs itrs = null; /** 真正初始化方法*/ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; //线程是否公平竞争本质是利用ReentrantLock实现,每次队列操作都需要先获得锁,如果配置了公平竞争重入锁,先到的线程先获得锁,和公平锁一样的开销,存在线程上下文切换成本。 lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } }

    入队方法

    基础添加元素方法enquene(几种不同策略插入最终都会调用的插入核心方法)

    private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; //往对象数组中添加元素 final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; // 通知阻塞在出列的线程 notEmpty.signal(); }

    add方法

    //通过add方法插入元素 public boolean add(E e) { return super.add(e); } //插入成功返回true,否则抛出异常 public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }

    offer方法

    public boolean offer(E e) { checkNotNull(e); //插入元素时,先获得锁 final ReentrantLock lock = this.lock; lock.lock(); try { //插入成功返回true,失败返回false if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }

    put方法

    public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //插入时,如果队列已满,则线程阻塞至notFull不满通知时,再次循环执行入队操作 while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }

    超时offer方法

    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { //超时阻塞通知,如果在超时时间内,notFull还没有被通知(即队列有元素被移出队列),将会返回false if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } }

    出队方法

    基础元素移除核心方法

    private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); // 通知阻塞在入列的线程 notFull.signal(); return x; }

    remove方法

    //队列存在元素时,返回元素,否则抛出异常 public E remove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); }

    poll方法

    //队列存在元素时返回元素,否则返回null public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }

    take方法

    public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //如果队列没有元素,阻塞至notEmpty非空通知,再次循环获取元素返回 while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }

    poll超时方法

    public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //如果队列没有元素,超时阻塞至notEmpty非空通知,如果达到超时时间还没有被通知,返回null while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } }

    总结

    本文从ArrayBlockingQueue入手分析了下阻塞队列的源码,其他几种队列具体实现方式可能不同,但是基本的操作还是类似的,具体对应的源码读者可自行分析。

    参考资料:《并发编程的艺术》

    最新回复(0)