用 Channel<t></t>
实现线程安全的异步生产者消费者队列
直接上结论:C# 6+(.NET Core 2.1+)推荐用
System.Threading.Channels.Channel<t></t>,它专为高并发异步场景设计,比手写
BlockingCollection<t></t>+
Task组合更轻量、无锁、原生支持
async/await。
常见错误是试图用
ConcurrentQueue<t></t>自己封装 awaitable 操作——它本身不提供异步等待能力,强行加
Task.Delay或轮询会浪费 CPU;也有人误用
BlockingCollection<t></t>的
Take(),它会同步阻塞线程,破坏 async 上下文。
Channel.CreateBounded<t>()</t>创建有界通道,溢出时可配置拒绝策略(如
DropWrite或抛异常)
Channel.CreateUnbounded<t>()</t>适合写入吞吐优先、内存可控的场景 生产端调用
Writer.WriteAsync(item),消费端用
Reader.ReadAsync()—— 两者都返回
ValueTask,无分配开销 通道关闭后,
Reader.ReadAsync()会完成并返回
default(T),需配合
TryRead或检查
WaitToReadAsync().IsCompletedSuccessfully
var channel = Channel.CreateBounded<string>(100);
<p>// 生产者
_ = Task.Run(async () =>
{
for (int i = 0; i < 5; i++)
{
await channel.Writer.WriteAsync($"msg-{i}");
await Task.Delay(100);
}
channel.Writer.Complete();
});</p><p>// 消费者
_ = Task.Run(async () =>
{
await foreach (var msg in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Received: {msg}");
}
});如何正确处理消费者异常与通道终止
消费者任务崩溃或未捕获异常会导致
ReadAllAsync()提前退出,但通道可能仍有未读项;更隐蔽的问题是:生产者调用
Complete()后,若消费者没读完就退出,剩余数据会丢失。 必须在
await foreach外层包
try/catch,否则异常会静默终止迭代 若需确保所有已入队消息被处理,不要依赖
channel.Writer.Complete()触发消费者退出——应另设信号(如
CancellationToken)协调停机
Reader.Completion是
Task,反映消费者侧是否完成(包括异常终止),可用于
await channel.Reader.Completion等待消费结束 避免在消费者中直接
await channel.Reader.Completion—— 它不会等未读项,只等迭代器退出
什么时候不该用 Channel<t></t>
?
不是所有“队列”需求都适合
Channel。它定位是“流式数据传输”,不提供随机访问、计数查询、中间件插拔等能力。 需要实时获取当前队列长度?
Channel.Reader.Count只在有界通道且未被并发写入时可靠;无界通道返回 -1 要支持多个消费者竞争消费同一消息?
Channel是单消费者语义;此时该用
ServiceBus或
RabbitMQ消息需持久化、重试、死信?
Channel纯内存,崩溃即丢;必须外接存储层 项目还在 .NET Framework 4.8?
Channel不可用,只能降级用
BlockingCollection<t></t>+
GetConsumingEnumerable()(但无法真正异步)
替代方案:BlockingCollection<t></t>
的异步包装陷阱
有人用
Task.Run(() => collection.Take())伪异步,这本质是线程池抢占,增加调度开销且无法取消;正确做法是仅在必须兼容旧框架时,用
TryTake配合短时
Task.Delay循环,但务必设超时和取消令牌。 永远不要写
await Task.Run(() => collection.Take())—— 这违背 async/await 减少线程占用的初衷 若坚持用
BlockingCollection,消费循环应类似:
while (collection.TryTake(out var item, 10, token)) { ... }
BlockingCollection的
Add在有界模式下可能阻塞线程,而
Channel.Writer.WriteAsync在满时默认返回
ValueTask并挂起,更可控
实际落地时,最易忽略的是消费者异常传播路径和通道生命周期管理——写个
await foreach很容易,但谁负责捕获异常?谁决定何时
Complete()?这些边界不厘清,上线后就会出现消息静默丢失或消费者假死。
