ドメインとテクノロジーについて少し知っていても、生命が楽になるわけではありません。
@GetMapping("/greeting/{firstName}/{lastName}") public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { return Flux.fromIterable(Arrays.asList(firstName, lastName)) .filter(this::wasWorkingNiceBeforeRefactoring) .transform(this::senselessTransformation) .collect(Collectors.joining()) .map(names -> "Hello, " + names); } private boolean wasWorkingNiceBeforeRefactoring(String aName) { // We don't want to greet with John, sorry return !aName.equals("John"); } private Flux<String> senselessTransformation(Flux<String> flux) { return flux .single() .flux() .subscribeOn(Schedulers.parallel()); }
> curl localhost:8080/greeting/John/Doe > Hello, Doe
しかし、 curl localhost:8080/greeting/Mick/Jagger
のように実行すると、次のスタックトレースが表示されます。
java.lang.IndexOutOfBoundsException: Source emitted more than one item at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134) ~[reactor-core-3.5.5.jar:3.5.5] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): *__checkpoint ⇢ Handler com.example.demo.controller.GreetingController#greeting(String, String) [DispatcherHandler] *__checkpoint ⇢ HTTP GET "/greeting/Mick/Jagger" [ExceptionHandlingWebHandler] Original Stack Trace: <18 internal lines> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na] (4 internal lines)
明らかになるのは、1) GreetingController#greeting
メソッドで発生し、2) クライアントが「HTTP GET "/greeting/Mick/Jagger を実行した」ということだけです。
@GetMapping("/greeting/{firstName}/{lastName}") public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { return Flux.fromIterable(Arrays.asList(firstName, lastName)) // <...> .doOnError(e -> logger.error("Error while greeting", e)); }
doOnError
役立つ/役に立たないいくつかの方法を次に示します。ロギング: doOnError
を使用してエラー メッセージをログに記録し、リアクティブ ストリームで何が問題になったかについてより多くのコンテキストを提供できます。これは、多数のオペレーターを含む複雑なストリームで問題をデバッグする場合に特に役立ちます。
回復: doOnError
使用して、エラーから回復し、ストリームの処理を続行することもできます。たとえば、 onErrorResume
使用して、エラーの場合にフォールバック値またはストリームを提供できます。
デバッグ: ほとんどの場合、 doOnError
ログで既に見たものを除いて、より良いスタック トレースを提供しません。これを優れたトラブルシューティング ツールとして信頼しないでください。
次のステップは、以前に追加したdoOnError()
log()
メソッド呼び出しに置き換えることです。それが得るのと同じくらい簡単です。 log()
は、すべての Reactive Streams シグナルを監視し、デフォルトで INFO レベルのログにトレースします。
どの Reactive メソッドが呼び出されたかを確認できます ( onSubscribe
、 request
、およびonError
)。さらに、これらのメソッドがどのスレッド (プール) から呼び出されたかを知ることは、非常に役立つ情報です。ただし、それは私たちの場合には関係ありません。
スレッドプールについて
スレッド名ctor-http-nio-2
reactor-http-nio-2
の略です。リアクティブ メソッドonSubscribe()
およびrequest()
IO スレッド プール (スケジューラ) で実行されました。これらのタスクは、それらを送信したスレッドですぐに実行されました。
senselessTransformation
内に.subscribeOn(Schedulers.parallel())
を持つことで、Reactor に別のスレッド プールの追加の要素をサブスクライブするよう指示しました。これが、 parallel-1
スレッドでonError
が実行された理由です。
スレッド プールの詳細については、この記事を参照してください。
log()
メソッドを使用すると、ロギング ステートメントをストリームに追加できるため、データの流れの追跡と問題の診断が容易になります。 flatMap、サブチェーン、ブロッキング呼び出しなどでより複雑なデータ フローを使用している場合は、すべてをログに記録することで多くのメリットが得られます。毎日使うのにとても簡単で素敵なものです。しかし、根本的な原因はまだわかっていません。
命令Hooks.onOperatorDebug()
は、Reactor に、リアクティブ ストリーム内のすべてのオペレーターに対してデバッグ モードを有効にするよう指示し、より詳細なエラー メッセージとスタック トレースを可能にします。
public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { Hooks.onOperatorDebug(); return // <...> }
Hooks.onOperatorDebug()
を追加することで、最終的に調査を進めることができます。 Stacktrace の方がはるかに便利です。
42 行目には、 single()
呼び出しがあります。
上にスクロールしないでください。senselessTransformation senselessTransformation
次のようになります。
private Flux<String> senselessTransformation(Flux<String> flux) { return flux .single() // line 42 .flux() .subscribeOn(Schedulers.parallel()); }
single()
は、Flux ソースから 1 つの項目を発行するか、複数の要素を持つソースに対してIndexOutOfBoundsException
を通知します。これは、メソッド内のフラックスが複数のアイテムを発行することを意味します。呼び出し階層を上に行くと、もともとFlux.fromIterable(Arrays.asList(firstName, lastName))
2 つの要素を持つ Flux があることがわかります。
フィルタリング メソッドwasWorkingNiceBeforeRefactoring
項目がJohnと等しい場合にフラックスから項目を削除します。これが、John という名前の大学でコードが機能する理由です。は。
Hooks.onOperatorDebug()
、ストリームがどのように処理されているかについてより詳細な情報を提供するため、複雑なリアクティブ ストリームをデバッグするときに特に役立ちます。ただし、デバッグ モードを有効にすると、アプリケーションのパフォーマンスに影響を与える可能性があるため (スタック トレースが読み込まれるため)、開発およびデバッグ中にのみ使用し、運用環境では使用しないでください。
Hooks.onOperatorDebug()
とほぼ同じ効果を最小限のパフォーマンスへの影響で達成するために、特別なcheckpoint()
演算子があります。ストリームのそのセクションのデバッグ モードを有効にしますが、ストリームの残りの部分は影響を受けません。
public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { return Flux.fromIterable(Arrays.asList(firstName, lastName)) .filter(this::wasWorkingNiceBeforeRefactoring) /* new */ .checkpoint("After filtering") .transform(this::senselessTransformation) /* new */ .checkpoint("After transformation") .collect(Collectors.joining()) .map(names -> "Hello, " + names); }
ログを見てください:
このチェックポイントの内訳は、変換後として説明されている 2 番目のチェックポイントの後にエラーが観察されたことを示しています。実行中に最初のチェックポイントに到達していないという意味ではありません。だったのですが、エラーが出るようになったのは2回目以降です。そのため、 After filtering は表示されません。
説明の他に、 checkpoint()
メソッドの 2 番目のパラメーターとしてtrue
を追加することで、Reactor にチェックポイントのスタック トレースを生成させることができます。生成されたスタック トレースによって、チェックポイントのある行が表示されることに注意してください。元の例外のスタックトレースは作成されません。説明を提供することでチェックポイントを簡単に見つけることができるため、あまり意味がありません。