来自:https://blog.csdn.net/qq381332153
1. 背景
-
如何动态管理多个数据源以及切换?
-
如何保证多数据源场景下的数据一致性(事务)?
2. 数据源切换原理
AbstractRoutingDataSource
,可以实现切换数据源。其类结构如下图所示:-
targetDataSources&defaultTargetDataSource
-
resolvedDataSources&resolvedDefaultDataSource
AbstractRoutingDataSource
对象时,通过调用afterPropertiesSet
复制上述目标数据源。由此可见,一旦数据源实例对象创建完毕,业务无法再添加新的数据源。-
determineCurrentLookupKey
Map
其key为lookup key
。resolvedDataSources
中取出DataSource。3. 配置文件解决方案
-
定义
DynamicDataSource
类继承AbstractRoutingDataSource
,重写determineCurrentLookupKey()
方法。 -
配置多个数据源注入
targetDataSources
和defaultTargetDataSource
,通过afterPropertiesSet()
方法将数据源写入resolvedDataSources
和resolvedDefaultDataSource
。 -
调用
AbstractRoutingDataSource
的getConnection()
方法时,determineTargetDataSource()
方法返回DataSource
执行底层的getConnection()
。
3.1 创建数据源
DynamicDataSource
数据源的注入,目前业界主流实现步骤如下:spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driverClassName=com.mysql.jdbc.Driver
# 主数据源
spring.datasource.druid.master.url=jdbcUrl
spring.datasource.druid.master.username=***
spring.datasource.druid.master.password=***
# 其他数据源
spring.datasource.druid.second.url=jdbcUrl
spring.datasource.druid.second.username=***
spring.datasource.druid.second.password=***
@Configuration
public class DynamicDataSourceConfig {
@Bean
@ConfigurationProperties("spring.datasource.druid.master")
public DataSource firstDataSource(){
return DruidDataSourceBuilder.create().build();
}
@Bean
@ConfigurationProperties("spring.datasource.druid.second")
public DataSource secondDataSource(){
return DruidDataSourceBuilder.create().build();
}
@Bean
@Primary
public DynamicDataSource dataSource(DataSource firstDataSource, DataSource secondDataSource) {
Map
3.2 AOP处理
DataSourceAspect
切面技术来简化业务上的使用,只需要在业务方法添加@SwitchDataSource
注解即可完成动态切换:@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface SwitchDataSource {
String value();
}
DataSourceAspect
拦截业务方法,更新当前线程上下文DataSourceContextHolder
中存储的key,即可实现数据源切换。3.3 方案不足
AbstractRoutingDataSource
的多数据源动态切换,有个明显的缺点,无法动态添加和删除数据源。在我们的产品中,不能把应用数据源写死在配置文件。接下来分享一下基于数据库表的实现方案。4. 数据库表解决方案
AbstractRoutingDataSource
的设计思路,实现自定义数据源管理。4.1 设计数据源表
DataSource
的相关配置参数,其相应的ORM操作代码在此不再赘述,主要是实现数据源的增删改查操作。4.2 自定义数据源管理
4.2.1 定义管理接口
AbstractDataSource
即可实现DynamicDataSource
。为了方便对数据源进行操作,我们定义一个接口DataSourceManager
,为业务提供操作数据源的统一接口。public interface DataSourceManager {
void put(String var1, DataSource var2);
DataSource get(String var1);
Boolean hasDataSource(String var1);
void remove(String var1);
void closeDataSource(String var1);
Collectionall() ;
}
4.2.2 自定义数据源
DynamicDataSource
的实现如下图所示:AbstractRoutingDataSource
是在容器启动的时候,执行afterPropertiesSet
注入数据源对象,完成之后无法对数据源进行修改。DynamicDataSource
则实现DataSourceManager
接口,可以将数据表中的数据源加载到dataSources。4.2.3 切面处理
public DataSource determineTargetDataSource() {
String lookupKey = DataSourceContextHolder.getKey();
DataSource dataSource = Optional.ofNullable(lookupKey)
.map(dataSources::get)
.orElse(defaultDataSource);
if (dataSource == null) {
throw new IllegalStateException("Cannot determine DataSource for lookup key [" + lookupKey + "]");
}
return dataSource;
}
4.2.4 管理数据源状态
DataSourceBuilder
类,根据数据源表的定义创建DataSource。在项目运行过程中,可以使用定时任务对数据源进行保活,为了提升性能再添加一层缓存。AbstractRoutingDataSource
只支持单库事务,切换数据源是在开启事务之前执行。Spring使用 DataSourceTransactionManager
进行事务管理。开启事务,会将数据源缓存到DataSourceTransactionObject
对象中,后续的commit和 rollback事务操作实际上是使用的同一个数据源。@SwitchDataSource
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
6. 多库事务处理
6.1 关于事务的理解
-
begin(事务开始): 可以认为存在于数据库的命令中,比如Mysql的
start transaction
命令,但是在JDBC编程方式中不存在。 -
close(事务关闭):Spring事务的close()方法,是把
Connection
对象归还给数据库连接池,与事务无关。 -
suspend(事务挂起):Spring中事务挂起的语义是:需要新事务时,将现有的
Connection
保存起来(还有尚未提交的事务),然后创建新的Connection2
,Connection2
提交、回滚、关闭完毕后,再把Connection1
取出来继续执行。 -
resume(事务恢复): 嵌套事务执行完毕,返回上层事务重新绑定连接对象到事务管理器的过程。
setAutoCommit()
、commit()
、rollback()
。-
关闭一个数据库连接,这已经不再是事务的方法了。
6.2 自定义管理事务
Connetion
的事务提交和回滚。考虑到不同ORM框架的事务管理实现差异,要求实现自定义事务管理不影响框架层的事务。Connection
进行包装重写commit和rolllback屏蔽其默认行为,这样就不会影响到原生Connection
和ORM框架的默认事务行为。其整体思路如下图所示:@SwitchDataSource
,这是因为我们在TransactionAop
中已经执行了lookupKey的切换。6.2.1 定义多事务注解
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MultiTransaction {
String transactionManager() default "multiTransactionManager";
// 默认数据隔离级别,随数据库本身默认值
IsolationLevel isolationLevel() default IsolationLevel.DEFAULT;
// 默认为主库数据源
String datasourceId() default "default";
// 只读事务,若有更新操作会抛出异常
boolean readOnly() default false;
datasourceId
指定事务用到的数据源,不指定默认为主库。6.2.3 包装Connection
Connection
,屏蔽其中的commit&rollback
方法。这样我们就可以在主事务里进行统一的事务提交和回滚操作。public class ConnectionProxy implements Connection {
private final Connection connection;
public ConnectionProxy(Connection connection) {
this.connection = connection;
}
@Override
public void commit() throws SQLException {
// connection.commit();
}
public void realCommit() throws SQLException {
connection.commit();
}
@Override
public void close() throws SQLException {
//connection.close();
}
public void realClose() throws SQLException {
if (!connection.getAutoCommit()) {
connection.setAutoCommit(true);
}
connection.close();
}
@Override
public void rollback() throws SQLException {
if(!connection.isClosed())
connection.rollback();
}
...
}
commit&close
方法不执行操作,rollback执行的前提是连接执行close才生效。这样不管是使用哪个ORM框架,其自身事务管理都将失效。事务的控制就交由MultiTransaction
控制了。6.2.4 事务上下文管理
public class TransactionHolder {
// 是否开启了一个MultiTransaction
private boolean isOpen;
// 是否只读事务
private boolean readOnly;
// 事务隔离级别
private IsolationLevel isolationLevel;
// 维护当前线程事务ID和连接关系
private ConcurrentHashMapconnectionMap;
// 事务执行栈
private StackexecuteStack;
// 数据源切换栈
private StackdatasourceKeyStack;
// 主事务ID
private String mainTransactionId;
// 执行次数
private AtomicInteger transCount;
// 事务和数据源key关系
private ConcurrentHashMapexecuteIdDatasourceKeyMap;
}
ConnectionProxy
。事务嵌套调用,保存事务ID和lookupKey至栈中,当内层事务执行完毕执行pop。这样的话,外层事务只需在栈中执行peek即可获取事务ID和lookupKey。6.2.5 数据源兼容处理
getConnection
方法。当前线程没有启动自定义事务,则直接从数据源中返回连接。@Override
public Connection getConnection() throws SQLException {
TransactionHolder transactionHolder = MultiTransactionManager.TRANSACTION_HOLDER_THREAD_LOCAL.get();
if (Objects.isNull(transactionHolder)) {
return determineTargetDataSource().getConnection();
}
ConnectionProxy ConnectionProxy = transactionHolder.getConnectionMap()
.get(transactionHolder.getExecuteStack().peek());
if (ConnectionProxy == null) {
// 没开跨库事务,直接返回
return determineTargetDataSource().getConnection();
} else {
transactionHolder.addCount();
// 开了跨库事务,从当前线程中拿包装过的Connection
return ConnectionProxy;
}
}
6.2.6 切面处理
package com.github.mtxn.transaction.aop;
@Aspect
@Component
@Slf4j
@Order(99999)
public class MultiTransactionAop {
@Pointcut("@annotation(com.github.mtxn.transaction.annotation.MultiTransaction)")
public void pointcut() {
if (log.isDebugEnabled()) {
log.debug("start in transaction pointcut...");
}
}
@Around("pointcut()")
public Object aroundTransaction(ProceedingJoinPoint point) throws Throwable {
MethodSignature signature = (MethodSignature) point.getSignature();
// 从切面中获取当前方法
Method method = signature.getMethod();
MultiTransaction multiTransaction = method.getAnnotation(MultiTransaction.class);
if (multiTransaction == null) {
return point.proceed();
}
IsolationLevel isolationLevel = multiTransaction.isolationLevel();
boolean readOnly = multiTransaction.readOnly();
String prevKey = DataSourceContextHolder.getKey();
MultiTransactionManager multiTransactionManager = Application.resolve(multiTransaction.transactionManager());
// 切数据源,如果失败使用默认库
if (multiTransactionManager.switchDataSource(point, signature, multiTransaction)) return point.proceed();
// 开启事务栈
TransactionHolder transactionHolder = multiTransactionManager.startTransaction(prevKey, isolationLevel, readOnly, multiTransactionManager);
Object proceed;
try {
proceed = point.proceed();
multiTransactionManager.commit();
} catch (Throwable ex) {
log.error("execute method:{}#{},err:", method.getDeclaringClass(), method.getName(), ex);
multiTransactionManager.rollback();
throw ExceptionUtils.api(ex, "系统异常:%s", ex.getMessage());
} finally {
// 当前事务结束出栈
String transId = multiTransactionManager.getTrans().getExecuteStack().pop();
transactionHolder.getDatasourceKeyStack().pop();
// 恢复上一层事务
DataSourceContextHolder.setKey(transactionHolder.getDatasourceKeyStack().peek());
// 最后回到主事务,关闭此次事务
multiTransactionManager.close(transId);
}
return proceed;
}
}
7.总结
1、本站所有资源均从互联网上收集整理而来,仅供学习交流之用,因此不包含技术服务请大家谅解!
2、本站不提供任何实质性的付费和支付资源,所有需要积分下载的资源均为网站运营赞助费用或者线下劳务费用!
3、本站所有资源仅用于学习及研究使用,您必须在下载后的24小时内删除所下载资源,切勿用于商业用途,否则由此引发的法律纠纷及连带责任本站和发布者概不承担!
4、本站站内提供的所有可下载资源,本站保证未做任何负面改动(不包含修复bug和完善功能等正面优化或二次开发),但本站不保证资源的准确性、安全性和完整性,用户下载后自行斟酌,我们以交流学习为目的,并不是所有的源码都100%无错或无bug!如有链接无法下载、失效或广告,请联系客服处理!
5、本站资源除标明原创外均来自网络整理,版权归原作者或本站特约原创作者所有,如侵犯到您的合法权益,请立即告知本站,本站将及时予与删除并致以最深的歉意!
6、如果您也有好的资源或教程,您可以投稿发布,成功分享后有站币奖励和额外收入!
7、如果您喜欢该资源,请支持官方正版资源,以得到更好的正版服务!
8、请您认真阅读上述内容,注册本站用户或下载本站资源即您同意上述内容!
原文链接:https://www.shuli.cc/?p=19898,转载请注明出处。
评论0