C#怎么使用gRPC流式传输 gRPC双向流实现方法

来源:这里教程网 时间:2026-02-21 17:43:06 作者:

什么是gRPC双向流

gRPC双向流(Bidirectional Streaming)是指客户端和服务器可以同时发送和接收多个消息,连接保持打开状态,双方消息可交错收发。它适合实时协作、聊天、日志推送、IoT设备控制等场景。和单向流(ClientStreaming/ServerStreaming)不同,双向流用一个长期连接完成全双工通信。

定义.proto文件支持双向流

.proto
文件中,把RPC方法的请求和响应类型都声明为
stream
即可:

example.proto

syntax = "proto3";
package chat;
service ChatService {
  // 双向流:客户端发多条Message,服务端也回多条Message
  rpc Chat(stream Message) returns (stream Message);
}
message Message {
  string content = 1;
  int64 timestamp = 2;
  string sender = 3;
}

生成C#代码后,会得到带

IAsyncEnumerable<t></t>
参数和返回值的异步方法。

C#客户端实现双向流调用

使用

CallOptions
不是必须的,重点是拿到
AsyncDuplexStreamingCall<trequest tresponse></trequest>
对象,它提供
RequestStream
ResponseStream
两个管道:

RequestStream.WriteAsync()
持续发消息(可多次)
ResponseStream.ReadAllAsync()
或手动遍历
IAsyncEnumerator
接收服务端消息
调用
RequestStream.CompleteAsync()
通知服务端“我不再发了”,但不影响继续收消息

示例片段:

var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new ChatService.ChatServiceClient(channel);
using var call = client.Chat();
// 启动接收任务(后台持续读)
_ = Task.Run(async () =>
{
    await foreach (var msg in call.ResponseStream.ReadAllAsync())
    {
        Console.WriteLine($"[Server] {msg.content}");
    }
});
// 主线程发消息
await call.RequestStream.WriteAsync(new Message { Content = "Hello", Sender = "client" });
await call.RequestStream.WriteAsync(new Message { Content = "How are you?", Sender = "client" });
// 发送完毕,关闭写入端
await call.RequestStream.CompleteAsync();
// 等待服务端也结束(可选)
await call.ResponseStream.Completion;

C#服务端实现双向流处理

服务端方法签名是

Task
,参数为
IAsyncStreamReader<trequest></trequest>
,返回值为
IAsyncStreamWriter<tresponse></tresponse>

requestStream.MoveNextAsync()
await foreach
接收客户端消息
responseStream.WriteAsync()
随时向客户端推送响应
无需主动关闭流,gRPC框架会在对方断开或超时后自动清理

示例服务端逻辑:

public override async Task Chat(IAsyncStreamReader<Message> requestStream,
                                IServerStreamWriter<Message> responseStream,
                                ServerCallContext context)
{
    try
    {
        await foreach (var msg in requestStream.ReadAllAsync())
        {
            Console.WriteLine($"[Client] {msg.Content}");
            // 回复一条(也可批量或按条件回复)
            await responseStream.WriteAsync(new Message
            {
                Content = $"Echo: {msg.Content}",
                Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
                Sender = "server"
            });
        }
    }
    catch (OperationCanceledException)
    {
        // 客户端断开或取消
    }
}

基本上就这些。双向流不复杂但容易忽略

CompleteAsync()
和异常捕获,实际部署建议加上超时、重连、心跳保活逻辑。

相关推荐

热文推荐