我们今天来介绍一个新概念,阻塞队列。
阻塞队列
当队列中为空时,从队列中获取元素的操作将被阻塞,当队列满时,向队列中添加元素的操作将被阻塞。
ArrayBlockingQueue
ArrayBlockingQueue是一个由数组组成的有界队列。此队列按照先进先出的顺序进行排序。支持公平锁和非公平锁。
ArrayBlockingQueue的继承关系
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java
.io
.Serializable
{}
我们看到,ArrayBlockingQueue实现了BlockingQueue接口,该接口表示阻塞型的队列
ArrayBlockingQueue的部分属性
final Object
[] items
;
int takeIndex
;
int putIndex
;
int count
;
final ReentrantLock lock
;
private final Condition notEmpty
;
private final Condition notFull
;
transient Itrs itrs
= null
;
我们需要注意,取元素和存元素有不同的索引
ArrayBlockingQueue的构造函数
无参构造函数
public ArrayBlockingQueue(int capacity
) {
this(capacity
, false);
}
创建了一个固定容量和默认访问策略的ArrayBlockingQueue
ArrayBlockingQueue(int, boolean)
public ArrayBlockingQueue(int capacity
, boolean fair
) {
if (capacity
<= 0)
throw new IllegalArgumentException();
this.items
= new Object[capacity
];
lock
= new ReentrantLock(fair
);
notEmpty
= lock
.newCondition();
notFull
= lock
.newCondition();
}
创建了一个固定容量和指定访问策略的ArrayBlockingQueue
put操作
public void put(E e
) throws InterruptedException
{
checkNotNull(e
);
final ReentrantLock lock
= this.lock
;
lock
.lockInterruptibly();
try {
while (count
== items
.length
)
notFull
.await();
enqueue(e
);
} finally {
lock
.unlock();
}
}
put函数用于存放元素,在当前线程被中断时会抛出异常,并且当队列已经满时,会阻塞一直等待。我们观察到调用了enqueue来实行了入队列操作,下面我们来看看该函数的源码
private void enqueue(E x
) {
final Object
[] items
= this.items
;
items
[putIndex
] = x
;
if (++putIndex
== items
.length
)
putIndex
= 0;
count
++;
notEmpty
.signal();
}
enqueue会唤醒等待notEmpty条件的线程。
offer操作
public boolean offer(E e
) {
checkNotNull(e
);
final ReentrantLock lock
= this.lock
;
lock
.lock();
try {
if (count
== items
.length
)
return false;
else {
enqueue(e
);
return true;
}
} finally {
lock
.unlock();
}
}
用于存放元素
take操作
public E
take() throws InterruptedException
{
final ReentrantLock lock
= this.lock
;
lock
.lockInterruptibly();
try {
while (count
== 0)
notEmpty
.await();
return dequeue();
} finally {
lock
.unlock();
}
}
与put操作对应,从阻塞队列中获取一个元素。我们观察到调用了dequeue来实行了出队列操作,下面我们来看看该函数的源码
private E
dequeue() {
final Object
[] items
= this.items
;
@SuppressWarnings("unchecked")
E x
= (E
) items
[takeIndex
];
items
[takeIndex
] = null
;
if (++takeIndex
== items
.length
)
takeIndex
= 0;
count
--;
if (itrs
!= null
)
itrs
.elementDequeued();
notFull
.signal();
return x
;
}
poll操作
public E
poll() {
final ReentrantLock lock
= this.lock
;
lock
.lock();
try {
return (count
== 0) ? null
: dequeue();
} finally {
lock
.unlock();
}
}
poll操作与offer对应,用于获取元素。
clear操作
public void clear() {
final Object
[] items
= this.items
;
final ReentrantLock lock
= this.lock
;
lock
.lock();
try {
int k
= count
;
if (k
> 0) {
final int putIndex
= this.putIndex
;
int i
= takeIndex
;
do {
items
[i
] = null
;
if (++i
== items
.length
)
i
= 0;
} while (i
!= putIndex
);
takeIndex
= putIndex
;
count
= 0;
if (itrs
!= null
)
itrs
.queueIsEmpty();
for (; k
> 0 && lock
.hasWaiters(notFull
); k
--)
notFull
.signal();
}
} finally {
lock
.unlock();
}
}
clear操作会清空阻塞队列,并且会释放所有等待notFull条件的线程(存放元素的线程)。
add操作
public boolean add(E e
) {
if (offer(e
))
return true;
else
throw new IllegalStateException("Queue full");
}
remove操作
public E
remove() {
E x
= poll();
if (x
!= null
)
return x
;
else
throw new NoSuchElementException();
}
参考:【JUC】JDK1.8源码分析之ArrayBlockingQueue(三) 阻塞队列和ArrayBlockingQueue源码解析(JDK1.8)