Spring WebFlux は、Java で最新のスケーラブルな Web アプリケーションを構築するためのリアクティブで非ブロッキングの Web フレームワークです。これは Spring Framework の一部であり、Reactor ライブラリを使用して Java でリアクティブ プログラミングを実装します。
「リアクティブ」という用語は、I/O イベントに反応するネットワーク コンポーネント、マウス イベントに反応する UI コントローラーなど、変化に反応するように構築されたプログラミング モデルを指します。その寓意で、ノンブロッキングはリアクティブです。なぜなら、ブロックされるのではなく、运行が结束したりデータが采取几率になったりしたときに通知模板に反応するモードになっているからです。
Spring WebFlux (および普通的なノンブロッキング サーバー) では、アプリケーションはブロックされないと想定されています。したがって、非ブロッキング サーバーは、小さな统一サイズのスレッド プール (イベント ループ ワーカー) を便用して请求を処理します。
WebFlux リクエストの処理は诺干異なりますが、次のようになります。
によって生成されたかなり最小限のアプリが必要です。コードはで入手できます。
すべてのスレッド関連のトピックは、CPU に大きく依存しています。通常、リクエストを処理する処理スレッドの数は、 CPU コアの数に関連しています。教育目的で、Docker コンテナーの実行時に CPU を制限することで、プール内のスレッド数を簡単に操作できます。
docker run --cpus=1 -d --rm --name webflux-threading -p 8081:8080 local/webflux-threading
私たちのアプリはシンプルな占い師です。 /karma
エンドポイントを呼び出すと、 balanceAdjustment
で 5 つのレコードが取得されます。各調整は、与えられたカルマを表す整数です。はい、アプリは正の数のみを生成するため、非常に寛大です。不運はもうありません!
@GetMapping("/karma") public Flux<Karma> karma() { return prepareKarma() .map(Karma::new) .log(); } private Flux<Integer> prepareKarma() { Random random = new Random(); return Flux.fromStream( Stream.generate(() -> random.nextInt(10)) .limit(5)); }
log
メソッドはここで重要です。すべての Reactive Streams シグナルを監視し、それらを INFO レベルのログにトレースします。
curl localhost:8081/karma
のログ出力は次のとおりです。
ご覧のとおり、IO スレッド プールで処理が行われています。スレッド名ctor-http-nio-2
reactor-http-nio-2
の略です。タスクは、それらを送信したスレッドですぐに実行されました。 Reactor は、別のプールでそれらをスケジュールするための指示を表示しませんでした。
@GetMapping("/delayedKarma") public Flux<Karma> delayedKarma() { return karma() .delayElements(Duration.ofMillis(100)); }
元のkarma()
呼び出しですでに宣言されているため、ここでlog
メソッドを追加する必要はありません。
今回は、最初の要素のみが IO スレッドのreactor-http-nio-4
で受信されました。残りの 4 つの処理は、 parallel
スレッド プール専用でした。
delayElements
の Javadoc はこれを確認します。
シグナルは遅延され、並列のデフォルト スケジューラで続行されます
呼び出しチェーンの任意の場所で.subscribeOn(Schedulers.parallel())
を指定することで、遅延なく同じ効果を得ることができます。
parallel
スケジューラを使用すると、複数のタスクを異なるスレッドで同時に実行できるため、パフォーマンスとスケーラビリティが向上します。これにより、CPU リソースをより有効に活用し、多数の同時要求を処理できます。
ただし、コードの複雑さとメモリ使用量が増加する可能性もあり、ワーカー スレッドの最大数を超えると、スレッド プールが枯渇する可能性があります。したがって、 parallel
スレッド プールを使用するかどうかは、アプリケーションの特定の要件とトレードオフに基づいて決定する必要があります。
flatMap
使用して、占い師をより一视同仁にします。 Karma インスタンスごとに、元の調整を 10 倍して反対の調整を生成し、元の調整を補うバランスのとれたトランザクションを効果的に作成します。
@GetMapping("/fairKarma") public Flux<Karma> fairKarma() { return delayedKarma() .flatMap(this::makeFair); } private Flux<Karma> makeFair(Karma original) { return Flux.just(new Karma(original.balanceAdjustment() * 10), new Karma(original.balanceAdjustment() * -10)) .subscribeOn(Schedulers.boundedElastic()) .log(); }
ご覧のとおり、 makeFair's
Flux は、 boundedElastic
スレッド プールにサブスクライブする必要があります。最初の 2 つのカルマのログを確認してみましょう。
Reactor は、IO スレッドでbalanceAdjustment=9
の最初の要素をサブスクライブします
次に、 boundedElastic
プールは、 boundedElastic-1
スレッドで90
および-90
調整を発行することにより、カルマの公平性に取り組みます
最初の要素の後の要素は、並列スレッド プールでサブスクライブされます (チェーンにまだdelayedElements
があるため)
boundedElastic
スケジューラとは?
デフォルトでは、 boundedElastic
スレッド プールの最大サイズは、使用可能なプロセッサの数に 10 をたものですが、必要に応じて別の最大サイズを使用するように構成できます。
boundedElastic
のような非同期スレッド プールを使用することで、タスクをオフロードしてスレッドを分離し、メイン スレッドを解放して他のリクエストを処理することができます。スレッド プールの制限された性質により、スレッドの枯渇や過度のリソース使用を防ぐことができます。また、プールの弾力性により、ワークロードに基づいてワーカー スレッドの数を動的に調整できます。
single
: これは、同期実行用に設計されたシングルスレッドのシリアル化された実行コンテキストです。タスクが順番に実行され、2 つのタスクが同時に実行されないようにする必要がある場合に便利です。
immediate
: これは、スレッドの切り替えなしで、呼び出し元のスレッドでタスクをすぐに実行するスケジューラーの簡単な no-op 実装です。