如何通过SynchronousQueue实现线程间的点对点、无缓冲数据交换?
- 内容介绍
- 相关推荐
本文共计1050个文字,预计阅读时间需要5分钟。
它本质上不存储元素——put()必须等待另一个线程正确调用take(),反之亦然。这并非缓冲队列,而是线程间强耦合的“即时交接机制。一旦配对失败(例如,只有put()没有对应的take()),调用线程就会阻塞,直到对方出现。”
常见错误现象:put() 卡死、CPU 无增长但程序不动、堆栈里全是 SynchronousQueue$TransferStack 相关的 wait;本质是线程没对上,不是代码写错了,而是协作逻辑断了。
- 只适用于严格一对一、交替执行的场景(如生产者-消费者必须实时配对)
- 不能用
size()判断状态——它永远返回 0,因为根本不缓存 - 不支持
offer()/poll()的超时变体以外的非阻塞操作;offer(e)总是返回false,poll()总是返回null
用 SynchronousQueue 实现 ping-pong 线程协作
典型点对点交付:线程 A 发一个值,必须等线程 B 接走,B 处理完再发回结果,A 再接。整个过程无中间存储,纯靠两次手递手完成闭环。
final SynchronousQueue<String> channel = new SynchronousQueue<>(); Thread t1 = new Thread(() -> { try { String req = "hello"; System.out.println("A: sending " + req); channel.put(req); // 阻塞,直到 B 调用 take() String resp = channel.take(); // 阻塞,直到 B put() 回来 System.out.println("A: got response " + resp); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); Thread t2 = new Thread(() -> { try { String req = channel.take(); // 阻塞,等 A put() System.out.println("B: received " + req); channel.put("world"); // 阻塞,等 A take() } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); t1.start(); t2.start();
- 两个线程启动顺序无关,
SynchronousQueue自动协调等待关系 - 不要在单个线程里连续
put()两次——没有第二个线程接,第一次就卡死 - 务必捕获
InterruptedException并恢复中断状态,否则协作可能无法被外部中断
和 LinkedBlockingQueue(1) 的关键区别在哪
有人会想:“我用容量为 1 的阻塞队列不也差不多?” 不行。LinkedBlockingQueue(1) 允许生产者先存、消费者后取,存在时间差和缓冲;而 SynchronousQueue 强制双方“碰头瞬间交接”,语义更严格,延迟更低,且能暴露协作缺失问题(比如漏掉一次 take() 就直接卡死,而不是悄悄积压)。
-
LinkedBlockingQueue(1):适合“尽量快交,但可容忍短暂异步” -
SynchronousQueue:适合“必须同步交接,错一次就该立刻失败”的协议级协作 - 性能上,
SynchronousQueue在高争用下可能比小容量队列更快——没内存拷贝、无节点分配,纯 CAS 协作
容易忽略的 shutdown 安全问题
如果线程因异常退出,没执行到 take() 或 put(),另一端就会永久阻塞。没有内置超时或中断传播保障,得靠外部控制。
- 所有
put()/take()调用都应包裹在带超时的版本里:queue.offer(e, 5, TimeUnit.SECONDS)和queue.poll(5, TimeUnit.SECONDS) - 不要依赖
Thread.interrupt()来“唤醒”阻塞中的SynchronousQueue操作——它确实响应中断,但必须在线程处于可中断状态(如刚进入take())时才生效;若已卡在锁竞争中,可能延迟响应 - 真实系统中建议配合
CountDownLatch或CyclicBarrier做初始握手,避免“谁先发谁卡住”的启动竞态
真正难的不是写通这段代码,而是确保两端线程生命周期对齐、错误路径全覆盖、中断信号不丢失——手递手看着简单,容错余地其实最小。
本文共计1050个文字,预计阅读时间需要5分钟。
它本质上不存储元素——put()必须等待另一个线程正确调用take(),反之亦然。这并非缓冲队列,而是线程间强耦合的“即时交接机制。一旦配对失败(例如,只有put()没有对应的take()),调用线程就会阻塞,直到对方出现。”
常见错误现象:put() 卡死、CPU 无增长但程序不动、堆栈里全是 SynchronousQueue$TransferStack 相关的 wait;本质是线程没对上,不是代码写错了,而是协作逻辑断了。
- 只适用于严格一对一、交替执行的场景(如生产者-消费者必须实时配对)
- 不能用
size()判断状态——它永远返回 0,因为根本不缓存 - 不支持
offer()/poll()的超时变体以外的非阻塞操作;offer(e)总是返回false,poll()总是返回null
用 SynchronousQueue 实现 ping-pong 线程协作
典型点对点交付:线程 A 发一个值,必须等线程 B 接走,B 处理完再发回结果,A 再接。整个过程无中间存储,纯靠两次手递手完成闭环。
final SynchronousQueue<String> channel = new SynchronousQueue<>(); Thread t1 = new Thread(() -> { try { String req = "hello"; System.out.println("A: sending " + req); channel.put(req); // 阻塞,直到 B 调用 take() String resp = channel.take(); // 阻塞,直到 B put() 回来 System.out.println("A: got response " + resp); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); Thread t2 = new Thread(() -> { try { String req = channel.take(); // 阻塞,等 A put() System.out.println("B: received " + req); channel.put("world"); // 阻塞,等 A take() } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); t1.start(); t2.start();
- 两个线程启动顺序无关,
SynchronousQueue自动协调等待关系 - 不要在单个线程里连续
put()两次——没有第二个线程接,第一次就卡死 - 务必捕获
InterruptedException并恢复中断状态,否则协作可能无法被外部中断
和 LinkedBlockingQueue(1) 的关键区别在哪
有人会想:“我用容量为 1 的阻塞队列不也差不多?” 不行。LinkedBlockingQueue(1) 允许生产者先存、消费者后取,存在时间差和缓冲;而 SynchronousQueue 强制双方“碰头瞬间交接”,语义更严格,延迟更低,且能暴露协作缺失问题(比如漏掉一次 take() 就直接卡死,而不是悄悄积压)。
-
LinkedBlockingQueue(1):适合“尽量快交,但可容忍短暂异步” -
SynchronousQueue:适合“必须同步交接,错一次就该立刻失败”的协议级协作 - 性能上,
SynchronousQueue在高争用下可能比小容量队列更快——没内存拷贝、无节点分配,纯 CAS 协作
容易忽略的 shutdown 安全问题
如果线程因异常退出,没执行到 take() 或 put(),另一端就会永久阻塞。没有内置超时或中断传播保障,得靠外部控制。
- 所有
put()/take()调用都应包裹在带超时的版本里:queue.offer(e, 5, TimeUnit.SECONDS)和queue.poll(5, TimeUnit.SECONDS) - 不要依赖
Thread.interrupt()来“唤醒”阻塞中的SynchronousQueue操作——它确实响应中断,但必须在线程处于可中断状态(如刚进入take())时才生效;若已卡在锁竞争中,可能延迟响应 - 真实系统中建议配合
CountDownLatch或CyclicBarrier做初始握手,避免“谁先发谁卡住”的启动竞态
真正难的不是写通这段代码,而是确保两端线程生命周期对齐、错误路径全覆盖、中断信号不丢失——手递手看着简单,容错余地其实最小。

