c# 使用IHostedService实现后台高并发任务

来源:这里教程网 时间:2026-02-21 17:36:36 作者:

为什么 IHostedService 不适合直接处理高并发任务

IHostedService
StartAsync
StopAsync
是单次调用、生命周期绑定宿主的机制,它本身不提供任务队列、限流、重试或并发控制能力。直接在
ExecuteAsync
中用
Task.Run
或无限循环
await Task.Delay
拉起大量任务,极易导致线程池饥饿、内存泄漏或取消信号丢失。

常见错误现象:
OperationCanceledException
频繁抛出但未被正确捕获;
TaskScheduler.UnobservedTaskException
触发崩溃;GC 压力陡增,
ThreadPool.GetAvailableThreads
返回值持续为 0
根本原因:把
IHostedService
当作“后台线程工厂”而非“生命周期协调器”,忽略了其设计初衷是托管长期运行的协调逻辑(如启动调度器、注册监听器),而非执行业务任务本身
正确分工:用
IHostedService
启动一个独立的任务调度器(如
BackgroundService
子类 + 内部
Channel<t></t>
),实际任务交由
Task.Run
+ 自定义
TaskScheduler
ThreadPool.QueueUserWorkItem
承载,且必须配限流

用 BackgroundService + Channel 实现可控并发消费

继承

BackgroundService
(它是
IHostedService
的推荐实现)并搭配
System.Threading.Channels.Channel<t></t>
,能天然支持异步背压、取消传播和有序消费。关键不是“多开几个 Task”,而是“稳住入口、控住出口”。

Channel<t></t>
Channel.CreateBounded
:硬限制待处理任务数,避免 OOM;
SingleWriter = true
可提升吞吐,但需确保写入方单线程
消费端用
channel.Reader.ReadAllAsync(cancellationToken)
+
Parallel.ForEachAsync
控制并发度,不要用
Task.WhenAll
一次性拉取全部
务必在
ExecuteAsync
中捕获所有异常并记录,否则通道会因未处理异常而静默关闭
public class ConcurrentJobService : BackgroundService
{
    private readonly Channel<JobData> _channel = Channel.CreateBounded<JobData>(new BoundedChannelOptions(1000)
    {
        FullMode = BoundedChannelFullMode.Wait,
        SingleWriter = true,
        SingleReader = false
    });
    private readonly ILogger<ConcurrentJobService> _logger;
<pre class='brush:php;toolbar:false;'>public ConcurrentJobService(ILogger<ConcurrentJobService> logger) => _logger = logger;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    var reader = _channel.Reader;
    await foreach (var job in reader.ReadAllAsync(stoppingToken))
    {
        try
        {
            // 并发上限设为 8,避免打满线程池
            await Parallel.ForEachAsync(new[] { job }, new ParallelOptions
            {
                MaxDegreeOfParallelism = 8,
                CancellationToken = stoppingToken
            }, async (j, ct) =>
            {
                await ProcessJobAsync(j, ct);
            });
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            break;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Job processing failed");
        }
    }
}
public async Task EnqueueAsync(JobData data, CancellationToken ct = default) =>
    await _channel.Writer.WriteAsync(data, ct);
private async Task ProcessJobAsync(JobData job, CancellationToken ct)
{
    // 实际业务逻辑,务必支持 ct
    await Task.Delay(100, ct);
}

}

注册与使用时的三个关键配置点

注册方式、作用域和并发参数稍有偏差,就会让前面的设计失效。重点不是“加了 Service”,而是“加对了位置和参数”。

注册必须用
AddHostedService
,不能用
AddSingleton
AddScoped
:后者不会触发
StartAsync/StopAsync
生命周期钩子
若任务需访问 scoped 服务(如
DbContext
),必须在
ProcessJobAsync
内部通过
IServiceScopeFactory
创建新 scope,不能把 scope 跨越
ExecuteAsync
传递
MaxDegreeOfParallelism
建议设为
Environment.ProcessorCount * 2
上下浮动,而非固定 100;高 IO 场景可略高,高 CPU 场景必须压低,否则
ThreadPool
会不断扩容再回收,引发抖动

取消与异常传播最容易被忽略的细节

很多人以为传入

CancellationToken
就万事大吉,其实
Channel.Reader
Parallel.ForEachAsync
、甚至
await using
的资源释放,都存在取消时机错位风险。

Channel.Writer.TryWrite
在 channel 已完成(completed)时返回
false
,但不会抛异常——必须检查返回值,否则任务静默丢失
Parallel.ForEachAsync
中若某次迭代抛出未捕获异常,整个并行块会立即终止,且其他正在运行的迭代**不会自动取消**,需手动在
catch
块中调用
ct.ThrowIfCancellationRequested()
BackgroundService.StopAsync
默认只有 5 秒超时,若任务未响应取消,会被强制 kill;应在
ProcessJobAsync
开头就调用
ct.ThrowIfCancellationRequested()
,并在长耗时操作中定期检查

真正难的不是写一个能跑的后台服务,而是让成百上千个任务在内存、线程、IO、取消信号之间不互相撕扯。Channel 的背压、ParallelOptions 的并发粒度、scope 的生命周期边界——这些地方没对齐,系统就只是“看起来在并发”,实则在慢性崩塌。

相关推荐