Java并发之ArrayBlockingQueue 源码分析
阻塞队列
定义
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里获取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。
在阻塞队列不可用时,这两个附加操作提供了4种处理方式
方法/处理方式抛出异常返回特殊值一直阻塞超时退出
插入方法add(e)offer(e)put(e)offer(e,time,unit)移除方法remove()poll()take()poll(time,unit)检查方法element()peek()不可用不可用
抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException异常。当队列空时,从队列里面获取元素会抛出NoSuchElementException异常。
返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里去除一个元素,如果没有则返回null。
一直阻塞:当阻塞队列满时,如果生产者线程往队列里面put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。
超时退出:当阻塞队列满时,如果生产者线程往队列里面插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就退退出。
JDK7提供了7个阻塞队列,如下
ArrayBlockingQueue::一个由数组结构组成的有界队列。LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。DelayQueue:一个使用优先级队列实现的无界阻塞队列。SynchronousQueue:一个不存储元素的阻塞队列。LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
本文选取ArrayBlockingQueue代表性的分析下阻塞队列的源码
ArrayBlockingQueue
ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。
默认情况下不保证线程公平的访问队列,所谓公平访问 队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。为了保证公平性,通常会降低吞吐量。后面结合源码分析
源码分析
ArrayBlockingQueue 源码简介
队列元素先进先出(FIFO)新元素插入队列的尾部,队列检索操作获取队列头部的元素经典的有界缓冲区,创建时指定固定大小,一旦创建,后面不能更改尝试将元素put进满元素队列将导致阻塞,尝试从空队列take一个元素同样会阻塞
类继承关系图
继承 java.util.AbstractQueue类,该类在 java.util.Queue 接口中扮演着非常重要的作用,该类提供了对queue 操作的骨干实现。继承 java.util.Queue 接口,为阻塞队列的核心接口,提供了在多线程环境下的出列、入列操作。
初始化
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java
.io
.Serializable
{
final Object
[] items
;
int takeIndex
;
int putIndex
;
int count
;
final ReentrantLock lock
;
private final Condition notEmpty
;
private final Condition notFull
;
transient Itrs itrs
= null
;
public ArrayBlockingQueue(int capacity
, boolean fair
) {
if (capacity
<= 0)
throw new IllegalArgumentException();
this.items
= new Object[capacity
];
lock
= new ReentrantLock(fair
);
notEmpty
= lock
.newCondition();
notFull
= lock
.newCondition();
}
}
入队方法
基础添加元素方法enquene(几种不同策略插入最终都会调用的插入核心方法)
private void enqueue(E x
) {
final Object
[] items
= this.items
;
items
[putIndex
] = x
;
if (++putIndex
== items
.length
)
putIndex
= 0;
count
++;
notEmpty
.signal();
}
add方法
public boolean add(E e
) {
return super.add(e
);
}
public boolean add(E e
) {
if (offer(e
))
return true;
else
throw new IllegalStateException("Queue full");
}
offer方法
public boolean offer(E e
) {
checkNotNull(e
);
final ReentrantLock lock
= this.lock
;
lock
.lock();
try {
if (count
== items
.length
)
return false;
else {
enqueue(e
);
return true;
}
} finally {
lock
.unlock();
}
}
put方法
public void put(E e
) throws InterruptedException
{
checkNotNull(e
);
final ReentrantLock lock
= this.lock
;
lock
.lockInterruptibly();
try {
while (count
== items
.length
)
notFull
.await();
enqueue(e
);
} finally {
lock
.unlock();
}
}
超时offer方法
public boolean offer(E e
, long timeout
, TimeUnit unit
)
throws InterruptedException
{
checkNotNull(e
);
long nanos
= unit
.toNanos(timeout
);
final ReentrantLock lock
= this.lock
;
lock
.lockInterruptibly();
try {
while (count
== items
.length
) {
if (nanos
<= 0)
return false;
nanos
= notFull
.awaitNanos(nanos
);
}
enqueue(e
);
return true;
} finally {
lock
.unlock();
}
}
出队方法
基础元素移除核心方法
private E
dequeue() {
final Object
[] items
= this.items
;
@SuppressWarnings("unchecked")
E x
= (E
) items
[takeIndex
];
items
[takeIndex
] = null
;
if (++takeIndex
== items
.length
)
takeIndex
= 0;
count
--;
if (itrs
!= null
)
itrs
.elementDequeued();
notFull
.signal();
return x
;
}
remove方法
public E
remove() {
E x
= poll();
if (x
!= null
)
return x
;
else
throw new NoSuchElementException();
}
poll方法
public E
poll() {
final ReentrantLock lock
= this.lock
;
lock
.lock();
try {
return (count
== 0) ? null
: dequeue();
} finally {
lock
.unlock();
}
}
take方法
public E
take() throws InterruptedException
{
final ReentrantLock lock
= this.lock
;
lock
.lockInterruptibly();
try {
while (count
== 0)
notEmpty
.await();
return dequeue();
} finally {
lock
.unlock();
}
}
poll超时方法
public E
poll(long timeout
, TimeUnit unit
) throws InterruptedException
{
long nanos
= unit
.toNanos(timeout
);
final ReentrantLock lock
= this.lock
;
lock
.lockInterruptibly();
try {
while (count
== 0) {
if (nanos
<= 0)
return null
;
nanos
= notEmpty
.awaitNanos(nanos
);
}
return dequeue();
} finally {
lock
.unlock();
}
}
总结
本文从ArrayBlockingQueue入手分析了下阻塞队列的源码,其他几种队列具体实现方式可能不同,但是基本的操作还是类似的,具体对应的源码读者可自行分析。
参考资料:《并发编程的艺术》