Use LakeSoul CDC Table Format
CDC (Change Data Capture) is an important data source for a Lakehouse. The goal of the LakeSoul CDC table format is to sync changes from online OLTP databases into LakeSoul with very low latency, usually within several minutes, so that downstream analytics can get the newest results as soon as possible without a traditional T+1 database dump. Compared to a normal table, the CDC table format supports deleting rows.
LakeSoul uses an extra change-operation column (the column name is configurable) to model CDC data and can consume CDC sources including Debezium, Canal and Flink CDC. By default LakeSoul does not enable this format, which means a normal table only supports upsert operations, not delete. To enable the CDC format, you need to add an extra table property when creating the table.
To create a LakeSoul CDC table, add a table property lakesoul_cdc_change_column with the column name that records the change type. This column should be of string type and contain one of the three values: update, insert, delete.
During the automatic merging phase of LakeSoul batch reads (including Spark/Flink batch reads), the latest insert and update data is kept and delete rows are automatically filtered out. When using Spark/Flink streaming incremental reads, the value of the CDC change column (insert, update or delete) is retained. In Flink, this column is automatically converted to the corresponding value of the RowKind field of the RowData class, so LakeSoul fully supports Flink Changelog Stream semantics during Flink stream reads and can perform incremental calculations.
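For illustration, a minimal batch-read sketch in Scala (the table path and column names here are assumptions for this example): after merging, only the latest visible rows remain and delete rows are gone, while the change column itself is still readable.

// Batch read: rows whose change column is 'delete' are already filtered out by the merge
val df = spark.read.format("lakesoul").load("s3://lakesoul-bucket/table_path")
df.select("id", "date", "change_type").show()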
Create LakeSoul Table with CDC Format
In Spark
Use the Scala API or SQL, assuming the change operation column name is change_type:
- Scala
- SQL
import com.dmetasoul.lakesoul.tables.LakeSoulTable
LakeSoulTable.createTable(data, path)
  .shortTableName("cdc_ingestion")
  .hashPartitions("id")
  .hashBucketNum(2)
  .rangePartitions("rangeid")
  .tableProperty("lakesoul_cdc_change_column" -> "change_type")
  .create()
CREATE TABLE table_name (id string, date string, change_type string) USING lakesoul
PARTITIONED BY (date)
LOCATION 's3://lakesoul-bucket/table_path'
TBLPROPERTIES('lakesoul_cdc_change_column'='change_type',
'hashPartitions'='id',
'hashBucketNum'='2');
Note that a LakeSoul CDC ingestion table must have primary key(s), and they should be the same as the primary key(s) of the online OLTP table. An ingestion sketch follows below.
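For illustration, CDC rows can then be ingested with the Scala upsert API. The following is a minimal sketch, assuming a SparkSession named spark and a small hand-built batch of CDC rows; in practice the rows would come from Debezium, Canal or Flink CDC:

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import spark.implicits._

// A hypothetical micro-batch of CDC rows carrying the change_type flag from the CDC source
val cdcBatch = Seq(
  ("1", "2023-01-01", "insert"),
  ("1", "2023-01-01", "update"),
  ("2", "2023-01-01", "delete")
).toDF("id", "date", "change_type")

// Upsert merges by the hash partition key (id); rows marked delete
// are filtered out in batch reads but kept in incremental reads
LakeSoulTable.forPath("s3://lakesoul-bucket/table_path").upsert(cdcBatch)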
In Flink
Please refer to Flink Connector
Incremental Read for LakeSoul CDC Table
LakeSoul adopts a primary key sharding mode for incremental upserts, so incremental data does not need to be merged with existing data at write time. For CDC tables, the delta data is the content of the original CDC stream, so an incremental read of a LakeSoul CDC table fully retains the CDC operation flags, namely insert, update and delete.
Incremental streaming reads in Spark are supported since version 2.2.0.
From version 2.3.0, a Flink Table Source is supported, enabling streaming incremental reading and writing with Flink Changelog Stream semantics; please refer to Flink Connector.
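As an illustration of the Spark side, here is a minimal streaming-read sketch (assumptions: the table path, the console sink, and that the streaming source of your LakeSoul version emits incremental CDC rows by default; check the Spark connector docs of your release for the exact reader options):

// Streaming incremental read: the change_type column is retained,
// so downstream logic can react to insert / update / delete flags
val cdcStream = spark.readStream
  .format("lakesoul")
  .load("s3://lakesoul-bucket/table_path")

cdcStream.writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()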