उत्पादन में हुडी का उपयोग करने वाली कंपनियों में , , और शामिल हैं। ये दुनिया की कुछ सबसे बड़ी हैं। इस उपयोग के मामले में हुडी की कुंजी यह है कि यह एक वृद्धिशील डेटा प्रोसेसिंग स्टैक प्रदान करता है जो स्तंभ डेटा पर कम-विलंबता प्रसंस्करण का संचालन करता है। आमतौर पर, सिस्टम Apache Parquet या ORC जैसे खुले फ़ाइल प्रारूप का उपयोग करके एक बार डेटा लिखते हैं, और इसे अत्यधिक स्केलेबल ऑब्जेक्ट स्टोरेज या वितरित फ़ाइल सिस्टम के शीर्ष पर संग्रहीत करते हैं। हुडी इस डेटा को ग्रहण करने, बदलने और प्रबंधित करने के लिए एक डेटा प्लेन के रूप में कार्य करता है। हुडी का उपयोग करके स्टोरेज के साथ इंटरैक्ट करता है, जो HDFS से लेकर ऑब्जेक्ट स्टोरेज से लेकर इन-मेमोरी फाइल सिस्टम तक के कार्यान्वयन के साथ संगत है (लेकिन जरूरी नहीं कि इसके लिए इष्टतम हो)।
.
.
समयरेखा को समझना महत्वपूर्ण है क्योंकि यह हुडी के सभी टेबल मेटाडेटा के लिए सत्य इवेंट लॉग के स्रोत के रूप में कार्य करता है। टाइमलाइन हमारे मामले में .hoodie
फ़ोल्डर या बकेट में संग्रहीत होती है। इवेंट को टाइमलाइन पर तब तक बनाए रखा जाता है जब तक उन्हें हटा नहीं दिया जाता। समयरेखा समग्र तालिका के साथ-साथ फ़ाइल समूहों के लिए भी मौजूद है, जो मूल आधार फ़ाइल में डेल्टा लॉग को लागू करके फ़ाइल समूह के पुनर्निर्माण को सक्षम करती है। बार-बार लिखने/कमिट करने के लिए अनुकूलन करने के लिए, हुडी का डिज़ाइन संपूर्ण तालिका के आकार के सापेक्ष मेटाडेटा को छोटा रखता है।
एक विशिष्ट हुडी आर्किटेक्चर हुडी टेबलों पर डेटा पहुंचाने के लिए स्पार्क या फ्लिंक पाइपलाइनों पर निर्भर करता है। हुडी लेखन पथ को डिस्क पर केवल पारक्वेट या एवरो फ़ाइल लिखने की तुलना में अधिक कुशल बनाने के लिए अनुकूलित किया गया है। हुडी लेखन संचालन का विश्लेषण करता है और उन्हें वृद्धिशील ( insert
, upsert
, delete
) या बैच संचालन ( insert_overwrite
, insert_overwrite_table
, delete_partition
, bulk_insert
) के रूप में वर्गीकृत करता है और फिर आवश्यक लागू करता है।
हुडी की सबसे बड़ी ताकत वह गति है जिसके साथ यह स्ट्रीमिंग और बैच डेटा दोनों को ग्रहण करता है। upsert
की क्षमता प्रदान करके, हुडी संपूर्ण तालिकाओं या विभाजनों को फिर से लिखने की तुलना में तेजी से परिमाण के कार्य आदेशों को निष्पादित करता है।
स्कीमा प्रत्येक हुडी तालिका का एक महत्वपूर्ण घटक है। हुडी स्कीमा को लागू कर सकता है, या यह स्कीमा विकास की अनुमति दे सकता है ताकि स्ट्रीमिंग डेटा पाइपलाइन बिना टूटे अनुकूलित हो सके। इसके अलावा, हुडी यह सुनिश्चित करने के लिए स्कीमा-ऑन-राइटर लागू करता है कि परिवर्तन पाइपलाइनों को न तोड़ें। हुडी टेबल के स्कीमा को संग्रहीत करने, प्रबंधित करने और विकसित करने के लिए एवरो पर निर्भर करता है।
हुडी डेटा लेक को ACID लेनदेन संबंधी गारंटी प्रदान करता है। हुडी परमाणु लेखन को सुनिश्चित करता है: प्रतिबद्धताओं को परमाणु रूप से एक समयरेखा के लिए बनाया जाता है और एक समय टिकट दिया जाता है जो उस समय को दर्शाता है जिस पर कार्रवाई हुई मानी जाती है। हुडी राइटर, टेबल और रीडर प्रक्रियाओं के बीच स्नैपशॉट को अलग करता है, इसलिए प्रत्येक टेबल के सुसंगत स्नैपशॉट पर काम करता है। हुडी ने लेखकों के बीच आशावादी समवर्ती नियंत्रण (ओसीसी) और टेबल सेवाओं और लेखकों के बीच और कई टेबल सेवाओं के बीच गैर-अवरुद्ध एमवीसीसी-आधारित समवर्ती नियंत्रण के साथ इसे पूरा किया।
AWS: aws-java-sdk:1.10.34
(या उच्चतर)
Hadoop: hadoop-aws:2.7.3
(या उच्चतर)
जार फ़ाइलें , उन्हें अनज़िप करें और उन्हें /opt/spark/jars
पर कॉपी करें।
mc alias set myminio //<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key> mc mb myminio/hudi
spark-shell \ --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0,org.apache.hadoop:hadoop-aws:3.3.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.hadoop.fs.s3a.access.key=<your-MinIO-access-key>' \ --conf 'spark.hadoop.fs.s3a.secret.key=<your-MinIO-secret-key>'\ --conf 'spark.hadoop.fs.s3a.endpoint=<your-MinIO-IP>:9000' \ --conf 'spark.hadoop.fs.s3a.path.style.access=true' \ --conf 'fs.s3a.signing-algorithm=S3SignerType'
import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow" val basePath = "s3a://hudi/hudi_trips_cow" val dataGen = new DataGenerator
निम्नलिखित नए यात्रा डेटा उत्पन्न करेगा, उन्हें डेटाफ़्रेम में लोड करेगा और हमारे द्वारा बनाए गए डेटाफ़्रेम को मिनिओ में हुडी तालिका के रूप में लिखेगा। mode(Overwrite)
उस स्थिति में तालिका को अधिलेखित और पुन: बनाता है जब वह पहले से मौजूद हो। यात्रा डेटा एक रिकॉर्ड कुंजी ( uuid
), विभाजन फ़ील्ड ( region/country/city
) और तर्क ( ts
) पर निर्भर करता है ताकि यह सुनिश्चित किया जा सके कि यात्रा रिकॉर्ड प्रत्येक विभाजन के लिए अद्वितीय हैं। हम डिफ़ॉल्ट राइट ऑपरेशन, upsert
उपयोग करेंगे। जब आपके पास अपडेट के बिना कार्यभार हो, तो आप insert
या bulk_insert
उपयोग कर सकते हैं जो तेज़ हो सकता है।
val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)
एक ब्राउज़र खोलें और अपनी एक्सेस कुंजी और गुप्त कुंजी के साथ //<your-MinIO-IP>:<port>
पर MiniIO में लॉग इन करें। आपको बाल्टी में हुडी टेबल दिखाई देगी।
बकेट में एक .hoodie
पथ भी होता है जिसमें मेटाडेटा होता है, और americas
और asia
पथ होते हैं जिनमें डेटा होता है।
मेटाडेटा पर एक नज़र डालें. संपूर्ण ट्यूटोरियल पूरा करने के बाद मेरा .hoodie
पथ इस तरह दिखता है। हम देख सकते हैं कि मैंने तालिका को मंगलवार 13 सितंबर, 2022 को 9:02, 10:37, 10:48, 10:52 और 10:56 पर संशोधित किया।
// spark-shell val tripsSnapshotDF = spark. read. format("hudi"). load(basePath) tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
spark.read. format("hudi"). option("as.of.instant", "2022-09-13 09:02:08.200"). load(basePath)
ध्यान दें कि हम append
सेव मोड का उपयोग कर रहे हैं। एक सामान्य दिशानिर्देश यह है कि जब तक आप एक नई तालिका नहीं बना रहे हों, तब तक append
मोड का उपयोग करें ताकि कोई भी रिकॉर्ड अधिलेखित न हो। हुडी के साथ काम करने का एक विशिष्ट तरीका वास्तविक समय में स्ट्रीमिंग डेटा को शामिल करना, उन्हें तालिका में जोड़ना और फिर कुछ तर्क लिखना है जो अभी जोड़े गए रिकॉर्ड के आधार पर मौजूदा रिकॉर्ड को मर्ज और अपडेट करता है। वैकल्पिक रूप से, overwrite
मोड का उपयोग करके लिखने से तालिका पहले से मौजूद होने पर हटा दी जाती है और पुनः बनाई जाती है।
// spark-shell val updates = convertToStringList(dataGen.generateUpdates(10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath)
// spark-shell // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50) val beginTime = commits(commits.length - 2) // commit time we are interested in // incrementally query data val tripsIncrementalDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath) tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
// spark-shell val beginTime = "000" // Represents all commits > this time. val endTime = commits(commits.length - 2) // commit time we are interested in //incrementally query data val tripsPointInTimeDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). option(END_INSTANTTIME_OPT_KEY, endTime). load(basePath) tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
// spark-shell spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count() // fetch two records for soft deletes val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2) // prepare the soft deletes by ensuring the appropriate fields are nullified val nullifyColumns = softDeleteDs.schema.fields. map(field => (field.name, field.dataType.typeName)). filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1) && !Array("ts", "uuid", "partitionpath").contains(pair._1))) val softDeleteDf = nullifyColumns. foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))( (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2))) // simply upsert the table after setting these fields to null softDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY, "upsert"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // This should return the same total count as before spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // This should return (total - 2) count as two records are updated with nulls spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
// spark-shell // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // fetch two records to be deleted val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2) // issue deletes val deletes = dataGen.generateDeletes(ds.collectAsList()) val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2)) hardDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY,"delete"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // run the same read query as above. val roAfterDeleteViewDF = spark. read. format("hudi"). load(basePath) roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot") // fetch should return (total - 2) records spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
डेटा लेक तब डेटा लेकहाउस बन जाता है जब वह मौजूदा डेटा को अपडेट करने की क्षमता हासिल कर लेता है। हम कुछ नया यात्रा डेटा तैयार करने जा रहे हैं और फिर अपने मौजूदा डेटा को अधिलेखित कर देंगे। यह ऑपरेशन एक upsert
से तेज़ है जहां हुडी आपके लिए एक ही बार में संपूर्ण लक्ष्य विभाजन की गणना करता है। यहां हम स्वचालित अनुक्रमण, पूर्व संयोजन और पुनर्विभाजन को बायपास करने के लिए कॉन्फ़िगरेशन निर्दिष्ट करते हैं जो upsert
आपके लिए करेगा।
// spark-shell spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false) val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark. read.json(spark.sparkContext.parallelize(inserts, 2)). filter("partitionpath = 'americas/united_states/san_francisco'") df.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION.key(),"insert_overwrite"). option(PRECOMBINE_FIELD.key(), "ts"). option(RECORDKEY_FIELD.key(), "uuid"). option(PARTITIONPATH_FIELD.key(), "partitionpath"). option(TBL_NAME.key(), tableName). mode(Append). save(basePath) // Should have different keys now for San Francisco alone, from query before. spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false)
-- Alter table name ALTER TABLE oldTableName RENAME TO newTableName -- Alter table add columns ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*) -- Alter table column type ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType -- Alter table properties ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value') #Alter table examples --rename to: ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2; --add column: ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string); --change column: ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint; --set properties; alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');
वर्तमान में, SHOW partitions
केवल फ़ाइल सिस्टम पर काम करता है, क्योंकि यह फ़ाइल सिस्टम तालिका पथ पर आधारित है।