c# Saga模式和两阶段提交在c#中的实现

来源:这里教程网 时间:2026-02-21 17:41:26 作者:

Saga 模式在 C# 中用 MassTransit 实现补偿逻辑

Saga 是处理长时间运行、跨服务事务的主流方案,C# 生态里最成熟的选择是

MassTransit
—— 它原生支持基于状态机(
StateMachineSaga
)的 Saga 管理,自动持久化、幂等性、重试和补偿都内置了。

关键不是“手写事务日志”,而是定义状态流转和对应命令。比如订单创建后扣库存失败,必须触发“取消预留库存”操作,这个补偿动作由

Compensate
方法声明,不是靠 try-catch 手动调。

必须实现
ISagaRepository<t></t>
,推荐用
EntityFrameworkSagaRepository
存到 SQL Server 或 PostgreSQL
每个 Saga 实体需有唯一
CorrelationId
(通常用
Guid
),所有消息必须携带它,否则状态无法关联
超时控制用
RequestTimeout
+
When(Timeout)
,别依赖外部定时器轮询
不要在
When()
里直接调用 HTTP API;应发新命令(如
ReserveInventoryCommand
),让下游消费者处理,保证解耦
public class OrderSaga : MassTransitStateMachine<OrderState>
{
    public State Submitted { get; private set; }
    public State InventoryReserved { get; private set; }
    public Event<SubmitOrder> SubmitOrder { get; private set; }
    public Event<InventoryReserved> InventoryReserved { get; private set; }
    public Event<InventoryReservationFailed> InventoryReservationFailed { get; private set; }
<pre class='brush:php;toolbar:false;'>public OrderSaga()
{
    InstanceState(x => x.CurrentState);
    Event(() => SubmitOrder);
    Event(() => InventoryReserved);
    Event(() => InventoryReservationFailed);
    Initially(
        When(SubmitOrder)
            .Then(ctx => ctx.Instance.OrderId = ctx.Data.OrderId)
            .TransitionTo(Submitted)
            .Send(context => new ReserveInventoryCommand(context.Instance.OrderId)));
    During(Submitted,
        When(InventoryReserved)
            .TransitionTo(InventoryReserved),
        When(InventoryReservationFailed)
            .Call(ctx => Console.WriteLine($"Rollback for {ctx.Instance.OrderId}"))
            .Compensate(ctx => new CancelInventoryReservationCommand(ctx.Instance.OrderId)));
}

}

两阶段提交(2PC)在 C# 中不推荐直接实现

.NET 原生没有跨服务 2PC 支持,

System.Transactions.TransactionScope
只适用于同进程内多个
SqlConnection
或支持 MSDTC 的资源,一旦涉及 HTTP、RabbitMQ、第三方 API,它就完全失效——不是功能限制,是协议层面不兼容。

常见误用:用

TransactionScope
包住 EF Core SaveChanges 和 HttpClient.PostAsync,以为能原子提交。实际结果是数据库改了,HTTP 请求失败,没人回滚数据库。

TransactionScope
要求所有参与者实现
IEnlistmentNotification
,而 REST API、Kafka Producer、gRPC Client 都不实现它
启用 MSDTC 在容器或云环境几乎不可行,且性能差、故障面大,超时默认 10 分钟,容易卡死资源 即使本地多 DB 场景,EF Core 6+ 的
BeginTransactionAsync(isolationLevel)
也比
TransactionScope
更可控、无隐式分布式事务风险

什么时候该选 Saga,什么时候绕开分布式事务

核心判断依据是“业务是否允许中间态 + 补偿可行性”。例如电商下单:用户看到“已提交”,库存显示“已预留”,这是合法中间态;若支付失败,发补偿指令取消预留,整个流程可自愈。

强一致性要求场景(如银行实时转账)→ 别用微服务拆,合并在单库单服务里用 ACID 跨组织/第三方系统(如调微信支付、对接物流 SaaS)→ 只能靠 Saga + 对账,2PC 根本不存在 高吞吐写入(如 IoT 设备上报)→ 用最终一致性 + 幂等写入,连 Saga 状态机都可能成为瓶颈,改用事件溯源 + 状态投影 临时性数据(如购物车)→ 用 Redis + 过期时间,根本不需要事务语义

MassTransit Saga 的坑:持久化与幂等性没配对就等于没做

很多人只写状态机,却忘了配持久化,导致重启后 Saga 状态丢失,消息重复消费时无法识别“这单我已经处理过了”,直接双倍扣库存。

另一个高频问题是补偿消息没加幂等键。比如

CancelInventoryReservationCommand
被重发三次,库存就多加三次。正确做法是在命令里带
RequestId
,消费者用该字段做去重(如存到 Redis Set 或 DB 唯一索引)。

EF Core 迁移必须包含
DbContext
中的
ISagaRepository
表(如
OrderState
),否则启动报
InvalidOperation: No saga repository configured
所有入站消息(
SubmitOrder
,
InventoryReserved
)必须设置
MessageId
CorrelationId
,否则
ConsumeContext<t></t>
拿不到上下文
本地测试时禁用重试(
UseInMemoryOutbox()
+
DisableRetry()
),否则补偿逻辑会被干扰,难以验证状态流转

分布式事务从来不是“怎么实现”,而是“哪些地方根本不能分布”。Saga 不是银弹,但它是目前 C# 微服务里最靠谱的落地路径;硬上 2PC,多数时候只是把问题从代码移到运维日志里。

相关推荐