O Spring WebFlux é uma estrutura da Web reativa e sem bloqueio para criar aplicativos da Web modernos e escaláveis em Java. Faz parte do Spring Framework e usa a biblioteca Reactor para implementar a programação reativa em Java.
O termo “reativo” refere-se a modelos de programação construídos para reagir a mudanças — componentes de rede reagindo a eventos de E/S, controladores de IU reagindo a eventos de mouse e outros. Nesse sentido, o não-bloqueio é reativo, porque, em vez de estarmos bloqueados, estamos agora no modo de reagir a notificações à medida que as operações são concluídas ou os dados se tornam disponíveis.
No Spring WebFlux (e servidores sem bloqueio em geral), presume-se que os aplicativos não bloqueiam. Portanto, os servidores sem bloqueio usam um pool de encadeamentos pequeno e de tamanho fixo (operadores de loop de eventos) para manipular solicitações.
Embora o processamento da solicitação do WebFlux seja um pouco diferente:
Precisamos de um aplicativo bem minimalista gerado pelo . O código está disponível no .
Todos os tópicos relacionados a threads são muito dependentes da CPU. Normalmente, o número de threads de processamento que atendem às solicitações está relacionado ao número de núcleos da CPU . Para fins educacionais, você pode facilmente manipular a contagem de threads em um pool, limitando as CPUs ao executar o contêiner do Docker:
docker run --cpus=1 -d --rm --name webflux-threading -p 8081:8080 local/webflux-threading
Nosso aplicativo é uma cartomante simples. Ao chamar /karma
endpoint, você obterá 5 registros com balanceAdjustment
. Cada ajuste é um número inteiro que representa um carma dado a você. Sim, somos muito generosos porque o app gera apenas números positivos. Não há mais azar!
@GetMapping("/karma") public Flux<Karma> karma() { return prepareKarma() .map(Karma::new) .log(); } private Flux<Integer> prepareKarma() { Random random = new Random(); return Flux.fromStream( Stream.generate(() -> random.nextInt(10)) .limit(5)); }
log
método é uma coisa crucial aqui. Ele observa todos os sinais de fluxos reativos e os rastreia em logs no nível INFO.
A saída de logs em curl localhost:8081/karma
é a seguinte:
Como podemos ver, o processamento está acontecendo no pool de threads de E/S. O nome do tópico ctor-http-nio-2
significa reactor-http-nio-2
. As tarefas foram executadas imediatamente em um thread que as enviou. O Reactor não viu nenhuma instrução para agendá-los em outro pool.
@GetMapping("/delayedKarma") public Flux<Karma> delayedKarma() { return karma() .delayElements(Duration.ofMillis(100)); }
Não precisamos adicionar o método log
aqui porque ele já foi declarado na chamada karma()
original.
Desta vez, apenas o primeiro elemento foi recebido no IO thread reactor-http-nio-4
. O processamento dos 4 restantes foi dedicado a um pool de threads parallel
.
Javadoc de delayElements
confirma isso:
Os sinais são atrasados e continuam no Agendador padrão paralelo
Você pode obter o mesmo efeito sem demora especificando .subscribeOn(Schedulers.parallel())
em qualquer lugar na cadeia de chamadas.
O uso do agendador parallel
pode melhorar o desempenho e a escalabilidade, permitindo que várias tarefas sejam executadas simultaneamente em diferentes threads, o que pode utilizar melhor os recursos da CPU e lidar com um grande número de solicitações simultâneas.
No entanto, ele também pode aumentar a complexidade do código e o uso de memória e potencialmente levar ao esgotamento do pool de encadeamentos se o número máximo de encadeamentos de trabalho for excedido. Portanto, a decisão de usar o pool de encadeamentos parallel
deve ser baseada nos requisitos específicos e compensações do aplicativo.
Vamos usar um flatMap
e fazer uma cartomante mais justa . Para cada instância de Karma, ele multiplicará o ajuste original por 10 e gerará os ajustes opostos, criando efetivamente uma transação balanceada que compensa a original.
@GetMapping("/fairKarma") public Flux<Karma> fairKarma() { return delayedKarma() .flatMap(this::makeFair); } private Flux<Karma> makeFair(Karma original) { return Flux.just(new Karma(original.balanceAdjustment() * 10), new Karma(original.balanceAdjustment() * -10)) .subscribeOn(Schedulers.boundedElastic()) .log(); }
Como você pode ver, o Flux makeFair's
deve estar inscrito em um pool de threads boundedElastic
. Vamos verificar o que temos em logs para os dois primeiros Karmas:
O Reactor inscreve o primeiro elemento com balanceAdjustment=9
no thread IO
Em seguida, o pool boundedElastic
funciona com justiça de Karma emitindo ajustes 90
e -90
no thread boundedElastic-1
Os elementos após o primeiro são inscritos no pool de threads paralelos (porque ainda temos delayedElements
na cadeia)
boundedElastic
?
Por padrão, o pool de threads boundedElastic
um tamanho máximo do número de processadores disponíveis multiplicado por 10, mas você pode configurá-lo para usar um tamanho máximo diferente, se necessário
Usando um pool de encadeamento assíncrono como boundedElastic
, você pode descarregar tarefas para encadeamentos separados e liberar o encadeamento principal para lidar com outras solicitações. A natureza limitada do pool de threads pode evitar a falta de threads e o uso excessivo de recursos, enquanto a elasticidade do pool permite que ele ajuste o número de threads de trabalho dinamicamente com base na carga de trabalho.
single
: Este é um contexto de execução serializado de encadeamento único projetado para execução síncrona. É útil quando você precisa garantir que uma tarefa seja executada em ordem e que duas tarefas não sejam executadas simultaneamente.
immediate
: Esta é uma implementação trivial e não operacional de um agendador que executa tarefas imediatamente no thread de chamada sem nenhuma troca de thread.