gRPC 双向流方法签名必须返回 IAsyncEnumerable<tresponse></tresponse>
gRPC .NET 6+ 生成的双向流(
bidi streaming)服务端方法,其返回类型固定为
IAsyncEnumerable<tresponse></tresponse>,客户端调用时也需用
await foreach消费。这不是可选项——如果手写服务端方法返回
Task或
IObservable<t></t>,gRPC 运行时会直接抛出
InvalidOperationException:“Method does not return IAsyncEnumerable”。
生成代码示例(由
.proto文件编译而来):
public virtual async Task BidirectionalStreamingCall(
IAsyncEnumerable<HelloRequest> requestStream,
IServerStreamWriter<HelloReply> responseStream,
ServerCallContext context)
{
await foreach (var req in requestStream)
{
await responseStream.WriteAsync(new HelloReply { Message = $"Hello, {req.Name}!" });
}
}
注意:这里服务端逻辑是「读请求流 → 写响应流」,但实际双向交互中,你往往需要在写响应的同时监听新请求,或按条件提前退出。此时不能只依赖单层
await foreach。
服务端主动推送需配合 CancellationToken
和手动 WriteAsync
IAsyncEnumerable本身是拉取模型(pull-based),而双向流的真实需求常是推模型(push-based):比如后台任务持续产出数据、事件总线触发通知、或定时心跳。这时不能把所有逻辑塞进
await foreach (req in requestStream)里——它会阻塞直到客户端发来下一条请求。
正确做法是:启动独立任务处理推送逻辑,并用
IServerStreamWriter<t>.WriteAsync()</t>主动写入;同时监听
context.CancellationToken确保连接断开时及时取消。 不要用
yield return模拟推送(性能差、无法响应取消) 必须检查
context.CancellationToken.IsCancellationRequested或
await context.CancellationToken.WaitHandle避免写入已关闭的流
WriteAsync是线程安全的,多个任务可并发调用,但 gRPC 不保证顺序——若需严格序,加锁或串行化写入
客户端消费时 await foreach
会隐式等待流结束
客户端调用双向流后,
await foreach (var reply in call.ResponseStream)会一直挂起,直到服务端完成(即
BidirectionalStreamingCall方法返回)、或连接异常中断、或显式调用
call.RequestStream.CompleteAsync()关闭请求流。
常见误操作:
忘记调用await call.RequestStream.CompleteAsync(),导致服务端永远等不到请求流结束,也就不会退出
await foreach循环 在循环中抛出未捕获异常,使
await foreach提前退出,但服务端仍在写入 → 触发
RpcException:“Status(StatusCode=Unknown, Detail="Stream removed"") 未设置
CallOptions的
Deadline,长连接无超时,网络闪断后客户端卡死
安全写法示例:
using var call = client.BidirectionalStreamingCall();
_ = Task.Run(async () =>
{
await foreach (var req in requests)
{
await call.RequestStream.WriteAsync(req);
}
await call.RequestStream.CompleteAsync(); // 必须!
});
await foreach (var reply in call.ResponseStream)
{
Console.WriteLine(reply.Message);
}
序列化与背压问题:避免 WriteAsync
积压引发 OOM
gRPC 默认不启用背压(backpressure):服务端连续调用
WriteAsync,数据会在内存中排队,直到 TCP 窗口满或客户端消费过慢。极端情况下,大量未确认消息堆积在
IServerStreamWriter内部缓冲区,触发
OutOfMemoryException。
缓解方式只有两种:
在写入前检查context.CancellationToken,并在循环中加入轻量级
await Task.Yield()让出控制权(不推荐高频使用) 改用
WriteAsync(T, CancellationToken)重载,并传入一个能响应客户端消费速度的 token —— 但 .NET gRPC 目前不暴露底层流的可写信号,所以**实际唯一可控手段是业务层限速**:例如每秒最多推送 N 条,或根据上一条
WriteAsync的耗时动态调整间隔
没有银弹。如果你的场景要求高吞吐低延迟,且客户端处理能力波动大,得在协议层加应答机制(如客户端每收到 K 条就发一个
Ack),服务端据此控制发送节奏。
