跳到主要内容

LakeSoul CDC 表格式

CDC (Change Data Capture) 是湖仓重要的数据源之一. LakeSoul CDC 表的目标是能够从在线 OLTP 数据库快速同步数据到 LakeSoul 表中,从而下游分析计算任务在较小的时间间隔后就可以读到在线数据库的同步数据,消除了传统 T+1 复制的开销。相比普通支持 Upsert 的表,CDC 表额外支持了删除行的操作。

LakeSoul CDC 表使用一个额外的操作列(列名可以自定义)来记录 CDC 的操作类型,可以支持从 Debezium, canalFlink CDC 中导入 CDC 数据。LakeSoul 默认建表并不会启用 CDC 表格式,默认表仅支持 Upsert 操作。要开启对 CDC 的支持,需要在建表时增加额外的属性。

创建 LakeSoul CDC 表,需要添加一个表属性 lakesoul_cdc_change_column 来配置 CDC 变更类型的列名。这一列需要是 string 类型,包含三种取值之一: update, insert, delete.

在 LakeSoul 批量读数据的自动合并阶段(包括使用 Spark/Flink 批式读取),会保留最新的 insertupdate 数据,并自动过滤掉 delete 的行。而使用 Spark/Flink 流式增量读取时,会保留 CDC 变更列的值(也即包含了 insert, update, delete),在 Flink 中,这一列会被自动转换为 Flink RowData 对象的 RowKind 字段的对应值,从而在 Flink 流式读取时完整支持了 Flink Changelog Stream 语义,能够进行增量计算。

创建 LakeSoul CDC 表

使用 Spark

使用 Spark Scala API 或者 SQL,假设操作类型列名为 change_type:

import com.dmetasoul.lakesoul.tables.LakeSoulTable
LakeSoulTable.createTable(data, path).shortTableName("cdc_ingestion").hashPartitions("id").hashBucketNum(2).rangePartitions("rangeid").tableProperty("lakesoul_cdc_change_column" -> "change_type").create()

注意 LakeSoul CDC 表必须是主键表,并且主键列需要和 CDC 上游数据库表相同。

请参考 Flink Connector

LakeSoul CDC 表的增量读取

LakeSoul 的增量采用的是主键分片的模式,因此增量数据写入时不需要与存量数据做合并操作。对于 CDC 表,增量数据就是原始的 CDC 流的内容。对 LakeSoul 表的 CDC 增量读取,可以完整保留 CDC 操作标记,即 insert、update、delete。2.2.0 版本起在 Spark 中已经支持了增量流式读取。

2.3.0 版本起,支持了 Flink Table Source,支持 Flink ChangeLog Stream 的流式增量读写,请参考 Flink Connector