本节介绍Spring for Apache Pulsar如何支持事务。

Overview

Spring for Apache Pulsar事务支持是基于Spring Framework提供的事务支持构建的。在高层,事务资源向事务管理器注册,事务管理器反过来处理注册资源的事务状态(提交、回滚等)。

Apache Pulsar的Spring提供了以下功能:

PulsaTransactionManager-用于正常的Spring事务支持(@transactional,transactionTemplate等)

交易脉冲星模板

交易@pulsaListener

与其他事务管理器的事务同步

事务支持尚未添加到响应式组件中

默认情况下,事务支持已禁用。要在使用Spring Boot时启用支持,只需设置Spring.pulsar.transaction.enabled属性。下面每个组件部分都概述了进一步的配置选项。

Transactional Publishing with PulsarTemplate

事务性PulsarTemplate上的所有发送操作都会查找活动事务,并在事务中登记每个发送操作(如果找到)。

Non-transactional use

默认情况下,事务性PulsarTemplate也可用于非事务性操作。当未找到现有事务时,它将以非事务方式继续发送操作。但是,如果模板配置为需要事务,则任何在事务范围之外使用模板的尝试都会导致异常。

事务可以由TransactionTemplate、@Transactional方法、调用executeInTransaction或事务侦听器容器启动。

Local Transactions

我们使用术语“本地”事务来表示不受Spring事务管理工具(即PulsarTransactionManager)管理或与之关联的Pulsar本地事务。相反,“同步”事务是由PulsarTransactionManager管理或与之关联的事务。

您可以使用PulsarTemplate在本地事务中执行一系列操作。以下示例显示了如何执行此操作:

var results = pulsarTemplate.executeInTransaction((template) -> {var rv = new HashMap<String, MessageId>();rv.put("msg1", template.send(topic, "msg1"));rv.put("msg2", template.send(topic, "msg2"));return rv;
});

回调中的参数是调用executeInTransaction方法的模板实例。模板上的所有操作都登记在当前事务中。如果回调正常退出,则事务被提交。如果抛出异常,则事务将回滚。

若有一个同步的事务正在处理中,它将被忽略,并使用一个新的“嵌套”事务。

Configuration

以下交易设置可直接在PulsarTemplate上使用(通过交易字段):

enabled-模板是否支持事务(默认为false)

required-模板是否需要交易(默认为false)

timeout-事务超时的持续时间(默认为空)

不使用Spring Boot时,您可以在提供的模板上调整这些设置。但是,使用Spring Boot时,模板是自动配置的,没有影响属性的机制。在这种情况下,您可以注册一个可用于调整设置的PulsarTemplateCustomizer bean。以下示例显示了如何在自动配置的模板上设置超时:

@Bean
PulsarTemplateCustomizer<?> templateCustomizer() {return (template) -> template.transactions().setTimeout(Duration.ofSeconds(45));
}

Transactional Receiving with @PulsarListener

当启用侦听器事务时,在同步事务的范围内调用@PulsarListener注释的侦听器方法。

DefaultPulsarMessageListenerContainer使用配置了PulsarTransactionManager的Spring TransactionTemplate在方法调用之前启动事务。

每个接收到的消息的确认都登记在作用域事务中。

Consume-Process-Produce Scenario

一种常见的事务模式是,消费者从Pulsar主题读取消息,转换消息,最后生产者将生成的消息写入另一个Pulsar主题。当启用事务并且您的侦听器方法使用事务性PulsarTemplate来生成转换后的消息时,该框架支持此用例。

给定以下侦听器方法:

@PulsarListener(topics = "my-input-topic")
void listen(String msg) {var transformedMsg = msg.toUpperCase(Locale.ROOT);this.transactionalTemplate.send("my-output-topic", transformedMsg);
}

启用侦听器事务时会发生以下交互:

侦听器容器启动新事务并在事务范围内调用侦听器方法
侦听器方法接收消息
侦听器方法转换消息
监听器方法使用事务模板发送转换后的消息,该模板在活动事务中注册发送操作
侦听器容器自动确认消息,并在活动事务中注册确认操作
侦听器容器(通过TransactionTemplate)提交事务

如果您没有使用@PulsarListener,而是直接使用监听器容器,则会提供与上述相同的事务支持。记住,@PulsarListener只是为了方便将Java方法注册为侦听器容器消息侦听器。

Transactions with Record Listeners

上面的例子使用了一个记录监听器。使用记录侦听器时,每次侦听器方法调用时都会创建一个新事务,相当于每条消息一个事务。

由于事务边界是针对每条消息的,并且每条消息的确认都登记在每个事务中,因此批处理确认模式不能用于事务记录侦听器。

Transactions with Batch Listeners

使用批侦听器时,每次侦听器方法调用时都会创建一个新事务,相当于每批消息创建一个事务。

事务性批处理侦听器当前不支持自定义错误处理程序。

Configuration

Listener container factory

以下事务设置可以直接在ConcurrentPulsarListenerContainerFactory在创建侦听器容器时使用的PulsarContainerProperties上使用。这些设置会影响所有侦听器容器,包括@PulsarListener使用的容器。

enabled-容器是否支持事务(默认为false)

required-容器是否需要事务(默认为false)

timeout-事务超时的持续时间(默认为空)

transactionDefinition-一个蓝图事务定义,其属性将被复制到容器的事务模板中(默认为null)

transactionManager-用于启动事务的事务管理器

不使用Spring Boot时,您可以在提供的容器出厂设置中调整这些设置。但是,使用Spring Boot时,容器工厂是自动配置的。在这种情况下,您可以注册一个org.springframework.boot.pulser.autofigure。PulsarContainerFactory定制器<并发PulsarListenerContainerFactory<?>>bean访问和自定义容器属性。以下示例显示了如何在容器工厂设置超时:

@Bean
PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> containerCustomizer() {return (containerFactory) -> containerFactory.getContainerProperties().transactions().setTimeout(Duration.ofSeconds(45));
}
@PulsarListener

默认情况下,每个侦听器都尊重其相应侦听器容器工厂的事务设置。但是,用户可以在每个@PulsarListener上设置事务属性,以覆盖容器工厂设置,如下所示:

如果容器工厂启用了事务,那么transaction=false将禁用单个侦听器的事务。

如果容器工厂启用了事务并且是必需的,那么尝试设置transaction=false将导致抛出一个异常,说明事务是必需的。

如果容器工厂已禁用事务,则将忽略设置transaction=true的尝试,并记录警告。

Using PulsarTransactionManager

PulsarTransactionManager是Spring框架的PlatformTransactionManager的实现。您可以将PulsarTransactionManager与正常的Spring事务支持(@Transactional、TransactionTemplate等)一起使用。

如果事务处于活动状态,则在事务范围内执行的任何PulsarTemplate操作都会登记并参与正在进行的事务。经理提交或回滚事务,取决于成功或失败。

您可能不需要直接使用PulsarTransactionManager,因为大多数事务用例都包含在PulsarTemplate和@PulsarListener中。

Pulsar Transactions with Other Transaction Managers

Producer-only transaction

如果你想将记录发送到Pulsar并在单个事务中执行一些数据库更新,你可以使用DataSourceTransactionManager进行正常的Spring事务管理。

以下示例假设有一个名为“DataSourceTransactionManager”的DataSourceTransactionManager bean注册

@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {var msg = calculateMessage();this.pulsarTemplate.send("my-topic", msg);this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));
}

@Transactional注释的拦截器启动数据库事务,PulsarTemplate将与DB事务管理器同步事务;每次发送都将参与该交易。当该方法退出时,数据库事务将提交,然后是Pulsar事务。

如果您希望首先提交Pulsar事务,并且仅在Pulsar事务成功时提交DB事务,请使用嵌套的@Transactional方法,其中外部方法配置为使用DataSourceTransactionManager,内部方法配置为用PulsarTransactionManager。

@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {var msg = calculateMessage();this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));this.sendToPulsar(msg);
}@Transactional("pulsarTransactionManager")
public void sendToPulsar(String msg) {this.pulsarTemplate.send("my-topic", msg);
}

Consumer + Producer transaction

如果你想使用Pulsar的记录,将记录发送到Pulsar,并在事务中执行一些数据库更新,你可以将正常的Spring事务管理(使用DataSourceTransactionManager)与容器发起的事务相结合。

在以下示例中,侦听器容器启动Pulsar事务,@Transactional注释启动DB事务。DB事务首先提交;如果Pulsar事务未能提交,记录将被重新传递,因此DB更新应该是幂等的。

@PulsarListener(topics = "my-input-topic")
@Transactional("dataSourceTransactionManager")
void listen(String msg) {var transformedMsg = msg.toUpperCase(Locale.ROOT);this.pulsarTemplate.send("my-output-topic", transformedMsg);this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(transformedMsg));
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/913912.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/913912.shtml
英文地址,请注明出处:http://en.pswp.cn/news/913912.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在Ubuntu上从零开始编译并运行Home Assistant源码并集成HACS与小米开源的Ha Xiaomi Home

目录1. 前言&&叠甲2. 使用的环境3. 相关链接4. 前期步骤4.1 安装路径提前说明4.2 Ubuntu 相关依赖安装4.3 Python源码编译安装4.3.1 编译安装4.3.2 换源4.3.3 环境变量5. 构建Home Assistant源码5.1 clone源码5.2 创建虚拟Python环境5.3 安装项目依赖5.4 安装项目5.5 运…

【实习篇】之Http头部字段之Disposition介绍

Http头部字段之DispositionDisposition头部字段介绍RFC规范介绍RFC 6266与RFC 2047实习的时候公司将一个某个关于下载的Bug交给了我来修&#xff0c;看了代码和日志后发现是Disposition字段的规范兼容性惹的锅&#xff0c;因为有些协议使用的是老协议&#xff0c;我们的项目没有…

VM文件管理与Vi/vim操作

[rootlocalhost /]# sudo mkdir /opt [rootlocalhost /]# sudo mkdir /opt/tmp [rootlocalhost /]# sudo touch /opt/tmp/a.txt [rootlocalhost /]# ls /opt/tmp/ a.txt [rootlocalhost /]# 3.步骤1&#xff1a;创建文件并插入日期时间vi /tmp/newfile在vi编辑器中输入以下命令…

【Android】安卓四大组件之内容提供者(ContentProvider):从基础到进阶

你手机里的通讯录&#xff0c;存储了所有联系人的信息。如果你想把这些联系人信息分享给其他App&#xff0c;就可以通过ContentProvider来实现。。 一、什么是 ContentProvider ‌ContentProvider‌ 是 Android 四大组件之一&#xff0c;负责实现‌跨应用程序的数据共享与访问…

Vue-19-前端框架Vue之应用基础组件通信(二)

文章目录 1 v-model(父子相传)1.1 App.vue1.2 Father.vue1.2.1 v-model用在html标签上1.2.2 v-model用在html标签上(本质写法)1.2.3 v-model用在组件标签上1.2.4 v-model用在组件标签上(本质写法)1.3 MyInput(自定义的组件)1.4 修改modelValue1.4.1 Father.vue1.4.2 MyInput.vu…

宝塔下载pgsql适配spring ai

1.宝塔安装pgvector 1.先去github下载pgvectorpgvector/pgvector: Open-source vector similarity search for Postgres 2.把压缩包上传到系统文件的/temp下解压&#xff0c;重命名文件名为pgvector&#xff0c;之后命令操作 cd /tmp cd pgvector export PG_CONFIG/www/serv…

RK3568项目(八)--linux驱动开发之基础外设(上)

目录 一、引言 二、准备工作 ------>2.1、驱动加载/卸载命令 三、字符设备驱动开发 ------>3.1、驱动模块的加载和卸载 ------>3.2、外部模块编译模板 Makefile ------>3.3、cdev 四、LED驱动 ------>4.1、原理图 ------>4.2、驱动 五、设备树 -…

BUUCTF在线评测-练习场-WebCTF习题[GXYCTF2019]BabySQli1-flag获取、解析

解题思路打开靶场&#xff0c;题目提示是sql注入输入数据&#xff0c;判断下闭合11123报错&#xff1a;Error: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 1 at line 1报错提示…

“AI 曼哈顿计划”:科技竞赛还是人类挑战?

美国国会下属的经济与安全审查委员会已将“推动建立并资助一项堪比曼哈顿计划的通用人工智能研发项目”列为其对国会的核心建议之一&#xff0c;明确显示出对AI竞赛战略意义的高度重视。与此同时&#xff0c;美国能源部在近几个月中多次公开将人工智能的突破比作“下一场曼哈顿…

音频信号的预加重:提升语音清晰度

一、预加重介绍预加重是一种信号处理技术&#xff0c;主要用于增强音频信号中的高频成分。由于人类语音的频谱特性&#xff0c;尤其是在辅音和音调的表达上&#xff0c;高频成分对于语音的清晰度至关重要。然而&#xff0c;在录音和传输过程中&#xff0c;这些高频成分往往会受…

WebSocket实战:实现实时聊天应用 - 双向通信技术详解

目录一、WebSocket&#xff1a;实时通信的"高速公路"1.1 HTTP的短板&#xff1a;永远的"单相思"1.2 WebSocket的优势&#xff1a;真正的"双向对话"二、30分钟搭建聊天服务器2.1 环境准备2.2 WebSocket配置类2.3 核心消息处理器三、前端实现&…

宏集案例 | 基于CODESYS的自动化控制系统,开放架构 × 高度集成 × 远程运维

​​案例概况客户&#xff1a;MACS Sterilisationsanlagen GmbH&#xff08;Ermafa Environmental Technologies GmbH 旗下&#xff09; 应用场景&#xff1a;医疗与感染性废弃物的无害化处理控制系统应用产品&#xff1a;宏集Berghof高性能控制器设备&#xff08;一&#xff0…

学习JNI 二

创建一个名为Learn1项目&#xff08;Android Studio&#xff09;。一、项目结构二、配置 build.gradlebuild.gradle.kts(:app)plugins {alias(libs.plugins.android.application)alias(libs.plugins.jetbrains.kotlin.android) }android {namespace "com.demo.learn1&quo…

基于Spring Boot+Vue的DIY手工社预约管理系统(Echarts图形化、腾讯地图API)

2.10 视频课程管理功能实现2.11手工互动&#xff08;视频弹幕&#xff09;2.8预约设置管理功能实现&#x1f388;系统亮点&#xff1a;Echarts图形化、腾讯地图API&#xff1b;文档包含功能结构图、系统架构图、用例图、实体属性图、E-R图。一.系统开发工具与环境搭建1.系统设计…

leetcode 每日一题 1353. 最多可以参加的会议数目

更多技术访问 我的个人网站 &#xff08;免费服务器&#xff0c;没有80/443端口&#xff09; 1353. 最多可以参加的会议数目 给你一个数组 events&#xff0c;其中 events[i] [startDayi, endDayi] &#xff0c;表示会议 i 开始于 startDayi &#xff0c;结束于 endDayi 。 …

AI+智慧园区 | 事件处置自动化——大模型重构园区治理逻辑

在智慧园区的建设浪潮中&#xff0c;事件管理一直是园区高效运营的关键环节。考拉悠然所推出的大模型 智慧园区解决方案&#xff0c;在事件智能闭环管理方面独树一帜&#xff0c;为园区的日常运营编织了一张严密、高效、智能的管理网络&#xff0c;实现了从事件感知到处置的全…

FFmpeg Windows安装

FFmpeg 用于音频文件转换 Builds - CODEX FFMPEG gyan.dev ffmpeg-release-full.7z 下载完成之后 zip解压 大概就是 ffmpeg/ └── bin/ └── ffmpeg.exe 配置环境变量 ffmpeg -version 有可能idea还是找不到命令 就把命令路径写在程序里 例如

【2025/07/10】GitHub 今日热门项目

GitHub 今日热门项目 &#x1f680; 每日精选优质开源项目 | 发现优质开源项目&#xff0c;跟上技术发展趋势 &#x1f4cb; 报告概览 &#x1f4ca; 统计项&#x1f4c8; 数值&#x1f4dd; 说明&#x1f4c5; 报告日期2025-07-10 (周四)GitHub Trending 每日快照&#x1f55…

JVM 基础 - JVM 内存结构

前言 本文主要对JVM 内存结构进行讲解&#xff0c;注意不要和Java内存模型混淆了。 运行时数据区 内存是非常重要的系统资源&#xff0c;是硬盘和 CPU 的中间仓库及桥梁&#xff0c;承载着操作系统和应用程序的实时运行。JVM 内存布局规定了 Java 在运行过程中内存申请、分配…

【案例】二手车交易价格预测-472

二手车交易价格预测 数据来源数据特征探索构建模型参考数据来源 天池 https://tianchi.aliyun.com/competition/entrance/231784/information 数据特征探索 目标特征工程做好之后,能同时进行 lightgbm catboost 神经网络等模型,所以尽量都转换为数值类特征。 如果仅仅是使用…