Saga 模式在 C# 中用 MassTransit 实现补偿逻辑
Saga 是处理长时间运行、跨服务事务的主流方案,C# 生态里最成熟的选择是
MassTransit—— 它原生支持基于状态机(
StateMachineSaga)的 Saga 管理,自动持久化、幂等性、重试和补偿都内置了。
关键不是“手写事务日志”,而是定义状态流转和对应命令。比如订单创建后扣库存失败,必须触发“取消预留库存”操作,这个补偿动作由
Compensate方法声明,不是靠 try-catch 手动调。 必须实现
ISagaRepository<t></t>,推荐用
EntityFrameworkSagaRepository存到 SQL Server 或 PostgreSQL 每个 Saga 实体需有唯一
CorrelationId(通常用
Guid),所有消息必须携带它,否则状态无法关联 超时控制用
RequestTimeout+
When(Timeout),别依赖外部定时器轮询 不要在
When()里直接调用 HTTP API;应发新命令(如
ReserveInventoryCommand),让下游消费者处理,保证解耦
public class OrderSaga : MassTransitStateMachine<OrderState>
{
public State Submitted { get; private set; }
public State InventoryReserved { get; private set; }
public Event<SubmitOrder> SubmitOrder { get; private set; }
public Event<InventoryReserved> InventoryReserved { get; private set; }
public Event<InventoryReservationFailed> InventoryReservationFailed { get; private set; }
<pre class='brush:php;toolbar:false;'>public OrderSaga()
{
InstanceState(x => x.CurrentState);
Event(() => SubmitOrder);
Event(() => InventoryReserved);
Event(() => InventoryReservationFailed);
Initially(
When(SubmitOrder)
.Then(ctx => ctx.Instance.OrderId = ctx.Data.OrderId)
.TransitionTo(Submitted)
.Send(context => new ReserveInventoryCommand(context.Instance.OrderId)));
During(Submitted,
When(InventoryReserved)
.TransitionTo(InventoryReserved),
When(InventoryReservationFailed)
.Call(ctx => Console.WriteLine($"Rollback for {ctx.Instance.OrderId}"))
.Compensate(ctx => new CancelInventoryReservationCommand(ctx.Instance.OrderId)));
}}
两阶段提交(2PC)在 C# 中不推荐直接实现
.NET 原生没有跨服务 2PC 支持,
System.Transactions.TransactionScope只适用于同进程内多个
SqlConnection或支持 MSDTC 的资源,一旦涉及 HTTP、RabbitMQ、第三方 API,它就完全失效——不是功能限制,是协议层面不兼容。
常见误用:用
TransactionScope包住 EF Core SaveChanges 和 HttpClient.PostAsync,以为能原子提交。实际结果是数据库改了,HTTP 请求失败,没人回滚数据库。
TransactionScope要求所有参与者实现
IEnlistmentNotification,而 REST API、Kafka Producer、gRPC Client 都不实现它 启用 MSDTC 在容器或云环境几乎不可行,且性能差、故障面大,超时默认 10 分钟,容易卡死资源 即使本地多 DB 场景,EF Core 6+ 的
BeginTransactionAsync(isolationLevel)也比
TransactionScope更可控、无隐式分布式事务风险
什么时候该选 Saga,什么时候绕开分布式事务
核心判断依据是“业务是否允许中间态 + 补偿可行性”。例如电商下单:用户看到“已提交”,库存显示“已预留”,这是合法中间态;若支付失败,发补偿指令取消预留,整个流程可自愈。
强一致性要求场景(如银行实时转账)→ 别用微服务拆,合并在单库单服务里用 ACID 跨组织/第三方系统(如调微信支付、对接物流 SaaS)→ 只能靠 Saga + 对账,2PC 根本不存在 高吞吐写入(如 IoT 设备上报)→ 用最终一致性 + 幂等写入,连 Saga 状态机都可能成为瓶颈,改用事件溯源 + 状态投影 临时性数据(如购物车)→ 用 Redis + 过期时间,根本不需要事务语义MassTransit Saga 的坑:持久化与幂等性没配对就等于没做
很多人只写状态机,却忘了配持久化,导致重启后 Saga 状态丢失,消息重复消费时无法识别“这单我已经处理过了”,直接双倍扣库存。
另一个高频问题是补偿消息没加幂等键。比如
CancelInventoryReservationCommand被重发三次,库存就多加三次。正确做法是在命令里带
RequestId,消费者用该字段做去重(如存到 Redis Set 或 DB 唯一索引)。 EF Core 迁移必须包含
DbContext中的
ISagaRepository表(如
OrderState),否则启动报
InvalidOperation: No saga repository configured所有入站消息(
SubmitOrder,
InventoryReserved)必须设置
MessageId和
CorrelationId,否则
ConsumeContext<t></t>拿不到上下文 本地测试时禁用重试(
UseInMemoryOutbox()+
DisableRetry()),否则补偿逻辑会被干扰,难以验证状态流转
分布式事务从来不是“怎么实现”,而是“哪些地方根本不能分布”。Saga 不是银弹,但它是目前 C# 微服务里最靠谱的落地路径;硬上 2PC,多数时候只是把问题从代码移到运维日志里。
