跳到主要内容

· 阅读需 6 分钟

近期,湖仓框架 LakeSoul 发布了 2.3.0 版本。本次新版本发布是 LakeSoul 进入 Linux 基金会孵化后的第一次版本发布。本次新版本新增 Flink SQL/Table API,支持流、批的读写。重构了多表实时入湖的 Flink DataStream API,以更好地支持多种数据源的实时入湖。新增了全局自动小文件合并服务。

LakeSoul 在 2.3.0 版本中,全面支持了 Flink SQL/Table API,同时支持流、批两种方式,对 LakeSoul 表进行读或写。在流式的读取或写入时,LakeSoul 完整地支持了 Flink Changelog Stream 语义。

在流式写入时,可以对接多种 CDC 采集工具,包括 Debezium、 Flink CDC Connector,支持行级别 upsert、delete;在读取时,对于 Append 或 Upsert 方式增量更新的表,均可流式增量读取,并在 Flink 中进行增量计算。同时,LakeSoul 也支持 Flink 批计算模式,能够支持批量 upsert、全量读取、快照读取等功能。

使用 LakeSoul + Flink SQL,可以很方便地在数据湖上构建大规模、低成本、高性能的实时数仓。具体使用方法可以参考 Flink SQL 文档

LakeSoul 从 2.1 版本起能够支持数据库整库同步,并提供了 MySQL 整库自动同步工具

在本次 2.3 版本更新中,我们重构了整库同步时的 DDL 解析方式。具体来说,LakeSoul 在进行整库同步时不再需要上游提供 DDL 事件,或去上游数据库获取表的 Schema 等信息,而是直接从同步的数据中解析是否出现新表、是否已有表的 Schema 发生了变更。遇到新表或 Schema 变更时,会自动在 LakeSoul 湖仓中执行建表或 Schema 变更。

这个改动使得 LakeSoul 可以支持任意类型的数据源入湖,例如 MySQL、Oracle 的 CDC 采集,或者从 Kafka 中消费 CDC 事件。开发者只需要将 CDC 消息解析为 BinarySourceRecord 对象,并创建 DataStream<BinarySourceRecord>,即可实现整库同步。LakeSoul 已经实现了从 Debezium DML 消息格式到 BinarySourceRecord 对象的转换,其他 CDC 格式可以参考实现。

全局自动小文件合并(Compaction)服务

LakeSoul 支持流式、并发的 Upsert 或 Append 操作,每次 Upsert/Append 都会写入一批文件,在读取时自动进行合并(Merge on Read)。

LakeSoul 的 MOR 性能已经相对高效(参考之前的性能对比),实测在 Upsert 100 次后的 MOR 性能下降幅度约 15%。不过为了能够有更高的读性能,LakeSoul 也提供了小文件合并(Compaction)的功能。Compaction 功能是一个 Spark 的 API,需要对每个表独立调用,使用起来较为繁琐。

本次 2.3 版本更新,LakeSoul 提供了全局自动小文件合并服务。这个服务实际是一个 Spark 作业,通过监听 LakeSoul PG 元数据库的写入事件,自动触发对符合条件的表的合并操作。这个合并服务有如下几个优点:

  1. 全局合并。合并服务只需要在集群中启动一次,就会自动对所有表进行合并(也支持按 database 划分为多个),不需要再在各个表的写入作业中进行配置,使用方便。
  2. 分离式的合并服务。由于 LakeSoul 能够支持并发写,因此合并服务的写入不影响其他写入作业,可以并发执行。
  3. 弹性伸缩。全局合并服务使用 Spark 实现,可以通过开启 Spark 的 Dynamic Allocation 实现自动弹性伸缩。

总结

LakeSoul 2.3 版本更新,能够更好地支持构建大规模实时湖仓,提供了高性能 IO、全链路流式计算、方便快捷的多源入湖等核心功能,全局合并服务在提升性能的同时保持了简单易用的特性,降低了数据湖的维护成本。

在下个版本,LakeSoul 还将原生支持 RBAC 权限控制、原生 Python 读取等功能。LakeSoul 目前是 Linux Foundation AI&Data 的 Sandbox 孵化项目,也欢迎广大开发者和用户参与到社区中来,一起打造更快、更好用的湖仓一体框架。

· 阅读需 15 分钟

近日,经过数月研发,开源湖仓框架 LakeSoul 发布了 2.2.0 版本(Github 发布日志),这个版本最重要的升级是在 Spark、Flink 中默认启用了全新的 Native IO,性能再次获得显著提升,扩大了 LakeSoul 在云原生湖仓框架领域的性能领先优势。本文为您详细解读 LakeSoul 2.2.0 版本的更新以及 Native IO 的技术细节。

LakeSoul 2.2.0 版本更新内容

在 2.2.0 版本中,LakeSoul 实现了全新的 Native IO,将全量、增量的读写逻辑都迁移到了新的 IO 层实现,并做了大量性能和正确性测试,在 Spark 和 Flink 中都默认启用了新 IO 层。

2.2.0 版本还发布了 Spark 中的快照读取、回滚和清理增量批式读取、增量流式读取等多个新接口,从而更为完善地支持高性能的流式增量的 ETL 数据建模流程。以下我们详细讲解这几个新的改进和功能点。

1. Native IO 详细解读

作为云原生湖仓框架,对云存储以及 HDFS 的读写性能是重中之重。LakeSoul 支持流批一体的特性,在存储上针对主键表支持 Upsert 更新写入和 Merge on Read 读取,IO 层的实现有一定的复杂度。先前 LakeSoul 的 Upsert 写入主要依赖计算框架来实现,例如在 Spark 中,需要由 Spark 对主键进行 shuffle 和排序后写入存储,而读取时则在 Spark 中实现了一套有序文件归并的 Reader。这套实现带来几个问题:

  1. 性能有优化空间。LakeSoul 主键模型采用哈希分桶有序的方式存储,在 Upsert 写入时不需要与历史合并,带来很高的写吞吐,但是 MOR 读时需要做多个文件的有序归并,在文件数较多的情况下,尤其是在高延迟对象存储的场景下,读性能有较大的影响,需要优化。
  2. 原方案与 Spark 耦合较紧,不方便在 Flink 等其他计算引擎中复用读写逻辑,尤其是较为复杂的 MOR 读逻辑。
  3. 原方案采用 Java 实现,不方便对接其他语言如 C++ 实现的计算引擎。

综合考虑以上几个问题,LakeSoul 选择使用 Native Code (Rust 语言)重新实现 IO 层的读写逻辑,并向上层提供基于 CFFI 的接口,以方便 Java、Python 等其他语言进行进一步封装,从而与计算引擎对接。具体的设计和实现包括如下几点:

  1. 使用 Apache Arrow 和 Rust

    LakeSoul 使用 Parquet 作为物理存储格式,因此我们选择使用 Arrow 作为 IO 的内存中间表示层,一方面是因为 Arrow 作为成熟的内存列格式,有丰富的库支持,例如 Rust 的实现 Arrow-rs,另一方面选用列式格式可以获得向量化加速的能力,也能够很方便的对接向量化计算引擎。

    LakeSoul 依赖 Arrow 的 Rust 实现,即 arrow-rs,其中已经包含了 Parquet Reader、Writer,以及对象存储的抽象层 object_store,能够很好地支持诸如 S3 等云原生对象存储。而选择 Rust 实现,主要考虑是 Rust 对异步的支持较为完善,对对象存储的操作均可实现为 async 接口,能够通过异步的方式对 IO 进行进一步优化。

  2. LakeSoul Native Writer

    LakeSoul 的 Native Writer,通过 Arrow-rs 中的 object_store 库所封装的 MultipartUpload 功能来实现单文件并发写。具体来说,Parquet 文件可以组织成多个 RowGroup,每个 RowGroup 会提前在内存中序列化为一个连续的 buffer。LakeSoul 将一个 RowGroup buffer 作为一个对象存储的 part,提交给后台线程异步上传,与此同时计算线程可以继续组织下一个 RowGroup 的内容,从而实现了单文件在对象存储上并发写入。

    LakeSoul Native Writer 支持在写入时对主键进行排序,这里使用了 Arrow DataFusion 库的 Sort 实现,其支持将排序中间结果溢写(spill)到磁盘,这样 Writer 可以支持大批量的数据写入,这在 Flink Sink 中有比较重要的意义。其原因是主键的排序只是对同一批写入的文件排序,而 LakeSoul 的 Flink Sink 仅在 Flink 执行 checkpoint 时写出文件,而两次 checkpoint 间隔可能很长。通过磁盘溢写,避免了占用过大内存导致 OOM 的问题。

    使用对象存储的 MultipartUpload 还有一个优势,就是消除了 staging 中间文件以及 commit 时的文件拷贝开销。Staging 文件原先是为了避免在目标表或分区目录下出现尚未完整写入的结果,避免引起下游读到不完整的数据。无论是 Flink 还是 Spark,在写 Staging 文件时需要先在一个临时目录下写入,commit 时再重命名临时目录为最终目标目录,这样会带来溢写额外的开销,尤其是对象存储不支持原子重命名时,需要完整拷贝一遍数据。并且 Staging 文件也使得 Flink Sink 状态维护更为复杂。LakeSoul 支持 MultipartUpload 的 abort 操作,如果中间取消写入,则会 abort 未写完的文件,并且 LakeSoul 的元数据层在 Spark、Flink 中均实现了两阶段提交协议来保证一致性。

  3. LakeSoul Native Reader

    Reader 部分,我们主要针对对象存储访问 Parquet 文件和 MOR 多个有序文件归并这两个方面做了优化。

    对象存储通常有着较高的延迟,arrow-rs 中的 object_store 库已经做了包括小请求合并(coalesce)和并发请求的优化,在我们实际测试中效果仍不够理想。因此我们做了两方面的进一步优化:

    一是拆分读请求为多个并发请求。对象存储的底层实际是 HTTP 协议,可以在 HTTP 请求体中指定要访问的对象文件的范围。我们将单个请求的范围大小限定在 8M 以内,大于 8M 的请求会自动拆分成多个并发请求,避免一次请求中访问太大的文件范围造成长时间的阻塞等待。

    第二个优化是对 Parquet 的 RowGroup 做后台异步 prefetch。这个同样也是为了能够让计算线程和 IO 线程能够同时工作,用预取掩盖延迟。默认会预取一个 RowGroup,避免占用过大内存。

    在 MOR 有序归并方面,我们基于 arrow-rs 实现了多个 Arrow RecordBatch 异步流的归并。实现时采用了 arrow-rs 提供的 Row Format 来优化主键的序关系比较性能,使用 interleave 来优化多个流合并为一个 RecordBatch 的性能。

  4. Spark、Flink 引擎对接 Native IO 层

    前面提到,Native IO 层采用 Rust 实现。在其之上,我们封装了一层 C 接口,提供读和写的功能。其中读取时支持同步阻塞和异步回调两种方式,方便上层根据需要选择(写时只提供阻塞接口,因为底层已经做了 Multipart 上传的并发)。在 C 接口之上,使用 jnr-ffi 实现了 Java 接口。最后在 Spark、Flink 侧分别实现了 Reader 和 Writer 的对接。

  5. LakeSoul IO 性能评测

    LakeSoul 的 Native IO 层对 Parquet 文件格式实现了向量化的读写,并对对象存储做了异步并发、预取等性能优化。对 Parquet 文件的直接读写,相比 Spark、Flink 中原生的 parquet-mr 和 Hadoop S3A 的实现,有着显著的性能提升:

    nativeio-vs-parquet-mr

    我们也做了 LakeSoul 与 Hudi、Iceberg 在 COW、MOR 场景下的读写性能对比评测。评测场景是首先写入 1000 万条数据,然后 Upsert 10 次,每次 200 万行。该性能评测的数据和代码均已公开,见性能评测代码库。性能对比如下:

    write-perf

    写性能方面,对于 COW 模式,三个框架都需要全量重写数据,计算开销基本一致,LakeSoul 借助 Native IO 的优化,性能是最好的。对于 MOR 模式,LakeSoul 采用的主键分桶排序模式,不需要维护索引更新,因此取得了很高的写吞吐,显著优于其他两个框架。

    read-perf

    读性能方面,对于 COW 模式,都是读取合并后的文件,没有 merge 开销,LakeSoul 同样得益于 IO 层优化取得领先。而对于 MOR 模式,虽然 LakeSoul 的有序合并相比 Hudi 的 bloom filter 过滤和 Iceberg 的 pos-delete 过滤在理论性能上有劣势,但是 LakeSoul 凭借 IO 层的多项优化,读性能仍然领先 Iceberg 1倍。

    值得一提的是,LakeSoul 在读写过程中,CPU、内存占用也都是三个框架之中最小的。另外,LakeSoul 的 MOR 读,将增量数据拆分为 100 次 Upsert 写入后,读性能仅有约 15% 的下降,也意味着 LakeSoul 可以在高频写入更新,并且没有 compaction 的情况下,仍然保证较好的读性能。

2. 快照和增量读取功能介绍

LakeSoul 2.2.0 针对快照、增量场景,提供了一系列新的接口。LakeSoul 采用了 MVCC 的多版本控制机制,会在元数据中记录每个版本所对应的文件。因此可以很容易地支持快照读(又称 Time Travel),即读取之前某个时间点的数据快照版本。LakeSoul 还能够支持回滚到某个时间点的数据,方便在最新数据有问题时还原到之前的正确版本。

对于快照读、快照回滚、快照清理,使用者只需要提供快照的时间点,用时间戳字符串表示,例如"2022-01-01 15:15:15",该时间戳不需要与实际写入时间严格对应,这个时间会作为写入版本的时间戳上界,LakeSoul 会自动找到小于等于该时间戳所对应的快照版本。

快照相关的功能点可以参考快照使用教程

LakeSoul 还提供了增量读取功能。在流式 ETL 中,增量读取功能具有重要意义,增量读能够让整个 ETL 链路都转为增量计算,提高实时性的同时还能节约计算资源。LakeSoul 2.2.0 支持在 Spark 中的增量 batch 读取、增量流式读取。增量流式读取时,LakeSoul 会作为 Spark 的 Streaming 数据源,自动发现并读取表的增量数据。具体使用方法可以参考增量查询教程

值得一提的是,不同于 Hudi、Iceberg,LakeSoul 对于主键表、非主键表均可支持增量读,对于 LakeSoul CDC 表(参考 LakeSoul CDC 表格式),也同样能够增量读取 CDC 流,这个流表示了 LakeSoul 表本身的增量修改,包括 insert、update 和 delete 操作,能够支持下游灵活的增量计算。在下一个版本,LakeSoul 将支持 LakeSoul 表的 CDC 增量流式读取为 Flink ChangeLog Stream,借助 Flink SQL 即可完成高效的流式增量 ETL 开发。

总结

LakeSoul 2.2.0 发布了全新的 Native IO ,进一步扩大了云原生湖仓框架领域的性能优势。提供了快照和增量的接口,能够更好地支持流式数据建模开发。

在下个版本,LakeSoul 将会发布全局自动 Compaction,Flink Stream SQL Source 等重要功能,敬请期待。