c# IAsyncEnumerable 和 gRPC 双向流的结合使用

来源:这里教程网 时间:2026-02-21 17:39:17 作者:

gRPC 双向流方法签名必须返回
IAsyncEnumerable<tresponse></tresponse>

gRPC .NET 6+ 生成的双向流(

bidi streaming
)服务端方法,其返回类型固定为
IAsyncEnumerable<tresponse></tresponse>
,客户端调用时也需用
await foreach
消费。这不是可选项——如果手写服务端方法返回
Task
IObservable<t></t>
,gRPC 运行时会直接抛出
InvalidOperationException
:“Method does not return IAsyncEnumerable”。

生成代码示例(由

.proto
文件编译而来):

public virtual async Task BidirectionalStreamingCall(
    IAsyncEnumerable<HelloRequest> requestStream,
    IServerStreamWriter<HelloReply> responseStream,
    ServerCallContext context)
{
    await foreach (var req in requestStream)
    {
        await responseStream.WriteAsync(new HelloReply { Message = $"Hello, {req.Name}!" });
    }
}

注意:这里服务端逻辑是「读请求流 → 写响应流」,但实际双向交互中,你往往需要在写响应的同时监听新请求,或按条件提前退出。此时不能只依赖单层

await foreach

服务端主动推送需配合
CancellationToken
和手动
WriteAsync

IAsyncEnumerable
本身是拉取模型(pull-based),而双向流的真实需求常是推模型(push-based):比如后台任务持续产出数据、事件总线触发通知、或定时心跳。这时不能把所有逻辑塞进
await foreach (req in requestStream)
里——它会阻塞直到客户端发来下一条请求。

正确做法是:启动独立任务处理推送逻辑,并用

IServerStreamWriter<t>.WriteAsync()</t>
主动写入;同时监听
context.CancellationToken
确保连接断开时及时取消。

不要用
yield return
模拟推送(性能差、无法响应取消)
必须检查
context.CancellationToken.IsCancellationRequested
await context.CancellationToken.WaitHandle
避免写入已关闭的流
WriteAsync
是线程安全的,多个任务可并发调用,但 gRPC 不保证顺序——若需严格序,加锁或串行化写入

客户端消费时
await foreach
会隐式等待流结束

客户端调用双向流后,

await foreach (var reply in call.ResponseStream)
会一直挂起,直到服务端完成(即
BidirectionalStreamingCall
方法返回)、或连接异常中断、或显式调用
call.RequestStream.CompleteAsync()
关闭请求流。

常见误操作:

忘记调用
await call.RequestStream.CompleteAsync()
,导致服务端永远等不到请求流结束,也就不会退出
await foreach
循环
在循环中抛出未捕获异常,使
await foreach
提前退出,但服务端仍在写入 → 触发
RpcException
:“Status(StatusCode=Unknown, Detail="Stream removed"")
未设置
CallOptions
Deadline
,长连接无超时,网络闪断后客户端卡死

安全写法示例:

using var call = client.BidirectionalStreamingCall();
_ = Task.Run(async () =>
{
    await foreach (var req in requests)
    {
        await call.RequestStream.WriteAsync(req);
    }
    await call.RequestStream.CompleteAsync(); // 必须!
});
await foreach (var reply in call.ResponseStream)
{
    Console.WriteLine(reply.Message);
}

序列化与背压问题:避免
WriteAsync
积压引发 OOM

gRPC 默认不启用背压(backpressure):服务端连续调用

WriteAsync
,数据会在内存中排队,直到 TCP 窗口满或客户端消费过慢。极端情况下,大量未确认消息堆积在
IServerStreamWriter
内部缓冲区,触发
OutOfMemoryException

缓解方式只有两种:

在写入前检查
context.CancellationToken
,并在循环中加入轻量级
await Task.Yield()
让出控制权(不推荐高频使用)
改用
WriteAsync(T, CancellationToken)
重载,并传入一个能响应客户端消费速度的 token —— 但 .NET gRPC 目前不暴露底层流的可写信号,所以**实际唯一可控手段是业务层限速**:例如每秒最多推送 N 条,或根据上一条
WriteAsync
的耗时动态调整间隔

没有银弹。如果你的场景要求高吞吐低延迟,且客户端处理能力波动大,得在协议层加应答机制(如客户端每收到 K 条就发一个

Ack
),服务端据此控制发送节奏。

相关推荐