摘 要 : 本文整理自 Apache Yarn && Flink Contributor,阿裏巴巴智慧引擎事業部技術專家王偉駿(鴻歷)老師在 5月16日 Streaming Lakehouse Meetup · Online 上的分享。內容主要分為以下三個部份:
阿裏智慧引擎 AI 業務背景介紹
引入 Paimon 原因、場景及預期收益
遇到的問題及解法
Tips: 點選 「閱讀原文」 線上觀看 FFA 2023 會後資料~
01
阿裏智慧引擎 AI 業務背景介紹
1.1 業務場景及特點
首先介紹一下阿裏智慧引擎事業部中AI相關的業務背景。
我們平台支持多種大數據離線處理鏈路,比如說搜推廣鏈路、演算法工程鏈路、模型推理鏈路等,這些數據處理鏈路的業務場景基本可以涵蓋為上述圖示流程。
最左邊是各種搜推廣等引擎依賴的原始數據,主要來源於業務的事務數據、演算法數據以及使用者的點選事件日誌等等,它們分布在不同的儲存系統中。最右邊的引擎想利用這些數據就需要一個離線系統,將不同維度的數據聚合到一起,以提供給不同的引擎使用。可以設想一下要開發這樣一個離線系統,需要面臨哪些痛點及難點。
(1) 異構資料來源多:資料來源來源於眾多異構資料來源,所以我們需要對接各類儲存引擎,以此橫向擴充套件平台的功能。
(2) 業務邏輯復雜:業務邏輯非常復雜,所以流批很難一體,很難做到多個批流作業之間的無縫完美銜接。
(3) 效能調優難、運維門檻高:在效能和運維方面由於涉及大數據元件非常多,需要了解很多計算引擎及儲存系統的內部實作細節,所以運維排查問題困難,作業效能調優要考慮的因素也很多。
1.2 產品介紹及成果
為了降低業務離線開發和運維的門檻,減少業務接入的成本和提高業務叠代的效率,我們研發和建設了大數據離線處理平台,提供 AI 領域端到端的 ETL 數據處理解決方案。
(1) 一站式平台
本平台是從開發到運維的一站式平台,使用者可以透過拖拉拽 UI 方式開發,沒有大數據背景的人也能使用。平台遮蔽了背後的大數據技術,進一步降低使用者使用門檻。
(2) 端到端開發
在開發上,從資料來源到引擎,平台會把一個 ETL 流程轉換成多個流批作業,平台管理背後所有作業依賴和儲存的對接。
(3) 流批一體
在流批一體方面使用者只需一次開發就能實作流批一體,內部同一份儲存可同時用於流處理和批次處理。
目前平台套用規模、作業規模、日數據處理量都很大,在增量 Tps 達到百萬級的情況下能給使用者帶來秒級延時的體驗,已經連續支持雙 11 多年。
1.3 產品技術架構
(1) 依賴元件
① 計算
從下往上看,首先在平台的 Runtime 層的計算維度,依賴支持 K8S 協定的統一資源池來提供計算資源。 目前主要是透過企業版 Flink 即 VVR 以流批一體的方式,將各種資料來源的數據經過大量復雜計算,最終寫入不同的儲存介質中供下遊使用。 另外一種業界大家熟知的計算引擎 Spark目前正在接入中,平台很快就會呈現出支持多引擎的狀況。 由於Spark 的接入,所以我們正在借鑒 Seatunnel 來重構 Connector 模組,統一計算引擎 API 介面,以此在 Connector 層面實作支持多引擎的目的。 同樣,由於多引擎的接入,我們重新設計 UDxF 元件,使用者只需寫一套 UDxF 的程式碼提供給平台,我們自動將其 Translator 成不同計算引擎的 UDxF。 透過 VVP 的 JAVA SDK 來統一送出作業,按需在頁面上對作業進行更進一步的運維及開發。
② 儲存
依賴阿裏內部自研的 Pangu 和 Swift,作為底層分布式檔案系統和訊息佇列,用 Hologres 來滿足業務對於高效能的數據掃描和點查等需求。數據湖格式選用的是Paimon,湖表儲存最佳化服務是我們正在調研的,主要是對大量 Paimon 表做 dedicated-compaction 以及對多種儲存引擎底層 SST 檔的儲存最佳化,進而提升大表讀寫吞吐效能。上面是統一的 Catalog 後設資料服務,它整合了各種表的資源建立,資源回收、Meta、版本、訂閱及血緣管理的功能。
③ 排程
平台的依賴 Airflow實作多作業的排程編排,流批作業的銜接等,依賴 Hippo申請集群資源。
(2) 核心功能
透過這些依賴的底層元件我們平台向上提供了很多大數據處理相關功能,比如數據整合,支持使用者自訂外掛程式的流批一體計算,樣本處理,OLAP 等核心能力。
(3) 產品端
透過核心能力,在產品端就能結合各種業務場景來提供多種端到端的大數據處理解決方案。比如在收推場景下的收推平台、樣本場景下的樣本處理平台等。
(4) 支持業務
有了解決方案,平台目前支持的圖中最上方所示阿裏內部幾乎所有業務線的各種數據處理的需求。
綜上所述,平台的核心能力來源於底層依賴的各種計算、儲存、排程的元件,它們是給平台上層業務賦能的動力源泉。為了更好的支持各種業務源源不斷的復雜新需求,我們必須持續更新叠代底層的計算儲存元件,所以我們今年開始引入了 Paimon。
02
引入 Paimon 原因、場景及預期收益
以上是一些關於平台背景的相關內容,下面重點說下平台引入 Paimon 的原因、場景及預期收益。
2.1 引入 Paimon 原因
引入 Paimon的原因主要是四個方面。
(1) 公司戰略
公司要建立集團數據湖生態,湖倉協同,促進集團數據資產集中儲存,高效使用。
(2) 成本
儲存成本居高不下,很多實效性要求不高的場景,其實沒必要用成本較高的分布式儲存服務來支持。
(3) 解決 Lambda 架構缺點
Lambda架構開發維護復雜存在資源浪費情況,我們這邊也有類似的現象,所以考慮引入Paimon。
(4) 最佳化
我們呼叫發現數據湖在某些場景下可以解決業務效能瓶頸。
基於以上幾個原因,我們深度對比了業界幾大數據湖產品(Paimon、Iceberg、Hudi)後,結合業務需求及社群發展情況等因素綜合考慮,最終選擇了 Apache Paimon 作為我們數據湖的湖格式。
2.2 探索場景及預期收益一、樣本生成鏈路
以下是樣本生成鏈路的大致處理邏輯,也是要介紹的第一個場景——樣本生成鏈路。
這條鏈路的特點主要有:第一,時效性要求不高,5 分鐘左右;第二、數據量大,所以目前依賴的儲存成本很高;第三、計算邏輯相當復雜。
簡述:在樣本生成過程中,會分別消費使用者點選日誌和一些Odps表數據,進行寬表加工,及大量 JOIN 操作和復雜的 ETL 等計算邏輯,生成樣本特征及label。最終會將生成的樣本數據進行持久化,寫入到不同的目標系統中。當然實際處理邏輯遠比這裏要復雜的多。
這條數據處理鏈路中,流批是完全分開的兩條鏈路,計算儲存均沒做到統一,開發維護成本偏高。更重要的是,這條時效性要求不高的鏈路的儲存成本卻一直居高不下,所以我們目前正在探索、嘗試將 Paimon 引入進來。
以上便是我們目前正在探索和嘗試的新架構。全鏈路不再有分布式 KV 儲存服務,而是用 Paimon 作為數據映像及 DimJoin 維表等來實作樣本處理過程中的數據儲存需求。
預期達到的收益 :
(1) 做到真正的流批一體,流批計算引擎統一為 Flink,儲存統一為 Paimon,同一份儲存,既可以被用於流處理,也可以被用於批次處理。明顯可以降低業務開發維護的成本。
(2) 可根據業務邏輯來決定是否共享部份儲存資源,如圖中間的paimon 表。
(3) 在某些情況下用 DimJoin 替換以前的 SortMergeJoin,提升效能。
(4) 由於沒有了分布式 KV 儲存服務,可以減少很多儲存服務的成本。
2.3 探索場景及預期收益二、批樣本儲存鏈路
這是第二個場景——批樣本儲存鏈路,該鏈路是將樣本平台產出的批樣本發往訊息隊 列給索引平台 Build 成線上檢索引擎所需的 ORC 格式檔,以共使用者分析使用。
該鏈路有明顯的幾個缺點:
第一,索引平台讀取訊息佇列中的樣本數據 Build 索引的過程會有長尾,導致產出延遲。
第二,依賴元件多,整體鏈路太繞,導致運維成本高,可控性差。
所以我們探索是否能讓線上檢索引擎支持辨識 Paimon 這種湖格式,樣本平台就可以直接將樣本數據寫入 Paimon 中。如果實作,那依賴元件減少,產出延遲也就可控,運維及費用成本均可降低。
2.4 探索場景及預期收益三、圖片特征計算鏈路
這是探索的第三個場景——圖片特征計算鏈路。在圖片特征計算場景下,該鏈路的時效性要求不高,主要是計算圖片的特征。但是由於圖片數量多,達到百億級,所以透過 TFS / OSS 拉取圖片經常導致伺服端壓力過高,甚至雪崩、限流也經常發生,所以我們引入了 KV系統作為 Cache。該鏈路是利用Flink作業動態分析圖片的計算特征,當 Flink Batch Job 查詢該服務發現圖片不存在時,則會透過 HTTP 向 TFS 也就是圖片中心服務請求圖片,然後發往訊息佇列中來更新 Cache。
該鏈路有幾個缺點:
第一,原系統 Partition 數受限,所以我們分了十多張 KV 表存圖片 Cache,使用者使用不便。
第二,每天上億個新圖片,伺服端 Build 鏈路不穩定,經常遇到 OOM 和 磁盤不足等問題。
由於 Paimon 支持作為維表被點查的,所以我們目前正在嘗試將 Paimon 引入進來當 Cache,替換原 KV 系統。
如果能實作,則預期收益是:
第一,由於去除了 KV 系統的伺服端,所以不存在 Build 鏈路不穩定的情況,成本也能相應下降。
第二,Paimon 表的 Bucket 數可以設定成很大,一張表足以,方便使用者使用。
2.5 探索場景及預期收益四、搜尋離線鏈路
該鏈路是搜尋平台較典型離線處理鏈路。
簡單介紹下,首先在同步層,多個商品維度源表與多個商家維度源表分別透過 Flink 流批作業全增量同步到分布式 KV 儲存系統中,作映像表使用。在 Join 層,透過一些 Flink 流批作業將各維度的映像表數據合並在一起做打寬處理,最終得到淘寶一件商品的完整數據資訊。然後全增量分別寫到分布式 KV 儲存系統和訊息佇列中,供下遊的線上搜尋引擎消費建索引等。該鏈路主要是用於時效性要求很高的場景,業務要求在源頭觸發增量以後,能在下遊搜尋引擎立馬查到最新的商品資訊。
這條鏈路目前的主要缺點是,所有套用不管時效性是分鐘級還是秒級,都統一用分布式 KV 系統做映像表及結果表,儲存成本偏高。
我們調研到,由於 Paimon 支持流讀流寫、批讀批寫、以及作為維表被點查,所以我們目前正在探索是否能用 Paimon 來替換該鏈路中的分布式 KV 儲存系統,來滿足一些時效性要求在分鐘級別以上的業務需求,以此來實作成本的下降。目前結果表正在落地過程中,而映像表則還在探索調研中,屬於未來規劃。
2.6 探索場景及預期收益五、搜尋全量拉庫鏈路
最後一個場景來源於剛剛那個場景的源頭。也就是使用者在做大全量時,需要去拉分庫分表的Mysql數據,目前是各套用都去拉,很明顯有幾個缺點:
第一,拉取分庫分表Mysql時,並行有上限限制,吞吐受限,而盲目加並行有拉掛庫的風險。
第二,公司有些核心庫只允許晚上拉取,這直接影響到業務叠代。
第三,每個套用都要分別去拉取 Mysql 表,無法做到共享。
我們調研引入 Paimon 來解決該場景下的效能瓶頸問題,先將 mysql 表數據全增量同步到一張 Paimon 表中,然後下遊全量來拉取這張 Paimon 表,增量可以根據時效性要求高低而決定選擇是走 DRC 原鏈路,還是消費 Paimon 的 changelog。目前正在落地過程中,預期收益其實很明顯,未來並行無上限了,釋放了吞吐和加快全量速度,全天24小時均可拉取,且能做到各套用共享 Paimon 表。
03
遇到的問題及解法
最後介紹下在落地過程中遇到的問題及解法。
3.1 問題一及解法、Snapshot Expire 導致批作業執行失敗
Snapshot使用者去拉取時,會存在過期的可能,且在過程中發現會有非常多的錯誤,原因很簡單,就是全使用者全量拉過期的Paimon表,檔被刪,導致作業全量失敗。
有以下三種解法:第一,將 Consume-id 從流場景擴充套件到批場景,但調研詳細程式碼的實作後,發現consumer ID有局限性。第二,統一加大 Snapshot Expire 時間,這樣,所有套用去拉Paimon表時都不會過期,但這樣有一個缺點就如圖中左邊所示,不同APP業務的邏輯不同,導致使用者每個作業的執行時長不同,有些作業只需讀Paimon表,中間沒有任何計算邏輯,直接落到KV儲存中,有些任務可能有DimJoin、UDTF等復雜的演算法邏輯,算特征、算label等等。線上的作業實踐下來,發現同一張mysql表,快則十多分鐘拉完,慢則好幾個小時。第三,各 App 分別建立 Tag,作業結束後,每個套用負責刪除自己建立的Tag。因為平台目前支持的上千個業務,每個業務都透過air flow去排程,使用者經常自己操作air flow,直接停止排程或重新clear排程節點,這樣會導致tag殘留,平台無法保證使用者自己手工操作而產生的一些錯誤的運維手段。
最後,和社群一起討論,最後討論的解法如下:首先 Tag 支持精細化 TTL,然後 App 不再 Scan Snapshot,而是 Scan Tag With TTL。其次,每個業務知道自己的業務邏輯,所以可以設定自己需要的 TTL。同時,該方法也可以給平台兜底,防止漏刪 Tag 的情況發生。另外,我們對老版本 Tag 及 Snapshot 都做了相容。具體的實作是新建 class Tag extends Snapshot,詳細的開發的程式碼及邏輯,可看下面的PIP。
3.2 問題二及解法、Schema Evolution
遇到的第二個問題是Schema Evolution, 這是DRC數據,即集團內解析Binlog,吐出增量的元件。我們把它寫作Paimon表時,希望使用者的源頭數據變更可以動態的讓Paimon表生效。目前這個功能我們調研到社群以及公司的做法,決定用複制表的方式來做。
(1) 不依賴 Flink-CDC 來實作 Schema Evloution 的原因:
① Flink-CDC 不支持集團 TDDL (基於 Java 語言的分布式資料庫系統,提供分表分庫等功能)
② Debezium 不支持集團用的 Mysql 版本
(2) 沒采用 Paimon 官網的 RichCdcSinkBuilder API 實作 Schema Evloution 的原因:
平台全部作業統一用 Flink SQL,暫無支持 DataStream 的計劃。
所以我們另辟蹊徑,用 Clone Table 來支持 Schema Evloution。簡述是源頭DRC的增量會同步到Paimon表。此時使用者有了Schema Evloution需求,然後呼叫社群的Clone Table將Paimon表1複制到Paimon表2,複制最後一個snapshot,這個Paimon表會執行Alter Schema操作。執行完後,再拉全量同步mysql表新加的欄位,只需d欄位,再回溯增量,把a、b、c、d,4個欄位回溯寫入Paimon表2。以此支持使用者Schema Evloution。目前這個方法也在嘗試落地中。
3.3 問題三及解法、Data Migration
第三個問題是數據遷移的情況,即業務遇到 DFS 集群裁撤,需要數據從 DFS 集群 A 遷移到 DFS 集群 B。還有是由於阿裏雲降價,所以有雲上使用者想將數據從別的雲廠商的雲遷移到 AliYun 上。針對這種數據遷移的場景,不可能重新把業務全量掃描。針對以上兩種情況的解法:第一,我們決定去社群開發複制表這種數據遷移工具,提供 Clone Table 這種 Data Migration 工具。第二,支持 Catalog、Database、Table、Snapshot、Tag 等 CloneType。
3.4 Clone Table 實作方法
最後詳細介紹一下Clone Table 實作方法。
具體實作上來看,由於要 copy 的檔有可能很多,所以我們沒有在客戶端單點執行 copy File,而是起分布式任務來執行。
作業拓撲大致分為四個 Operator:
第一個 Operator,用來根據使用者傳遞的參數查詢要 copy 的表,封裝為 Record 發給 第二個 Operator, 這個節點的設定主要是為了方便使用者一鍵 copy 多個表,整個 db或整個 catalog 下的所有表,而不是每 copy 一個表就要起一個作業。
第二個 Operator 在收到第一個節點的 Record 之後,會存取該表最後一個 Snapshot 對應的 Manifest 等相關後設資料檔,進而 pick 出相關數據、Schema 等檔。最後將檔資訊 Rebalance 給第三個節點。
第三個 Operator,負責分布式執行檔復制,主要是透過 InputStream + OutputStream 的方式對檔進行逐字節 Copy。然後對 Database 和 Table 求 MurmurHash 來重新 Partition 發往下遊最後一個節點。
最後一個 Operator,主要是負責一個表的檔復制完以後建立 snapshot 的 Hint 檔,代表 著該 Snapshot 可供下遊使用了。
由於使用者可能將Snapshot過去時間設定的時間很短,導致在執行Clone作業的過程中,Snapshot可能過期刪除, 導致Clone作業失敗。
因此為解決這個問題,我們會透過作業失敗的重新開機後,來比較檔的size和檔名,這樣就會過濾掉已經copy的檔,以此加速作業整體執行速度。從而第二次的Clone job就能順利完成,生產實踐後發現這種方法是完全可用的。對於該功能詳細的邏輯以及程式碼,可參考這個pip( https://cwiki.apache.org/confluence/display/PAIMON/PIP-18:+Introduce+clone+Action+and+Procedure )。
活動推薦
阿裏雲基於 Apache Flink 構建的企業級產品-即時計算 Flink 版現開啟活動:
新使用者復制下方連結或者掃描二維碼即可0元免費試用 Flink + Paimon
了解活動詳情: https://free.aliyun.com/?pipCode=sc
▼ 關註「 Apache Flink 」,獲取更多技術幹貨 ▼
點選「閱 讀原文 」跳轉 阿裏雲即時計算 Flink ~