为什么直接用 Channel<t></t>
做批处理容易丢数据
因为
Channel<t></t>本身不提供“等凑够 N 条再发”或“超时强制提交”的语义。你如果只靠
Channel.Reader.ReadAsync()一条条读,就退化成单条处理;如果自己加循环
TryRead拼批次,又得手动管超时、取消、边界条件——稍不留神,
Writer关闭时未读完的数据就丢了,或者批次卡住不触发。
核心矛盾在于:Channel 是流式传输原语,不是批处理原语。必须在它之上封装一层协调逻辑。
用 Channel<t></t>
+ Timer
实现可靠批处理的关键点
推荐用一个后台
Task持续从
Channel.Reader尝试批量读取,同时用
System.Threading.Timer触发“兜底提交”。注意三点:
Timer的回调必须是线程安全的,且不能阻塞(比如别在里面 await 或调 long-running 方法) 每次读取前检查
Reader.Completion.IsCompleted,避免在 Channel 关闭后还尝试读 批次收集必须用
List<t></t>而非数组,且每次提交后要清空,不能复用引用(否则并发写会乱)
private async Task BatchingLoopAsync(CancellationToken ct)
{
var batch = new List<MyData>();
var timer = new Timer(_ => { _ = FlushBatchAsync(batch, ct); }, null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
try
{
while (!ct.IsCancellationRequested && await _channel.Reader.WaitToReadAsync(ct).ConfigureAwait(false))
{
while (_channel.Reader.TryRead(out var item))
{
batch.Add(item);
if (batch.Count >= _batchSize)
{
await FlushBatchAsync(batch, ct).ConfigureAwait(false);
batch.Clear();
timer.Change(_flushInterval, Timeout.InfiniteTimeSpan);
}
}
// 每次有新数据进来,重置定时器(实现“最后一条进来后等 flushInterval 再提交”)
timer.Change(_flushInterval, Timeout.InfiniteTimeSpan);
}
}
finally
{
timer.Dispose();
if (batch.Count > 0)
await FlushBatchAsync(batch, ct).ConfigureAwait(false);
}
}
FlushBatchAsync
必须支持取消且不能吞异常
这是最容易出问题的一环:如果
FlushBatchAsync里调用的是外部 HTTP API 或数据库写入,它可能耗时、可能失败、可能被取消。必须显式传递
CancellationToken,并在 catch 块中区分
OperationCanceledException和其他异常。 遇到
OperationCanceledException:直接 return,不要重试,因为上层已要求停止 遇到其他异常:记录日志,但不要 throw —— 否则整个
BatchingLoopAsync会退出,后续数据全丢 如果需要重试,应在
FlushBatchAsync内部做(比如用
Polly),而不是让外层循环崩溃
private async Task FlushBatchAsync(List<MyData> batch, CancellationToken ct)
{
try
{
await _httpClient.PostAsJsonAsync("/api/batch", batch, ct).ConfigureAwait(false);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
// 正常退出路径,不记日志
return;
}
catch (Exception ex) when (!(ex is OperationCanceledException))
{
_logger.LogError(ex, "Failed to flush batch of {Count} items", batch.Count);
// 不 throw,继续下一轮
}
finally
{
batch.Clear(); // 确保清空,避免引用残留
}
}
注册为 IHostedService
时要注意生命周期绑定
Channel 的
Writer和
Reader都需要和宿主生命周期对齐。常见错误是把
Channel.CreateBounded<t>()</t>放在构造函数里,但没在
StopAsync中显式调用
Writer.Complete(),导致
BatchingLoopAsync永远等在
WaitToReadAsync上,服务无法正常退出。
StartAsync中启动
BatchingLoopAsync并用
Task.Run或
BackgroundService托管
StopAsync中先调
_channel.Writer.Complete(),再
await _batchingTask等它自然结束 别在
Dispose里做任何异步清理——IHostedService 的 Dispose 是同步的
真正难的是边界情况:比如 StopAsync 被调用时,FlushBatchAsync 正在发请求,这时 CancelToken 触发,你得确保那个 HTTP 请求真能被取消(HttpClient 默认支持),而不是留下悬挂连接。
