在生产中使用 Hudi 的公司包括 、 、 和 。这些是世界上最大的一些。在这个用例中,Hudi 的关键在于它提供了一个增量数据处理堆栈,可以对柱状数据进行低延迟处理。通常,系统使用 Apache Parquet 或 ORC 等开放文件格式一次性写出数据,并将其存储在高度可扩展的对象存储或分布式文件系统之上。 Hudi 充当数据平面来摄取、转换和管理这些数据。 Hudi 使用与存储进行交互,该 API 与从 HDFS 到对象存储再到内存文件系统的实现兼容(但不一定是最佳的)。
。
。
时间线对于理解至关重要,因为它是所有 Hudi 表元数据的真实事件日志来源。时间线存储在.hoodie
文件夹中,或者在我们的例子中存储在存储桶中。事件会保留在时间线上,直到被删除。时间线适用于整个表以及文件组,从而可以通过将增量日志应用到原始基础文件来重建文件组。为了优化频繁写入/提交,Hudi 的设计使元数据相对于整个表的大小较小。
典型的 Hudi 架构依赖于 Spark 或 Flink 管道将数据传输到 Hudi 表。 Hudi 写入路径经过优化,比简单地将 Parquet 或 Avro 文件写入磁盘更有效。 Hudi 分析写入操作并将其分类为增量操作( insert
、 upsert
、 delete
)或批量操作( insert_overwrite
、 insert_overwrite_table
、 delete_partition
、 bulk_insert
),然后应用必要的。
Hudi 的最大优势在于它摄取流数据和批量数据的速度。通过提供upsert
功能,Hudi 执行任务的速度比重写整个表或分区快几个数量级。
模式是每个 Hudi 表的重要组成部分。 Hudi 可以强制执行模式,也可以允许模式演变,以便流数据管道可以在不中断的情况下进行调整。此外,Hudi 强制执行 schema-on-writer 以确保更改不会破坏管道。 Hudi 依赖 Avro 来存储、管理和发展表的架构。
Hudi 为数据湖提供ACID 事务保证。 Hudi 确保原子写入:提交以原子方式提交到时间线,并给出一个时间戳,该时间戳表示操作被视为发生的时间。 Hudi 将写入器、表和读取器进程之间的快照隔离开来,以便每个进程都对表的一致快照进行操作。 Hudi 通过写入器之间的乐观并发控制 (OCC) 以及表服务和写入器之间以及多个表服务之间的基于非阻塞 MVCC 的并发控制来解决此问题。
AWS: aws-java-sdk:1.10.34
(或更高版本)
Hadoop: hadoop-aws:2.7.3
(或更高版本)
Jar 文件,解压缩并将其复制到/opt/spark/jars
。
mc alias set myminio //<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key> mc mb myminio/hudi
spark-shell \ --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0,org.apache.hadoop:hadoop-aws:3.3.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.hadoop.fs.s3a.access.key=<your-MinIO-access-key>' \ --conf 'spark.hadoop.fs.s3a.secret.key=<your-MinIO-secret-key>'\ --conf 'spark.hadoop.fs.s3a.endpoint=<your-MinIO-IP>:9000' \ --conf 'spark.hadoop.fs.s3a.path.style.access=true' \ --conf 'fs.s3a.signing-algorithm=S3SignerType'
import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow" val basePath = "s3a://hudi/hudi_trips_cow" val dataGen = new DataGenerator
下面将生成新的行程数据,将它们加载到 DataFrame 中,并将我们刚刚创建的 DataFrame 作为 Hudi 表写入 MinIO。如果表已存在, mode(Overwrite)
将覆盖并重新创建该表。行程数据依赖于记录键 ( uuid
)、分区字段 ( region/country/city
) 和逻辑 ( ts
) 以确保每个分区的行程记录是唯一的。我们将使用默认的写入操作upsert
。当您的工作负载没有更新时,您可以使用insert
或bulk_insert
,这可能会更快。
val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)
打开浏览器并使用您的访问密钥和秘密密钥通过//<your-MinIO-IP>:<port>
登录 MinIO。您将在存储桶中看到 Hudi 表。
该存储桶还包含包含元数据的.hoodie
路径以及包含数据的americas
和asia
路径。
查看元数据。这就是完成整个教程后我的.hoodie
路径的样子。我们可以看到我在2022年9月13日星期二的9:02、10:37、10:48、10:52和10:56修改了表格。
// spark-shell val tripsSnapshotDF = spark. read. format("hudi"). load(basePath) tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
spark.read. format("hudi"). option("as.of.instant", "2022-09-13 09:02:08.200"). load(basePath)
请注意,我们使用的是append
保存模式。一般准则是使用append
模式,除非您要创建新表,这样不会覆盖任何记录。使用 Hudi 的典型方法是实时摄取流数据,将它们附加到表中,然后编写一些逻辑,根据刚刚附加的内容合并和更新现有记录。或者,使用overwrite
模式写入会删除并重新创建该表(如果该表已存在)。
// spark-shell val updates = convertToStringList(dataGen.generateUpdates(10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath)
// spark-shell // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50) val beginTime = commits(commits.length - 2) // commit time we are interested in // incrementally query data val tripsIncrementalDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath) tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
// spark-shell val beginTime = "000" // Represents all commits > this time. val endTime = commits(commits.length - 2) // commit time we are interested in //incrementally query data val tripsPointInTimeDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). option(END_INSTANTTIME_OPT_KEY, endTime). load(basePath) tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
// spark-shell spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count() // fetch two records for soft deletes val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2) // prepare the soft deletes by ensuring the appropriate fields are nullified val nullifyColumns = softDeleteDs.schema.fields. map(field => (field.name, field.dataType.typeName)). filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1) && !Array("ts", "uuid", "partitionpath").contains(pair._1))) val softDeleteDf = nullifyColumns. foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))( (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2))) // simply upsert the table after setting these fields to null softDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY, "upsert"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // This should return the same total count as before spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // This should return (total - 2) count as two records are updated with nulls spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
// spark-shell // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // fetch two records to be deleted val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2) // issue deletes val deletes = dataGen.generateDeletes(ds.collectAsList()) val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2)) hardDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY,"delete"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // run the same read query as above. val roAfterDeleteViewDF = spark. read. format("hudi"). load(basePath) roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot") // fetch should return (total - 2) records spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
当数据湖获得更新现有数据的能力时,它就成为数据湖屋。我们将生成一些新的行程数据,然后覆盖现有数据。此操作比upsert
更快,其中 Hudi 会立即为您计算整个目标分区。在这里,我们指定配置,以便绕过upsert
为您执行的自动索引、预组合和重新分区。
// spark-shell spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false) val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark. read.json(spark.sparkContext.parallelize(inserts, 2)). filter("partitionpath = 'americas/united_states/san_francisco'") df.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION.key(),"insert_overwrite"). option(PRECOMBINE_FIELD.key(), "ts"). option(RECORDKEY_FIELD.key(), "uuid"). option(PARTITIONPATH_FIELD.key(), "partitionpath"). option(TBL_NAME.key(), tableName). mode(Append). save(basePath) // Should have different keys now for San Francisco alone, from query before. spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false)
-- Alter table name ALTER TABLE oldTableName RENAME TO newTableName -- Alter table add columns ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*) -- Alter table column type ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType -- Alter table properties ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value') #Alter table examples --rename to: ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2; --add column: ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string); --change column: ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint; --set properties; alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');
目前, SHOW partitions
仅适用于文件系统,因为它基于文件系统表路径。