ABP VNext + MassTransit: Building Distributed Transactions and Asynchronous Messaging 🚀
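📚 Contents
- 📚 1. Background and Motivation
- 🛠️ 2. Environment and Dependencies
- 🔧 3. Registering MassTransit in an ABP Module
  - 3.1 Strongly Typed Configuration Binding
  - 3.2 Module Configuration
- 📝 4. Complete Message Contracts
- 🔄 5. Saga Entity & State Machine
  - 5.1 `OrderState` with RowVersion
  - 5.2 `OrderSagaDbContext` & `OrderStateMap`
  - 5.3 State Machine + Sequence Diagram
- 📤 6. Publishing & Consuming Examples
  - 6.1 Publishing
  - 6.2 Consuming
- 📦 7. Distributed Transaction Outbox Flow
- 🔍 8. Observability & Performance Monitoring
- 🛠️ 9. Kafka-Compatible Example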
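✨ TL;DR
- 🚀 Non-intrusive MassTransit: integrate through services.AddMassTransit(...) without depending on Volo.Abp.EventBus.MassTransit; if you need ABP's own event bus, install Volo.Abp.EventBus.RabbitMQ.
- 💾 Production-grade Saga persistence: services.AddDbContext<OrderSagaDbContext>(…) together with .EntityFrameworkRepository(...) gives durable state and optimistic concurrency (a RowVersion column must be configured).
- 📬 Standardized Outbox: call builder.ConfigureEventOutbox() inside the DbContext and configure AbpDistributedEventBusOptions so that the database write and the event publication commit atomically.
- 🔍 Unified observability: instead of exposing Prometheus metrics directly, use OpenTelemetry traces and metrics. In MassTransit 8 this means adding the MassTransit ActivitySource (DiagnosticHeaders.DefaultListenerName) and Meter (InstrumentationOptions.MeterName) to the OpenTelemetry SDK and exporting metrics with AddPrometheusExporter(), which makes message flow observable end to end.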
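📚 1. Background and Motivation
In a microservice architecture, "place order → pay → ship" is a long-running flow that spans several services. It demands data consistency while also aiming for high availability and observability.
- Traditional 2PC performs poorly and is prone to deadlocks;
- Local transactions with compensation lack centralized management and visibility;
- The Saga pattern, built on a state machine, persistence, and compensation, offers a better solution for distributed transactions.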
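🛠️ 2. Environment and Dependencies
- .NET: 6+
- ABP: VNext 6.x+
- Middleware: RabbitMQ (default) or Kafka
- Core NuGet packages:
  dotnet add package MassTransit
  dotnet add package MassTransit.RabbitMQ
  dotnet add package MassTransit.Kafka
  dotnet add package MassTransit.EntityFrameworkCore
  dotnet add package MassTransit.AspNetCore
  MassTransit.EntityFrameworkCore provides the EF Core saga repository used in this article. MassTransit.AspNetCore is only needed on MassTransit 7; version 8 ships the hosting integration in the main package.
- appsettings.json:
  {
    "MassTransit": {
      "UseRabbitMq": true,
      "RabbitMq": {
        "Host": "rabbitmq://localhost",
        "Username": "guest",
        "Password": "guest"
      },
      "Kafka": {
        "BootstrapServers": "localhost:9092"
      }
    }
  }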
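🔧 3. Registering MassTransit in an ABP Module
3.1 Strongly Typed Configuration Binding
// RabbitMqOptions (Host, Username, Password) and KafkaOptions (BootstrapServers)
// are plain classes that mirror the matching appsettings.json sections.
public class MassTransitOptions
{
    public bool UseRabbitMq { get; set; }
    public RabbitMqOptions RabbitMq { get; set; } = new();
    public KafkaOptions Kafka { get; set; } = new();
}

// Inside ConfigureServices: register the options for injection, then read the
// values straight from configuration. Calling BuildServiceProvider() here would
// build a second container, so bind from the section instead.
var configuration = context.Services.GetConfiguration();
context.Services.Configure<MassTransitOptions>(configuration.GetSection("MassTransit"));
var mtOptions = configuration.GetSection("MassTransit").Get<MassTransitOptions>();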
3.2 Module Configuration
[DependsOn(typeof(AbpAutofacModule))]
public class OrderProcessingModule : AbpModule
{
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        var configuration = context.Services.GetConfiguration();
        var options = configuration.GetSection("MassTransit").Get<MassTransitOptions>();

        // Register the Saga DbContext first (used for migrations)
        context.Services.AddDbContext<OrderSagaDbContext>(builder =>
            builder.UseSqlServer(configuration.GetConnectionString("Default")));

        context.Services.AddMassTransit(x =>
        {
            x.AddConsumer<AcceptOrderConsumer>();

            if (options.UseRabbitMq)
            {
                // Saga persistence & optimistic concurrency
                x.AddSagaStateMachine<OrderStateMachine, OrderState>()
                    .EntityFrameworkRepository(r =>
                    {
                        r.ExistingDbContext<OrderSagaDbContext>();
                        r.UseSqlServer();
                        r.ConcurrencyMode = ConcurrencyMode.Optimistic;
                    });

                x.UsingRabbitMq((ctx, cfg) =>
                {
                    var rmq = options.RabbitMq;
                    cfg.Host(rmq.Host, h =>
                    {
                        h.Username(rmq.Username);
                        h.Password(rmq.Password);
                    });

                    // Retry up to 3 times, 5 seconds apart
                    cfg.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));
                    cfg.UseCircuitBreaker(cb =>
                    {
                        cb.TrackingPeriod = TimeSpan.FromMinutes(1);
                        cb.TripThreshold = 15;
                        cb.ActiveThreshold = 10;
                        cb.ResetInterval = TimeSpan.FromMinutes(5);
                    });

                    // MassTransit 8 registers health checks automatically and emits traces
                    // through its "MassTransit" ActivitySource, so the v7-era
                    // UseHealthCheck / tracing calls are not needed on the bus.
                    cfg.ConfigureEndpoints(ctx);
                });
            }
            else
            {
                // A rider attaches to a bus, so an in-memory bus is configured next to Kafka.
                x.UsingInMemory((ctx, cfg) => cfg.ConfigureEndpoints(ctx));

                x.AddRider(r =>
                {
                    // With Kafka, the saga is registered on the rider with the same repository settings.
                    r.AddSagaStateMachine<OrderStateMachine, OrderState>()
                        .EntityFrameworkRepository(repo =>
                        {
                            repo.ExistingDbContext<OrderSagaDbContext>();
                            repo.UseSqlServer();
                            repo.ConcurrencyMode = ConcurrencyMode.Optimistic;
                        });

                    r.UsingKafka((ctx, k) =>
                    {
                        k.Host(options.Kafka.BootstrapServers);
                        k.TopicEndpoint<SubmitOrder>("submit-order-topic", "order-group",
                            e => e.ConfigureSaga<OrderState>(ctx));
                    });
                });
            }
        });
    }
}
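The retry policy in the module configuration, `UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)))`, is easier to reason about once the semantics are spelled out: one initial attempt, then up to three retries separated by a fixed delay. The sketch below shows that behavior in plain C#. It is not MassTransit's implementation; `IntervalRetry` and `ExecuteAsync` are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class IntervalRetry
{
    // Runs the action once, then retries up to retryCount more times,
    // waiting the same interval before each retry.
    // Returns the number of attempts used; the final failure is rethrown.
    public static async Task<int> ExecuteAsync(Func<Task> action, int retryCount, TimeSpan interval)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await action();
                return attempt;
            }
            catch (Exception) when (attempt <= retryCount)
            {
                // Still within the retry budget: wait, then loop again.
                await Task.Delay(interval);
            }
        }
    }
}
```

With `retryCount = 3`, an action that always fails runs four times before the exception reaches the caller, which is the point at which a broker-based setup would typically move the message to an error queue.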
📝 4. Complete Message Contracts
public record SubmitOrder(Guid OrderId, decimal Amount, DateTime Timestamp);
public record AcceptOrder(Guid OrderId);
public record OrderCompleted(Guid OrderId, DateTime CompletedAt);
public record OrderFaulted(Guid OrderId, string Reason);
🔄 5. Saga Entity & State Machine
5.1 OrderState with RowVersion
public class OrderState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; } = "";
    public DateTime? Created { get; set; }
    public DateTime? Completed { get; set; }
    public byte[]? RowVersion { get; set; } // optimistic concurrency token
}
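To show what the RowVersion column is for, the following sketch imitates an optimistic concurrency check in plain C#, with no EF Core or MassTransit involved. `VersionedStore`, `TryUpdate`, and the integer version counter are hypothetical stand-ins for the saga table and its row version; a real SQL Server rowversion is an 8-byte value maintained by the server.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a table with a row-version column.
public sealed class VersionedStore
{
    private readonly Dictionary<Guid, (string State, long Version)> _rows = new();
    private readonly object _gate = new();

    public void Insert(Guid id, string state)
    {
        lock (_gate) { _rows[id] = (state, 1); }
    }

    public (string State, long Version) Read(Guid id)
    {
        lock (_gate) { return _rows[id]; }
    }

    // Succeeds only if the caller still holds the latest version, mirroring
    // "UPDATE ... WHERE Id = @id AND RowVersion = @expected".
    public bool TryUpdate(Guid id, string newState, long expectedVersion)
    {
        lock (_gate)
        {
            if (!_rows.TryGetValue(id, out var row) || row.Version != expectedVersion)
                return false; // another writer changed the row first

            _rows[id] = (newState, row.Version + 1);
            return true;
        }
    }
}
```

If two handlers read the same saga row and both try to write, the first update wins and the second is rejected because its version is stale. With EF Core the rejection surfaces as a concurrency exception, which is why optimistic mode is usually paired with a message retry policy.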
5.2 OrderSagaDbContext & OrderStateMap
public class OrderSagaDbContext : SagaDbContext
{
    public OrderSagaDbContext(DbContextOptions<OrderSagaDbContext> options)
        : base(options) { }

    protected override IEnumerable<ISagaClassMap> Configurations
        => new[] { new OrderStateMap() };
}

public class OrderStateMap : SagaClassMap<OrderState>
{
    protected override void Configure(EntityTypeBuilder<OrderState> entity, ModelBuilder model)
    {
        entity.Property(x => x.RowVersion).IsRowVersion();
    }
}
5.3 State Machine + Sequence Diagram
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    public State Submitted { get; private set; }
    public State Completed { get; private set; }

    public Event<SubmitOrder> SubmitOrderEvent { get; private set; }
    public Event<AcceptOrder> AcceptOrderEvent { get; private set; }
    public Event<OrderFaulted> OrderFaultedEvent { get; private set; }

    public OrderStateMachine()
    {
        InstanceState(x => x.CurrentState);

        // Every event correlates to the saga instance by OrderId
        Event(() => SubmitOrderEvent, x => x.CorrelateById(m => m.Message.OrderId));
        Event(() => AcceptOrderEvent, x => x.CorrelateById(m => m.Message.OrderId));
        Event(() => OrderFaultedEvent, x => x.CorrelateById(m => m.Message.OrderId));

        Initially(
            When(SubmitOrderEvent)
                .Then(ctx => ctx.Saga.Created = DateTime.UtcNow)
                .TransitionTo(Submitted));

        During(Submitted,
            When(AcceptOrderEvent)
                .ThenAsync(ctx => /* shipping and other business logic */ Task.CompletedTask)
                // The property name must match the contract: OrderCompleted.OrderId
                .PublishAsync(ctx => ctx.Init<OrderCompleted>(new
                {
                    OrderId = ctx.Saga.CorrelationId,
                    CompletedAt = DateTime.UtcNow
                }))
                .Then(ctx => ctx.Saga.Completed = DateTime.UtcNow)
                .TransitionTo(Completed));

        DuringAny(
            When(OrderFaultedEvent)
                .ThenAsync(ctx => { /* compensation logic */ return Task.CompletedTask; })
                .Finalize());

        SetCompletedWhenFinalized();
    }
}
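The transitions declared by the state machine can be summarized as a pure function, which is handy for reasoning about the flow or for quick unit tests. This is only a sketch: `OrderStatus`, `OrderEvent`, and `OrderTransitions` are hypothetical names, and throwing on an unhandled event is a simplification of how MassTransit treats events that a state does not accept. It assumes that `DuringAny` covers every state except Initial and Final.

```csharp
using System;

public enum OrderStatus { Initial, Submitted, Completed, Final }
public enum OrderEvent { SubmitOrder, AcceptOrder, OrderFaulted }

public static class OrderTransitions
{
    // Returns the next state, or throws when the event is not handled in the current state.
    public static OrderStatus Next(OrderStatus current, OrderEvent evt) => (current, evt) switch
    {
        // Initially: SubmitOrder creates the saga and moves it to Submitted
        (OrderStatus.Initial, OrderEvent.SubmitOrder) => OrderStatus.Submitted,
        // During(Submitted): AcceptOrder completes the order
        (OrderStatus.Submitted, OrderEvent.AcceptOrder) => OrderStatus.Completed,
        // DuringAny: a fault finalizes the saga from any non-initial, non-final state
        (OrderStatus.Submitted, OrderEvent.OrderFaulted) => OrderStatus.Final,
        (OrderStatus.Completed, OrderEvent.OrderFaulted) => OrderStatus.Final,
        _ => throw new InvalidOperationException($"{evt} is not handled in state {current}")
    };
}
```

Read top to bottom, the switch arms follow the happy path (Initial → Submitted → Completed) and then the compensation path into Final, after which `SetCompletedWhenFinalized()` lets the repository remove the instance.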
📤 6. Publishing & Consuming Examples
6.1 Publishing
public class OrderAppService : ApplicationService
{
    private readonly IPublishEndpoint _publish;

    public OrderAppService(IPublishEndpoint publish) => _publish = publish;

    public async Task<Guid> CreateOrderAsync(decimal amount)
    {
        var orderId = Guid.NewGuid();
        // Write to the local database…
        await _publish.Publish(new SubmitOrder(orderId, amount, DateTime.UtcNow));
        return orderId;
    }
}
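6.2 Consuming
public class AcceptOrderConsumer : IConsumer<AcceptOrder>
{
    public async Task Consume(ConsumeContext<AcceptOrder> ctx)
    {
        // Payment, inventory, and other business logic
        // On failure:
        // await ctx.Publish(new OrderFaulted(ctx.Message.OrderId, "Out of stock"));
        await Task.CompletedTask; // placeholder until real work is added
    }
}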
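📦 7. Distributed Transaction Outbox Flow
public class AppDbContext : AbpDbContext<AppDbContext>, IHasEventOutbox
{
    public DbSet<OutgoingEventRecord> OutgoingEvents { get; set; }

    public AppDbContext(DbContextOptions<AppDbContext> options)
        : base(options) { }

    protected override void OnModelCreating(ModelBuilder builder)
    {
        base.OnModelCreating(builder);
        builder.ConfigureEventOutbox();
    }
}

// Note: this outbox covers events published through ABP's IDistributedEventBus.
// Messages published directly through MassTransit's IPublishEndpoint do not pass through it.
Configure<AbpDistributedEventBusOptions>(opt =>
{
    opt.Outboxes.Configure(cfg => cfg.UseDbContext<AppDbContext>());
});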
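The idea behind the outbox is that the business change and the pending event are stored in one transaction, and a separate relay publishes the stored events afterwards. The sketch below models that flow with in-memory lists so the atomicity is easy to see. It does not use ABP's implementation; `InMemoryDb`, `OutboxRecord`, and `OutboxRelay` are hypothetical names chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed record OutboxRecord(Guid Id, string EventName, string Payload)
{
    public bool Dispatched { get; set; }
}

// Hypothetical in-memory "database": orders and outbox rows are committed together.
public sealed class InMemoryDb
{
    public List<Guid> Orders { get; } = new();
    public List<OutboxRecord> Outbox { get; } = new();

    // Applies all changes or none, imitating a single database transaction.
    public void Commit(Action<List<Guid>, List<OutboxRecord>> work)
    {
        var orders = new List<Guid>(Orders);
        var outbox = new List<OutboxRecord>(Outbox);
        work(orders, outbox); // an exception here leaves the stored data untouched
        Orders.Clear(); Orders.AddRange(orders);
        Outbox.Clear(); Outbox.AddRange(outbox);
    }
}

public static class OutboxRelay
{
    // Sends every pending record, marks it dispatched, and returns how many were sent.
    public static int DispatchPending(InMemoryDb db, Action<OutboxRecord> send)
    {
        var pending = db.Outbox.Where(r => !r.Dispatched).ToList();
        foreach (var record in pending)
        {
            send(record);
            record.Dispatched = true;
        }
        return pending.Count;
    }
}
```

Because a record is marked only after it has been sent, a crash between the two steps leads to a resend on the next pass. Delivery is therefore at-least-once, and consumers should be idempotent.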
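🔍 8. Observability & Performance Monitoring
- OpenTelemetry traces: MassTransit 8 emits activities through an ActivitySource named by DiagnosticHeaders.DefaultListenerName. Adding that source to the tracer captures publishing, consuming, and Saga state transitions across the whole message path.
- OpenTelemetry metrics: add the MassTransit meter and the Prometheus exporter:
  services.AddOpenTelemetry()
      .WithTracing(b => b.AddSource(DiagnosticHeaders.DefaultListenerName))
      .WithMetrics(b => b
          .AddMeter(InstrumentationOptions.MeterName)
          .AddPrometheusExporter());
  Prometheus scrapes /metrics (exposed by app.UseOpenTelemetryPrometheusScrapingEndpoint()) and Grafana visualizes the results.
- Concurrency limiting: set it on the receive endpoint:
  cfg.ReceiveEndpoint("accept-order-queue", e =>
  {
      e.UseConcurrencyLimit(4);
      e.ConfigureConsumer<AcceptOrderConsumer>(ctx);
  });
- Batch consumption:
  x.AddConsumer<BatchOrderConsumer>(cfg =>
      cfg.Options<BatchOptions>(o => o.MessageLimit = 50));
  x.UsingRabbitMq((context, cfg) =>
  {
      cfg.ReceiveEndpoint("batch-queue", e =>
      {
          e.ConfigureConsumer<BatchOrderConsumer>(context);
      });
  });
  Batching raises throughput further.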
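The effect of a concurrency limit such as `UseConcurrencyLimit(4)` can be illustrated with a semaphore: no matter how many messages arrive, at most four handlers run at once. The sketch below is not MassTransit's implementation; `ConcurrencyLimitDemo` and `RunAsync` are hypothetical names, and the 10 ms delay stands in for real handler work.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ConcurrencyLimitDemo
{
    // Processes messageCount simulated messages with at most `limit` in flight
    // and returns the highest concurrency actually observed.
    public static async Task<int> RunAsync(int messageCount, int limit)
    {
        using var gate = new SemaphoreSlim(limit);
        var counterLock = new object();
        int inFlight = 0, maxObserved = 0;

        async Task HandleAsync()
        {
            await gate.WaitAsync(); // wait for a free slot
            try
            {
                lock (counterLock)
                {
                    inFlight++;
                    maxObserved = Math.Max(maxObserved, inFlight);
                }
                await Task.Delay(10); // simulated handler work
            }
            finally
            {
                lock (counterLock) { inFlight--; }
                gate.Release(); // free the slot for the next message
            }
        }

        await Task.WhenAll(Enumerable.Range(0, messageCount).Select(_ => HandleAsync()));
        return maxObserved;
    }
}
```

Calling `await ConcurrencyLimitDemo.RunAsync(20, 4)` never reports more than 4, which is the back-pressure a downstream database or payment API benefits from.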
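🛠️ 9. Kafka-Compatible Example
// A rider attaches to a bus, so an in-memory bus is configured alongside Kafka.
x.UsingInMemory();

x.AddRider(r =>
{
    r.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .EntityFrameworkRepository(repo =>
        {
            repo.ExistingDbContext<OrderSagaDbContext>();
            repo.UseSqlServer();
            repo.ConcurrencyMode = ConcurrencyMode.Optimistic;
        });

    r.UsingKafka((ctx, k) =>
    {
        k.Host("localhost:9092");
        k.TopicEndpoint<SubmitOrder>("submit-order-topic", "order-group",
            e => e.ConfigureSaga<OrderState>(ctx));
    });
});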