MassTransit 和 NServiceBus 都支持并发消费者与 Saga,但底层机制、配置粒度和默认行为差异显著——直接决定你是否要重写补偿逻辑、是否踩到数据库锁、以及 Saga 实例能否跨节点正确路由。
ConcurrentConsumer
的线程模型与消息吞吐控制方式不同
MassTransit 默认每个
ReceiveEndpoint启动一个消费者实例(
IConsumer),并发靠底层传输层(如 RabbitMQ 的
PrefetchCount)和 .NET 线程池自动调度;而 NServiceBus 显式提供
MaxConcurrency配置项,且其消费者(
IHandleMessages<t></t>)按 handler 实例生命周期隔离。 MassTransit 中若用
e.Consumer<t>()</t>,每次消息触发都 new 一个新实例,无需手动保证线程安全;但若改用
e.Instance(new T()),就必须确保该实例是线程安全的——否则
CorrelationId冲突或状态覆盖极难排查 NServiceBus 的
MaxConcurrency = 4表示最多 4 个 handler 并发执行,但它不控制底层 transport 的 prefetch,容易在高负载下堆积未 ack 消息,需同步调大
Transport.Transaction.MaxConcurrency两者都依赖数据库事务完成 Saga 持久化,但 MassTransit 默认用乐观并发(检查
RowVersion或时间戳),NServiceBus 默认悲观锁(SQL Server 上用
SELECT ... WITH (UPDLOCK)),后者在长事务中易引发阻塞
Saga
初始化与事件路由的匹配逻辑差异明显
MassTransit 要求所有参与 Saga 的消息类型必须实现
CorrelatedBy<t></t>,且
CorrelationId字段名和类型必须完全一致;NServiceBus 则允许通过
ConfigureHowToFindSaga自定义查找条件,比如用
OrderId+
CustomerId组合键,灵活性更高但配置更隐晦。 MassTransit 的
InitiatedBy<t></t>接口只在首次收到消息时创建 Saga 实例,后续同
CorrelationId的消息自动路由到已存实例;而 NServiceBus 的
IContainSagaData类型没有“启动/协调”语义分离,全靠
ConfigureHowToFindSaga返回非 null 才认为是已有实例 MassTransit 的
Orchestrates<t></t>是显式契约,编译期可查;NServiceBus 中同一 message 可能既触发新建又更新旧实例,取决于查找逻辑返回值,运行时才暴露问题 两者都支持分布式部署,但 MassTransit 要求所有节点共享同一 Saga 存储(如 SQL Server),NServiceBus 允许分库分表(需自定义
ISagaStorage),不过代价是失去跨库的原子性保证
Compensating Transaction
的实现责任归属不同
MassTransit 不内置补偿动作定义,Saga 类里需手动调用
context.Publish<ordercancelled>()</ordercancelled>或
context.Send<undoinventoryreserve>()</undoinventoryreserve>;NServiceBus 提供
IAmStartedByMessages<t></t>+
IHandleMessages<t></t>组合,并鼓励用
ReplyToOriginator触发反向流程,结构更固定但扩展性受限。 MassTransit 中 Saga 失败后重试由
UseMessageRetry控制,补偿消息发送失败不会自动回滚已存 Saga 状态,需靠幂等 + 重试 + 监控兜底 NServiceBus 的
SagaTimeOut可绑定到具体消息,超时后自动触发
IHandleTimeouts<t></t>,适合订单过期场景;MassTransit 需手动发
ScheduleMessage+
ConsumeContext.ScheduleSend模拟,稍繁琐 两者都要求补偿操作幂等,但 MassTransit 更依赖开发人员对
CorrelationId的严格使用,漏传或错传会导致补偿发到错误 Saga 实例
真正容易被忽略的是:MassTransit 的 Saga 存储初始化(如
AddEntityFrameworkRepository)必须在
AddMassTransit之前注册,否则运行时报
Unable to resolve service for type 'ISagaRepository<ordersaga>'</ordersaga>;而 NServiceBus 的
endpointConfiguration.UsePersistence<sqlpersistence>()</sqlpersistence>顺序相对宽松。这点在混合使用多个传输(Kafka + RabbitMQ)或升级到 v8 时尤为致命。
