Hudi を運用環境で使用している企業には、 、 、 、 があります。これらは世界最大級のの一部です。このユースケースにおける Hudi の重要な点は、列形式データに対して低レイテンシの処理を実行する増分データ処理スタックを提供することです。通常、システムは、Apache Parquet や ORC などのオープン ファイル形式を使用してデータを一度書き出し、これを拡張性の高いオブジェクト ストレージまたは分散ファイル システム上に保存します。 Hudi は、このデータを取り込み、変換、管理するためのデータ プレーンとして機能します。 Hudi は、 を使用してストレージと対話します。これは、HDFS からオブジェクト ストレージ、インメモリ ファイル システムに至るまでの実装と互換性があります (ただし、必ずしも最適であるとは限りません)。
。
。
タイムラインは、 Hudi のすべてのテーブル メタデータの信頼できるイベント ログのソースとして機能するため、理解することが重要です。タイムラインは.hoodie
フォルダー、この場合はバケットに保存されます。イベントは削除されるまでタイムライン上に保持されます。タイムラインはファイル グループだけでなくテーブル全体にも存在し、差分ログを元のベース ファイルに適用することでファイル グループを再構築できます。頻繁な書き込み/コミットを最適化するために、Hudi の設計では、テーブル全体のサイズに比べてメタデータが小さく保たれます。
一般的な Hudi アーキテクチャは、Spark または Flink パイプラインに依存してデータを Hudi テーブルに配信します。 Hudi 書き込みパスは、Parquet または Avro ファイルを単にディスクに書き込むよりも効率的になるように最適化されています。 Hudi は書き込み操作を分析し、増分操作 ( insert
、 upsert
、 delete
) またはバッチ操作 ( insert_overwrite
、 insert_overwrite_table
、 delete_partition
、 bulk_insert
) として分類し、必要な適用します。
Hudi の最大の強みは、ストリーミング データとバッチ データの両方を取り込む速度です。 upsert
機能を提供することにより、Hudi はテーブル全体やパーティション全体を書き換えるよりもはるかに高速にタスクを実行します。
スキーマは、すべての Hudi テーブルの重要なコンポーネントです。 Hudi はスキーマを強制したり、ストリーミング データ パイプラインが中断することなく適応できるようにスキーマの進化を許可したりできます。さらに、Hudi は変更によってパイプラインが中断されないようにスキーマオンライターを強制します。 Hudi は、Avro を利用してテーブルのスキーマを保存、管理、展開します。
Hudi は、データ レイクにACID トランザクション保証を提供します。 Hudi はアトミックな書き込みを保証します。コミットはタイムラインに対してアトミックに行われ、アクションが発生したとみなされる時刻を示すタイムスタンプが与えられます。 Hudi は、ライター、テーブル、リーダーのプロセス間でスナップショットを分離し、それぞれがテーブルの一貫したスナップショットで動作するようにします。 Hudi は、ライター間のオプティミスティック同時実行制御 (OCC) と、テーブル サービスとライターの間、および複数のテーブル サービス間のノンブロッキング MVCC ベースの同時実行制御によってこれを完成させます。
AWS: aws-java-sdk:1.10.34
(またはそれ以降)
Hadoop: hadoop-aws:2.7.3
(またはそれ以降)
Jar ファイルを解凍し、 /opt/spark/jars
にコピーします。
mc alias set myminio //<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key> mc mb myminio/hudi
spark-shell \ --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0,org.apache.hadoop:hadoop-aws:3.3.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.hadoop.fs.s3a.access.key=<your-MinIO-access-key>' \ --conf 'spark.hadoop.fs.s3a.secret.key=<your-MinIO-secret-key>'\ --conf 'spark.hadoop.fs.s3a.endpoint=<your-MinIO-IP>:9000' \ --conf 'spark.hadoop.fs.s3a.path.style.access=true' \ --conf 'fs.s3a.signing-algorithm=S3SignerType'
import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow" val basePath = "s3a://hudi/hudi_trips_cow" val dataGen = new DataGenerator
以下は、新しい旅行データを生成し、DataFrame にロードし、作成した DataFrame を Hudi テーブルとして MinIO に書き込みます。 mode(Overwrite)
テーブルがすでに存在する場合、テーブルを上書きして再作成します。旅行データは、レコード キー ( uuid
)、パーティション フィールド ( region/country/city
)、およびロジック ( ts
) に依存して、旅行レコードが各パーティションで一意であることを保証します。デフォルトの書き込み操作upsert
を使用します。更新のないワークロードがある場合は、 insert
またはbulk_insert
使用すると、より高速になる可能性があります。
val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)
ブラウザを開き、アクセス キーと秘密キーを使用して//<your-MinIO-IP>:<port>
にある MinIO にログインします。バケット内に Hudi テーブルが表示されます。
このバケットには、メタデータを含む.hoodie
パスと、データを含むamericas
およびasia
パスも含まれています。
メタデータを見てみましょう。チュートリアル全体を完了した後の.hoodie
パスは次のようになります。 2022 年 9 月 13 日火曜日の 9:02、10:37、10:48、10:52、10:56 にテーブルを変更したことがわかります。
// spark-shell val tripsSnapshotDF = spark. read. format("hudi"). load(basePath) tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
spark.read. format("hudi"). option("as.of.instant", "2022-09-13 09:02:08.200"). load(basePath)
append
保存モードを使用していることに注意してください。一般的なガイドラインは、新しいテーブルを作成する場合を除き、レコードが上書きされないようにappend
モードを使用することです。 Hudi を使用する一般的な方法は、ストリーミング データをリアルタイムで取り込んでテーブルに追加し、追加された内容に基づいて既存のレコードをマージおよび更新するロジックを作成することです。あるいは、 overwrite
モードを使用して書き込むと、テーブルがすでに存在する場合は削除して再作成されます。
// spark-shell val updates = convertToStringList(dataGen.generateUpdates(10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath)
// spark-shell // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50) val beginTime = commits(commits.length - 2) // commit time we are interested in // incrementally query data val tripsIncrementalDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath) tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
// spark-shell val beginTime = "000" // Represents all commits > this time. val endTime = commits(commits.length - 2) // commit time we are interested in //incrementally query data val tripsPointInTimeDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). option(END_INSTANTTIME_OPT_KEY, endTime). load(basePath) tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
// spark-shell spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count() // fetch two records for soft deletes val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2) // prepare the soft deletes by ensuring the appropriate fields are nullified val nullifyColumns = softDeleteDs.schema.fields. map(field => (field.name, field.dataType.typeName)). filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1) && !Array("ts", "uuid", "partitionpath").contains(pair._1))) val softDeleteDf = nullifyColumns. foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))( (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2))) // simply upsert the table after setting these fields to null softDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY, "upsert"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // This should return the same total count as before spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // This should return (total - 2) count as two records are updated with nulls spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
// spark-shell // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // fetch two records to be deleted val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2) // issue deletes val deletes = dataGen.generateDeletes(ds.collectAsList()) val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2)) hardDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY,"delete"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // run the same read query as above. val roAfterDeleteViewDF = spark. read. format("hudi"). load(basePath) roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot") // fetch should return (total - 2) records spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
データ レイクは、既存のデータを更新できるようになると、データ レイクハウスになります。新しい旅行データを生成し、既存のデータを上書きします。この操作は、Hudi がターゲット パーティション全体を一度に計算するupsert
よりも高速です。ここでは、 upsert
によって行われる自動インデックス作成、事前結合、および再パーティション化をバイパスするための構成を指定します。
// spark-shell spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false) val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark. read.json(spark.sparkContext.parallelize(inserts, 2)). filter("partitionpath = 'americas/united_states/san_francisco'") df.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION.key(),"insert_overwrite"). option(PRECOMBINE_FIELD.key(), "ts"). option(RECORDKEY_FIELD.key(), "uuid"). option(PARTITIONPATH_FIELD.key(), "partitionpath"). option(TBL_NAME.key(), tableName). mode(Append). save(basePath) // Should have different keys now for San Francisco alone, from query before. spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false)
-- Alter table name ALTER TABLE oldTableName RENAME TO newTableName -- Alter table add columns ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*) -- Alter table column type ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType -- Alter table properties ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value') #Alter table examples --rename to: ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2; --add column: ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string); --change column: ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint; --set properties; alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');
現在、 SHOW partitions
ファイル システム テーブル パスに基づいているため、ファイル システム上でのみ機能します。