ABP VNext + Cosmos DB Change Feed:搭建实时数据变更流服务 🚀


📚 目录

  • ABP VNext + Cosmos DB Change Feed:搭建实时数据变更流服务 🚀
      • TL;DR ✨🚀
    • 1. 环境与依赖 🏗️
    • 2. 服务注册与依赖注入 🔌
    • 3. 封装 Change Feed 为 IHostedService 🔧
      • 3.1 HostedService 生命周期流程图
      • 3.2 `ChangeFeedHostedService` 实现
    • 4. 事务与幂等 🛡️
    • 5. 发布到事件总线 📡
      • MassTransit 示例
    • 6. 容错与监控 🛠️📊
    • 7. 横向扩展 🌐
    • 参考文档 📖


TL;DR ✨🚀

  • 全托管 DI:CosmosClient 由容器单例管理,HostedService 构造注入,优雅释放。
  • 作用域与事务:回调内创建新 Scope,结合 IUnitOfWorkManager 实现事务一致性🛡️。
  • Exactly-Once:通过(DocumentId, ETag)唯一索引 + 手动 Checkpoint,确保不漏不重✅。
  • 容错重试:Polly 指数退避重试与熔断,处理启动与回调中的网络抖动🔄。
  • 监控可扩展:日志、指标、Dead-Letter 容错,中控告警 + 多实例自动分片,助力弹性伸缩📊。

1. 环境与依赖 🏗️

  • .NET 平台:.NET 6 + / ABP VNext 6.x

  • Azure 资源:Cosmos DB Core API(Source 容器 + Lease 容器)

  • 主要 NuGet 包

    dotnet add package Microsoft.Azure.Cosmos
    dotnet add package Volo.Abp.EventBus.MassTransit
    dotnet add package Streamiz.Kafka.Net.Stream        # 可选
    dotnet add package Volo.Abp.EntityFrameworkCore
    dotnet add package Polly
    
  • appsettings.json 配置

    {"Cosmos": {"ConnectionString": "<your-connection-string>","Database": "MyAppDb","SourceContainer": "Docs","LeaseContainer": "Leases"},"RabbitMq": { "Host": "rabbitmq://localhost" },"Kafka":   { "BootstrapServers": "localhost:9092" }
    }
    

2. 服务注册与依赖注入 🔌

MyAppModuleConfigureServices 中:

public override void ConfigureServices(ServiceConfigurationContext context)
{var configuration = context.Services.GetConfiguration();// CosmosClient 单例托管context.Services.AddSingleton(sp =>new CosmosClient(configuration["Cosmos:ConnectionString"]));// Polly 重试策略:3 次指数退避context.Services.AddSingleton(sp => Policy.Handle<Exception>().WaitAndRetryAsync(retryCount: 3,sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),onRetry: (ex, ts, retryCount, ctx) =>{var logger = sp.GetRequiredService<ILogger<ChangeFeedHostedService>>();logger.LogWarning(ex, "⚠️ ChangeFeed 启动重试,第 {RetryCount} 次", retryCount);}));// 注册 HostedServicecontext.Services.AddHostedService<ChangeFeedHostedService>();
}

💡 Tip:将 Cosmos、RabbitMQ、Kafka 等配置抽象到 SettingDefinition,支持动态变更。


3. 封装 Change Feed 为 IHostedService 🔧

3.1 HostedService 生命周期流程图

应用启动
DI 容器构建
触发 IHostedService.StartAsync
启动 ChangeFeedProcessor
监听文档变更
HandleChangesAsync 回调
发布事件 & 写审计 & Checkpoint
准备下一批

⚠️ “触发 StartAsync”更准确地反映了 ASP.NET Core Host 的启动流程。

3.2 ChangeFeedHostedService 实现

public class ChangeFeedHostedService : IHostedService, IDisposable
{private readonly CosmosClient _cosmosClient;private readonly IConfiguration _config;private readonly ILogger<ChangeFeedHostedService> _logger;private readonly IAsyncPolicy _retryPolicy;private readonly IServiceProvider _serviceProvider;private ChangeFeedProcessor _processor;public ChangeFeedHostedService(CosmosClient cosmosClient,IConfiguration config,ILogger<ChangeFeedHostedService> logger,IAsyncPolicy retryPolicy,IServiceProvider serviceProvider){_cosmosClient    = cosmosClient;_config          = config;_logger          = logger;_retryPolicy     = retryPolicy;_serviceProvider = serviceProvider;}public async Task StartAsync(CancellationToken ct){await _retryPolicy.ExecuteAsync(async () =>{_logger.LogInformation("🔄 ChangeFeedHostedService 正在启动...");var dbName = _config["Cosmos:Database"];var src    = _cosmosClient.GetContainer(dbName, _config["Cosmos:SourceContainer"]);var lease  = _cosmosClient.GetContainer(dbName, _config["Cosmos:LeaseContainer"]);_processor = src.GetChangeFeedProcessorBuilder<MyDocument>("abp-processor", HandleChangesAsync).WithInstanceName(Environment.MachineName).WithLeaseContainer(lease).WithStartTime(DateTime.MinValue.ToUniversalTime()).Build();await _processor.StartAsync(ct);_logger.LogInformation("✅ ChangeFeedProcessor 已启动");});}public async Task StopAsync(CancellationToken ct){if (_processor != null){_logger.LogInformation("🛑 ChangeFeedProcessor 正在停止...");await _processor.StopAsync(ct);_logger.LogInformation("✅ ChangeFeedProcessor 已停止");}}public void Dispose() => _processor = null;private async Task HandleChangesAsync(IReadOnlyCollection<MyDocument> docs,CancellationToken ct){if (docs == null || docs.Count == 0) return;_logger.LogInformation("📥 收到 {Count} 条文档变更", docs.Count);// 创建新的 DI Scopeusing var scope = _serviceProvider.CreateScope();var uowManager = scope.ServiceProvider.GetRequiredService<IUnitOfWorkManager>();var eventBus   = scope.ServiceProvider.GetRequiredService<IDistributedEventBus>();var auditRepo  = scope.ServiceProvider.GetRequiredService<IRepository<AuditEntry, Guid>>();// 开始事务using var uow = await uowManager.BeginAsync();foreach (var doc in docs){try{// 发布领域事件await eventBus.PublishAsync(new DocumentChangedEvent(doc.Id, doc), ct);// 审计写入,唯一索引保证幂等var entry = new AuditEntry{DocumentId = doc.Id,ETag       = doc.ETag,Operation  = doc.Operation,Timestamp  = DateTime.UtcNow,Payload    = JsonConvert.SerializeObject(doc)};await auditRepo.InsertAsync(entry, autoSave: true);}catch (DbUpdateException dbEx)when (dbEx.InnerException?.Message.Contains("UNIQUE") ?? false){_logger.LogWarning("⚠️ 文档 {DocumentId}@{ETag} 唯一索引冲突,跳过", doc.Id, doc.ETag);}catch (Exception ex){_logger.LogError(ex, "🔥 写审计失败,写入 Dead-Letter 容器");await WriteToDeadLetterAsync(doc, ex, ct);// 回滚本次事务await uow.RollbackAsync();// 跳过到下一文档continue;}}// 提交事务await uow.CompleteAsync();// 手动 Checkpointawait _processor.CheckpointAsync(ct);_logger.LogInformation("🗸 Checkpoint 完成,位置已记录");}private Task WriteToDeadLetterAsync(MyDocument doc, Exception ex, CancellationToken ct){// TODO: 实现将失败批次写入 Dead-Letter 容器或队列,用于离线补偿return Task.CompletedTask;}
}

4. 事务与幂等 🛡️

HandleChangesAsync
IUnitOfWorkManager.Begin
Publish Event & Insert Audit
异常?
写入 Dead-Letter
Rollback UoW
Complete UoW
Checkpoint

💡 Tip:在 AuditEntry 上建立 (DocumentId, ETag) 唯一索引,捕获 DbUpdateException 后跳过重复。


5. 发布到事件总线 📡

ChangeFeedProcessor
IDistributedEventBus.PublishAsync
MassTransit/RabbitMQ
Streamiz/Kafka
DocumentChangedConsumer
DocumentChangedProcessor

MassTransit 示例

services.AddMassTransit(cfg =>
{cfg.AddConsumer<DocumentChangedConsumer>();cfg.UsingRabbitMq((ctx, rc) =>{rc.Host(Configuration["RabbitMq:Host"]);rc.ReceiveEndpoint("change-feed-queue", e =>e.ConfigureConsumer<DocumentChangedConsumer>(ctx));});
});
public class DocumentChangedConsumer : IConsumer<DocumentChangedEvent>
{public async Task Consume(ConsumeContext<DocumentChangedEvent> ctx){// 下游业务逻辑…}
}

6. 容错与监控 🛠️📊

  • Polly 重试:启动与回调均受重试策略保护🔁。
  • Dead-Letter 容错:异常时写入专用容器/队列,离线补偿。
  • 日志ILogger<ChangeFeedHostedService> 记录启动/停止、批次数量、Checkpoint、异常详情。
  • 监控指标:集成 Application Insights 或 Prometheus,暴露 Lease 分片数、消费延迟、批量大小、错误率等。

7. 横向扩展 🌐

  • 多实例分片:同一 ProcessorName 启动 N 实例,Cosmos DB 自动均衡 Lease 分片。
  • 弹性伸缩:结合监控告警,自动扩缩 Kubernetes Deployment 或 VMSS,实现高峰应对。

参考文档 📖

  • Azure Cosmos DB Change Feed 官方文档
  • ABP 事件总线指南
  • MassTransit 文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/87982.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/87982.shtml
英文地址,请注明出处:http://en.pswp.cn/web/87982.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32-定时器

定时器&#xff1a;有4个独立通道&#xff1a;输入捕获&#xff1b;输出比较PWM生成&#xff1b;单脉冲模式输出&#xff1b;可通外部信号控制定时器&#xff08;TIMx-ETR&#xff09;&#xff1b;支持针对定时的增量&#xff08;正交&#xff09;编码器、霍尔传感器电路通用定…

Windows Server 2019--职业技能大赛B模块Windows服务器配置样题

一、赛题说明 &#xff08;一&#xff09;竞赛介绍 请详细阅读网络拓扑图&#xff0c;为所有计算机修改默认防火墙以便允许ICMP和相应的流量&#xff0c;不允许直接关闭主机的防火墙。除了CD-ROM/HDD驱动器&#xff0c;请不要修改虚拟机本身的硬件设置。 &#xff08;二&…

vue3+Echarts实现立体柱状图

Echarts柱状图中文网&#xff1a;https://echarts.apache.org/examples/zh/index.html#chart-type-bar 效果展示&#xff1a; 主要实现过程是三部分的组合&#xff0c;最上面是一个椭圆&#xff0c;中间是正常的柱子&#xff0c;下方再加上一个椭圆&#xff0c;就出来立体的效…

【UE5】虚幻引擎小百科

一、类名前面的大写字母的含义是什么UE5常见前缀分类表前缀含义实例用于AActorACharacter&#xff0c;AWeaponBase可放入世界中的对象&#xff08;有位置、可碰撞等&#xff09;UUObject派生类UUserWidget&#xff0c;UWeaponComponent引擎对象、逻辑模块&#xff0c;不具备Tra…

【Linux系统】vim编辑器 | 编译器gcc/g++ | make/Makefile

1. vim编辑器一、历史发展与Vim vs Vi的区别起源与演进Vi&#xff08;1976年&#xff09; &#xff1a;由Bill Joy开发&#xff0c;嵌入BSD Unix系统&#xff0c;是首个面向屏幕的文本编辑器&#xff0c;但功能有限&#xff08;如无多级撤销&#xff09;。Vim&#xff08;1991年…

国产飞腾主板,赋能网络安全防御硬手段

​ 当前&#xff0c;网络安全形势严峻&#xff0c;网络攻击手段不断翻新&#xff0c;从数据泄露到电脑中毒&#xff0c;企业、机构乃至国家的数字资产都面临着巨大风险。在此背景下&#xff0c;国产硬件技术的突破对筑牢网络安全防线意义重大。 高能计算机基于市场需求&#…

Spring AI 概述与架构设计

目录一、前言二、简介三、核心能力概览四、理解模块架构图五、模型适配能力六、最小应用示例七、与传统 LLM 调用相比八、总结九、参考一、前言 在 AI 正以前所未有的速度“下沉”到各类系统与业务的当下&#xff0c;Spring 官方推出的 Spring AI 项目&#xff0c;为 Java 开发…

UI前端与数字孪生融合新领域:智慧环保的污染源监测与治理

hello宝子们...我们是艾斯视觉擅长ui设计、前端开发、数字孪生、大数据、三维建模、三维动画10年经验!希望我的分享能帮助到您!如需帮助可以评论关注私信我们一起探讨!致敬感谢感恩!一、引言&#xff1a;数字孪生重构智慧环保的技术范式在环境污染治理压力持续增大的背景下&…

【go/wails】wails入门系列(一)环境安装与demo

文章目录说在前面go安装nodejs安装wails创建项目运行说在前面 操作系统&#xff1a;win11go版本&#xff1a;1.24.4nodejs版本&#xff1a;v22.16.0wails版本&#xff1a;v2.10.1 go安装 官网 这里 下载安装即可 nodejs 官网 这里 下载安装即可 安装wails 设置go国内代理g…

linux qt 使用log4cpp库

一、日志库下载 下载地址&#xff1a;https://log4cpp.sourceforge.net/二、日志库解压&#xff0c;编译 1.将文件夹解压出来2.进入文件夹内部&#xff0c;打开终端3.终端中依次输入以下命令 mkdir build ./configure --prefix$(pwd)/build make make install 一般来说不会报错…

探索阿里云Data Integration:数据同步的魔法工具

引言在当今数字化时代&#xff0c;数据已成为企业的核心资产&#xff0c;如同企业发展的 “燃料”&#xff0c;驱动着业务的增长与创新。从用户行为数据到业务运营数据&#xff0c;从市场趋势数据到供应链数据&#xff0c;每一个数据点都蕴含着巨大的价值&#xff0c;能够为企业…

【Java面试】Redis的poll函数epoll函数区别?

Redis 在选择 poll 和 epoll 时主要基于性能需求、连接规模、操作系统支持等因素。以下是具体场景的对比与选择建议&#xff1a;1. 何时使用 poll 函数&#xff1f;适用场景&#xff1a; 跨平台兼容性需求&#xff1a;poll 在几乎所有操作系统&#xff08;如 Windows、BSD、Lin…

RPC--RPCHandler的实现

在RPC框架中&#xff0c;Handler用于接收RpcRequest&#xff0c;经过处理后返回RpcResponseSlf4jpublic class RpcRequestHandler {private final ServiceProvider serviceProvider;//获取一个单例模式的服务提供类public RpcRequestHandler() {serviceProvider SingletonFact…

C#读取文件夹和文件列表:全面指南

C#读取文件夹和文件列表&#xff1a;全面指南 在 C# 开发中&#xff0c;经常需要获取文件夹中的文件列表或子文件夹结构&#xff0c;例如文件管理器、批量处理工具、备份程序等场景。本文将详细介绍 C# 中读取文件夹和文件列表的各种方法&#xff0c;包括基础操作、递归遍历、过…

从小白到进阶:解锁linux与c语言高级编程知识点嵌入式开发的任督二脉(1)

【硬核揭秘】Linux与C高级编程&#xff1a;从入门到精通&#xff0c;你的全栈之路&#xff01;第一部分&#xff1a;初识Linux与环境搭建&#xff0c;玩转软件包管理——嵌入式开发的第一道“坎”嘿&#xff0c;各位C语言的“卷王”们&#xff01;你可能已经习惯了在Windows或m…

.net开源库SignalR

.NET开源库SignalR&#xff1a;打造实时Web应用的利器 在当今的Web开发领域&#xff0c;实时性已经成为了许多应用的核心需求。无论是实时聊天、实时数据监控还是实时游戏&#xff0c;都需要服务器能够及时地将数据推送给客户端。而.NET开源库SignalR&#xff0c;正是满足这一…

SQL Server不同场景批量插入数据的方式详解

INSERT INTO...VALUES多行语法 该方法适用于单次插入少量数据(通常<1000行),语法简洁直观。示例: INSERT INTO Employees (EmployeeID, Name, Department) VALUES (101, Zhang San, IT),(102, Li Si, HR),(103, Wang Wu, Finance)优点:语法简单易理解,适合开发测试环…

Day08-Flask 或 Django 简介:构建 Web 应用程序

Flask 或 Django 简介&#xff1a;构建 Web 应用程序 网络开发领域提供了丰富的工具和框架&#xff0c;而 Python 作为一门多功能的语言&#xff0c;在构建健壮且可扩展的 Web 应用方面脱颖而出。本课程将作为你使用 Python 进行 Web 开发的入门指南&#xff0c;特别聚焦于两个…

k8s多集群管理中的联邦和舰队如何理解?

在 Kubernetes 多集群管理中&#xff0c;联邦&#xff08;Federation&#xff09;和舰队&#xff08;Fleet&#xff09;是两种不同的方法&#xff0c;用于管理和协调多个 Kubernetes 集群。下面是对这两种方法的详细解释&#xff1a; 联邦&#xff08;Federation&#xff09; K…

Docker部署MySQL镜像

1.拉取镜像 # 拉取指定版本的MySQL镜像 docker pull mysql:8.02.创建挂载目录 # 自己创建好如下三个文件夹 路径任意 [rootiZuf6aigs7rxe6f6oifq7vZ mysql]# ll 总用量 12 drwxr-xr-x 2 root root 4096 7月 7 10:25 config drwxr-xr-x 2 root root 4096 6月 26 16:43 data d…