Categories
程式開發

基於數組的有界阻塞隊列—— ArrayBlockingQueue


前言在閱讀完和AQS 相關的鎖以及同步輔助器之後,來一起閱讀JUC 下的和隊列相關的源碼。先從第一個開始:ArrayBlockingQueue。公眾號:liuzhihangs,記錄工作學習中的技術、開發及源碼筆記;時不時分享一些生活中的見聞感悟。歡迎大佬來指導!

介紹

由數組支持的有界BlockingQueue阻塞隊列。

這個隊列的命令元素FIFO(先入先出)。隊列的頭是元素一直在隊列中時間最長。隊列的尾部是該元素已經在隊列中的時間最短。新元素插入到隊列的尾部,並且隊列檢索操作獲取在隊列的頭部元素。

這是一個典型的“有界緩衝區”,在其中一個固定大小的數組保持由生產者插入並受到消費者的提取的元素。一旦創建,容量不能改變。試圖put 一個元素到一個滿的隊列將導致操作阻塞; 試圖take 從空隊列一個元素將類似地阻塞。

此類支持訂購等待生產者和消費者線程可選的公平政策。默認情況下,這個順序不能保證。然而,隊列公平設置為構建true 保證線程以FIFO的順序進行訪問。公平性通常會降低吞吐量,但減少了可變性和避免飢餓。

基本使用


public class ArrayBlockingQueueTest {

private static final ArrayBlockingQueue QUEUE = new ArrayBlockingQueue(10);

private static final CountDownLatch LATCH = new CountDownLatch(2);

public static void main(String[] args) {

ExecutorService pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(1024),
new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
new ThreadPoolExecutor.AbortPolicy());

pool.submit(() -> {
for (int i = 0; i {

for (int i = 0; i < 100; i++) { try { Thread.sleep(500L); String take = QUEUE.take(); System.out.println("take = " + take); } catch (InterruptedException ignored) { } } LATCH.countDown(); }); try { LATCH.await(); } catch (InterruptedException ignored) { } pool.shutdown(); } }

demo 只是臨時寫的一個,很簡單的版本。

問題疑問

ArrayBlockingQueue 的實現原理是什麼?入隊列和出隊列方法之間的區別是什麼?

源碼分析

基本結構

基於數組的有界阻塞隊列—— ArrayBlockingQueue 1

參數介紹


/** 数组 - 存储队列中的元素 */
final Object[] items;

/** 下一个 take, poll, peek or remove 的索引 */
int takeIndex;

/** 下一个 put, offer, or add 的索引 */
int putIndex;

/** 队列中的元素数 */
int count;

/** Main lock guarding all access */
final ReentrantLock lock;

/** take 操作时是否等待 */
private final Condition notEmpty;

/** put 操作时是否等待 */
private final Condition notFull;

構造函數

public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

// 指定容量,及是否公平
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } // 初始化的时候放入元素 public ArrayBlockingQueue(int capacity, boolean fair, Collection c) {
this(capacity, fair);

final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}

添加元素

public boolean add(E e) {
return super.add(e);
}

// 父类的方法,其实调用的也是 offer
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
// 使用锁
public boolean offer(E e) {
checkNotNull(e);
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
// 放入元素, 如果队列满了,则等待
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

add 方法:調用的是父類AbstractQueue 的add 方法,內部調用的是offer 方法,如果offer 返回false,則拋出異常。 offer 方法:校驗元素非空,加互斥鎖,如果隊列滿了,則返回false,如果隊列未滿,則調用enqueue 方法,添加元素。 put 方法:校驗元素非空,加互斥鎖,如果隊列滿了,則一直自旋等待,隊列未滿則調用enqueue 方法,添加元素。

所以下面還是需要看一下enqueue 方法:

// 只有在获取锁的时候才可以调用
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
// putIndex 下一个 put, offer, or add 的索引
// 对其进行赋值,然后进行 ++putIndex 操作
items[putIndex] = x;
// 如果等于长度,则指定为开始
if (++putIndex == items.length)
putIndex = 0;
// 对元素数进行 ++
count++;
// 有元素入队列,唤醒在等待获取元素的线程
notEmpty.signal();
}

獲取元素

public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

通過源碼可以看出:

pool 和take 都是從隊列中獲取元素,二者不同的是,當隊列中沒有元素時,poll 方法返回null,而take 方法會一直阻塞等待,直到從隊列中獲取到元素。 poll 和take 方法獲取元素都是調用的dequeue 方法。

private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 获取元素并将元素置为 null
E x = (E) items[takeIndex];
items[takeIndex] = null;
// takeIndex 下一个 take, poll, peek or remove 的索引
// 指向下一个元素,并且 元素数减少
if (++takeIndex == items.length)
takeIndex = 0;
count--;
// 更新迭代器状态
if (itrs != null)
itrs.elementDequeued();
// 唤醒等待放入元素的线程
notFull.signal();
return x;
}

查看元素

public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}

總結

問答環節

Q: ArrayBlockingQueue 的實現原理?

A: ArrayBlockingQueue 是基於數組實現的,內部使用ReentrantLock 互斥鎖,防止並發放置元素或者取出元素的衝突問題。

Q: 入隊列和出隊列方法之間的區別是什麼?

基於數組的有界阻塞隊列—— ArrayBlockingQueue 2

結束語

ArrayBlockingQueue 中使用了ReentrantLock 互斥鎖,在元素入隊列和出隊列的時候都進行了加鎖,所以同時只會有一個線程進行入隊列或者出隊列,從而保證線程安全。