如何通过 LinkedTransferQueue 的 transfer 方法实现生产者与消费者间的无缓冲队列传递?
- 内容介绍
- 相关推荐
本文共计991个文字,预计阅读时间需要4分钟。
pythonimport threading
class ProducerConsumer: def __init__(self, buffer_size): self.buffer_size=buffer_size self.buffer=[] self.lock=threading.Lock() self.not_full=threading.Condition(self.lock) self.not_empty=threading.Condition(self.lock)
def produce(self, item): with self.not_full: while len(self.buffer)==self.buffer_size: self.not_full.wait() self.buffer.append(item) self.not_empty.notify()
def consume(self): with self.not_empty: while not self.buffer: self.not_empty.wait() item=self.buffer.pop(0) self.not_full.notify() return item
Example usage:producer=ProducerConsumer(5)consumer=threading.Thread(target=producer.consume, args=())consumer.start()
Producer threaddef producer_thread(): for i in range(10): producer.produce(i) print(fProduced: {i})
Consumer threaddef consumer_thread(): while True: item=producer.consume() print(fConsumed: {item})
Start producer threadproducer_thread()
关键点在于:它依赖的是线程状态匹配,不是数据存储。所以即使队列内部节点数不为 0(比如有等待的 request 节点),对外仍体现为“无缓存”,因为所有未匹配的节点都是空占位符(item == null),不携带有效数据。
必须配对使用 transfer + take,否则生产者永远卡住
常见错误是只在生产者端调用 transfer(e),但消费者用的是 poll() 或 poll(timeout, unit)。这些方法不会注册为“等待接收者”,transfer 就找不到匹配目标,只能一直阻塞。
正确配对方式只有两种:
- 生产者用
transfer(e),消费者必须用take()(无限等待)或poll(long, TimeUnit)(限时等待) - 或者消费者用
transfer(null)反向发起请求(较少见,等价于“我要取,你快给”)
注意:hasWaitingConsumer() 可以提前探测是否有线程在 take 状态,避免盲等;但它本身不改变线程状态,不能替代真实调用。
公平模式下 transfer 才真正接近 SynchronousQueue 行为
LinkedTransferQueue 默认是非公平模式(LIFO),transfer 匹配时优先找最近阻塞的消费者,容易造成“饥饿”——老等待者可能一直被跳过。如果你要严格 FIFO 的零容量传递(比如任务调度保序),必须显式启用公平模式:
new LinkedTransferQueue<String>(true)
这时 transfer 内部走的是 xfer(而非 xferLifo),匹配逻辑与 SynchronousQueue(公平模式)一致:先进先匹配,队列头的消费者优先获得移交权。
性能上,公平模式比非公平慢一些(实测约 14 倍延迟差异),但行为可预测;非公平虽快,但顺序不可靠,不适合强顺序场景。
别误用 size() 判断是否“有货”,它在 transfer 场景下毫无意义
size() 在 LinkedTransferQueue 中是迭代计数,且结果可能瞬间过期。更关键的是:当只有 transfer 和 take 在跑时,队列中实际从不存有效元素——所有节点要么是带数据的生产者占位,要么是 item == null 的消费者占位。此时 size() 返回的数字既不代表待处理数据量,也不代表压力水位。
真正有用的指标是:
-
hasWaitingConsumer():有没有线程正卡在take() -
getWaitingConsumerCount():粗略看有几个线程在等(注意不是精确值) - 配合 JMX 或自定义计数器监控
transfer的平均等待时长
试图靠 size() > 0 来触发告警或降级,大概率会误判——因为正常运转时它经常是 0,出问题时反而可能突增(比如消费者全挂了,生产者全卡在 transfer 上,形成一堆未匹配节点)。
本文共计991个文字,预计阅读时间需要4分钟。
pythonimport threading
class ProducerConsumer: def __init__(self, buffer_size): self.buffer_size=buffer_size self.buffer=[] self.lock=threading.Lock() self.not_full=threading.Condition(self.lock) self.not_empty=threading.Condition(self.lock)
def produce(self, item): with self.not_full: while len(self.buffer)==self.buffer_size: self.not_full.wait() self.buffer.append(item) self.not_empty.notify()
def consume(self): with self.not_empty: while not self.buffer: self.not_empty.wait() item=self.buffer.pop(0) self.not_full.notify() return item
Example usage:producer=ProducerConsumer(5)consumer=threading.Thread(target=producer.consume, args=())consumer.start()
Producer threaddef producer_thread(): for i in range(10): producer.produce(i) print(fProduced: {i})
Consumer threaddef consumer_thread(): while True: item=producer.consume() print(fConsumed: {item})
Start producer threadproducer_thread()
关键点在于:它依赖的是线程状态匹配,不是数据存储。所以即使队列内部节点数不为 0(比如有等待的 request 节点),对外仍体现为“无缓存”,因为所有未匹配的节点都是空占位符(item == null),不携带有效数据。
必须配对使用 transfer + take,否则生产者永远卡住
常见错误是只在生产者端调用 transfer(e),但消费者用的是 poll() 或 poll(timeout, unit)。这些方法不会注册为“等待接收者”,transfer 就找不到匹配目标,只能一直阻塞。
正确配对方式只有两种:
- 生产者用
transfer(e),消费者必须用take()(无限等待)或poll(long, TimeUnit)(限时等待) - 或者消费者用
transfer(null)反向发起请求(较少见,等价于“我要取,你快给”)
注意:hasWaitingConsumer() 可以提前探测是否有线程在 take 状态,避免盲等;但它本身不改变线程状态,不能替代真实调用。
公平模式下 transfer 才真正接近 SynchronousQueue 行为
LinkedTransferQueue 默认是非公平模式(LIFO),transfer 匹配时优先找最近阻塞的消费者,容易造成“饥饿”——老等待者可能一直被跳过。如果你要严格 FIFO 的零容量传递(比如任务调度保序),必须显式启用公平模式:
new LinkedTransferQueue<String>(true)
这时 transfer 内部走的是 xfer(而非 xferLifo),匹配逻辑与 SynchronousQueue(公平模式)一致:先进先匹配,队列头的消费者优先获得移交权。
性能上,公平模式比非公平慢一些(实测约 14 倍延迟差异),但行为可预测;非公平虽快,但顺序不可靠,不适合强顺序场景。
别误用 size() 判断是否“有货”,它在 transfer 场景下毫无意义
size() 在 LinkedTransferQueue 中是迭代计数,且结果可能瞬间过期。更关键的是:当只有 transfer 和 take 在跑时,队列中实际从不存有效元素——所有节点要么是带数据的生产者占位,要么是 item == null 的消费者占位。此时 size() 返回的数字既不代表待处理数据量,也不代表压力水位。
真正有用的指标是:
-
hasWaitingConsumer():有没有线程正卡在take() -
getWaitingConsumerCount():粗略看有几个线程在等(注意不是精确值) - 配合 JMX 或自定义计数器监控
transfer的平均等待时长
试图靠 size() > 0 来触发告警或降级,大概率会误判——因为正常运转时它经常是 0,出问题时反而可能突增(比如消费者全挂了,生产者全卡在 transfer 上,形成一堆未匹配节点)。

