Connaître un peu un domaine et des technologies ne va pas vous faciliter la vie.
@GetMapping("/greeting/{firstName}/{lastName}") public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { return Flux.fromIterable(Arrays.asList(firstName, lastName)) .filter(this::wasWorkingNiceBeforeRefactoring) .transform(this::senselessTransformation) .collect(Collectors.joining()) .map(names -> "Hello, " + names); } private boolean wasWorkingNiceBeforeRefactoring(String aName) { // We don't want to greet with John, sorry return !aName.equals("John"); } private Flux<String> senselessTransformation(Flux<String> flux) { return flux .single() .flux() .subscribeOn(Schedulers.parallel()); }
> curl localhost:8080/greeting/John/Doe > Hello, Doe
Mais lorsque vous l'exécutez comme curl localhost:8080/greeting/Mick/Jagger
, vous voyez le stacktrace suivant :
java.lang.IndexOutOfBoundsException: Source emitted more than one item at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134) ~[reactor-core-3.5.5.jar:3.5.5] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): *__checkpoint ⇢ Handler com.example.demo.controller.GreetingController#greeting(String, String) [DispatcherHandler] *__checkpoint ⇢ HTTP GET "/greeting/Mick/Jagger" [ExceptionHandlingWebHandler] Original Stack Trace: <18 internal lines> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na] (4 internal lines)
Tout ce qu'il révèle, c'est que 1) cela s'est produit dans la méthode GreetingController#greeting
, et 2) le client a effectué un `HTTP GET "/greeting/Mick/Jagger
@GetMapping("/greeting/{firstName}/{lastName}") public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { return Flux.fromIterable(Arrays.asList(firstName, lastName)) // <...> .doOnError(e -> logger.error("Error while greeting", e)); }
doOnError
peut/ne peut pas être utile pendant le débogage : Journalisation : vous pouvez utiliser doOnError
pour consigner les messages d'erreur et fournir plus de contexte sur ce qui s'est mal passé dans votre flux réactif. Cela peut être particulièrement utile lors du débogage de problèmes dans un flux complexe avec de nombreux opérateurs.
Récupération : doOnError
peut également être utilisé pour récupérer des erreurs et continuer à traiter le flux. Par exemple, vous pouvez utiliser onErrorResume
pour fournir une valeur ou un flux de secours en cas d'erreur.
Débogage : très probablement doOnError
ne fournira pas de meilleur stacktrace à l'exception de ce que vous avez déjà vu dans les journaux. Ne vous y fiez pas comme un bon dépanneur.
Le prochain arrêt consiste à remplacer doOnError()
précédemment ajouté par l'appel de méthode log()
. Aussi simple que possible. log()
observe tous les signaux Reactive Streams et les trace dans les journaux sous le niveau INFO par défaut.
Nous pouvons voir quelles méthodes réactives ont été appelées ( onSubscribe
, request
et onError
). De plus, savoir à partir de quels threads (pools) ces méthodes ont été appelées peut être une information très utile. Cependant, il n'est pas pertinent pour notre cas.
À propos des pools de threads
Le nom du fil ctor-http-nio-2
signifie reactor-http-nio-2
. Les méthodes réactives onSubscribe()
et request()
ont été exécutées sur le pool de threads IO (planificateur). Ces tâches ont été exécutées immédiatement sur un thread qui les a soumises.
En ayant .subscribeOn(Schedulers.parallel())
dans senselessTransformation
, nous avons demandé à Reactor de souscrire d'autres éléments sur un autre pool de threads. C'est la raison pour laquelle onError
a été exécuté sur le thread parallel-1
.
Vous pouvez en savoir plus sur le pool de threads dans cet article .
La méthode log()
vous permet d'ajouter des instructions de journalisation à votre flux, ce qui facilite le suivi du flux de données et le diagnostic des problèmes. Si nous avions un flux de données plus complexe avec des éléments tels que flatMap, des sous-chaînes, des appels bloquants, etc., nous aurions beaucoup à gagner à ce que tout soit enregistré. C'est une chose très facile et agréable pour un usage quotidien. Cependant, nous ne connaissons toujours pas la cause première.
L'instruction Hooks.onOperatorDebug()
indique à Reactor d'activer le mode débogage pour tous les opérateurs dans les flux réactifs, permettant des messages d'erreur plus détaillés et des traces de pile.
public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { Hooks.onOperatorDebug(); return // <...> }
En ajoutant Hooks.onOperatorDebug()
nous pouvons enfin progresser dans notre enquête. Stacktrace est bien plus utile :
Et à la ligne 42, nous avons un appel single()
.
Ne faites pas défiler vers le haut, le senselessTransformation
regarde ensuite :
private Flux<String> senselessTransformation(Flux<String> flux) { return flux .single() // line 42 .flux() .subscribeOn(Schedulers.parallel()); }
single()
émet un élément de la source Flux ou signale IndexOutOfBoundsException
pour une source avec plus d'un élément. Cela signifie que le flux dans la méthode émet plus d'un élément. En remontant dans la hiérarchie des appels on voit qu'à l'origine il y a un Flux à deux éléments Flux.fromIterable(Arrays.asList(firstName, lastName))
.
La méthode de filtrage wasWorkingNiceBeforeRefactoring
supprime un élément d'un flux lorsqu'il est égal à John . C'est la raison pour laquelle le code fonctionne pour un collège nommé John. Hein.
Hooks.onOperatorDebug()
peut être particulièrement utile lors du débogage de flux réactifs complexes, car il fournit des informations plus détaillées sur la manière dont le flux est traité. Cependant, l'activation du mode débogage peut avoir un impact sur les performances de votre application (en raison des traces de pile remplies), il ne doit donc être utilisé que pendant le développement et le débogage, et non en production.
Pour obtenir presque le même effet que Hooks.onOperatorDebug()
donne avec un impact minimal sur les performances, il existe un opérateur spécial checkpoint()
. Cela activera le mode débogage pour cette section du flux, tout en laissant le reste du flux inchangé.
public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { return Flux.fromIterable(Arrays.asList(firstName, lastName)) .filter(this::wasWorkingNiceBeforeRefactoring) /* new */ .checkpoint("After filtering") .transform(this::senselessTransformation) /* new */ .checkpoint("After transformation") .collect(Collectors.joining()) .map(names -> "Hello, " + names); }
Jetez un œil aux journaux :
Cette répartition des points de contrôle nous indique que l'erreur a été observée après notre deuxième point de contrôle décrit comme After transformation . Cela ne signifie pas que le premier point de contrôle n'a pas été atteint lors de l'exécution. C'était le cas, mais l'erreur n'a commencé à apparaître qu'après la deuxième. C'est pourquoi nous ne voyons pas After filtering .
Outre la description, vous pouvez forcer Reactor à générer un stacktrace pour votre point de contrôle en ajoutant true
comme deuxième paramètre à la méthode checkpoint()
. Il est important de noter que le stacktrace généré vous mènera à la ligne avec un point de contrôle. Il ne remplira pas de stacktrace pour l'exception d'origine. Cela n'a donc pas beaucoup de sens car vous pouvez facilement trouver un point de contrôle en fournissant une description.