Spring Boot 多数据源如何处理事务?教你一招!

好啦,经过前面几篇文章的学习,现在小伙伴们已经明白了多数据源要怎么玩了:

也明白了分布式事务要怎么玩了:

那么接下来的问题,就是如何利用分布式事务来处理多数据源中的事务问题。

首先我先声明一点,本文单纯就是技术探讨,要从实际应用中来说的话,我并不建议这样去玩分布式事务、也不建议这样去玩多数据源,毕竟分布式事务主要还是用在微服务场景下。

好啦,那就不废话了,开整。

1. 思路梳理

首先我们来梳理一下思路。

在上篇文章中,我们是一个微服务,在 A 中分别去调用 B 和 C,当 B 或者 C 有一个执行失败的时候,就去回滚。B 和 C 都是调用远程的服务,所谓的回滚也不是传统意义上的数据库回滚,而是一种“反向补偿”,即利用一条更新 SQL,将已经更新的数据复原。在这个例子中,B 和 C 都是远程服务,操作的也都是不同的数据库,这不就是我们多数据源中的情况么!

在微服务中,一个服务实际上就代表了一个数据源,而在我们多数据源的案例中,一个注解就能标记出来一个数据源,这样一类比,你就会发现利用分布式事务来解决多数据源中的事务问题其实是非常 Easy 的。而且这里还不是微服务项目,只是一个单体项目,更简单!

不过也有一些需要注意的细节。

2. 代码实践

接下来我们就结合代码来讲讲。

2.1 案例准备

首先多数据源的案例我就不重复写了,我们之前已经写过一个,这里就不再赘述,文章一开头也有相关的链接,还没看过的小伙伴可以先看看。

也可以直接在公众号后台回复 dynamic_datasource 获取相关的案例。

2.2 开始整活

因为上篇文章我主要是和大家分享的 seata 的 AT 模式,所以本文也是一样,就先采用 AT 模式。

小伙伴们知道,在我们的多数据源案例中,我们用到了两个库,test08 和 test09,现在也还是这两个库,但是现在由于我们使用的是 AT 模式,我们需要在这两个库中分别创建 undo log 表,用来记录我们对表的更新操作,当事务提交之后,undo log 表中的数据就会被清除,undo log,undo log 表的脚本如下:

1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

数据库准备好之后,接下来就是准备依赖了,seata 有两个依赖,一个是 seata-all,还有一个微服务版的,咱们这里就直接使用上篇文章中所用到的微服务版的,依赖如下:

1
2
3
4
5
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>

配好之后,接下来提供两个配置文件 file.conf 和 regsigry.conf,这两个配置文件和上篇文章中介绍到的一模一样,这里不再赘述。

接下来配置 application.yaml,如下:

1
2
3
4
5
6
7
8
9
10
spring:
cloud:
alibaba:
seata:
tx-service-group: my_test_tx_group
main:
allow-circular-references: true
seata:
enable-auto-data-source-proxy: false
application-id: dd

大家看下这里的几个配置:

  • tx-service-group:这个是事务群组的名称,相关名字是在 file.conf 中配置的。
  • allow-circular-references:这个是允许循环依赖,可能有的小伙伴已经知道,现在最新版的 Spring Boot 中已经禁掉了循环依赖,但是这个 seata 中似乎还是用到了循环依赖,所以要开启。
  • enable-auto-data-source-proxy:由于 seata 会自动代理数据源,但是我们现在的数据源是自己加载的,所以关闭掉这个数据源的自动代理,将来用自己的。
  • application-id:给我们的应用取一个名字。

好啦,这个文件就配置好了。

接下来就是数据源问题了,刚刚说了,seata 中会自动代理数据源,用到的代理对象是 DataSourceProxy,而我们在之前自定义的数据源加载中,并没有用到这个 DataSourceProxy 对象所以这里要稍作修改,一共改两个地方,如下:

LoadDataSource.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
@EnableConfigurationProperties(DruidProperties.class)
public class LoadDataSource {
@Autowired
DruidProperties druidProperties;

public Map<String, DataSourceProxy> loadAllDataSource() {
Map<String, DataSourceProxy> map = new HashMap<>();
Map<String, Map<String, String>> ds = druidProperties.getDs();
try {
Set<String> keySet = ds.keySet();
for (String key : keySet) {
DataSource dataSource = druidProperties.dataSource((DruidDataSource) DruidDataSourceFactory.createDataSource(ds.get(key)));
DataSourceProxy proxyDs = new DataSourceProxy(dataSource);
map.put(key, proxyDs);
}
} catch (Exception e) {
e.printStackTrace();
}
return map;
}
}

其实这里的改动就是把之前的 DataSource 用 DataSourceProxy 重新包裹一下,然后将获取到的 DataSourceProxy 存起来。最后再修改一下动态数据源的地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
public class DynamicDataSource extends AbstractRoutingDataSource {

public DynamicDataSource(LoadDataSource loadDataSource) {
//1.设置所有的数据源
Map<String, DataSourceProxy> allDs = loadDataSource.loadAllDataSource();
super.setTargetDataSources(new HashMap<>(allDs));
//2.设置默认的数据源
//将来,并不是所有的方法上都有 @DataSource 注解,对于那些没有 @DataSource 注解的方法,该使用哪个数据源?
super.setDefaultTargetDataSource(allDs.get(DataSourceType.DEFAULT_DS_NAME));
//3
super.afterPropertiesSet();
}

/**
* 这个方法用来返回数据源名称,当系统需要获取数据源的时候,会自动调用该方法获取数据源的名称
* @return
*/
@Override
protected Object determineCurrentLookupKey() {
return DynamicDataSourceContextHolder.getDataSourceType();
}
}

Map 中的 value 类型变为 DataSourceProxy,其他都不变。

另外还有一个地方要改造下,就是解析 @DataSource 注解的切面,在之前的解析中,我们是将异常捕获了,现在我们要将之抛出来,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Around("pc()")
public Object around(ProceedingJoinPoint pjp) throws Throwable {
//获取方法上面的有效注解
DataSource dataSource = getDataSource(pjp);
if (dataSource != null) {
//获取注解中数据源的名称
String value = dataSource.value();
DynamicDataSourceContextHolder.setDataSourceType(value);
}
try {
return pjp.proceed();
} finally {
DynamicDataSourceContextHolder.clearDataSourceType();
}
}

将之抛出来的原因也很简单,因为这是切面方法,所有的 service 层方法都在这里执行,如果将异常捕获了,将来 service 层方法不抛出异常,事务就没法生效了。

好了,现在准备工作就算是到位了。

接下来我们写一个简单的多数据源事务的案例,首先我们来创建一个 MasterService,专门用来操作 master 数据源:

1
2
3
4
5
6
7
8
9
10
11
@Service
public class MasterService {

@Autowired
MasterMapper masterMapper;

@DataSource("master")
public void addUser(String username, Integer age) {
masterMapper.addUser(username, age);
}
}

mapper 就不用看了吧,就是普通的添加,大家可以在文末下载本文案例案例。

再来一个 SlaveService,用来操作 slave 数据源:

1
2
3
4
5
6
7
8
9
10
11
12
@Service
public class SlaveService {

@Autowired
SlaveMapper slaveMapper;

@DataSource("slave")
public void addAccount(String name, Double balance) {
int i = 1 / 0;
slaveMapper.addAccount(name, balance);
}
}

slave 数据源的方法中有一个异常。

最后,我们在 UserService 中分别调用这两个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Service
public class UserService {

@Autowired
MasterService masterService;
@Autowired
SlaveService slaveService;

@GlobalTransactional(rollbackFor = Exception.class)
public void test() {
masterService.addUser("javaboy.org", 99);
slaveService.addAccount("javaboy.org", 99.0);
}
}

注意,test 方法上有一个全局事务注解。

好啦,齐活!现在我们去执行这个 test 方法,由于 slaveService#addAccount 中的方法会抛出异常,所以会导致整个事务回滚,最终的结果就是 master 中也没有添加进数据。

3. 总结

好啦,结合上一篇文章,相信大家应该能够熟练的使用 seata 分布式事务中的 at 模式了吧!关于本文的内容,松哥也会整一个视频放在 TienChin 项目的视频中,感兴趣的小伙伴戳戳戳这里: