Saber um pouco sobre um domínio e tecnologias não vai facilitar sua vida.
@GetMapping("/greeting/{firstName}/{lastName}") public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { return Flux.fromIterable(Arrays.asList(firstName, lastName)) .filter(this::wasWorkingNiceBeforeRefactoring) .transform(this::senselessTransformation) .collect(Collectors.joining()) .map(names -> "Hello, " + names); } private boolean wasWorkingNiceBeforeRefactoring(String aName) { // We don't want to greet with John, sorry return !aName.equals("John"); } private Flux<String> senselessTransformation(Flux<String> flux) { return flux .single() .flux() .subscribeOn(Schedulers.parallel()); }
> curl localhost:8080/greeting/John/Doe > Hello, Doe
Mas quando você o executa como curl localhost:8080/greeting/Mick/Jagger
, você vê o próximo stacktrace:
java.lang.IndexOutOfBoundsException: Source emitted more than one item at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134) ~[reactor-core-3.5.5.jar:3.5.5] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): *__checkpoint ⇢ Handler com.example.demo.controller.GreetingController#greeting(String, String) [DispatcherHandler] *__checkpoint ⇢ HTTP GET "/greeting/Mick/Jagger" [ExceptionHandlingWebHandler] Original Stack Trace: <18 internal lines> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na] (4 internal lines)
Tudo o que revela é que 1) ocorreu no método GreetingController#greeting
e 2) o cliente executou um `HTTP GET "/greeting/Mick/Jagger
@GetMapping("/greeting/{firstName}/{lastName}") public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { return Flux.fromIterable(Arrays.asList(firstName, lastName)) // <...> .doOnError(e -> logger.error("Error while greeting", e)); }
doOnError
pode/não pode ser útil durante a depuração: Registro : você pode usar doOnError
para registrar mensagens de erro e fornecer mais contexto sobre o que deu errado em seu fluxo reativo. Isso pode ser especialmente útil ao depurar problemas em um fluxo complexo com muitos operadores.
Recuperação : doOnError
também pode ser usado para recuperar erros e continuar processando o fluxo. Por exemplo, você pode usar onErrorResume
para fornecer um valor ou fluxo de fallback em caso de erro.
Depuração : provavelmente doOnError
não fornecerá nenhum stacktrace melhor, exceto o que você já viu nos logs. Não confie nele como um bom solucionador de problemas.
A próxima parada é substituir a chamada de método doOnError()
adicionada anteriormente pela chamada de método log()
. Por mais simples que pareça. log()
observa todos os sinais de fluxos reativos e os rastreia em logs no nível INFO por padrão.
Podemos ver quais métodos Reativos foram chamados ( onSubscribe
, request
e onError
). Além disso, saber de quais threads (pools) esses métodos foram chamados pode ser uma informação muito útil. No entanto, não é relevante para o nosso caso.
Sobre pools de threads
O nome do tópico ctor-http-nio-2
significa reactor-http-nio-2
. Os métodos reativos onSubscribe()
e request()
foram executados no pool de threads de E/S (scheduler). Essas tarefas foram executadas imediatamente em um thread que as enviou.
Tendo .subscribeOn(Schedulers.parallel())
dentro de senselessTransformation
, instruímos o Reactor a inscrever outros elementos em outro pool de threads. Essa é a razão pela qual onError
foi executado no thread parallel-1
.
Você pode ler mais sobre pool de threads neste artigo .
log()
permite adicionar instruções de log ao seu stream, facilitando o rastreamento do fluxo de dados e o diagnóstico de problemas. Se estivéssemos tendo um fluxo de dados mais complexo com coisas como flatMap, subchains, chamadas de bloqueio, etc., nos beneficiaríamos muito de ter tudo registrado. É uma coisa muito fácil e agradável para o uso diário. No entanto, ainda não sabemos a causa raiz.
A instrução Hooks.onOperatorDebug()
informa ao Reactor para ativar o modo de depuração para todos os operadores em fluxos reativos, permitindo mensagens de erro e rastreamentos de pilha mais detalhados.
public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { Hooks.onOperatorDebug(); return // <...> }
Adicionando Hooks.onOperatorDebug()
podemos finalmente progredir em nossa investigação. Stacktrace é muito mais útil:
E na linha 42 temos a chamada single()
.
Não role para cima, o senselessTransformation
aparece a seguir:
private Flux<String> senselessTransformation(Flux<String> flux) { return flux .single() // line 42 .flux() .subscribeOn(Schedulers.parallel()); }
single()
emite um item da fonte Flux ou sinaliza IndexOutOfBoundsException
para uma fonte com mais de um elemento. Isso significa que o fluxo no método emite mais de 1 item. Subindo na hierarquia de chamadas vemos que originalmente existe um Flux com dois elementos Flux.fromIterable(Arrays.asList(firstName, lastName))
.
O método de filtragem wasWorkingNiceBeforeRefactoring
remove um item de um fluxo quando ele é igual a John . Essa é a razão pela qual o código funciona para uma faculdade chamada John. Huh.
Hooks.onOperatorDebug()
pode ser particularmente útil ao depurar fluxos reativos complexos, pois fornece informações mais detalhadas sobre como o fluxo está sendo processado. No entanto, ativar o modo de depuração pode afetar o desempenho do seu aplicativo (devido aos rastreamentos de pilha preenchidos), portanto, ele deve ser usado apenas durante o desenvolvimento e a depuração, e não na produção.
Para obter quase o mesmo efeito que Hooks.onOperatorDebug()
oferece com impacto mínimo no desempenho, existe um operador checkpoint()
especial. Ele ativará o modo de depuração para essa seção do fluxo, deixando o restante do fluxo inalterado.
public Mono<String> greeting(@PathVariable String firstName, @PathVariable String lastName) { return Flux.fromIterable(Arrays.asList(firstName, lastName)) .filter(this::wasWorkingNiceBeforeRefactoring) /* new */ .checkpoint("After filtering") .transform(this::senselessTransformation) /* new */ .checkpoint("After transformation") .collect(Collectors.joining()) .map(names -> "Hello, " + names); }
Dê uma olhada nos registros:
Essa divisão de pontos de verificação nos informa que o erro foi observado após nosso segundo ponto de verificação descrito como Após a transformação . Isso não significa que o primeiro ponto de verificação não foi alcançado durante a execução. Era, mas o erro começou a aparecer só depois do segundo. É por isso que não vemos Após a filtragem .
Além da descrição, você pode forçar o Reactor a gerar um stacktrace para seu ponto de verificação adicionando true
como segundo parâmetro ao método checkpoint()
. É importante observar que o stacktrace gerado o levará à linha com um ponto de verificação. Ele não preencherá um stacktrace para a exceção original. Portanto, não faz muito sentido porque você pode encontrar facilmente um ponto de verificação fornecendo uma descrição.