背景
最近在閱讀查詢最佳化器的論文,發現System R中對於Join操作的定義一般分為了兩種,即巢狀迴圈、排序-合並聯接。
考慮到我的領域是在處理分庫分表或者其他的分區模式,這讓我開始不由得聯想我們怎麽在分布式場景套用這個Join邏輯,對於兩個不同柯瑞面的不同表我們是沒有辦法直接進行Join操作的。查閱資料後發現原來早有定義,即分布式聯接演算法。
分布式聯接演算法
跨界點處理數據即分布式聯接演算法,常見的有四種模型:
Shuffle Join
(洗牌聯接)、
Broadcast Join
(廣播聯接)、
MapReduce Join
(MapReduce聯接)、
Sort-Merge Join
(排序-合並聯接)。
接下來將進行逐一了解與分析,以便後續開發的套用。
Shuffle Join(洗牌聯接)
先上原理解釋:
「
Shuffle Join
的核心思想是將來自不同節點的數據重新分發(洗牌),使得可以聯接的數據行最終位於同一個節點上。
「
通常,對於要聯接的兩個表,會對聯接鍵套用相同的哈希函式,哈希函式的結果決定了數據行應該被發送到哪個節點。這樣,所有具有相同 哈希值 的行都會被送到同一個節點,然後在該節點上執行聯接操作。
可能解釋完還是有點模糊,舉個例子,有兩張表,分別以id欄位進行分庫操作,且哈希演算法相同(為了簡單,這裏只介紹分庫場景,分庫分表同理。演算法有很多種,這裏舉例是hash演算法),那麽這兩張表的分片或授權以在同一個物理庫中,這樣我們不需要做大表維度的處理,我們可以直接下推Join操作到對應的物理庫操作即可。
在
ShardingSphere
中,這種場景類似於繫結表的定義,如果兩張表的演算法相同,可以直接配置繫結表的關系,進行相同演算法的連線查詢,避免復雜的
笛卡爾積
。
這樣做的好處是可以盡量下推到資料庫操作,在中介軟體層面我們可以做並列處理,適合大規模的數據操作。
但是,這很理想,有多少表會采用相同演算法處理呢。
Broadcast Join(廣播聯接)
先上原理解釋:
「
當一個表的大小相對較小時,可以將這個小表的全部數據廣播到所有包含另一個表數據的節點上。
「
每個節點上都有小表的完整副本,因此可以獨立地與原生的大表數據進行聯接操作,而不需要跨節點通訊。
舉個例子,有一張非常小的表A,還有一張按照ID分片的表B,我們可以在每一個物理庫中復制一份表A,這樣我們的Join操作就可以直接下推到每一個資料庫操作了。
這種情況比Shuffle Join甚至還有效能高效,這種類似於
ShardingSphere
中的廣播表的定義,其存在類似於字典表,在每一個資料庫都同時存在一份,每次寫入會同步到多個節點。
這種操作的好處顯而易見,不僅支持並列操作而且效能極佳。
但是缺點也顯而易見,如果小表不夠小數據冗余不說,廣播可能會消耗大量的網路頻寬和資源。
MapReduce Join(MapReduce聯接)
先上原理解釋:
MapReduce是一種編程模型,用於處理和生成大數據集,其中的聯接操作可以分為兩個階段:Map階段和Reduce階段。
Map階段:
每個節點讀取其數據分片,並對需要聯接的鍵值對套用一個對映函式,生成中間鍵值對。
Reduce階段:
中間鍵值對會根據鍵進行排序(在某些實作中排序發生在Shuffle階段)和分組,然後發送到Reduce節點。
在Reduce節點上,具有相同鍵的所有值都會聚集在一起,這時就可以執行聯接操作。
MapReduce Join
不直接套用於
傳統資料庫
邏輯,而是適用於Hadoop這樣的分布式處理系統中。但是為了方便理解,還是用
SQL語言
來分析,例如一條SQL:
SELECT orders.order_id, orders.date, customers.name
FROM orders
JOIN customers ON orders.customer_id = customers.customer_id;
會被轉換為兩個SQL:
SELECT customer_id, order_id, date FROM orders;
SELECT customer_id, name FROM customers;
這個過程就是Map階段,即讀取orders和customers表的數據,並為每條記錄輸出鍵值對,鍵是
customer_id
,值是記錄的其余部份。
下一個階段可有可無,即Shuffle階段。如果不在這裏排序可能會在Map階段執行SQL時候排序/分組或者在接下來的Reduce階段進行額外排序/分組。在這個階段主要將收集到的數據按照
customer_id
排序分組,以確保相同的
customer_id
的數據達到Reduce階段。
Reduce階段將每個對應的
customer_id
進行聯接操作,輸出並返回最後的結果。
這種操作普遍套用於兩個演算法完全不相同的表單,也是一種標準的處理模型,在這個過程中,我們以一張邏輯表的維度進行操作。這種演算法可能會消耗大量記憶體,甚至導致記憶體溢位,並且在處理大數據量時會相當耗時,因此不適合需要低延遲的場景。
額外補充
記憶體溢位場景普遍在如下場景:
大鍵值對數量: 如果Map階段產生了大量的鍵值對,這些數據需要在記憶體中進行緩存以進行排序和傳輸,這可能會消耗大量記憶體。
數據傾斜: 如果某個鍵非常常見,而其他鍵則不那麽常見,那麽處理這個鍵的Reducer可能會接收到大量的數據,導致記憶體不足。這種現象稱為數據傾斜。
大值列表: 在Reduce階段,如果某個鍵對應的值列表非常長,處理這些值可能會需要很多記憶體。
不合理的並列度: 如果Reduce任務的數量設定得不合適(太少或太多),可能會導致單個任務處理不均勻,從而導致記憶體問題。
我能想到的相應解決方案:
記憶體到磁盤的溢寫:當Map任務的輸出緩沖區滿了,它會將數據溢寫到磁盤。這有助於限制記憶體使用,但會增加I/O開銷。
透過設定合適的Map和Reduce任務數量,可以更有效地分配資源,避免某些任務過載。具體操作可以將Map操作的分段比如1~100,100~200,Reduce階段開設較少的並行處理。
最佳化數據分布,比如使用範圍分區(
range partitioning
)或哈希分區(
hash partitioning
)來減少數據傾斜。
Sort-Merge Join(排序-合並聯接)
先上原理解釋:
「
在分布式環境中,
Sort-Merge Join
首先在每個節點上對數據進行局部排序,然後將排序後的數據合並起來,最後在合並的數據上執行聯接操作。
「
這通常涉及到多階段處理,包括局部排序、數據洗牌(重新分發),以及最終的排序和合並。
舉個理解,還是上面的SQL。
SELECT orders.order_id, orders.date, customers.name
FROM orders
JOIN customers ON orders.customer_id = customers.customer_id;
對orders表按
customer_id
進行排序。
對customers表按
customer_id
進行排序。
同時遍歷兩個已排序的表,將具有相同
customer_id
的行配對。
這個就有點類似於原生的排序-合並聯接了。也是資料庫場景的標準處理辦法。
對於已經排序的數據集或數據分布均勻的情況,這種方法非常有效。如果數據未預先排序,這種方法可能會非常慢,因為它要求數據在聯接之前進行排序。
當然,這個演算法也會造成記憶體溢位的場景,解決方案如下:
當數據集太大而無法一次性載入到記憶體中時,可以使用外部排序演算法。外部排序演算法會將數據分割成多個批次,每個批次單獨排序,然後將排序後的批次合並。這種方法通常涉及到磁盤I/O操作,因此會比記憶體中操作慢。
對於合並步驟,可以使用流式處理技術,一次只處理數據的一小部份,並持續將結果輸出到下一個處理步驟或儲存系統。這樣可以避免一次性載入大量數據到記憶體中。
當記憶體不足以處理數據時,可以使用磁盤空間作為臨時儲存。資料庫管理系統通常有機制來處理記憶體溢位,比如建立磁盤上的臨時檔來儲存過程中的數據。
在分布式系統中,可以將數據分散到多個節點上進行處理,這樣每個節點只需要處理數據的一部份,從而減少單個節點上的記憶體壓力。