當前位置: 妍妍網 > 碼農

SpringBoot 多資料來源及事務解決方案

2024-03-31碼農

來自: https://blog.csdn.net/qq381332153

1. 背景

一個主庫和N個套用庫的資料來源,並且會同時操作主庫和套用庫的數據,需要解決以下兩個問題:

  • 如何動態管理多個資料來源以及切換?

  • 如何保證多資料來源場景下的數據一致性(事務)?

  • 本文主要探討這兩個問題的解決方案,希望能對讀者有一定的啟發。

    2. 資料來源切換原理

    透過擴充套件Spring提供的抽象類 AbstractRoutingDataSource ,可以實作切換資料來源。其類結構如下圖所示:

  • targetDataSources&defaultTargetDataSource

  • 計畫上需要使用的所有資料來源和預設資料來源。

  • resolvedDataSources&resolvedDefaultDataSource

  • 當Spring容器建立 AbstractRoutingDataSource 物件時,透過呼叫 afterPropertiesSet 復制上述目標資料來源。由此可見,一旦資料來源例項物件建立完畢,業務無法再添加新的資料來源。

  • determineCurrentLookupKey

  • 此方法為抽象方法,透過擴充套件這個方法來實作資料來源的切換。目標資料來源的結構為: Map<Object, DataSource> 其key為 lookup key

    我們來看官方對這個方法的註釋:

    lookup key通常是繫結線上程上下文中,根據這個key去 resolvedDataSources 中取出DataSource。

    根據目標資料來源的管理方式不同,可以使用基於配置檔和資料庫表兩種方式。基於配置檔管理方案無法後續添加新的資料來源,而基於資料庫表方案管理,則更加靈活。關註公z號:碼猿技術專欄,回復關鍵詞:1111 獲取阿裏內部Java效能調優手冊!

    3. 配置檔解決方案

    根據上面的分析,我們可以按照下面的步驟去實作:

  • 定義 DynamicDataSource 類繼承 AbstractRoutingDataSource ,重寫 determineCurrentLookupKey() 方法。

  • 配置多個資料來源註入 targetDataSources defaultTargetDataSource ,透過 afterPropertiesSet() 方法將資料來源寫入 resolvedDataSources resolvedDefaultDataSource

  • 呼叫 AbstractRoutingDataSource getConnection() 方法時, determineTargetDataSource() 方法返回 DataSource 執行底層的 getConnection()

  • 其流程如下圖所示:

    3.1 建立資料來源

    DynamicDataSource 資料來源的註入,目前業界主流實作步驟如下:

    在配置檔中定義資料來源

    spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
    spring.datasource.driver className=com.mysql.jdbc.Driver
    # 主資料來源
    spring.datasource.druid.master.url=jdbcUrl
    spring.datasource.druid.master.username=***
    spring.datasource.druid.master.password=***
    # 其他資料來源
    spring.datasource.druid.second.url=jdbcUrl
    spring.datasource.druid.second.username=***
    spring.datasource.druid.second.password=***

    在程式碼中配置Bean

    @Configuration
    public classDynamicDataSourceConfig{
    @Bean
    @ConfigurationProperties("spring.datasource.druid.master")
    public DataSource firstDataSource(){
    return DruidDataSourceBuilder.create().build();
    }
    @Bean
    @ConfigurationProperties("spring.datasource.druid.second")
    public DataSource secondDataSource(){
    return DruidDataSourceBuilder.create().build();
    }
    @Bean
    @Primary
    public DynamicDataSource dataSource(DataSource firstDataSource, DataSource secondDataSource){
    Map<Object, Object> targetDataSources = new HashMap<>(5);
    targetDataSources.put(DataSourceNames.FIRST, firstDataSource);
    targetDataSources.put(DataSourceNames.SECOND, secondDataSource);
    returnnew DynamicDataSource(firstDataSource, targetDataSources);
    }
    }

    3.2 AOP處理

    透過 DataSourceAspect 切面技術來簡化業務上的使用,只需要在業務方法添加 @SwitchDataSource 註解即可完成動態切換:

    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD})
    public@interface SwitchDataSource {
    String value();
    }

    DataSourceAspect 攔截業務方法,更新當前執行緒上下文 DataSourceContextHolder 中儲存的key,即可實作資料來源切換。

    3.3 方案不足

    基於 AbstractRoutingDataSource 的多資料來源動態切換,有個明顯的缺點,無法動態添加和刪除資料來源。在我們的產品中,不能把套用資料來源寫死在配置檔。接下來分享一下基於資料庫表的實作方案。

    4. 資料庫表解決方案

    我們需要實作視覺化的資料來源管理,並即時檢視資料來源的執行狀態。所以我們不能把資料來源全部配置在檔中,應該將資料來源定義保存到資料庫表。參考 AbstractRoutingDataSource 的設計思路,實作自訂資料來源管理。

    4.1 設計資料來源表

    主庫的資料來源資訊仍然配置在計畫配置檔中,套用庫資料來源配置參數,則設計對應的數據表。表結構如下所示:

    這個表主要就是 DataSource 的相關配置參數,其相應的ORM操作程式碼在此不再贅述,主要是實作資料來源的增刪改查操作。

    4.2 自訂資料來源管理

    4.2.1 定義管理介面

    透過繼承 AbstractDataSource 即可實作 DynamicDataSource 。為了方便對資料來源進行操作,我們定義一個介面 DataSourceManager ,為業務提供操作資料來源的統一介面。

    publicinterfaceDataSourceManager{
    voidput(String var1, DataSource var2);
    DataSource get(String var1);
    Boolean hasDataSource(String var1);
    voidremove(String var1);
    voidcloseDataSource(String var1);
    Collection<DataSource> all();
    }



    該介面主要是對數據表中定義的資料來源,提供基礎管理功能。

    4.2.2 自訂資料來源

    DynamicDataSource 的實作如下圖所示:

    根據前面的分析, AbstractRoutingDataSource 是在容器啟動的時候,執行 afterPropertiesSet 註入資料來源物件,完成之後無法對資料來源進行修改。 DynamicDataSource 則實作 DataSourceManager 介面,可以將數據表中的資料來源載入到dataSources。

    4.2.3 切面處理

    這一塊的處理跟配置檔資料來源方案處理方式相同,都是透過AOP技術切換lookup key。

    public DataSource determineTargetDataSource(){
    String lookupKey = DataSourceContextHolder.getKey();
    DataSource dataSource = Optional.ofNullable(lookupKey)
    .map(dataSources::get)
    .orElse(defaultDataSource);
    if (dataSource == null) {
    thrownew IllegalStateException("Cannot determine DataSource for lookup key [" + lookupKey + "]");
    }
    return dataSource;
    }

    4.2.4 管理資料來源狀態

    在計畫啟動的時候,載入數據表中的所有資料來源,並執行初始化。初始化操作主要是使用SpringBoot提供的 DataSourceBuilder 類,根據資料來源表的定義建立DataSource。在計畫執行過程中,可以使用定時任務對資料來源進行保活,為了提升效能再添加一層緩存。

    AbstractRoutingDataSource 只支持單庫事務,切換資料來源是在開啟事務之前執行。Spring使用 DataSourceTransactionManager 進行事務管理。開啟事務,會將資料來源緩存到 DataSourceTransactionObject 物件中,後續的commit和 rollback事務操作實際上是使用的同一個資料來源。

    如何解決切庫事務問題?借助Spring的聲明式事務處理,我們可以在多次切庫操作時強制開啟新的事務:

    @SwitchDataSource
    @Transactional(rollbackFor = Exception. classpropagation= Propagation.REQUIRES_NEW)

    這樣的話,執行切庫操作的時候強制啟動新事務,便可實作多次切庫而且事務能夠生效。但是這種事務方式,存在數據一致性問題:

    假若ServiceB正常執行送出事務,接著返回ServiceA執行並且發生異常。因為兩次處理是不同的事務,ServiceA這個事務執行回滾,而ServiceA事務已經送出。這樣的話,數據就不一致了。接下來,我們主要討論如何解決多庫的事務問題。

    6. 多庫事務處理

    6.1 關於事務的理解

    首先有必要理解事務的本質。

    1.提到Spring事務,就離不開事務的四大特性和隔離級別、七大傳播特性。

    事務特性和離級別是屬於資料庫範疇。Spring事務的七大傳播特性是什麽呢?它是Spring在當前執行緒內,處理多個事務操作時的事務套用策略,資料庫事務本身並不存在傳播特性。

    2.Spring事務的定義包括:begin、commit、rollback、close、suspend、resume等動作。

  • begin(事務開始): 可以認為存在於資料庫的命令中,比如Mysql的 start transaction 命令,但是在JDBC編程方式中不存在。

  • close(事務關閉):Spring事務的close()方法,是把 Connection 物件歸還給資料庫連線池,與事務無關。

  • suspend(事務掛起):Spring中事務掛起的語意是:需要新事務時,將現有的 Connection 保存起來(還有尚未送出的事務),然後建立新的 Connection2 Connection2 送出、回滾、關閉完畢後,再把 Connection1 取出來繼續執行。

  • resume(事務恢復): 巢狀事務執行完畢,返回上層事務重新繫結連線物件到事務管理器的過程。

  • 實際上,只有commit、rollback、close是在JDBC真實存在的,而其他動作都是套用的語意,而非JDBC事務的真實命令。因此,事務真實存在的方法是: setAutoCommit() commit() rollback()

    close()語意為:

  • 關閉一個資料庫連線,這已經不再是事務的方法了。

  • 使用DataSource並不會執行物理關閉,只是歸還給連線池。

    6.2 自訂管理事務

    為了保證在多個資料來源中事務的一致性,我們可以手動管理 Connetion 的事務送出和回滾。考慮到不同 ORM框架 的事務管理實作差異,要求實作自訂事務管理不影響框架層的事務。

    這可以透過使用裝飾器設計模式,對 Connection 進行包裝重寫commit和rolllback遮蔽其預設行為,這樣就不會影響到原生 Connection 和ORM框架的預設事務行為。其整體思路如下圖所示:

    這裏並沒有使用前面提到的 @SwitchDataSource ,這是因為我們在 TransactionAop 中已經執行了lookupKey的切換。

    6.2.1 定義多事務註解

    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public@interface MultiTransaction {
    String transactionManager()default "multiTransactionManager";
    // 預設數據隔離級別,隨資料庫本身預設值
    IsolationLevel isolationLevel()default IsolationLevel.DEFAULT;
    // 預設為主庫資料來源
    String datasourceId()default "default";
    // 唯讀事務,若有更新操作會丟擲異常
    booleanreadOnly()defaultfalse;

    業務方法只需使用該註解即可開啟事務, datasourceId 指定事務用到的資料來源,不指定預設為主庫。

    6.2.3 包裝Connection

    自訂事務我們使用包裝過的 Connection ,遮蔽其中的 commit&rollback 方法。這樣我們就可以在主事務裏進行統一的事務送出和回滾操作。

    public classConnectionProxyimplementsConnection{
    privatefinal Connection connection;
    publicConnectionProxy(Connection connection){
    this.connection = connection;
    }
    @Override
    publicvoidcommit()throws SQLException {
    // connection.commit();
    }
    publicvoidrealCommit()throws SQLException {
    connection.commit();
    }
    @Override
    publicvoidclose()throws SQLException {
    //connection.close();
    }
    publicvoidrealClose()throws SQLException {
    if (!connection.getAutoCommit()) {
    connection.setAutoCommit(true);
    }
    connection.close();
    }
    @Override
    publicvoidrollback()throws SQLException {
    if(!connection.isClosed())
    connection.rollback();
    }
    ...
    }





    這裏 commit&close 方法不執行操作,rollback執行的前提是連線執行close才生效。這樣不管是使用哪個ORM框架,其自身事務管理都將失效。事務的控制就交由 MultiTransaction 控制了。

    6.2.4 事務上下文管理

    public classTransactionHolder{
    // 是否開啟了一個MultiTransaction
    privateboolean isOpen;
    // 是否唯讀事務
    privateboolean readOnly;
    // 事務隔離級別
    private IsolationLevel isolationLevel;
    // 維護當前執行緒事務ID和連線關系
    private ConcurrentHashMap<String, ConnectionProxy> connectionMap;
    // 事務執行棧
    private Stack<String> executeStack;
    // 資料來源切換棧
    private Stack<String> datasourceKeyStack;
    // 主事務ID
    private String mainTransactionId;
    // 執行次數
    private AtomicInteger transCount;
    // 事務和資料來源key關系
    private ConcurrentHashMap<String, String> executeIdDatasourceKeyMap;
    }

    每開啟一個事物,生成一個事務ID並繫結一個 ConnectionProxy 。事務巢狀呼叫,保存事務ID和lookupKey至棧中,當內層事務執行完畢執行pop。這樣的話,外層事務只需在棧中執行peek即可獲取事務ID和lookupKey。

    6.2.5 資料來源相容處理

    為了不影響原生事務的使用,需要重寫 getConnection 方法。當前執行緒沒有啟動自訂事務,則直接從資料來源中返回連線。

    @Override
    public Connection getConnection()throws SQLException {
    TransactionHolder transactionHolder = MultiTransactionManager.TRANSACTION_HOLDER_THREAD_LOCAL.get();
    if (Objects.isNull(transactionHolder)) {
    return determineTargetDataSource().getConnection();
    }
    ConnectionProxy ConnectionProxy = transactionHolder.getConnectionMap()
    .get(transactionHolder.getExecuteStack().peek());
    if (ConnectionProxy == null) {
    // 沒開跨庫事務,直接返回
    return determineTargetDataSource().getConnection();
    else {
    transactionHolder.addCount();
    // 開了跨庫事務,從當前執行緒中拿包裝過的Connection
    return ConnectionProxy;
    }
    }

    6.2.6 切面處理

    切面處理的核心邏輯是:維護一個巢狀事務棧,當業務方法執行結束,或者發生異常時,判斷當前棧頂事務ID是否為主事務ID。如果是的話這時候已經到了最外層事務,這時才執行送出和回滾。詳細流程如下圖所示:

    package com.github.mtxn.transaction.aop;
    @Aspect
    @Component
    @Slf4j
    @Order(99999)
    public classMultiTransactionAop{
    @Pointcut("@annotation(com.github.mtxn.transaction.annotation.MultiTransaction)")
    publicvoidpointcut(){
    if (log.isDebugEnabled()) {
    log.debug("start in transaction pointcut...");
    }
    }

    @Around("pointcut()")
    public Object aroundTransaction(ProceedingJoinPoint point)throws Throwable {
    MethodSignature signature = (MethodSignature) point.getSignature();
    // 從切面中獲取當前方法
    Method method = signature.getMethod();
    MultiTransaction multiTransaction = method.getAnnotation(MultiTransaction. class);
    if (multiTransaction == null) {
    return point.proceed();
    }
    IsolationLevel isolationLevel = multiTransaction.isolationLevel();
    boolean readOnly = multiTransaction.readOnly();
    String prevKey = DataSourceContextHolder.getKey();
    MultiTransactionManager multiTransactionManager = Application.resolve(multiTransaction.transactionManager());
    // 切資料來源,如果失敗使用預設庫
    if (multiTransactionManager.switchDataSource(point, signature, multiTransaction)) return point.proceed();
    // 開啟事務棧
    TransactionHolder transactionHolder = multiTransactionManager.startTransaction(prevKey, isolationLevel, readOnly, multiTransactionManager);
    Object proceed;
    try {
    proceed = point.proceed();
    multiTransactionManager.commit();
    catch (Throwable ex) {
    log.error("execute method:{}#{},err:", method.getDeclaring class(), method.getName(), ex);
    multiTransactionManager.rollback();
    throw ExceptionUtils.api(ex, "系統異常:%s", ex.getMessage());
    finally {
    // 當前事務結束出棧
    String transId = multiTransactionManager.getTrans().getExecuteStack().pop();
    transactionHolder.getDatasourceKeyStack().pop();
    // 恢復上一層事務
    DataSourceContextHolder.setKey(transactionHolder.getDatasourceKeyStack().peek());
    // 最後回到主事務,關閉此次事務
    multiTransactionManager.close(transId);
    }
    return proceed;
    }

    }



    7.總結

    本文主要介紹了多資料來源管理的解決方案(套用層事務,而非XA二段送出保證),以及對多個庫同時操作的事務管理。

    需要註意的是,這種方式只適用於單體架構的套用。因為多個庫的事務參與者都是執行在同一個JVM進行。如果是在微服務架構的套用中,則需要使用分布式事務管理(譬如:Seata)。

    <END>