手把手教你玩多数据源动态切换!

为了提高应用的可靠性,多数据源现在也很常见,数据库可以搭建双 M 结构,这个松哥之前也发文和大家分享过如何搭建双 M 结构的主从备份?,那么 Java 代码里该如何操作多数据源呢?

我在 19 年的时候写过几篇文章教大家配置 JdbcTemplate、MyBatis 以及 JPA 中的多数据源(公众号江南一点雨后台回复 666 有相关的资料),不过那几篇文章的整体思路都是弄多个 Dao 层实例,然后手动选择用哪个实例,这样总感觉不太方便。

MyBatis-Plus 也提供了相应的工具,感兴趣的小伙伴可以自行尝试。

今天我想带领小伙伴们,利用 AOP 的思想,自己来写一个简单的多数据源切换工具。

1. 预备知识

想要自定义动态数据源切换,得先了解一个类 AbstractRoutingDataSource

AbstractRoutingDataSource 是在 Spring2.0.1 中引入的(注意是 Spring2.0.1 不是 Spring Boot2.0.1,所以这其实也算是 Spring 一个非常古老的特性了), 该类充当了 DataSource 的路由中介,它能够在运行时, 根据某种 key 值来动态切换到真正的 DataSource 上。

大致的用法就是你提前准备好各种数据源,存入到一个 Map 中,Map 的 key 就是这个数据源的名字,Map 的 value 就是这个具体的数据源,然后再把这个 Map 配置到 AbstractRoutingDataSource 中,最后,每次执行数据库查询的时候,拿一个 key 出来,AbstractRoutingDataSource 会找到具体的数据源去执行这次数据库操作。

大致思路就是这样。

接下来我们就来看看怎么玩。

2. 创建项目

首先我们创建一个 Spring Boot 项目,引入 Web、MyBatis 以及 MySQL 依赖,项目创建成功之后,再手动加入 Druid 和 AOP 依赖,如下:

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.9</version>
</dependency>

这块呢其实没啥好说的,都是常规操作。

3. 配置文件

接下来我们创建一个 application-druid.yaml 用来配置我们的数据源信息,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# 数据源配置
spring:
datasource:
type: com.alibaba.druid.pool.DruidDataSource
driverClassName: com.mysql.cj.jdbc.Driver
ds:
# 主库数据源,默认 master 不能变
master:
url: jdbc:mysql://127.0.0.1:3306/test08?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai
username: root
password: 123
# 从库数据源
slave:
url: jdbc:mysql://127.0.0.1:3306/test07?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai
username: root
password: 123
# 初始连接数
initialSize: 5
# 最小连接池数量
minIdle: 10
# 最大连接池数量
maxActive: 20
# 配置获取连接等待超时的时间
maxWait: 60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
timeBetweenEvictionRunsMillis: 60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
minEvictableIdleTimeMillis: 300000
# 配置一个连接在池中最大生存的时间,单位是毫秒
maxEvictableIdleTimeMillis: 900000
# 配置检测连接是否有效
validationQuery: SELECT 1 FROM DUAL
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
druid:
webStatFilter:
enabled: true
statViewServlet:
enabled: true
# 设置白名单,不填则允许所有访问
allow:
url-pattern: /druid/*
# 控制台管理用户名和密码
login-username: javaboy
login-password: 123456
filter:
stat:
enabled: true
# 慢SQL记录
log-slow-sql: true
slow-sql-millis: 1000
merge-sql: true
wall:
config:
multi-statement-allow: true

都是 Druid 的常规配置,也没啥好说的,唯一需要注意的是我们整个配置文件的格式。ds 里边配置我们的所有数据源,每个数据源都有一个名字,master 是默认数据源的名字,不可修改,其他数据源都可以自定义名字。最后面我们还配置了 Druid 的监控功能,如果小伙伴们还不懂 Druid 的监控功能,可以查看Spring Boot 如何监控 SQL 运行情况?

不过小伙伴们知道,YAML 配置不像 properties 配置可以通过 @PropertySource 注解加载自定义的配置文件,YAML 配置没有类似的加载机制。不过工具是死的人是活的,我们可以利用 Spring Boot 的 profile 机制来加载这个自定义的 application-druid.yaml 配置文件,具体做法就是在 application.yaml 中加一行配置,如下:

1
2
3
spring:
profiles:
active: druid

接下来我们还需要提供一个配置类,将这个配置文件的内容加载到配置类中,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
@ConfigurationProperties(prefix = "spring.datasource")
public class DruidProperties {
private int initialSize;

private int minIdle;

private int maxActive;

private int maxWait;

private int timeBetweenEvictionRunsMillis;

private int minEvictableIdleTimeMillis;

private int maxEvictableIdleTimeMillis;

private String validationQuery;

private boolean testWhileIdle;

private boolean testOnBorrow;

private boolean testOnReturn;

private Map<String, Map<String, String>> ds;

public DruidDataSource dataSource(DruidDataSource datasource) {
/** 配置初始化大小、最小、最大 */
datasource.setInitialSize(initialSize);
datasource.setMaxActive(maxActive);
datasource.setMinIdle(minIdle);

/** 配置获取连接等待超时的时间 */
datasource.setMaxWait(maxWait);

/** 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 */
datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);

/** 配置一个连接在池中最小、最大生存的时间,单位是毫秒 */
datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
datasource.setMaxEvictableIdleTimeMillis(maxEvictableIdleTimeMillis);

/**
* 用来检测连接是否有效的sql,要求是一个查询语句,常用select 'x'。如果validationQuery为null,testOnBorrow、testOnReturn、testWhileIdle都不会起作用。
*/
datasource.setValidationQuery(validationQuery);
/** 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。 */
datasource.setTestWhileIdle(testWhileIdle);
/** 申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。 */
datasource.setTestOnBorrow(testOnBorrow);
/** 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。 */
datasource.setTestOnReturn(testOnReturn);
return datasource;
}

public int getInitialSize() {
return initialSize;
}

public void setInitialSize(int initialSize) {
this.initialSize = initialSize;
}

public int getMinIdle() {
return minIdle;
}

public void setMinIdle(int minIdle) {
this.minIdle = minIdle;
}

public int getMaxActive() {
return maxActive;
}

public void setMaxActive(int maxActive) {
this.maxActive = maxActive;
}

public int getMaxWait() {
return maxWait;
}

public void setMaxWait(int maxWait) {
this.maxWait = maxWait;
}

public int getTimeBetweenEvictionRunsMillis() {
return timeBetweenEvictionRunsMillis;
}

public void setTimeBetweenEvictionRunsMillis(int timeBetweenEvictionRunsMillis) {
this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
}

public int getMinEvictableIdleTimeMillis() {
return minEvictableIdleTimeMillis;
}

public void setMinEvictableIdleTimeMillis(int minEvictableIdleTimeMillis) {
this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
}

public int getMaxEvictableIdleTimeMillis() {
return maxEvictableIdleTimeMillis;
}

public void setMaxEvictableIdleTimeMillis(int maxEvictableIdleTimeMillis) {
this.maxEvictableIdleTimeMillis = maxEvictableIdleTimeMillis;
}

public String getValidationQuery() {
return validationQuery;
}

public void setValidationQuery(String validationQuery) {
this.validationQuery = validationQuery;
}

public boolean isTestWhileIdle() {
return testWhileIdle;
}

public void setTestWhileIdle(boolean testWhileIdle) {
this.testWhileIdle = testWhileIdle;
}

public boolean isTestOnBorrow() {
return testOnBorrow;
}

public void setTestOnBorrow(boolean testOnBorrow) {
this.testOnBorrow = testOnBorrow;
}

public boolean isTestOnReturn() {
return testOnReturn;
}

public void setTestOnReturn(boolean testOnReturn) {
this.testOnReturn = testOnReturn;
}

public Map<String, Map<String, String>> getDs() {
return ds;
}

public void setDs(Map<String, Map<String, String>> ds) {
this.ds = ds;
}
}

这个配置类没啥好说的,我们配置的多个数据源我将之读取到了一个名为 ds 的 Map 中,将来就根据这个 Map 中的数据来构造数据源。

4. 加载数据源

接下来我们要根据配置文件来加载数据源。加载方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public interface DynamicDataSourceProvider {
String DEFAULT_DATASOURCE = "master";
/**
* 加载所有的数据源
* @return
*/
Map<String, DataSource> loadDataSources();
}
@Configuration
@EnableConfigurationProperties(DruidProperties.class)
public class YamlDynamicDataSourceProvider implements DynamicDataSourceProvider {
@Autowired
DruidProperties druidProperties;

@Override
public Map<String, DataSource> loadDataSources() {
Map<String, DataSource> ds = new HashMap<>(druidProperties.getDs().size());
try {
Map<String, Map<String, String>> map = druidProperties.getDs();
Set<String> keySet = map.keySet();
for (String s : keySet) {
DruidDataSource dataSource = (DruidDataSource) DruidDataSourceFactory.createDataSource(map.get(s));
ds.put(s, druidProperties.dataSource(dataSource));
}
} catch (Exception e) {
e.printStackTrace();
}
return ds;
}
}

加载的核心工作在 YamlDynamicDataSourceProvider 类中完成的。该类中有一个 loadDataSources 方法表示读取所有的数据源对象。数据源的相关属性都在 druidProperties 对象中,我们先根据基本的数据库连接信息创建一个 DataSource 对象,然后再调用 druidProperties#dataSource 方法为这些数据源连接池配置其他的属性(最大连接数、最小空闲数等),最后,以 key-value 的形式将数据源存入一个 Map 集合中,每一个数据源的 key 就是你在 YAML 中配置的数据源名称。

5. 数据源切换

对于当前数据库操作使用哪个数据源?我们有很多种不同的设置方案,当然最为省事的办法是把当前使用的数据源信息存入到 ThreadLocal 中,ThreadLocal 的特点,简单说就是在哪个线程中存入的数据,在哪个线程才能取出来,换一个线程就取不出来了,这样可以确保多线程环境下的数据安全。

先来一个简单的工具类,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class DynamicDataSourceContextHolder {
public static final Logger log = LoggerFactory.getLogger(DynamicDataSourceContextHolder.class);

/**
* 使用ThreadLocal维护变量,ThreadLocal为每个使用该变量的线程提供独立的变量副本,
* 所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。
*/
private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();

/**
* 设置数据源的变量
*/
public static void setDataSourceType(String dsType) {
log.info("切换到{}数据源", dsType);
CONTEXT_HOLDER.set(dsType);
}

/**
* 获得数据源的变量
*/
public static String getDataSourceType() {
return CONTEXT_HOLDER.get();
}

/**
* 清空数据源变量
*/
public static void clearDataSourceType() {
CONTEXT_HOLDER.remove();
}
}

接下来我们自定义一个注解用来标记当前的数据源,如下:

1
2
3
4
5
6
7
8
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface DataSource {
String dataSourceName() default DynamicDataSourceProvider.DEFAULT_DATASOURCE;

@AliasFor("dataSourceName")
String value() default DynamicDataSourceProvider.DEFAULT_DATASOURCE;
}

这个注解将来加在 Service 层的方法上,使用该注解的时候,需要指定一个数据源名称,不指定的话,默认就使用 master 作为数据源。

我们还需要通过 AOP 来解析当前的自定义注解,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Aspect
@Order(1)
@Component
public class DataSourceAspect {
@Pointcut("@annotation(org.javaboy.demo.annotation.DataSource)"
+ "|| @within(org.javaboy.demo.annotation.DataSource)")
public void dsPc() {

}

@Around("dsPc()")
public Object around(ProceedingJoinPoint point) throws Throwable {
DataSource dataSource = getDataSource(point);

if (Objects.nonNull(dataSource)) {
DynamicDataSourceContextHolder.setDataSourceType(dataSource.dataSourceName());
}

try {
return point.proceed();
} finally {
// 销毁数据源 在执行方法之后
DynamicDataSourceContextHolder.clearDataSourceType();
}
}

/**
* 获取需要切换的数据源
*/
public DataSource getDataSource(ProceedingJoinPoint point) {
MethodSignature signature = (MethodSignature) point.getSignature();
DataSource dataSource = AnnotationUtils.findAnnotation(signature.getMethod(), DataSource.class);
if (Objects.nonNull(dataSource)) {
return dataSource;
}
return AnnotationUtils.findAnnotation(signature.getDeclaringType(), DataSource.class);
}
}
  1. 首先,我们在 dsPc() 方法上定义了切点,我们拦截下所有带有 @DataSource 注解的方法,同时由于该注解也可以加在类上,如果该注解加在类上,就表示类中的所有方法都使用该数据源。
  2. 接下来我们定义了一个环绕通知,首先根据当前的切点,调用 getDataSource 方法获取到 @DataSource 注解,这个注解可能来自方法上也可能来自类上,方法上的优先级高于类上的优先级。如果拿到的注解不为空,则我们在 DynamicDataSourceContextHolder 中设置当前的数据源名称,设置完成后进行方法的调用;如果拿到的注解为空,那么就直接进行方法的调用,不再设置数据源了(将来会自动使用默认的数据源)。最后记得方法调用完成后,从 ThreadLocal 中移除数据源。

6. 定义动态数据源

接下来我们来自定义一个动态数据源:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class DynamicDataSource extends AbstractRoutingDataSource {

DynamicDataSourceProvider dynamicDataSourceProvider;

public DynamicDataSource(DynamicDataSourceProvider dynamicDataSourceProvider) {
this.dynamicDataSourceProvider = dynamicDataSourceProvider;
Map<Object, Object> targetDataSources = new HashMap<>(dynamicDataSourceProvider.loadDataSources());
super.setTargetDataSources(targetDataSources);
super.setDefaultTargetDataSource(dynamicDataSourceProvider.loadDataSources().get(DynamicDataSourceProvider.DEFAULT_DATASOURCE));
super.afterPropertiesSet();
}

@Override
protected Object determineCurrentLookupKey() {
String dataSourceType = DynamicDataSourceContextHolder.getDataSourceType();
return dataSourceType;
}
}

这就是我们文章开头所说的 AbstractRoutingDataSource 了,该类有一个方法名为 determineCurrentLookupKey,当需要使用数据源的时候,系统会自动调用该方法,获取当前数据源的标记,如 master 或者 slave 或者其他,拿到标记之后,就可以据此获取到一个数据源了。

当我们配置 DynamicDataSource 的时候,需要配置两个关键的参数,一个是 setTargetDataSources,这个就是当前所有的数据源,把当前所有的数据源都告诉给 AbstractRoutingDataSource,这些数据源都是 key-value 的形式(将来根据 determineCurrentLookupKey 方法返回的 key 就可以获取到具体的数据源了);另一个方法是 setDefaultTargetDataSource,这个就是默认的数据源,当我们执行一个数据库操作的时候,如果没有指定数据源(例如 Service 层的方法没有加 @DataSource 注解),那么默认就使用这个数据源。

最后,再将这个 bean 注册到 Spring 容器中,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Configuration
public class DruidAutoConfiguration {
@Autowired
DynamicDataSourceProvider dynamicDataSourceProvider;

@Bean
DynamicDataSource dynamicDataSource() {
return new DynamicDataSource(dynamicDataSourceProvider);
}

/**
* 去除数据源监控页面的广告
*
* @param properties
* @return
*/
@Bean
@ConditionalOnProperty(name = "spring.datasource.druid.statViewServlet.enabled", havingValue = "true")
public FilterRegistrationBean removeDruidFilterRegistrationBean(DruidStatProperties properties) {
// 获取web监控页面的参数
DruidStatProperties.StatViewServlet config = properties.getStatViewServlet();
// 提取common.js的配置路径
String pattern = config.getUrlPattern() != null ? config.getUrlPattern() : "/druid/*";
String commonJsPattern = pattern.replaceAll("\\*", "js/common.js");
// 创建filter进行过滤
Filter filter = new Filter() {
@Override
public void init(javax.servlet.FilterConfig filterConfig) throws ServletException {
}

@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
String text = Utils.readFromResource("support/http/resources/js/common.js");
text = text.replace("this.buildFooter();", "");
response.getWriter().write(text);
}

@Override
public void destroy() {
}
};
FilterRegistrationBean registrationBean = new FilterRegistrationBean();
registrationBean.setFilter(filter);
registrationBean.addUrlPatterns(commonJsPattern);
return registrationBean;
}
}

下面,我们还配置了一个过滤器,这个过滤器的目的是去除 Druid 监控页面的阿里广告,具体原理参考Spring Boot 如何监控 SQL 运行情况?一文。

7. 测试

好啦,大功告成,我们再来测试一下,写一个 UserMapper:

1
2
3
4
5
@Mapper
public interface UserMapper {
@Select("select count(*) from user")
Integer count();
}

一个很简单的数据库查询操作。

再来一个 service:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Service
public class UserService {

@Autowired
UserMapper userMapper;

@DataSource("master")
public Integer master() {
return userMapper.count();
}

@DataSource("slave")
public Integer slave() {
return userMapper.count();
}
}

通过 @DataSource 注解来指定具体操作的数据源,如果没有使用该注解指定,默认就使用 master 数据源。

最后去单元测试中测一下,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
@SpringBootTest
class DynamicDatasourceDemoApplicationTests {

@Autowired
UserService userService;

@Test
void contextLoads() {
System.out.println("userService.master() = " + userService.master());
System.out.println("userService.slave() = " + userService.slave());
}

}

由于我这里 master 和 slave 分别对应了不同的库,这里查询会展示出不同的结果。

8. 小结

知其然知其所以然!

好啦,公众号江南一点雨后台回复 dynamic_datasource,获取本文源码下载链接。