什么是 gRPC 的双向流式 RPC?
gRPC 的
BidirectionalStreamingRpc是唯一支持服务器和客户端同时持续发消息的调用模式。它不是“先发后收”或“发一次收多次”,而是双方各自维护独立的读写通道,可随时
WriteAsync、随时
ReadAsync,彼此不阻塞。
常见误用是把它当“增强版客户端流”或“带回传的服务器流”——实际它没有隐含顺序约定,应用层必须自行定义协议(比如用
messageType字段区分心跳、数据、结束信号)。
定义 .proto 时必须用 stream
修饰符两次
服务端流、客户端流、双向流在 .proto 中全靠
stream出现次数区分,少一个关键字就生成完全不同的 C# 方法签名。
rpc Chat(stream ChatMessage) returns (ChatMessage)→ 客户端流(
IAsyncEnumerable<t></t>入,单次返回)
rpc Chat(ChatMessage) returns (stream ChatMessage)→ 服务器流(单次入,
IAsyncStreamReader<t></t>出)
rpc Chat(stream ChatMessage) returns (stream ChatMessage)→ 双向流(两个
stream)→ 生成
Task Chat(IAsyncStreamReader<chatmessage>, IServerStreamWriter<chatmessage>, ServerCallContext)</chatmessage></chatmessage>
如果生成的 C# 类里没看到
IServerStreamWriter<t></t>参数,八成是 .proto 少写了一个
stream。
客户端发起双向流:别卡在 await foreach
里等响应
典型错误是写成:
var call = client.Chat();
await call.RequestStream.WriteAsync(new ChatMessage { Text = "hi" });
await foreach (var msg in call.ResponseStream.ReadAllAsync()) { /* ... */ } // ❌ 阻塞在此,后续 WriteAsync 不会执行
正确做法是并发驱动读写:
用Task.Run(() => WriteLoop(call.RequestStream))单独发消息 用
await foreach在主线程收消息 或用
Channel<t></t>+
Writer/
Reader解耦生产和消费
注意
RequestStream.CompleteAsync()必须显式调用,否则服务端
ReadAsync()永远不会返回
null。
服务端处理双向流:避免在循环里 await 所有操作
最易被忽略的是线程调度开销。下面这段代码在高并发下会迅速堆积
Task:
while (await requestStream.ReadAsync(out var req)) {
var resp = Process(req);
await responseStream.WriteAsync(resp); // ❌ 每次 WriteAsync 都可能跨线程调度
}
优化方向:
批量写入:缓存多个响应,用WriteBatchAsync(需自实现或用
Channel<t></t>聚合) 取消检查:在循环头部加
context.CancellationToken.ThrowIfCancellationRequested(),否则客户端断连后服务端仍空转 异常隔离:
ReadAsync抛异常时,
WriteAsync可能已失败,不要假设响应一定送达
真正难调试的不是连接断开,而是某次
WriteAsync因网络抖动超时后,后续所有写入都静默失败——因为 gRPC 的
IServerStreamWriter不抛异常,只记录日志。
