c# 如何用 Channel 实现一个优雅的生产者消费者模型

来源:这里教程网 时间:2026-02-21 17:37:21 作者:

为什么不用
BlockingCollection
而选
Channel

因为

Channel
是 .NET Core 3.0+ 原生支持的无锁、异步优先的管道抽象,比
BlockingCollection
更轻量、更可控,尤其适合高并发、流式处理或需要细粒度控制完成信号的场景。它天然支持
async/await
,消费者可
WaitToReadAsync
,生产者可
WriteAsync
,没有线程阻塞风险。

Channel.CreateBounded
Channel.CreateUnbounded
怎么选

关键看是否需要背压(backpressure):

Channel.CreateBounded<t>(capacity)</t>
:有容量限制,写入时若满会默认 await(可配置
FullMode
),适合内存敏感或需限流的场景;
Channel.CreateUnbounded<t>()</t>
:无缓冲限制,写入永不阻塞,但可能造成内存暴涨——仅适用于生产速率极低、或后续消费绝对及时的简单测试场景。

真实项目中,优先用有界 Channel,并设合理容量(如

100
或基于吞吐预估);避免用
Unbounded
当“图省事”的方案。

如何正确关闭 Channel 并通知消费者退出

不能靠“写完就关”,必须显式调用

Writer.Complete()
,否则消费者在
ReadAsync
中会永远等待。消费者需检测
channel.Reader.TryRead(out T item)
返回
false
(表示已关闭且无剩余数据),或用
await channel.Reader.ReadAsync(ct)
配合
CancellationToken
捕获完成信号。

var channel = Channel.CreateBounded<string>(10);
// 生产者
_ = Task.Run(async () =>
{
    for (int i = 0; i < 5; i++)
    {
        await channel.Writer.WriteAsync($"msg-{i}");
        await Task.Delay(100);
    }
    channel.Writer.Complete(); // 必须调用!
});
// 消费者
while (await channel.Reader.WaitToReadAsync())
{
    while (channel.Reader.TryRead(out var msg))
    {
        Console.WriteLine($"Consumed: {msg}");
    }
}

常见陷阱:忘记 await Writer 或 Reader 操作

WriteAsync
ReadAsync
都是异步方法,但
TryRead
/
TryWrite
是同步非阻塞的。错误写法如:
channel.Writer.WriteAsync(...).GetAwaiter().GetResult()
会死锁(尤其在 UI 或 ASP.NET 同步上下文中);正确做法始终用
await

写入失败不抛异常?检查
channel.Writer.TryWrite
返回值,它在有界 Channel 满时返回
false
消费者卡住?确认是否漏了
Writer.Complete()
,或
WaitToReadAsync
没加超时/取消令牌;
多个消费者竞争?
Channel.Reader
天然线程安全,多个
Task
可同时
ReadAsync
,无需额外锁。

Channel 的优雅不在语法多炫,而在它把“完成传播”“背压响应”“异步解耦”全封装进两个对象(

Writer
Reader
)里——但前提是,你得让它们真正“完成”。

相关推荐