paint-brush
Hudi と MinIO を食用したストリーミング データ レイクの開発 に@minio
7,233 測定値
7,233 測定値

Hudi と MinIO を使用したストリーミング データ レイクの開発

MinIO14m2023/08/29
Read on Terminal Reader

長すぎる; 読むには

Apache Hudi はデータ レイク用の最初のオープン テーブル形式であり、ストリーミング アーキテクチャで検討する価値があります。 Hudi ストレージに MinIO を使用すると、マルチクラウドのデータ レイクと分析への道が開かれます。
featured image - Hudi と MinIO を使用したストリーミング データ レイクの開発
MinIO HackerNoon profile picture
0-item
1-item
2-item
Apache Hudi は、コア ウェアハウスとデータベースの機能をデータ レイクに同时供给するストリーミング データ レイク プラットフォームです。 やのようなオープン ファイル方法を谎称することに満足せず、Hudi はテーブル、トランザクション、更新换代/挿入/削除、长度なインデックス、ストリーミング インジェスト サービス、データ クラスタリング/圧縮の最適化、同時実行性を供给します。


2016 年に導入された Hudi は、Hadoop エコシステムにしっかりと根付いており、「Hadoop Upserts と D Incrementals」という名前の背後にある预示を説明しています。これは、HDFS 上の大規模な概述データセットのストレージを治理するために開発されました。 Hudi の主な的目的は、ストリーミング データの取り込み中の遅延を短縮することです。


フーディテーブル


時間の経過とともに、Hudi はと MinIO を含むオブジェクト ストレージを运行するように進化してきました。 HDFS からの Hudi の移行は、パフォーマンス、スケーラブル、クラウドネイティブのオブジェクト ストレージのために従来の HDFS を残す游戏のより大きなトレンドと連動しています。 Apache Spark、Flink、Presto、Trino などの分析一下ワークロードを公路化する最適化を具备するという Hudi の約束は、大規模なクラウドネイティブ アプリケーションのパフォーマンスという MinIO の約束とうまく一样します。


Hudi を運用環境で使用している企業には、 、 、 、 があります。これらは世界最大級のの一部です。このユースケースにおける Hudi の重要な点は、列形式データに対して低レイテンシの処理を実行する増分データ処理スタックを提供することです。通常、システムは、Apache Parquet や ORC などのオープン ファイル形式を使用してデータを一度書き出し、これを拡張性の高いオブジェクト ストレージまたは分散ファイル システム上に保存します。 Hudi は、このデータを取り込み、変換、管理するためのデータ プレーンとして機能します。 Hudi は、 を使用してストレージと対話します。これは、HDFS からオブジェクト ストレージ、インメモリ ファイル システムに至るまでの実装と互換性があります (ただし、必ずしも最適であるとは限りません)。

Hudi ファイル形式

Hudi は、ベース ファイルと、指定のベース ファイルに対する升级更新/変更を另存するデルタ ログ ファイルを的使用します。ベース ファイルは、Parquet (列状) または HFile (インデックス付き) にすることができます。変更が発生したときにベース ファイルに変更を記録することが适度的であるため、デルタ ログは (行) として另存されます。


Hudi は、独特のベース ファイルに対するすべての変更をブロックのシーケンスとしてエンコードします。ブロックには、データ ブロック、削除ブロック、またはロールバック ブロックを同一个できます。これらのブロックは、新しいベース ファイルを派生するためにマージされます。このエンコーディングでは、自我完結型のログも制作されます。



Hudi ファイル形式

Hudi テーブルの形式

テーブル结构は、テーブルのファイル レイアウト、テーブルのスキーマ、およびテーブルへの変更を追跡するメタデータで構成されます。 Hudi は、ストリーム処理の重視と共同して、書き込み時のスキーマを強制し、下位互換性のない変更によってパイプラインが终断されないようにします。


Hudi は、对应のテーブル/パーティションのファイルをグループ化し、レコード キーとファイル グループをマッピングします。所诉したように、すべての游戏升级は对应のファイル グループの差分ログ ファイルに記録されます。この設計は、クエリを処理するためにすべてのベース ファイルに対してすべてのデータ レコードをマージする相应がある Hive ACID よりも効率的です。 Hudi の設計は、データセット全体人员ではなくファイル グループの差分ログを処理するため、キーベースの快速路な游戏升级/削除を想定しています。


Hudi は、单一のテーブル/パーティションのファイルをグループ化し、レコード キーとファイル グループをマッピングします。上述情况したように、すべての创新は单一のファイル グループの差分ログ ファイルに記録されます。この設計は、クエリを処理するためにすべてのベース ファイルに対してすべてのデータ レコードをマージする有需要がある Hive ACID よりも効率的です。 Hudi の設計は、データセット全体人员ではなくファイル グループの差分ログを処理するため、キーベースの快速な创新/削除を想定しています。


Hudi テーブルの形式


タイムラインは、 Hudi のすべてのテーブル メタデータの信頼できるイベント ログのソースとして機能するため、理解することが重要です。タイムラインは.hoodieフォルダー、この場合はバケットに保存されます。イベントは削除されるまでタイムライン上に保持されます。タイムラインはファイル グループだけでなくテーブル全体にも存在し、差分ログを元のベース ファイルに適用することでファイル グループを再構築できます。頻繁な書き込み/コミットを最適化するために、Hudi の設計では、テーブル全体のサイズに比べてメタデータが小さく保たれます。


タイムライン上の新しいイベントは内层メタデータ テーブルに另存され、一連のマージ オン リード テーブルとして実装されるため、書き込み増幅が低くなります。その結果、Hudi はメタデータに対する光速な変更を及时に吸収できます。さらに、メタデータ テーブルは HFile ベース ファイル方法を实用し、メタデータ テーブル全队を読み取る必不可少性を避让する一連のインデックス付きキー検索によりパフォーマンスをさらに最適化します。テーブルの那部であるすべての电学ファイル パスは、コストと時間のかかるクラウド ファイルの一覧觉得を避让するためにメタデータに含まれています。

ヒューディライター

Hudi ライターは、Hudi が升级や削除などの极其に高な増分変更を能够にする ACID トランザクション サポートを備えた高性价比参数書き込みレイヤーとして機能するアーキテクチャを促進します。


一般的な Hudi アーキテクチャは、Spark または Flink パイプラインに依存してデータを Hudi テーブルに配信します。 Hudi 書き込みパスは、Parquet または Avro ファイルを単にディスクに書き込むよりも効率的になるように最適化されています。 Hudi は書き込み操作を分析し、増分操作 ( insertupsertdelete ) またはバッチ操作 ( insert_overwriteinsert_overwrite_tabledelete_partitionbulk_insert ) として分類し、必要な適用します。


Hudi ライターはメタデータの固执も承担します。各レコードには、コミット時刻とそのレコードに固定性のシーケンス番號 (これは Kafka オフセットに似ています) が書き込まれ、レコード レベルの変更を導出できるようになります。ユーザーは、受信データ ストリーム内のイベント時間フィールドを确定し、メタデータと Hudi タイムラインを用到してそれらを追跡することもできます。 Hudi には各レコードの到着時刻とイベント時刻の両方が含まれており、複雑なストリーム処理パイプラインに強力なを構築できるため、ストリーム処理が大幅度的に缓解される可以性があります。

Hudi リーダー

ライターとリーダーの間のスナップショットの分離により、Spark、Hive、Flink、Prest、Trino、Impala を含むすべての重点なデータ レイク クエリ エンジンからテーブル スナップショットを一貫してクエリできるようになります。 Parquet や Avro と同様に、Hudi テーブルはやなどによって冗余テーブルとして読み取ることができます。


Hudi リーダーは軽量になるように開発されています。已经な限り、Presto や Spark などのエンジン僵板のベクトル化リーダーとキャッシュが用到的されます。 Hudi がクエリのベース ファイルとログ ファイルをマージする一定がある場合、Hudi はスピル已经マップや遅延読み取りなどのメカニズムを用到的してマージ パフォーマンスを向下させると同時に、読み取りに最適化されたクエリも可以提供します。


Hudi には、是に強力な増分クエリ機能が往往含まれています。メタデータはその中核であり、大規模なコミットを小さなチャンクとして操作できるようにし、データの書き込みと増分クエリを非常に分離します。メタデータを効率的に操作することで、タイム トラベルは、開始点と終了点が定義された単なる増分クエリになります。 Hudi は、随便の時点でキーを単一のファイル グループにアトミックにマップし、Hudi テーブルで非常な CDC 機能をサポートします。 Hudi ライターのセクションで説明したように、各テーブルはファイル グループで構成され、各ファイル グループには孤身一人の个人完結型メタデータがあります。

フーディ万歳!

Hudi の最大の強みは、ストリーミング データとバッチ データの両方を取り込む速度です。 upsert機能を提供することにより、Hudi はテーブル全体やパーティション全体を書き換えるよりもはるかに高速にタスクを実行します。


Hudi の取り込み加速度を活用するには、データ レイクハウスには高い IOPS とスループットを備えたストレージ レイヤーが这个必备です。 MinIO のスケーラビリティと高效能の組み合わせは、まさに Hudi が这个必备とするものです。 MinIO は、リアルタイムのエンタープライズ データ レイクを強化するのに这个必备なパフォーマンスを相当に備えています。 は、 32 ノードの既製 NVMe SSD。


アクティブなエンタープライズ Hudi データ レイクには、大批量の小さな Parquet ファイルと Avro ファイルが存放されています。 MinIO には、データ レイクの高速收费站化を或者にする许多含まれています。小さなオブジェクトはメタデータとともにインラインで存放されるため、Hudi メタデータやインデックスなどの小さなファイルの読み取りと書き込みの両方に必要的な IOPS が削減されます。


スキーマは、すべての Hudi テーブルの重要なコンポーネントです。 Hudi はスキーマを強制したり、ストリーミング データ パイプラインが中断することなく適応できるようにスキーマの進化を許可したりできます。さらに、Hudi は変更によってパイプラインが中断されないようにスキーマオンライターを強制します。 Hudi は、Avro を利用してテーブルのスキーマを保存、管理、展開します。


Hudi は、データ レイクにACID トランザクション保証を提供します。 Hudi はアトミックな書き込みを保証します。コミットはタイムラインに対してアトミックに行われ、アクションが発生したとみなされる時刻を示すタイムスタンプが与えられます。 Hudi は、ライター、テーブル、リーダーのプロセス間でスナップショットを分離し、それぞれがテーブルの一貫したスナップショットで動作するようにします。 Hudi は、ライター間のオプティミスティック同時実行制御 (OCC) と、テーブル サービスとライターの間、および複数のテーブル サービス間のノンブロッキング MVCC ベースの同時実行制御によってこれを完成させます。

Hudi と MinIO のチュートリアル

このチュートリアルでは、Spark、Hudi、および MinIO のセットアップ手順を説明し、いくつかの总体的な Hudi 機能を紹介します。このチュートリアルは、 に基づいており、クラウドネイティブの MinIO オブジェクト ストレージで動作するように調整されています。


バージョン控制されたバケットを进行すると、Hudi にメンテナンスのオーバーヘッドがいくらか追加されることに需要注意してください。オブジェクトが削除されると、が弄成されます。 Hudi が削除マーカーの数は時間の経過とともに増加します。削除マーカーの数が 1000 に達するとリスト进行が变慢する可以性があるため、これらの削除マーカーをクリーンアップするように正しく構成することが核心です。Hudi プロジェクトの控制者は、ライフサイクル ルールを便用して 1 日後に削除マーカーをクリーンアップすることを推奨しています。

前提条件

Apache Spark。


MinIO。 IP アドレス、コンソールの TCP ポート、アクセス キー、および秘蜜キーを記録します。


MinIO クライアント。


S3A を施用してオブジェクト ストレージを操作方法するには、AWS および AWS Hadoop ライブラリをダウンロードし、クラスパスに追加します。
  • AWS: aws-java-sdk:1.10.34 (またはそれ以降)

  • Hadoop: hadoop-aws:2.7.3 (またはそれ以降)


Jar ファイルを解凍し、 /opt/spark/jarsにコピーします。

MinIO バケットを作成する

MinIO クライアントを操作して、Hudi データを格納するバケットを弄成します。
 mc alias set myminio //<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key> mc mb myminio/hudi

Hudi で Spark を起動する

ストレージに MinIO を选择するように構成された Hudi で Spark シェルを起動します。 MinIO 設定を选择して S3A のエントリを構成してください。


 spark-shell \ --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0,org.apache.hadoop:hadoop-aws:3.3.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.hadoop.fs.s3a.access.key=<your-MinIO-access-key>' \ --conf 'spark.hadoop.fs.s3a.secret.key=<your-MinIO-secret-key>'\ --conf 'spark.hadoop.fs.s3a.endpoint=<your-MinIO-IP>:9000' \ --conf 'spark.hadoop.fs.s3a.path.style.access=true' \ --conf 'fs.s3a.signing-algorithm=S3SignerType'


次に、Spark 内で Hudi を成长期化します。
 import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.common.model.HoodieRecord


これにより、Hudi を繰り返し在使用してを制作することが簡単になることに重视してください。

テーブルを作成する

試してみて、Scala を选择してシンプルな小さな Hudi テーブルを制成してください。 Hudi DataGenerator は、 に基づいてサンプルの挿入と更行を导出する及时かつ簡単な技巧です。


 val tableName = "hudi_trips_cow" val basePath = "s3a://hudi/hudi_trips_cow" val dataGen = new DataGenerator

Hudi にデータを挿入し、MinIO にテーブルを書き込みます

以下は、新しい旅行データを生成し、DataFrame にロードし、作成した DataFrame を Hudi テーブルとして MinIO に書き込みます。 mode(Overwrite)テーブルがすでに存在する場合、テーブルを上書きして再作成します。旅行データは、レコード キー ( uuid )、パーティション フィールド ( region/country/city )、およびロジック ( ts ) に依存して、旅行レコードが各パーティションで一意であることを保証します。デフォルトの書き込み操作upsertを使用します。更新のないワークロードがある場合は、 insertまたはbulk_insert使用すると、より高速になる可能性があります。


 val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)


ブラウザを開き、アクセス キーと秘密キーを使用して//<your-MinIO-IP>:<port>にある MinIO にログインします。バケット内に Hudi テーブルが表示されます。


MinIO コンソール


このバケットには、メタデータを含む.hoodieパスと、データを含むamericasおよびasiaパスも含まれています。


メタデータ


メタデータを見てみましょう。チュートリアル全体を完了した後の.hoodieパスは次のようになります。 2022 年 9 月 13 日火曜日の 9:02、10:37、10:48、10:52、10:56 にテーブルを変更したことがわかります。


チュートリアル完了後の .hoodie パス

クエリデータ

Hudi データを DataFrame にロードし、サンプル クエリを実行してみましょう。
 // spark-shell val tripsSnapshotDF = spark. read. format("hudi"). load(basePath) tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()

Hudi とのタイムトラベル

いいえ、1988 年にコンサートを観に行ったという話ではありません。


Hudi テーブルに書き込むたびに、新しいスナップショットが制成されます。スナップショットは、タイム トラベル クエリで根据できるテーブルのバージョンと考えてください。


いくつかのタイム トラベル クエリを試してください (タイムスタンプを適切なものに変更する需要があります)。


 spark.read. format("hudi"). option("as.of.instant", "2022-09-13 09:02:08.200"). load(basePath)

データを更新する

このプロセスは、很早以前に新しいデータを挿入したときと似ています。 Hudi のデータ游戏游戏更新機能を紹介するために、既存の旅途レコードの游戏游戏更新を提取し、それらを DataFrame にロードして、その DataFrame を MinIO に既に保持されている Hudi テーブルに書き込みます。


append保存モードを使用していることに注意してください。一般的なガイドラインは、新しいテーブルを作成する場合を除き、レコードが上書きされないようにappendモードを使用することです。 Hudi を使用する一般的な方法は、ストリーミング データをリアルタイムで取り込んでテーブルに追加し、追加された内容に基づいて既存のレコードをマージおよび更新するロジックを作成することです。あるいは、 overwriteモードを使用して書き込むと、テーブルがすでに存在する場合は削除して再作成されます。


 // spark-shell val updates = convertToStringList(dataGen.generateUpdates(10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath)


データをクエリすると、内容更新された去旅游記録が表明されます。

インクリメンタルクエリ

Hudi は、増分クエリを运行して、相关のタイムスタンプ以降に変更されたレコードのストリームを保证できます。有必要的なのは、現在のコミットまでの変更を確認するために変更がストリーミングされる開始時刻を确定することだけです。また、終了時刻を运行してストリームを制限することもできます。


増分クエリは、バッチ データにストリーミング パイプラインを構築できるため、Hudi にとって极为に重点です。


 // spark-shell // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50) val beginTime = commits(commits.length - 2) // commit time we are interested in // incrementally query data val tripsIncrementalDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath) tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()

ポイントインタイムクエリ

Hudi は、某一の日時のデータをクエリできます。


 // spark-shell val beginTime = "000" // Represents all commits > this time. val endTime = commits(commits.length - 2) // commit time we are interested in //incrementally query data val tripsPointInTimeDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). option(END_INSTANTTIME_OPT_KEY, endTime). load(basePath) tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

論理的な削除によるデータの削除

Hudi は、レコードを削除する 2 つの異なる方案をサポートしています。論理的な削除では、レコード キーが始终维持され、他のすべてのフィールドの値が null になります。論理的な削除は MinIO に始终维持され、高中物理的な削除を便用してのみデータ レイクから削除されます。


 // spark-shell spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count() // fetch two records for soft deletes val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2) // prepare the soft deletes by ensuring the appropriate fields are nullified val nullifyColumns = softDeleteDs.schema.fields. map(field => (field.name, field.dataType.typeName)). filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1) && !Array("ts", "uuid", "partitionpath").contains(pair._1))) val softDeleteDf = nullifyColumns. foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))( (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2))) // simply upsert the table after setting these fields to null softDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY, "upsert"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // This should return the same total count as before spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // This should return (total - 2) count as two records are updated with nulls spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()

物理削除によるデータの削除

対照的に、ハード削除は、私たちが削除として認識しているものです。レコード キーと関連フィールドがテーブルから削除されます。


 // spark-shell // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // fetch two records to be deleted val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2) // issue deletes val deletes = dataGen.generateDeletes(ds.collectAsList()) val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2)) hardDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY,"delete"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // run the same read query as above. val roAfterDeleteViewDF = spark. read. format("hudi"). load(basePath) roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot") // fetch should return (total - 2) records spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

上書き挿入

データ レイクは、既存のデータを更新できるようになると、データ レイクハウスになります。新しい旅行データを生成し、既存のデータを上書きします。この操作は、Hudi がターゲット パーティション全体を一度に計算するupsertよりも高速です。ここでは、 upsertによって行われる自動インデックス作成、事前結合、および再パーティション化をバイパスするための構成を指定します。


 // spark-shell spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false) val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark. read.json(spark.sparkContext.parallelize(inserts, 2)). filter("partitionpath = 'americas/united_states/san_francisco'") df.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION.key(),"insert_overwrite"). option(PRECOMBINE_FIELD.key(), "ts"). option(RECORDKEY_FIELD.key(), "uuid"). option(PARTITIONPATH_FIELD.key(), "partitionpath"). option(TBL_NAME.key(), tableName). mode(Append). save(basePath) // Should have different keys now for San Francisco alone, from query before. spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false)

テーブルスキーマとパーティショニングを進化させる

スキーマの進化により、Hudi テーブルのスキーマを変更して、時間の経過とともにデータ内で発生する変更に適応できます。


一些に、スキーマとパーティショニングをクエリして展開する做法の例をいくつか示します。より詳細な説明については、 を按照してください。。これらのコマンドを実行すると、Hudi テーブル スキーマがこのチュートリアルとは異なるものに変更されることに目光してください。


 -- Alter table name ALTER TABLE oldTableName RENAME TO newTableName -- Alter table add columns ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*) -- Alter table column type ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType -- Alter table properties ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value') #Alter table examples --rename to: ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2; --add column: ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string); --change column: ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint; --set properties; alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');


現在、 SHOW partitionsファイル システム テーブル パスに基づいているため、ファイル システム上でのみ機能します。


このチュートリアルでは、Spark を施用して Hudi の機能を紹介しました。ただし、Hudi は複数のテーブル タイプ/クエリ タイプをサポートでき、Hudi テーブルは Hive、Spark、Presto などのクエリ エンジンからクエリできます。 Hudi プロジェクトには、すべての依存システムがローカルで実行されている Docker ベースのセットアップでこれらすべてを紹介するあります。

やあ!やあ! MinIO で Hudi データレイクを構築しましょう!

Apache Hudi はデータ レイク用の初期のオープン テーブル主要形式であり、ストリーミング アーキテクチャで検討する価値があります。 は活発であり、クラウドネイティブのストリーミング データ レイクのために Hadoop/HDFS を Hudi/オブジェクト ストレージに置き換えることに重中之重が置かれています。 Hudi ストレージに MinIO を适用すると、マルチクラウドのデータ レイクと探讨への道が開かれます。 MinIO には、オンプレミス、パブリック/プライベート クラウド、エッジなどの場所間でデータを减幅するための含まれており、初中地理的な負荷细化や高ホットホット フェイルオーバーなど、企業が重要とする優れた機能を実現します。


今すぐ MinIO で Hudi を試してください。ご質問がある場合、またはヒントを共出したい場合は、 を通じてご連絡ください。


でも公開されています。


바카라사이트 바카라사이트 온라인바카라