文章目录
- 问题分析
- 解决方案
- 实现原理解析
- 执行流程说明
- 运行实例
- 正常流程执行
- 执行异常流程
- 关键优势
在分布式系统开发中,我们经常会遇到本地事务与远程服务调用结合的场景。当本地事务包含RPC调用时,如果事务回滚,RPC调用已经执行就会导致数据不一致。本文将介绍如何优雅地解决这个问题。
问题分析
考虑以下场景:有方法A、B、C组成一个大事务,其中B方法需要调用RPC服务。如果C方法执行失败导致整个事务回滚,但RPC已经调用并提交,就会造成数据不一致。
核心问题是:RPC调用默认会在本地事务提交前执行,无法参与事务回滚。
解决方案
Spring提供了事务同步机制(TransactionSynchronization),允许我们注册回调函数,在事务完成后执行特定操作。利用这个机制,我们可以确保:
只有当本地事务成功提交后,才会执行RPC调用;如果事务回滚,则不执行RPC
以下是实现代码:
package cn.bb.mydemo.service;import cn.bb.mydemo.entity.Student;
import cn.bb.mydemo.mapper.StudentMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;import java.time.LocalTime;
import java.util.concurrent.CompletableFuture;@Service
public class StudentService {@Autowiredprivate StudentMapper studentMapper;@Transactional(rollbackFor = Exception.class)public void saveWithRpcAfterTx(String name) {// 1. 执行本地数据库操作Student student = new Student();student.setName(name);studentMapper.insert(student);// 2. 注册事务同步回调:在事务提交后异步执行RPCTransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {@Overridepublic void afterCommit() {// 使用CompletableFuture异步执行RPC,避免阻塞事务提交CompletableFuture.runAsync(() -> doRpc(name));}});}@Transactional(rollbackFor = Exception.class)public void run(String name) {Student student = new Student();student.setName(name);studentMapper.insert(student);}@Transactional(rollbackFor = Exception.class)public void allRun() {// 调用不同方法组成一个大事务this.saveWithRpcAfterTx("Alice");this.run("a");this.run("b");System.out.println("本地事务已提交");// int i = 1 / 0; // 取消注释此行会触发异常,导致事务回滚}// 模拟RPC调用private void doRpc(String name) {System.out.println(LocalTime.now() + " RPC调用收到:" + name);}
}
测试代码验证实现效果:
@SpringBootTest
class TxAsyncTest {@Autowiredprivate StudentService studentService;@Testvoid shouldRpcAfterTxCommit2() throws InterruptedException {// 调用组合事务方法studentService.allRun();// 等待异步RPC执行完成Thread.sleep(20000);}
}
实现原理解析
这个解决方案的核心在于TransactionSynchronizationManager
和TransactionSynchronizationAdapter
的使用:
-
TransactionSynchronizationManager:Spring提供的事务同步管理器,允许注册回调函数监听事务生命周期事件
-
TransactionSynchronizationAdapter:事务同步适配器,我们可以重写其中的关键方法:
afterCommit()
:事务成功提交后执行afterCompletion()
:事务完成后执行(无论成功或失败)beforeCommit()
:事务提交前执行
-
异步执行:使用
CompletableFuture.runAsync()
确保RPC调用不会阻塞事务提交过程
执行流程说明
当调用allRun()
方法时,执行流程如下:
- 开启数据库事务
- 执行
saveWithRpcAfterTx()
:插入数据库记录并注册事务同步回调 - 执行
run()
两次:插入两条额外记录 - 如果没有异常,事务提交
- 事务提交后,触发注册的
afterCommit()
回调 - 异步执行RPC调用
- 如果任何步骤发生异常,事务回滚,不会触发RPC调用
运行实例
正常流程执行
新增了ab 并且 rpc是最后被调用的 虽然这个方法被放在了中间
执行异常流程
数据也没有添加什么 事务被回滚了 这就保证了我们的需求【当事务被提交 rpc才被调用 否则错误就不执行rpc 防止重复数据的生成】
关键优势
- 数据一致性:确保本地事务与RPC调用的最终一致性
- 非侵入性:不需要修改现有事务管理代码
- 异步执行:不影响事务性能
- 简单可靠:基于Spring内置机制,无需额外框架
这种方法特别适合那些需要保证本地数据与远程服务数据一致性的场景,是解决大事务中RPC无法回滚问题的优雅方案。