用 Channel<t></t>
搭建可扩展的处理阶段
核心是把每个处理步骤抽象为独立的
Task,通过
Channel<t></t>连接——它比
BlockingCollection<t></t>更轻量、支持异步读写,且天然适配
async/await。每个阶段消费上游
Channel.Reader,处理后写入下游
Channel.Writer,彼此解耦。
关键点:
用Channel.CreateBounded<t>(capacity)</t>控制背压,避免内存爆炸;容量设为 100~1000 常见(太小易阻塞,太大失衡) 每个阶段必须调用
reader.Completion.WaitAsync()等待上游关闭,再调用
writer.Complete()不要在管道中直接
await外部 I/O(如 HTTP 请求)而不限流——否则并发数会失控
var input = Channel.CreateBounded<string>(100);
var processed = Channel.CreateBounded<int>(100);
_ = Task.Run(async () =>
{
await foreach (var item in input.Reader.ReadAllAsync())
{
var result = item.Length; // 模拟处理
await processed.Writer.WriteAsync(result);
}
processed.Writer.Complete();
});
动态扩缩容:按负载调整并行度
Parallel.ForEachAsync本身不支持运行时调速,但你可以把「单个处理单元」封装成可取消、可计数的任务,并用
SemaphoreSlim控制并发上限。扩容不是加线程,而是动态调节信号量的
CurrentCount。
常见错误:
直接 new Thread() 或 Parallel.Invoke —— 绕过 .NET 线程池,导致上下文切换开销飙升 用Task.Run包裹 CPU 密集型操作却不设
TaskCreationOptions.LongRunning,抢占 ThreadPool 线程影响其他请求 信号量未在异常路径下调用
Release(),导致后续任务永久挂起
var throttle = new SemaphoreSlim(4, 4); // 初始并发=4
async Task ProcessItem(string item)
{
await throttle.WaitAsync();
try
{
await Task.Run(() => HeavyCompute(item)); // CPU 密集型
}
finally
{
throttle.Release();
}
}
错误隔离与重试:每个阶段独立失败不影响全局
管道里一个环节抛出未捕获异常,会导致整个
Channel.Reader中断,下游收不到后续数据。必须在每个阶段内做粒度更细的错误处理——不是 try/catch 全包,而是对单条数据失败时记录日志、跳过、或转入死信通道。
推荐做法:
定义Result<t></t>类型(如
ValueTask<result>></result>),让处理函数显式返回成功/失败 失败项写入单独的
Channel<faileditem></faileditem>,由后台任务统一归档或告警 重试仅限瞬时错误(如 HTTP 503),用
Polly的
AsyncRetryPolicy包裹具体调用,而非包裹整个
ReadAllAsync循环
await foreach (var item in input.Reader.ReadAllAsync())
{
var result = await ProcessWithRetryAsync(item).ConfigureAwait(false);
if (result.IsSuccess)
await output.Writer.WriteAsync(result.Value);
else
await deadLetter.Writer.WriteAsync(new FailedItem(item, result.Error));
}
监控与诊断:别等崩溃才看吞吐量
并发管道最难调试的是“卡顿”和“假死”——表面没报错,但数据积压、延迟飙升。必须在每个
Channel上暴露实时指标: 用
channel.Reader.Count和
channel.Writer.Count监控队列深度(注意:只读属性,无锁) 记录每个阶段的平均处理耗时(用
Stopwatch,别用
DateTime.Now) 在
Channel.Reader关闭前,检查
reader.Completion.IsCompletedSuccessfully是否为 false,判断是否因异常终止
真正容易被忽略的是:当上游生产速度远高于下游处理能力时,
Channel的
WriteAsync会开始等待,而你可能根本没 await 它——结果就是写入协程被挂起,但主线程毫无感知。
