As empresas que usam o Hudi na produção incluem , , e . Estes são alguns dos maiores do mundo. A chave para o Hudi neste caso de uso é que ele fornece uma pilha incremental de processamento de dados que conduz processamento de baixa latência em dados colunares. Normalmente, os sistemas gravam os dados uma vez usando um formato de arquivo aberto, como Apache Parquet ou ORC, e os armazenam em cima de um armazenamento de objetos altamente escalável ou de um sistema de arquivos distribuído. Hudi serve como um plano de dados para ingerir, transformar e gerenciar esses dados. O Hudi interage com o armazenamento usando a , que é compatível com (mas não necessariamente ideal para) implementações que vão desde HDFS até armazenamento de objetos e sistemas de arquivos na memória.
.
.
É fundamental entender a linha do tempo porque ela serve como uma fonte de registro de eventos verdadeiros para todos os metadados da tabela do Hudi. A linha do tempo é armazenada na pasta .hoodie
ou no balde, no nosso caso. Os eventos são retidos na linha do tempo até serem removidos. A linha do tempo existe para uma tabela geral, bem como para grupos de arquivos, permitindo a reconstrução de um grupo de arquivos aplicando os logs delta ao arquivo base original. Para otimizar gravações/confirmações frequentes, o design do Hudi mantém os metadados pequenos em relação ao tamanho da tabela inteira.
Uma arquitetura Hudi típica depende de pipelines Spark ou Flink para entregar dados às tabelas Hudi. O caminho de gravação do Hudi é otimizado para ser mais eficiente do que simplesmente gravar um arquivo Parquet ou Avro no disco. Hudi analisa as operações de gravação e as classifica como incrementais ( insert
, upsert
, delete
) ou operações em lote ( insert_overwrite
, insert_overwrite_table
, delete_partition
, bulk_insert
) e então aplica necessárias.
A maior força do Hudi é a velocidade com que ele ingere dados de streaming e em lote. Ao fornecer a capacidade de upsert
, o Hudi executa tarefas em ordens de grandeza mais rápidas do que reescrever tabelas ou partições inteiras.
O esquema é um componente crítico de cada tabela Hudi. O Hudi pode impor o esquema ou permitir a evolução do esquema para que o pipeline de streaming de dados possa se adaptar sem quebrar. Além disso, o Hudi impõe o esquema no gravador para garantir que as alterações não interrompam os pipelines. Hudi confia no Avro para armazenar, gerenciar e desenvolver o esquema de uma tabela.
Hudi fornece garantias transacionais ACID para data lakes. Hudi garante gravações atômicas: os commits são feitos atomicamente em uma linha do tempo e recebem um carimbo de data/hora que indica o momento em que a ação é considerada como tendo ocorrido. Hudi isola instantâneos entre processos gravadores, tabelas e leitores para que cada um opere em um instantâneo consistente da tabela. Hudi completa isso com controle de simultaneidade otimista (OCC) entre gravadores e controle de simultaneidade sem bloqueio baseado em MVCC entre serviços de tabela e escritores e entre vários serviços de tabela.
AWS: aws-java-sdk:1.10.34
(ou superior)
Hadoop: hadoop-aws:2.7.3
(ou superior)
os arquivos Jar, descompacte-os e copie-os para /opt/spark/jars
.
mc alias set myminio //<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key> mc mb myminio/hudi
spark-shell \ --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0,org.apache.hadoop:hadoop-aws:3.3.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.hadoop.fs.s3a.access.key=<your-MinIO-access-key>' \ --conf 'spark.hadoop.fs.s3a.secret.key=<your-MinIO-secret-key>'\ --conf 'spark.hadoop.fs.s3a.endpoint=<your-MinIO-IP>:9000' \ --conf 'spark.hadoop.fs.s3a.path.style.access=true' \ --conf 'fs.s3a.signing-algorithm=S3SignerType'
import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow" val basePath = "s3a://hudi/hudi_trips_cow" val dataGen = new DataGenerator
A seguir irá gerar novos dados de viagem, carregá-los em um DataFrame e gravar o DataFrame que acabamos de criar no MinIO como uma tabela Hudi. mode(Overwrite)
sobrescreve e recria a tabela caso ela já exista. Os dados de viagens dependem de uma chave de registro ( uuid
), campo de partição ( region/country/city
) e lógica ( ts
) para garantir que os registros de viagem sejam exclusivos para cada partição. Usaremos a operação de gravação padrão, upsert
. Quando você tem uma carga de trabalho sem atualizações, você pode usar insert
ou bulk_insert
, que pode ser mais rápido.
val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)
Abra um navegador e faça login no MinIO em //<your-MinIO-IP>:<port>
com sua chave de acesso e chave secreta. Você verá a tabela Hudi no balde.
O bucket também contém um caminho .hoodie
que contém metadados e caminhos americas
e asia
que contém dados.
Dê uma olhada nos metadados. Esta é a aparência do meu caminho .hoodie
depois de concluir todo o tutorial. Podemos ver que modifiquei a tabela na terça-feira, 13 de setembro de 2022, às 9h02, 10h37, 10h48, 10h52 e 10h56.
// spark-shell val tripsSnapshotDF = spark. read. format("hudi"). load(basePath) tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
spark.read. format("hudi"). option("as.of.instant", "2022-09-13 09:02:08.200"). load(basePath)
Observe que estamos usando o modo de salvamento append
. Uma diretriz geral é usar o modo append
, a menos que você esteja criando uma nova tabela para que nenhum registro seja substituído. Uma maneira típica de trabalhar com o Hudi é ingerir dados de streaming em tempo real, anexando-os à tabela e, em seguida, escrever alguma lógica que mescle e atualize os registros existentes com base no que acabou de ser anexado. Alternativamente, escrever usando o modo overwrite
exclui e recria a tabela se ela já existir.
// spark-shell val updates = convertToStringList(dataGen.generateUpdates(10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath)
// spark-shell // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50) val beginTime = commits(commits.length - 2) // commit time we are interested in // incrementally query data val tripsIncrementalDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath) tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
// spark-shell val beginTime = "000" // Represents all commits > this time. val endTime = commits(commits.length - 2) // commit time we are interested in //incrementally query data val tripsPointInTimeDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). option(END_INSTANTTIME_OPT_KEY, endTime). load(basePath) tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
// spark-shell spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count() // fetch two records for soft deletes val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2) // prepare the soft deletes by ensuring the appropriate fields are nullified val nullifyColumns = softDeleteDs.schema.fields. map(field => (field.name, field.dataType.typeName)). filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1) && !Array("ts", "uuid", "partitionpath").contains(pair._1))) val softDeleteDf = nullifyColumns. foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))( (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2))) // simply upsert the table after setting these fields to null softDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY, "upsert"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // This should return the same total count as before spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // This should return (total - 2) count as two records are updated with nulls spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
// spark-shell // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // fetch two records to be deleted val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2) // issue deletes val deletes = dataGen.generateDeletes(ds.collectAsList()) val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2)) hardDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY,"delete"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // run the same read query as above. val roAfterDeleteViewDF = spark. read. format("hudi"). load(basePath) roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot") // fetch should return (total - 2) records spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
O data lake se torna um data lakehouse quando ganha a capacidade de atualizar os dados existentes. Vamos gerar alguns novos dados de viagem e, em seguida, substituir os dados existentes. Esta operação é mais rápida do que um upsert
, onde o Hudi calcula toda a partição de destino de uma vez para você. Aqui especificamos a configuração para ignorar a indexação, pré-combinação e reparticionamento automáticos que upsert
faria por você.
// spark-shell spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false) val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark. read.json(spark.sparkContext.parallelize(inserts, 2)). filter("partitionpath = 'americas/united_states/san_francisco'") df.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION.key(),"insert_overwrite"). option(PRECOMBINE_FIELD.key(), "ts"). option(RECORDKEY_FIELD.key(), "uuid"). option(PARTITIONPATH_FIELD.key(), "partitionpath"). option(TBL_NAME.key(), tableName). mode(Append). save(basePath) // Should have different keys now for San Francisco alone, from query before. spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false)
-- Alter table name ALTER TABLE oldTableName RENAME TO newTableName -- Alter table add columns ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*) -- Alter table column type ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType -- Alter table properties ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value') #Alter table examples --rename to: ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2; --add column: ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string); --change column: ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint; --set properties; alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');
Atualmente, SHOW partitions
funcionam apenas em um sistema de arquivos, pois são baseadas no caminho da tabela do sistema de arquivos.