當前位置: 妍妍網 > 碼農

不可思議!億級數據竟然如此輕松同步至ES!

2024-04-25碼農

架構師(JiaGouX)

我們都是架構師!
架構未來,你來不來?

  • 1 這是一個背景

  • 2 捋一捋訂單數據同步到ES中的復雜度

  • 2.1 數據同步ES索引流程

  • 2.2 來梳理下是否有難點?

  • 3 神奇的服務

  • 3.1 ECP的簡單執行流程

  • 3.2 多資料來源數據讀取

  • 3.3 SQL的解析與校驗

  • 3.4 動態限流的實作

  • 3.5 重試策略與故障感知

  • 3.6 將數據推播給哪個服務來處理?-SPI機制

  • 3.7 環境隔離

  • 3.8 探活與任務故障恢復機制

  • 3.9 平滑遷移的實作

  • 3.10 優雅的日誌記錄

  • 4 總結


  • 1 這是一個背景

    最近接了一個需求,要提供一個隨意組合多個條件來查詢訂單數據的功能,看著資料庫裏過億的訂單量,頭發不爭氣的又脫落了兩根代表這個需求不簡單

    脫落的兩根頭發,不是技術實作上很難,其實技術實作上清晰明了,就是透過數據異構,將數據同步到ES,利用ES的倒排索引、緩存等能力,提供多條件復雜查詢的能力,而ES集群我們已經有了

    但有些數據,在目前的ES索引中是不存在的,也就是說,我需要將過億的訂單數據從訂單資料庫重新刷一遍到ES中,而這一頓操作下來得需要一周的時間!

    什麽?你不信,那咱們來捋一捋

    2 捋一捋訂單數據同步到ES中的復雜度

    2.1 數據同步ES索引流程

    如上圖所示,就是將數據同步到ES索引的過程。

    首先需要從訂單資料庫查詢所有的訂單數據,然後根據訂單數據上保存的使用者ID,商品ID等資訊從使用者服務,商品服務查詢相關資訊,經過處理與組裝後落到ES集群中。

    之所以要查詢使用者資訊和商品資訊,是因為異構在ES索引中的訂單數據,並不會與mysql中的數據一一對應,有很多根據商品類目,使用者資訊等查詢訂單資訊的訴求存在,因此在這裏就需要查詢很多的上遊服務來組裝資訊

    2.2 來梳理下是否有難點?

    1. 從資料庫把上億的訂單數據出來。這個操作不能影響到線上業務,因此查詢的訂單資料庫一般是從庫,OK,配置多資料來源來讀取數據吧,而且上億的訂單一般采用的都是分庫分表來儲存的,我們是分了16個庫,每個庫16個表,總共256張表,嘿嘿

    2. 上億的訂單數據不能一次性全部讀取到記憶體吧,不然記憶體冒煙都存不下啊。所以得考慮分頁,分頁直接limit也不好,隨著數據量越大,速度越慢,所以得考慮一個遊標,嗯,選一個欄位當遊標吧,遊標最好唯一且遞增

    3. 從多個服務獲取數據,這些數據所在的服務一般都屬於公司的其它部門,讀取數據的時候也不能影響到人家的服務吧,你這裏查詢的是嘎嘎猛,一看人家的服務都崩了,這個黑鍋就飛來了。所以這裏得考慮限流吧,得考慮隔離吧?不說全鏈路隔離,成本太高,起碼關鍵服務得隔離一下

    4. 數據同步一段時間,產品來問,同步多久了啊,大概還有多久能完成啊,數據量大概是多少啊,一臉懵,不知道啊。

    5. 如果中途同步失敗了,咋處理啊,是不是得重試,咋重試,重試策略是啥?失敗有沒有報警,能不能及時感知並處理啊?如果同步一段時間中斷了咋整啊?有沒有記錄從哪中斷的?能否從中斷處繼續同步啊,不然從頭開始又得N天,哭了

    6. 同步了一部份,發現有問題需要暫停一會,咋整?

    7. 如果只想同步部份數據不一致的訂單數據,可能就2,3個訂單,咋整,是不是還得提供按照手動輸入訂單ID同步ES數據的能力?

    8. 同步過程是咋樣的?開始時間?結束時間?共耗時多久?操作人是誰?這些統計數據從哪來?

    9. 想夜深人靜的時候同步數據,這有時候對業務的影響小,定個鬧鐘晚上起?

    10. 現在不單需要同步訂單的數據了,還需要同步商品ES集群的數據,這些邏輯還得重新寫一遍?

    啊啊啊啊,想想都頭疼啊

    所以,一些事情看著簡單,其實並沒有那麽簡單

    3 神奇的服務

    為了讓頭發更有歸屬感,針對上述的難點開發了一款神奇的服務,那就是ECP。它可以將整個流程自動化、視覺化的處理,降低數據異構到ES的成本 任務界面如下所示:

    3.1 ECP的簡單執行流程

    簡單來說,ECP的作用就是將數據從資料來源讀取出來,然後推播給ES寫服務。因為數據處理的邏輯因不同的業務而異,ES寫服務由各個對接方來實作,因此一個簡單的流程如下圖:

    這裏面涉及到一些技術細節,比如如何進行多資料來源數據讀取,資料來源配置,sql校驗,動態限流、SPI機制、重試策略與故障感知、探活與故障恢復,環境隔離等等。

    下面一一介紹下

    3.2 多資料來源數據讀取

    ECP支持目前支持三個資料來源數據的讀取,分別為ID源,文本源、以及指令碼源

    3.2.1 ID源

    有個文字域用來輸入ID。這種場景適用於小數據的數據同步,比如發現一些資料庫和ES的數據不一致了,就簡單的刷一下數據

    3.2.2 檔源

    檔源指的是資料來源來源於文字檔案,適合中等數據的同步。ECP和物件儲存進行了對接,使用者可以上傳檔至物件儲存,在任務執行時,ECP會讀取物件儲存中的文本數據。

    這種情況需要註意的是,使用者上傳的檔有可能會比較大,直接都讀取到記憶體再處理不現實,因此這裏采用的是流的方式進行讀取,讀取一批次處理一批,再釋放一批,不會造成OOM

    簡化的處理方式如下:


    try (Response response = OK_HTTP_CLIENT.newCall(request).execute()) {
    if (!response.isSuccessful()) {
    throw new IOException("Unexpected code " + response);
    }
    // 以流的方式讀取檔數據
    InputStream inputStream = response.body().byteStream();
    BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
    }

    3.2.3 指令碼源

    指令碼源適用於大數據量的數據同步。

    指令碼本質上就是SQL和資料來源的結合。

    使用者在ECP中配置資料庫的連線資訊,然後配置SQL。ECP會執行該SQL,將數據從配置的資料庫中讀取出來,推播到ES寫服務中。

    指令碼源可以支持上億數據的讀取與推播,如下圖為訂單庫(分庫分表)配置的指令碼資訊:

    3.2.4 指令碼源大數據讀取的實作

    將幾億數據讀取到記憶體中來處理顯然不可能,因此采用局部數據的讀取與處理才是正道。

    在業務中,經常使用的是分頁,但分頁如果僅是使用limit offset,size,待offset的值比較大時,效能會急劇下降,形成慢SQL,甚至拖累整個資料庫的效能。

    因此在分頁數量比較大時,需要指定一個有索引的欄位作為遊標,該遊標可以提高分頁的效能,如在訂單表中,若在訂單ID是遞增的且有設定了索引,SQL就可以這麽寫: select * from t_order where order_id > xxx order by order_id desc limit 10 ; 利用order_id值的變化就可以起到分頁的效果

    這種方式雖好,但讓使用者選定遊標索引無疑增加了使用的門檻,因此ECP沒有采用上述分頁的形式來讀取大數據,而是采用JDBC遊標查詢的方式,如下所示:

    // 建立連線
    conn = DriverManager.getConnection(url, param.getDsUsername(), param.getDsPassword());
    // 建立查詢
    stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    stmt.setFetchSize(param.getFetchSize());

    遊標查詢每次讀取fetchSize大小的數據量,可以很好的避免讀取大數據量導致的OOM問題

    3.3 SQL的解析與校驗

    使用者配置SQL指令碼,ECP需要對該SQL指令碼進行校驗與修改,傳統的字串處理(比如正則)雖然在一定情況下可以滿足需求,但是容易出錯。因此ECP采用的是Druid的SQL解析工具包,可以將SQL解析成AST語法樹,以便對SQL進行各種處理。如下圖所示:

    ECP提供的數據樣例查詢,會對SQL自動拼接上limit 1

    3.4 動態限流的實作

    限流分集群限流和單機限流,經過評估,在能簡單就簡單的原則下,我們采用的是單機限流,限流元件使用的是guava的RateLimiter

    當在頁面上修改QPS的值時,會將該值同步到資料庫中,有個排程任務會不斷地掃描該值的變動,將變動的值同步到RateLimiter元件中

    當然,也可以采用數據監聽的策略(比如廣播MQ),讓變動值同步到RateLimiter更及時,但這種方式還需引入其它元件,復雜度嗷嗷上升,不符合我們簡單實作的策略

    動態限流的實作流程如下;

    如下圖是在不同的時間點修改了限流值後的QPS變化圖:

    3.5 重試策略與故障感知

    ES中和DB中的數據要盡可能的保證即時一致性,但最終一致性是必須要保證的,所以數據推播、處理失敗的時候要進行重試,如何重試?

    首先需要了解下失敗的型別,制定合適的重試策略,知彼知己,百戰不殆嘛

    一、網路抖動導致的介面呼叫超時。在呼叫微服務RPC介面的時候,由於網路抖動等情況,會導致介面呼叫超時,但很快就會恢復,通常情況下也就偶爾一次,下一次呼叫就會正常

    二、數據處理邏輯異常。這種情況下,異常沒辦法自恢復,只能人工介入

    三、上遊服務異常。如上遊服務壓力過大導致介面呼叫失敗,這時候就需要我們緩一緩再繼續處理,不能一個勁的呼叫導致上遊服務崩潰掉

    結合上面的失敗型別的特點,費氏數列的重試策略就非常適合 費氏數列的特點是:1,1,2,3,5,8,13,21,34,55,89…

    當第一次失敗的時候,延時1秒後就重試,如果此時是網路抖動導致的超時,重試就成功了,不影響數據處理的速度 若失敗的次數越多,重試的間隔時間就會越長,這也會兼顧到上述二、三的失敗型別

    重試元件使用的是Guava Retry,簡單的虛擬碼如下:


    // 重試元件配置
    private final Retryer<Boolean> RETRYER = RetryerBuilder.<Boolean>newBuilder()
    // 對中斷類的異常不重試
    .retryIfException(input -> !isPauseException(input))
    // 1,1,2,3,5,8,13,21,33...
    .withWaitStrategy(WaitStrategies.fibonacciWait(1000, 30, TimeUnit.SECONDS))
    // 重試次數達到一定的次數後,不再重試
    .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRY_TIMES))
    .withRetryListener(new RetryListener() {
    @Override
    public <V> void onRetry(Attempt<V> attempt) {
    if (attempt.hasException()) {
    log.error("act=【DataFlushRpcCallRetry】desc=【重試】重試次數=【{}】重試異常=【{}】", attempt.getAttemptNumber(), attempt.getExceptionCause());
    // 重試超過閾值進行報警提醒
    alarmIfExceedThreshold(attempt);
    }
    }
    })
    .build();
    // 將執行邏輯抽象為Runnable,對外暴露該方法
    public void execute(Runnable runnable) {
    innerExecute(runnable,RETRYER);
    }

    private void innerExecute(Runnable runnable, Retryer<Boolean> retryer) {
    try {
    retryer.call(() -> {
    runnable.run();
    returntrue;
    });
    } catch (Exception e) {
    log.error("act=【DataFlushRpcCallRetry】desc=【重試異常】error=【{}】", e);
    throw new IllegalStateException(e);
    }
    }

    若重試到一定次數之後依然是失敗的話,則會將錯誤資訊發送到報警群。根據推播的資訊,可以明確知道錯誤的型別,重試的次數,以及任務的建立人等等資訊,無需檢視日誌,即可定位大部份的問題。如下圖:

    3.6 將數據推播給哪個服務來處理?-SPI機制

    ECP是個通用的服務,因此需要將共性功能收攏在一起做成成品,將非共性的功能抽象一下,交給各個對接方去實作。

    從簡單實作的角度來看,若有某個服務想要對接ECP,我們在ECP上開發一下,呼叫該服務的介面,將數據推播給該服務,思路雖清晰明了,但對接及維護成本極高,且沒有一個統一的規範,因此不可取,其流程如下圖:

    Java上有個很好的思想可以解決這個問題,那就是SPI。因此由ECP提供一個介面,制定一個規範,具體的ES索引數據的組裝邏輯由各個對接方去實作

    這樣,若有一個新的對接方接入,只要實作介面即可,ECP無需做任何改動

    至於服務發現,ECP采用的配置的方式,也就是在新建任務的時候,選擇數據推播的消費方服務,如下圖:

    對於實作方式,得益於公司內部自研的RPC框架,提供了動態指定呼叫服務的方式,虛擬碼如下:

    Reference<IEsIndexFlushAPI> reference = new Reference<>();
    // 設定呼叫的服務名
    reference.setServiceName(serviceName);
    // 設定介面名
    reference.setInterface class(IEsIndexFlushAPI. class);
    // 設定上下文
    reference.setApplicationConfig(applicationConfig);
    // 獲取介面例項
    IEsIndexFlushAPI iEsIndexFlushAPI = ES_INDEX_FLUSH_API_MAP.computeIfAbsent(serviceName, s -> reference.refer());
    // 介面呼叫
    log.info("act=【EsIndexFlushApiInvoker】desc=【請求值】serviceName=【{}】dataListSize=【{}】indexNameList=【{}】tag=【{}】", serviceName,request.getDataList().size(),request.getIndexNameList(),request.getTag() );
    EMApiResult<FlushResponse> result = iEsIndexFlushAPI.flush(request);

    3.7 環境隔離

    同步數據是個比較重的操作,這個操作不應該影響到線上業務 因此,同步數據的服務應當與線上服務隔離開 ECP整合了架構組提供的標簽路由功能,可以在整個請求鏈路中呼叫指定標簽的服務,實作環境隔離

    ECP標簽路由配置圖:

    如下圖,若在ECP上配置任務的標簽路由為FLUSH,則在同步任務執行過程中,會自動呼叫鏈路中繫結了FLUSH標簽的服務分組。

    若某些服務沒有配置為FLUSH標簽的分組,這時就會自動請求該服務的線上正常環境。這樣,就可以做到一定程度上的環境隔離

    3.8 探活與任務故障恢復機制

    在推播數據的過程中,若發生了不可描述的事情導致任務中斷,咋整?

    到了需求DeadLine,發現任務在某年某月某日進度為1%的時候停了,哭了。

    而且工作時間緊,任務重,總不能一定盯著任務,看有沒有中斷吧?這不適合,也不禮貌。

    當然,這種情況在ECP是不會發生的,因為ECP是有「自救包」的。下面聊下ECP的任務探活和中斷恢復機制

    如下圖,在ECP中有探活和任務故障恢復兩大元件 探活元件負責監控當前任務執行緒的執行狀態,若任務執行緒正在執行,則對該任務的存活時間進行續期 任務故障恢復元件負責掃描當前未完成的任務,若任務上次存活時間大於指定的閾值時,則拉取該任務恢復執行

    續期的虛擬碼如下:

    @Scheduled(fixedDelay = ScheduleTimeConstants.KEEP_ALIVE_MILLS)
    public void renewal(){
    futureMap.forEach((taskId,future)->{
    if (!future.isDone()){
    log.info("act=【renewal】desc=【任務續期】taskId=【{}】續期時間=【{}】",taskId, DateUtils.dateToString(new Date(),DateUtils.PATTERN));
    contextService.renewal(taskId);
    }else {
    log.info("act=【renewal】desc=【任務結束】taskId=【{}】",taskId);
    futureMap.remove(taskId);
    }
    });
    }

    任務故障恢復的虛擬碼如下:

    @Scheduled(fixedDelay = ScheduleTimeConstants.RESTART_TASK_MILLS)
    public void restartTask(){
    // 1.查詢當前未完成的任務
    List<TaskFlushExecuteContextPO> contextPOS = contextService.queryRunningTask();
    for (TaskFlushExecuteContextPO contextPO : contextPOS) {
    // 2.計算上次存活到當前的時間
    Integer durationMin = calculateTimeSinceLastAlive();
    // 3.若時間大於指定閾值 則對任務重新拉起
    if (durationMin >= MAX_DURATION_MIN){
    log.info("act=【restartTask】desc=【任務重新拉起】taskId=【{}】",contextPO.getTaskId());
    // 4.更新alive_time進行釘選 防止並行執行
    int i = contextExtMapper.casUpdateAliveTime();
    if (i >0){
    // 5.重新拉起任務
    restart0(contextPO, aliveTime);
    }
    }
    }
    }

    3.9 平滑遷移的實作

    將數據同步到ES,通常有兩種方式:

    1. 直接把數據同步到原索引上

    2. 新建一個索引,利用雙寫以及切換別名的方式實作流量的平滑遷移。

    對於新建一個索引的場景,往往是索引Mapping的改變,或者是為了不影響原索引,保證操作可回滾

    針對這種場景,ECP分析了歷來大家手動操作刷ES索引的步驟,將流程進行抽象,歸納了以下幾個步驟,如下圖:

    ECP提供了平滑遷移元件,其內部整合了Apollo配置中心實作推播能力,其簡要的實作流程如下圖:

    3.10 優雅的日誌記錄

    如下圖所示展示了該任務操作的日誌,原則上日誌記錄為非核心業務,需要與核心業務程式碼進行剝離,因此使用註解式流水記錄是個很好的選擇

    但註解式流水記錄有個問題,就是在很多的場景下,流水裏面的值需要動態獲取,利用註解可以實作嗎? 答案是可以的,在上圖所示中,任務ID、數據來源都是動態數據,那如何實作的呢?看下面程式碼:

    @Flow(subjectIdEp = "#taskPO.id",subjectType = SubjectTypeEnum.TASK,operateFlowType = OperateFlowTypeEnum.CREATE_TASK,content = "'建立任務,任務ID:' + #taskPO.id ")
    public void saveTaskWithUser(TaskPO taskPO) {
    String name = LoginUserContext.get().getName();
    taskPO.setCreator(name);
    taskPO.setModifier(name);
    taskMapper.insertSelective(taskPO);
    }

    subjectIdEp為流水主題ID,#taskPo.id為一個運算式,可用動態獲取參數taskPo中的id值,這裏利用了springEl運算式的能力

    content = "'建立任務,任務ID:' + #taskPO.id " 為流水資訊,同樣利用了springEL運算式,動態獲取請求參數taskPo中的id資訊

    但有些資訊需要一系列的計算才可以獲取到,而不是單純的從物件中取值,這也是可以實作的。如下:

    @Flow(subjectIdEp = "#contextPO.taskId",
    subjectType = SubjectTypeEnum.TASK,
    operateFlowType = OperateFlowTypeEnum.DATA_FLUSH,
    content = "'【數據同步】異常中斷任務恢復執行,中斷時間:' + T(com.zhuanzhuan.esmanage.utils.DateUtils).dateToStringSimple(#aliveTime)")
    @Transactional(rollbackFor = Exception. class,isolation = Isolation.REPEATABLE_READ)
    public void restart0(TaskFlushExecuteContextPO contextPO, Date aliveTime) {
    log.info("act=【restartTask】desc=【任務重新拉起】taskId=【{}】原aliveTime=【{}】", contextPO.getTaskId(), aliveTime);
    dsProcessorExecutor.executeAndKeepAliveMonitor(contextPO.getTaskId());
    }

    其中 T(com.zhuanzhuan.esmanage.utils.DateUtils).dateToStringSimple(#aliveTime) 代表執行的是 DateUtils.dateToStringSimple 方法,也就是說運算式是可以呼叫方法的,包括從spring容器中獲取物件,呼叫物件的方法均可。

    這種註解式流水的實作原理,就是利用SPEL運算式和Spring Aop的特性,寫一個切面,攔截自訂的flow註解即可,虛擬碼如下:


    // 定義切面,攔截FLOW註解
    @Around("@annotation(com.zhuanzhuan.esmanage.entity.annotation.Flow)")
    public Object around(ProceedingJoinPoint point) throws Throwable {
    // 呼叫目標方法
    Object result = null;
    try {
    result = point.proceed();
    recordFlow(point,result);
    return result;
    } catch (Throwable e) {
    recordException(point,e);
    throw e;
    }
    }

    // 流水記錄的實作
    private void recordFlow(ProceedingJoinPoint point, Object result) {
    // try catch 防止影響主邏輯
    //TODO 看是否需要寫在一個事務中,主要評估流水的重要性
    try {
    MethodSignature signature = (MethodSignature) point.getSignature();
    Flow flowAnnotation = getFlowAnnotation(signature);
    // 組裝參數上下文
    EvaluationContext evaluationContext = buildContext(point, signature);
    evaluationContext.setVariable("result",result);
    // ID運算式
    String subjectIdEp = flowAnnotation.subjectIdEp();
    // content運算式
    String content = getContent(flowAnnotation, evaluationContext);
    // SPEL解析運算式
    Expression expression = PARSER.parseExpression(subjectIdEp);
    Integer subjectId = (Integer)expression.getValue(evaluationContext);
    record(flowAnnotation, subjectId, content);
    } catch (Exception e) {
    log.error("記錄操作流水失敗", e);
    }
    }






    4 總結

    總得來說,ECP的實作中有很多的技術細節需要考慮,技術難度一般。

    實際上,在我們大部份的計畫中,考驗的就是對細節的把控~

    ps:感謝ChatGPT對本文名稱的大力支持

    如喜歡本文,請點選右上角,把文章分享到朋友圈
    如有想了解學習的技術點,請留言給若飛安排分享

    因公眾號更改推播規則,請點「在看」並加「星標」 第一時間獲取精彩技術分享

    ·END·

    相關閱讀:

    作者:閆展,轉轉交易中台研發工程師

    來源:轉轉技術

    版權申明:內容來源網路,僅供學習研究,版權歸原創者所有。如有侵權煩請告知,我們會立即刪除並表示歉意。謝謝!

    架構師

    我們都是架構師!

    關註 架構師(JiaGouX),添加「星標」

    獲取每天技術幹貨,一起成為牛逼架構師

    技術群請 加若飛: 1321113940 進架構師群

    投稿、合作、版權等信箱: [email protected]