當前位置: 妍妍網 > 資訊

如何從0-1使用 Apache Arrow 構建新數據系統

2024-03-07資訊

導讀 為了應對大規模數據處理中的效能和互操作性挑戰。Arrow 致力於提供一種高效的跨平台數據交換機制,使數據能夠在不同作業系統和程式語言之間迅速、一致地流動。其設計註重效能最佳化,並得到開源社群廣泛的支持,成為許多數據處理計畫的核心元件,推動了數據科學和分析領域的創新。本文將分享如何使用 Apache Arrow 來構建一個完整的數據系統

本次介紹會圍繞下面四點展開:

1. 為什麽要構建新的數據系統

2. 什麽是 Apache Arrow,為什麽是 Apache Arrow

3. 如何使用 Apache Arrow 構建數據系統

4. 一些 Tips

分享嘉賓| 李晨曦 上海炎凰數據科技有限公司 研發工程師

編輯整理 | 張陽

內容校對|李瑤

出品社群|DataFun

01

為什麽要構建新的數據系統

首先需要回答的問題就是為什麽要構建一個新的數據系統。

1. One Size Fits All or Not?

據圖靈獎得主 Stonebraker 在 2005 年的一篇論文中指出,"One Size Fits All" 的概念已經過時。這意味著使用一個通用的數據系統(如 Oracle 或 MySQL)無法完全滿足所有問題的需求。因此,許多領域都需要特定的專用系統。一個明顯的例子是交易場景中的 OLTP(線上事務處理)和分析場景中的 OLAP(線上分析處理)的分離。此外,還有眾多由於數據處理模型的不同而產生的新系統,如適合即時計算的流處理和更適應雲端運算的 NoSQL 等等。然而有些人持不同意見,他們嘗試在一個系統中解決所有問題。例如,試圖同時處理 OLTP 和 OLAP 的 HTAP,還有流批一體、NewSQL 等新概念。無論是否支持"One Size Fits All"的觀點,新的想法和概念總是不斷湧現, 所以也需要不斷構建新的數據系統。

2. 資料庫的黃金時代

當前,我們正處於一個數據系統的黃金時代。有許多新的資料庫正在不斷出現。根據 DBDB( Database of Databases )的統計,自 2020 年以來已經出現了 124 個新的資料庫。這意味著幾乎每周都會有一個新的數據系統問世。

有了這麽多數據系統,我們為什麽還需要構造一個新的數據系統,這是否是重復造輪子呢?

3. 讀時建模

我們面對的場景是通用的日誌處理,這是一項頗具挑戰的任務,因為不同廠商,甚至同一廠商不同套用的日誌格式各不相同,而且一個尚處於開發中的系統的日誌格式也會經常變動。常見的方法之一是寫時建模,即透過在資料庫中預先定義表結構或者模式來儲存日誌數據。然後,透過一些 ETL 工具,將數據轉化為所需的表格型別。然而,當面對不同型別的數據時,例如日誌來自 Nginx、Apache 和 Windows IIS 等不同系統,就需要維護多個 ETL 流程來處理它們。另一種解決方案是讀時建模,即先將原始數據儲存起來,不進行 ETL 處理,在實際使用時對欄位進行抽取。例如圖中查詢 1,就是從這三類日誌數據中抽取出各自的 method、time 和 client 欄位,並即時構建一個新的結果表格。這種方法更適合處理多樣化的、不固定的數據模式。有興趣可以掃描上圖中的二維碼,它提供了一個有關讀時建模更加詳細的分享。

讀時建模的關鍵是有一個支持動態數據表模式的數據查詢引擎,才能夠在沒有預先定義數據模式的情況下查詢各種型別的數據。

下面介紹一下動態數據表模式可能是什麽樣子的。在我們收集的數據中,可能有不同格式的日誌。例如,有以鍵值對表示的日誌,如綠色框所示;也可能有以 JSON 格式表示的日誌,如紅色框所示。在支持動態數據表模式的系統中,可以在同一數據集中同時查詢到這兩種型別的日誌。

接下來,我們可以透過欄位提取的方式對這兩類數據進行格式化處理,從而得到一個合並在一起的二維表結構。

最後,可以使用一些函式對數據進行歸一化和合並。這樣做可以得到一個直觀明確的數據表,便於後續機器學習系統和報表系統進行處理。在這個過程中,我們可能會遇到同一個欄位有不同型別的情況。例如,某個同名欄位在某類日誌中可能以整數形式儲存,而在另一個日誌中可能以浮點數形式儲存;或者在匯入數據時沒有進行適當處理,導致某些欄位被匯入為字串型別。因此,同一個欄位中可能同時包含多種不同型別的數據。

構建資料庫是一項復雜的工程,其中包含許多子任務。可以選擇改造一個開源計畫,也可以從頭開始構建。現有計畫要麽無法滿足讀時建模的需求,要麽架構相對陳舊,不適合現代雲端運算服務的架構。而且改造現有系統,難度也比較大,因為具有固定的模式是很多系統的前置條件。因此,我們決定從頭開始構建這個系統,第一步就是定義記憶體中的數據格式。

02

什麽是 Apache Arrow,為什麽是 Apache Arrow

下面介紹一下什麽是 Apache Arrow,我們為什麽選擇了 Apache Arrow,以及我們要用它來做一些什麽事情。

1. 記憶體數據格式

記憶體中的數據可以根據分布的不同,分為行式儲存和列式儲存。在行式儲存中,數據按照行的方式排列,類似於 C 或 C++ 中的二維陣列,每一行連續儲存,將相關的資訊放在一起。而在列式儲存中,同一列的數據放在一起,構成一個長陣列。圖中是包含三個內容(session_id,timestamp 和 ip)的二維表,左邊是它在行式儲存中的記憶體表示,右邊是它在列式儲存中的記憶體表示。

在事務處理的場景中,行式儲存是非常自然的選擇。因為大部份操作都是以一行為單位進行的,可以使用 session_id 構建 B 樹或 B+ 樹等索引,透過這個索引很快定位到 session_id 對應的具體的某一行,然後進行刪除或修改操作。

但在分析型場景中,使用者通常不會使用所有內容,而是只會對某些內容進行運算。例如,如果想知道表中有多少個不同的 IP 地址時,只需要關心 ip 這一列,而不關心 session_id 或 timestamp。在這種情況下,列式儲存就更為適合,因為只需讀取 ip 這一列數據,從而避免讀取其他不必要數據的開銷,減少了 IO 和記憶體的消耗。而且,列式儲存的數據在磁盤或記憶體中都是連續儲存的,可以更好保證數據的局部性,從而充分利用現代 CPU 的緩存和 SIMD(單指令多數據)向量化等運算機制。另外,相同內容的數據放在一起可以更好地進行壓縮。例如,可以使用字典或其他高效的壓縮演算法將這些具有相同內容的數據放在一起進行壓縮。對於日誌處理這種分析型的套用來說,列儲存是更合適的選擇。

2. Apache Arrow 的優勢

那麽我們還要不要繼續造輪子?還是定義一套自己的列式儲存記憶體格式?

現在許多系統都定義了自己的記憶體數據格式,這帶來了數據轉換的問題。試想,如果我們想在 pandas 中呼叫 Spark 進行數據處理,那麽要從 pandas 基於的 Python 環境轉換到 Spark 基於的 Java 環境,其中需要經過 Py4J、JVM 和 Spark 三層數據轉換。同樣地,如果我們定義了自己的記憶體格式,也會面臨類似的數據轉換問題,特別是在需要與其他系統進行互操作時。幸運的是,借助 Apache Arrow,可以在 PySpark 啟用 Arrow 格式,就可以與 pandas 直接共享記憶體,實作記憶體交換。

Arrow 到底是什麽呢?Arrow 本身並不是一個數據儲存或執行引擎,而是一種高效能、記憶體中的列式儲存標準。它與具體的語言或應用程式無關,無論是用 C++、Python 還是 Rust 等語言,都可以進行跨語言跨系統的互操作。因為在任何環境中,Arrow 數據的記憶體表示是完全一致的,所以在進行跨系統傳輸時,不需要進行記憶體拷貝、序列化或反序列化等工作,實作了零拷貝。Arrow 沒有發明新的數據儲存方式,比如浮點數仍然按 IEEE 754 標準進行表示,但 Arrow 在標準化方面做了很多工作,例如如何表示空值 NULL、如何處理時間戳以及時區的表示等等。這些細節看起來很微小,但它們的重要性是在任何平台和任何語言下的標準化。因此,一個全新的數據引擎也無需重新發明這些記憶體格式。

使用 Arrow 後,可以實作在不同系統之間共享記憶體,從而實作零拷貝。這意味著我們不是避免了繁重的數據復制和轉換,而是直接共享記憶體中的數據,使得數據處理過程更加高效。

Arrow 還有以下幾個好處。

首先,Arrow 原生實作了七種程式語言,並在此基礎上實作了更多語言的繫結,包括 Rust、C++、C、Python 等,基本覆蓋了主流的程式語言。並且得到大量數據系統的支持,如 PyTorch、Spark、ClickHouse 和 DuckDB 等,在這些系統中,數據可以采用 Arrow 格式進行輸出。

其次,Arrow 的效能表現不錯。有一個 Benchmark 對比了基於 Arrow 的數據引擎 DataFusion、Polars 與 DuckDB 的效能,雖然前兩者稍慢於 DuckDB,但仍然是可接受的成績。盡管看起來 Arrow 在每個小功能點上沒有什麽創新,但綜合起來,它提供了一個相對完整的解決方案,並且模組化做得非常好,API 對於系統的侵入性也較小。

此外,Arrow 的擴充套件性較強,比如可以擴充套件 Arrow 的型別,將機器學習中的一些型別如 tensor 在 Arrow 中實作;也可以使用 API 擴充套件自訂的計算函式。

總而言之,Arrow 的主要貢獻在於為列式儲存提供了一個標準和生態系,因此對開發者和使用者來說,它可以作為一個現代數據技術棧的標準和基礎。

Arrow 擁有非常活躍的開源社群。除了 Arrow Rust 等相關計畫外,截至 2023 年 10 月,Arrow 本身已經得到了超過一萬兩千個 GitHub 的 star。上圖展示了 Arrow 在最近一個月的活躍程度,包括 PR、issue 以及貢獻者等方面的數據,足以看到 Arrow 是一個非常活躍的計畫,並不需要擔心它的持續性和穩定性,可以預期 Arrow 能夠長期存在並會持續完善。

Arrow 透過幫助標準化記憶體格式,為構建數據系統提供了一個起點。然而,這只是開始,還有許多其他任務有待完成。例如,需要聚合、排序等更多的算子;需要開發客戶端 API 和數據交換功能;需要支持新的硬體,尤其是在信創領域,需要考慮對 ARM 指令集和國產硬體的相容。

如此一來,構建一個數據系統變得非常復雜。上述工作也只能初步構建一個勉強滿足小規模使用的數據系統,而完成這個階段可能需要 10 年甚至更長時間。如果我們想要構建一個更大規模、分布式和高可用的數據系統,所需要的時間可能是前一階段的幾倍甚至更長。

按照人月神話的理論,投入更多的人力並不能線性地減少完成時間。因此,構建新的資料庫是一個非常昂貴的事情,需要巨大的時間、人力和經濟成本,這也是為什麽新的資料庫創業公司需要籌集大量資金和足夠的時間。

雖然現在是資料庫的黃金時代,但也是最具挑戰性的時期。如果新的想法不能迅速實作,很難在市場上生存。好在有 Arrow。Arrow 不僅提供了基本的記憶體數據格式和模型,還提供了一些算子和計算功能,以及持久化、數據交換和跨平台執行等模組。透過使用 Arrow,能夠大大節省構建數據系統的時間和開發成本。

03

使用 Apache Arrow 構建數據系統

下面介紹 Arrow 如何助力數據系統的開發以及如何使用 Arrow 構建一個數據系統。

1. 數據系統執行流程

一個數據系統的執行流程通常包括以下幾個步驟。

首先,當系統接收到使用者的查詢請求時,會利用儲存和索引來獲取相關資源。

接下來,系統會根據使用者查詢生成一個邏輯計劃,該計劃表示了執行查詢所需的關系代數和操作的抽象。

然後,邏輯計劃會在經過最佳化之後轉為物理計劃,即如何真正執行查詢的計劃。

之後,在執行引擎中,系統會執行具體的操作,如運算式執行、聚合、排序和物化檢視等算子。

最後,系統將結果保存到使用者指定的路徑或傳輸到使用者的客戶端。

2. 數據儲存

我們的數據儲存模型是基於事件的,即基本的儲存單元抽象成了事件,類似於日誌中的每一條日誌。每個事件都有時間戳、原始資訊和其他基本內容,比如主機名、數據型別等。這些都是事件的元資訊定義,我們將其抽象出來,並進行索引。

對於日誌中的其他內容,我們將其作為原始數據儲存。底層儲存使用了 Parquet 這一列式持久化儲存標準,其對 Arrow 有很好的支持。Parquet 還會儲存一些後設資料,比如每列的儲存位置和一些統計資訊,如最大值和最小值等。這樣就可以支持一些查詢的下推操作。如果數據中儲存了多列,但只想存取某一列,可以直接定位到該列的儲存位置,而不需要將整個檔都讀入記憶體中。

然而,Parquet 需要預先給定數據的模式,即儲存數據時需要先定義一個模式,無法直接支持動態模式或者無模式數據。為了支持動態模式的數據,在 Parquet 的基礎上我們進行了一些擴充套件,這樣就可以在 Arrow 和 Parquet 的基礎上進行簡單的修改,從而完成數據儲存。

數據儲存之後,需要讀入到記憶體中。每個數據在記憶體中會以 Arrow 定義的 Record Batch 形式存在。這種表示方法用於描述一組數據,並由其 Schema 指定數據的模式。

例如,有一列包含 session_id 欄位的 Int64 型別數據,一列包含 datetime 欄位的String 型別數據,還有一列包含 source_ip 欄位的 String 型別數據,Schema 中定義並儲存了這些欄位的型別,而具體的數據儲存在 Arrow Array 中,不同 Record Batch 的 Schema 是可以變動的。例如,在下一個 Record Batch 中,session_id 欄位可能變成 String 型別,而 time 欄位可能變成 Timestamp 型別。

透過不同模式 Record Batch 的組合,就可以獲得不同模式的數據。這樣,就實作了從數據儲存到記憶體表示的對映關系。

3. 索引/程式碼/硬體資源

Arrow 並不是一個完整的查詢引擎。它缺少索引和使用者自訂函式等功能的支持,在我們的系統中,我們使用了時間戳索引和倒排索引,這樣使用者可以透過關鍵字和時間來定位到日誌的位置。至於使用者自訂函式方面,我們向 Arrow 送出了一系列 PR,使其能夠支持使用者自訂函式。Arrow 在硬體資源方面有一些簡單的實作,比如記憶體管理和執行緒池。但是,如果想要進行更細粒度的管理,例如限制每個查詢的記憶體使用或設定不同查詢任務的優先級,仍然需要自己開發。所以,從這個角度來看,Arrow 在這方面還有繼續完善的空間。

4. SQL 解析/計劃生成/執行與傳輸

Arrow 也沒有提供將使用者的 SQL 語句解析成抽象語法樹的功能,但是我們可以使用一些開源工具,比如 ANTLR 和 Calcite,將 SQL 語句轉換成抽象語法樹。我們選擇使用 ANTLR 而不是 Calcite,是因為 Calcite 過於復雜且基於固定數據模式的假設,在處理動態模式時不太適用。

之後可以將抽象語法樹進一步轉換成邏輯計劃,邏輯計劃描述了數據執行的具體操作。在進行查詢最佳化時,我們可以調整邏輯計劃來提高效能。

例如想要找到特定 ip 的最新存取時間,首先需要從數據集中讀取相關數據。然後,根據指定的條件(這裏是 ip 等於某個特定字串)進行數據過濾,並將需要的數據篩選出來。接下來,對過濾後的數據進行聚合運算計算其時間的最大值。

在此過程中,可以進行一些最佳化,其中一個常見的最佳化是下推操作。透過下推,可以將讀取 ip 和 _time 兩個欄位的操作下推到表掃描階段,從而每次讀取數據時都跳過其他不必要的欄位。此外,我們還可以將條件運算式(例如,ip 等於特定字串)嵌入到操作中,這樣每次讀取時只會讀取與我們需要的 ip 相匹配的數據。

透過在表掃描階段進行這些最佳化,可以節省大量的 IO 開銷和記憶體資源,提高查詢效能。

邏輯計劃是一個抽象層,不包含在 Arrow 中,因此需要自己編寫邏輯計劃的程式碼。

相對而言,邏輯計劃相對簡單,因為大多數 SQL 查詢語言及關系代數和邏輯計劃可以相互對應。物理計劃則相對復雜,因為它與底層機器有關,需要處理執行緒、並行和各種硬體。

最近,Arrow 提供了一個查詢執行引擎——Acero,可以提供很大幫助。Acero 是一個基於推播(Push)的引擎,其最小執行單元是 execution node,它的程式碼非常清晰,並且具有清晰的 API 介面,包括如何處理其上遊輸入和下遊輸出,如何處理接收到的數據和停止接收數據,以及暫停和繼續執行等功能。

如果需要擴充套件,只需按照 API 定義自己的節點,並在 Acero 中註冊即可,就可以借助 Acero 進行計算的排程和執行,而不需要修改 Arrow 程式碼。

我們註意到 Arrow 在處理動態數據模式方面存在一些限制,因此對 Arrow 進行了一些擴充套件。例如,添加了支持動態模式的匯聚節點 Schemaless SinkNode,它可以消除數據模式方面的一些限制。透過使用這個節點,可以處理沒有嚴格定義模式的數據。這允許我們更靈活地處理各種數據型別,而不僅僅限於特定的固定模式。

在這個過程中,我們得到了一個支持動態模式物理計劃的執行節點。此外,Arrow 的另一個限制在於執行節點建立時就需要預先定義數據的輸出模式。為了克服這個限制,我們進行了一些改造,將數據輸出模式延遲到實際輸出時動態生成。這樣,就能更好地支持動態模式的數據引擎。另外,我們也對 Arrow 提供的一些聚合函式和純量函式進行了動態模式的擴充套件。

這樣就可以使用 Arrow 來處理動態模式數據,並使用它執行並排程查詢的。目前,Acero 還不支持物化檢視,但對於大規模數據來說,物化檢視非常重要。物化檢視可以預先計算並且儲存一些耗時或復雜場景的結果,在查詢時可以快速存取和利用這些預先計算的結果。同樣,我們對 Acero 進行了一些擴充套件,添加了中間狀態的處理方式,以便在 Arrow 中實作物化檢視,我們也計劃將這些一系列擴充套件送出給 Arrow。

最後,當查詢結束時,需要進行數據傳輸,可以是傳輸給使用者的客戶端,也可以是傳輸到使用者前端進行顯示。如果直接使用 ODBC 或者 JDBC,因為 ODBC 和 JDBC 本質上只能處理行式數據,行列的轉換無法避免,我們可以使用 Arrow Flight 和 Arrow Flight SQL 來規避這個問題。

Arrow Flight 是 Arrow 提供的基於 gRPC 或者 REST 的列式數據交換框架,無需復雜的開發,直接使用其 API 即可實作列式數據傳輸,而避免了數據轉換。在 Arrow Flight 之上得到了與 SQL 資料庫互動的協定 Arrow Flight SQL。這樣我們就可以利用與 SQL 相容的現有客戶端直接進行查詢。

將來,Arrow 還將推出一個類似於 Arrow 自己的 JDBC 或者 ODBC 的工具,稱為 Arrow ADBC。這樣,原本與 ODBC 和 JDBC 相容的資料庫客戶端將無需或只需極少修改程式碼,就可以直接與 Arrow 進行通訊。

Arrow 幫助我們實作了數據儲存、物理計劃和傳輸這三個方面的重要功能。如果在自己的數據系統實作中不是動態模式的,而僅僅是針對特定領域開發固定模式的新系統,那麽只需構建索引、解析用以查詢的 SQL 或 Dataframe API,並轉換成邏輯計劃,然後使用 Calcite 的最佳化器將其轉換為 Arrow 的物理計劃,最後直接使用 Arrow 執行即可,需要構建的東西非常少。

04

一些 Tips

我們在 Arrow 的使用中積累了一些經驗和教訓。作為一個新的數據產品或數據產品的底座,Arrow 還存在不少問題。

1. 踩過的一些坑

首先更新頻繁是 Arrow 社群活躍的體現,意味著會有新的功能和改進,但同時它的介面還是不夠完善,我們建議盡量少修改原始程式碼,而是向 Arrow 社群貢獻改進並多做擴充套件。Arrow 程式碼庫可以分為三個層次:

  • Core 層: 提供數據型別表示,這一層非常穩定,新版本可以完全保證和之前版本的相容。

  • Compute 層: 提供計算算子,相對穩定但可能有一些 bug,當使用一些比較高級的指令集如 AVX512 指令集可能會有一些記憶體對齊的問題。

  • Acero 層: 是最新的執行引擎,不夠穩定而更適合開發測試。

  • Arrow 對於復雜型別的處理還不夠完備,比如 Union、List、JSON 等,需要額外的程式碼實作。另外,Arrow 始於 2016 年,仍需要時間和大規模數據的驗證。各個相關計畫(包括 DuckDB 等)主要使用的是 Core 部份,對於 Arrow 的 Compute 和 Acero 等部份,仍然需要在更大規模的數據上進行進一步的驗證。我們在開發過程中遇到了一些問題已經修復並向 Arrow 社群送出了改進。目前看來,Arrow 處於相對穩定的狀態。

    2. DATA FUSION

    最後,對於追求安全和現代化的考慮,我們建議使用 Arrow Rust 的實作。而且 Arrow 在 Rust 實作的基礎上推出了一個完整的數據引擎 DataFusion,它提供了比 Arrow 更強大的功能。DataFusion 在 Arrow 記憶體格式的基礎上提供了 SQL 解析和查詢計劃等功能,也支持子查詢和其他高級函式。此外,DataFusion 也繼承了 Arrow 出色的模組化和可延伸的程式碼風格,基於 DataFusion 構建新的數據引擎可以減少開發所需的時間,同時也能在開源社群獲得更多支持。

    以上就是本次分享的內容,謝謝大家。


    分享嘉賓

    INTRODUCTION


    李晨曦

    上海炎凰數據科技有限公司

    研發工程師

    畢業於南京大學,專註於大數據處理分析系統的研發。目前致力於讀時建模數據系統查詢引擎的開發工作

    往期推薦

    點個 在看 你最好看