Spark Getting Started Guide
Setup
To use LakeSoul in Spark, first configure the Spark catalog. LakeSoul uses Apache Spark's DataSourceV2 API for its data source and catalog implementations, and also provides a Scala table API to extend the capabilities of LakeSoul tables.
Spark 3 Support Matrix
| LakeSoul | Spark Version |
|---|---|
| 2.2.x-2.4.x | 3.3.x |
| 2.0.x-2.1.x | 3.1.x |
Spark Shell/SQL/PySpark
Run spark-shell / spark-sql / pyspark with the LakeSoulSparkSessionExtension SQL extension.
- Spark SQL
- Scala
- PySpark
spark-sql --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog --conf spark.sql.defaultCatalog=lakesoul --jars lakesoul-spark-spark-3.3-2.6.2.jar
spark-shell --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog --conf spark.sql.defaultCatalog=lakesoul --jars lakesoul-spark-spark-3.3-2.6.2.jar
wget https://github.com/lakesoul-io/LakeSoul/tree/main/python/lakesoul/spark/tables.py
pyspark --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog --conf spark.sql.defaultCatalog=lakesoul --jars lakesoul-spark-spark-3.3-2.6.2.jar --py-files tables.py
Setup Maven Project
Include the Maven dependency in your project:
<dependency>
    <groupId>com.dmetasoul</groupId>
    <artifactId>lakesoul-spark</artifactId>
    <version>3.3-2.6.2</version>
</dependency>
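If your build uses sbt instead of Maven, the equivalent dependency would look roughly like this (a sketch, assuming the same coordinates as the Maven snippet above):
// sbt (build.sbt) — sketch; group, artifact and version taken from the Maven dependency above
libraryDependencies += "com.dmetasoul" % "lakesoul-spark" % "3.3-2.6.2"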
- Scala
// Scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.lakesoul.LakeSoulOptions
import com.dmetasoul.lakesoul.tables.LakeSoulTable
val builder = SparkSession.builder()
  .master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .config("spark.sql.catalog.lakesoul", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
  .config("spark.sql.defaultCatalog", "lakesoul")
val spark = builder.getOrCreate()
// spark.implicits._ can only be imported once the session exists
import spark.implicits._
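A quick sanity check that the lakesoul catalog is wired up (a sketch; any catalog-level command would do):
// Scala — sketch: the catalog should answer namespace and table listings once configured
spark.sql("SHOW NAMESPACES").show()
spark.sql("SHOW TABLES").show()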
Create Namespace
First, create a namespace for the LakeSoul table; the default namespace of the LakeSoul catalog is `default`.
- Spark SQL
- Scala
- PySpark
CREATE NAMESPACE IF NOT EXISTS lakesoul_namespace;
USE lakesoul_namespace;
SHOW TABLES;
// Scala
spark.sql("CREATE NAMESPACE IF NOT EXISTS lakesoul_namespace")
spark.sql("USE lakesoul_namespace")
spark.sql("SHOW TABLES")
# python
spark.sql("CREATE NAMESPACE IF NOT EXISTS lakesoul_namespace")
spark.sql("USE lakesoul_namespace")
spark.sql("SHOW TABLES")
Create Table
Create a partitioned LakeSoul table using SQL with the `USING lakesoul` clause, or via the `DataFrameWriterV2` API at the first `save` (a DataFrame-based sketch follows the tabbed examples below).
- Spark SQL
- Scala
- PySpark
CREATE TABLE lakesoul_table (id BIGINT, name STRING, `date` STRING)
USING lakesoul
PARTITIONED BY (`date`)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_table';
// Scala
spark.sql("CREATE TABLE lakesoul_table (id BIGINT, name STRING, `date` STRING) USING lakesoul PARTITIONED BY (`date`) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_table'")
# python
spark.sql("CREATE TABLE lakesoul_table (id BIGINT, name STRING, `date` STRING) USING lakesoul PARTITIONED BY (`date`) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_table'")
Primary Key Table
In LakeSoul, a table with primary keys is defined as a hash-partitioned table. To create such a table, use the `USING lakesoul` clause and specify `TBLPROPERTIES`, where `'hashPartitions'` designates a comma-separated list of primary key column names and `'hashBucketNum'` determines the number of hash buckets.
- Spark SQL
- Scala
- PySpark
CREATE TABLE lakesoul_hash_table (id BIGINT NOT NULL, name STRING, date STRING)
USING lakesoul
PARTITIONED BY (date)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_hash_table'
TBLPROPERTIES ( 'hashPartitions'='id', 'hashBucketNum'='2');
// Scala
spark.sql("CREATE TABLE lakesoul_hash_table (id BIGINT NOT NULL, name STRING, date STRING) USING lakesoul PARTITIONED BY (date) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_hash_table' TBLPROPERTIES ( 'hashPartitions'='id', 'hashBucketNum'='2')")
# python
spark.sql("CREATE TABLE lakesoul_hash_table (id BIGINT NOT NULL, name STRING, date STRING) USING lakesoul PARTITIONED BY (date) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_hash_table' TBLPROPERTIES ( 'hashPartitions'='id', 'hashBucketNum'='2')")
CDC Table
Optionally, a hash-partitioned LakeSoul table can record Change Data Capture (CDC) data, enabling the tracking of data modifications. To create a LakeSoul table with CDC support, use the DDL statement for a hash-partitioned LakeSoul table and add a `TBLPROPERTIES` setting that specifies the `'lakesoul_cdc_change_column'` attribute. This attribute introduces an implicit column that lets the table handle CDC information efficiently, ensuring precise tracking and management of data changes (a write sketch follows the tabbed examples below).
- Spark SQL
- Scala
- PySpark
CREATE TABLE lakesoul_cdc_table (id BIGINT NOT NULL, name STRING, date STRING)
USING lakesoul
PARTITIONED BY (date)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_cdc_table'
TBLPROPERTIES('hashPartitions'='id', 'hashBucketNum'='2', 'lakesoul_cdc_change_column' = 'op');
// Scala
spark.sql("CREATE TABLE lakesoul_cdc_table (id BIGINT NOT NULL, name STRING, date STRING) USING lakesoul PARTITIONED BY (date) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_cdc_table' TBLPROPERTIES('hashPartitions'='id', 'hashBucketNum'='2', 'lakesoul_cdc_change_column' = 'op')")
# python
spark.sql("CREATE TABLE lakesoul_cdc_table (id BIGINT NOT NULL, name STRING, date STRING) USING lakesoul PARTITIONED BY (date) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_cdc_table' TBLPROPERTIES('hashPartitions'='id', 'hashBucketNum'='2', 'lakesoul_cdc_change_column' = 'op')")
Insert/Merge Data
To append new data to a non-hash-partitioned table using Spark SQL, use INSERT INTO.
To append new data to a table using a DataFrame, use the `DataFrameWriterV2` API. If this is the first write to the table, it also auto-creates the corresponding LakeSoul table (a `DataFrameWriterV2`-style sketch follows the tabbed examples below).
- Spark SQL
- Scala
- PySpark
INSERT INTO TABLE lakesoul_table VALUES (1, 'Alice', '2024-01-01'), (2, 'Bob', '2024-01-01'), (1, 'Cathy', '2024-01-02');
// Scala
val data = Seq(Row(1L, "Alice", "2024-01-01"), Row(2L, "Bob", "2024-01-01"), Row(1L, "Cathy", "2024-01-02"))
val schema = StructType(Seq(StructField("id", LongType, false), StructField("name", StringType, true), StructField("date", StringType, false)))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.write.format("lakesoul").insertInto("lakesoul_table")
# python
from pyspark.sql.types import *
data = [(1,"Cathy","2024-01-02")]
schema = StructType([StructField("id", LongType(), False), StructField("name", StringType(), True), StructField("date", StringType(), False)])
df = spark.createDataFrame(data,schema=schema)
df.write.format("lakesoul").insertInto("lakesoul_table")
To append new data to a hash-partitioned table using Spark SQL, use `MERGE INTO`.
To append new data to a hash-partitioned table using a DataFrame, use the `LakeSoulTable` `upsert` API.
- Spark SQL
- Scala
- PySpark
CREATE OR REPLACE VIEW spark_catalog.default.source_view (id, name, date)
AS SELECT 1L as `id`, 'George' as `name`, '2024-01-01' as `date`;
MERGE INTO lakesoul_hash_table AS t
USING spark_catalog.default.source_view AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
// Scala
val tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"
// Init hash table with first dataframe
val df = Seq((20201101, 1, 1), (20201101, 2, 2), (20201101, 3, 3), (20201102, 4, 4)).toDF("range", "hash", "value")
val writer = df.write.format("lakesoul").mode("overwrite")
writer
.option("rangePartitions", "range")
.option("hashPartitions", "hash")
.option("hashBucketNum", 2)
.save(tablePath)
// merge the second dataframe into hash table using LakeSoulTable upsert API
val dfUpsert = Seq((20201111, 1, 1), (20201111, 2, 2), (20201111, 3, 3), (20201112, 4, 4)).toDF("range", "hash", "value")
LakeSoulTable.forPath(tablePath).upsert(dfUpsert)
# python
from pyspark.sql.types import *
from tables import LakeSoulTable
tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"
df = spark.createDataFrame([(20201101, 1, 1), (20201101, 2, 2), (20201101, 3, 3), (20201102, 4, 4)], schema='range int, hash int, value int')
df.write.format("lakesoul").mode("overwrite").option("rangePartitions", "range").option("hashPartitions", "hash").option("hashBucketNum", 2).save(tablePath)
dfUpsert = spark.createDataFrame([(20201111, 1, 1), (20201111, 2, 2), (20201111, 3, 3), (20201112, 4, 4)], schema='range int, hash int, value int')
LakeSoulTable.forPath(spark,tablePath).upsert(dfUpsert)
Update Data
LakeSoul tables can be updated with a DataFrame or a standard `UPDATE` statement.
To update a table using a DataFrame, use the `LakeSoulTable` `updateExpr` API.
- Spark SQL
- Scala
- PySpark
UPDATE lakesoul_table SET name = 'David' WHERE id = 2;
// Scala
val tablePath = "file:/tmp/lakesoul_namespace/lakesoul_table"
LakeSoulTable.forPath(tablePath).updateExpr("id = 2", Seq(("name"->"'David'")).toMap)
# python
from tables import LakeSoulTable
tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"
LakeSoulTable.forPath(spark,tablePath).update("hash = 4", { "value":"5"})
Delete Data
Records can be deleted from LakeSoul tables with a DataFrame or a standard `DELETE` statement.
To delete data from a table using a DataFrame, use the `LakeSoulTable` `delete` API.
- Spark SQL
- Scala
- PySpark
DELETE FROM lakesoul_table WHERE id = 1;
// Scala
val tablePath = "file:/tmp/lakesoul_namespace/lakesoul_table"
LakeSoulTable.forPath(tablePath).delete("id = 1 or id =2")
# python
from tables import LakeSoulTable
tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"
LakeSoulTable.forPath(spark,tablePath).delete("hash = 4")
Query Data
LakeSoul tables can be queried using a DataFrame or Spark SQL.
- Spark SQL
- Scala
- PySpark
SELECT * FROM lakesoul_table;
// Scala
val tablePath = "file:/tmp/lakesoul_namespace/lakesoul_table"
// query data with the DataFrameReader API
spark.read.format("lakesoul").load(tablePath)
// query data with the LakeSoulTable API, by path
LakeSoulTable.forPath(tablePath).toDF
// query data with the LakeSoulTable API, by table name
val tableName = "lakesoul_table"
LakeSoulTable.forName(tableName).toDF
# python
from tables import LakeSoulTable
tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"
# query data with the LakeSoulTable API
LakeSoulTable.forPath(spark, tablePath).toDF().show()
# query data with the DataFrameReader API
spark.read.format("lakesoul").load(tablePath).show()
Time Travel Query
LakeSoul supports time travel queries: a table can be read as of any point in time in its history, and the changed data between two commit times can be retrieved.
// Scala
val tablePath = "file:/tmp/lakesoul_namespace/cdc_table"
Seq(("range1", "hash1", "insert"), ("range2", "hash2", "insert"), ("range3", "hash2", "insert"), ("range4", "hash2", "insert"), ("range4", "hash4", "insert"), ("range3", "hash3", "insert"))
.toDF("range", "hash", "op")
.write
.mode("append")
.format("lakesoul")
.option("rangePartitions", "range")
.option("hashPartitions", "hash")
.option("hashBucketNum", "2")
.option("shortTableName", "cdc_table")
.option("lakesoul_cdc_change_column", "op")
.save(tablePath)
// record the version of 1st commit
import java.text.SimpleDateFormat
val versionA: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)
val lakeTable = LakeSoulTable.forPath(tablePath)
lakeTable.upsert(Seq(("range1", "hash1-1", "delete"), ("range2", "hash2-10", "delete"))
.toDF("range", "hash", "op"))
// record the version of 2nd commit
val versionB: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)
lakeTable.upsert(Seq(("range1", "hash1-13", "insert"), ("range2", "hash2-13", "update"))
.toDF("range", "hash", "op"))
lakeTable.upsert(Seq(("range1", "hash1-15", "insert"), ("range2", "hash2-15", "update"))
.toDF("range", "hash", "op"))
// record the version of 3rd,4th commits
val versionC: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)
Complete Query
// Scala
spark.sql("SELECT * FROM cdc_table")
Snapshot Query
LakeSoul supports snapshot queries to read the table as of a point in time in its history.
// Scala
spark.read.format("lakesoul")
.option(LakeSoulOptions.PARTITION_DESC, "range=range2")
.option(LakeSoulOptions.READ_END_TIME, versionB)
.option(LakeSoulOptions.READ_TYPE, LakeSoulOptions.ReadType.SNAPSHOT_READ)
.load(tablePath)
Incremental Query
LakeSoul supports incremental queries, which return the set of records that changed between a start time and an end time.
// Scala
spark.read.format("lakesoul")
.option(LakeSoulOptions.PARTITION_DESC, "range=range1")
.option(LakeSoulOptions.READ_START_TIME, versionA)
.option(LakeSoulOptions.READ_END_TIME, versionB)
.option(LakeSoulOptions.READ_TYPE, LakeSoulOptions.ReadType.INCREMENTAL_READ)
.load(tablePath)
Next steps
Next, you can learn more about working with LakeSoul tables in Spark in the Spark API docs.