當前位置: 妍妍網 > 資訊

【源碼解析系列】Apache Doris冷熱儲存分離源分碼析

2024-02-26資訊

最近某業務需要用到Doris冷熱分離的功能,為了方便後續的問題定位以及效能調優,筆者梳理了Doris冷熱分離功能的內核原理,並在梳理過程中發現若幹最佳化點,並貢獻給了社群

需求場景

在數據分析的實際場景中,冷熱數據往往面臨著不同的查詢頻次及響應速度要求。例如在日誌分析場景中,歷史數據的存取頻次很低,但需長時間備份以保證後續的審計和回溯的工作;在行為分析場景中,需支持近期流量數據的高頻查詢且時效性要求高,但為了保證歷史數據隨時可查,往往要求數據保存周期更為久遠;往往歷史數據的套用價值會隨著時間推移而降低,且需要應對的查詢需求也會隨之銳減。而隨著歷史數據的不斷增多,如果我們將所有數據儲存在本地,將造成大量的資源浪費。

為了解決滿足以上問題,冷熱數據分層技術應運而生,以更好滿足企業降本增效的趨勢。顧名思義, 冷熱分層是將冷熱數據分別儲存在成本不同的儲存介質上 ,例如熱數據儲存在成本更高的 SSD 盤上、以提高時效數據的查詢速度和響應能力,而冷數據則儲存在相對低成本的 HDD 盤甚至更為廉價的物件儲存上,以降低儲存成本。Doris還可以根據實際業務需求進行靈活的配置和調整,以滿足不同場景的要求。

冷熱分層一般適用於以下需求場景:

  • 數據 儲存周期長:面對歷史數據的不斷增加,儲存成本也隨之增加;

  • • 冷熱數據存取頻率及效能要求不同:熱數據存取頻率高且需要快速響應,而冷數據存取頻率低且響應速度要求不高;

  • • 數據備份和恢復成本高:備份和恢復大量數據需要消耗大量的時間和資源。

  • • ......

  • 在 Apache Doris 2.0 版本中支持三級儲存,分別是 SSD、HDD 和物件儲存。使用者可以配置使數據從 SSD 下沈到 HDD,並使用冷熱分層功能將數據從 SSD 或者 HDD 下沈到物件儲存中。

    由於使用HDD 盤做冷熱分離的數據處理邏輯與SSD槽相同,故本文本文將主要介紹基於物件儲存的冷熱分層。

    如何使用

    若要使用Doris 的冷熱分層功能,首先需要準備一個物件儲存的 Bucket 並獲取對應的 AK/SK。目前支持 AWS、Azure、阿裏雲、華為雲、騰訊雲、百度雲等多個雲的物件儲存。對應我們公司內部,可以選擇nos作為物件儲存,筆者的測試見: Doris & nos 冷熱儲存分離測試 。當準備就緒之後,下面為具體的使用步驟:

    1. 建立 Resource

    可以使用物件儲存的 Bucket 以及 AK/SK 建立 Resource。

    CREATE RESOURCE IF NOTEXISTS "${resource_name}"
    PROPERTIES(
    "type"="s3",
    "s3.endpoint" = "${S3Endpoint}",
    "s3.region" = "${S3Region}",
    "s3.root.path" = "path/to/root",
    "s3.access_key" = "${S3AK}",
    "s3.secret_key" = "${S3SK}",
    "s3.connection.maximum" = "50",
    "s3.connection.request.timeout" = "3000",
    "s3.connection.timeout" = "1000",
    "s3.bucket" = "${S3BucketName}"
    );

    2. 建立 Storage Policy

    可以透過 Storage Policy 控制數據冷卻時間,目前支持相對和絕對兩種冷卻時間的設定。

    CREATE STORAGE POLICY testPolicy
    PROPERTIES(
    "storage_resource" = "remote_s3",
    "cooldown_ttl" = "1d"
    );

    例如上方程式碼中名為 testPolicy storage policy 設定了新匯入的數據將在一天後開始冷卻,並且冷卻後的冷數據會存放到 remote_s3 所表示的物件儲存的 root path 下。除了設定 TTL 以外,在 Policy 中也支持設定冷卻的時間點,可以直接設定為:

    CREATE STORAGE POLICY testPolicyForTTlDatatime
    PROPERTIES(
    "storage_resource" = "remote_s3",
    "cooldown_datetime" = "2023-06-07 21:00:00"
    );

    3. 給表或者分區設定 Storage Policy

    在建立出對應的 Resource 和 Storage Policy 之後,我們可以在建表的時候對整張表設定 Cooldown Policy,也可以針對某個 Partition 設定 Cooldown Policy。這裏以 TPCH 測試數據集中的 lineitem 表舉例。如果需要將整張表都設定冷卻的策略,則可以直接在整張表的 properties 中設定:

    CREATETABLE IF NOTEXISTS lineitem1 (
    L_ORDERKEY INTEGERNOTNULL,
    L_PARTKEY INTEGERNOTNULL,
    L_SUPPKEY INTEGERNOTNULL,
    L_LINENUMBER INTEGERNOTNULL,
    L_QUANTITY DECIMAL(15,2NOTNULL,
    L_EXTENDEDPRICE DECIMAL(15,2NOTNULL,
    L_DISCOUNT DECIMAL(15,2NOTNULL,
    L_TAX DECIMAL(15,2NOTNULL,
    L_RETURNFLAG CHAR(1NOTNULL,
    L_LINESTATUS CHAR(1NOTNULL,
    L_SHIPDATE DATEV2 NOTNULL,
    L_COMMITDATE DATEV2 NOTNULL,
    L_RECEIPTDATE DATEV2 NOTNULL,
    L_SHIPINSTRUCT CHAR(25NOTNULL,
    L_SHIPMODE CHAR(10NOTNULL,
    L_COMMENT VARCHAR(44NOTNULL
    )
    DUPLICATE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
    PARTITIONBYRANGE(`L_SHIPDATE`)
    (
    PARTITION `p202301` VALUES LESS THAN ("2017-02-01"),
    PARTITION `p202302` VALUES LESS THAN ("2017-03-01")
    )
    DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 3
    PROPERTIES (
    "replication_num" = "3",
    "storage_policy" = "${policy_name}"
    )

    也可以對某個具體的 Partition 設定 Storage Policy,只需要在 Partition 的 Properties 中加上具體的 Policy Name 即可

    4. 檢視數據資訊

    一般可以透過 show tablets from lineitem1 直接檢視這張表的 Tablet 資訊。Tablet 資訊中區分了 LocalDataSize 和 RemoteDataSize,前者表示儲存在原生的數據,後者表示已經冷卻並移動到物件儲存上的數據。

    下方為數據剛匯入到 BE 時的數據資訊,可以看到數據還全部儲存在本地。

    *************************** 1.row ***************************
    TabletId: 2749703
    ReplicaId: 2749704
    BackendId: 10090
    SchemaHash: 1159194262
    Version: 3
    LstSuccessVersion: 3
    LstFailedVersion: -1
    LstFailedTime: NULL
    LocalDataSize: 73001235
    RemoteDataSize: 0
    RowCount: 1996567
    State: NORMAL
    LstConsistencyCheckTime: NULL
    CheckVersion: -1
    VersionCount: 3
    QueryHits: 0
    PathHash: -8567514893400420464
    MetaUrl: http://***.16.0.8:6781/api/meta/header/2749703
    CompactionStatus: http://***.16.0.8:6781/api/compaction/show?tablet_id=2749703
    CooldownReplicaId: 2749704
    CooldownMetaId:

    當數據到達冷卻時間後,再次進行 show tablets from table 可以看到對應的數據變化。

    *************************** 1.row ***************************
    TabletId: 2749703
    ReplicaId: 2749704
    BackendId: 10090
    SchemaHash: 1159194262
    Version: 3
    LstSuccessVersion: 3
    LstFailedVersion: -1
    LstFailedTime: NULL
    LocalDataSize: 0
    RemoteDataSize: 73001235
    RowCount: 1996567
    State: NORMAL
    LstConsistencyCheckTime: NULL
    CheckVersion: -1
    VersionCount: 3
    QueryHits: 0
    PathHash: -8567514893400420464
    MetaUrl: http://***.16.0.8:6781/api/meta/header/2749703
    CompactionStatus: http://***.16.0.8:6781/api/compaction/show?tablet_id=2749703
    CooldownReplicaId: 2749704
    CooldownMetaId: TUniqueId(hi:-8697097432131255833, lo:9213158865768502666)

    冷熱分層技術的具體實作

    儲存結構

    從上一篇文章: Apache Doris入門10問 , 可以知道,BE在儲存TABLET數據的時候,TABLET下面還會有ROWSET和SEGMENT的劃分。其中ROWSET代表著數據匯入批次,同一個ROWSET一般代表著一個批次的匯入任務,比如一次stream load,一個begin/commit事務等,都對應一個ROWSET。ROWSET的這種特性,意味著其具有著事務的特點,即是說,同一個ROWSET可以作為一個獨立的數據單元存在,其中的數據要麽全部有效,要麽全部無效。

    正因為如此,以ROWSET為基本單元對數據進行冷熱轉換,可以更容易的解決冷熱數據遷移過程中有新數據寫入的問題。

    如下圖所示,對於進入冷熱數據轉換狀態的TABLET,其ROWSET被分成兩部份:

  • • 一部份在本地,這部份數據往往是新寫入的數據,還未觸發上傳操作。

  • • 另一部份在遠端儲存集群(S3),這部份數據相對較早,是在此前已經觸發上傳到了儲存集群上的數據。

  • 兩部份合在一起才是完整的一個TABLET。

    那麽冷數據是何時以及如何上傳到物件儲存的呢?

    冷熱分層

    Doris透過BE的後台執行緒:cooldown_tasks_producer_thread,對本BE的所有存活的TABLET進行遍歷,檢查每個TABLET的配置資訊。當發現該TABLET配置了storage_policy,說明需要對其進行冷熱數據轉換。根據storage_policy中的配置,BE將從緩存資訊中的StoragePolicy列表中獲取對應的規則資訊,然後根據這個規則,判斷當前tablet是否需要進行冷熱數據轉換,將數據存放於遠端儲存集群上(如S3)。

    到這裏,先思考一個問題:具體的數據轉冷是怎麽實作的呢?直接將冷數據拷貝到S3,然後刪除本地檔就可以了嘛?

    答案是否定的。這裏不僅僅需要拷貝數據,還需要協調處理Be的後設資料。因為BE之間的tablet副本,rowset id不同,compaction merge進度不同。Leader 上傳完meta數據以後,slave副本就需要有辦法去同步這個meta以完成冷卻騰出本地盤的空間。具體的實作流程如下:

    Doris 的 FE 會從 Tablet 的所有可用本地副本中選擇一個本地副本作為上傳數據的 Leader,並透過 Doris 的周期匯報機制同步 Leader 的資訊給其它本地副本。在 Leader 上傳冷卻數據時,也會將冷卻數據的後設資料上傳到物件儲存,以便其他副本同步後設資料。因此,任何本地副本都可以提供查詢所需的數據,同時也保證了數據的高可用性和可靠性。

    介紹完了原理,這裏介紹下源分碼析。

    首先Be啟動的時候,會基於執行緒池,建立一系列後台執行緒:

    1. 1. 冷熱數據轉換後台執行緒

    2. 2. 無效冷數據清理執行緒

    3. 3. 冷數據compaction執行緒

    StorageEngine::start_bg_threads
    |--> ThreadPoolBuilder("CooldownTaskThreadPool")
    .set_min_threads(config::cooldown_thread_num)
    .set_max_threads(config::cooldown_thread_num)
    .build(&_cooldown_thread_pool);
    |--> //冷熱數據轉換後台執行緒
    |--> Thread::create(_cooldown_tasks_producer_callback(), _cooldown_tasks_producer_thread)
    |--> //無效冷數據清理執行緒
    |--> Thread::create(_remove_unused_remote_files_callback(), _remove_unused_remote_files_thread)
    |--> //冷數據compaction執行緒
    |--> Thread::create(_cold_data_compaction_producer_callback(), _cold_data_compaction_producer_thread)

    該章節主要介紹 冷熱數據轉換 後台執行緒的實作:

    _cooldown_tasks_producer_callback()
    |--> /*dead loop*/
    |--> //將所有需要轉冷的tablet維護到tablets
    |--> _tablet_manager->get_cooldown_tablets(&tablets, std::move(skip_tablet));
    | |--> /*loop*/
    | |--> get_cooldown_tablet
    | | |--> //skip all the tablets which are not running 
    | | |--> //and those pending to do cooldown
    | | |--> skip_tablet
    | | |--> tablet->need_cooldown(Tablet::need_cooldown)
    | | | |--> //判斷是否有對應的storage_policy
    | | | |--> get_storage_policy(id)
    | | | |--> //We pick the rowset with smallest start version in local
    | | | |--> //尋找當前tablet最小版本號的rowset
    | | | |--> Tablet::pick_cooldown_rowset
    | | | | |--> /*loop for _rs_version_map with lock*/
    | | | | |--> if (v.first < min_local_version) rowset = rs;
    | | | | |--> /*end loop for _rs_version_map*/
    | | | |--> //根據rowset判斷是否符合轉冷時間
    | | | |--> if (newest_cooldown_time > UnixSeconds()) return;
    | |--> /*end loop*/
    |
    |--> //遍歷上個步驟獲取到的tablets
    |--> /*loop for tablets*/
    |--> _running_cooldown_tablets.insert(tablet->tablet_id());
    |--> _cooldown_thread_pool->offer(std::move(task));
    | |--> task.work_function()
    | | |--> tablet->cooldown()(Tablet::cooldown())
    | | | |--> //對於主副本,執行數據傳輸工作
    | | | |--> //if _cooldown_replica_id == replica_id()
    | | | |--> //這裏是數據轉冷最核心的邏輯
    | | | |--> _cooldown_data
    | | | | |--> //拿到S3的一些控制代碼資訊
    | | | | |--> get_remote_file_system
    | | | | |--> old_rowset = pick_cooldown_rowset
    | | | | | |--> /*loop for _rs_version_map with lock*/
    | | | | | |--> if (v.first < min_local_version) rowset = rs;
    | | | | | |--> /*end loop for _rs_version_map*/
    | | | | |--> //將rowset數據按照new_rowset_id傳輸到S3
    | | | | |--> old_rowset->upload_to(BetaRowset::upload_to)
    | | | | | |--> RemoteFileSystem::batch_upload
    | | | | | | |--> S3FileSystem::batch_upload_impl
    | | | | | | | |--> transfer_manager->UploadFile
    | | | | |--> //根據new_rowset_id以及剛剛上傳的遠端目錄,生成新的rowset rowset_meta
    | | | | |--> new_rowset_meta = std::make_shared<RowsetMeta>(*old_rowset->rowset_meta());
    | | | | |--> RowsetFactory::create_rowset(new_rowset_meta)
    | | | | |--> //刪除舊的rowsets
    | | | | |--> delete_rowsets({std::move(old_rowset)}, false);
    | | | | |--> //添加新的rowsets
    | | | | |--> add_rowsets({std::move(new_rowset)});
    | | | | |--> //將meta資訊保存在本地
    | | | | |--> Tablet::save_meta
    | | | | | |--> TabletMeta::save_meta
    | | | | | | |--> //本質是將pb存在rockdb,其中_db是一個rockdb例項
    | | | | | | |--> TabletMetaManager::save
    | | | | | | | |--> _db->Put 
    | | | | |--> //將rowset_meta數據寫到S3
    | | | | |--> //meta主要的資訊是數據在s3上的路徑
    | | | | |--> //上傳的不是一整個meta資料庫,而是tablet粒度的
    | | | | |--> async_write_cooldown_meta
    | | | | | |--> write_cooldown_meta
    | | | |--> //else
    | | | |--> //對於從副本,需要同步後設資料
    | | | |--> //因為be之間的tablet副本,rowset id不同,compaction merge進度不同
    | | | |--> //主副本自己冷卻的數據自己是知道冷數據路徑的,並且也會更新meta持久化在rocksdb
    | | | |--> //從副本就需要有辦法去同步這個meta以完成冷卻騰出本地盤的空間
    | | | |--> _follow_cooldowned_data
    | | | | |--> get_remote_file_system
    | | | | |--> //從S3上讀取meta資訊
    | | | | |--> _read_cooldown_meta(fs, &cooldown_meta_pb)
    | | | | | |--> //建立遠端檔控制代碼
    | | | | | |--> fs->open_file(remote_meta_path, &tablet_meta_reader)
    | | | | | |--> //將遠端meta資訊讀到buf
    | | | | | |--> tablet_meta_reader->read_at(buf,file_size)
    | | | | | |--> //將buf的數據反序列化到tablet_meta_pb
    | | | | | |--> tablet_meta_pb->ParseFromArray(buf.get(), file_size)
    | | | | |--> //從副本的tablet meta需要跟S3上的保持一致,所以需要看下哪些rowset需要刪除
    | | | | |--> to_delete(rs_it, overlap_rowsets.end());
    | | | | |--> //從副本的tablet meta需要跟S3上的保持一致,所以需要看下哪些rowset需要加進去
    | | | | |--> add_rowsets(to_add)
    | | | | |--> _tablet_meta->set_cooldown_meta_id
    | | | | |--> //將新的meta資訊進行本地持久化
    | | | | |--> save_meta();
    | | | |--> //end

    冷數據Compaction

    在一些場景下會有大量修補數據的需求,在大量補數據的場景下往往需要刪除歷史數據,刪除可以透過 delete where 實作,Doris 在 Compaction 時會對符合刪除條件的數據做物理刪除。基於這些場景,冷熱分層也必須實作對冷數據進行 Compaction,因此在 Doris 2.0 版本中支持了對冷卻到物件儲存的冷數據進行 Compaction(ColdDataCompaction)的能力,使用者可以透過冷數據 Compaction,將分散的冷數據重新組織並壓縮成更緊湊的格式,從而減少儲存空間的占用,提高儲存效率。

    Doris 對於本地副本是各自進行 Compaction,這其實是有一些資源浪費的,理論上可以最佳化為單副本Compaction。在冷熱分層場景中,由於冷數據只有一份,因此天然的單副本做 Compaction 是最優秀方案,同時也會簡化處理數據沖突的操作。BE 後台執行緒會定期從冷卻的 Tablet 按照一定規則選出 N 個 Tablet 發起 ColdDataCompaction。與數據冷卻流程類似,只有 CooldownReplica 能執行該 Tablet 的 ColdDataCompaction。Compaction 下刷數據時每積累一定大小(預設 5MB)的數據,就會上傳一個 Part 到物件,而不會占用大量本地儲存空間。Compaction 完成後,CooldownReplica 將冷卻數據的後設資料更新到物件儲存,其他 Replica 只需從物件儲存同步後設資料,從而大量減少物件儲存的 IO 和節點自身的 CPU 開銷。

    _cold_data_compaction_producer_callback
    |--> /*dead loop*/
    |--> //在鎖保護下,保證任務數小於compaction執行緒數
    |--> //cold_data_compaction_thread_num表示冷數據compaction執行緒數
    |--> //copied_tablet_submittedsize()表示正在執行compaction的tablet數
    |--> if(cold_data_compaction_thread_num<copied_tablet_submitted.size()) continue
    |--> //從冷卻的Tablet中按照一定規則選出tablets
    |--> tablets = _tablet_manager->get_all_tablet
    |--> /*loop for tablets*/
    |--> //對於master 副本,將tablet以及對於的compaction score維護到compact任務佇列
    |--> tablet_to_compact.emplace_back(t, score);
    |--> //對於slave 副本,維護到follow任務佇列
    |--> tablet_to_follow.emplace_back(t, score)
    |--> /*end loop for tablets*/
    |
    |--> /*loop for tablet_to_compact*/
    |--> //將compaction任務丟到執行緒池
    |--> _cold_data_compaction_thread_pool->submit_func
    | |--> compaction->compact()-->ColdDataCompaction::compact()
    | | |--> ColdDataCompaction::prepare_compact
    | | | |--> ColdDataCompaction::pick_rowsets_to_compact
    | | | | |--> _tablet->traverse_rowsets
    | | | | |--> check_version_continuity
    | | |--> ColdDataCompaction::execute_compact_impl
    | | | |--> Compaction::do_compaction
    | | | | |--> Compaction::do_compaction_impl
    |--> /*end loop for tablet_to_compact*/
    |
    |--> /*loop for tablet_to_follow*/
    |--> //將follow任務丟到執行緒池
    |--> _cold_data_compaction_thread_pool->submit_func
    | |--> //這裏只是復用了函式,實際上不會執行數據cooldown,只會拉取後設資料
    | |--> Tablet::cooldown
    | | |--> _follow_cooldowned_data()

    冷數據Cache

    假設 Table Lineitem1 中的所有數據都已經冷卻並且上傳到物件儲存中,如果使用者在 Lineitem1 上進行對應的查詢,Doris 會根據對應 Partition 使用的 Policy 資訊找到對應的 Bucket 的 Root Path,並根據不同 Tablet 下的 Rowset 資訊下載查詢所需的數據到本地進行運算。

    Doris 2.0 在查詢上進行了最佳化,冷數據第一次查詢會進行完整的 S3 網路 IO,並將 Remote Rowset 的數據下載到本地後,存放到對應的 Cache 之中,後續的查詢將自動命中 Cache,以此來保證查詢效率。

    而 Cache 的粒度大小直接影響 Cache 的效率,比較大的粒度會導致 Cache 空間以及頻寬的浪費,過小粒度的 Cache 會導致物件儲存 IO 效率低下,Apache Doris 采用了以 Block 為粒度的 Cache 實作。當遠端數據被存取時會先將數據按照 Block 的粒度下載到原生的 Block Cache 中儲存,且 Block Cache 中的數據存取效能和非冷熱分層表的數據效能一致。

    那具體是怎麽實作的呢?

    前文提到 Doris 的冷熱分層是在 Rowset 級別進行的,當某個 Rowset 在冷卻後其所有的數據都會上傳到物件儲存上。而 Doris 在進行查詢的時候會讀取涉及到的 Tablet 的 Rowset 進行數據聚合和運算,當讀取到冷卻的 Rowset 時,會把查詢需要的冷數據下載到本地 Block Cache 之中。基於效能考量,Doris 的 Cache 按照 Block 對數據進行劃分。Block Cache 本身采用的是簡單的 LRU 策略,可以保證越是使用程度較高數據越能在 Block Cache 中存放的久。

    具體源碼實作如下:

    CachedRemoteFileReader::_read_from_cache
    | //session variable chooses to close file cache for this query
    | //1. 會話級別的變量可能禁止當前SQL使用file cache
    |--> if (!io_ctx->read_file_cache)
    |--> //直接從S3讀取檔
    |--> _remote_file_reader->read_at(offset, result, bytes_read, io_ctx)
    |--> S3FileReader::read_at_impl
    |--> //end if
    |
    |--> //2. 否則則嘗試從緩存讀取
    |--> //首先根據Block size進行一些IO對齊的操作
    |--> _align_size(offset, bytes_req)
    | |--> align_left = (left/file_cache_max_file_segment_size)*file_cache_max_file_segment_size;
    | |--> align_right = (right/file_cache_max_file_segment_size + 1)*file_cache_max_file_segment_size;
    |--> holder = _cache->get_or_set
    |--> /*loop for holder.file_segments*/
    |--> //如果cache miss,則需要需用從S3上讀
    |--> empty_segments.push_back(segment)
    |--> /*end loop for holder.file_segments*/
    |
    |--> //2.1. 如果empty_segments不為空,即存在cash miss
    |--> //發起一個IO,將cache miss的部份從S3讀取檔
    |--> empty_start = empty_segments.front()->range().left;
    |--> empty_end = empty_segments.back()->range().right;
    |--> _remote_file_reader->read_at(empty_start,empty_end)
    |--> //讀取完以後,異步的將數據儲存的本地
    |--> /*loop for empty_segments*/
    |--> FileBlock::append
    | |--> LocalWriterPtr::append
    | | |--> LocalFileWriter::appendv
    |--> FileBlock::finalize_write
    | |--> FileBlock::set_downloaded
    |--> /*end loop for empty_segments*/
    |--> //todo:這裏有一些前置條件,還沒看明白
    |--> //將讀取到的數據拷貝到結果集中
    |--> memcpy(result.data, src, copy_size);
    |
    |--> //2.2 遍歷所有block,匯總最終的結果
    |--> /*loop for holder.file_segments*/
    |--> //從本地盤讀取數據
    |--> FileBlock::read_at
    | |--> FileReader::read_at
    |--> /*end loop for holder.file_segments*/

    無效冷數據刪除

    冷數據在compaction以後,無效的冷數據如何刪除呢?

    Doris是透過BE的一個後台執行緒去定期掃描,然後讓對應的leader執行刪除操作。由於S3上的冷數據只有一個副本,所以刪除的這部份就要額外小點。因為Doris的tablet不像raft group一樣有個明確的leader,所以每次刪除前都需要跟FE確認,確認無誤後才能執行刪除。

    具體源碼如下:

    _remove_unused_remote_files_callback
    |--> /*dead loop*/
    |--> Tablet::remove_unused_remote_files
    | |--> //該buffer實際是是一個map,用來儲存待刪除檔的遠端目錄資訊
    | |--> unused_remote_files_buffer_t buffer;
    | |--> //獲取所有符合條件的tablets
    | |--> tablets = StorageEngine::instance()->tablet_manager()->get_all_tablet
    | |--> /*loop for tablets*/
    | |--> //收集可以回收的無效檔
    | |--> calc_unused_remote_files
    | | |--> std::vector<io::FileInfo> files
    | | |--> FileSystem::list(Path,files)
    | | | |--> //透過S3介面,根據指定路徑尋找可以刪除的檔
    | | | |--> S3FileSystem::list_impl(Path,files)
    | | | | |--> /*loop for objs
    | | | | |--> files->push_back(std::move(file_info));
    | | | | |--> /*end for objs
    | | |--> //獲得當前tablet的cooldowned rowsets
    | | |--> /*loop for t->_tablet_meta->all_rs_metas() */

    | | |--> cooldowned_rowsets.insert(rs_meta->rowset_id());
    | | |--> /*end loop for t->_tablet_meta->all_rs_metas() */
    | | |--> files.erase(std::remove_if(files.begin(), files.end(), std::move(filter)), files.end());
    | | | |--> //過濾掉不需要刪除的檔
    | | | |--> //返回false表示不需要刪除
    | | | |--> filter()
    | | | | |--> return cooldowned_rowsets.contains(rowset_id) || pending_remote_rowsets().contains(rowset_id)
    | | |--> buffer.insert({t->tablet_id(), {std::move(dest_fs), std::move(files)}});
    | |--> //積攢一批後,統一進行回收
    | |--> confirm_and_remove_files
    | | |--> //跟fe確認需要刪除的遠端檔列表
    | | |--> MasterServerClient::confirm_unused_remote_files
    | | | |--> //先給FE發送請求
    | | | |--> FrontendServiceClient::send_confirmUnusedRemoteFiles
    | | | |--> //根據FE的回包來確認待刪除資訊
    | | | |--> FrontendServiceClient::recv_confirmUnusedRemoteFiles
    | | |--> /* loop for result.confirmed_tablets */
    | | |--> //查詢buffer中是否有需要對應的tablet_id
    | | |--> if (auto it = buffer.find(id); LIKELY(it != buffer.end()))
    | | |--> //獲取需要刪除的檔路徑
    | | |--> /* loop for files*/
    | | |--> paths.push_back(std::move(file_path));
    | | |--> /* end loop for files*/
    | | |--> //批次刪除檔
    | | |--> FileSystem::batch_delete
    | | | |--> S3FileSystem::batch_delete_impl
    | | | | |--> //依次發送刪除請求
    | | | | |--> delete_request.SetDelete(std::move(del));
    | | |--> end if
    | | |--> /* end loop for result.confirmed_tablets */
    | |--> /*end loop for tablets*/
    |--> /*end loop*/

    源碼貢獻

    最佳化一:冷熱分層----尋找合適rowset 最佳化

    問題描述:

    cooldown_tasks_producer_thread在尋找tablet的時候,pick_cooldown_rowset呼叫了兩次。其中pick_cooldown_rowset是在鎖保護下遍歷所有的rowset,如果tablet裏的rowset比較多的話,會造成效能損耗。

    社群這樣做的原因是,上一輪選中的rowset可能被compaction merge掉了,所以又重新遍歷了一遍。

    問題解決:

    1. 1. 最佳化冷卻的rowset大機率不會立即進行compaction,所以可以在采用一種樂觀的事務類似的思路來處理。

    2. 2. 第一次遍歷的時候記錄rowset的狀態資訊、版本資訊等

    3. 3. 第二次直接去tablet找下rowset是否還在version rowset map中。不如不在或者狀態發生了變化,則再次執行遍歷

    最佳化前後的流程如下:

    issue

    https://github.com/apache/doris/issues/27055

    pr

    https://github.com/apache/doris/pull/27091

    最佳化二:冷熱分層----減少無效遍歷

    問題描述:

    冷熱分層後,slave 副本的be需要從S3同步後設資料。需要在 _meta_lock 的保護下遍歷 _rs_version_map 來獲取對應的區間。由於遍歷過程有一些判斷操作,目前的源碼 為了方便處理,直接 對 _rs_version_map 進行了兩次遍歷。由於整個過程是在鎖保護下進行的,並且 _rs_version_map ,在高並行場景下較大,所以會對效能產生影響。

    問題解決:

    將兩次遍歷改成1次遍歷,減少遍歷的開銷以及鎖沖突

    pr

    https://github.com/apache/doris/pull/27118

    最佳化三:冷熱分層----鎖區間最佳化

    問題描述:

    在執行冷熱分層過程中,需要遍歷所有tablet,並且判斷tablet是否符合轉冷條件。在判斷是否符合轉冷條件的時候,有一些檢查需要鎖保護,有一些不需要。而社群的程式碼處理的比較粗糙,直接加鎖然後進行判斷。

    問題解決:

    透過以下策略減少加鎖時間:

    1. 1. 先判斷並行安全的變量,如果不符合條件則直接返回;

    2. 2. 上一步透過後,再加鎖進行判斷

    pr

    https://github.com/apache/doris/pull/26984

    最佳化四:無效冷數據清理----過濾最佳化

    問題描述

    在清理無效冷數據檔前需要獲取所有的tablet。但是在冷熱分層模式下,是有master 副本,slave 副本的概念的,只有master才能去清理檔。

    問題解決:

    在獲取tablet時候增加過濾條件,防止獲得大量無效的tablet,導致後續的無效遍歷

    pr

    https://github.com/apache/doris/pull/27338

    最佳化五:無效冷數據清理----容量判斷最佳化

    問題描述

    在清理無效冷數據檔時,需要遍歷所有待處理的file,判斷file的rowset_id是否存在於cooldowned_rowsets中。這就導致就算沒有合適的cooldowned_rowsets,也會去對file無效遍歷。

    問題解決:

    在遍歷待處理的file之前判斷cooldowned_rowsets是否為空,如果為空則放棄這次清理操作。

    pr

    https://github.com/apache/doris/pull/27324

    最佳化六:冷數據Compaction----排序最佳化

    問題描述

    在執行冷數據Compaction之前,需要獲取所有Compaction sore中 top N的tablet,來保證獲取最合適的tablet去Compaction。但是程式碼在這塊的處理比較粗糙,每次遍歷的時候都會進行一次全量排序。

    問題解決:

    增加了一些策略來減少排序次數

    pr

    https://github.com/apache/doris/pull/27147

    關於作者

    隱形(邢穎) 網易資深資料庫內核工程師,畢業至今一直從事資料庫內核開發工作,目前主要參與 MySQL 與 Apache Doris 的開發維護和業務支持工作。

    作為 MySQL 內核貢獻者,為 MySQL 上報了 60 多個 Bugfix 及最佳化patch,多個送出被合入 MySQL 8.0 版本。從 2023 年起加入 Apache Doris 社群,Apache Doris Active Contributor,已為社群送出並合入數十個 Commits。