用 C# 连接 RabbitMQ,推荐直接使用 MassTransit —— 它不是简单的封装,而是成熟的分布式消息通信框架,内置对 RabbitMQ 的深度支持,自动处理连接管理、重连、序列化、消费者生命周期等细节,比手写 Raw RabbitMQ.Client 更安全、更省心。
安装 MassTransit 和 RabbitMQ 客户端依赖
在项目中通过 NuGet 安装两个核心包:
MassTransit(主框架) MassTransit.RabbitMQ(RabbitMQ 传输适配器)命令行执行:
dotnet add package MassTransit<br>dotnet add package MassTransit.RabbitMQ
注意:无需单独安装 RabbitMQ.Client,MassTransit.RabbitMQ 已包含兼容版本。
配置 MassTransit 服务(.NET 6/7/8 推荐方式)
在
Program.cs中注册 MassTransit,并指定 RabbitMQ 连接地址和虚拟主机:
builder.Services.AddMassTransit(x =><br>{<br> x.UsingRabbitMq((context, cfg) =><br> {<br> cfg.Host("localhost", "/", h =><br> {<br> h.Username("guest");<br> h.Password("guest");<br> });<br> // 可选:启用自动交换/队列声明<br> cfg.ConfigureEndpoints(context);<br> });<br>});说明:
–
"localhost"是 RabbitMQ 服务地址,生产环境换成实际 IP 或域名
–
"/"是虚拟主机(vhost),默认为 "/",如已创建自定义 vhost(如 "myapp")请替换
– 用户名密码默认是
guest/guest,生产务必修改并配置权限
定义消息契约与发送消息
消息必须是 public class,推荐实现空接口标记(非必须但利于类型识别):
public record OrderSubmitted<br>{<br> public Guid OrderId { get; init; }<br> public string CustomerName { get; init; } = default!;<br>}在业务代码中注入
IPublishEndpoint发布事件:
public class OrderService<br>{<br> private readonly IPublishEndpoint _publishEndpoint;<br><br> public OrderService(IPublishEndpoint publishEndpoint)<br> => _publishEndpoint = publishEndpoint;<br><br> public async Task SubmitOrder(Guid orderId, string name)<br> {<br> await _publishEndpoint.Publish(new OrderSubmitted<br> {<br> OrderId = orderId,<br> CustomerName = name<br> });<br> }<br>}发布即发往默认交换机(
amq.topic),MassTransit 自动路由到匹配的消费者队列。
编写消费者接收并处理消息
新建一个类实现
IConsumer<t></t>:
public class OrderSubmittedConsumer : IConsumer<OrderSubmitted><br>{<br> public async Task Consume(ConsumeContext<OrderSubmitted> context)<br> {<br> var message = context.Message;<br> Console.WriteLine($"收到订单: {message.OrderId} - {message.CustomerName}");<br><br> // 模拟处理逻辑(如保存数据库、调用下游服务)<br> await Task.Delay(100);<br><br> // 处理成功,自动 ACK;抛出异常则 NACK 并可能进入重试或死信队列<br> }<br>}注册消费者(仍在
AddMassTransit配置块内):
x.AddConsumer<OrderSubmittedConsumer>();
并在
ConfigureEndpoints前加上:
cfg.ReceiveEndpoint("order-queue", e =><br>{<br> e.ConfigureConsumer<OrderSubmittedConsumer>(context);<br>});这样 MassTransit 就会创建名为
order-queue的队列,绑定到默认交换机,监听
OrderSubmitted类型消息。
基本上就这些。MassTransit 把 RabbitMQ 的复杂性屏蔽得很好,你专注定义消息和业务逻辑即可。不复杂但容易忽略的是:确保 RabbitMQ 服务已启动、vhost 存在、用户有对应权限,以及开发时打开管理界面(http://localhost:15672)实时观察队列和消息流动,排查问题快得多。
