paint-brush
.NET でイベント ストリーミング アプリケーションを構築する策略 に@bbejeck
2,850 測定値
2,850 測定値

.NET でイベント ストリーミング アプリケーションを構築する方法

Bill Bejeck14m2023/02/13
Read on Terminal Reader

長すぎる; 読むには

ストリーム処理は、イベントをアプリケーションの主要な入力または出力と見なすソフトウェア開発へのアプローチです。このブログ投稿では、Apache Kafka、.NET プロデューサーおよびコンシューマー クライアント、および Microsoft の Task Parallel Library (TPL) を使用して、イベント ストリーミング アプリケーションを構築します。 Kafka クライアントと TPL が面倒な作業のほとんどを処理します。ビジネス ロジックに集中するだけで済みます。
featured image - .NET でイベント ストリーミング アプリケーションを構築する方法
Bill Bejeck HackerNoon profile picture
0-item


立ち止まって日常工作的生活的について考えてみると、すべてを弄出来事として簡単に見ることができます。次のシーケンスを検討してください。


  1. あなたの車の「低燃料」インジケータが点灯します
  2. その結果、次のガソリンスタンドに立ち寄って給油します
  3. 車にガソリンを入れると、会社のリワードクラブに参加して割引を受けるように促されます
  4. 中に入ってサインアップすると、次の購入に使用できるクレジットを獲得できます


ここで延々と続けることもできますが、私は言いたいことを言いました。人生图片は一連の起来事です。その事実を考えると、近几日の新しいソフトウェア システムをどのように設計しますか?さまざまな結果を収集して多种の間隔で処理しますか、それとも 1 日の終わりまで待ってから処理しますか?いいえ、あなたはしません。各イベントが発生したらすぐに対応する用不着があります。確かに、個々の状況に応じてすぐに対応できない場合もあるかもしれません… 1 日分のトランザクションを一直にダンプすることを考えてみてください。それでも、データを受け取ったらすぐに行動するでしょう。


では、イベントを运营するためのソフトウェア システムをどのように実装すればよいでしょうか。答えはストリーム処理です。


ストリーム処理とは

イベント データを処理するためのデファクト テクノロジとなったストリーム処理は、イベントをアプリケーションの一般な入力または负荷率と見なすソフトウェア開発へのアプローチです。たとえば、情報に基づいて行動したり、不对なクレジット カード購入の将性がある場合に対応したりするのを待っていても含意がありません。また、マイクロサービスでレコードの受信フローを処理する必备がある場合もあり、それらを最も効率的に処理することがアプリケーションにとって最適です。ユース ケースが何であれ、イベント ストリーミング アプローチがイベントを処理するための最良のアプローチであると言っても過言ではありません。


このブログ发表文章では、Apache Kafka®、.NET プロデューサーおよびコンシューマー クライアント、および Microsoft のを利用して、イベント ストリーミング アプリケーションを構築します。一見すると、これら 3 つすべてを一緒に作業する将性が高い候補として自動的にまとめることはできないかもしれません。確かに、Kafka と .NET クライアントは素晴らしい組み合わせですが、TPL はどこに当てはまりますか?


多くの場合、スループットは关键性な要件であり、Kafka からの消費とダウンストリーム処理の間のインピーダンスの不同步によるボトルネックを逃避するために、機会があればいつでもプロセス内並列化をお勧めします。


3 つのコンポーネントがどのように連携して、堅牢で効率的なイベント ストリーミング アプリケーションを構築するかを読んでください。最良の一部分は、Kafka クライアントと TPL が面倒な作業のほとんどを処理することです。ビジネス ロジックに低效するだけで済みます。


アプリケーションに飛び込む前に、各コンポーネントについて簡単に説明しましょう。

アパッチ・カフカ

ストリーム処理がイベント ストリームを処理するための事実上の標準である場合、 イベント ストリーミング アプリケーションを構築するための事実上の標準です。 Apache Kafka は、特别にスケーラブルで弾力性があり、フォールト トレラントで可靠な措施で提拱了される疏散化ログです。簡単に言えば、Kafka はブローカー (サーバー) とクライアントを应用します。ブローカーは、データ センターまたはクラウド リージョンにまたがることができる Kafka クラスターの疏散化ストレージ レイヤーを确立します。クライアントは、ブローカー クラスターからイベント データを読み書きする機能を提拱了します。 Kafka クラスターはフォールト トレラントです。いずれかのブローカーに障害が発生した場合、他のブローカーが作業を引き継いで継続的な運用を保証します。

コンフルエントな .NET クライアント

前の片段で、クライアントが Kafka ブローカー クラスターに書き込みまたは読み取りを行うことを述べました。 Apache Kafka は Java クライアントにバンドルされていますが、このブログ記事のアプリケーションの学校にある .NET Kafka プロデューサーとコンシューマーなど、他のいくつかのクライアントも巧用できます。 .NET プロデューサーとコンシューマーは、Kafka によるイベント ストリーミングのパワーを .NET 開発者にもたらします。 .NET クライアントの詳細については、按照してください。

タスク並列ライブラリ

Task Parallel Library ( ) は、「System.Threading および System.Threading.Tasks 名前空間内のパブリック型と API のセット」であり、並行アプリケーションの制成作業を簡素化します。 TPL は、次の詳細を処理することで、同時実行の追加をより的管理しやすいタスクにします。


1. 作業の分开の処理 2. ThreadPool でのスレッドのスケジューリング 3. キャンセル、状態治理などの低レベルの詳細


つまり、TPL を应用すると、アプリケーションの処理パフォーマンスを极大化しながら、ビジネス ロジックに分布できるということです。具有的には、TPL のサブセットを应用します。


データフロー ライブラリは、インプロセス メッセージ パッシングとパイプライン タスクを可以にするアクター ベースのプログラミング モデルです。 Dataflow コンポーネントは、TPL の型とスケジューリング インフラストラクチャに基づいて構築され、C# 言語とシームレスに統合されます。一般 、Kafka からの読み取りは愈来愈に高速度ですが、一般 、処理 (DB 呼び出しまたは RPC 呼び出し) がボトルネックになります。順序付けの保証を犠牲にすることなく、より高いスループットを達成するために灵活运用できる並列化の機会は、検討する価値があります。


このブログ论文投稿では、これらの Dataflow コンポーネントを .NET Kafka クライアントと共に活用して、データが凭借会になったときにデータを処理するストリーム処理アプリケーションを構築します。

データフロー ブロック

制作するアプリケーションに入る前に、次のことを行います。 TPL データフロー ライブラリの構成成分に関する图片背景情報を供应する必备があります。ここで説明するアプローチは、高いスループットを必备とする CPU と I/O を分散的に选用するタスクがある場合に最も適しています。 TPL データフロー ライブラリは、着信データまたはレコードをバッファリングおよび処理できるブロックで構成され、ブロックは次の 3 つのカテゴリのいずれかに分類されます。


  1. ソース ブロック – データのソースとして機能し、他のブロックはそこから読み取ることができます。
  2. ターゲット ブロック – 他のブロックから書き込むことができるデータのレシーバーまたはシンク。
  3. Propagator ブロック – ソース ブロックとターゲット ブロックの両方として動作します。


さまざまなブロックを取り、それらを接続して、線形処理パイプラインまたは処理のより複雑なグラフを出现します。次の図を検討してください。



グラフの各ノードは、異なる処理または計算タスクを表します。



データフロー ライブラリには、バッファリング、実行、グループ化の 3 つのカテゴリに分類される定義済みのブロック タイプがいくつか含义されています。このブログ网上投稿用に開発されたプロジェクトでは、バッファリング タイプと実行タイプを利用しています。 BufferBlock<T> は、データをバッファリングする汎用構造体であり、プロデューサー/コンシューマー アプリケーションでの利用に最適です。 BufferBlock は、受信データの処理に先入れ先出しキューを利用します。


BufferBlock (およびそれを拡張するクラス) は、データフロー ライブラリでメッセージを直接読み書きできる唯一のブロック タイプです。他のタイプは、ブロックからメッセージを受信したり、ブロックにメッセージを送信したりすることを期待しています。このため、ソース ブロックを作成してISourceBlockインターフェイスを実装し、 ITargetBlockインターフェイスを実装するシンク ブロックを実装するときに、 BufferBlockをデリゲートとして使用しました。


このアプリケーションで使用されるもう 1 つの Dataflow ブロック タイプはです。データフロー ライブラリのほとんどのブロック タイプと同様に、変換ブロックが受け取る入力レコードごとに実行するデリゲートとして機能するFunc<TInput, TOutput>を提供することによって、TransformBlock のインスタンスを作成します。


Dataflow ブロックの 2 つの必要な機能は、バッファするレコードの数と並列処理のレベルを制御できることです。


非常大バッファー容积を設定することにより、アプリケーションが処理パイプラインのある時点で長時間の待機に遭到した場合、アプリケーションは自動的にバック プレッシャーを適用します。この背圧は、データの過剰蓄積を防ぐために不必要です。その後、問題が治まり、バッファのサイズが小さくなると、再びデータが消費されます。


ブロックの同時実行を設定する機能は、パフォーマンスにとって比较重要性です。 1 つのブロックが CPU または I/O 集中大型のタスクを実行する場合、処理を並列化してスループットを往上させる自然是な傾向があります。ただし、同時実行性を追加すると、処理順序という問題が発生する有必要条件性があります。ブロックのタスクにスレッドを追加すると、データの阻力順序を保証できなくなります。場合によっては順序が問題にならないこともありますが、比较重要性な場合は、考慮すべき刻骨铭心なトレードオフになります。つまり、同時実行性によるスループットの往上と順序阻力の処理です。幸いなことに、Dataflow ライブラリでは、このトレードオフを行う有必要はありません。


ブロックの並列処理を複数に設定すると、フレームワークは入力レコードの元の順序を維持することを保証します (並列処理による順序の維持は構成可能であり、デフォルト値は true です)。データの元の順序が A、B、C の場合、出力順序は A、B、C になります。懐疑的ですか?私は自分がそうだったことを知っているので、テストしたところ、宣伝どおりに機能することがわかりました.このテストについては、この記事の後半で説明します。並列処理の増加は、ステートレス操作またはassociative および commutativeであるステートフル操作でのみ行う必要があることに注意してください。つまり、操作の順序またはグループ化を変更しても結果には影響しません。


この時点で、これがどこに向かっているのかがわかります。将会な限り最速の手段で処理する必要性があるイベントを表す Kafka トピックがあります。そのため、.NET KafkaConsumer を含むソース ブロック、ビジネス ロジックを実行する処理ブロック、および最終結果を Kafka トピックに書き戻す .NET KafkaProducer を含むシンク ブロックで構成されるストリーミング アプリケーションを構築します。アプリケーションの内容提要図を次に示します。




アプリケーションの構造は次のとおりです。


  1. ソース ブロック: .NET KafkaConsumer とBufferBlockデリゲートのラップ
  2. 変換ブロック: 逆シリアル化
  3. 変換ブロック: 受信 JSON データを購入オブジェクトにマッピングする
  4. Transform ブロック: CPU を集中的に使用するタスク (シミュレート)
  5. 変換ブロック: シリアル化
  6. ターゲット ブロック: .NET KafkaProducer およびBufferBlockデリゲートのラップ


次に、アプリケーションの与会人员的なフローと、Kafka とデータフロー ライブラリを活用して強力なイベント ストリーミング アプリケーションを構築する際の重点なポイントについて説明します。


イベント ストリーミング アプリケーション

シナリオは次のとおりです。オンライン ストアから購入のレコードを受け取る Kafka トピックがあり、受信データ结构类型は JSON です。購入の詳細に ML 推論を適用して、これらの購入イベントを処理したいと考えています。さらに、JSON レコードを Protobuf 结构类型に変換したいと考えています。これは、全社的なデータ结构类型であるためです。もちろん、アプリケーションのスループットは难以欠です。 ML 工作は CPU を一起的に适用するため、アプリケーションのスループットを最大的化する方式 が重要です。そのため、アプリケーションのその的部分の並列化を巧用できます。


パイプラインへのデータの消費

ソース ブロックから始めて、ストリーミング アプリケーションの重要なポイントを見ていきましょう。 ISourceBlockインターフェイスの実装については前に説明しましたが、 BufferBlockISourceBlock実装しているため、すべてのインターフェイス メソッドを満たすデリゲートとして使用します。したがって、ソース ブロックの実装は KafkaConsumer と BufferBlock をラップします。ソース ブロック内には、コンシューマーが消費したレコードをバッファーに渡すことだけを担当する別のスレッドがあります。そこから、バッファはレコードをパイプラインの次のブロックに転送します。


レコードをバッファに転送する前に、 ConsumeRecord ( Consumer.consume呼び出しによって返される) は、キーと値に加えて、アプリケーションにとって重要な元のパーティションとオフセットをキャプチャするRecord抽象化によってラップされます。その理由はすぐに説明します。また、パイプライン全体がRecord抽象化で機能することにも注意してください。したがって、すべての変換により、キー、値、および元のオフセットのようなその他の重要なフィールドをラップする新しいRecordオブジェクトが生成され、パイプライン全体で保持されます。


処理ブロック

アプリケーションは、処理をいくつかの異なるブロックに切割します。各ブロックは処理チェーンの次のステップにリンクするため、ソース ブロックは、逆シリアル化を処理する最先のブロックにリンクします。 .NET KafkaConsumer はレコードの逆シリアル化を処理できますが、シリアル化されたペイロードをコンシューマーに渡し、Transform ブロックで逆シリアル化します。逆シリアル化は CPU を汇聚的に便用する或者性があるため、これを処理ブロックに手机配置すると、必要性に応じて工作を並列化できます。


逆シリアル化の後、レコードは、JSON ペイロードを Protobuf 行驶の Purchase データ モデル オブジェクトに変換する別の Transform ブロックに流れます。さらに興味深いのは、データが次のブロックに入るときです。これは、購入トランザクションを完完全全に了するために用得着な CPU 集中型机のタスクを表しています。アプリケーションはこの一些をシミュレートし、保证された関数は 1 ~ 3 秒のランダムな時間でスリープします。


このシミュレートされた処理ブロックは、Dataflow ブロック フレームワークの機能を活用する場所です。 Dataflow ブロックをインスタンス化するときは、検出された各レコードに適用されるデリゲート Func インスタンスと、 ExecutionDataflowBlockOptionsインスタンスを提供します。前に Dataflow ブロックの構成について説明しましたが、ここでもう一度簡単に確認します。 ExecutionDataflowBlockOptionsは、そのブロックの最大バッファー サイズと最大並列化度という 2 つの重要なプロパティが含まれています。


パイプライン内のすべてのブロックのバッファ サイズ構成を 10,000 レコードに設定しますが、シミュレートされた CPU 集大中小型を除いて、デフォルトの並列化レベル 1 を选用し、4 に設定します。デフォルトの Dataflow バッファ サイズは無制限。パフォーマンスへの影響については次のセクションで説明しますが、ここではアプリケーションの内容提要を説明します。


集中処理ブロックは、sink ブロックにフィードするシリアル化変換ブロックに転送されます。次に、.NET KafkaProducer がラップされ、最終結果が Kafka トピックに生成されます。シンク ブロックは、デリゲートBufferBlockと生成用の別のスレッドも使用します。スレッドは、バッファから次に利用可能なレコードを取得します。次に、 KafkaProducer.Produceメソッドを呼び出して、 DeliveryReportをラップするActionデリゲートを渡します。プロデュース リクエストが完了すると、プロデューサー I/O スレッドがActionデリゲートを実行します。


これで、アプリケーションの大まかなチュートリアルは完です。ここで、設定の最重要な有些であるコミット オフセットの処理方式について説明しましょう。これは、コンシューマーからレコードをパイプライン処理する場合に不能欠です。


オフセットのコミット

Kafka でデータを処理する場合、アプリケーションが所定のポイントまで正常に処理したレコードのオフセット (オフセットは Kafka トピック内のレコードの論理位置) を定期的にコミットします。では、なぜオフセットをコミットするのでしょうか?これは簡単な質問です。コンシューマーが制御された方法で、またはエラーによってシャットダウンすると、最後にコミットされた既知のオフセットから処理が再開されます。オフセットを定期的にコミットすることにより、コンシューマーはレコードを再処理しません。または、いくつかのレコードを処理した後、コミットするにアプリケーションがシャットダウンした場合に、少なくとも最小限の量を再処理しません。このアプローチは、少なくとも 1 回の処理として知られています。これは、レコードが少なくとも 1 回処理されることを保証し、エラーが発生した場合、それらの一部が再処理される可能性がありますが、代替手段がデータ損失のリスクがある場合、これは優れたオプションです。 Kafka は 1 回限りの処理保証も提供します。このブログ投稿ではトランザクションについては触れませんが、Kafka でのトランザクションの詳細については、次の記事を参照してください。.


オフセットをコミットする方式方法步骤はいくつかありますが、最も単純で最も大体的な方式方法步骤は自動コミット アプローチです。コンシューマーがレコードを読み取り、アプリケーションがそれらを処理します。 (レコードのタイムスタンプに基づいて) 構成几率な時間が経過すると、コンシューマーは既に消費されたレコードのオフセットをコミットします。通常情况下、自動コミットは合情合理なアプローチです。明显的な消費プロセス ループでは、前に消費したすべてのレコードを普通 に処理するまで、消費者に戻りません。予期しないエラーまたはシャットダウンが発生した場合、コードはコンシューマーに返されないため、コミットは発生しません。しかし、ここでのアプリケーションでは、パイプライン処理を行っています。消費されたレコードを确认してバッファにプッシュし、さらに消費するために戻ってきます。処理が成功的するのを待つ用不着はありません。


パイプライン アプローチでは、少なくとも 1 回の処理をどのように保証しますか?メソッドIConsumer.StoreOffset活用します。このメソッドは、単一のパラメーター ( TopicPartitionOffset ) を処理し、次のコミットのために (他のオフセットと共に) 保存します。自動コミットが Java API でどのように機能するかを対照的に示していることに注意してください。


そのため、コミット手順は次のように動作します。シンク ブロックが Kafka に生成二维码するレコードを有すると、アクション デリゲートにも展示されます。プロデューサーがコールバックを実行すると、元のオフセットがコンシューマー (ソース ブロック内の同じインスタンス) に渡され、コンシューマーは StoreOffset メソッドを应用します。コンシューマーに対して自動コミットを引き続き有効にしていますが、コンシューマーがこの時点までに消費した新的のオフセットをやみくもにコミットするのではなく、コミットするオフセットを展示しています。



オフセットのコミット


そのため、アプリケーションはパイプラインを采用しますが、ブローカーから ack を受信した後にのみコミットします。つまり、ブローカーとレプリカ ブローカーの最短セットがレコードを保存图片したことを代表します。このように動作すると、ブロックが作業を実行している間、コンシューマーが継続的にパイプラインを拿得してフィードできるため、アプリケーションの進行が速くなります。このアプローチが几率なのは、.NET コンシューマー クライアントがスレッド セーフであるため (有一部电影のメソッドはスレッド セーフではなく、そのように文書化されています)、単一のコンシューマーをソース ブロック スレッドとシンク ブロック スレッドの両方で防护に動作させることができます。


プロデュース段階でエラーが発生した場合、アプリケーションはエラーをログに記録し、レコードをネストされたBufferBlockに戻して、プロデューサがブローカへのレコードの送信を再試行するようにします。しかし、この再試行ロジックはやみくもに行われるため、実際には、より堅牢なソリューションが必要になるでしょう。

パフォーマンスへの影響

アプリケーションがどのように機能するかを説明したので、パフォーマンスの数値を見てみましょう。すべてのテストは macOS Big Sur (11.6) ラップトップでローカルに実行されたため、このシナリオではマイレージが異なる場合があります。パフォーマンス テストのセットアップは簡単です。


  1. JSON 样式の Kafka トピックに 100 万レコードを转成します。このステップは前期に行われており、テスト測定には含まれていません。
  2. Kafka Dataflow 対応アプリケーションを起動し、すべてのブロックの並列化を 1 (デフォルト) に設定します。
  3. アプリケーションは 100 万件のレコードを合适に処理するまで実行され、その後シャットダウンされます
  4. すべてのレコードを処理するのにかかった時間を記録する


2 番目のラウンドの真正唯一的の違いは、シミュレートされた CPU 集中形ブロックの MaxDegreeOfParallelism を 4 に設定したことです。結果は次のとおりです。


レコード数同時実行係数時間(分)
1M 1 38
1M 4 9


そのため、構成を設定するだけで、イベントの順序を維持しながらスループットを大面积的に积极乐观させることができました。したがって、並列処理の更大多少次を 4 に設定すると、4 倍上文の快速化が美景できます。しかし、このパフォーマンス积极乐观の非常重要な这部分は、正しく実行するのが難しい並行コードをまったく做成していないことです。


ブログ期刊投稿の前半で、Dataflow ブロックとの同時実行がイベントの順序を維持することを検証するテストについて言及したので、それについて説明しましょう。トライアルには、次の手順が含まれていました。


  1. Kafka トピックに 1M 整数 (0 ~ 999,999) を提取する
  2. リファレンス アプリケーションを整数型で動作するように変更する
  3. シミュレートされたリモート プロセス ブロックの同時実行レベル 1 でアプリケーションを実行します—Kafka トピックに转换します
  4. 同時実行レベル 4 でアプリケーションを再実行し、数値を別の Kafka トピックに添加します。
  5. プログラムを実行して、両方の結果トピックから整数を消費し、それらをメモリ内の配列に格納します
  6. 両方の配列を比較し、それらが同じ順序であることを確認します


このテストの結果、両方の配列に 0 から 999,999 までの順序で整数が含まれており、複数の並列処理レベルで Dataflow ブロックを动用すると、受信データの処理順序が維持されることが証明されました。 Dataflow 並列処理の詳細については、 を图案填充してください。

まとめ

この投稿は、.NET Kafka クライアントと Task Parallel Library を使用して、堅牢で高スループットのイベント ストリーミング アプリケーションを構築する方法を紹介しました。 Kafka は高パフォーマンスのイベント ストリーミングを提供し、Task Parallel Library は、すべての詳細を処理するためのバッファリングを備えた並行アプリケーションを作成するためのビルディング ブロックを提供し、開発者がビジネス ロジックに集中できるようにします。アプリケーションのシナリオは少し不自然ですが、うまくいけば、2 つのテクノロジを組み合わせることの有用性がわかります。試してみる-.




바카라사이트 바카라사이트 온라인바카라