visit
Knowing a little about a domain and technologies isn't going to make your life easier.
@GetMapping("/greeting/{firstName}/{lastName}")
public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) {
return Flux.fromIterable(Arrays.asList(firstName, lastName))
.filter(this::wasWorkingNiceBeforeRefactoring)
.transform(this::senselessTransformation)
.collect(Collectors.joining())
.map(names -> "Hello, " + names);
}
private boolean wasWorkingNiceBeforeRefactoring(String aName) {
// We don't want to greet with John, sorry
return !aName.equals("John");
}
private Flux<String> senselessTransformation(Flux<String> flux) {
return flux
.single()
.flux()
.subscribeOn(Schedulers.parallel());
}
> curl localhost:8080/greeting/John/Doe
> Hello, Doe
But when you run it like curl localhost:8080/greeting/Mick/Jagger
, you see the next stacktrace:
java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134) ~[reactor-core-3.5.5.jar:3.5.5]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ Handler com.example.demo.controller.GreetingController#greeting(String, String) [DispatcherHandler]
*__checkpoint ⇢ HTTP GET "/greeting/Mick/Jagger" [ExceptionHandlingWebHandler]
Original Stack Trace: <18 internal lines>
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na] (4 internal lines)
All it reveals is that 1) it occurred in the GreetingController#greeting
method, and 2) the client performed an `HTTP GET "/greeting/Mick/Jagger
@GetMapping("/greeting/{firstName}/{lastName}")
public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) {
return Flux.fromIterable(Arrays.asList(firstName, lastName))
// <...>
.doOnError(e -> logger.error("Error while greeting", e));
}
doOnError
can/can’t be helpful during debugging:Logging: You can use doOnError
to log error messages and provide more context about what went wrong in your reactive stream. This can be especially helpful when debugging issues in a complex stream with many operators.
Recovery: doOnError
can also be used to recover from errors and continue processing the stream. For example, you can use onErrorResume
to provide a fallback value or stream in case of an error.
Debugging: most probably doOnError
won’t provide any better stacktrace except what you already saw in the logs. Don’t rely on it as a good troubleshooter.
Next stop is to replace previously added doOnError()
by log()
method call. As simple as it gets. log()
observes all Reactive Streams signals and traces them into logs under INFO level by default.
We can see what Reactive methods have been called (onSubscribe
, request
and onError
). Additionally, knowing which threads (pools) these methods have been called from can be very useful information. However, it is not relevant to our case.
About thread pools
Thread name ctor-http-nio-2
stands for reactor-http-nio-2
. Reactive methods onSubscribe()
and request()
were executed on IO thread pool (scheduler). These tasks were executed immediately on a thread that submitted them.
By having .subscribeOn(Schedulers.parallel())
inside senselessTransformation
we’ve instructed Reactor to subscribe further elements on another thread pool. That’s the reason why onError
has been executed on parallel-1
thread.
You can read more about thread pool in this article.
log()
method allows you to add logging statements to your stream, making it easier to track the flow of data and diagnose issues. If we were having more complex data flow with things like flatMap, subchains, blocking calls, etc., we would benefit much from having it all logged down. It’s a very easy and nice thing for daily use. However, we still don’t know the root cause.
Instruction Hooks.onOperatorDebug()
tells Reactor to enable debug mode for all operators in reactive streams, allowing for more detailed error messages and stack traces.
public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) {
Hooks.onOperatorDebug();
return // <...>
}
By adding Hooks.onOperatorDebug()
we can finally make progress in our investigation. Stacktrace is way more useful:
And on line 42 we have single()
call.
Don’t scroll up, the senselessTransformation
looks next:
private Flux<String> senselessTransformation(Flux<String> flux) {
return flux
.single() // line 42
.flux()
.subscribeOn(Schedulers.parallel());
}
single()
emits one item from Flux source or signals IndexOutOfBoundsException
for a source with more than one element. That means flux in the method emits more than 1 item. By going up in the call hierarchy we see that originally there is a Flux with two elements Flux.fromIterable(Arrays.asList(firstName, lastName))
.
Filtering method wasWorkingNiceBeforeRefactoring
removes an item from a flux when it equals to John. That’s the reason why the code works for a college named John. Huh.
Hooks.onOperatorDebug()
can be particularly useful when debugging complex reactive streams, as it provides more detailed information about how the stream is being processed. However, enabling debug mode can impact the performance of your application (due to the populated stack traces), so it should only be used during development and debugging, and not in production.
To achieve nearly the same effect as Hooks.onOperatorDebug()
gives with minimum performance impact, there is a special checkpoint()
operator. It will enable debug mode for that section of the stream, while leaving the rest of the stream unaffected.
public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) {
return Flux.fromIterable(Arrays.asList(firstName, lastName))
.filter(this::wasWorkingNiceBeforeRefactoring)
/* new */ .checkpoint("After filtering")
.transform(this::senselessTransformation)
/* new */ .checkpoint("After transformation")
.collect(Collectors.joining())
.map(names -> "Hello, " + names);
}
Take a look at the logs:
This checkpoints breakdown tells us that the error has been observed after our second checkpoint described as After transformation. It doesn’t mean that the first checkpoint hasn’t been reached during execution. It was, but the error started to appear only after the second one. That’s why we don’t see After filtering.
Besides description, you can force Reactor to generate a stacktrace for your checkpoint by adding true
as the second parameter to checkpoint()
method. It’s important to note that the generated stacktrace will lead you to the line with a checkpoint. It won’t populate a stacktrace for the original exception. So it doesn’t make a lot of sense because you can easily find a checkpoint by providing a description.