本文 字數:13714 ; 估計閱讀時間:35 分鐘
審 校: 莊曉東(魏莊)
介紹
今天,歡迎來自我們的技術合作夥伴Streamkap的客座文章,Streamkap是一種針對ClickHouse開箱即用的變更數據捕捉(CDC-change data capture)解決方案。這篇部落格深入探討了構建這樣一個產品的細節和挑戰。對於那些只想要一個開箱即用CDC解決方案的ClickHouse使用者,我們很高興向你推薦Streamkap這個托管服務。
我們很高興地宣布:新的ClickHouse連結器,可以從諸如PostgreSQL、MySQL、SQL Server、Oracle和MongoDB等資料庫中流式傳輸CDC數據到ClickHouse。
Streamkap最近切換到使用ClickHouse來即時處理所有日誌和指標,因為我們發現其他解決方案無法實作我們所需的查詢效能。在采用ClickHouse之後,我們希望開始提供一個ClickHouse CDC整合,但發現現有的連結器存在問題,因此我們著手構建了一個新的連結器來解決這些問題。
在這篇文章中,我們假設您熟悉ClickHouse資料庫和Change Data Capture(CDC)的概念,但如果不熟悉,您可以透過閱讀有關流式變更數據捕捉的文章來了解更多。
我們將深入探討為ClickHouse構建CDC解決方案的挑戰,以及我們如何解決這些問題,討論我們如何處理模式演變、數據一致性和快照。最後,我們將展示如何在保持效能的流式流程的同時實作所有需求。
技術
ClickHouse是一個開源的列式資料庫。列導向結構意味著數據是按列而不是按行儲存和檢索的。ClickHouse已經成為構建即時應用程式的首選,因為它能夠攝取大量數據,並且在寫入時,而不是在讀取時將數據物化。這導致查詢速度顯著加快,使ClickHouse適用於支撐即時應用程式。
Streamkap是一個無伺服器流式傳輸平台,可以實作:即時的變更數據捕捉(CDC),並傳入ClickHouse。在底層,諸如Kafka、Debezium、Flink之類的技術與生產級連結器/管道相結合。
下面是Streamkap從資料庫流式傳輸到ClickHouse的概述。
挑戰
當我們首次嘗試將CDC數據流式傳輸到ClickHouse時,我們尋找了可以使用的現有連結器。在考察了官方的ClickHouse Kafka Connect連結器,以及市場上的其他連結器後,我們很快意識到:我們需要對其進行大量修改,才能支持不同的用例。意識到這些連結器需要進行大量修改,我們開始構建自己的解決方案。以下是我們需要確保:在將解決方案帶入生產之前需要解決的一些關鍵需求。
數據型別
現有解決方案對數據型別的支持不太好:
巢狀結構
巢狀陣列,包含巢狀結構的陣列
具有微秒精度的時間戳
具有微秒精度的時間
不包含時間資訊的日期(自歷元以來的天數)
將JSON傳輸為普通字串欄位
後設資料
在處理CDC數據時,添加額外的後設資料列(如時間戳和CDC記錄型別)很有幫助。這樣可以使後續處理轉換更簡單、更強大,同時還能診斷延遲問題。
插入/更新
在Streamkap中,我們看到一半的客戶希望使用插入或更新。插入是追加模式,因此保持所有更改的歷史記錄,而更新則僅顯示最終數據(插入+更新)。雖然大多數公司在批次ETL中習慣了這種能力,但當與流式ETL結合使用時,這是一個新概念。了解更多關於批次處理與即時處理的資訊
模式(Schema)演變
當源表發生變化時,我們需要更新目標表以處理這種模式漂移,以避免導致管道中斷。
半結構化數據
像MongoDB/Elasticsearch這樣的源允許復雜的巢狀記錄結構中存在不一致性,需要在插入到ClickHouse之前由攝取管道進行調和。例如:
日期/時間在某些記錄中表示為數位(自紀元以來的秒/毫秒)而在其他記錄中表示為字串(ISO格式)
在某些記錄中,巢狀欄位是字串,而在其他記錄中是更復雜的巢狀結構
深度巢狀的復雜半結構化數據通常需要在插入到ClickHouse之前進行預處理,將其對映到適當的型別,例如元組、巢狀。
我們的方法
現在讓我們深入研究我們的連結器,以及如何解決這些挑戰。
數據型別
我們發現預設方法通常是將數據插入到ClickHouse中,然後在載入後轉換數據。
我們內建支持以下數據型別:
Kafka Connect Data Type | ClickHouse Data type |
---|---|
INT8 | Int8 |
INT16 | Int16 |
INT32 | Int32 |
INT64 | Int64 |
FLOAT32 | Float32 |
FLOAT64 | Float64 |
BOOLEAN | Bool |
BYTES | BLOB (String) |
STRING | String |
org.apache.kafka.connect.data.Decimal | DECIMAL(38, 0) |
org.apache.kafka.connect.data.Timestampio.debezium.time.ZonedTimestamp | DateTime64 |
org.apache.kafka.connect.data.Date | Date |
io.debezium.data.Json | String |
STRUCT | Tuple |
ARRAY | Array |
JSON欄位當前作為字串進行攝取,
allow_experimental_object_type=1
的使用目前正在測試中。
後設資料
連結器為每個插入到ClickHouse表中的插入添加了額外的關鍵列,以便在載入後進行更好的分析和建模,以及支持更新。
以下後設資料列被添加到每個ClickHouse表中:
_streamkap_ts_ms :CDC事件時間戳
_streamkap_deleted :如果當前的CDC事件是一個刪除事件
_streamkap_partition :表示內部Streamkap分區編號,透過對源記錄的鍵欄位套用一致性哈希獲取
_streamkap_source_ts_ms :變更事件在源資料庫中發生的時間戳
_streamkap_op :CDC事件操作型別(c insert, u update, d delete, r snapshot, t truncate)
插入/更新
Streamkap連結器支持兩種將數據攝入到ClickHouse的模式: 插入 (追加)和 更新 。
更新模式是我們連結器的預設模式,當需要ClickHouse表包含源數據的最新版本時使用。
插入(追加)模式
插入模式導致每個變更都被跟蹤,並作為新行插入到ClickHouse中,而刪除事件將在ClickHouse中標記為已刪除,使用元值 _streamkap_deleted 。
這在處理較大的數據量時很有用,可以保持延遲較低,並保持更改的歷史記錄。
例如,Streamkap在收集我們的指標時使用插入模式,因為只有不可變數據被插入。
然後,我們在指標表上使用Materialized Views建立了許多聚合,以進行時間序列分析。對這個表設定一個合適的TTL,以便ClickHouse為我們處理刪除操作,同時提供足夠的歷史數據來調查任何問題,或者如果我們必須出於某種原因重建Materialized Views。
要使用插入(追加)模式,使用ClickHouse引擎MergeTree。
更新模式
更新是插入和更新的組合。如果行的主鍵有匹配,值將被覆蓋。相反,如果沒有匹配,事件將被插入。
更新模式使用ClickHouse的ReplacingMergeTree引擎實作。
ReplacingMergeTree引擎根據排序鍵在周期性的後台合並中去重數據,允許清理舊記錄。這個過程的異步性意味著可能會有一個小視窗,留下了檢視中的舊記錄。因此,查詢必須使用FINAL修飾詞來確保返回數據的最新版本,然後在查詢時對任何剩余的相同記錄進行去重。
帶有基本型別的更新範例
這裏以JSON格式顯示了一個用於更新的輸入記錄。鍵只有一個欄位,id,這是將對行進行去重的主鍵:
{
"id": "123456hYCcEM62894000000000",
"str_col": "some-str-values-000000000",
"IntColumn": 123000,
"Int8": 0,
"InT16": 10,
"bool_col": true,
"double_col": 1.7976931348623157E308,
"json_col": "{\"a\": 0}",
"__deleted": false,
"created_at": 1707379532748,
"date_col": 19761,
"ts_tz": "2023-10-24T15:19:51Z",
"_streamkap_ts_ms": 1707379532748,
"binary_col": "AQIDBAU=",
"byte_buf": "AQIDBAU=",
"bigint_col": "E4f/////0tCeAA=="
}
結果表:
SHOWCREATETABLE streamkap_test_nominal_upsert
FORMAT Vertical
Queryid: 1abf2898-69b3-4785-a849-65c3879493bb
Row1:
──────
statement: CREATETABLE streamkap.streamkap_test_nominal_upsert
(
`id`StringCOMMENT'id',
`str_col`StringCOMMENT'str_col',
`IntColumn` Int32 COMMENT'IntColumn',
`Int8`Int8COMMENT'Int8',
`InT16` Int16 COMMENT'InT16',
`bool_col`BoolCOMMENT'bool_col',
`double_col` Float64 COMMENT'double_col',
`json_col`StringCOMMENT'json_col',
`__deleted`BoolCOMMENT'__deleted',
`created_at` DateTime64(3) COMMENT'created_at',
`date_col`DateCOMMENT'date_col',
`ts_tz` DateTime64(3) COMMENT'ts_tz',
`_streamkap_ts_ms` Int64 COMMENT'_streamkap_ts_ms',
`binary_col`StringCOMMENT'binary_col',
`byte_buf`StringCOMMENT'byte_buf',
`bigint_col`Decimal(38, 0) COMMENT'bigint_col',
`_streamkap_partition` Int32 COMMENT'_streamkap_partition',
`_streamkap_deleted` UInt8 MATERIALIZEDif(__deleted = true, 1, 0)
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}', _streamkap_ts_ms, _streamkap_deleted)
PARTITIONBY _streamkap_partition
PRIMARY KEYid
ORDERBYid
SETTINGS index_granularity = 8192
範例數據:
SELECT*
FROMstreamkap_test_nominal_upsert
FORMATVertical
Row1:
──────
id: 123456hYCcEM62894000000000
str_col: some-str-values-000000000
IntColumn: 123000
Int8: 0
InT16: 10
bool_col: true
double_col: 1.7976931348623157e308
json_col: {"a": 0}
__deleted: false
created_at: 2024-02-08 08:03:37.368
date_col: 2024-02-08
ts_tz: 2023-10-24 15:19:51.000
_streamkap_ts_ms: 1707379417368
binary_col:
byte_buf:
bigint_col: 92233720368547000000000
_streamkap_partition: 0
Row2:
──────
id: 123456hYCcEM62894000000000
str_col: some-str-values-000000000
IntColumn: 123000
Int8: 0
InT16: 10
bool_col: true
double_col: 1.7976931348623157e308
json_col: {"a": 0}
__deleted: false
created_at: 2024-02-08 08:03:41.608
date_col: 2024-02-08
ts_tz: 2023-10-24 15:19:51.000
_streamkap_ts_ms: 1707379421608
binary_col: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
byte_buf: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
bigint_col: 92233720368547000000000
_streamkap_partition: 0
去重數據,使用 FINAL :
SELECT*
FROMstreamkap_test_nominal_upsert
FINAL
FORMATVertical
Row1:
──────
id: 123456hYCcEM62894000000000
str_col: some-str-values-000000000
IntColumn: 123000
Int8: 0
InT16: 10
bool_col: true
double_col: 1.7976931348623157e308
json_col: {"a": 0}
__deleted: false
created_at: 2024-02-08 08:03:41.608
date_col: 2024-02-08
ts_tz: 2023-10-24 15:19:51.000
_streamkap_ts_ms: 1707379421608
binary_col: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
byte_buf: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
bigint_col: 92233720368547000000000
_streamkap_partition: 0
處理半結構化數據
巢狀陣列和結構體
下面,我們提供了一些如何將復雜結構自動對映到ClickHouse型別的範例。
為了支持包含結構的陣列,我們需要修改Streamkap在ClickHouse中的角色,將flatten_nested設定為0:
ALTERROLE STREAMKAP_ROLE SETTINGS flatten_nested = 0;
包含子陣列的巢狀結構欄位
這裏以JSON格式顯示了一個輸入記錄,鍵只有一個欄位 id :
{
"id": 1,
"obj": {
"nb": 123,
"str": "abc",
"sub_arr": [
{
"sub_nb": 789,
"sub_str": "mnp"
}
]
}
}
結果表。註意 obj 列已經被對映到一個 Tuple(nb Int32, str String, sub_arr Array(Tuple(n Int32, s String)), sub_arr_str Array(String)) 來處理復雜的結構:
SHOWCREATETABLE chdb.streamkap_nested_struct_with_array
CREATETABLE chdb.streamkap_nested_struct_with_array
(
`obj` Tuple(nb Int32, strString, sub_arr Array(Tuple(n Int32, s String)), sub_arr_str Array(String)) COMMENT'obj',
`__deleted`BoolCOMMENT'__deleted',
`_streamkap_ts_ms` Int64 COMMENT'_streamkap_ts_ms',
`_streamkap_partition` Int32 COMMENT'_streamkap_partition',
`id` Int32 COMMENT'id',
`_streamkap_deleted` UInt8 MATERIALIZEDif(__deleted = true, 1, 0)
)
ENGINE = ReplacingMergeTree(_streamkap_ts_ms, _streamkap_deleted)
PARTITIONBY _streamkap_partition
PRIMARY KEYid
ORDERBYid
SETTINGS index_granularity = 8192
範例數據:
SELECT *
FROM chdb.streamkap_nested_struct_with_array
LIMIT1format Vertical
obj: (123,'abc',[(789,'mnp')],['efg'])
__deleted: false
_streamkap_ts_ms: 1702519029407
_streamkap_partition: 0
id: 1
包含子結構的巢狀陣列欄位
這裏以JSON格式顯示了一個輸入記錄,鍵只有一個欄位 id :
{
"id": 1,
"arr": [
{
"nb": 123,
"str": "abc"
}
]
}
SHOWCREATETABLE streamkap_nested_array_of_struct
CREATETABLE streamkap.streamkap_nested_array_of_struct
(
`arr`Array(Tuple(nb Int32, strString)) COMMENT'arr',
`__deleted`BoolCOMMENT'__deleted',
`_streamkap_ts_ms` Int64 COMMENT'_streamkap_ts_ms',
`_streamkap_partition` Int32 COMMENT'_streamkap_partition',
`id` Int32 COMMENT'id',
`_streamkap_deleted` UInt8 MATERIALIZEDif(__deleted = true, 1, 0)
)
ENGINE = ReplacingMergeTree( _streamkap_ts_ms, _streamkap_deleted)
PARTITIONBY _streamkap_partition
PRIMARY KEYid
ORDERBYid
SETTINGS index_granularity = 8192
範例數據:
SELECT*
FROMstreamkap_nested_array_of_struct
LIMIT1 format Vertical
arr: [(123,'abc')]
__deleted: false
_streamkap_ts_ms: 1702529856885
_streamkap_partition: 0
id: 1
快照
快照是指將現有數據從資料庫載入到ClickHouse的過程。
我們有兩種方法可以載入這些歷史數據。
阻塞快照
阻塞快照的目的是捕獲資料庫表的整個當前狀態,並將使用大型的select語句來實作。這些也可以並行執行,速度非常快。從效率上講,阻塞快照可能會對系統資源產生較大的影響,特別是對於大型表,每個查詢可能需要更長的時間。
增量快照
增量快照旨在提高效率,對系統資源的影響通常較小,並且特別適用於非常大的表格或希望同時進行快照和流式傳輸的情況。
數據一致性和交付保證
交付保證主要是指在出現故障場景時,未確認的CDC事件可能被重播,導致重復的行插入到ClickHouse中。
Streamkap為ClickHouse提供至少一次交付保證。
使用插入攝入模式,可能會將一些重復的行插入到ClickHouse中。然而,透過在materialized view中添加去重程式碼,不會有任何影響。
如前所述,對於更新攝入模式,我們透過源記錄鍵進行去重。強制使用一次性交付保證會增加效能開銷,而沒有額外的好處,因為相同的行程處理重復的CDC事件,將所有CDC事件合並為一個記錄的最終記錄狀態。
變換
Streamkap支持管道中的變換,因此數據可以在發送到ClickHouse之前進行預處理。
這對於半結構化數據、預處理和清理任務特別有用。這可能比在攝入後處理數據效率更高。
在清理過的結構化數據上進行實分時析自然是在ClickHouse中完成的,查詢效能受益於將數據轉換移到插入時間。
下面,我們介紹了Streamkap執行的一些常見轉換。
修復半結構化數據中的不一致性
考慮修復一個不一致的半結構化日期欄位:
"someDateField": {"$date": "2023-08-04T09:12:20.29Z"}
"someDateField": "2023-08-07T08:14:57.817325+00:00"
"someDateField": {"$date": {"$numberLong": 1702853448000}}
使用Streamkap轉換,所有記錄都可以轉換為用於攝入到Clickhouse DateTime64列的公共格式:
"someDateField": "yyyy-MM-dd HH:mm:ss.SSS"
拆分大型半結構化JSON文件
對於文件資料庫,子實體可以被建模為巢狀在父實體文件內部的子陣列:
{
"key": "abc1234",
"array": [
{
"id": "11111",
"someField": "aa-11"
},
{
"id": "22222",
"someField": "bb-22"
}
]
}
在ClickHouse中,將這些子實體表示為單獨的行是有意義的。使用Streamkap轉換,子實體記錄可以拆分為單獨的記錄:
{
"id": "11111",
"parentKey": "abc1234",
"someField": "aa-11"
}
{
"id": "22222",
"parentKey": "abc1234",
"someField": "bb-22"
}
模式演變
模式演變或漂移處理是對目標表進行更改以反映上遊更改的過程。
Streamkap連結器會自動處理以下情況的模式漂移。
額外列: 檢測到一個額外的欄位,並且將建立一個新的列來接收新的數據。
刪除列: 此列現在將被忽略,並且不會采取進一步的操作。
更改列型別: 在表中建立一個附加列,使用字尾表示新型別。例如。 ColumnName_type
可以在任何階段向管道中添加附加表。我們在下面展示了一些此模式演變的範例。
添加列
在模式演變之前考慮以下輸入記錄:
{
"id": "123456hYCcEM62894000000000",
"str_col": "some-str-values-000000000",
"IntColumn": 123000,
"Int8": 0,
"InT16": 10,
"bool_col": true,
"double_col": 1.7976931348623157E308,
"json_col": "{\"a\": 0}",
"binary_col": "AQIDBAU=",
"byte_buf": "AQIDBAU=",
"bigint_col": "E4f/////0tCeAA==",
"__deleted": false,
"created_at": 1702894985613,
"ts_tz": "2023-10-24T15:19:51Z",
"_streamkap_ts_ms": 1702894985613
}
一個新的列 new_double_col 被添加到上遊模式中。這導致ClickHouse模式演變:
{
"id": "123456hYCcEM62894xxx",
"str_col": "some-str-values-000000000",
"IntColumn": 123000,
"Int8": 0,
"InT16": 10,
"bool_col": true,
"double_col": 1.7976931348623157E308,
"json_col": "{\"a\": 0}",
"binary_col": "AQIDBAU=",
"byte_buf": "AQIDBAU=",
"bigint_col": "E4f/////0tCeAA==",
"__deleted": false,
"created_at": 1702894985613,
"ts_tz": "2023-10-24T15:19:51Z",
"_streamkap_ts_ms": 1702894985613,
"new_double_col": 1.7976931348623157E308
}
ClickHouse數據:
SELECT
id,
new_double_col
FROM streamkap_test_nominal_add_new_column
ORDERBY _streamkap_ts_ms ASC
┌─id─────────────────────────┬─new_double_col─┐
│ 123456hYCcEM62894000000000 │ 0 │
└────────────────────────────┴────────────────┘
┌─id───────────────────┬─────────new_double_col─┐
│ 123456hYCcEM62894xxx │ 1.7976931348623157e308 │
└──────────────────────┴────────────────────────┘
將Int演變為String
模式演變之前的輸入記錄:
{
"id": "123456hYCcEM62894000000000",
. . .
"IntColumn": 123000,
. . .
"_streamkap_ts_ms": 1702894492041
}
模式演變後攝入的新記錄:
{
"id": "123456hYCcEM62894xxx",
. . .
"IntColumn": "new-str-value",
. . .
}
ClickHouse數據,在IntColumn_str已添加的情況下:
SELECT
id,
IntColumn,
IntColumn_str
FROM streamkap_test_nominal_evolve_int2string
ORDERBY _streamkap_ts_ms ASC
┌─id─────────────────────────┬─IntColumn─┬─IntColumn_str─┐
│ 123456hYCcEM62894000000000 │ 123000 │ │
└────────────────────────────┴───────────┴───────────────┘
┌─id───────────────────┬─IntColumn─┬─IntColumn_str─┐
│ 123456hYCcEM62894xxx │ 0 │ new-str-value │
└──────────────────────┴───────────┴───────────────┘
效能
以下15分鐘的負載測試旨在展示與延遲相關的各種批次大小的效能特征。此外,我們將評估Streamkap ClickHouse目的地連結器的可伸縮性。
ClickHouse Cloud例項詳細資訊:每個32GiB的3個節點,每個節點有8個vCPU
輸入記錄格式包含基本型別,一個中等字串約100個字元和一個大字串約1000個字元:
select* from streamkap_test_nominal_perf limit 1 format Vertical;
id: 123456hYCcEM62894000000001
str_col: some-str-values-000000001
IntColumn: 123001
Int8: 1
InT16: 10
bool_col: true
double_col: 1.7976931348623157e308
json_col: {"a": 1}
__deleted: false
created_at: 1970-01-01 00:00:19.751
ts_tz: 2023-10-24 15:19:51.000
_streamkap_ts_ms: 1706539233685
binary_col: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
byte_buf: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
bigint_col: 92233720368547000000001
medium_str: str-medium-000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001
large_str: str-large-000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001
_streamkap_partition: 0
當前測試中將攝入模式設定為「upsert」。使用「append」時,吞吐量會稍微更好,因為不需要一些記憶體中的去重邏輯。
基線單個分區
單個Streamkap任務和Clickhouse分區的基線測試,具有多個批次大小。
吞吐量:
每個批次大小的延遲:
通常,用於回填的原始吞吐量是必需的,而延遲則不是一個關註點。在這種情況下,超過100k行的較大批次大小更合適。
通常,流式更改的吞吐量要求較低,並且可能希望延遲較小。在這種情況下,較小的批次大小更合適。
這些是使用固定批次大小的人工測試,以說明吞吐量和延遲之間的權衡。在實踐中,批次大小隨內部佇列大小而變化。如果有許多記錄在佇列中等待,批次大小將增長,因此,吞吐量將增加。
可伸縮性
以相同的批次大小進行測試:每個批次大小100,000條記錄,並逐漸增加任務的數量:1、2、4和8。我們可以看到,吞吐量與任務數量呈近似線性關系。
總結
這只是我們與ClickHouse合作的開始,在接下來的幾周裏,我們將繼續構建盡可能處理變更數據捕獲事件及其以上的最佳整合。
以下是我們希望獲得反饋的一些領域,以確定社群是否會投票支持這些領域:
使用allow_experimental_object_type=1
自動建立的材料化檢視,基於樣版
跨多個表的流式ACID事務
單記錄轉換
多記錄轉換(分割、連線、聚合)
確保一次性
希望這個連結器能夠使您更輕松地享受ClickHouse的優勢,就像我們一樣。
Streamkap和ClickHouse都提供免費試用;您可以在Streamkap.com和ClickHouse.com上註冊。