c# 如何用 Channel 实现一个批处理(Batching)的后台服务

来源:这里教程网 时间:2026-02-21 17:36:33 作者:

为什么直接用
Channel<t></t>
做批处理容易丢数据

因为

Channel<t></t>
本身不提供“等凑够 N 条再发”或“超时强制提交”的语义。你如果只靠
Channel.Reader.ReadAsync()
一条条读,就退化成单条处理;如果自己加循环
TryRead
拼批次,又得手动管超时、取消、边界条件——稍不留神,
Writer
关闭时未读完的数据就丢了,或者批次卡住不触发。

核心矛盾在于:Channel 是流式传输原语,不是批处理原语。必须在它之上封装一层协调逻辑。

Channel<t></t>
+
Timer
实现可靠批处理的关键点

推荐用一个后台

Task
持续从
Channel.Reader
尝试批量读取,同时用
System.Threading.Timer
触发“兜底提交”。注意三点:

Timer
的回调必须是线程安全的,且不能阻塞(比如别在里面 await 或调 long-running 方法)
每次读取前检查
Reader.Completion.IsCompleted
,避免在 Channel 关闭后还尝试读
批次收集必须用
List<t></t>
而非数组,且每次提交后要清空,不能复用引用(否则并发写会乱)
private async Task BatchingLoopAsync(CancellationToken ct)
{
    var batch = new List<MyData>();
    var timer = new Timer(_ => { _ = FlushBatchAsync(batch, ct); }, null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
    try
    {
        while (!ct.IsCancellationRequested && await _channel.Reader.WaitToReadAsync(ct).ConfigureAwait(false))
        {
            while (_channel.Reader.TryRead(out var item))
            {
                batch.Add(item);
                if (batch.Count >= _batchSize)
                {
                    await FlushBatchAsync(batch, ct).ConfigureAwait(false);
                    batch.Clear();
                    timer.Change(_flushInterval, Timeout.InfiniteTimeSpan);
                }
            }
            // 每次有新数据进来,重置定时器(实现“最后一条进来后等 flushInterval 再提交”)
            timer.Change(_flushInterval, Timeout.InfiniteTimeSpan);
        }
    }
    finally
    {
        timer.Dispose();
        if (batch.Count > 0)
            await FlushBatchAsync(batch, ct).ConfigureAwait(false);
    }
}

FlushBatchAsync
必须支持取消且不能吞异常

这是最容易出问题的一环:如果

FlushBatchAsync
里调用的是外部 HTTP API 或数据库写入,它可能耗时、可能失败、可能被取消。必须显式传递
CancellationToken
,并在 catch 块中区分
OperationCanceledException
和其他异常。

遇到
OperationCanceledException
:直接 return,不要重试,因为上层已要求停止
遇到其他异常:记录日志,但不要 throw —— 否则整个
BatchingLoopAsync
会退出,后续数据全丢
如果需要重试,应在
FlushBatchAsync
内部做(比如用
Polly
),而不是让外层循环崩溃
private async Task FlushBatchAsync(List<MyData> batch, CancellationToken ct)
{
    try
    {
        await _httpClient.PostAsJsonAsync("/api/batch", batch, ct).ConfigureAwait(false);
    }
    catch (OperationCanceledException) when (ct.IsCancellationRequested)
    {
        // 正常退出路径,不记日志
        return;
    }
    catch (Exception ex) when (!(ex is OperationCanceledException))
    {
        _logger.LogError(ex, "Failed to flush batch of {Count} items", batch.Count);
        // 不 throw,继续下一轮
    }
    finally
    {
        batch.Clear(); // 确保清空,避免引用残留
    }
}

注册为
IHostedService
时要注意生命周期绑定

Channel 的

Writer
Reader
都需要和宿主生命周期对齐。常见错误是把
Channel.CreateBounded<t>()</t>
放在构造函数里,但没在
StopAsync
中显式调用
Writer.Complete()
,导致
BatchingLoopAsync
永远等在
WaitToReadAsync
上,服务无法正常退出。

StartAsync
中启动
BatchingLoopAsync
并用
Task.Run
BackgroundService
托管
StopAsync
中先调
_channel.Writer.Complete()
,再
await _batchingTask
等它自然结束
别在
Dispose
里做任何异步清理——IHostedService 的 Dispose 是同步的

真正难的是边界情况:比如 StopAsync 被调用时,FlushBatchAsync 正在发请求,这时 CancelToken 触发,你得确保那个 HTTP 请求真能被取消(HttpClient 默认支持),而不是留下悬挂连接。

相关推荐

热文推荐