BlockingQueue源码中如何实现线程安全?

2026-05-22 08:201阅读0评论SEO问题
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计2375个文字,预计阅读时间需要10分钟。

BlockingQueue源码中如何实现线程安全?

学习多线程定时器时遇到+BlockingQueue+阻塞队列,当时的认知仅限于它是一个并发阻塞队列。不知如何使用及其原理。1. 介绍BlockingQueue首先,BlockingQueue是一个队列,它提供了阻塞操作,以便在队列满时等待元素取出,或队列空时等待元素放入。2. 原理BlockingQueue看起来像一个普通队列,但它在内部实现了线程间的协作,当队列为空时,尝试取元素的操作会阻塞;当队列满时,尝试添加元素的操作会阻塞。通过这些机制,BlockingQueue支持了生产者-消费者模型。

学习多线程定时器时遇到 BlockingQueue 阻塞队列,当时的认识仅限于了解其是一个并发阻塞队列,不知如何使用及其原理


1. 介绍

BlockingQueue 首先是一个队列,其次提供了阻塞功能。它看起来很像消息队列可让消息解耦,但其在生产者-消费者模型中通过阻塞又可使二者速度达到平衡。使用阻塞队列无需过多考虑线程安全问题,专注业务逻辑的实现即可


BlockingQueue 有正常的队列功能,即出队与入队。其阻塞体现在:

BlockingQueue源码中如何实现线程安全?

  • 当队列满时,若有生产者继续往队列添加元素,则阻塞这个生产者线程
  • 当队列为空,若有消费者继续从队列移除元素,则阻塞这个消费者线程
  • 添加元素时,则队列不为空,则唤醒消费者线程
  • 移除元素时,则队列不为满,则唤醒生产者线程

常见的阻塞队列:

  • ArrayBlockingQueue:基于数组的有界阻塞队列
  • LinkedBlockingQueue:基于链表的有界阻塞队列
  • PriorityBlockingQueue:基于堆的优先级无界阻塞队列
  • DelayQueue:基于时间优先级的无界阻塞队列

后面将介绍 ArrayBlockingQueue、LinkedBlockingQueue 两个阻塞队列, 其类继承图如下:







2. Queue 接口

Queue 接口具有队列的基本方法,其不同之处在于同一个功能他有两套方法,两套方法区别于一套是实现返回值,另一套是抛出异常

Throw Exception Return value 增加 add() offer() 删除 remove() poll() 检查 element() peek()





3. BlockingQueue 接口

BlockingQueue 接口在 Queue 的接口上添加多几个方法或重载,最常用的方法有 put 和 take(有阻塞功能)

非阻塞 阻塞线程方法 增加 offer(E e, long timeout, TimeUnit unit):等待超时后返回值 put() 删除 poll(long timeout, TimeUnit unit):等待超时后返回值 take() 包含 contains(Object o) 剩余大小 remainingCapacity() 转移元素 drainTo(Collection<? super E> c)





4. AbstractQueue 抽象类

通过 AbstractQueue 抽象类可知道 Queue 接口的两套方法其实本质是一样的,只不过多了一层抛异常的判断而已

public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> { /** * 构造器 */ protected AbstractQueue() { } /** * add 调用的还是 offer 方法返回值 */ public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } /** * remove 调用的还是 poll 方法返回值 */ public E remove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); } /** * remove 调用的还是 poll 方法返回值 */ public E element() { E x = peek(); if (x != null) return x; else throw new NoSuchElementException(); } /** * 清空元素 */ public void clear() { while (poll() != null) ; } /** * 批量添加元素 */ public boolean addAll(Collection<? extends E> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); boolean modified = false; for (E e : c) if (add(e)) modified = true; return modified; } }





5. ArrayBlockingQueue

ArrayBlockingQueue 使用循环数组来存储元素,其大小是确定的,所以属于有界阻塞队列。阻塞功能由可重入锁实现,出队入队都靠同一个锁来控制,并且由条件锁来实现生产者和消费者线程的阻塞和唤醒。


5.1 内部属性

// 存储元素的数组 final Object[] items; // 可获取元素的下标 int takeIndex; // 可存放元素的下标 int putIndex; // 元素总数量 int count; // 可重入锁 final ReentrantLock lock; // 非空条件锁,队列为空时阻塞消费者线程 private final Condition notEmpty; // 非满条件锁,队列满时阻塞生产者线程 private final Condition notFull; // 自定义的迭代器(可使用默认提供的) transient Itrs itrs = null;
5.2 关键方法

// 入队,添加元素后队列则不为空,会唤醒被阻塞的消费者线程 private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }

// 出队,移除元素后队列则不满,唤醒被阻塞的生产者线程 private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
5.3 常用方法

put、take 是最常用的方法,对应添加与移除元素

添加元素时,锁住整个数组。若队列满了则阻塞当前添加元素的线程,否则添加完元素后唤醒消费者线程进行消费

// 添加元素, public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; // 获取锁 lock.lockInterruptibly(); try { // 若队满,则阻塞生产者线程 while (count == items.length) notFull.await(); // 入队 enqueue(e); } finally { // 释放锁 lock.unlock(); } }

移除元素时,锁住整个数组。若队列空了则阻塞当前移除元素的线程,否则移除完元素后唤醒生产者线程进行生产

// 移除元素 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 获取锁 lock.lockInterruptibly(); try { // 队空则阻塞消费者线程 while (count == 0) notEmpty.await(); // 出队 return dequeue(); } finally { // 释放锁 lock.unlock(); } }
5.4 使用示例

下面来模拟一个生产-消费者场景。发现刚开始生产者一直生产商品,到队满后则被阻塞,此后消费者消费了一个商品后,生产者才生产一个商品

public class BlockingQueueTest { public static void main(String[] args) { ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(5); // 生产者线程,每秒生产一个商品 new Thread(()->{ while (true) { try { Thread.sleep(1000); System.out.println("生产者生产了商品"); blockingQueue.put(new Integer(1)); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // 消费者线程,每5秒消费一个商品 new Thread(()->{ while (true) { try { Thread.sleep(5000); System.out.println("消费者消费了商品"); blockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }





6. LinkedBlockingQueue

LinkedBlockingQueue 底层是一个单向链表,可指定链表大小(不指定默认为 Integer.MAX_VALUE),实现原理和 ArrayBlockingQueue 类似,不同在于其链表的结构可实现同时出队和入队,此时需要两把锁来控制并发,所以其出队入队是不影响的


下面的实现类是 删除简化很多代码之后的结果 ,为了简洁的展示实现原理

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> { /** * 单向链表的节点 */ static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } /** 最大容量 */ private final int capacity; /** 现有元素总数量,原子操作 */ private final AtomicInteger count = new AtomicInteger(); /** * 链表头 */ transient Node<E> head; /** * 链表尾 */ private transient Node<E> last; /** take方法的可重入锁,控制并发 */ private final ReentrantLock takeLock = new ReentrantLock(); /** take方法的条件锁 */ private final Condition notEmpty = takeLock.newCondition(); /** put方法的可重入锁,控制并发 */ private final ReentrantLock putLock = new ReentrantLock(); /** put方法的条件锁 */ private final Condition notFull = putLock.newCondition(); /** * 唤醒非空阻塞的线程 */ private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } /** * 唤醒非满阻塞的线程 */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } /** * 链表尾入队 */ private void enqueue(Node<E> node) { last = last.next = node; } /** * 链表头出队 */ private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } /** * 入队 */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 获取 put 方法锁,锁住链表尾部 putLock.lockInterruptibly(); try { // 队满时,阻塞添加元素的线程 while (count.get() == capacity) { notFull.await(); } // 入队操作 enqueue(node); c = count.getAndIncrement(); // 添加元素后队列还没满,唤醒非满阻塞的线程 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } // 说明队列为空了,唤醒被阻塞的生产者线程 if (c == 0) signalNotEmpty(); } /** * 出队 */ public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 获取 take 方法锁,锁住链表头部 takeLock.lockInterruptibly(); try { // 队空时,阻塞获取元素的线程 while (count.get() == 0) { notEmpty.await(); } // 出队 x = dequeue(); c = count.getAndDecrement(); // 出队后,队列不为空,继续唤醒出队线程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } // 队满了,唤醒被阻塞的消费者线程 if (c == capacity) signalNotFull(); return x; } }

本文共计2375个文字,预计阅读时间需要10分钟。

BlockingQueue源码中如何实现线程安全?

学习多线程定时器时遇到+BlockingQueue+阻塞队列,当时的认知仅限于它是一个并发阻塞队列。不知如何使用及其原理。1. 介绍BlockingQueue首先,BlockingQueue是一个队列,它提供了阻塞操作,以便在队列满时等待元素取出,或队列空时等待元素放入。2. 原理BlockingQueue看起来像一个普通队列,但它在内部实现了线程间的协作,当队列为空时,尝试取元素的操作会阻塞;当队列满时,尝试添加元素的操作会阻塞。通过这些机制,BlockingQueue支持了生产者-消费者模型。

学习多线程定时器时遇到 BlockingQueue 阻塞队列,当时的认识仅限于了解其是一个并发阻塞队列,不知如何使用及其原理


1. 介绍

BlockingQueue 首先是一个队列,其次提供了阻塞功能。它看起来很像消息队列可让消息解耦,但其在生产者-消费者模型中通过阻塞又可使二者速度达到平衡。使用阻塞队列无需过多考虑线程安全问题,专注业务逻辑的实现即可


BlockingQueue 有正常的队列功能,即出队与入队。其阻塞体现在:

BlockingQueue源码中如何实现线程安全?

  • 当队列满时,若有生产者继续往队列添加元素,则阻塞这个生产者线程
  • 当队列为空,若有消费者继续从队列移除元素,则阻塞这个消费者线程
  • 添加元素时,则队列不为空,则唤醒消费者线程
  • 移除元素时,则队列不为满,则唤醒生产者线程

常见的阻塞队列:

  • ArrayBlockingQueue:基于数组的有界阻塞队列
  • LinkedBlockingQueue:基于链表的有界阻塞队列
  • PriorityBlockingQueue:基于堆的优先级无界阻塞队列
  • DelayQueue:基于时间优先级的无界阻塞队列

后面将介绍 ArrayBlockingQueue、LinkedBlockingQueue 两个阻塞队列, 其类继承图如下:







2. Queue 接口

Queue 接口具有队列的基本方法,其不同之处在于同一个功能他有两套方法,两套方法区别于一套是实现返回值,另一套是抛出异常

Throw Exception Return value 增加 add() offer() 删除 remove() poll() 检查 element() peek()





3. BlockingQueue 接口

BlockingQueue 接口在 Queue 的接口上添加多几个方法或重载,最常用的方法有 put 和 take(有阻塞功能)

非阻塞 阻塞线程方法 增加 offer(E e, long timeout, TimeUnit unit):等待超时后返回值 put() 删除 poll(long timeout, TimeUnit unit):等待超时后返回值 take() 包含 contains(Object o) 剩余大小 remainingCapacity() 转移元素 drainTo(Collection<? super E> c)





4. AbstractQueue 抽象类

通过 AbstractQueue 抽象类可知道 Queue 接口的两套方法其实本质是一样的,只不过多了一层抛异常的判断而已

public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> { /** * 构造器 */ protected AbstractQueue() { } /** * add 调用的还是 offer 方法返回值 */ public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } /** * remove 调用的还是 poll 方法返回值 */ public E remove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); } /** * remove 调用的还是 poll 方法返回值 */ public E element() { E x = peek(); if (x != null) return x; else throw new NoSuchElementException(); } /** * 清空元素 */ public void clear() { while (poll() != null) ; } /** * 批量添加元素 */ public boolean addAll(Collection<? extends E> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); boolean modified = false; for (E e : c) if (add(e)) modified = true; return modified; } }





5. ArrayBlockingQueue

ArrayBlockingQueue 使用循环数组来存储元素,其大小是确定的,所以属于有界阻塞队列。阻塞功能由可重入锁实现,出队入队都靠同一个锁来控制,并且由条件锁来实现生产者和消费者线程的阻塞和唤醒。


5.1 内部属性

// 存储元素的数组 final Object[] items; // 可获取元素的下标 int takeIndex; // 可存放元素的下标 int putIndex; // 元素总数量 int count; // 可重入锁 final ReentrantLock lock; // 非空条件锁,队列为空时阻塞消费者线程 private final Condition notEmpty; // 非满条件锁,队列满时阻塞生产者线程 private final Condition notFull; // 自定义的迭代器(可使用默认提供的) transient Itrs itrs = null;
5.2 关键方法

// 入队,添加元素后队列则不为空,会唤醒被阻塞的消费者线程 private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }

// 出队,移除元素后队列则不满,唤醒被阻塞的生产者线程 private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
5.3 常用方法

put、take 是最常用的方法,对应添加与移除元素

添加元素时,锁住整个数组。若队列满了则阻塞当前添加元素的线程,否则添加完元素后唤醒消费者线程进行消费

// 添加元素, public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; // 获取锁 lock.lockInterruptibly(); try { // 若队满,则阻塞生产者线程 while (count == items.length) notFull.await(); // 入队 enqueue(e); } finally { // 释放锁 lock.unlock(); } }

移除元素时,锁住整个数组。若队列空了则阻塞当前移除元素的线程,否则移除完元素后唤醒生产者线程进行生产

// 移除元素 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 获取锁 lock.lockInterruptibly(); try { // 队空则阻塞消费者线程 while (count == 0) notEmpty.await(); // 出队 return dequeue(); } finally { // 释放锁 lock.unlock(); } }
5.4 使用示例

下面来模拟一个生产-消费者场景。发现刚开始生产者一直生产商品,到队满后则被阻塞,此后消费者消费了一个商品后,生产者才生产一个商品

public class BlockingQueueTest { public static void main(String[] args) { ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(5); // 生产者线程,每秒生产一个商品 new Thread(()->{ while (true) { try { Thread.sleep(1000); System.out.println("生产者生产了商品"); blockingQueue.put(new Integer(1)); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // 消费者线程,每5秒消费一个商品 new Thread(()->{ while (true) { try { Thread.sleep(5000); System.out.println("消费者消费了商品"); blockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }





6. LinkedBlockingQueue

LinkedBlockingQueue 底层是一个单向链表,可指定链表大小(不指定默认为 Integer.MAX_VALUE),实现原理和 ArrayBlockingQueue 类似,不同在于其链表的结构可实现同时出队和入队,此时需要两把锁来控制并发,所以其出队入队是不影响的


下面的实现类是 删除简化很多代码之后的结果 ,为了简洁的展示实现原理

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> { /** * 单向链表的节点 */ static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } /** 最大容量 */ private final int capacity; /** 现有元素总数量,原子操作 */ private final AtomicInteger count = new AtomicInteger(); /** * 链表头 */ transient Node<E> head; /** * 链表尾 */ private transient Node<E> last; /** take方法的可重入锁,控制并发 */ private final ReentrantLock takeLock = new ReentrantLock(); /** take方法的条件锁 */ private final Condition notEmpty = takeLock.newCondition(); /** put方法的可重入锁,控制并发 */ private final ReentrantLock putLock = new ReentrantLock(); /** put方法的条件锁 */ private final Condition notFull = putLock.newCondition(); /** * 唤醒非空阻塞的线程 */ private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } /** * 唤醒非满阻塞的线程 */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } /** * 链表尾入队 */ private void enqueue(Node<E> node) { last = last.next = node; } /** * 链表头出队 */ private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } /** * 入队 */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 获取 put 方法锁,锁住链表尾部 putLock.lockInterruptibly(); try { // 队满时,阻塞添加元素的线程 while (count.get() == capacity) { notFull.await(); } // 入队操作 enqueue(node); c = count.getAndIncrement(); // 添加元素后队列还没满,唤醒非满阻塞的线程 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } // 说明队列为空了,唤醒被阻塞的生产者线程 if (c == 0) signalNotEmpty(); } /** * 出队 */ public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 获取 take 方法锁,锁住链表头部 takeLock.lockInterruptibly(); try { // 队空时,阻塞获取元素的线程 while (count.get() == 0) { notEmpty.await(); } // 出队 x = dequeue(); c = count.getAndDecrement(); // 出队后,队列不为空,继续唤醒出队线程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } // 队满了,唤醒被阻塞的消费者线程 if (c == capacity) signalNotFull(); return x; } }