Les entreprises utilisant Hudi en production incluent , , et . Ce sont quelques-uns des plus grands au monde. La clé de Hudi dans ce cas d’utilisation est qu’il fournit une pile de traitement de données incrémentielle qui effectue un traitement à faible latence sur les données en colonnes. En règle générale, les systèmes écrivent les données une seule fois en utilisant un format de fichier ouvert comme Apache Parquet ou ORC, et les stockent sur un stockage d'objets hautement évolutif ou un système de fichiers distribué. Hudi sert de plan de données pour ingérer, transformer et gérer ces données. Hudi interagit avec le stockage à l'aide de l' , qui est compatible (mais pas nécessairement optimale) avec les implémentations allant de HDFS au stockage d'objets en passant par les systèmes de fichiers en mémoire.
.
.
La chronologie est essentielle à comprendre car elle sert de source de journal d'événements véridique pour toutes les métadonnées de la table de Hudi. La chronologie est stockée dans le dossier .hoodie
, ou bucket dans notre cas. Les événements sont conservés sur la chronologie jusqu'à ce qu'ils soient supprimés. La chronologie existe pour un tableau global ainsi que pour les groupes de fichiers, permettant la reconstruction d'un groupe de fichiers en appliquant les journaux delta au fichier de base d'origine. Afin d'optimiser les écritures/validations fréquentes, la conception de Hudi maintient les métadonnées petites par rapport à la taille de la table entière.
Une architecture Hudi typique s'appuie sur les pipelines Spark ou Flink pour fournir des données aux tables Hudi. Le chemin d'écriture Hudi est optimisé pour être plus efficace que la simple écriture d'un fichier Parquet ou Avro sur le disque. Hudi analyse les opérations d'écriture et les classe en opérations incrémentielles ( insert
, upsert
, delete
) ou par lots ( insert_overwrite
, insert_overwrite_table
, delete_partition
, bulk_insert
), puis applique nécessaires.
La plus grande force de Hudi réside dans la rapidité avec laquelle il ingère à la fois les données en streaming et par lots. En offrant la possibilité d' upsert
, Hudi exécute des tâches de plusieurs ordres de grandeur plus rapidement que la réécriture de tables ou de partitions entières.
Le schéma est un élément essentiel de chaque table Hudi. Hudi peut appliquer le schéma ou permettre son évolution afin que le pipeline de données en streaming puisse s'adapter sans se rompre. De plus, Hudi applique le schéma sur l'écrivain pour garantir que les modifications ne interrompent pas les pipelines. Hudi s'appuie sur Avro pour stocker, gérer et faire évoluer le schéma d'une table.
Hudi fournit des garanties transactionnelles ACID aux lacs de données. Hudi garantit les écritures atomiques : les validations sont effectuées de manière atomique sur une chronologie et reçoivent un horodatage qui indique l'heure à laquelle l'action est censée s'être produite. Hudi isole les instantanés entre les processus d'écriture, de table et de lecture afin que chacun fonctionne sur un instantané cohérent de la table. Hudi complète cela avec un contrôle de concurrence optimiste (OCC) entre les rédacteurs et un contrôle de concurrence non bloquant basé sur MVCC entre les services de table et les rédacteurs et entre plusieurs services de table.
AWS : aws-java-sdk:1.10.34
(ou version ultérieure)
Hadoop : hadoop-aws:2.7.3
(ou supérieur)
les fichiers Jar, décompressez-les et copiez-les dans /opt/spark/jars
.
mc alias set myminio //<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key> mc mb myminio/hudi
spark-shell \ --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0,org.apache.hadoop:hadoop-aws:3.3.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.hadoop.fs.s3a.access.key=<your-MinIO-access-key>' \ --conf 'spark.hadoop.fs.s3a.secret.key=<your-MinIO-secret-key>'\ --conf 'spark.hadoop.fs.s3a.endpoint=<your-MinIO-IP>:9000' \ --conf 'spark.hadoop.fs.s3a.path.style.access=true' \ --conf 'fs.s3a.signing-algorithm=S3SignerType'
import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow" val basePath = "s3a://hudi/hudi_trips_cow" val dataGen = new DataGenerator
Ce qui suit générera de nouvelles données de voyage, les chargera dans un DataFrame et écrira le DataFrame que nous venons de créer dans MinIO en tant que table Hudi. mode(Overwrite)
écrase et recrée la table dans le cas où elle existe déjà. Les données de voyage s'appuient sur une clé d'enregistrement ( uuid
), un champ de partition ( region/country/city
) et une logique ( ts
) pour garantir que les enregistrements de voyage sont uniques pour chaque partition. Nous utiliserons l'opération d'écriture par défaut, upsert
. Lorsque vous avez une charge de travail sans mises à jour, vous pouvez utiliser insert
ou bulk_insert
, ce qui pourrait être plus rapide.
val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)
Ouvrez un navigateur et connectez-vous à MinIO sur //<your-MinIO-IP>:<port>
avec votre clé d'accès et votre clé secrète. Vous verrez la table Hudi dans le seau.
Le compartiment contient également un chemin .hoodie
qui contient des métadonnées, ainsi que des chemins americas
et asia
qui contiennent des données.
Jetez un œil aux métadonnées. Voici à quoi ressemble mon chemin .hoodie
après avoir terminé l'intégralité du didacticiel. On voit que j'ai modifié le tableau le mardi 13 septembre 2022 à 9h02, 10h37, 10h48, 10h52 et 10h56.
// spark-shell val tripsSnapshotDF = spark. read. format("hudi"). load(basePath) tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
spark.read. format("hudi"). option("as.of.instant", "2022-09-13 09:02:08.200"). load(basePath)
Notez que nous utilisons le mode de sauvegarde append
. Une ligne directrice générale consiste à utiliser le mode append
, sauf si vous créez une nouvelle table afin qu'aucun enregistrement ne soit écrasé. Une façon typique de travailler avec Hudi consiste à ingérer des données en streaming en temps réel, en les ajoutant à la table, puis à écrire une logique qui fusionne et met à jour les enregistrements existants en fonction de ce qui vient d'être ajouté. Alternativement, l'écriture en mode overwrite
supprime et recrée la table si elle existe déjà.
// spark-shell val updates = convertToStringList(dataGen.generateUpdates(10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath)
// spark-shell // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50) val beginTime = commits(commits.length - 2) // commit time we are interested in // incrementally query data val tripsIncrementalDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath) tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
// spark-shell val beginTime = "000" // Represents all commits > this time. val endTime = commits(commits.length - 2) // commit time we are interested in //incrementally query data val tripsPointInTimeDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). option(END_INSTANTTIME_OPT_KEY, endTime). load(basePath) tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
// spark-shell spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count() // fetch two records for soft deletes val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2) // prepare the soft deletes by ensuring the appropriate fields are nullified val nullifyColumns = softDeleteDs.schema.fields. map(field => (field.name, field.dataType.typeName)). filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1) && !Array("ts", "uuid", "partitionpath").contains(pair._1))) val softDeleteDf = nullifyColumns. foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))( (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2))) // simply upsert the table after setting these fields to null softDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY, "upsert"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // This should return the same total count as before spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // This should return (total - 2) count as two records are updated with nulls spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
// spark-shell // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // fetch two records to be deleted val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2) // issue deletes val deletes = dataGen.generateDeletes(ds.collectAsList()) val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2)) hardDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY,"delete"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // run the same read query as above. val roAfterDeleteViewDF = spark. read. format("hudi"). load(basePath) roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot") // fetch should return (total - 2) records spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
Le lac de données devient un lac de données lorsqu'il acquiert la capacité de mettre à jour les données existantes. Nous allons générer de nouvelles données de voyage, puis écraser nos données existantes. Cette opération est plus rapide qu'un upsert
où Hudi calcule la totalité de la partition cible en même temps pour vous. Ici, nous spécifions la configuration afin de contourner l'indexation, la précombinaison et le repartitionnement automatiques upsert
ferait pour vous.
// spark-shell spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false) val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark. read.json(spark.sparkContext.parallelize(inserts, 2)). filter("partitionpath = 'americas/united_states/san_francisco'") df.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION.key(),"insert_overwrite"). option(PRECOMBINE_FIELD.key(), "ts"). option(RECORDKEY_FIELD.key(), "uuid"). option(PARTITIONPATH_FIELD.key(), "partitionpath"). option(TBL_NAME.key(), tableName). mode(Append). save(basePath) // Should have different keys now for San Francisco alone, from query before. spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false)
-- Alter table name ALTER TABLE oldTableName RENAME TO newTableName -- Alter table add columns ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*) -- Alter table column type ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType -- Alter table properties ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value') #Alter table examples --rename to: ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2; --add column: ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string); --change column: ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint; --set properties; alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');
Actuellement, SHOW partitions
ne fonctionnent que sur un système de fichiers, car elles sont basées sur le chemin de la table du système de fichiers.