
Import Data into a LakeSoul CDC Table via Spark Streaming

1. Introduction to CDC Lake Ingestion

LakeSoul provides an independent specification for expressing CDC semantics: by setting a table property that designates a CDC Op column, each row carries its operation type, and subsequent merges automatically combine rows according to those operation semantics. CDC data can be converted and imported into LakeSoul via Debezium, Canal, Flink, and similar tools. This guide walks through a complete end-to-end example.
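
To make the Op-column convention concrete, here is a minimal sketch (our own illustration, not LakeSoul API; the column name "op" and its values follow the convention used later in this guide). The same primary key may appear several times in a CDC stream, and the Op column tells the merge how to resolve them:

    import org.apache.spark.sql.SparkSession

    // Local session purely for illustration; in the walkthrough below,
    // spark-shell already provides `spark`.
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Incoming CDC rows keyed by `id`; the "op" column carries the operation type.
    val cdcRows = Seq(
      (1L, "v1", "insert"),  // id=1 created
      (1L, "v2", "update"),  // id=1 later updated: the merge keeps "v2"
      (2L, "v3", "insert"),  // id=2 created
      (2L, "v3", "delete")   // id=2 then deleted: the merge drops the row
    ).toDF("id", "value", "op")
    cdcRows.show()

Once rows like these are upserted into a table whose lakesoul_cdc_change_column property points at "op", LakeSoul is expected to keep id=1 with value "v2" and treat id=2 as deleted when merging.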

2. MySQL Setup

2.1 Create a database and a test table

CREATE DATABASE cdc;
USE cdc;
CREATE TABLE test(
  id int PRIMARY KEY,
  rangeid int,
  value varchar(100)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2.2 Benchmark Data Generator

After all components of the pipeline have been set up, use MysqlBenchmark to generate test traffic.

Usage steps:

  • unzip MysqlBenchmark.zip

  • Manually edit the relevant settings in mysqlcdc.conf

    ### MySQL configuration ###
    user=<MySQL username>
    passwd=<MySQL password>
    host=<MySQL host>
    port=<MySQL port>
  • Insert data into the specified table (run after the full pipeline is up)

    bash MysqlCdcBenchmark.sh insert cdc(database) test(table) 10(rows to insert) 1(threads)
  • Update data in the specified table (run after the full pipeline is up)

    bash MysqlCdcBenchmark.sh update cdc(database) test(table) id(primary key) value(column to update) 10(rows to update)
  • Delete data from the specified table (run after the full pipeline is up)

    bash MysqlCdcBenchmark.sh delete cdc(database) test(table) 10(rows to delete)

3. Kafka Installation and Setup

3.1 Install Kafka (skip if already installed)

  • Via the Kafka K8s operator - Deploying and Upgrading (0.28.0) (strimzi.io)

    kubectl create -f install/cluster-operator -n my-cluster-operator-namespace
    kubectl apply -f examples/kafka/kafka-persistent-single.yaml
  • For other installation methods, see the official documentation - Apache Kafka

4. Debezium Installation and Setup

4.1 Installation (skip if already installed)

  • Quickly launch a Debezium service container pod via K8s, using the following pod configuration

    apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: dbz-pod-claim
    spec:
      accessModes:
        - ReadWriteOnce
      # Fill in a storage class available in your k8s cluster
      storageClassName:
      resources:
        requests:
          storage: 10Gi
    ---
    apiVersion: v1
    kind: Pod
    metadata:
      name: dbz-pod
      namespace: dmetasoul
    spec:
      restartPolicy: Never
      containers:
        - name: dbs
          image: debezium/connect:latest
          env:
            - name: BOOTSTRAP_SERVERS
              # Replace with the actual Kafka broker address
              value: ${kafka_host}:9092
            - name: GROUP_ID
              value: "1"
            - name: CONFIG_STORAGE_TOPIC
              value: my_connect_configs
            - name: OFFSET_STORAGE_TOPIC
              value: my_connect_offsets
            - name: STATUS_STORAGE_TOPIC
              value: my_connect_statuses
          resources:
            requests:
              cpu: 500m
              memory: 4Gi
            limits:
              cpu: 4
              memory: 8Gi
          volumeMounts:
            - mountPath: "/kafka/data"
              name: dbz-pv-storage
      volumes:
        - name: dbz-pv-storage
          persistentVolumeClaim:
            claimName: dbz-pod-claim

    kubectl apply -f pod.yaml
    # Note: double-check the storage class and namespace settings
  • Otherwise, refer to the official docs: Debezium Release Series 1.8

4.2 Create a CDC Sync Task with Debezium

  • Connect MySQL and Kafka, and create the Debezium CDC sync task (replace dbzhost with the Debezium service IP)

    curl -X POST http://dbzhost:8083/connectors/ -H 'Cache-Control: no-cache' -H 'Content-Type: application/json' -d '{
      "name": "cdc",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "tasks.max": "1",
        "database.hostname": "mysqlhost",
        "database.port": "mysqlport",
        "database.user": "mysqluser",
        "database.password": "mysqlpassword",
        "database.server.id": "1",
        "database.server.name": "cdcserver",
        "database.include.list": "cdc",
        "database.history.kafka.bootstrap.servers": "kafkahost:9092",
        "database.history.kafka.topic": "schema-changes.cdc",
        "decimal.handling.mode": "double",
        "table.include.list": "cdc.test"
      }
    }'

    Note: fields such as the hosts must be adjusted to your environment. The Kafka topic that Debezium writes to is the concatenation of the "database.server.name" and "table.include.list" values, i.e. cdcserver.cdc.test in this example.

  • Check whether the CDC sync task was created successfully

    curl -H "Accept:application/json" dbzhost:8083 -X GET http://dbzhost:8083/connectors/
  • After testing, you can delete the sync task

    curl -i -X DELETE http://dbzhost:8083/connectors/cdc

5. Spark + LakeSoul CDC Ingestion

5.1 Installation

Please refer to the Quick Start guide.

5.2 Consume Kafka Data with Spark Streaming and Sync Updates to LakeSoul

  • Launch spark-shell

    ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2 --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog

    For other metadata database settings, refer to Configure the LakeSoul Metadata Database for Spark Jobs.

  • Create a LakeSoul table

    We create a LakeSoul table (registered under the short name cdc below) that will sync the MySQL data in a near-real-time, unified streaming-and-batch manner. The table likewise uses the id column as its primary key and an "op" column to represent CDC updates. We also need to set the table property lakesoul_cdc_change_column to tell LakeSoul which column records the CDC state change; in this example that column is named "op".

    import com.dmetasoul.lakesoul.tables.LakeSoulTable
    val path = "/opt/spark/cdctest"
    val data = Seq((1L, 1L, "hello world", "insert")).toDF("id", "rangeid", "value", "op")
    LakeSoulTable.createTable(data, path)
      .shortTableName("cdc")
      .hashPartitions("id")
      .hashBucketNum(2)
      .rangePartitions("rangeid")
      .tableProperty("lakesoul_cdc_change_column" -> "op")
      .create()
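
    As a quick sanity check (optional, our own addition), read the freshly created table back; the single seed row should appear with op = "insert":

    import com.dmetasoul.lakesoul.tables.LakeSoulTable
    // Read back the table created above; prints the seed row (1, 1, "hello world", "insert")
    LakeSoulTable.forPath("/opt/spark/cdctest").toDF.show()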
  • Start the streaming write into LakeSoul

    Read from Kafka, transform the JSON format produced by Debezium, and update the LakeSoul table with LakeSoul upsert:

    import com.dmetasoul.lakesoul.tables.LakeSoulTable
    val path="/opt/spark/cdctest"
    val lakeSoulTable = LakeSoulTable.forPath(path)
    var strList = List.empty[String]

    //js1 is sample data; it is also used here to derive the schema passed to from_json below. The before/after fields correspond to the MySQL table columns.
    val js1 = """{
    | "before": {
    | "id": 2,
    | "rangeid": 2,
    | "value": "sms"
    | },
    | "after": {
    | "id": 2,
    | "rangeid": 2,
    | "value": "sms"
    | },
    | "source": {
    | "version": "1.8.0.Final",
    | "connector": "mysql",
    | "name": "cdcserver",
    | "ts_ms": 1644461444000,
    | "snapshot": "false",
    | "db": "cdc",
    | "sequence": null,
    | "table": "sms",
    | "server_id": 529210004,
    | "gtid": "de525a81-57f6-11ec-9b60-fa163e692542:1621099",
    | "file": "binlog.000033",
    | "pos": 54831329,
    | "row": 0,
    | "thread": null,
    | "query": null
    | },
    | "op": "c",
    | "ts_ms": 1644461444777,
    | "transaction": null
    |}""".stripMargin
    strList = strList :+ js1
    val rddData = spark.sparkContext.parallelize(strList)
    val resultDF = spark.read.json(rddData)
    val sche = resultDF.schema

    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
    // from_json and when come from here; spark-shell imports this by default,
    // but a standalone job needs it explicitly
    import org.apache.spark.sql.functions._

    //Connect to Kafka: set kafka.bootstrap.servers to the broker address and subscribe to the topic Debezium writes to
    val kfdf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafkahost:9092")
      .option("subscribe", "cdcserver.cdc.test")
      .option("startingOffsets", "latest")
      .load()

    //Parse the JSON produced by Debezium; MySQL insert/update/delete operations are mapped into a column named op
    val kfdfdata = kfdf
      .selectExpr("CAST(value AS STRING) as value")
      .withColumn("payload", from_json($"value", sche))
      .filter("value is not null")
      .drop("value")
      .select("payload.after", "payload.before", "payload.op")
      .withColumn(
        "op",
        when($"op" === "c", "insert")
          .when($"op" === "u", "update")
          .when($"op" === "d", "delete")
          .otherwise("unknown")
      )
      .withColumn(
        "data",
        when($"op" === "insert" || $"op" === "update", $"after")
          .when($"op" === "delete", $"before")
      )
      .drop($"after")
      .drop($"before")
      .select("data.*", "op")

    //Upsert the parsed data into the LakeSoul table and print each micro-batch to the console
    kfdfdata.writeStream
      .foreachBatch { (batchDF: DataFrame, _: Long) =>
        {
          lakeSoulTable.upsert(batchDF)
          batchDF.show
        }
      }
      .start()
      .awaitTermination()
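
    The loop above is intentionally minimal. For anything beyond a demo, consider adding a checkpoint location so consumed Kafka offsets survive a driver restart, and a trigger to control micro-batch cadence. A sketch, assuming the kfdfdata stream above and an illustrative checkpoint path:

    import org.apache.spark.sql.streaming.Trigger

    kfdfdata.writeStream
      // hypothetical path: streaming offsets and state are persisted here across restarts
      .option("checkpointLocation", "/opt/spark/cdctest_checkpoint")
      // drain accumulated Kafka records every 30 seconds rather than as fast as possible
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .foreachBatch { (batchDF: DataFrame, _: Long) =>
        lakeSoulTable.upsert(batchDF)
      }
      .start()
      .awaitTermination()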
  • Simulate MySQL workload

    Using the MySQL Benchmark tool above, run insert, update, and delete operations against the live MySQL table, then view the LakeSoul data:

    import com.dmetasoul.lakesoul.tables.LakeSoulTable
    val path="/opt/spark/cdctest"
    val lakeSoulTable = LakeSoulTable.forPath(path)
    lakeSoulTable.toDF.select("*").show()
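
    To check end-to-end consistency, one option (a sketch of our own; the JDBC URL and credentials are placeholders, and the MySQL JDBC driver must be on the classpath) is to compare row counts between the source table and the lake table once the stream has caught up:

    // Read the source MySQL table over JDBC; connection details are illustrative
    val mysqlDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://mysqlhost:3306/cdc")
      .option("dbtable", "test")
      .option("user", "mysqluser")
      .option("password", "mysqlpassword")
      .load()

    // Depending on the LakeSoul version, merged reads may still surface rows whose
    // op is "delete"; exclude them before comparing if that is the case
    val lakeDF = lakeSoulTable.toDF.filter($"op" =!= "delete")
    println(s"mysql rows = ${mysqlDF.count()}, lakesoul rows = ${lakeDF.count()}")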