當前位置: 妍妍網 > 碼農

分布式場景使用Join的正確姿勢

2024-03-26碼農

背景

最近在閱讀查詢最佳化器的論文,發現System R中對於Join操作的定義一般分為了兩種,即巢狀迴圈、排序-合並聯接。

考慮到我的領域是在處理分庫分表或者其他的分區模式,這讓我開始不由得聯想我們怎麽在分布式場景套用這個Join邏輯,對於兩個不同柯瑞面的不同表我們是沒有辦法直接進行Join操作的。查閱資料後發現原來早有定義,即分布式聯接演算法。

分布式聯接演算法

跨界點處理數據即分布式聯接演算法,常見的有四種模型: Shuffle Join (洗牌聯接)、 Broadcast Join (廣播聯接)、 MapReduce Join (MapReduce聯接)、 Sort-Merge Join (排序-合並聯接)。

接下來將進行逐一了解與分析,以便後續開發的套用。

Shuffle Join(洗牌聯接)

先上原理解釋:

Shuffle Join 的核心思想是將來自不同節點的數據重新分發(洗牌),使得可以聯接的數據行最終位於同一個節點上。

通常,對於要聯接的兩個表,會對聯接鍵套用相同的哈希函式,哈希函式的結果決定了數據行應該被發送到哪個節點。這樣,所有具有相同 哈希值 的行都會被送到同一個節點,然後在該節點上執行聯接操作。

可能解釋完還是有點模糊,舉個例子,有兩張表,分別以id欄位進行分庫操作,且哈希演算法相同(為了簡單,這裏只介紹分庫場景,分庫分表同理。演算法有很多種,這裏舉例是hash演算法),那麽這兩張表的分片或授權以在同一個物理庫中,這樣我們不需要做大表維度的處理,我們可以直接下推Join操作到對應的物理庫操作即可。

ShardingSphere 中,這種場景類似於繫結表的定義,如果兩張表的演算法相同,可以直接配置繫結表的關系,進行相同演算法的連線查詢,避免復雜的 笛卡爾積

這樣做的好處是可以盡量下推到資料庫操作,在中介軟體層面我們可以做並列處理,適合大規模的數據操作。

但是,這很理想,有多少表會采用相同演算法處理呢。

Broadcast Join(廣播聯接)

先上原理解釋:

當一個表的大小相對較小時,可以將這個小表的全部數據廣播到所有包含另一個表數據的節點上。

每個節點上都有小表的完整副本,因此可以獨立地與原生的大表數據進行聯接操作,而不需要跨節點通訊。

舉個例子,有一張非常小的表A,還有一張按照ID分片的表B,我們可以在每一個物理庫中復制一份表A,這樣我們的Join操作就可以直接下推到每一個資料庫操作了。

這種情況比Shuffle Join甚至還有效能高效,這種類似於 ShardingSphere 中的廣播表的定義,其存在類似於字典表,在每一個資料庫都同時存在一份,每次寫入會同步到多個節點。

這種操作的好處顯而易見,不僅支持並列操作而且效能極佳。

但是缺點也顯而易見,如果小表不夠小數據冗余不說,廣播可能會消耗大量的網路頻寬和資源。

MapReduce Join(MapReduce聯接)

先上原理解釋:

MapReduce是一種編程模型,用於處理和生成大數據集,其中的聯接操作可以分為兩個階段:Map階段和Reduce階段。

Map階段:
  • 每個節點讀取其數據分片,並對需要聯接的鍵值對套用一個對映函式,生成中間鍵值對。

  • Reduce階段:
  • 中間鍵值對會根據鍵進行排序(在某些實作中排序發生在Shuffle階段)和分組,然後發送到Reduce節點。

  • 在Reduce節點上,具有相同鍵的所有值都會聚集在一起,這時就可以執行聯接操作。

  • MapReduce Join 不直接套用於 傳統資料庫 邏輯,而是適用於Hadoop這樣的分布式處理系統中。但是為了方便理解,還是用 SQL語言 來分析,例如一條SQL:

    SELECT orders.order_id, orders.date, customers.name
    FROM orders
    JOIN customers ON orders.customer_id = customers.customer_id;

    會被轉換為兩個SQL:

    SELECT customer_id, order_id, date FROM orders;
    SELECT customer_id, name FROM customers;

    這個過程就是Map階段,即讀取orders和customers表的數據,並為每條記錄輸出鍵值對,鍵是 customer_id ,值是記錄的其余部份。

    下一個階段可有可無,即Shuffle階段。如果不在這裏排序可能會在Map階段執行SQL時候排序/分組或者在接下來的Reduce階段進行額外排序/分組。在這個階段主要將收集到的數據按照 customer_id 排序分組,以確保相同的 customer_id 的數據達到Reduce階段。

    Reduce階段將每個對應的 customer_id 進行聯接操作,輸出並返回最後的結果。

    這種操作普遍套用於兩個演算法完全不相同的表單,也是一種標準的處理模型,在這個過程中,我們以一張邏輯表的維度進行操作。這種演算法可能會消耗大量記憶體,甚至導致記憶體溢位,並且在處理大數據量時會相當耗時,因此不適合需要低延遲的場景。

    額外補充

    記憶體溢位場景普遍在如下場景:

  • 大鍵值對數量: 如果Map階段產生了大量的鍵值對,這些數據需要在記憶體中進行緩存以進行排序和傳輸,這可能會消耗大量記憶體。

  • 數據傾斜: 如果某個鍵非常常見,而其他鍵則不那麽常見,那麽處理這個鍵的Reducer可能會接收到大量的數據,導致記憶體不足。這種現象稱為數據傾斜。

  • 大值列表: 在Reduce階段,如果某個鍵對應的值列表非常長,處理這些值可能會需要很多記憶體。

  • 不合理的並列度: 如果Reduce任務的數量設定得不合適(太少或太多),可能會導致單個任務處理不均勻,從而導致記憶體問題。

  • 我能想到的相應解決方案:

  • 記憶體到磁盤的溢寫:當Map任務的輸出緩沖區滿了,它會將數據溢寫到磁盤。這有助於限制記憶體使用,但會增加I/O開銷。

  • 透過設定合適的Map和Reduce任務數量,可以更有效地分配資源,避免某些任務過載。具體操作可以將Map操作的分段比如1~100,100~200,Reduce階段開設較少的並行處理。

  • 最佳化數據分布,比如使用範圍分區( range partitioning )或哈希分區( hash partitioning )來減少數據傾斜。

  • Sort-Merge Join(排序-合並聯接)

    先上原理解釋:

    在分布式環境中, Sort-Merge Join 首先在每個節點上對數據進行局部排序,然後將排序後的數據合並起來,最後在合並的數據上執行聯接操作。

    這通常涉及到多階段處理,包括局部排序、數據洗牌(重新分發),以及最終的排序和合並。

    舉個理解,還是上面的SQL。

    SELECT orders.order_id, orders.date, customers.name
    FROM orders
    JOIN customers ON orders.customer_id = customers.customer_id;

  • 對orders表按 customer_id 進行排序。

  • 對customers表按 customer_id 進行排序。

  • 同時遍歷兩個已排序的表,將具有相同 customer_id 的行配對。

  • 這個就有點類似於原生的排序-合並聯接了。也是資料庫場景的標準處理辦法。

    對於已經排序的數據集或數據分布均勻的情況,這種方法非常有效。如果數據未預先排序,這種方法可能會非常慢,因為它要求數據在聯接之前進行排序。

    當然,這個演算法也會造成記憶體溢位的場景,解決方案如下:

  • 當數據集太大而無法一次性載入到記憶體中時,可以使用外部排序演算法。外部排序演算法會將數據分割成多個批次,每個批次單獨排序,然後將排序後的批次合並。這種方法通常涉及到磁盤I/O操作,因此會比記憶體中操作慢。

  • 對於合並步驟,可以使用流式處理技術,一次只處理數據的一小部份,並持續將結果輸出到下一個處理步驟或儲存系統。這樣可以避免一次性載入大量數據到記憶體中。

  • 當記憶體不足以處理數據時,可以使用磁盤空間作為臨時儲存。資料庫管理系統通常有機制來處理記憶體溢位,比如建立磁盤上的臨時檔來儲存過程中的數據。

  • 在分布式系統中,可以將數據分散到多個節點上進行處理,這樣每個節點只需要處理數據的一部份,從而減少單個節點上的記憶體壓力。