对许多 各个领域和技术水平详细了解丝毫并会让您的活动更方便。
@GetMapping("/greeting/{firstName}/{lastName}") public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { return Flux.fromIterable(Arrays.asList(firstName, lastName)) .filter(this::wasWorkingNiceBeforeRefactoring) .transform(this::senselessTransformation) .collect(Collectors.joining()) .map(names -> "Hello, " + names); } private boolean wasWorkingNiceBeforeRefactoring(String aName) { // We don't want to greet with John, sorry return !aName.equals("John"); } private Flux<String> senselessTransformation(Flux<String> flux) { return flux .single() .flux() .subscribeOn(Schedulers.parallel()); }
> curl localhost:8080/greeting/John/Doe > Hello, Doe
但是当你像curl localhost:8080/greeting/Mick/Jagger
一样运行它时,你会看到下一个堆栈跟踪:
java.lang.IndexOutOfBoundsException: Source emitted more than one item at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134) ~[reactor-core-3.5.5.jar:3.5.5] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): *__checkpoint ⇢ Handler com.example.demo.controller.GreetingController#greeting(String, String) [DispatcherHandler] *__checkpoint ⇢ HTTP GET "/greeting/Mick/Jagger" [ExceptionHandlingWebHandler] Original Stack Trace: <18 internal lines> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na] (4 internal lines)
它所揭示的只是 1) 它发生在GreetingController#greeting
方法中,以及 2) 客户端执行了一个`HTTP GET "/greeting/Mick/Jagger
@GetMapping("/greeting/{firstName}/{lastName}") public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { return Flux.fromIterable(Arrays.asList(firstName, lastName)) // <...> .doOnError(e -> logger.error("Error while greeting", e)); }
doOnError
在调试过程中可以/不能提供帮助的一些方法:日志记录:您可以使用doOnError
来记录错误消息并提供有关反应流中出错的更多上下文。这在调试具有许多运算符的复杂流中的问题时特别有用。
恢复: doOnError
也可用于从错误中恢复并继续处理流。例如,您可以使用onErrorResume
在出现错误时提供回退值或流。
调试: doOnError
很可能不会提供任何更好的堆栈跟踪,除了您已经在日志中看到的内容。不要依赖它作为一个好的故障排除程序。
下一站是用log()
方法调用替换之前添加的doOnError()
。越简单越好。默认情况下log()
会观察所有 Reactive Streams 信号并将它们跟踪到 INFO 级别的日志中。
我们可以看到调用了哪些 Reactive 方法( onSubscribe
、 request
和onError
)。此外,了解从哪些线程(池)中调用了这些方法可能是非常有用的信息。但是,它与我们的案例无关。
关于线程池
线程名称ctor-http-nio-2
代表reactor-http-nio-2
。响应式方法onSubscribe()
和request()
在 IO 线程池(调度程序)上执行。这些任务在提交它们的线程上立即执行。
通过在senselessTransformation
中使用.subscribeOn(Schedulers.parallel())
我们指示 Reactor 在另一个线程池上订阅更多元素。这就是为什么onError
在parallel-1
线程上执行的原因。
您可以在本文中阅读有关线程池的更多信息。
log()
方法允许您将日志记录语句添加到流中,从而更轻松地跟踪数据流和诊断问题。如果我们有更复杂的数据流,比如 flatMap、子链、阻塞调用等,我们将从将它们全部记录下来受益匪浅。对于日常使用来说,这是一件非常简单和美好的事情。但是,我们仍然不知道根本原因。
指令Hooks.onOperatorDebug()
告诉 Reactor 为反应流中的所有操作符启用调试模式,允许更详细的错误消息和堆栈跟踪。
public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { Hooks.onOperatorDebug(); return // <...> }
通过添加Hooks.onOperatorDebug()
我们终于可以在调查中取得进展。 Stacktrace 更有用:
在第 42 行,我们有single()
调用。
不要向上滚动,接下来是senselessTransformation
:
private Flux<String> senselessTransformation(Flux<String> flux) { return flux .single() // line 42 .flux() .subscribeOn(Schedulers.parallel()); }
single()
从 Flux 源发出一项,或为具有多个元素的源发出IndexOutOfBoundsException
信号。这意味着该方法中的通量会发出不止一项。通过在调用层次结构中往上走,我们看到最初有一个包含两个元素的 Flux Flux.fromIterable(Arrays.asList(firstName, lastName))
。
过滤方法wasWorkingNiceBeforeRefactoring
在它等于John时从 flux 中删除一个项目。这就是代码适用于名为 John 的大学的原因。嗯。
Hooks.onOperatorDebug()
在调试复杂的反应流时特别有用,因为它提供了有关如何处理流的更多详细信息。但是,启用调试模式会影响应用程序的性能(由于填充的堆栈跟踪),因此它应该只在开发和调试期间使用,而不应在生产中使用。
为了以最小的性能影响实现与Hooks.onOperatorDebug()
几乎相同的效果,有一个特殊的checkpoint()
运算符。它将为流的该部分启用调试模式,同时不影响流的其余部分。
public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { return Flux.fromIterable(Arrays.asList(firstName, lastName)) .filter(this::wasWorkingNiceBeforeRefactoring) /* new */ .checkpoint("After filtering") .transform(this::senselessTransformation) /* new */ .checkpoint("After transformation") .collect(Collectors.joining()) .map(names -> "Hello, " + names); }
看看日志:
这个检查点故障告诉我们,在我们描述为After transformation 的第二个检查点之后观察到了错误。这并不意味着在执行过程中没有到达第一个检查点。是的,但是错误仅在第二个之后才开始出现。这就是为什么我们看不到After filtering 的原因。
除了描述之外,您还可以通过将true
作为第二个参数添加到checkpoint()
方法来强制 Reactor 为您的检查点生成堆栈跟踪。请务必注意,生成的堆栈跟踪将引导您找到带有检查点的行。它不会为原始异常填充堆栈跟踪。所以它没有多大意义,因为您可以通过提供描述轻松找到检查点。