來自: https://blog.csdn.net/qq381332153
1. 背景
一個主庫和N個套用庫的資料來源,並且會同時操作主庫和套用庫的數據,需要解決以下兩個問題:
如何動態管理多個資料來源以及切換?
如何保證多資料來源場景下的數據一致性(事務)?
本文主要探討這兩個問題的解決方案,希望能對讀者有一定的啟發。
2. 資料來源切換原理
透過擴充套件Spring提供的抽象類
AbstractRoutingDataSource
,可以實作切換資料來源。其類結構如下圖所示:
targetDataSources&defaultTargetDataSource
計畫上需要使用的所有資料來源和預設資料來源。
resolvedDataSources&resolvedDefaultDataSource
當Spring容器建立
AbstractRoutingDataSource
物件時,透過呼叫
afterPropertiesSet
復制上述目標資料來源。由此可見,一旦資料來源例項物件建立完畢,業務無法再添加新的資料來源。
determineCurrentLookupKey
此方法為抽象方法,透過擴充套件這個方法來實作資料來源的切換。目標資料來源的結構為:
Map<Object, DataSource>
其key為
lookup key
。
我們來看官方對這個方法的註釋:
lookup key通常是繫結線上程上下文中,根據這個key去
resolvedDataSources
中取出DataSource。
根據目標資料來源的管理方式不同,可以使用基於配置檔和資料庫表兩種方式。基於配置檔管理方案無法後續添加新的資料來源,而基於資料庫表方案管理,則更加靈活。關註公z號:碼猿技術專欄,回復關鍵詞:1111 獲取阿裏內部Java效能調優手冊!
3. 配置檔解決方案
根據上面的分析,我們可以按照下面的步驟去實作:
定義
DynamicDataSource
類繼承
AbstractRoutingDataSource
,重寫
determineCurrentLookupKey()
方法。
配置多個資料來源註入
targetDataSources
和
defaultTargetDataSource
,透過
afterPropertiesSet()
方法將資料來源寫入
resolvedDataSources
和
resolvedDefaultDataSource
。
呼叫
AbstractRoutingDataSource
的
getConnection()
方法時,
determineTargetDataSource()
方法返回
DataSource
執行底層的
getConnection()
。
其流程如下圖所示:
3.1 建立資料來源
DynamicDataSource
資料來源的註入,目前業界主流實作步驟如下:
在配置檔中定義資料來源
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver className=com.mysql.jdbc.Driver
# 主資料來源
spring.datasource.druid.master.url=jdbcUrl
spring.datasource.druid.master.username=***
spring.datasource.druid.master.password=***
# 其他資料來源
spring.datasource.druid.second.url=jdbcUrl
spring.datasource.druid.second.username=***
spring.datasource.druid.second.password=***
在程式碼中配置Bean
@Configuration
public classDynamicDataSourceConfig{
@Bean
@ConfigurationProperties("spring.datasource.druid.master")
public DataSource firstDataSource(){
return DruidDataSourceBuilder.create().build();
}
@Bean
@ConfigurationProperties("spring.datasource.druid.second")
public DataSource secondDataSource(){
return DruidDataSourceBuilder.create().build();
}
@Bean
@Primary
public DynamicDataSource dataSource(DataSource firstDataSource, DataSource secondDataSource){
Map<Object, Object> targetDataSources = new HashMap<>(5);
targetDataSources.put(DataSourceNames.FIRST, firstDataSource);
targetDataSources.put(DataSourceNames.SECOND, secondDataSource);
returnnew DynamicDataSource(firstDataSource, targetDataSources);
}
}
3.2 AOP處理
透過
DataSourceAspect
切面技術來簡化業務上的使用,只需要在業務方法添加
@SwitchDataSource
註解即可完成動態切換:
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public@interface SwitchDataSource {
String value();
}
DataSourceAspect
攔截業務方法,更新當前執行緒上下文
DataSourceContextHolder
中儲存的key,即可實作資料來源切換。
3.3 方案不足
基於
AbstractRoutingDataSource
的多資料來源動態切換,有個明顯的缺點,無法動態添加和刪除資料來源。在我們的產品中,不能把套用資料來源寫死在配置檔。接下來分享一下基於資料庫表的實作方案。
4. 資料庫表解決方案
我們需要實作視覺化的資料來源管理,並即時檢視資料來源的執行狀態。所以我們不能把資料來源全部配置在檔中,應該將資料來源定義保存到資料庫表。參考
AbstractRoutingDataSource
的設計思路,實作自訂資料來源管理。
4.1 設計資料來源表
主庫的資料來源資訊仍然配置在計畫配置檔中,套用庫資料來源配置參數,則設計對應的數據表。表結構如下所示:
這個表主要就是
DataSource
的相關配置參數,其相應的ORM操作程式碼在此不再贅述,主要是實作資料來源的增刪改查操作。
4.2 自訂資料來源管理
4.2.1 定義管理介面
透過繼承
AbstractDataSource
即可實作
DynamicDataSource
。為了方便對資料來源進行操作,我們定義一個介面
DataSourceManager
,為業務提供操作資料來源的統一介面。
publicinterfaceDataSourceManager{
voidput(String var1, DataSource var2);
DataSource get(String var1);
Boolean hasDataSource(String var1);
voidremove(String var1);
voidcloseDataSource(String var1);
Collection<DataSource> all();
}
該介面主要是對數據表中定義的資料來源,提供基礎管理功能。
4.2.2 自訂資料來源
DynamicDataSource
的實作如下圖所示:
根據前面的分析,
AbstractRoutingDataSource
是在容器啟動的時候,執行
afterPropertiesSet
註入資料來源物件,完成之後無法對資料來源進行修改。
DynamicDataSource
則實作
DataSourceManager
介面,可以將數據表中的資料來源載入到dataSources。
4.2.3 切面處理
這一塊的處理跟配置檔資料來源方案處理方式相同,都是透過AOP技術切換lookup key。
public DataSource determineTargetDataSource(){
String lookupKey = DataSourceContextHolder.getKey();
DataSource dataSource = Optional.ofNullable(lookupKey)
.map(dataSources::get)
.orElse(defaultDataSource);
if (dataSource == null) {
thrownew IllegalStateException("Cannot determine DataSource for lookup key [" + lookupKey + "]");
}
return dataSource;
}
4.2.4 管理資料來源狀態
在計畫啟動的時候,載入數據表中的所有資料來源,並執行初始化。初始化操作主要是使用SpringBoot提供的
DataSourceBuilder
類,根據資料來源表的定義建立DataSource。在計畫執行過程中,可以使用定時任務對資料來源進行保活,為了提升效能再添加一層緩存。
AbstractRoutingDataSource
只支持單庫事務,切換資料來源是在開啟事務之前執行。Spring使用
DataSourceTransactionManager
進行事務管理。開啟事務,會將資料來源緩存到
DataSourceTransactionObject
物件中,後續的commit和 rollback事務操作實際上是使用的同一個資料來源。
如何解決切庫事務問題?借助Spring的聲明式事務處理,我們可以在多次切庫操作時強制開啟新的事務:
@SwitchDataSource
@Transactional(rollbackFor = Exception. class, propagation= Propagation.REQUIRES_NEW)
這樣的話,執行切庫操作的時候強制啟動新事務,便可實作多次切庫而且事務能夠生效。但是這種事務方式,存在數據一致性問題:
假若ServiceB正常執行送出事務,接著返回ServiceA執行並且發生異常。因為兩次處理是不同的事務,ServiceA這個事務執行回滾,而ServiceA事務已經送出。這樣的話,數據就不一致了。接下來,我們主要討論如何解決多庫的事務問題。
6. 多庫事務處理
6.1 關於事務的理解
首先有必要理解事務的本質。
1.提到Spring事務,就離不開事務的四大特性和隔離級別、七大傳播特性。
事務特性和離級別是屬於資料庫範疇。Spring事務的七大傳播特性是什麽呢?它是Spring在當前執行緒內,處理多個事務操作時的事務套用策略,資料庫事務本身並不存在傳播特性。
2.Spring事務的定義包括:begin、commit、rollback、close、suspend、resume等動作。
begin(事務開始): 可以認為存在於資料庫的命令中,比如Mysql的
start transaction
命令,但是在JDBC編程方式中不存在。
close(事務關閉):Spring事務的close()方法,是把
Connection
物件歸還給資料庫連線池,與事務無關。
suspend(事務掛起):Spring中事務掛起的語意是:需要新事務時,將現有的
Connection
保存起來(還有尚未送出的事務),然後建立新的
Connection2
,
Connection2
送出、回滾、關閉完畢後,再把
Connection1
取出來繼續執行。
resume(事務恢復): 巢狀事務執行完畢,返回上層事務重新繫結連線物件到事務管理器的過程。
實際上,只有commit、rollback、close是在JDBC真實存在的,而其他動作都是套用的語意,而非JDBC事務的真實命令。因此,事務真實存在的方法是:
setAutoCommit()
、
commit()
、
rollback()
。
close()語意為:
關閉一個資料庫連線,這已經不再是事務的方法了。
使用DataSource並不會執行物理關閉,只是歸還給連線池。
6.2 自訂管理事務
為了保證在多個資料來源中事務的一致性,我們可以手動管理
Connetion
的事務送出和回滾。考慮到不同
ORM框架
的事務管理實作差異,要求實作自訂事務管理不影響框架層的事務。
這可以透過使用裝飾器設計模式,對
Connection
進行包裝重寫commit和rolllback遮蔽其預設行為,這樣就不會影響到原生
Connection
和ORM框架的預設事務行為。其整體思路如下圖所示:
這裏並沒有使用前面提到的
@SwitchDataSource
,這是因為我們在
TransactionAop
中已經執行了lookupKey的切換。
6.2.1 定義多事務註解
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public@interface MultiTransaction {
String transactionManager()default "multiTransactionManager";
// 預設數據隔離級別,隨資料庫本身預設值
IsolationLevel isolationLevel()default IsolationLevel.DEFAULT;
// 預設為主庫資料來源
String datasourceId()default "default";
// 唯讀事務,若有更新操作會丟擲異常
booleanreadOnly()defaultfalse;
業務方法只需使用該註解即可開啟事務,
datasourceId
指定事務用到的資料來源,不指定預設為主庫。
6.2.3 包裝Connection
自訂事務我們使用包裝過的
Connection
,遮蔽其中的
commit&rollback
方法。這樣我們就可以在主事務裏進行統一的事務送出和回滾操作。
public classConnectionProxyimplementsConnection{
privatefinal Connection connection;
publicConnectionProxy(Connection connection){
this.connection = connection;
}
@Override
publicvoidcommit()throws SQLException {
// connection.commit();
}
publicvoidrealCommit()throws SQLException {
connection.commit();
}
@Override
publicvoidclose()throws SQLException {
//connection.close();
}
publicvoidrealClose()throws SQLException {
if (!connection.getAutoCommit()) {
connection.setAutoCommit(true);
}
connection.close();
}
@Override
publicvoidrollback()throws SQLException {
if(!connection.isClosed())
connection.rollback();
}
...
}
這裏
commit&close
方法不執行操作,rollback執行的前提是連線執行close才生效。這樣不管是使用哪個ORM框架,其自身事務管理都將失效。事務的控制就交由
MultiTransaction
控制了。
6.2.4 事務上下文管理
public classTransactionHolder{
// 是否開啟了一個MultiTransaction
privateboolean isOpen;
// 是否唯讀事務
privateboolean readOnly;
// 事務隔離級別
private IsolationLevel isolationLevel;
// 維護當前執行緒事務ID和連線關系
private ConcurrentHashMap<String, ConnectionProxy> connectionMap;
// 事務執行棧
private Stack<String> executeStack;
// 資料來源切換棧
private Stack<String> datasourceKeyStack;
// 主事務ID
private String mainTransactionId;
// 執行次數
private AtomicInteger transCount;
// 事務和資料來源key關系
private ConcurrentHashMap<String, String> executeIdDatasourceKeyMap;
}
每開啟一個事物,生成一個事務ID並繫結一個
ConnectionProxy
。事務巢狀呼叫,保存事務ID和lookupKey至棧中,當內層事務執行完畢執行pop。這樣的話,外層事務只需在棧中執行peek即可獲取事務ID和lookupKey。
6.2.5 資料來源相容處理
為了不影響原生事務的使用,需要重寫
getConnection
方法。當前執行緒沒有啟動自訂事務,則直接從資料來源中返回連線。
@Override
public Connection getConnection()throws SQLException {
TransactionHolder transactionHolder = MultiTransactionManager.TRANSACTION_HOLDER_THREAD_LOCAL.get();
if (Objects.isNull(transactionHolder)) {
return determineTargetDataSource().getConnection();
}
ConnectionProxy ConnectionProxy = transactionHolder.getConnectionMap()
.get(transactionHolder.getExecuteStack().peek());
if (ConnectionProxy == null) {
// 沒開跨庫事務,直接返回
return determineTargetDataSource().getConnection();
} else {
transactionHolder.addCount();
// 開了跨庫事務,從當前執行緒中拿包裝過的Connection
return ConnectionProxy;
}
}
6.2.6 切面處理
切面處理的核心邏輯是:維護一個巢狀事務棧,當業務方法執行結束,或者發生異常時,判斷當前棧頂事務ID是否為主事務ID。如果是的話這時候已經到了最外層事務,這時才執行送出和回滾。詳細流程如下圖所示:
package com.github.mtxn.transaction.aop;
@Aspect
@Component
@Slf4j
@Order(99999)
public classMultiTransactionAop{
@Pointcut("@annotation(com.github.mtxn.transaction.annotation.MultiTransaction)")
publicvoidpointcut(){
if (log.isDebugEnabled()) {
log.debug("start in transaction pointcut...");
}
}
@Around("pointcut()")
public Object aroundTransaction(ProceedingJoinPoint point)throws Throwable {
MethodSignature signature = (MethodSignature) point.getSignature();
// 從切面中獲取當前方法
Method method = signature.getMethod();
MultiTransaction multiTransaction = method.getAnnotation(MultiTransaction. class);
if (multiTransaction == null) {
return point.proceed();
}
IsolationLevel isolationLevel = multiTransaction.isolationLevel();
boolean readOnly = multiTransaction.readOnly();
String prevKey = DataSourceContextHolder.getKey();
MultiTransactionManager multiTransactionManager = Application.resolve(multiTransaction.transactionManager());
// 切資料來源,如果失敗使用預設庫
if (multiTransactionManager.switchDataSource(point, signature, multiTransaction)) return point.proceed();
// 開啟事務棧
TransactionHolder transactionHolder = multiTransactionManager.startTransaction(prevKey, isolationLevel, readOnly, multiTransactionManager);
Object proceed;
try {
proceed = point.proceed();
multiTransactionManager.commit();
} catch (Throwable ex) {
log.error("execute method:{}#{},err:", method.getDeclaring class(), method.getName(), ex);
multiTransactionManager.rollback();
throw ExceptionUtils.api(ex, "系統異常:%s", ex.getMessage());
} finally {
// 當前事務結束出棧
String transId = multiTransactionManager.getTrans().getExecuteStack().pop();
transactionHolder.getDatasourceKeyStack().pop();
// 恢復上一層事務
DataSourceContextHolder.setKey(transactionHolder.getDatasourceKeyStack().peek());
// 最後回到主事務,關閉此次事務
multiTransactionManager.close(transId);
}
return proceed;
}
}
7.總結
本文主要介紹了多資料來源管理的解決方案(套用層事務,而非XA二段送出保證),以及對多個庫同時操作的事務管理。
需要註意的是,這種方式只適用於單體架構的套用。因為多個庫的事務參與者都是執行在同一個JVM進行。如果是在微服務架構的套用中,則需要使用分布式事務管理(譬如:Seata)。
<END>