LakeSoul Flink CDC 整库千表同步入湖
LakeSoul 自 2.1.0 版本起,实现了 Flink CDC Sink,能够支持 Table API 及 SQL (单表),以及 Stream API (整库多表)。目前支持的上游数据源为 MySQL(5.6-8.0, PolarDB)、Oracle(11、12、19、21)、Postgresql(10-14, PolarDB)。入湖入口统一为 JdbcCdc。
主要功能特点
在 Stream API 中,LakeSoul Sink 主要功能点有:
- 支持整库千表(不同 schema)在同一个 Flink 作业中实时 CDC 同 步,不同表会自动写入 LakeSoul 对应表名中
- 对于 MySQL 和 PostgreSQL,支持 Schema 变更(DDL)自动同步到 LakeSoul,下游读取自动兼容新旧数据(目前支持增删列以及数值类型增加精度);
- 对于 Oracle,仅支持同步 schema 不再变更的表(由于 Flink CDC 2.4 的问题,列增加,列删除暂不支持),且不支持新表同步。
- MySQL 和 PostgreSQ L支持运行过程中上游数据库中新建表自动感知,在 LakeSoul 中自动建表;
- 支持严格一次(Exactly Once)语义,即使 Flink 作业发生 Failover,能够保证数据不丢不重;
- 提供 Flink 命令行启动入口类,支持指定库名、表名黑名单、并行度等参数;
命令行使用方法
1. 下载 LakeSoul Flink Jar
可以在 LakeSoul Release 页面下载:https://github.com/lakesoul-io/LakeSoul/releases/download/v2.6.0/lakesoul-flink-1.17-2.6.0.jar。
如果访问 Github 有问题,也可以通过这个链接下载:https://mirrors.huaweicloud.com/repository/maven/com/dmetasoul/lakesoul-flink/1.17-2.6.0/lakesoul-flink-1.17-2.6.0.jar
目前支持的 Flink 版本为 1.17。
2. 启动 Flink 作业
2.1 增加 LakeSoul 元数据库配置
在 $FLINK_HOME/conf/flink-conf.yaml
中增加如下配置:
containerized.master.env.LAKESOUL_PG_DRIVER: com.lakesoul.shaded.org.postgresql.Driver
containerized.master.env.LAKESOUL_PG_USERNAME: root
containerized.master.env.LAKESOUL_PG_PASSWORD: root
containerized.master.env.LAKESOUL_PG_URL: jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
containerized.taskmanager.env.LAKESOUL_PG_DRIVER: com.lakesoul.shaded.org.postgresql.Driver
containerized.taskmanager.env.LAKESOUL_PG_USERNAME: root
containerized.taskmanager.env.LAKESOUL_PG_PASSWORD: root
containerized.taskmanager.env.LAKESOUL_PG_URL: jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
注意这里 master 和 taskmanager 的环境变量都需要设置。
提示
PostgreSQL 数据库的连接信息、用户名密码需要根据实际情况修改。
警告
注意如果使用 Session 模式来启动作业,即将作业以 client 方式提交到 Flink Standalone Cluster,则 flink run
作为 client,是不会读取上面配置,因此需要再单独配置环境变量,即:
export LAKESOUL_PG_DRIVER=com.lakesoul.shaded.org.postgresql.Driver
export LAKESOUL_PG_URL=jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
export LAKESOUL_PG_USERNAME=root
export LAKESOUL_PG_PASSWORD=root
2.2 启动同步作业
必填参数说明:
参数 | 含义 | 取值说明 |
---|---|---|
-c | 任务运行main函数入口类 | org.apache.flink.lakesoul.entry.JdbcCDC |
主程序包 | 任务运行jar包 | lakesoul-flink-1.17-2.6.0.jar |
--source_db.type | 源数据库类型 | mysql postgres oracle |
--source_db.host | 源数据库的地址,mongodb入湖则需要带上port | |
--source_db.port | 源数据库的端口,mongodb入湖不需要这个参数 | |
--source_db.user | 源数据库的用户名 | |
--source_db.password | 源数据库的密码 | |
--source.parallelism | 单表读取任务并行度,影响数据读取速度,值越大对 MySQL 压力越大 | 可以根据 MySQL 的写入 QPS 来调整并行度 |
--sink.parallelism | 单表写任务并行度,同时也是LakeSoul表主键分片的个数。影响入湖数据落地速度。值越大,小文件数越多,影响后续读取性能;值越小对写任务压力越大,发生数据倾斜可能性越大 | 可以根据最大表的数据量进行调整。一般建议一个并行度(主键分片)管理不超过1千万行数据。 |
--warehouse_path | 数据存储路径前缀(hdfs需要带上集群前缀) | LakeSoul 会将对应表数据写入到 warehouse_path/database_name/table_name/ 目录下 |
--flink.savepoint | Flink savepoint路径(hdfs需要带上集群前缀) | |
--flink.checkpoint | Flink checkpoint路径(hdfs需要带上集群前缀) |