C# gRPC流式处理方法 C#如何实现服务器和客户端流

来源:这里教程网 时间:2026-02-21 17:40:09 作者:

什么是 gRPC 的双向流式 RPC?

gRPC 的

BidirectionalStreamingRpc
是唯一支持服务器和客户端同时持续发消息的调用模式。它不是“先发后收”或“发一次收多次”,而是双方各自维护独立的读写通道,可随时
WriteAsync
、随时
ReadAsync
,彼此不阻塞。

常见误用是把它当“增强版客户端流”或“带回传的服务器流”——实际它没有隐含顺序约定,应用层必须自行定义协议(比如用

messageType
字段区分心跳、数据、结束信号)。

定义 .proto 时必须用
stream
修饰符两次

服务端流、客户端流、双向流在 .proto 中全靠

stream
出现次数区分,少一个关键字就生成完全不同的 C# 方法签名。

rpc Chat(stream ChatMessage) returns (ChatMessage)
→ 客户端流(
IAsyncEnumerable<t></t>
入,单次返回)
rpc Chat(ChatMessage) returns (stream ChatMessage)
→ 服务器流(单次入,
IAsyncStreamReader<t></t>
出)
rpc Chat(stream ChatMessage) returns (stream ChatMessage)
→ 双向流(两个
stream
)→ 生成
Task Chat(IAsyncStreamReader<chatmessage>, IServerStreamWriter<chatmessage>, ServerCallContext)</chatmessage></chatmessage>

如果生成的 C# 类里没看到

IServerStreamWriter<t></t>
参数,八成是 .proto 少写了一个
stream

客户端发起双向流:别卡在
await foreach
里等响应

典型错误是写成:

var call = client.Chat();
await call.RequestStream.WriteAsync(new ChatMessage { Text = "hi" });
await foreach (var msg in call.ResponseStream.ReadAllAsync()) { /* ... */ } // ❌ 阻塞在此,后续 WriteAsync 不会执行

正确做法是并发驱动读写:

Task.Run(() => WriteLoop(call.RequestStream))
单独发消息
await foreach
在主线程收消息
或用
Channel<t></t>
+
Writer
/
Reader
解耦生产和消费

注意

RequestStream.CompleteAsync()
必须显式调用,否则服务端
ReadAsync()
永远不会返回
null

服务端处理双向流:避免在循环里 await 所有操作

最易被忽略的是线程调度开销。下面这段代码在高并发下会迅速堆积

Task

while (await requestStream.ReadAsync(out var req)) {
    var resp = Process(req);
    await responseStream.WriteAsync(resp); // ❌ 每次 WriteAsync 都可能跨线程调度
}

优化方向:

批量写入:缓存多个响应,用
WriteBatchAsync
(需自实现或用
Channel<t></t>
聚合)
取消检查:在循环头部加
context.CancellationToken.ThrowIfCancellationRequested()
,否则客户端断连后服务端仍空转
异常隔离:
ReadAsync
抛异常时,
WriteAsync
可能已失败,不要假设响应一定送达

真正难调试的不是连接断开,而是某次

WriteAsync
因网络抖动超时后,后续所有写入都静默失败——因为 gRPC 的
IServerStreamWriter
不抛异常,只记录日志。

相关推荐