Hudi is used in production by some of the largest companies in the world. The key to Hudi in this use case is that it provides an incremental data processing stack that conducts low-latency processing on columnar data. Typically, systems write data out once using an open file format like Apache Parquet or ORC and store it on top of highly scalable object storage or a distributed file system. Hudi serves as a data plane to ingest, transform, and manage this data. Hudi interacts with storage using the Hadoop FileSystem API, which is compatible with (but not necessarily optimal for) implementations ranging from HDFS to object storage to in-memory file systems.
The timeline is critical to understand because it serves as a source-of-truth event log for all of Hudi’s table metadata. The timeline is stored in the .hoodie folder, or bucket in our case. Events are retained on the timeline until they are removed. The timeline exists for the overall table as well as for individual file groups, enabling reconstruction of a file group by applying the delta logs to the original base file. To optimize for frequent writes/commits, Hudi’s design keeps metadata small relative to the size of the entire table.
A typical Hudi architecture relies on Spark or Flink pipelines to deliver data to Hudi tables. The Hudi writing path is optimized to be more efficient than simply writing a Parquet or Avro file to disk. Hudi analyzes write operations and classifies them as incremental (insert, upsert, delete) or batch operations (insert_overwrite, insert_overwrite_table, delete_partition, bulk_insert), and then applies the necessary optimizations.
Hudi’s greatest strength is the speed with which it ingests both streaming and batch data. By providing the ability to upsert, Hudi executes tasks orders of magnitude faster than rewriting entire tables or partitions.
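To make the classification concrete, here is a minimal sketch of selecting a write operation explicitly when saving a DataFrame. It reuses the quickstart write options that appear later in this walkthrough and assumes the df, tableName and basePath values defined there; upsert remains the default when no operation is specified.
// hedged sketch: choosing the write operation explicitly
// (upsert is the default; bulk_insert can be faster for initial, update-free loads)
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY, "bulk_insert").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)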
Schema is a critical component of every Hudi table. Hudi can enforce schema, or it can allow schema evolution so the streaming data pipeline can adapt without breaking. In addition, Hudi enforces schema-on-write to ensure changes don’t break pipelines. Hudi relies on Avro to store, manage and evolve a table’s schema.
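As a rough illustration of a backwards-compatible schema change, the sketch below appends records that carry an extra nullable column. The tip column is hypothetical and not part of the quickstart data; df, tableName and basePath refer to the values defined in the walkthrough below.
// hedged sketch: appending data with a new, hypothetical nullable "tip" column;
// Hudi's Avro-based schema handling can absorb this backwards-compatible change
import org.apache.spark.sql.functions.lit
val evolvedDf = df.withColumn("tip", lit(null).cast("double"))
evolvedDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)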
Hudi provides ACID transactional guarantees to data lakes. Hudi ensures atomic writes: commits are made atomically to the timeline and given a timestamp that denotes the time at which the action is deemed to have occurred. Hudi isolates snapshots between writer, table, and reader processes so each operates on a consistent snapshot of the table. Hudi rounds this out with optimistic concurrency control (OCC) between writers, and non-blocking, MVCC-based concurrency control between table services and writers and between multiple table services.
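This tutorial runs a single writer, so none of this needs to be configured here, but as a hedged sketch, multi-writer OCC is typically switched on with write configs along these lines (the ZooKeeper endpoint and lock key below are placeholder values, not something set up elsewhere in this walkthrough):
// hedged sketch: extra write configs that enable optimistic concurrency control
// between multiple writers; pass them alongside the usual write options, e.g.
// df.write.format("hudi").options(concurrencyOpts)...
val concurrencyOpts = Map(
  "hoodie.write.concurrency.mode" -> "optimistic_concurrency_control",
  "hoodie.cleaner.policy.failed.writes" -> "LAZY",
  "hoodie.write.lock.provider" -> "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
  "hoodie.write.lock.zookeeper.url" -> "<your-zookeeper-host>",
  "hoodie.write.lock.zookeeper.port" -> "2181",
  "hoodie.write.lock.zookeeper.lock_key" -> "hudi_trips_cow",
  "hoodie.write.lock.zookeeper.base_path" -> "/hudi_locks"
)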
AWS: aws-java-sdk:1.10.34 (or higher)
Hadoop: hadoop-aws:2.7.3 (or higher)
Download the JAR files, unzip them and copy them to /opt/spark/jars.
mc alias set myminio http://<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key>
mc mb myminio/hudi
spark-shell \
--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0,org.apache.hadoop:hadoop-aws:3.3.4 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.hadoop.fs.s3a.access.key=<your-MinIO-access-key>' \
--conf 'spark.hadoop.fs.s3a.secret.key=<your-MinIO-secret-key>' \
--conf 'spark.hadoop.fs.s3a.endpoint=<your-MinIO-IP>:9000' \
--conf 'spark.hadoop.fs.s3a.path.style.access=true' \
--conf 'fs.s3a.signing-algorithm=S3SignerType'
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow"
val basePath = "s3a://hudi/hudi_trips_cow"
val dataGen = new DataGenerator
The following will generate new trip data, load it into a DataFrame and write the DataFrame we just created to MinIO as a Hudi table. mode(Overwrite) overwrites and recreates the table in the event that it already exists. The trips data relies on a record key (uuid), a partition field (region/country/city) and precombine logic (ts) to ensure trip records are unique within each partition. We will use the default write operation, upsert. When you have a workload without updates, you could use insert or bulk_insert, which could be faster.
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
Open a browser and log into MinIO at http://<your-MinIO-IP>:<port> with your access key and secret key. You will see the Hudi table in the bucket.
The bucket also contains a .hoodie path that contains metadata, and americas and asia paths that contain data.
Take a look at the metadata. This is what my .hoodie path looks like after completing the entire tutorial. We can see that I modified the table on Tuesday, September 13, 2022 at 9:02, 10:37, 10:48, 10:52 and 10:56.
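If you want to inspect the timeline programmatically, here is a minimal sketch that lists the contents of the .hoodie folder from the spark-shell session using the Hadoop FileSystem API mentioned earlier; it assumes the hudi_trips_cow table at s3a://hudi/hudi_trips_cow has already been written.
// hedged sketch: list timeline events under .hoodie via the Hadoop FileSystem API
import org.apache.hadoop.fs.Path
val hoodiePath = new Path("s3a://hudi/hudi_trips_cow/.hoodie")
val fs = hoodiePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
// each commit/deltacommit/clean file is one event retained on the timeline
fs.listStatus(hoodiePath).foreach(status => println(status.getPath.getName))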
// spark-shell
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
// time travel query: read the table as of a specific instant
spark.read.
format("hudi").
option("as.of.instant", "2022-09-13 09:02:08.200").
load(basePath)
Note that we’re using the append save mode. A general guideline is to use append mode unless you are creating a new table, so that no records are overwritten. A typical way of working with Hudi is to ingest streaming data in real time, appending it to the table, and then write some logic that merges and updates existing records based on what was just appended. Alternatively, writing with overwrite mode deletes and recreates the table if it already exists.
// spark-shell
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
// spark-shell
// reload data
spark.
read.
format("hudi").
load(basePath).
createOrReplaceTempView("hudi_trips_snapshot")
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) // commit time we are interested in
// incrementally query data
val tripsIncrementalDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
// spark-shell
val beginTime = "000" // Represents all commits > this time.
val endTime = commits(commits.length - 2) // commit time we are interested in
//incrementally query data
val tripsPointInTimeDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
option(END_INSTANTTIME_OPT_KEY, endTime).
load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
// spark-shell
spark.
read.
format("hudi").
load(basePath).
createOrReplaceTempView("hudi_trips_snapshot")
// fetch total records count
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
// fetch count of records that have not been soft deleted (rider is still populated)
spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
// fetch two records for soft deletes
val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)
// prepare the soft deletes by ensuring the appropriate fields are nullified
val nullifyColumns = softDeleteDs.schema.fields.
map(field => (field.name, field.dataType.typeName)).
filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
&& !Array("ts", "uuid", "partitionpath").contains(pair._1)))
val softDeleteDf = nullifyColumns.
foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
(ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))
// simply upsert the table after setting these fields to null
softDeleteDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY, "upsert").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
// reload data
spark.
read.
format("hudi").
load(basePath).
createOrReplaceTempView("hudi_trips_snapshot")
// This should return the same total count as before
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
// This should return (total - 2) count as two records are updated with nulls
spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
// spark-shell
// fetch total records count
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
// fetch two records to be deleted
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
// issue deletes
val deletes = dataGen.generateDeletes(ds.collectAsList())
val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
hardDeleteDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
// run the same read query as above.
val roAfterDeleteViewDF = spark.
read.
format("hudi").
load(basePath)
roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")
// fetch should return (total - 2) records
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
The data lake becomes a data lakehouse when it gains the ability to update existing data. We’re going to generate some new trip data and then overwrite our existing data. This operation is faster than an upsert when you are recomputing an entire target partition at once. Here we specify configuration in order to bypass the automatic indexing, precombining and repartitioning that upsert would do for you.
// spark-shell
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
sort("partitionpath","uuid").
show(100, false)
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
read.json(spark.sparkContext.parallelize(inserts, 2)).
filter("partitionpath = 'americas/united_states/san_francisco'")
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION.key(),"insert_overwrite").
option(PRECOMBINE_FIELD.key(), "ts").
option(RECORDKEY_FIELD.key(), "uuid").
option(PARTITIONPATH_FIELD.key(), "partitionpath").
option(TBL_NAME.key(), tableName).
mode(Append).
save(basePath)
// Should have different keys now for San Francisco alone, from query before.
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
sort("partitionpath","uuid").
show(100, false)
-- Alter table name
ALTER TABLE oldTableName RENAME TO newTableName
-- Alter table add columns
ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)
-- Alter table column type
ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType
-- Alter table properties
ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value')
-- Alter table examples
--rename to:
ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;
--add column:
ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);
--change column:
ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint;
-- set properties:
ALTER TABLE hudi_cow_nonpcf_tbl2 SET TBLPROPERTIES (hoodie.keep.max.commits = '10');
Currently, SHOW PARTITIONS only works on a file system, as it is based on the file system table path.
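As a quick, hedged illustration of how the statement is issued from the same spark-shell session, the table name below is hypothetical and stands in for a partitioned Hudi table registered in the Spark catalog:
// hedged sketch: SHOW PARTITIONS against a hypothetical partitioned Hudi table
spark.sql("SHOW PARTITIONS hudi_trips_sql_tbl").show(false)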