1 這是一個背景
2 捋一捋訂單數據同步到ES中的復雜度
2.1 數據同步ES索引流程
2.2 來梳理下是否有難點?
3 神奇的服務
3.1 ECP的簡單執行流程
3.2 多資料來源數據讀取
3.3 SQL的解析與校驗
3.4 動態限流的實作
3.5 重試策略與故障感知
3.6 將數據推播給哪個服務來處理?-SPI機制
3.7 環境隔離
3.8 探活與任務故障恢復機制
3.9 平滑遷移的實作
3.10 優雅的日誌記錄
4 總結
1 這是一個背景
最近接了一個需求,要提供一個隨意組合多個條件來查詢訂單數據的功能,看著資料庫裏過億的訂單量,頭發不爭氣的又脫落了兩根代表這個需求不簡單
脫落的兩根頭發,不是技術實作上很難,其實技術實作上清晰明了,就是透過數據異構,將數據同步到ES,利用ES的倒排索引、緩存等能力,提供多條件復雜查詢的能力,而ES集群我們已經有了
但有些數據,在目前的ES索引中是不存在的,也就是說,我需要將過億的訂單數據從訂單資料庫重新刷一遍到ES中,而這一頓操作下來得需要一周的時間!
什麽?你不信,那咱們來捋一捋
2 捋一捋訂單數據同步到ES中的復雜度
2.1 數據同步ES索引流程
如上圖所示,就是將數據同步到ES索引的過程。
首先需要從訂單資料庫查詢所有的訂單數據,然後根據訂單數據上保存的使用者ID,商品ID等資訊從使用者服務,商品服務查詢相關資訊,經過處理與組裝後落到ES集群中。
之所以要查詢使用者資訊和商品資訊,是因為異構在ES索引中的訂單數據,並不會與mysql中的數據一一對應,有很多根據商品類目,使用者資訊等查詢訂單資訊的訴求存在,因此在這裏就需要查詢很多的上遊服務來組裝資訊
2.2 來梳理下是否有難點?
從資料庫把上億的訂單數據出來。這個操作不能影響到線上業務,因此查詢的訂單資料庫一般是從庫,OK,配置多資料來源來讀取數據吧,而且上億的訂單一般采用的都是分庫分表來儲存的,我們是分了16個庫,每個庫16個表,總共256張表,嘿嘿
上億的訂單數據不能一次性全部讀取到記憶體吧,不然記憶體冒煙都存不下啊。所以得考慮分頁,分頁直接limit也不好,隨著數據量越大,速度越慢,所以得考慮一個遊標,嗯,選一個欄位當遊標吧,遊標最好唯一且遞增
從多個服務獲取數據,這些數據所在的服務一般都屬於公司的其它部門,讀取數據的時候也不能影響到人家的服務吧,你這裏查詢的是嘎嘎猛,一看人家的服務都崩了,這個黑鍋就飛來了。所以這裏得考慮限流吧,得考慮隔離吧?不說全鏈路隔離,成本太高,起碼關鍵服務得隔離一下
數據同步一段時間,產品來問,同步多久了啊,大概還有多久能完成啊,數據量大概是多少啊,一臉懵,不知道啊。
如果中途同步失敗了,咋處理啊,是不是得重試,咋重試,重試策略是啥?失敗有沒有報警,能不能及時感知並處理啊?如果同步一段時間中斷了咋整啊?有沒有記錄從哪中斷的?能否從中斷處繼續同步啊,不然從頭開始又得N天,哭了
同步了一部份,發現有問題需要暫停一會,咋整?
如果只想同步部份數據不一致的訂單數據,可能就2,3個訂單,咋整,是不是還得提供按照手動輸入訂單ID同步ES數據的能力?
同步過程是咋樣的?開始時間?結束時間?共耗時多久?操作人是誰?這些統計數據從哪來?
想夜深人靜的時候同步數據,這有時候對業務的影響小,定個鬧鐘晚上起?
現在不單需要同步訂單的數據了,還需要同步商品ES集群的數據,這些邏輯還得重新寫一遍?
啊啊啊啊,想想都頭疼啊
所以,一些事情看著簡單,其實並沒有那麽簡單
3 神奇的服務
為了讓頭發更有歸屬感,針對上述的難點開發了一款神奇的服務,那就是ECP。它可以將整個流程自動化、視覺化的處理,降低數據異構到ES的成本 任務界面如下所示:
3.1 ECP的簡單執行流程
簡單來說,ECP的作用就是將數據從資料來源讀取出來,然後推播給ES寫服務。因為數據處理的邏輯因不同的業務而異,ES寫服務由各個對接方來實作,因此一個簡單的流程如下圖:
這裏面涉及到一些技術細節,比如如何進行多資料來源數據讀取,資料來源配置,sql校驗,動態限流、SPI機制、重試策略與故障感知、探活與故障恢復,環境隔離等等。
下面一一介紹下
3.2 多資料來源數據讀取
ECP支持目前支持三個資料來源數據的讀取,分別為ID源,文本源、以及指令碼源
3.2.1 ID源
有個文字域用來輸入ID。這種場景適用於小數據的數據同步,比如發現一些資料庫和ES的數據不一致了,就簡單的刷一下數據
3.2.2 檔源
檔源指的是資料來源來源於文字檔案,適合中等數據的同步。ECP和物件儲存進行了對接,使用者可以上傳檔至物件儲存,在任務執行時,ECP會讀取物件儲存中的文本數據。
這種情況需要註意的是,使用者上傳的檔有可能會比較大,直接都讀取到記憶體再處理不現實,因此這裏采用的是流的方式進行讀取,讀取一批次處理一批,再釋放一批,不會造成OOM
簡化的處理方式如下:
try (Response response = OK_HTTP_CLIENT.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new IOException("Unexpected code " + response);
}
// 以流的方式讀取檔數據
InputStream inputStream = response.body().byteStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
}
3.2.3 指令碼源
指令碼源適用於大數據量的數據同步。
指令碼本質上就是SQL和資料來源的結合。
使用者在ECP中配置資料庫的連線資訊,然後配置SQL。ECP會執行該SQL,將數據從配置的資料庫中讀取出來,推播到ES寫服務中。
指令碼源可以支持上億數據的讀取與推播,如下圖為訂單庫(分庫分表)配置的指令碼資訊:
3.2.4 指令碼源大數據讀取的實作
將幾億數據讀取到記憶體中來處理顯然不可能,因此采用局部數據的讀取與處理才是正道。
在業務中,經常使用的是分頁,但分頁如果僅是使用limit offset,size,待offset的值比較大時,效能會急劇下降,形成慢SQL,甚至拖累整個資料庫的效能。
因此在分頁數量比較大時,需要指定一個有索引的欄位作為遊標,該遊標可以提高分頁的效能,如在訂單表中,若在訂單ID是遞增的且有設定了索引,SQL就可以這麽寫:
select * from t_order where order_id > xxx order by order_id desc limit 10
; 利用order_id值的變化就可以起到分頁的效果
這種方式雖好,但讓使用者選定遊標索引無疑增加了使用的門檻,因此ECP沒有采用上述分頁的形式來讀取大數據,而是采用JDBC遊標查詢的方式,如下所示:
// 建立連線
conn = DriverManager.getConnection(url, param.getDsUsername(), param.getDsPassword());
// 建立查詢
stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
stmt.setFetchSize(param.getFetchSize());
遊標查詢每次讀取fetchSize大小的數據量,可以很好的避免讀取大數據量導致的OOM問題
3.3 SQL的解析與校驗
使用者配置SQL指令碼,ECP需要對該SQL指令碼進行校驗與修改,傳統的字串處理(比如正則)雖然在一定情況下可以滿足需求,但是容易出錯。因此ECP采用的是Druid的SQL解析工具包,可以將SQL解析成AST語法樹,以便對SQL進行各種處理。如下圖所示:
ECP提供的數據樣例查詢,會對SQL自動拼接上limit 1
3.4 動態限流的實作
限流分集群限流和單機限流,經過評估,在能簡單就簡單的原則下,我們采用的是單機限流,限流元件使用的是guava的RateLimiter
當在頁面上修改QPS的值時,會將該值同步到資料庫中,有個排程任務會不斷地掃描該值的變動,將變動的值同步到RateLimiter元件中
當然,也可以采用數據監聽的策略(比如廣播MQ),讓變動值同步到RateLimiter更及時,但這種方式還需引入其它元件,復雜度嗷嗷上升,不符合我們簡單實作的策略
動態限流的實作流程如下;
如下圖是在不同的時間點修改了限流值後的QPS變化圖:
3.5 重試策略與故障感知
ES中和DB中的數據要盡可能的保證即時一致性,但最終一致性是必須要保證的,所以數據推播、處理失敗的時候要進行重試,如何重試?
首先需要了解下失敗的型別,制定合適的重試策略,知彼知己,百戰不殆嘛
一、網路抖動導致的介面呼叫超時。在呼叫微服務RPC介面的時候,由於網路抖動等情況,會導致介面呼叫超時,但很快就會恢復,通常情況下也就偶爾一次,下一次呼叫就會正常
二、數據處理邏輯異常。這種情況下,異常沒辦法自恢復,只能人工介入
三、上遊服務異常。如上遊服務壓力過大導致介面呼叫失敗,這時候就需要我們緩一緩再繼續處理,不能一個勁的呼叫導致上遊服務崩潰掉
結合上面的失敗型別的特點,費氏數列的重試策略就非常適合 費氏數列的特點是:1,1,2,3,5,8,13,21,34,55,89…
當第一次失敗的時候,延時1秒後就重試,如果此時是網路抖動導致的超時,重試就成功了,不影響數據處理的速度 若失敗的次數越多,重試的間隔時間就會越長,這也會兼顧到上述二、三的失敗型別
重試元件使用的是Guava Retry,簡單的虛擬碼如下:
// 重試元件配置
private final Retryer<Boolean> RETRYER = RetryerBuilder.<Boolean>newBuilder()
// 對中斷類的異常不重試
.retryIfException(input -> !isPauseException(input))
// 1,1,2,3,5,8,13,21,33...
.withWaitStrategy(WaitStrategies.fibonacciWait(1000, 30, TimeUnit.SECONDS))
// 重試次數達到一定的次數後,不再重試
.withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRY_TIMES))
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
if (attempt.hasException()) {
log.error("act=【DataFlushRpcCallRetry】desc=【重試】重試次數=【{}】重試異常=【{}】", attempt.getAttemptNumber(), attempt.getExceptionCause());
// 重試超過閾值進行報警提醒
alarmIfExceedThreshold(attempt);
}
}
})
.build();
// 將執行邏輯抽象為Runnable,對外暴露該方法
public void execute(Runnable runnable) {
innerExecute(runnable,RETRYER);
}
private void innerExecute(Runnable runnable, Retryer<Boolean> retryer) {
try {
retryer.call(() -> {
runnable.run();
returntrue;
});
} catch (Exception e) {
log.error("act=【DataFlushRpcCallRetry】desc=【重試異常】error=【{}】", e);
throw new IllegalStateException(e);
}
}
若重試到一定次數之後依然是失敗的話,則會將錯誤資訊發送到報警群。根據推播的資訊,可以明確知道錯誤的型別,重試的次數,以及任務的建立人等等資訊,無需檢視日誌,即可定位大部份的問題。如下圖:
3.6 將數據推播給哪個服務來處理?-SPI機制
ECP是個通用的服務,因此需要將共性功能收攏在一起做成成品,將非共性的功能抽象一下,交給各個對接方去實作。
從簡單實作的角度來看,若有某個服務想要對接ECP,我們在ECP上開發一下,呼叫該服務的介面,將數據推播給該服務,思路雖清晰明了,但對接及維護成本極高,且沒有一個統一的規範,因此不可取,其流程如下圖:
Java上有個很好的思想可以解決這個問題,那就是SPI。因此由ECP提供一個介面,制定一個規範,具體的ES索引數據的組裝邏輯由各個對接方去實作
這樣,若有一個新的對接方接入,只要實作介面即可,ECP無需做任何改動
至於服務發現,ECP采用的配置的方式,也就是在新建任務的時候,選擇數據推播的消費方服務,如下圖:
對於實作方式,得益於公司內部自研的RPC框架,提供了動態指定呼叫服務的方式,虛擬碼如下:
Reference<IEsIndexFlushAPI> reference = new Reference<>();
// 設定呼叫的服務名
reference.setServiceName(serviceName);
// 設定介面名
reference.setInterface class(IEsIndexFlushAPI. class);
// 設定上下文
reference.setApplicationConfig(applicationConfig);
// 獲取介面例項
IEsIndexFlushAPI iEsIndexFlushAPI = ES_INDEX_FLUSH_API_MAP.computeIfAbsent(serviceName, s -> reference.refer());
// 介面呼叫
log.info("act=【EsIndexFlushApiInvoker】desc=【請求值】serviceName=【{}】dataListSize=【{}】indexNameList=【{}】tag=【{}】", serviceName,request.getDataList().size(),request.getIndexNameList(),request.getTag() );
EMApiResult<FlushResponse> result = iEsIndexFlushAPI.flush(request);
3.7 環境隔離
同步數據是個比較重的操作,這個操作不應該影響到線上業務 因此,同步數據的服務應當與線上服務隔離開 ECP整合了架構組提供的標簽路由功能,可以在整個請求鏈路中呼叫指定標簽的服務,實作環境隔離
ECP標簽路由配置圖:
如下圖,若在ECP上配置任務的標簽路由為FLUSH,則在同步任務執行過程中,會自動呼叫鏈路中繫結了FLUSH標簽的服務分組。
若某些服務沒有配置為FLUSH標簽的分組,這時就會自動請求該服務的線上正常環境。這樣,就可以做到一定程度上的環境隔離
3.8 探活與任務故障恢復機制
在推播數據的過程中,若發生了不可描述的事情導致任務中斷,咋整?
到了需求DeadLine,發現任務在某年某月某日進度為1%的時候停了,哭了。
而且工作時間緊,任務重,總不能一定盯著任務,看有沒有中斷吧?這不適合,也不禮貌。
當然,這種情況在ECP是不會發生的,因為ECP是有「自救包」的。下面聊下ECP的任務探活和中斷恢復機制
如下圖,在ECP中有探活和任務故障恢復兩大元件 探活元件負責監控當前任務執行緒的執行狀態,若任務執行緒正在執行,則對該任務的存活時間進行續期 任務故障恢復元件負責掃描當前未完成的任務,若任務上次存活時間大於指定的閾值時,則拉取該任務恢復執行
續期的虛擬碼如下:
@Scheduled(fixedDelay = ScheduleTimeConstants.KEEP_ALIVE_MILLS)
public void renewal(){
futureMap.forEach((taskId,future)->{
if (!future.isDone()){
log.info("act=【renewal】desc=【任務續期】taskId=【{}】續期時間=【{}】",taskId, DateUtils.dateToString(new Date(),DateUtils.PATTERN));
contextService.renewal(taskId);
}else {
log.info("act=【renewal】desc=【任務結束】taskId=【{}】",taskId);
futureMap.remove(taskId);
}
});
}
任務故障恢復的虛擬碼如下:
@Scheduled(fixedDelay = ScheduleTimeConstants.RESTART_TASK_MILLS)
public void restartTask(){
// 1.查詢當前未完成的任務
List<TaskFlushExecuteContextPO> contextPOS = contextService.queryRunningTask();
for (TaskFlushExecuteContextPO contextPO : contextPOS) {
// 2.計算上次存活到當前的時間
Integer durationMin = calculateTimeSinceLastAlive();
// 3.若時間大於指定閾值 則對任務重新拉起
if (durationMin >= MAX_DURATION_MIN){
log.info("act=【restartTask】desc=【任務重新拉起】taskId=【{}】",contextPO.getTaskId());
// 4.更新alive_time進行釘選 防止並行執行
int i = contextExtMapper.casUpdateAliveTime();
if (i >0){
// 5.重新拉起任務
restart0(contextPO, aliveTime);
}
}
}
}
3.9 平滑遷移的實作
將數據同步到ES,通常有兩種方式:
直接把數據同步到原索引上
新建一個索引,利用雙寫以及切換別名的方式實作流量的平滑遷移。
對於新建一個索引的場景,往往是索引Mapping的改變,或者是為了不影響原索引,保證操作可回滾
針對這種場景,ECP分析了歷來大家手動操作刷ES索引的步驟,將流程進行抽象,歸納了以下幾個步驟,如下圖:
ECP提供了平滑遷移元件,其內部整合了Apollo配置中心實作推播能力,其簡要的實作流程如下圖:
3.10 優雅的日誌記錄
如下圖所示展示了該任務操作的日誌,原則上日誌記錄為非核心業務,需要與核心業務程式碼進行剝離,因此使用註解式流水記錄是個很好的選擇
但註解式流水記錄有個問題,就是在很多的場景下,流水裏面的值需要動態獲取,利用註解可以實作嗎? 答案是可以的,在上圖所示中,任務ID、數據來源都是動態數據,那如何實作的呢?看下面程式碼:
@Flow(subjectIdEp = "#taskPO.id",subjectType = SubjectTypeEnum.TASK,operateFlowType = OperateFlowTypeEnum.CREATE_TASK,content = "'建立任務,任務ID:' + #taskPO.id ")
public void saveTaskWithUser(TaskPO taskPO) {
String name = LoginUserContext.get().getName();
taskPO.setCreator(name);
taskPO.setModifier(name);
taskMapper.insertSelective(taskPO);
}
subjectIdEp為流水主題ID,#taskPo.id為一個運算式,可用動態獲取參數taskPo中的id值,這裏利用了springEl運算式的能力
content = "'建立任務,任務ID:' + #taskPO.id " 為流水資訊,同樣利用了springEL運算式,動態獲取請求參數taskPo中的id資訊
但有些資訊需要一系列的計算才可以獲取到,而不是單純的從物件中取值,這也是可以實作的。如下:
@Flow(subjectIdEp = "#contextPO.taskId",
subjectType = SubjectTypeEnum.TASK,
operateFlowType = OperateFlowTypeEnum.DATA_FLUSH,
content = "'【數據同步】異常中斷任務恢復執行,中斷時間:' + T(com.zhuanzhuan.esmanage.utils.DateUtils).dateToStringSimple(#aliveTime)")
@Transactional(rollbackFor = Exception. class,isolation = Isolation.REPEATABLE_READ)
public void restart0(TaskFlushExecuteContextPO contextPO, Date aliveTime) {
log.info("act=【restartTask】desc=【任務重新拉起】taskId=【{}】原aliveTime=【{}】", contextPO.getTaskId(), aliveTime);
dsProcessorExecutor.executeAndKeepAliveMonitor(contextPO.getTaskId());
}
其中
T(com.zhuanzhuan.esmanage.utils.DateUtils).dateToStringSimple(#aliveTime)
代表執行的是
DateUtils.dateToStringSimple
方法,也就是說運算式是可以呼叫方法的,包括從spring容器中獲取物件,呼叫物件的方法均可。
這種註解式流水的實作原理,就是利用SPEL運算式和Spring Aop的特性,寫一個切面,攔截自訂的flow註解即可,虛擬碼如下:
// 定義切面,攔截FLOW註解
@Around("@annotation(com.zhuanzhuan.esmanage.entity.annotation.Flow)")
public Object around(ProceedingJoinPoint point) throws Throwable {
// 呼叫目標方法
Object result = null;
try {
result = point.proceed();
recordFlow(point,result);
return result;
} catch (Throwable e) {
recordException(point,e);
throw e;
}
}
// 流水記錄的實作
private void recordFlow(ProceedingJoinPoint point, Object result) {
// try catch 防止影響主邏輯
//TODO 看是否需要寫在一個事務中,主要評估流水的重要性
try {
MethodSignature signature = (MethodSignature) point.getSignature();
Flow flowAnnotation = getFlowAnnotation(signature);
// 組裝參數上下文
EvaluationContext evaluationContext = buildContext(point, signature);
evaluationContext.setVariable("result",result);
// ID運算式
String subjectIdEp = flowAnnotation.subjectIdEp();
// content運算式
String content = getContent(flowAnnotation, evaluationContext);
// SPEL解析運算式
Expression expression = PARSER.parseExpression(subjectIdEp);
Integer subjectId = (Integer)expression.getValue(evaluationContext);
record(flowAnnotation, subjectId, content);
} catch (Exception e) {
log.error("記錄操作流水失敗", e);
}
}
4 總結
總得來說,ECP的實作中有很多的技術細節需要考慮,技術難度一般。
實際上,在我們大部份的計畫中,考驗的就是對細節的把控~
ps:感謝ChatGPT對本文名稱的大力支持
如喜歡本文,請點選右上角,把文章分享到朋友圈
如有想了解學習的技術點,請留言給若飛安排分享
因公眾號更改推播規則,請點「在看」並加「星標」 第一時間獲取精彩技術分享
·END·
相關閱讀:
作者:閆展,轉轉交易中台研發工程師
來源:轉轉技術
版權申明:內容來源網路,僅供學習研究,版權歸原創者所有。如有侵權煩請告知,我們會立即刪除並表示歉意。謝謝!
架構師
我們都是架構師!
關註 架構師(JiaGouX),添加「星標」
獲取每天技術幹貨,一起成為牛逼架構師
技術群請 加若飛: 1321113940 進架構師群
投稿、合作、版權等信箱: [email protected]