當前位置: 妍妍網 > 資訊

新一代即時數據整合框架 Flink CDC 3.0 —— 核心技術架構解析

2024-03-18資訊

摘要: 本文整理自阿裏雲開源大數據平台呂宴全關於新一代即時數據整合框架 Flink CDC 3.0 的核心技術架構解析,內容主要分為以下四部份:

1. Flink CDC 演進歷程

2. Flink CDC 3.0 的架構設計

3. Flink CDC 3.0 的核心實作

4. 未來規劃

Tips: 點選 「閱讀原文」 線上觀看 FFA 2023 會後資料~

01

Flink CDC 演進歷程

Flink CDC 是基於資料庫日誌 CDC(Change Data Capture)技術的即時數據整合框架,配合 Flink 優秀的管道能力和豐富的上下遊生態,Flink CDC 可以高效實作海量數據的即時整合。

在 2020 年 7 月,Flink CDC 作為一個基於個人興趣孵化的計畫合並了第一個 commit,拉開了 Flink CDC 即時數據整合的篇章,讓使用者只建立一個簡單的 Flink SQL 作業就能完成 CDC 數據的同步、加工和分析。這個階段裏存在透過加鎖保證一致性,並且不支持水平拓展的問題,Flink CDC 參考 DBLog 論文 的設計,實作了無鎖並行讀取的全增量同步,完成了從 1.0 到 2.0 的升級。

Flink CDC 2.0 受到了廣大使用者的好評,不過在廣泛套用的過程中,也暴露出了一些有待提升的地方,需要提升的部份主要包括透過 SQL 定義表結構的方式,在上遊表發生加減列時需要手動調整作業;在整庫同步的場景下需要為每一張表建立一個作業,占用連線多,計算資源消耗大等。在社群使用者與開發者的共同努力下,Flink CDC 於 2023 年 12 月完成了 3.0 版本 [1] 的功能落地,提供了強大的端到端的全增量同步、表結構變更自動同步、整庫同步、分庫分表同步等高級特性,有效地解決了使用者的痛點。

02

Flink CDC 3.0 的架構設計

Flink CDC 3.0 的核心特性包括:

1. 端到端數據整合,使用者只需要配置一個 YAML 檔就能快速構建數據入湖入倉作業

2. 完整的數據同步,全量讀取結束自動同步增量數據,並且上遊表結構變更自動套用到下遊

3. 一個作業例項支持讀取和寫入多表,占用資料庫連線少,增量讀取階段自動關閉空閑讀取器,節省計算資源


Flink CDC 3.0 的整體架構自頂而下分為 4 層:

1. Flink CDC API:面向終端使用者的 API 層,使用者使用 YAML 格式配置數據同步流水線,使用 Flink CDC CLI 送出任務

2. Flink CDC Connect:對接外部系統的連結器層,透過對 Flink 與現有 Flink CDC source 進行封裝實作對外部系統同步數據的讀取和寫入

3. Flink CDC Composer:同步任務的構建層,將使用者的同步任務轉譯為 Flink DataStream 作業

4. Flink CDC Runtime:執行時層,根據數據同步場景高度客製 Flink 算子,實作 schema 變更、路由、變換等高級功能


03

Flink CDC 3.0 的核心實作


3.1 數據抽象

Event 是 Flink CDC 3.0 內部進行數據處理及傳輸的數據結構介面,其作用類似於 Flink SQL 中的 RowData 介面。Event 目前所有的實作如下圖所示。


■ 3.1.1 ChangeEvent

ChangeEvent 介面代表著在一張表上發生過的變更事件,實作類包括數據變更事件(即 DataChangeEvent 類)和表結構變更事件(即繼承 SchemaChangeEvent 介面的類)兩種:DataChangeEvent 裏保存了完整的數據變更資訊,即包含了變更前(before)和變更後(after)每條記錄的欄位值;SchemaChangeEvent 有增加列、刪除列、修改列型別等實作。

Flink CDC 把表結構變更資訊當成一種事件流轉,這樣能夠避免在數據變更事件裏保存型別資訊,需要從 DataChangeEvent 讀取數據的節點會基於 SchemaChangeEvent 維護表結構資訊。Flink CDC 還實作了自己的序列化器,每條記錄使用二進制的方式儲存在 Flink 的 MemorySegment 中,透過這種底層結構的最佳化設計,有效提高在不同節點之間數據流轉的效率。

■ 3.1.2 FlushEvent

FlushEvent 是包含數據刷寫控制邏輯的特殊事件。當發生表結構變更事件後,之前的數據可能尚未處理完,鏈路上會並存兩種不同表結構的數據。大部份資料庫不允許直接在同一批次中混合處理兩種表格式的數據,在處理新版本的數據之前,必須確保舊版本的數據已全部完成刷寫操作。FlushEvent 作用是間隔這兩種數據,在 Sink 端接受到 FlushEvent 後,就需要將之前緩存的數據全部刷寫出去。

3.2 算子編排

FlinkCDC 根據數據整合的場景,深度客製了 Flink DataStream 的算子鏈路,目前制定的數據處理鏈路如下圖所示:

下面對這些模組的具體實作做進一步的介紹。

■ 3. 2.1 Source

Source 模組負責生產在鏈路中流轉的變更事件。FlinkCDC 2.0 提供了強大的全增量同步、並行讀取的能力,已經能夠生成包含各類變更事件資訊的 SourceRecord 物件,在此基礎上,只需要再實作一個將 SourceRecord 解析成前面介紹的各種表變更事件的 DebeziumDeserializationSchema 自訂轉換器,就能完成 FlinkCDC 3.0 資料來源的接入。

在第一次啟動時,Source 模組需要先拉取表結構資訊,並生成 CreateTableEvent 發送到下遊中,這是為了讓下遊節點能夠解析 DataChangeEvent。

public void enterAlterByDropColumn(MySqlParser.AlterByDropColumnContext ctx) { String removedColName = parser.parseName(ctx.uid()); changes.add(new DropColumnEvent(currentTable, Collections.singletonList(removedColName))); super.enterAlterByDropColumn(ctx);}

■ 3. 2.2 Transform

在目前版本暫未實作。

■ 3. 2.3 Schema


在發生表結構變更事件以後,Schema 模組負責阻塞上遊數據的繼續發放,直到舊版本格式數據刷寫完畢。這個邏輯需要透過 FlushEvent 來傳遞,由於下遊可能存在多個 Sink,需要透過執行在 JobManager 上的一個 OperatorCoordinator 來進行管控,這個 OperatorCoordinator 稱為 SchemaRegistry。

具體來說,處理表結構變更的流程如下圖所示:

  1. 在程式啟動時,所有的 Sink Operator 都向 SchemaRegistry 註冊,SchemaRegistry 記錄 Writer 的數量。

  2. 在收到來自 Source 的 SchemaChangeEvent 時,SchemaOperator 發送一個包含本次表結構變更事件的 SchemaChangeRequest 給 SchemaRegistry,讓 SchemaRegistry 緩存這個 SchemaChangeEvent。

  3. SchemaOperator 下發一個 FlushEvent 給所有的 Sink Operator,Sink Operator 接收到 FlushEvent 後刷寫數據到外部系統,並且發送 FlushSuccessEvent 向 SchemaRegistry 進行匯報。SchemaRegistry 據此統計響應的 Writer 數量。

  4. SchemaOperator 下發 SchemaChangeEvent 給所有的 Sink Operator,讓 Sink Operator 更新對應表的序列化器。

  5. SchemaOperator 發送一個 ReleaseUpstreamRequest 給 SchemaRegistry,並且開始阻塞自身,不再處理任何變更事件,直到收到 SchemaRegistry 的回應。

  6. SchemaRegistry 接收到 FlushSuccessEvent 以後,會和第 1 步中註冊的 Sink Operator 進行比較,如果所有的 Sink Operator 都已刷寫完畢,則開始將第 2 步中受到的 SchemaChangeEvent 套用到外部系統中,並且對第 4 步接收到的 ReleaseUpstreamRequest 進行回應。這樣,SchemaOperator 就可以開始繼續傳遞新的數據變更時和表結構變更事件了。

■ 3. 2.4 Route


Route 模組提供了表名對映的能力。透過為每一個源表中的數據設定其寫入的目標表,透過一對一以及多對一的對映配置,我們能夠實作整庫同步和簡單的分庫分表同步功能。

Route 模組基於 Flink 的 RichMapFunction 實作,允許透過 source-table 指定一個正規表式規則,將一系列符合正規表式規則的表名,替換到另外一個由 sink-table 指定的表名。RouteFunction 的核心程式碼如下:

public Event map(Event event) throws Exception { ChangeEvent changeEvent = (ChangeEvent) event; TableId tableId = changeEvent.tableId(); for (Tuple2<Selectors, TableId> route : routes) { Selectors selectors = route.f0; TableId replaceBy = route.f1; if (selectors.isMatch(tableId)) { return recreateChangeEvent(changeEvent, replaceBy); } } return event;}


■ 3. 2.5 Partition

在數據同步場景,數據的生產和消費的速率常常是不匹配的,使用者希望能夠透過增加 Sink 的並行度來提高數據處理的速率。Partition 模組負責分發事件到不同的 Sink 中。

在 Partition 階段,數據變更事件按照表名和主鍵作為哈希鍵,保證同一張表中相同主鍵的數據不會因數據分發出現亂序的情況。哈希鍵的計算方式如下所示:

public Integer apply(DataChangeEvent event) { List<Object> objectsToHash = new ArrayList<>(); // Table ID TableId tableId = event.tableId(); Optional.ofNullable(tableId.getNamespace()).ifPresent(objectsToHash::add); Optional.ofNullable(tableId.getSchemaName()).ifPresent(objectsToHash::add); objectsToHash.add(tableId.getTableName()); // Primary key RecordData data = event.op().equals(OperationType.DELETE) ? event.before() : event.after(); for (RecordData.FieldGetter primaryKeyGetter : primaryKeyGetters) { objectsToHash.add(primaryKeyGetter.getFieldOrNull(data)); } // Calculate hash return (Objects.hash(objectsToHash.toArray()) * 31) & 0x7FFFFFFF;}

同時由於 Sink 模組需要維護表結構資訊,對於表結構變更事件,需要廣播到每一個並行裏。對於控制數據刷寫的 FlushEvent,也需要廣播到每一個下遊的每一個通道裏。

其程式碼實作如下:

public void processElement(StreamRecord<Event> element) throws Exception { Event event = element.getValue(); if (event instanceof SchemaChangeEvent) { // Update hash function TableId tableId = ((SchemaChangeEvent) event).tableId(); cachedHashFunctions.put(tableId, recreateHashFunction(tableId)); // Broadcast SchemaChangeEvent broadcastEvent(event); } else if (event instanceof FlushEvent) { // Broadcast FlushEvent broadcastEvent(event); } else if (event instanceof DataChangeEvent) { // Partition DataChangeEvent by table ID and primary keys partitionBy(((DataChangeEvent) event)); }}


■ 3. 2.6 Sink


在 Sink 模組,需要將數據寫出到外部系統中,並且將表結構變更套用到外部系統中。FlinkCDC 的 DataSink API 提供了 EventSinkProvider 和 MetaDataApplier 介面去完成這兩件事情。


EventSinkProvider 用於將表數據變更套用到外部系統中。EventSinkProvider 要求提供一個基於 Flink SinkFunction 或者是 Flink Sink API 的實作,並且具備寫出到多個表的能力。以 Flink Sink API 為例,SinkWriter 需要從 DataChangeEvent 中取出變更數據,並寫出到對應的表中。當處理到 SchemaChangeEvent 時, SinkWriter 更新記憶體中保存的表結構資訊。當處理到 FlushEvent 時, Sink Operator 會呼叫 SinkWriter 的 flush 方法將數據刷寫出去。

MetaDataApplier 用於將表結構變更套用到外部系統中。在 SchemaRegistry 接受到所有的 Sink 算子處理完 FlushEvent 的通知後,由 SchemaRegistry 負責呼叫 MetaDataApplier 的 applySchemaChange方法去套用表結構變更事件。考慮到任務重新開機的情況,MetaDataApplier 需要支持對一個表結構變更事件冪等處理。


04

未來規劃

Flink CDC 社群致力於持續深化數據同步與處理的全面性和靈活性。在 Flink CDC 3.0 裏,針對數據整合場景客製了高效的數據格式和算子編排,實作了對表結構變更同步和整庫同步的支持。基於未來的演進規劃,社會將會著重關註完善 Transform 模組的功能,以滿足使用者對數據同步過程中的深度客製需求。計劃在下一個大版本中,支持表結構動態調整,包括裁剪不必要的列、添加計算列等功能,以及提供數據過濾能力,讓使用者能夠在同步過程中一站式完成復雜的數據轉換任務。

此外,在連結器生態建設方面,社群正著手接入 Kafka、PostgreSQL 等業界主流的資料來源,以及 Paimon、Iceberg 等先進的湖倉儲存系統。進一步拓寬 Flink CDC 的上下遊數據整合範圍,推動上下遊元件的深度融合。


文章超連結:

[1] https://github.com/apache/flink-cdc/releases/tag/release-3.0.0

Flink Forward Asia 2023

本屆 Flink Forward Asia 更多精彩內容,可點選 「閱 讀原文」 或掃描圖片二維碼觀看全部議題的視訊回放及 FFA 2023 峰會資料!

Apache Flink 公眾號 ,回復 FFA 2023 即可獲取 FFA 2023 會後資料檢視地址

掃碼開啟 Flink學習之旅

▼ 關註「 Apache Flink 」,獲取更多技術幹貨

點選「閱 讀原文 」,線上觀看 FFA 2023 會後資料~