ABP VNext + MassTransit:构建分布式事务与异步消息协作 🚀


📚 目录

  • ABP VNext + MassTransit:构建分布式事务与异步消息协作 🚀
    • 📚 1. 背景与动机
    • 🛠️ 2. 环境与依赖
    • 🔧 3. 在 ABP 模块中注册 MassTransit
      • 3.1 强类型配置绑定
      • 3.2 模块配置
    • 📝 4. 完整消息契约
    • 🔄 5. Saga 实体 & 状态机
      • 5.1 `OrderState` with RowVersion
      • 5.2 `OrderSagaDbContext` & `OrderStateMap`
      • 5.3 状态机 + 时序图
    • 📤 6. 发布 & 消费示例
      • 6.1 发布
      • 6.2 消费
    • 📦 7. 分布式事务 Outbox 流程
    • 🔍 8. 可观测性 & 性能监控
    • 🛠️ 9. Kafka 兼容示例


TL;DR

  • 🚀 零侵入 MassTransit:使用 services.AddMassTransit(...) 集成,无需依赖 Volo.Abp.EventBus.MassTransit;若需 ABP 自有事件总线,可安装 Volo.Abp.EventBus.RabbitMQ
  • 💾 生产级 Saga 持久化:通过 services.AddDbContext<OrderSagaDbContext>(…) + .EntityFrameworkRepository(...) 保证状态持久化与乐观并发(需配置 RowVersion
  • 📬 标准化 Outbox:在 DbContext 内调用 builder.ConfigureEventOutbox() 并配置 AbpDistributedEventBusOptions,实现数据库写入与消息发布的原子性
  • 🔍 一体化可观测性:弃用 Prometheus 直出,采用 OpenTelemetry Trace + Metrics,通过 UseOpenTelemetryTracing()AddOpenTelemetryMetrics().AddPrometheusExporter() 深度监控消息流转

📚 1. 背景与动机

在微服务架构中,“下单 → 支付 → 发货” 属于跨服务长流程,既要求 数据一致性,又追求 高可用可观测

  • 传统 2PC 性能低、易死锁;
  • 本地事务+补偿模式缺乏集中管理与可视化;
  • Saga 模式通过状态机、持久化与补偿,提供更优的分布式事务解决方案

🛠️ 2. 环境与依赖

  • .NET:6 +

  • ABP:VNext 6.x +

  • 中间件:RabbitMQ(默认)或 Kafka

  • 核心 NuGet 包

    dotnet add package MassTransit
    dotnet add package MassTransit.RabbitMQ
    dotnet add package MassTransit.Kafka
    dotnet add package MassTransit.AspNetCore
    
  • appsettings.json

    {"MassTransit": {"UseRabbitMq": true,"RabbitMq": {"Host": "rabbitmq://localhost","Username": "guest","Password": "guest"},"Kafka": {"BootstrapServers": "localhost:9092"}}
    }
    

🔧 3. 在 ABP 模块中注册 MassTransit

3.1 强类型配置绑定

public class MassTransitOptions
{public bool UseRabbitMq { get; set; }public RabbitMqOptions RabbitMq   { get; set; } = new();public KafkaOptions    Kafka      { get; set; } = new();
}
services.Configure<MassTransitOptions>(Configuration.GetSection("MassTransit"));
var mtOptions = services.BuildServiceProvider().GetRequiredService<IOptions<MassTransitOptions>>().Value;

3.2 模块配置

[DependsOn(typeof(AbpAutofacModule))]
public class OrderProcessingModule : AbpModule
{public override void ConfigureServices(ServiceConfigurationContext context){var options = context.Services.GetConfiguration().GetSection("MassTransit").Get<MassTransitOptions>();// 先注册 Saga DbContext(用于迁移)context.Services.AddDbContext<OrderSagaDbContext>(builder =>builder.UseSqlServer(Configuration.GetConnectionString("Default")));context.Services.AddMassTransit(x =>{// —— Saga 持久化 & 乐观并发 —— x.AddSagaStateMachine<OrderStateMachine, OrderState>().EntityFrameworkRepository(r =>{r.ExistingDbContext<OrderSagaDbContext>();r.UseSqlServer();r.ConcurrencyMode = ConcurrencyMode.Optimistic;});x.AddConsumer<AcceptOrderConsumer>();if (options.UseRabbitMq){x.UsingRabbitMq((ctx, cfg) =>{var rmq = options.RabbitMq;cfg.Host(rmq.Host, h =>{h.Username(rmq.Username);h.Password(rmq.Password);});cfg.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));cfg.UseCircuitBreaker(cb =>{cb.TrackingPeriod  = TimeSpan.FromMinutes(1);cb.TripThreshold   = 15;cb.ActiveThreshold = 10;cb.ResetInterval   = TimeSpan.FromMinutes(5);});cfg.UseHealthCheck(ctx);cfg.UseOpenTelemetryTracing();  cfg.ConfigureEndpoints(ctx);});}else{x.AddRider(r =>{r.AddSagaStateMachine<OrderStateMachine, OrderState>().EntityFrameworkRepository();r.UsingKafka((ctx, k) =>{k.Host(options.Kafka.BootstrapServers);k.TopicEndpoint<SubmitOrder>("submit-order-topic","order-group",e => e.ConfigureSaga<OrderState>(ctx));});});}});}
}

📝 4. 完整消息契约

public record SubmitOrder    (Guid OrderId, decimal Amount, DateTime Timestamp);
public record AcceptOrder    (Guid OrderId);
public record OrderCompleted (Guid OrderId, DateTime CompletedAt);
public record OrderFaulted   (Guid OrderId, string Reason);

🔄 5. Saga 实体 & 状态机

5.1 OrderState with RowVersion

public class OrderState : SagaStateMachineInstance
{public Guid     CorrelationId { get; set; }public string   CurrentState  { get; set; } = "";public DateTime? Created      { get; set; }public DateTime? Completed    { get; set; }public byte[]?  RowVersion    { get; set; }  // 乐观并发标记
}

5.2 OrderSagaDbContext & OrderStateMap

public class OrderSagaDbContext : SagaDbContext
{public OrderSagaDbContext(DbContextOptions<OrderSagaDbContext> options): base(options) { }protected override IEnumerable<ISagaClassMap> Configurations=> new[] { new OrderStateMap() };
}public class OrderStateMap : SagaClassMap<OrderState>
{protected override void Configure(EntityTypeBuilder<OrderState> entity, ModelBuilder model){entity.Property(x => x.RowVersion).IsRowVersion();}
}

5.3 状态机 + 时序图

public class OrderStateMachine : MassTransitStateMachine<OrderState>
{public State Submitted  { get; private set; }public State Completed  { get; private set; }public Event<SubmitOrder>  SubmitOrderEvent  { get; private set; }public Event<AcceptOrder>  AcceptOrderEvent  { get; private set; }public Event<OrderFaulted>  OrderFaultedEvent { get; private set; }public OrderStateMachine(){InstanceState(x => x.CurrentState);Event(() => SubmitOrderEvent,   x => x.CorrelateById(m => m.Message.OrderId));Event(() => AcceptOrderEvent,   x => x.CorrelateById(m => m.Message.OrderId));Event(() => OrderFaultedEvent,  x => x.CorrelateById(m => m.Message.OrderId));Initially(When(SubmitOrderEvent).Then(ctx => ctx.Saga.Created = DateTime.UtcNow).TransitionTo(Submitted));During(Submitted,When(AcceptOrderEvent).ThenAsync(ctx => /* 发货等业务 */ Task.CompletedTask).PublishAsync(ctx => ctx.Init<OrderCompleted>(new{ ctx.Saga.CorrelationId, CompletedAt = DateTime.UtcNow })).Then(ctx => ctx.Saga.Completed = DateTime.UtcNow).TransitionTo(Completed));DuringAny(When(OrderFaultedEvent).ThenAsync(ctx => { /* 补偿逻辑 */ return Task.CompletedTask; }).Finalize());SetCompletedWhenFinalized();}
}
Yes
No
SubmitOrder
State: Submitted
AcceptOrder
处理成功?
Publish OrderCompleted
Set Completed
Finalized
Publish OrderFaulted
Compensation

📤 6. 发布 & 消费示例

6.1 发布

public class OrderAppService : ApplicationService
{private readonly IPublishEndpoint _publish;public OrderAppService(IPublishEndpoint publish) => _publish = publish;public async Task<Guid> CreateOrderAsync(decimal amount){var orderId = Guid.NewGuid();// 本地写库…await _publish.Publish(new SubmitOrder(orderId, amount, DateTime.UtcNow));return orderId;}
}

6.2 消费

public class AcceptOrderConsumer : IConsumer<AcceptOrder>
{public async Task Consume(ConsumeContext<AcceptOrder> ctx){// 支付、库存等业务// 失败时:await ctx.Publish(new OrderFaulted(ctx.Message.OrderId, "库存不足"));}
}

📦 7. 分布式事务 Outbox 流程

public class AppDbContext : AbpDbContext<AppDbContext>, IHasEventOutbox
{public DbSet<OutgoingEventRecord> OutgoingEvents { get; set; }protected override void OnModelCreating(ModelBuilder builder){base.OnModelCreating(builder);builder.ConfigureEventOutbox();}
}
Configure<AbpDistributedEventBusOptions>(opt =>
{opt.Outboxes.Configure(cfg =>cfg.UseDbContext<AppDbContext>());
});
API: CreateOrder
DB Txn Begin
保存 OutgoingEvent
DB Txn Commit
Outbox Worker 定时/轮询触发
MassTransit.Publish
消息下行

🔍 8. 可观测性 & 性能监控

  • OpenTelemetry Tracecfg.UseOpenTelemetryTracing() 捕获消息发布、消费、Saga 状态切换等全链路
  • OpenTelemetry Metrics
 services.AddOpenTelemetryMetrics(builder =>builder.AddPrometheusExporter());

Grafana 拉取 /metrics 可视化监控。

  • 并发限流:在接收端口上设置:

    cfg.ReceiveEndpoint("accept-order-queue", e =>
    {e.UseConcurrencyLimit(4);e.ConfigureConsumer<AcceptOrderConsumer>(ctx);
    });
    
  • 批量消费

    x.AddConsumer<BatchOrderConsumer>(cfg =>cfg.Options<BatchOptions>(o => o.MessageLimit = 50));
    x.UsingRabbitMq((_, cfg) =>
    {cfg.ReceiveEndpoint("batch-queue", e =>{e.ConfigureConsumer<BatchOrderConsumer>(context);});
    });
    

    进一步提升吞吐。


🛠️ 9. Kafka 兼容示例

x.AddRider(r =>
{r.AddSagaStateMachine<OrderStateMachine, OrderState>().EntityFrameworkRepository();r.UsingKafka((ctx, k) =>{k.Host("localhost:9092");k.TopicEndpoint<SubmitOrder>("submit-order-topic","order-group",e => e.ConfigureSaga<OrderState>(ctx));});
});

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/89059.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/89059.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/89059.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

语义网技术

用通俗语言说语义网技术&#xff0c;以及它和现在互联网的关系 一、语义网技术&#xff1a;让网络“听懂人话”的智能升级 现有互联网就像一本巨大的“图文报纸”&#xff1a;我们人类看文章、图片能轻松理解意思&#xff0c;但计算机只能识别文字符号&#xff0c;不知道“苹…

pytorch学习—4.反向传播(用pytorch算梯度)

2. 线性模型 3.梯度下降算法 4.反向传播_哔哩哔哩_bilibili 4.1 代码复现 import torch import matplotlib.pyplot as pltx_data=[1.0,2.0,3.0] y_data=[2.0,4.0,6.0]#这里创建了一个PyTorch张量w,初始值为1.0,并且设置requires_grad=True, #这意味着在计算过程中,PyTo…

7类茶叶嫩芽图像分类数据集

在茶叶育种、溯源管理与自动采摘等智能农业场景中&#xff0c;茶树品种的识别与分类是一项关键任务。不同茶叶品种在嫩芽期表现出显著的形态差异&#xff0c;例如颜色、叶缘结构、芽头密度等。因此&#xff0c;基于图像的茶叶品种分类不仅具备实际应用价值&#xff0c;也为农业…

【Elasticsearch】Linux环境下安装Elasticsearch

一&#xff0c;前言 Elasticsearch&#xff08;简称 ES&#xff09;是一个基于 ​​Apache Lucene​​ 构建的开源分布式搜索与分析引擎。它支持​​实时数据处理​​&#xff0c;提供近实时的全文搜索能力&#xff0c;并通过 ​​JSON 格式的 RESTful API​​ 实现数据索引与检…

【数据结构--树于哨兵查找-1】

查找 从前到后- 线性查找 -就是顺序查找. 哨兵法查找–节省每次都要判断是否越界的这一步骤利于节省开销&#xff0c;从而提升效率。 参考我的程序 #include <stdio.h> #include <stdlib.h> #include <time.h> #include <stdbool.h>#define SIZE …

MyBatis修改(update)操作

1. 三步法口诀 “接口收对象&#xff0c;SQL全赋值&#xff0c;主键定目标” 2. 详细记忆点 | 步骤 | 口诀 | 说明与示例 | |--------------|----------------|----------------------------------------------------------------------------| | 1. 写接口 | “接口收对象…

Spring Boot 入门学习

一、 Web应用开发概述 什么是Web应用 1. Web应用 &#xff08;Web Application&#xff09;是一种运行在Web服务器上的软件程序&#xff0c;由用户通过Web浏览器进行访问和交互。 2.Web应用与传统的桌面应用不同&#xff0c;它不需要在个人计算机上安装特定的软件&#xff0…

深度解读概率与证据权重 -Probability and the Weighing of Evidence

以下是I.J.古德&#xff08;I.J. Good&#xff09;的经典著作 《概率与证据权衡》&#xff08;Probability and the Weighing of Evidence, 1950&#xff09; 的中文详细总结&#xff1a; 本文由「大千AI助手」原创发布&#xff0c;专注用真话讲AI&#xff0c;回归技术本质。拒…

跟着AI学习C#之项目实战-电商平台 Day6

&#x1f4c5; Day 6&#xff1a;后台管理系统开发&#xff08;Admin Panel&#xff09; ✅ 今日目标&#xff1a; 创建管理员页面布局实现商品管理&#xff08;CRUD&#xff09;实现订单管理&#xff08;查看、状态变更&#xff09;添加权限控制&#xff08;仅管理员可访问&…

使用OpcUaHelper在C# WinForms中连接OPC UA服务器并读取数据

使用OpcUaHelper在C# WinForms中连接OPC UA服务器并读取数据 下面是一个完整的示例&#xff0c;展示如何使用OpcUaHelper库在C# WinForms应用程序中连接OPC UA服务器并读取数据。 1. 准备工作 首先&#xff0c;确保你已经安装了OpcUaHelper NuGet包。可以通过NuGet包管理器控…

鸿蒙应用开发中的数据存储:SQLite与Preferences全面解析

在鸿蒙&#xff08;HarmonyOS&#xff09;应用开发中&#xff0c;数据存储是构建功能完整、用户体验良好的应用程序的关键环节。鸿蒙系统提供了多种数据存储解决方案&#xff0c;其中SQLite数据库和Preferences&#xff08;偏好设置&#xff09;是最常用的两种方式。本文将深入…

夏至之日,共赴实时 AI 之约:RTE Open Day@AGI Playground 2025 回顾

每年 RTE 开发者社区的重磅活动—— RTE Open Day &#xff0c;也在六月的 AGI Playground 现场开启今年的行程。这是 RTE Open Day 第五期现场&#xff0c;这期我们的关键词是 「Real-Time AI」 和 「Voice Agent」&#xff0c;不仅有来自社区的 16 个项目&#xff0c;还有两场…

Tomcat性能调优指南

文章目录 一、Tomcat性能调优概述为什么需要调优Tomcat&#xff1f; 二、Tomcat架构与性能关键点三、JVM调优1. 内存配置优化2. 垃圾回收优化3. 其他JVM优化参数 四、连接器(Connector)调优1. NIO vs APR/Native2. 高级NIO配置 五、线程池优化六、会话管理优化1. 会话超时配置2…

Swift 小技巧:用单边区间优雅处理模糊范围

进入正题之前先科普一下 Swift 区间的知识。 Swift 中的区间有两种类型&#xff1a;闭区间和半开区间。 闭区间&#xff1a;用 a...b 表示&#xff0c;包含 a 和 b。半开区间&#xff1a;用 a..<b 表示&#xff0c;包含 a 但不包含 b。 举个例子 想判断一个数字是否在 0 …

Tang Prime 20K板OV2640例程

准备用Tang Prime 20K开发板进行OV2640摄像头采集验证。 Tang Primer 20K是由开源硬件厂商SiPEED矽速科技推出&#xff0c;是一款以 GW2A-LV18PG256C8/I7 为主芯片的核心板&#xff0c;准备了 2 个扩展板&#xff0c;Dock 和 Lite。板卡包含有HDMI输出&#xff0c;DVP接口&…

基于Anaconda环境开发IntelliJ IDEA实用JSON转Java实体插件

在软件开发中&#xff0c;将JSON数据转换为Java实体类是常见需求。借助Anaconda环境强大的包管理能力与IntelliJ IDEA的插件开发体系&#xff0c;我们可以打造一款高效实用的JSON转Java实体插件&#xff0c;显著提升开发效率。下面将从需求分析、技术选型、开发实现到优化部署&…

idea运行到远程机器 和 idea远程JVM调试

一、idea运行到远程机器 适用场景&#xff0c;本地连接不上远程机器的部分组件&#xff0c;如&#xff1a;redis、数据库。 缺点&#xff1a;每次修改程序&#xff0c;会复制所有的 依赖和class 启动比较慢。 工作原理&#xff1a;远程机器和本机器&#xff0c;都会启动一个端口…

微信小程序接入腾讯云短信验证码流程

以下是针对 AA公司微信小程序接入腾讯云短信验证码 的 全流程操作指南&#xff0c;包含资质申请、签名/模板配置、代码对接的完整解决方案&#xff1a; 一、资质申请&#xff08;必须通过审核才能发短信&#xff09; 1️⃣ 进入资质管理页 路径&#xff1a;腾讯云控制台 → 短…

阿里云OSS文件上传完整实现方案

一、前言 阿里云对象存储服务(OSS)是一种海量、安全、低成本、高可靠的云存储服务。本文将详细介绍如何在Spring Boot项目中集成阿里云OSS实现文件上传功能。 二、准备工作 1. 获取OSS配置信息 在开始前&#xff0c;您需要准备以下OSS配置信息&#xff1a; endpoint: OSS服…

【软考--软件设计师】10.2 关系型数据库

10 模式分解 分解 模式分解:将一个关系模式分解为多个子模式 模式分解就是模式规范化的工具&#xff0c;模式分解使用无损连接和保持函数依赖来衡量模式分解后是否导致原有模式中部分信息丢失。 无损连接 保持函数依赖 11、事务管理 事务的ACID性质: (1)原子性(Atomicit…