C产品如何满足特定用户需求?
- 内容介绍
- 文章标签
- 相关推荐
本文共计733个文字,预计阅读时间需要3分钟。
它底层默认采用+ConcurrentLinkedQueue+实现。
- 构造时可传入容量上限,比如
new BlockingCollection<int>(boundedCapacity: 10)</int>,超限时Add()会阻塞 - 不设上限时(默认构造),
Add()永不阻塞,但Take()在空时仍会阻塞 - 调用
CompleteAdding()后,后续Add()抛InvalidOperationException,已阻塞的Take()会逐个返回剩余项,之后才返回default(T)或抛异常(取决于是否启用GetConsumingEnumerable())
用 GetConsumingEnumerable() 写消费者最简洁
这是最常用、也最不容易出错的消费写法,内部自动处理“空时等待”和“完成信号”。
var collection = new BlockingCollection<string>(); <p>// 生产者线程 Task.Run(() => { for (int i = 0; i < 5; i++) { collection.Add($"item-{i}"); Thread.Sleep(100); } collection.CompleteAdding(); });</p><p>// 消费者线程 Task.Run(() => { foreach (var item in collection.GetConsumingEnumerable()) { Console.WriteLine($"Consumed: {item}"); Thread.Sleep(150); } });
-
GetConsumingEnumerable()返回的是“消耗式枚举器”,每次MoveNext()都会调用Take() - 循环会在
CompleteAdding()被调用且集合为空后自然退出 - 不要混用
Take()和GetConsumingEnumerable(),否则可能漏项或重复取
Add() 和 TryAdd() 的行为差异很关键
Add() 是阻塞式:当集合已达上限(有界)时,它会一直等,直到有空间;TryAdd() 则立即返回 bool 表示是否成功。
- 如果你希望“尽力加、不卡住”,用
TryAdd(item, timeoutMs),超时返回false - 若没设上限,
TryAdd()总是返回true(除非已CompleteAdding()) -
Add()在已完成添加后调用,会直接抛InvalidOperationException:“The collection has been marked as complete with regards to additions.”
多个消费者共用同一个 BlockingCollection 安全吗
安全,而且正是设计初衷。所有 Add()、Take()、GetConsumingEnumerable() 都是线程安全的。
- 多个消费者调用
GetConsumingEnumerable(),每个都会独立消费不同元素,不会重复 - 没有“全局消费顺序保证”,但每个元素只被一个消费者拿到
- 注意:如果某个消费者处理慢,其他消费者仍能继续取,不会被拖累 —— 这是优势,但也意味着你要确保业务逻辑能容忍乱序或并行处理
真正容易忽略的是:一旦调用 CompleteAdding(),就不能再恢复。如果你需要暂停/恢复生产,得自己用 ManualResetEvent 或 SemaphoreSlim 控制,BlockingCollection 本身不提供这个能力。
本文共计733个文字,预计阅读时间需要3分钟。
它底层默认采用+ConcurrentLinkedQueue+实现。
- 构造时可传入容量上限,比如
new BlockingCollection<int>(boundedCapacity: 10)</int>,超限时Add()会阻塞 - 不设上限时(默认构造),
Add()永不阻塞,但Take()在空时仍会阻塞 - 调用
CompleteAdding()后,后续Add()抛InvalidOperationException,已阻塞的Take()会逐个返回剩余项,之后才返回default(T)或抛异常(取决于是否启用GetConsumingEnumerable())
用 GetConsumingEnumerable() 写消费者最简洁
这是最常用、也最不容易出错的消费写法,内部自动处理“空时等待”和“完成信号”。
var collection = new BlockingCollection<string>(); <p>// 生产者线程 Task.Run(() => { for (int i = 0; i < 5; i++) { collection.Add($"item-{i}"); Thread.Sleep(100); } collection.CompleteAdding(); });</p><p>// 消费者线程 Task.Run(() => { foreach (var item in collection.GetConsumingEnumerable()) { Console.WriteLine($"Consumed: {item}"); Thread.Sleep(150); } });
-
GetConsumingEnumerable()返回的是“消耗式枚举器”,每次MoveNext()都会调用Take() - 循环会在
CompleteAdding()被调用且集合为空后自然退出 - 不要混用
Take()和GetConsumingEnumerable(),否则可能漏项或重复取
Add() 和 TryAdd() 的行为差异很关键
Add() 是阻塞式:当集合已达上限(有界)时,它会一直等,直到有空间;TryAdd() 则立即返回 bool 表示是否成功。
- 如果你希望“尽力加、不卡住”,用
TryAdd(item, timeoutMs),超时返回false - 若没设上限,
TryAdd()总是返回true(除非已CompleteAdding()) -
Add()在已完成添加后调用,会直接抛InvalidOperationException:“The collection has been marked as complete with regards to additions.”
多个消费者共用同一个 BlockingCollection 安全吗
安全,而且正是设计初衷。所有 Add()、Take()、GetConsumingEnumerable() 都是线程安全的。
- 多个消费者调用
GetConsumingEnumerable(),每个都会独立消费不同元素,不会重复 - 没有“全局消费顺序保证”,但每个元素只被一个消费者拿到
- 注意:如果某个消费者处理慢,其他消费者仍能继续取,不会被拖累 —— 这是优势,但也意味着你要确保业务逻辑能容忍乱序或并行处理
真正容易忽略的是:一旦调用 CompleteAdding(),就不能再恢复。如果你需要暂停/恢复生产,得自己用 ManualResetEvent 或 SemaphoreSlim 控制,BlockingCollection 本身不提供这个能力。

