Channel
创建和配置 Channel
Channel 有无界(Unbounded)和有界(Bounded)两种类型,选择取决于你对内存控制和背压(backpressure)的需求:
无界 Channel:缓冲区无限增长,适合吞吐优先、不担心内存溢出的场景(如日志采集)var channel = Channel.CreateUnbounded
var channel = Channel.CreateBounded
写入数据(Producer)
使用 Writer 写入,支持同步和异步方式。推荐用 WriteAsync 配合 await,尤其对有界 Channel 能自然等待空闲空间:
写入单个值:await channel.Writer.WriteAsync("hello"); 批量写入(高效):await channel.Writer.WriteAsync(new[] {"a", "b", "c"}); 标记写入完成(通知消费者停止读取):channel.Writer.Complete();读取数据(Consumer)
使用 Reader 读取,核心是 ReadAsync —— 它会挂起直到有数据或 Channel 关闭:
基础读取循环:while (await channel.Reader.WaitToReadAsync())
while (channel.Reader.TryRead(out var item))
Console.WriteLine(item); 更简洁写法(推荐):
await foreach (var item in channel.Reader.ReadAllAsync())
Console.WriteLine(item); 注意:ReadAllAsync 会在 Writer.Complete() 后自动退出循环,无需手动判断。
组合多个 Channel 实现管道
Channel 天然适合构建链式处理管道。例如:接收原始日志 → 过滤 → 格式化 → 输出:
启动一个后台任务做过滤:_ = Task.Run(async () => {
await foreach (var line in input.Reader.ReadAllAsync())
if (line.Contains("ERROR"))
await filtered.Writer.WriteAsync(line);
filtered.Writer.Complete();
}); 后续环节消费
filteredChannel,彼此完全解耦,且自动异步流控。
基本上就这些。Channel 不复杂但容易忽略两点:一是别忘了调用
Complete()告诉消费者“没新数据了”,二是有界 Channel 的容量设置要结合实际吞吐和延迟容忍度来权衡。
