c# 如何在c#中实现一个异步的生产者消费者队列

来源:这里教程网 时间:2026-02-21 17:38:30 作者:

Channel<t></t>
实现线程安全的异步生产者消费者队列

直接上结论:C# 6+(.NET Core 2.1+)推荐用

System.Threading.Channels.Channel<t></t>
,它专为高并发异步场景设计,比手写
BlockingCollection<t></t>
+
Task
组合更轻量、无锁、原生支持
async/await

常见错误是试图用

ConcurrentQueue<t></t>
自己封装 awaitable 操作——它本身不提供异步等待能力,强行加
Task.Delay
或轮询会浪费 CPU;也有人误用
BlockingCollection<t></t>
Take()
,它会同步阻塞线程,破坏 async 上下文。

Channel.CreateBounded<t>()</t>
创建有界通道,溢出时可配置拒绝策略(如
DropWrite
或抛异常)
Channel.CreateUnbounded<t>()</t>
适合写入吞吐优先、内存可控的场景
生产端调用
Writer.WriteAsync(item)
,消费端用
Reader.ReadAsync()
—— 两者都返回
ValueTask
,无分配开销
通道关闭后,
Reader.ReadAsync()
会完成并返回
default(T)
,需配合
TryRead
或检查
WaitToReadAsync().IsCompletedSuccessfully
var channel = Channel.CreateBounded<string>(100);
<p>// 生产者
_ = Task.Run(async () =>
{
for (int i = 0; i < 5; i++)
{
await channel.Writer.WriteAsync($"msg-{i}");
await Task.Delay(100);
}
channel.Writer.Complete();
});</p><p>// 消费者
_ = Task.Run(async () =>
{
await foreach (var msg in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Received: {msg}");
}
});

如何正确处理消费者异常与通道终止

消费者任务崩溃或未捕获异常会导致

ReadAllAsync()
提前退出,但通道可能仍有未读项;更隐蔽的问题是:生产者调用
Complete()
后,若消费者没读完就退出,剩余数据会丢失。

必须在
await foreach
外层包
try/catch
,否则异常会静默终止迭代
若需确保所有已入队消息被处理,不要依赖
channel.Writer.Complete()
触发消费者退出——应另设信号(如
CancellationToken
)协调停机
Reader.Completion
Task
,反映消费者侧是否完成(包括异常终止),可用于
await channel.Reader.Completion
等待消费结束
避免在消费者中直接
await channel.Reader.Completion
—— 它不会等未读项,只等迭代器退出

什么时候不该用
Channel<t></t>

不是所有“队列”需求都适合

Channel
。它定位是“流式数据传输”,不提供随机访问、计数查询、中间件插拔等能力。

需要实时获取当前队列长度?
Channel.Reader.Count
只在有界通道且未被并发写入时可靠;无界通道返回 -1
要支持多个消费者竞争消费同一消息?
Channel
是单消费者语义;此时该用
ServiceBus
RabbitMQ
消息需持久化、重试、死信?
Channel
纯内存,崩溃即丢;必须外接存储层
项目还在 .NET Framework 4.8?
Channel
不可用,只能降级用
BlockingCollection<t></t>
+
GetConsumingEnumerable()
(但无法真正异步)

替代方案:
BlockingCollection<t></t>
的异步包装陷阱

有人用

Task.Run(() => collection.Take())
伪异步,这本质是线程池抢占,增加调度开销且无法取消;正确做法是仅在必须兼容旧框架时,用
TryTake
配合短时
Task.Delay
循环,但务必设超时和取消令牌。

永远不要写
await Task.Run(() => collection.Take())
—— 这违背 async/await 减少线程占用的初衷
若坚持用
BlockingCollection
,消费循环应类似:
while (collection.TryTake(out var item, 10, token)) { ... }
BlockingCollection
Add
在有界模式下可能阻塞线程,而
Channel.Writer.WriteAsync
在满时默认返回
ValueTask
并挂起,更可控

实际落地时,最易忽略的是消费者异常传播路径和通道生命周期管理——写个

await foreach
很容易,但谁负责捕获异常?谁决定何时
Complete()
?这些边界不厘清,上线后就会出现消息静默丢失或消费者假死。

相关推荐