當前位置: 妍妍網 > 碼農

SpringBoot + 事務勾點函式,打造高效支付系統!

2024-05-06碼農

大家好,我是鵬磊。

經過前面對Spring AOP、事務的總結,我們已經對它們有了一個比較感性的認知了。今天,我繼續安利一個 獨門絕技 :Spring 事務的勾點函式。單純的講技術可能比較枯燥乏味。接下來,我將以一個實際的案例來描述Spring事務勾點函式的正確使用姿勢。

一、案例背景

拿支付系統相關的業務來舉例。在支付系統中,我們需要記錄每個帳戶的資金流水(記錄使用者A因為哪個操作扣了錢,因為哪個操作加了錢),這樣我們才能對每個帳戶的 做到心中有數,對於支付系統而言,資金流水的數據可謂是 最重要 的。因此,為了防止支付系統的老大 徇私舞弊 ,CTO提了一個 流水存檔 的需求:要求支付系統對每個帳戶的資金流水做一份存檔,要求支付系統在寫流水的時候,把流水相關的資訊以訊息的形式推播到kafka,由 存檔系統 消費這個訊息並落地到柯瑞(這個庫只有 存檔系統 擁有寫許可權)。整個需求的流程如下所示:

整個需求的流程還是比較簡單的,考慮到後續會有其他事業部也要進行數據存檔操作,CTO建議支付系統團隊內部開發一個二方庫,這個二方庫的主要功能就是發送訊息到kafka中去。

二、確定方案

既然要求開發一個二方庫,因此,我們需要考慮如下幾件事情:

1、技術棧使用的springboot,因此,這裏最好以starter的方式提供

2、二方庫需要發送訊息給kafka,最好是二方庫內部基於kafka生產者的api建立生產者,不要使用Spring內建的kafkaTemplate,因為整合方有可能已經使用了kafkaTemplate。不能與整合方造成沖突。

3、減少對接方的整合難度、學習成本,最好是提供一個簡單實用的api,業務側能簡單上手。

4、 發送訊息這個操作需要支持事務,盡量不影響主業務

在上述的幾件事情中,最需要註意的應該就是第4點: 發送訊息這個操作需要支持事務,盡量不影響主業務 。這是什麽意思呢?首先,盡量不影響主業務,這個最簡單的方式就是使用 異步 機制。

如果你近期準備面試跳槽,建議在ddkk.com線上刷題,涵蓋 一萬+ 道 Java 面試題,幾乎覆蓋了所有主流技術面試題,還有市面上最全的技術五百套,精品系列教程,免費提供。

其次,需要支持事務是指: 假設我們的api是在事務方法內部呼叫的,那麽我們需要保證事務送出後再執行這個api 。那麽,我們的流水落地api應該要有這樣的功能:

內部可以判斷當前是否存在事務,如果存在事務,則需要等事務送出後再異步發送訊息給kafka。如果不存在事務則直接異步發送訊息給kafka。而且這樣的判斷邏輯得放在二方庫內部才行。那現在擺在我們面前的問題就是: 我要如何判斷當前是否存在事務,以及如何在事務送出後再觸發我們自訂的邏輯呢?

三、TransactionSynchronizationManager顯神威

這個類內部所有的變量、方法都是static修飾的,也就是說它其實是一個工具類。是一個 事務同步器 。下述是 流水落地API 的虛擬碼,這段程式碼就解決了我們上述提到的疑問:

privatefinal ExecutorService executor = Executors.newSingleThreadExecutor();
publicvoidsendLog(){
// 判斷當前是否存在事務
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
// 無事務,異步發送訊息給kafka
executor.submit(() -> {
// 發送訊息給kafka
try {
// 發送訊息給kafka
catch (Exception e) {
// 記錄異常資訊,發信件或者進入待處理列表,讓開發人員感知異常
}
});
return;
}
// 有事務,則添加一個事務同步器,並重寫afterCompletion方法(此方法在事務送出後會做回呼)
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
publicvoidafterCompletion(int status){
if (status == TransactionSynchronization.STATUS_COMMITTED) {
// 事務送出後,再異步發送訊息給kafka
executor.submit(() -> {
try {
// 發送訊息給kafka
catch (Exception e) {
// 記錄異常資訊,發信件或者進入待處理列表,讓開發人員感知異常
}
});
}
}
});
}



程式碼比較簡單,其主要是 TransactionSynchronizationManager 的使用。

3.1、判斷是否存在事務?TransactionSynchronizationManager.isSynchronizationActive() 方法顯神威

  • 我們先看下這個方法的源碼:

  • // TransactionSynchronizationManager.java類內部的部份程式碼
    privatestaticfinal ThreadLocal<Set<TransactionSynchronization>> synchronizations =
    new NamedThreadLocal<>("Transaction synchronizations");
    publicstaticbooleanisSynchronizationActive(){
    return (synchronizations.get() != null);
    }

    很明顯,synchronizations是一個執行緒變量(ThreadLocal)。那它是在什麽時候set進去的呢?這裏的話,可以參考下這個方法:org.springframework.transaction.support.TransactionSynchronizationManager\#initSynchronization,其源碼如下所示:

    /**
    * Activate transaction synchronization for the current thread.
    * Called by a transaction manager on transaction begin.
    @throws IllegalStateException if synchronization is already active
    */

    publicstaticvoidinitSynchronization()throws IllegalStateException {
    if (isSynchronizationActive()) {
    thrownew IllegalStateException("Cannot activate transaction synchronization - already active");
    }
    logger.trace("Initializing transaction synchronization");
    synchronizations.set(new LinkedHashSet<>());
    }

    由源碼中的註釋也可以知道,它是在事務管理器開啟事務時呼叫的。換句話說,只要我們的程式執行到帶有事務特性的方法時,就會線上程變量中放入一個LinkedHashSet,用來標識當前存在事務。只要isSynchronizationActive返回true,則代表當前有事務。因此,結合這兩個方法我們是指能解決我們最開始提出的疑問: 要如何判斷當前是否存在事務

    3.2、如何在事務送出後觸發自訂邏輯?TransactionSynchronizationManager.registerSynchronization()方法顯神威

  • 我們來看下這個方法的原始碼:

  • /**
    * Register a new transaction synchronization for the current thread.
    * Typically called by resource management code.
    * <p>Note that synchronizations can implement the
    * {@link org.springframework.core.Ordered} interface.
    * They will be executed in an order according to their order value (if any).
    @param synchronization the synchronization object to register
    @throws IllegalStateException if transaction synchronization is not active
    @see org.springframework.core.Ordered
    */

    publicstaticvoidregisterSynchronization(TransactionSynchronization synchronization)
    throws IllegalStateException 
    {
    Assert.notNull(synchronization, "TransactionSynchronization must not be null");
    if (!isSynchronizationActive()) {
    thrownew IllegalStateException("Transaction synchronization is not active");
    }
    synchronizations.get().add(synchronization);
    }

    這裏又使用到了synchronizations執行緒變量,我們在判斷是否存在事務時,就是判斷這個執行緒變量內部是否有值。那我們現在想在 事務送出後觸發自訂邏輯 和這個有什麽關系呢?

    如果你近期準備面試跳槽,建議在ddkk.com線上刷題,涵蓋 一萬+ 道 Java 面試題,幾乎覆蓋了所有主流技術面試題,還有市面上最全的技術五百套,精品系列教程,免費提供。

    我們在上面構建 流水落地api 的虛擬碼中有向synchronizations內部添加了一個TransactionSynchronizationAdapter,內部並重寫了afterCompletion方法,其程式碼如下所示:

    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
    @Override
    publicvoidafterCompletion(int status){
    if (status == TransactionSynchronization.STATUS_COMMITTED) {
    // 事務送出後,再異步發送訊息給kafka
    executor.submit(() -> {
    try {
    // 發送訊息給kafka
    catch (Exception e) {
    // 記錄異常資訊,發信件或者進入待處理列表,讓開發人員感知異常
    }
    });
    }
    }
    });

    我們結合registerSynchronization的源碼來看,其實這段程式碼主要就是向執行緒變量內部的LinkedHashSet添加了一個物件而已,但就是這麽一個操作,讓Spring在事務執行的過程中變得 「有事情可做」 。這是什麽意思呢?是因為Spring在執行事務方法時,對於操作事務的每一個階段都有一個回呼操作,比如:trigger系列的回呼

    invoke系列的回呼

    而我們現在的需求就是在事務送出後觸發自訂的函式,那就是在invokeAfterCommit和invokeAfterCompletion這兩個方法來選了。首先,這兩個方法都會拿到所有TransactionSynchronization的集合(其中會包括我們上述添加的TransactionSynchronizationAdapter)。但是要註意一點:invokeAfterCommit只能拿到集合,invokeAfterCompletion除了集合還有一個int型別的參數,而這個int型別的參數其實是當前事務的一種狀態。也就是說,如果我們重寫了invokeAfterCompletion方法,我們除了能拿到集合外,還能拿到當前事務的狀態。因此,此時我們可以根據這個狀態來做不同的事情,比如:可以在事務送出時做自訂處理,也可以在事務回滾時做自訂處理等等。

    來源:juejin.cn/post/6984574787511123999

    四、總結

    上面有說到,我們 判斷當前是否存在事務、添加勾點函式 都是依賴執行緒變量的。因此,我們在使用過程中,一定要 避免切換執行緒 。否則會出現 不生效 的情況。

    🔥 磊哥私藏精品 熱門推薦 🔥