Java中如何高效使用ArrayBlockingQueue实现并发控制?

2026-05-23 20:541阅读0评论SEO问题
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计3068个文字,预计阅读时间需要13分钟。

Java中如何高效使用ArrayBlockingQueue实现并发控制?

一、阻塞队列 BlockingQueue 简介

在 Java.util.concurrent 包中,BlockingQueue 是一个线程安全的队列实现,它非常适合解决多线程环境中高效且安全地传递数据的问题。BlockingQueue 通过内置的锁机制,确保了在多线程环境中对队列的操作是线程安全的。

二、高效安全地传递数据

BlockingQueue 通过以下特性高效且安全地处理数据传递:

1. 线程安全:内部机制确保了队列操作的原子性,避免了并发访问时的数据竞争问题。

2.阻塞操作:当队列满时,生产者线程会自动等待,直到队列中有空间;当队列为空时,消费者线程会自动等待,直到队列中有元素。

3.非阻塞操作:提供了方法如 `offer()` 和 `poll()`,允许线程在队列满或空时立即返回,而不必等待。

三、构建高质量系统

通过使用 BlockingQueue,我们可以快速构建高质量的系统,具有以下优点:

1. 简化编程:无需手动处理锁和同步,降低了编程复杂度。

2.高性能:利用了线程池和阻塞队列的优化,提高了系统的吞吐量。

3.高可靠性:线程安全的队列确保了系统的稳定运行。

以下是一些常用的 BlockingQueue 实现:

- `ArrayBlockingQueue`:基于数组的阻塞队列,有固定大小。

- `LinkedBlockingQueue`:基于链表的阻塞队列,有默认大小和可指定大小。- `PriorityBlockingQueue`:基于优先级的阻塞队列。- `SynchronousQueue`:没有容量,每个插入操作必须等待有消费者取出元素,反之亦然。

通过合理选择和配置 BlockingQueue,我们可以构建出高性能、高可靠性的多线程系统。

一、阻塞队列 BlockingQueue

在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

1.1、BlockingQueue的基本原理

先来解释一下阻塞队列:

如上图:

  • 1、生产线程1往阻塞队列里面添加新的数据,当阻塞队列满的时候(针对有界队列),生产线程1将会处于阻塞状态,直到消费线程2从队列中取走一个数据;
  • 2、消费线程2从阻塞队列取数据,当阻塞队列空的时候,消费线程2将会处于阻塞状态,直到生产线程把一个数据放进去。

阻塞队列的基本原理就这样,至于队列是用什么数据结构进行存储的,这里并没有规定,所以后面我们可以看到很多阻塞队列的实现。

阻塞队列的常用方法

查阅BlockingQueue总结了以下阻塞队列的方法:

1、boolean add(E e)

  • 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回IllegalStateException异常。

2、boolean offer(E e)

  • 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回false。

3、void put(E e)

  • 直接在队列中插入元素,当无可用空间时候,阻塞等待。

4、boolean offer(E e, long timeout, TimeUnit unit)

  • 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false。

5、E take()

  • 获取并移除队列头部的元素,无元素时候阻塞等待。

6、E poll( long time, timeunit unit)

  • 获取并移除队列头部的元素,无元素时候阻塞等待指定时间。

7、boolean remove()

  • 获取并移除队列头部的元素,无元素时候会抛出NoSuchElementException异常。

8、E element()

  • 不移除的情况下返回列头部的元素,无元素时候会抛出NoSuchElementException异常。

9、E peek()

  • 不移除的情况下返回列头部的元素,队列为空无元素时返回null。

注意:

根据remove(Object o)方法签名可知,这个方法可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。 以上支持阻塞和超时的方法都是能够响应中断的。

1.2、BlockingQueue的实现

BlockingQueue底层也是基于AQS实现的,队列的阻塞使用ReentrantLock的Condition实现的。

Java中如何高效使用ArrayBlockingQueue实现并发控制?

 

下面我们来看看各个实现类的原理。以下分析我都会基于支持阻塞的put和take方法来分析。

二、ArrayBlockingQueue

ArrayBlockingQueue使用的数据结构是数组

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** 队列中元素保存的地方 */ final Object[] items; /** 取元素的指针 记录下一次操作的位置 */ int takeIndex; /** 放元素的指针 记录下一次操作的位置 */ int putIndex; /** 元素数量 */ int count; /** 保证并发访问的锁 */ final ReentrantLock lock; /** 等待出队的条件 消费者监视器 */ private final Condition notEmpty; /** 等待入队的条件 生产者监视器 */ private final Condition notFull; }

构造函数

  • 容量大小有构造函数的capacity参数决定。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //必须传入容量,可以控制重入锁是公平还是非公平 public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); // 初始化数组 this.items = new Object[capacity]; // 创建重入锁及两个条件 lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); //final修饰的变量不会发生指令重排 final ReentrantLock lock = this.lock; lock.lock(); // 保证可见性 不是为了互斥 防止指令重排 保证item的安全 try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } } }

2.1、入队

2.1.1、add(E e)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public boolean add(E e) { return super.add(e); } } public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> { // AbstractQueue 调用offer(e)如果成功返回true,如果失败抛出异常 public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } }

2.1.2、offer(E e)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public boolean offer(E e) { // 元素不可为空 checkNotNull(e); final ReentrantLock lock = this.lock; // 加锁 lock.lock(); try { if (count == items.length) // 如果数组满了就返回false return false; else { // 如果数组没满就调用入队方法并返回true enqueue(e); return true; } } finally { //释放锁 lock.unlock(); } } }

2.1.3、put(E e) 方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public void put(E e) throws InterruptedException { checkNotNull(e); // 获取ReentrantLock锁 final ReentrantLock lock = this.lock; // 加锁,如果线程中断了抛出异常 lock.lockInterruptibly(); try { // 如果队列满了,则进入条件队列进行等待 while (count == items.length) notFull.await(); // 队列不满,或者被取数线程唤醒了,那么会继续执行 // 这里会往阻塞队列添加一个数据,然后唤醒等待时间最长的取数线程 enqueue(e); } finally { // 释放ReentrantLock锁 lock.unlock(); } } }

2.1.4、offer(E e, long timeout, TimeUnit unit)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; // 如果数组满了,就阻塞nanos纳秒,如果唤醒这个线程时依然没有空间且时间到了就返回false nanos = notFull.awaitNanos(nanos); } //入队 enqueue(e); return true; } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; // 把元素直接放在放指针的位置上 items[putIndex] = x; // 如果放指针到数组尽头了,就返回头部 if (++putIndex == items.length) putIndex = 0; // 数量加1 count++; // 唤醒notEmpty,因为入队了一个元素,所以肯定不为空了 notEmpty.signal(); } }
  • add(e)时如果队列满了则抛出异常;
  • offer(e)时如果队列满了则返回false;
  • put(e)时如果队列满了则使用notFull等待;
  • offer(e, timeout, unit)时如果队列满了则等待一段时间后如果队列依然满就返回false;
  • 利用放指针循环使用数组来存储元素;

2.2、出队

2.2.1、 remove()方法

public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> { public E remove() // 调用poll()方法出队 E x = poll(); if (x != null) // 如果有元素出队就返回这个元素 return x; else // 如果没有元素出队就抛出异常 throw new NoSuchElementException(); } }

2.2.2、 poll()方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { //如果队列里没有数据就直接返回null //否则从队列头部出队 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } }

2.2.3、 poll(long timeout, TimeUnit unit)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 如果队列无元素,则阻塞等待nanos纳秒 // 如果下一次这个线程获得了锁但队列依然无元素且已超时就返回null while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } } }

2.2.4、 take()方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //加锁,如果线程中断了抛出异常 lock.lockInterruptibly(); try { //队列中不存元素 while (count == 0) /* * 一直等待条件notEmpty,即被其他线程唤醒 * (唤醒其实就是,有线程将一个元素入队了,然后调用notEmpty.signal() * 唤醒其他等待这个条件的线程,同时队列也不空了) */ notEmpty.await(); //否则出队 return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") // 取取指针位置的元素 E x = (E) items[takeIndex]; // 把取指针位置设为null items[takeIndex] = null; // 取指针前移,如果数组到头了就返回数组前端循环利用 if (++takeIndex == items.length) takeIndex = 0; // 元素数量减1 count--; if (itrs != null) itrs.elementDequeued(); // 唤醒notFull条件 notFull.signal(); return x; } }
  • remove()时如果队列为空则抛出异常;
  • poll()时如果队列为空则返回null;
  • take()时如果队列为空则阻塞等待在条件notEmpty上;
  • poll(timeout, unit)时如果队列为空则阻塞等待一段时间后如果还为空就返回null;
  • 利用取指针循环从数组中取元素;

如下图,以put和take方法为例:

这里put和take使用了同一个ReentrantLock,不能并发执行。

2.3、缺点

  • a、队列长度固定且必须在初始化时指定,所以使用之前一定要慎重考虑好容量;

  • b、如果消费速度跟不上入队速度,则会导致提供者线程一直阻塞,且越阻塞越多,非常危险;

  • c、只使用了一个锁来控制入队出队,效率较低。

参考: www.itzhai.com/articles/graphical-blocking-queue.html

zhuanlan.zhihu.com/p/224946304

本文共计3068个文字,预计阅读时间需要13分钟。

Java中如何高效使用ArrayBlockingQueue实现并发控制?

一、阻塞队列 BlockingQueue 简介

在 Java.util.concurrent 包中,BlockingQueue 是一个线程安全的队列实现,它非常适合解决多线程环境中高效且安全地传递数据的问题。BlockingQueue 通过内置的锁机制,确保了在多线程环境中对队列的操作是线程安全的。

二、高效安全地传递数据

BlockingQueue 通过以下特性高效且安全地处理数据传递:

1. 线程安全:内部机制确保了队列操作的原子性,避免了并发访问时的数据竞争问题。

2.阻塞操作:当队列满时,生产者线程会自动等待,直到队列中有空间;当队列为空时,消费者线程会自动等待,直到队列中有元素。

3.非阻塞操作:提供了方法如 `offer()` 和 `poll()`,允许线程在队列满或空时立即返回,而不必等待。

三、构建高质量系统

通过使用 BlockingQueue,我们可以快速构建高质量的系统,具有以下优点:

1. 简化编程:无需手动处理锁和同步,降低了编程复杂度。

2.高性能:利用了线程池和阻塞队列的优化,提高了系统的吞吐量。

3.高可靠性:线程安全的队列确保了系统的稳定运行。

以下是一些常用的 BlockingQueue 实现:

- `ArrayBlockingQueue`:基于数组的阻塞队列,有固定大小。

- `LinkedBlockingQueue`:基于链表的阻塞队列,有默认大小和可指定大小。- `PriorityBlockingQueue`:基于优先级的阻塞队列。- `SynchronousQueue`:没有容量,每个插入操作必须等待有消费者取出元素,反之亦然。

通过合理选择和配置 BlockingQueue,我们可以构建出高性能、高可靠性的多线程系统。

一、阻塞队列 BlockingQueue

在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

1.1、BlockingQueue的基本原理

先来解释一下阻塞队列:

如上图:

  • 1、生产线程1往阻塞队列里面添加新的数据,当阻塞队列满的时候(针对有界队列),生产线程1将会处于阻塞状态,直到消费线程2从队列中取走一个数据;
  • 2、消费线程2从阻塞队列取数据,当阻塞队列空的时候,消费线程2将会处于阻塞状态,直到生产线程把一个数据放进去。

阻塞队列的基本原理就这样,至于队列是用什么数据结构进行存储的,这里并没有规定,所以后面我们可以看到很多阻塞队列的实现。

阻塞队列的常用方法

查阅BlockingQueue总结了以下阻塞队列的方法:

1、boolean add(E e)

  • 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回IllegalStateException异常。

2、boolean offer(E e)

  • 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回false。

3、void put(E e)

  • 直接在队列中插入元素,当无可用空间时候,阻塞等待。

4、boolean offer(E e, long timeout, TimeUnit unit)

  • 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false。

5、E take()

  • 获取并移除队列头部的元素,无元素时候阻塞等待。

6、E poll( long time, timeunit unit)

  • 获取并移除队列头部的元素,无元素时候阻塞等待指定时间。

7、boolean remove()

  • 获取并移除队列头部的元素,无元素时候会抛出NoSuchElementException异常。

8、E element()

  • 不移除的情况下返回列头部的元素,无元素时候会抛出NoSuchElementException异常。

9、E peek()

  • 不移除的情况下返回列头部的元素,队列为空无元素时返回null。

注意:

根据remove(Object o)方法签名可知,这个方法可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。 以上支持阻塞和超时的方法都是能够响应中断的。

1.2、BlockingQueue的实现

BlockingQueue底层也是基于AQS实现的,队列的阻塞使用ReentrantLock的Condition实现的。

Java中如何高效使用ArrayBlockingQueue实现并发控制?

 

下面我们来看看各个实现类的原理。以下分析我都会基于支持阻塞的put和take方法来分析。

二、ArrayBlockingQueue

ArrayBlockingQueue使用的数据结构是数组

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** 队列中元素保存的地方 */ final Object[] items; /** 取元素的指针 记录下一次操作的位置 */ int takeIndex; /** 放元素的指针 记录下一次操作的位置 */ int putIndex; /** 元素数量 */ int count; /** 保证并发访问的锁 */ final ReentrantLock lock; /** 等待出队的条件 消费者监视器 */ private final Condition notEmpty; /** 等待入队的条件 生产者监视器 */ private final Condition notFull; }

构造函数

  • 容量大小有构造函数的capacity参数决定。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //必须传入容量,可以控制重入锁是公平还是非公平 public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); // 初始化数组 this.items = new Object[capacity]; // 创建重入锁及两个条件 lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); //final修饰的变量不会发生指令重排 final ReentrantLock lock = this.lock; lock.lock(); // 保证可见性 不是为了互斥 防止指令重排 保证item的安全 try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } } }

2.1、入队

2.1.1、add(E e)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public boolean add(E e) { return super.add(e); } } public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> { // AbstractQueue 调用offer(e)如果成功返回true,如果失败抛出异常 public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } }

2.1.2、offer(E e)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public boolean offer(E e) { // 元素不可为空 checkNotNull(e); final ReentrantLock lock = this.lock; // 加锁 lock.lock(); try { if (count == items.length) // 如果数组满了就返回false return false; else { // 如果数组没满就调用入队方法并返回true enqueue(e); return true; } } finally { //释放锁 lock.unlock(); } } }

2.1.3、put(E e) 方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public void put(E e) throws InterruptedException { checkNotNull(e); // 获取ReentrantLock锁 final ReentrantLock lock = this.lock; // 加锁,如果线程中断了抛出异常 lock.lockInterruptibly(); try { // 如果队列满了,则进入条件队列进行等待 while (count == items.length) notFull.await(); // 队列不满,或者被取数线程唤醒了,那么会继续执行 // 这里会往阻塞队列添加一个数据,然后唤醒等待时间最长的取数线程 enqueue(e); } finally { // 释放ReentrantLock锁 lock.unlock(); } } }

2.1.4、offer(E e, long timeout, TimeUnit unit)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; // 如果数组满了,就阻塞nanos纳秒,如果唤醒这个线程时依然没有空间且时间到了就返回false nanos = notFull.awaitNanos(nanos); } //入队 enqueue(e); return true; } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; // 把元素直接放在放指针的位置上 items[putIndex] = x; // 如果放指针到数组尽头了,就返回头部 if (++putIndex == items.length) putIndex = 0; // 数量加1 count++; // 唤醒notEmpty,因为入队了一个元素,所以肯定不为空了 notEmpty.signal(); } }
  • add(e)时如果队列满了则抛出异常;
  • offer(e)时如果队列满了则返回false;
  • put(e)时如果队列满了则使用notFull等待;
  • offer(e, timeout, unit)时如果队列满了则等待一段时间后如果队列依然满就返回false;
  • 利用放指针循环使用数组来存储元素;

2.2、出队

2.2.1、 remove()方法

public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> { public E remove() // 调用poll()方法出队 E x = poll(); if (x != null) // 如果有元素出队就返回这个元素 return x; else // 如果没有元素出队就抛出异常 throw new NoSuchElementException(); } }

2.2.2、 poll()方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { //如果队列里没有数据就直接返回null //否则从队列头部出队 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } }

2.2.3、 poll(long timeout, TimeUnit unit)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 如果队列无元素,则阻塞等待nanos纳秒 // 如果下一次这个线程获得了锁但队列依然无元素且已超时就返回null while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } } }

2.2.4、 take()方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //加锁,如果线程中断了抛出异常 lock.lockInterruptibly(); try { //队列中不存元素 while (count == 0) /* * 一直等待条件notEmpty,即被其他线程唤醒 * (唤醒其实就是,有线程将一个元素入队了,然后调用notEmpty.signal() * 唤醒其他等待这个条件的线程,同时队列也不空了) */ notEmpty.await(); //否则出队 return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") // 取取指针位置的元素 E x = (E) items[takeIndex]; // 把取指针位置设为null items[takeIndex] = null; // 取指针前移,如果数组到头了就返回数组前端循环利用 if (++takeIndex == items.length) takeIndex = 0; // 元素数量减1 count--; if (itrs != null) itrs.elementDequeued(); // 唤醒notFull条件 notFull.signal(); return x; } }
  • remove()时如果队列为空则抛出异常;
  • poll()时如果队列为空则返回null;
  • take()时如果队列为空则阻塞等待在条件notEmpty上;
  • poll(timeout, unit)时如果队列为空则阻塞等待一段时间后如果还为空就返回null;
  • 利用取指针循环从数组中取元素;

如下图,以put和take方法为例:

这里put和take使用了同一个ReentrantLock,不能并发执行。

2.3、缺点

  • a、队列长度固定且必须在初始化时指定,所以使用之前一定要慎重考虑好容量;

  • b、如果消费速度跟不上入队速度,则会导致提供者线程一直阻塞,且越阻塞越多,非常危险;

  • c、只使用了一个锁来控制入队出队,效率较低。

参考: www.itzhai.com/articles/graphical-blocking-queue.html

zhuanlan.zhihu.com/p/224946304