Saber un poco sobre un dominio y tecnologías no le hará la vida más fácil.
@GetMapping("/greeting/{firstName}/{lastName}") public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { return Flux.fromIterable(Arrays.asList(firstName, lastName)) .filter(this::wasWorkingNiceBeforeRefactoring) .transform(this::senselessTransformation) .collect(Collectors.joining()) .map(names -> "Hello, " + names); } private boolean wasWorkingNiceBeforeRefactoring(String aName) { // We don't want to greet with John, sorry return !aName.equals("John"); } private Flux<String> senselessTransformation(Flux<String> flux) { return flux .single() .flux() .subscribeOn(Schedulers.parallel()); }
> curl localhost:8080/greeting/John/Doe > Hello, Doe
Pero cuando lo ejecutas como curl localhost:8080/greeting/Mick/Jagger
, ves el siguiente seguimiento de pila:
java.lang.IndexOutOfBoundsException: Source emitted more than one item at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134) ~[reactor-core-3.5.5.jar:3.5.5] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): *__checkpoint ⇢ Handler com.example.demo.controller.GreetingController#greeting(String, String) [DispatcherHandler] *__checkpoint ⇢ HTTP GET "/greeting/Mick/Jagger" [ExceptionHandlingWebHandler] Original Stack Trace: <18 internal lines> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na] (4 internal lines)
Todo lo que revela es que 1) ocurrió en el método GreetingController#greeting
, y 2) el cliente realizó un `HTTP GET "/saludo/Mick/Jagger
@GetMapping("/greeting/{firstName}/{lastName}") public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { return Flux.fromIterable(Arrays.asList(firstName, lastName)) // <...> .doOnError(e -> logger.error("Error while greeting", e)); }
doOnError
puede/no puede ser útil durante la depuración: Registro : puede usar doOnError
para registrar mensajes de error y proporcionar más contexto sobre lo que salió mal en su transmisión reactiva. Esto puede ser especialmente útil al depurar problemas en un flujo complejo con muchos operadores.
Recuperación : doOnError
también se puede usar para recuperarse de errores y continuar procesando la transmisión. Por ejemplo, puede usar onErrorResume
para proporcionar un valor alternativo o una secuencia en caso de error.
Depuración : lo más probable es doOnError
no proporcione un seguimiento de pila mejor, excepto lo que ya vio en los registros. No confíe en él como un buen solucionador de problemas.
La próxima parada es reemplazar doOnError()
previamente agregado por la llamada al método log()
. Tan simple como se pone. log()
observa todas las señales de Reactive Streams y las rastrea en registros bajo el nivel INFO de forma predeterminada.
Podemos ver qué métodos Reactivos se han llamado ( onSubscribe
, request
y onError
). Además, saber desde qué subprocesos (grupos) se han llamado estos métodos puede ser información muy útil. Sin embargo, no es relevante para nuestro caso.
Acerca de los grupos de subprocesos
El nombre del subproceso ctor-http-nio-2
significa reactor-http-nio-2
. Los métodos reactivos onSubscribe()
y request()
se ejecutaron en el grupo de subprocesos de IO (programador). Estas tareas se ejecutaron inmediatamente en un subproceso que las envió.
Al tener .subscribeOn(Schedulers.parallel())
dentro de senselessTransformation
, le indicamos a Reactor que suscriba más elementos en otro grupo de subprocesos. Esa es la razón por la que onError
se ha ejecutado en el subproceso parallel-1
.
Puede leer más sobre el grupo de subprocesos en este artículo .
log()
le permite agregar declaraciones de registro a su transmisión, lo que facilita el seguimiento del flujo de datos y el diagnóstico de problemas. Si tuviéramos un flujo de datos más complejo con cosas como flatMap, subcadenas, bloqueo de llamadas, etc., nos beneficiaríamos mucho al tenerlo todo desconectado. Es una cosa muy fácil y agradable para el uso diario. Sin embargo, todavía no conocemos la causa raíz.
La instrucción Hooks.onOperatorDebug()
le dice a Reactor que habilite el modo de depuración para todos los operadores en flujos reactivos, lo que permite mensajes de error más detallados y seguimientos de pila.
public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { Hooks.onOperatorDebug(); return // <...> }
Al agregar Hooks.onOperatorDebug()
finalmente podemos avanzar en nuestra investigación. Stacktrace es mucho más útil:
Y en la línea 42 tenemos una llamada single()
.
No se desplace hacia arriba, el senselessTransformation
se ve a continuación:
private Flux<String> senselessTransformation(Flux<String> flux) { return flux .single() // line 42 .flux() .subscribeOn(Schedulers.parallel()); }
single()
emite un elemento de la fuente Flux o señala IndexOutOfBoundsException
para una fuente con más de un elemento. Eso significa que el flujo en el método emite más de 1 elemento. Al subir en la jerarquía de llamadas vemos que originalmente hay un Flux con dos elementos Flux.fromIterable(Arrays.asList(firstName, lastName))
.
El método de filtrado wasWorkingNiceBeforeRefactoring
elimina un elemento de un flujo cuando es igual a John . Esa es la razón por la que el código funciona para una universidad llamada John. Eh.
Hooks.onOperatorDebug()
puede ser particularmente útil al depurar flujos reactivos complejos, ya que proporciona información más detallada sobre cómo se procesa el flujo. Sin embargo, habilitar el modo de depuración puede afectar el rendimiento de su aplicación (debido a los seguimientos de pila completados), por lo que solo debe usarse durante el desarrollo y la depuración, y no en producción.
Para lograr casi el mismo efecto que ofrece Hooks.onOperatorDebug()
con un impacto mínimo en el rendimiento, hay un operador especial checkpoint()
. Habilitará el modo de depuración para esa sección de la transmisión, mientras que el resto de la transmisión no se verá afectado.
public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { return Flux.fromIterable(Arrays.asList(firstName, lastName)) .filter(this::wasWorkingNiceBeforeRefactoring) /* new */ .checkpoint("After filtering") .transform(this::senselessTransformation) /* new */ .checkpoint("After transformation") .collect(Collectors.joining()) .map(names -> "Hello, " + names); }
Echa un vistazo a los registros:
Este desglose de puntos de control nos dice que el error se ha observado después de nuestro segundo punto de control descrito como Después de la transformación . No significa que no se haya alcanzado el primer punto de control durante la ejecución. Lo fue, pero el error comenzó a aparecer solo después del segundo. Es por eso que no vemos Después de filtrar .
Además de la descripción, puede obligar a Reactor a generar un seguimiento de pila para su punto de control agregando true
como segundo parámetro al método checkpoint()
. Es importante tener en cuenta que el stacktrace generado lo llevará a la línea con un punto de control. No completará un seguimiento de pila para la excepción original. Por lo tanto, no tiene mucho sentido porque puede encontrar fácilmente un punto de control proporcionando una descripción.