本文 字数:13714 ; 估计阅读时间:35 分钟
审 校: 庄晓东(魏庄)
介绍
今天,欢迎来自我们的技术合作伙伴Streamkap的客座文章,Streamkap是一种针对ClickHouse开箱即用的变更数据捕捉(CDC-change data capture)解决方案。这篇博客深入探讨了构建这样一个产品的细节和挑战。对于那些只想要一个开箱即用CDC解决方案的ClickHouse用户,我们很高兴向你推荐Streamkap这个托管服务。
我们很高兴地宣布:新的ClickHouse连接器,可以从诸如PostgreSQL、MySQL、SQL Server、Oracle和MongoDB等数据库中流式传输CDC数据到ClickHouse。
Streamkap最近切换到使用ClickHouse来实时处理所有日志和指标,因为我们发现其他解决方案无法实现我们所需的查询性能。在采用ClickHouse之后,我们希望开始提供一个ClickHouse CDC集成,但发现现有的连接器存在问题,因此我们着手构建了一个新的连接器来解决这些问题。
在这篇文章中,我们假设您熟悉ClickHouse数据库和Change Data Capture(CDC)的概念,但如果不熟悉,您可以通过阅读有关流式变更数据捕捉的文章来了解更多。
我们将深入探讨为ClickHouse构建CDC解决方案的挑战,以及我们如何解决这些问题,讨论我们如何处理模式演变、数据一致性和快照。最后,我们将展示如何在保持性能的流式流程的同时实现所有需求。
技术
ClickHouse是一个开源的列式数据库。列导向结构意味着数据是按列而不是按行存储和检索的。ClickHouse已经成为构建实时应用程序的首选,因为它能够摄取大量数据,并且在写入时,而不是在读取时将数据物化。这导致查询速度显着加快,使ClickHouse适用于支撑实时应用程序。
Streamkap是一个无服务器流式传输平台,可以实现:实时的变更数据捕捉(CDC),并传入ClickHouse。在底层,诸如Kafka、Debezium、Flink之类的技术与生产级连接器/管道相结合。
下面是Streamkap从数据库流式传输到ClickHouse的概述。
挑战
当我们首次尝试将CDC数据流式传输到ClickHouse时,我们寻找了可以使用的现有连接器。在考察了官方的ClickHouse Kafka Connect连接器,以及市场上的其他连接器后,我们很快意识到:我们需要对其进行大量修改,才能支持不同的用例。意识到这些连接器需要进行大量修改,我们开始构建自己的解决方案。以下是我们需要确保:在将解决方案带入生产之前需要解决的一些关键需求。
数据类型
现有解决方案对数据类型的支持不太好:
嵌套结构
嵌套数组,包含嵌套结构的数组
具有微秒精度的时间戳
具有微秒精度的时间
不包含时间信息的日期(自历元以来的天数)
将JSON传输为普通字符串字段
元数据
在处理CDC数据时,添加额外的元数据列(如时间戳和CDC记录类型)很有帮助。这样可以使后续处理转换更简单、更强大,同时还能诊断延迟问题。
插入/更新
在Streamkap中,我们看到一半的客户希望使用插入或更新。插入是追加模式,因此保持所有更改的历史记录,而更新则仅显示最终数据(插入+更新)。虽然大多数公司在批量ETL中习惯了这种能力,但当与流式ETL结合使用时,这是一个新概念。了解更多关于批处理与实时处理的信息
模式(Schema)演变
当源表发生变化时,我们需要更新目标表以处理这种模式漂移,以避免导致管道中断。
半结构化数据
像MongoDB/Elasticsearch这样的源允许复杂的嵌套记录结构中存在不一致性,需要在插入到ClickHouse之前由摄取管道进行调和。例如:
日期/时间在某些记录中表示为数字(自纪元以来的秒/毫秒)而在其他记录中表示为字符串(ISO格式)
在某些记录中,嵌套字段是字符串,而在其他记录中是更复杂的嵌套结构
深度嵌套的复杂半结构化数据通常需要在插入到ClickHouse之前进行预处理,将其映射到适当的类型,例如元组、嵌套。
我们的方法
现在让我们深入研究我们的连接器,以及如何解决这些挑战。
数据类型
我们发现默认方法通常是将数据插入到ClickHouse中,然后在加载后转换数据。
我们内置支持以下数据类型:
Kafka Connect Data Type | ClickHouse Data type |
---|---|
INT8 | Int8 |
INT16 | Int16 |
INT32 | Int32 |
INT64 | Int64 |
FLOAT32 | Float32 |
FLOAT64 | Float64 |
BOOLEAN | Bool |
BYTES | BLOB (String) |
STRING | String |
org.apache.kafka.connect.data.Decimal | DECIMAL(38, 0) |
org.apache.kafka.connect.data.Timestampio.debezium.time.ZonedTimestamp | DateTime64 |
org.apache.kafka.connect.data.Date | Date |
io.debezium.data.Json | String |
STRUCT | Tuple |
ARRAY | Array |
JSON字段当前作为字符串进行摄取,
allow_experimental_object_type=1
的使用目前正在测试中。
元数据
连接器为每个插入到ClickHouse表中的插入添加了额外的关键列,以便在加载后进行更好的分析和建模,以及支持更新。
以下元数据列被添加到每个ClickHouse表中:
_streamkap_ts_ms :CDC事件时间戳
_streamkap_deleted :如果当前的CDC事件是一个删除事件
_streamkap_partition :表示内部Streamkap分区编号,通过对源记录的键字段应用一致性哈希获取
_streamkap_source_ts_ms :变更事件在源数据库中发生的时间戳
_streamkap_op :CDC事件操作类型(c insert, u update, d delete, r snapshot, t truncate)
插入/更新
Streamkap连接器支持两种将数据摄入到ClickHouse的模式: 插入 (追加)和 更新 。
更新模式是我们连接器的默认模式,当需要ClickHouse表包含源数据的最新版本时使用。
插入(追加)模式
插入模式导致每个变更都被跟踪,并作为新行插入到ClickHouse中,而删除事件将在ClickHouse中标记为已删除,使用元值 _streamkap_deleted 。
这在处理较大的数据量时很有用,可以保持延迟较低,并保持更改的历史记录。
例如,Streamkap在收集我们的指标时使用插入模式,因为只有不可变数据被插入。
然后,我们在指标表上使用Materialized Views创建了许多聚合,以进行时间序列分析。对这个表设置一个合适的TTL,以便ClickHouse为我们处理删除操作,同时提供足够的历史数据来调查任何问题,或者如果我们必须出于某种原因重建Materialized Views。
要使用插入(追加)模式,使用ClickHouse引擎MergeTree。
更新模式
更新是插入和更新的组合。如果行的主键有匹配,值将被覆盖。相反,如果没有匹配,事件将被插入。
更新模式使用ClickHouse的ReplacingMergeTree引擎实现。
ReplacingMergeTree引擎根据排序键在周期性的后台合并中去重数据,允许清理旧记录。这个过程的异步性意味着可能会有一个小窗口,留下了视图中的旧记录。因此,查询必须使用FINAL修饰符来确保返回数据的最新版本,然后在查询时对任何剩余的相同记录进行去重。
带有基本类型的更新示例
这里以JSON格式显示了一个用于更新的输入记录。键只有一个字段,id,这是将对行进行去重的主键:
{
"id": "123456hYCcEM62894000000000",
"str_col": "some-str-values-000000000",
"IntColumn": 123000,
"Int8": 0,
"InT16": 10,
"bool_col": true,
"double_col": 1.7976931348623157E308,
"json_col": "{\"a\": 0}",
"__deleted": false,
"created_at": 1707379532748,
"date_col": 19761,
"ts_tz": "2023-10-24T15:19:51Z",
"_streamkap_ts_ms": 1707379532748,
"binary_col": "AQIDBAU=",
"byte_buf": "AQIDBAU=",
"bigint_col": "E4f/////0tCeAA=="
}
结果表:
SHOWCREATETABLE streamkap_test_nominal_upsert
FORMAT Vertical
Queryid: 1abf2898-69b3-4785-a849-65c3879493bb
Row1:
──────
statement: CREATETABLE streamkap.streamkap_test_nominal_upsert
(
`id`StringCOMMENT'id',
`str_col`StringCOMMENT'str_col',
`IntColumn` Int32 COMMENT'IntColumn',
`Int8`Int8COMMENT'Int8',
`InT16` Int16 COMMENT'InT16',
`bool_col`BoolCOMMENT'bool_col',
`double_col` Float64 COMMENT'double_col',
`json_col`StringCOMMENT'json_col',
`__deleted`BoolCOMMENT'__deleted',
`created_at` DateTime64(3) COMMENT'created_at',
`date_col`DateCOMMENT'date_col',
`ts_tz` DateTime64(3) COMMENT'ts_tz',
`_streamkap_ts_ms` Int64 COMMENT'_streamkap_ts_ms',
`binary_col`StringCOMMENT'binary_col',
`byte_buf`StringCOMMENT'byte_buf',
`bigint_col`Decimal(38, 0) COMMENT'bigint_col',
`_streamkap_partition` Int32 COMMENT'_streamkap_partition',
`_streamkap_deleted` UInt8 MATERIALIZEDif(__deleted = true, 1, 0)
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}', _streamkap_ts_ms, _streamkap_deleted)
PARTITIONBY _streamkap_partition
PRIMARY KEYid
ORDERBYid
SETTINGS index_granularity = 8192
示例数据:
SELECT*
FROMstreamkap_test_nominal_upsert
FORMATVertical
Row1:
──────
id: 123456hYCcEM62894000000000
str_col: some-str-values-000000000
IntColumn: 123000
Int8: 0
InT16: 10
bool_col: true
double_col: 1.7976931348623157e308
json_col: {"a": 0}
__deleted: false
created_at: 2024-02-08 08:03:37.368
date_col: 2024-02-08
ts_tz: 2023-10-24 15:19:51.000
_streamkap_ts_ms: 1707379417368
binary_col:
byte_buf:
bigint_col: 92233720368547000000000
_streamkap_partition: 0
Row2:
──────
id: 123456hYCcEM62894000000000
str_col: some-str-values-000000000
IntColumn: 123000
Int8: 0
InT16: 10
bool_col: true
double_col: 1.7976931348623157e308
json_col: {"a": 0}
__deleted: false
created_at: 2024-02-08 08:03:41.608
date_col: 2024-02-08
ts_tz: 2023-10-24 15:19:51.000
_streamkap_ts_ms: 1707379421608
binary_col: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
byte_buf: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
bigint_col: 92233720368547000000000
_streamkap_partition: 0
去重数据,使用 FINAL :
SELECT*
FROMstreamkap_test_nominal_upsert
FINAL
FORMATVertical
Row1:
──────
id: 123456hYCcEM62894000000000
str_col: some-str-values-000000000
IntColumn: 123000
Int8: 0
InT16: 10
bool_col: true
double_col: 1.7976931348623157e308
json_col: {"a": 0}
__deleted: false
created_at: 2024-02-08 08:03:41.608
date_col: 2024-02-08
ts_tz: 2023-10-24 15:19:51.000
_streamkap_ts_ms: 1707379421608
binary_col: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
byte_buf: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
bigint_col: 92233720368547000000000
_streamkap_partition: 0
处理半结构化数据
嵌套数组和结构体
下面,我们提供了一些如何将复杂结构自动映射到ClickHouse类型的示例。
为了支持包含结构的数组,我们需要修改Streamkap在ClickHouse中的角色,将flatten_nested设置为0:
ALTERROLE STREAMKAP_ROLE SETTINGS flatten_nested = 0;
包含子数组的嵌套结构字段
这里以JSON格式显示了一个输入记录,键只有一个字段 id :
{
"id": 1,
"obj": {
"nb": 123,
"str": "abc",
"sub_arr": [
{
"sub_nb": 789,
"sub_str": "mnp"
}
]
}
}
结果表。注意 obj 列已经被映射到一个 Tuple(nb Int32, str String, sub_arr Array(Tuple(n Int32, s String)), sub_arr_str Array(String)) 来处理复杂的结构:
SHOWCREATETABLE chdb.streamkap_nested_struct_with_array
CREATETABLE chdb.streamkap_nested_struct_with_array
(
`obj` Tuple(nb Int32, strString, sub_arr Array(Tuple(n Int32, s String)), sub_arr_str Array(String)) COMMENT'obj',
`__deleted`BoolCOMMENT'__deleted',
`_streamkap_ts_ms` Int64 COMMENT'_streamkap_ts_ms',
`_streamkap_partition` Int32 COMMENT'_streamkap_partition',
`id` Int32 COMMENT'id',
`_streamkap_deleted` UInt8 MATERIALIZEDif(__deleted = true, 1, 0)
)
ENGINE = ReplacingMergeTree(_streamkap_ts_ms, _streamkap_deleted)
PARTITIONBY _streamkap_partition
PRIMARY KEYid
ORDERBYid
SETTINGS index_granularity = 8192
示例数据:
SELECT *
FROM chdb.streamkap_nested_struct_with_array
LIMIT1format Vertical
obj: (123,'abc',[(789,'mnp')],['efg'])
__deleted: false
_streamkap_ts_ms: 1702519029407
_streamkap_partition: 0
id: 1
包含子结构的嵌套数组字段
这里以JSON格式显示了一个输入记录,键只有一个字段 id :
{
"id": 1,
"arr": [
{
"nb": 123,
"str": "abc"
}
]
}
SHOWCREATETABLE streamkap_nested_array_of_struct
CREATETABLE streamkap.streamkap_nested_array_of_struct
(
`arr`Array(Tuple(nb Int32, strString)) COMMENT'arr',
`__deleted`BoolCOMMENT'__deleted',
`_streamkap_ts_ms` Int64 COMMENT'_streamkap_ts_ms',
`_streamkap_partition` Int32 COMMENT'_streamkap_partition',
`id` Int32 COMMENT'id',
`_streamkap_deleted` UInt8 MATERIALIZEDif(__deleted = true, 1, 0)
)
ENGINE = ReplacingMergeTree( _streamkap_ts_ms, _streamkap_deleted)
PARTITIONBY _streamkap_partition
PRIMARY KEYid
ORDERBYid
SETTINGS index_granularity = 8192
示例数据:
SELECT*
FROMstreamkap_nested_array_of_struct
LIMIT1 format Vertical
arr: [(123,'abc')]
__deleted: false
_streamkap_ts_ms: 1702529856885
_streamkap_partition: 0
id: 1
快照
快照是指将现有数据从数据库加载到ClickHouse的过程。
我们有两种方法可以加载这些历史数据。
阻塞快照
阻塞快照的目的是捕获数据库表的整个当前状态,并将使用大型的select语句来实现。这些也可以并发运行,速度非常快。从效率上讲,阻塞快照可能会对系统资源产生较大的影响,特别是对于大型表,每个查询可能需要更长的时间。
增量快照
增量快照旨在提高效率,对系统资源的影响通常较小,并且特别适用于非常大的表格或希望同时进行快照和流式传输的情况。
数据一致性和交付保证
交付保证主要是指在出现故障场景时,未确认的CDC事件可能被重播,导致重复的行插入到ClickHouse中。
Streamkap为ClickHouse提供至少一次交付保证。
使用插入摄入模式,可能会将一些重复的行插入到ClickHouse中。然而,通过在materialized view中添加去重代码,不会有任何影响。
如前所述,对于更新摄入模式,我们通过源记录键进行去重。强制使用一次性交付保证会增加性能开销,而没有额外的好处,因为相同的进程处理重复的CDC事件,将所有CDC事件合并为一个记录的最终记录状态。
变换
Streamkap支持管道中的变换,因此数据可以在发送到ClickHouse之前进行预处理。
这对于半结构化数据、预处理和清理任务特别有用。这可能比在摄入后处理数据效率更高。
在清理过的结构化数据上进行实时分析自然是在ClickHouse中完成的,查询性能受益于将数据转换移到插入时间。
下面,我们介绍了Streamkap执行的一些常见转换。
修复半结构化数据中的不一致性
考虑修复一个不一致的半结构化日期字段:
"someDateField": {"$date": "2023-08-04T09:12:20.29Z"}
"someDateField": "2023-08-07T08:14:57.817325+00:00"
"someDateField": {"$date": {"$numberLong": 1702853448000}}
使用Streamkap转换,所有记录都可以转换为用于摄入到Clickhouse DateTime64列的公共格式:
"someDateField": "yyyy-MM-dd HH:mm:ss.SSS"
拆分大型半结构化JSON文档
对于文档数据库,子实体可以被建模为嵌套在父实体文档内部的子数组:
{
"key": "abc1234",
"array": [
{
"id": "11111",
"someField": "aa-11"
},
{
"id": "22222",
"someField": "bb-22"
}
]
}
在ClickHouse中,将这些子实体表示为单独的行是有意义的。使用Streamkap转换,子实体记录可以拆分为单独的记录:
{
"id": "11111",
"parentKey": "abc1234",
"someField": "aa-11"
}
{
"id": "22222",
"parentKey": "abc1234",
"someField": "bb-22"
}
模式演变
模式演变或漂移处理是对目标表进行更改以反映上游更改的过程。
Streamkap连接器会自动处理以下情况的模式漂移。
额外列: 检测到一个额外的字段,并且将创建一个新的列来接收新的数据。
删除列: 此列现在将被忽略,并且不会采取进一步的操作。
更改列类型: 在表中创建一个附加列,使用后缀表示新类型。例如。 ColumnName_type
可以在任何阶段向管道中添加附加表。我们在下面展示了一些此模式演变的示例。
添加列
在模式演变之前考虑以下输入记录:
{
"id": "123456hYCcEM62894000000000",
"str_col": "some-str-values-000000000",
"IntColumn": 123000,
"Int8": 0,
"InT16": 10,
"bool_col": true,
"double_col": 1.7976931348623157E308,
"json_col": "{\"a\": 0}",
"binary_col": "AQIDBAU=",
"byte_buf": "AQIDBAU=",
"bigint_col": "E4f/////0tCeAA==",
"__deleted": false,
"created_at": 1702894985613,
"ts_tz": "2023-10-24T15:19:51Z",
"_streamkap_ts_ms": 1702894985613
}
一个新的列 new_double_col 被添加到上游模式中。这导致ClickHouse模式演变:
{
"id": "123456hYCcEM62894xxx",
"str_col": "some-str-values-000000000",
"IntColumn": 123000,
"Int8": 0,
"InT16": 10,
"bool_col": true,
"double_col": 1.7976931348623157E308,
"json_col": "{\"a\": 0}",
"binary_col": "AQIDBAU=",
"byte_buf": "AQIDBAU=",
"bigint_col": "E4f/////0tCeAA==",
"__deleted": false,
"created_at": 1702894985613,
"ts_tz": "2023-10-24T15:19:51Z",
"_streamkap_ts_ms": 1702894985613,
"new_double_col": 1.7976931348623157E308
}
ClickHouse数据:
SELECT
id,
new_double_col
FROM streamkap_test_nominal_add_new_column
ORDERBY _streamkap_ts_ms ASC
┌─id─────────────────────────┬─new_double_col─┐
│ 123456hYCcEM62894000000000 │ 0 │
└────────────────────────────┴────────────────┘
┌─id───────────────────┬─────────new_double_col─┐
│ 123456hYCcEM62894xxx │ 1.7976931348623157e308 │
└──────────────────────┴────────────────────────┘
将Int演变为String
模式演变之前的输入记录:
{
"id": "123456hYCcEM62894000000000",
. . .
"IntColumn": 123000,
. . .
"_streamkap_ts_ms": 1702894492041
}
模式演变后摄入的新记录:
{
"id": "123456hYCcEM62894xxx",
. . .
"IntColumn": "new-str-value",
. . .
}
ClickHouse数据,在IntColumn_str已添加的情况下:
SELECT
id,
IntColumn,
IntColumn_str
FROM streamkap_test_nominal_evolve_int2string
ORDERBY _streamkap_ts_ms ASC
┌─id─────────────────────────┬─IntColumn─┬─IntColumn_str─┐
│ 123456hYCcEM62894000000000 │ 123000 │ │
└────────────────────────────┴───────────┴───────────────┘
┌─id───────────────────┬─IntColumn─┬─IntColumn_str─┐
│ 123456hYCcEM62894xxx │ 0 │ new-str-value │
└──────────────────────┴───────────┴───────────────┘
性能
以下15分钟的负载测试旨在展示与延迟相关的各种批量大小的性能特征。此外,我们将评估Streamkap ClickHouse目的地连接器的可伸缩性。
ClickHouse Cloud实例详细信息:每个32GiB的3个节点,每个节点有8个vCPU
输入记录格式包含基本类型,一个中等字符串约100个字符和一个大字符串约1000个字符:
select* from streamkap_test_nominal_perf limit 1 format Vertical;
id: 123456hYCcEM62894000000001
str_col: some-str-values-000000001
IntColumn: 123001
Int8: 1
InT16: 10
bool_col: true
double_col: 1.7976931348623157e308
json_col: {"a": 1}
__deleted: false
created_at: 1970-01-01 00:00:19.751
ts_tz: 2023-10-24 15:19:51.000
_streamkap_ts_ms: 1706539233685
binary_col: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
byte_buf: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
bigint_col: 92233720368547000000001
medium_str: str-medium-000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001
large_str: str-large-000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001
_streamkap_partition: 0
当前测试中将摄入模式设置为「upsert」。使用「append」时,吞吐量会稍微更好,因为不需要一些内存中的去重逻辑。
基线单个分区
单个Streamkap任务和Clickhouse分区的基线测试,具有多个批量大小。
吞吐量:
每个批量大小的延迟:
通常,用于回填的原始吞吐量是必需的,而延迟则不是一个关注点。在这种情况下,超过100k行的较大批量大小更合适。
通常,流式更改的吞吐量要求较低,并且可能希望延迟较小。在这种情况下,较小的批量大小更合适。
这些是使用固定批量大小的人工测试,以说明吞吐量和延迟之间的权衡。在实践中,批量大小随内部队列大小而变化。如果有许多记录在队列中等待,批量大小将增长,因此,吞吐量将增加。
可伸缩性
以相同的批量大小进行测试:每个批量大小100,000条记录,并逐渐增加任务的数量:1、2、4和8。我们可以看到,吞吐量与任务数量呈近似线性关系。
总结
这只是我们与ClickHouse合作的开始,在接下来的几周里,我们将继续构建尽可能处理变更数据捕获事件及其以上的最佳集成。
以下是我们希望获得反馈的一些领域,以确定社区是否会投票支持这些领域:
使用allow_experimental_object_type=1
自动创建的材料化视图,基于模板
跨多个表的流式ACID事务
单记录转换
多记录转换(分割、连接、聚合)
确保一次性
希望这个连接器能够使您更轻松地享受ClickHouse的优势,就像我们一样。
Streamkap和ClickHouse都提供免费试用;您可以在Streamkap.com和ClickHouse.com上注册。