Java并发编程之ArrayBlockingQueue

    xiaoxiao2025-02-25  41

    我们今天来介绍一个新概念,阻塞队列。

    阻塞队列

    当队列中为空时,从队列中获取元素的操作将被阻塞,当队列满时,向队列中添加元素的操作将被阻塞。

    ArrayBlockingQueue

    ArrayBlockingQueue是一个由数组组成的有界队列。此队列按照先进先出的顺序进行排序。支持公平锁和非公平锁。

    ArrayBlockingQueue的继承关系

    public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {}

    我们看到,ArrayBlockingQueue实现了BlockingQueue接口,该接口表示阻塞型的队列

    ArrayBlockingQueue的部分属性

    // 存放实际元素的数组 final Object[] items; // 取元素索引 int takeIndex; // 获取元素索引 int putIndex; // 队列中的项 int count; // 可重入锁 final ReentrantLock lock; // 等待获取条件 private final Condition notEmpty; // 等待存放条件 private final Condition notFull; // 迭代器 transient Itrs itrs = null;

    我们需要注意,取元素和存元素有不同的索引

    ArrayBlockingQueue的构造函数

    无参构造函数
    public ArrayBlockingQueue(int capacity) { this(capacity, false); }

    创建了一个固定容量和默认访问策略的ArrayBlockingQueue

    ArrayBlockingQueue(int, boolean)
    public ArrayBlockingQueue(int capacity, boolean fair) { // 初始容量必须大于0 if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); // 初始化等待条件 notEmpty = lock.newCondition(); notFull = lock.newCondition(); }

    创建了一个固定容量和指定访问策略的ArrayBlockingQueue

    put操作

    public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; // 如果当前线程未被中断,则获取锁 lock.lockInterruptibly(); try { while (count == items.length) // 判断元素是否已满 // 若满,则等待 notFull.await(); // 入队列 enqueue(e); } finally { lock.unlock(); } }

    put函数用于存放元素,在当前线程被中断时会抛出异常,并且当队列已经满时,会阻塞一直等待。我们观察到调用了enqueue来实行了入队列操作,下面我们来看看该函数的源码

    private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) // 放入后存元素的索引等于数组长度(表示已满) // 重置存索引为0 putIndex = 0; // 元素数量加1 count++; // 唤醒在notEmpty条件上等待的线程 notEmpty.signal(); }

    enqueue会唤醒等待notEmpty条件的线程。

    offer操作

    public boolean offer(E e) { // 检查元素不能为空 checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) // 元素个数等于数组长度,则返回 return false; else { // 添加进数组 enqueue(e); return true; } } finally { lock.unlock(); } }

    用于存放元素

    take操作

    public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 如果当前线程未被中断,则获取锁,中断会抛出异常 lock.lockInterruptibly(); try { while (count == 0) // 元素数量为0,即Object数组为空 // 则等待notEmpty条件 notEmpty.await(); // 出队列 return dequeue(); } finally { // 释放锁 lock.unlock(); } }

    与put操作对应,从阻塞队列中获取一个元素。我们观察到调用了dequeue来实行了出队列操作,下面我们来看看该函数的源码

    private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; // 该索引的值赋值为null items[takeIndex] = null; // 取值索引等于数组长度 if (++takeIndex == items.length) // 重新赋值取值索引 takeIndex = 0; // 元素个数减1 count--; if (itrs != null) itrs.elementDequeued(); // 唤醒在notFull条件上等待的线程 notFull.signal(); return x; }

    poll操作

    public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { // 若元素个数为0则返回null,否则,调用dequeue,出队列 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }

    poll操作与offer对应,用于获取元素。

    clear操作

    public void clear() { final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { // 保存元素个数 int k = count; if (k > 0) { // 元素个数大于0 final int putIndex = this.putIndex; int i = takeIndex; do { // 赋值为null items[i] = null; if (++i == items.length) // 重新赋值i i = 0; } while (i != putIndex); // 重新赋值取元素索引 takeIndex = putIndex; // 元素个数为0 count = 0; if (itrs != null) itrs.queueIsEmpty(); for (; k > 0 && lock.hasWaiters(notFull); k--) // 若有等待notFull条件的线程,则逐一唤醒 notFull.signal(); } } finally { lock.unlock(); } }

    clear操作会清空阻塞队列,并且会释放所有等待notFull条件的线程(存放元素的线程)。

    add操作

    public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }

    remove操作

    public E remove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); }

    参考:【JUC】JDK1.8源码分析之ArrayBlockingQueue(三) 阻塞队列和ArrayBlockingQueue源码解析(JDK1.8)

    最新回复(0)