paint-brush
Apache Iceberg と MinIO を运用したデータ レイクハウスの作为 に@minio
7,696 測定値
7,696 測定値

Apache Iceberg と MinIO を使用したデータ レイクハウスの作成

MinIO12m2023/09/14
Read on Terminal Reader

長すぎる; 読むには

データ レイクハウスの有望な点は、Apache Iceberg と MinIO を使用した集中ソリューションで構造化データと非構造化データをすべて処理できることにあります。
featured image - Apache Iceberg と MinIO を使用したデータ レイクハウスの作成
MinIO HackerNoon profile picture
0-item
1-item
2-item
前回の网上投稿では、Apache Iceberg の内容梗概を説明し、Apache Iceberg がストレージに MinIO を适用する工艺を示しました。開発マシンのセットアップ工艺も示しました。これを行うために、Docker Compose を适用して、処理エンジンとして Apache Spark コンテナー、REST カタログ、ストレージ用の MinIO をインストールしました。最後に、Apache Spark を适用してデータを取り込み、PyIceberg を适用してデータをクエリする异常に単純な例で締めくくりました。 Apache Iceberg を初めて适用する場合、または開発マシンに Apache Iceberg をセットアップする用不着がある場合は、このをお読みください。


この文章发表では、前回の文章发表の続きから、通常情况的なビッグ データの問題、つまり、生データ、非構造化データ、および構造化データ (生のデータから厳選されたデータ) のためのストレージを保证数据する単一のソリューションの一定要性について調査します。データ)。さらに、同じソリューションは、厳選されたデータに対する効率的なレポート作为を也许 にする処理エンジンを保证数据する一定要があります。これは、データ レイクハウスの約束です。構造化データに対するデータ ウェアハウスの機能と非構造化データに対するデータ レイクの機能をすべて2元化されたソリューションで保证数据します。


ビッグデータのシナリオをさらに詳しく見てみましょう。

よくある問題

下例の図は、一般的的な問題と仮説的な解決策を示しています。データは複数の場所から複数の主要结构类型でデータセンターに供应量します。有有有必不可少条件的なのは、処理エンジンがビジネス インテリジェンス、データ剖析、機械学習を効率的にサポートできるように生データを変換できる一块钱化されたソリューションです。同時に、このソリューションは、データ探索性と機械学習のために非構造化データ (テキスト、画像图片、オーディオ、ビデオ) を同步保存できる有有有必不可少条件的があります。また、変換を再実行する有有有必不可少条件的がある場合や、データの融合性の問題を調査する有有有必不可少条件的がある場合に備えて、変換されたデータを元の主要结构类型で保持良好する有有有必不可少条件的があります。


構造化データ


详细的な例として、顧客のために投資信託を管理工作している社会的な储存銀行を像してください。各顧客の各ファンドの会計帳簿と投資記録簿を表すデータが、社会中の地域分布から絶えずデータ レイクハウスにストリーミングされます。そこから、应急な通過チェック (送信されたものはすべて受信されたかどうか) を行う用不着的があり、データ品質チェックを実行する用不着的があります。最後に、データを切割成して、一天の始まりと終わりのレポートをサポートする別のストアにロードすることができます。


あるいは、この図は、気象観測所が気温やその他の気象関連データを送信する IOT シナリオを表している很有可能があります。どのようなシナリオであっても、用得着なのは、データを元の样式で人身安全に另存し、より構造化された具体方案で另存する用得着があるデータをすべて 1 つの多ソリューションで変換および処理する具体方案です。これがデータ レイクハウスの約束です。データ ウェアハウスとデータ レイクの長所を 1 つの多ソリューションに組み合わせたものです。


上記の仮説的な解決策を現実にしてみましょう。これを下の図に示します。


データ レイクハウスの作成


データ レイクハウスには 2 つの論理コンポーネントがあります。 1 つ目は、データ ウェアハウスに相等于する構造化データ用の Apache Iceberg の実装です。 (これは、で構築したものです。そのため、ここでは詳しく説明しません。) 2 番目の論理コンポーネントは、非構造化データ用の MinIO (データ レイクハウスのデータ レイク側) です。 Lakehouse に入るすべてのデータは、MinIO のこの論理インスタンスに配信されます。実際には、上に示した MinIO の 2 つの論理インスタンスは、データ センター内の MinIO の同じインスタンスである有几率があります。 MinIO を実行しているクラスターがすべての受信データの取り込みと Apache Iceberg の処理要件を処理できる場合、そのようなデプロイメントはコストを節約します。実際、これがこの投稿格式で行うことです。 Apache Iceberg の MinIO インスタンス内のバケットを在使用して、すべての非構造化データと生データを保持稳定します。


この演習で食用するデータセットを導入し、それを MinIO に取り込むことで、データの操作步骤を開始しましょう。

その日のグローバルサマリーデータセット

この网上投稿で実験するデータセットは、Global Surface Summary of the Day (GSOD) として知られる公開データセットで、米国海洋资源大気局 (NOAA) によって监管されています。 NOAA は現在、地球中の 9000 往上の観測点からのデータを維持しており、GSOD データセットにはこれらの観測点からの 1 日あたりの慨括情報が含まれています。データはダウンロードできます。 gzip ファイルは 1 年に 1 つあります。 1929 年に始まり、2022 年に終わります (この記事の執筆時点)。データ レイクハウスを構築するために、毎年ファイルをダウンロードし、データ レイクハウスに适用されている MinIO インスタンスに手机配置しました。すべてのファイルを「lake」という名前のバケットに置きます。MinIO のインスタンス内の 2 つのバケットを下面的に示します。 「warehouse」バケットは、Apache Iceberg をインストールしたときに弄成されました。


日別データセット



MinIO コンソールを用到して生データを手動で取り込みました。プロフェッショナルなパイプラインでは、これを自動化された方案で実行する必要条件があります。 MinIO にデータをストリーミングする方案」を参考してください。


これらのファイルは、ダウンロードの利便性を考慮してパッケージ化されています。これらのファイルを简单用到してレポートやグラフを作为しようとすると、十分的に IO を一起的に用到する操作的 (CPU を一起的に用到する概率性があります) になります。同一した測点からの年間平均的気温をグラフにしたいと想象作文してください。これを行うには、すべてのファイルを開いてすべての行を検索し、対象の日にステーションに保持一致するエントリを探す一定があります。より良いオプションは、データをキュレーションし、キュレーションされたデータに関するレポートを作为するために Data Lakehouse 機能を用到することです。一开始のステップは、新しい Jupyter ノートブックをセットアップすることです。

Jupyter ノートブックをセットアップする

まず、Apache Spark 処理エンジンにインストールされている Jupyter Notebook サーバーに移動します。これはにあります。新しいノートブックを弄成し、最原始のセルに左右に示すインポートを追加します。 (この文章投稿で弄成した进行したノートブックはすべて、あります。)


 from collections import namedtuple import csv import json import logging import tarfile from time import time from typing import List from minio import Minio from minio.error import S3Error import pandas as pd import pyarrow as pa import pyarrow.parquet as pq pd.options.mode.chained_assignment = None bucket_name = 'lake'


MinIO ライブラリをインポートしていることに小心してください。私たちが構築しているノートブックは、非構造化ストレージ (MinIO Data Lake) から構造化ストレージ (企业内部で MinIO を应用する Apache Iceberg) への ETL パイプラインです。ノートブックの開始有些は次のようになります。


MinIO ライブラリのインポート


これで、Iceberg データベースとデータ用のテーブルを制作できます。

Iceberg データベースとテーブルの作成

GSOD データセットのデータベースとテーブルの弄成は簡単です。接下来のスクリプトは、「noaa」という名前のデータベースを弄成します。これをインポート後のセルに追加します。


 %%sql CREATE DATABASE IF NOT EXISTS noaa;


以下のスクリプトはgsodテーブルを作成します。

 %%sql CREATE TABLE IF NOT EXISTS noaa.gsod ( station string, date timestamp, latitude double, longitude double, name string, temp double ) USING iceberg PARTITIONED BY (station)


Apache Iceberg を使って遊んでいると、実験をやり直すためにテーブルを削除したくなることがよくあります。以下のスクリプトは、 gsodテーブルの設定を変更したい場合にそのテーブルを削除します。


 %%sql DROP TABLE IF EXISTS noaa.gsod;

MinIO から Iceberg へのデータの取り込み

これで、生の年ベースの zip ファイルが Lakehouse に存储されたので、それらを吸出、変換し、Data Lakehouse にロードできます。初始にいくつかのヘルパー関数を紹介しましょう。这の関数は、プレフィックスに相同する任意されたバケット内の MinIO オブジェクトのリストを返します。


 def get_object_list(bucket_name: str, prefix: str) -> List[str]: ''' Gets a list of objects from a bucket. ''' logger = logging.getLogger('gsod_logger') logger.setLevel(logging.INFO) # Load the credentials and connection information. with open('credentials.json') as f: credentials = json.load(f) # Get data of an object. try: # Create client with access and secret key client = Minio(credentials['url'], # host.docker.internal credentials['accessKey'], credentials['secretKey'], secure=False) object_list = [] objects = client.list_objects(bucket_name, prefix=prefix, recursive=True) for obj in objects: object_list.append(obj.object_name) except S3Error as s3_err: logger.error(f'S3 Error occurred: {s3_err}.') raise s3_err except Exception as err: logger.error(f'Error occurred: {err}.') raise err return object_list


上記のコードでは、MinIO 認証情報ファイルが必要性であることに关注してください。これは MinIO コンソールから具有できます。 MinIO 認証情報の具有的办法がわからない場合は、こののセクションで認証情報を转换成してダウンロードする的办法を説明します。


次に、MinIO からオブジェクトを作为する関数が必要的性です。オブジェクトは tar ファイルであるため、tar アーカイブからデータを挤出し、Pandas DataFrame に変換するためにこの関数も必要的性です。これは以内の関数を选用して行われます。


 def tar_to_df(bucket_name: str, object_name: str) -> pd.DataFrame: ''' This function will take a tarfile reference in MinIO and do the following: - unzip the tarfile - turn the data into a single DataFrame object ''' logger = logging.getLogger('gsod_logger') logger.setLevel(logging.INFO) # Temp file to use for processing the tar files. temp_file_name = 'temp.tar.gz' # Load the credentials and connection information. with open('credentials.json') as f: credentials = json.load(f) # Get data of an object. try: # Create client with access and secret key client = Minio(credentials['url'], # host.docker.internal credentials['accessKey'], credentials['secretKey'], secure=False) object_info = client.fget_object(bucket_name, object_name, temp_file_name) Row = namedtuple('Row', ('station', 'date', 'latitude', 'longitude', 'elevation', 'name', 'temp', 'temp_attributes', 'dewp', 'dewp_attributes', 'slp', 'SLP_attributes', 'stp', 'stp_attributes', 'visib', 'visib_attributes', 'wdsp', 'wdsp_attributes', 'mxspd', 'gust', 'max', 'max_attributes', 'min', 'min_attributes', 'prcp', 'prcp_attributes', 'sndp', 'frshtt')) # Columns of interest and their data types. dtypes={ 'station': 'string', 'date': 'datetime64[ns]', 'latitude': 'float64', 'longitude': 'float64', 'name': 'string', 'temp': 'float64' } tar = tarfile.open(temp_file_name, 'r:gz') all_rows = [] for member in tar.getmembers(): member_handle = tar.extractfile(member) byte_data = member_handle.read() decoded_string = byte_data.decode() lines = decoded_string.splitlines() reader = csv.reader(lines, delimiter=',') # Get all the rows in the member. Skip the header. _ = next(reader) file_rows = [Row(*l) for l in reader] all_rows += file_rows df = pd.DataFrame.from_records(all_rows, columns=Row._fields) df = df[list(dtypes.keys())] for c in df.columns: if dtypes[c] == 'float64': df[c] = pd.to_numeric(df[c], errors='coerce') df = df.astype(dtype=dtypes) except S3Error as s3_err: logger.error(f'S3 Error occurred: {s3_err}.') raise s3_err except Exception as err: logger.error(f'Error occurred: {err}.') raise err return df


これらの関数は両方とも、MinIO で何をしているかに関係なく再根据できる汎用ユーティリティです。これらをコード スニペットの個人コレクションまたは組織の Github Gist に入れることを検討してください。


これで、Lakehouse の倉庫側にデータを送信する準備が整いました。これは、Spark セッションを開始し、すべての GSOD tar ファイルをループし、抽取、変換して、Iceberg テーブルに送信する以上のコードで実行できます。


 from pyspark.sql import SparkSession spark = SparkSession.builder.appName('Jupyter').getOrCreate() objects = get_object_list(bucket_name, 'noaa/gsod') for obj in reversed(objects): print(obj) df = tar_to_df(bucket_name, obj) table = pa.Table.from_pandas(df) pq.write_table(table, 'temp.parquet') df = spark.read.parquet('temp.parquet') df.write.mode('append').saveAsTable('noaa.gsod')


このセクションのコードは、MinIO バケットからデータを手動でロードしました。運用環境では、このコードをサービスにデプロイし、自動取り込みにを用することができます。

PyIceberg を使用した Iceberg Data Lakehouse のクエリ

レポート用に新しいノートブックを始めましょう。下のセルは、必要性なユーティリティをインポートします。到底的には、データの完成には PyIceberg、データのラングリングには Pandas、データの視覚化には Seaborn を用します。


 from pyiceberg.catalog import load_catalog from pyiceberg.expressions import GreaterThanOrEqual, EqualTo import pandas as pd import seaborn as sns pd.options.mode.chained_assignment = None catalog = load_catalog('default')


私たちがやりたいのは、单一の気象観測所の年間月均気温を計算することです。これにより、1 年ごとに 1 つの数値が得られ、1 年のすべての季節が考慮されます。最开始のステップは、单一のステーションのすべてのデータについて Iceberg にクエリを実行することです。これは、以內で PyIceberg を食用して行われます。


 tbl = catalog.load_table('noaa.gsod') sc = tbl.scan(row_filter="station == '72502014734'") df = sc.to_arrow().to_pandas() df.head(10)


上記のコードで选择されているステーション ID は、米国ニュージャージー州のニューアーク リバティー国際空港にあるステーションのものです。 1973 年から運用されています (約 50 年間のデータ)。コードを実行すると、以内の汽耗率が得られます。 (サンプルを拿得するために DataFrame head() 関数を选择しています。)


出力


次に、年ごとにグループ化し、均值を計算する必需があります。 Pandas を采用すると、これは数行のコードになります。ループ処理は必需ありません。


 df['year'] = df['date'].dt.year df = df[['year','temp']] grouped_by_year = df.groupby('year') average_by_year = grouped_by_year.mean() average_by_year


このセルを実行すると、年ごとに 1 つの値が表明されます。上数字年を下列に示します。


年ごとにグループ化する


最後に、年間平均的を視覚化できます。 Seaborn を安全使用してライン プロットを制作します。これにはたった 1 行のコードが必须です。


 sns.lineplot(data=df, x="year", y="temp", errorbar=None)


折れ線グラフを下类に示します。


折れ線グラフ


レポートを初めて実行した後に必ず実行する有必要があるもう 1 つのコマンドを以上に示します。


 [task.file.file_path for task in sc.plan_files()]


これは、クエリに共同するデータを持つ Apache Iceberg 内のすべてのデータ ファイルのリストを提高するリスト内包表記です。 Iceberg のメタデータは多くのものを排除できますが、多くのものがあるでしょう。関連するすべてのファイルを見ると、极速オブジェクト ストレージが Lakehouse の至关重要な要素であるという事実がわかります。

まとめ

この论文投稿では、MinIO と Apache Iceberg を食用してデータ レイクハウスを構築しました。これは GSOD データセットを食用して行いました。まず、生データが Data Lakehouse (MinIO) の Lake 側にアップロードされました。そこから、Apache Iceberg (データ レイクハウスのデータ ウェアハウス側) にデータベースとテーブルを制成しました。次に、データをレイクからデータ レイクハウス内のウェアハウスに移動するためのシンプルな ETL パイプラインを構築しました。


Apache Iceberg にデータを完全性に入力すると、年間平均的気温レポートを作为して視覚化できるようになりました。


運用環境でデータ レイクハウスを構築する場合は、MinIO のエンタープライズ機能が必要性になることに提前准备してください。 、、 、およびを検討することを検討してください。


でも公開されています。


바카라사이트 바카라사이트 온라인바카라