Spring WebFlux ist ein reaktives, nicht blockierendes Webframework zum Erstellen moderner, skalierbarer Webanwendungen in Java. Es ist Teil des Spring Frameworks und nutzt die Reactor-Bibliothek zur Implementierung reaktiver Programmierung in Java.
Der Begriff „reaktiv“ bezieht sich auf Programmiermodelle, die darauf basieren, auf Änderungen zu reagieren – Netzwerkkomponenten reagieren auf E/A-Ereignisse, UI-Controller reagieren auf Mausereignisse und andere. In diesem Sinne ist die Nichtblockierung reaktiv, denn anstatt blockiert zu werden, reagieren wir jetzt auf Benachrichtigungen, wenn Vorgänge abgeschlossen sind oder Daten verfügbar werden.
In Spring WebFlux (und nicht blockierenden Servern im Allgemeinen) wird davon ausgegangen, dass Anwendungen nicht blockieren. Daher verwenden nicht blockierende Server einen kleinen Thread-Pool mit fester Größe (Event-Loop-Worker), um Anfragen zu verarbeiten.
Während die Verarbeitung von WebFlux-Anfragen etwas anders ist:
Wir brauchen eine ziemlich minimalistische App, die von generiert wird. Der Code ist im verfügbar.
Alle Thread-bezogenen Themen sind sehr CPU-abhängig. Normalerweise hängt die Anzahl der Verarbeitungsthreads, die Anforderungen bearbeiten, von der Anzahl der CPU-Kerne ab . Zu Bildungszwecken können Sie die Anzahl der Threads in einem Pool einfach manipulieren, indem Sie die CPUs begrenzen, wenn der Docker-Container ausgeführt wird:
docker run --cpus=1 -d --rm --name webflux-threading -p 8081:8080 local/webflux-threading
Unsere App ist eine einfache Wahrsagerin. Durch den Aufruf von /karma
endpoint erhalten Sie 5 Datensätze mit balanceAdjustment
. Jede Anpassung ist eine ganze Zahl, die ein Ihnen gegebenes Karma darstellt. Ja, wir sind sehr großzügig, da die App nur positive Zahlen generiert. Kein Pech mehr!
@GetMapping("/karma") public Flux<Karma> karma() { return prepareKarma() .map(Karma::new) .log(); } private Flux<Integer> prepareKarma() { Random random = new Random(); return Flux.fromStream( Stream.generate(() -> random.nextInt(10)) .limit(5)); }
log
Methode ist hier von entscheidender Bedeutung. Es beobachtet alle Reactive Streams-Signale und verfolgt sie in Protokollen auf der INFO-Ebene.
Die Protokollausgabe auf „curl localhost:8081/karma
lautet wie folgt:
Wie wir sehen können, findet die Verarbeitung im IO-Thread-Pool statt. Threadname ctor-http-nio-2
steht für reactor-http-nio-2
. Aufgaben wurden sofort in einem Thread ausgeführt, der sie übermittelt hat. Reactor hat keine Anweisungen gesehen, sie in einem anderen Pool einzuplanen.
@GetMapping("/delayedKarma") public Flux<Karma> delayedKarma() { return karma() .delayElements(Duration.ofMillis(100)); }
Wir müssen log
hier nicht hinzufügen, da sie bereits im ursprünglichen karma()
Aufruf deklariert wurde.
Diesmal wurde nur das allererste Element im IO-Thread reactor-http-nio-4
empfangen. Die Verarbeitung der restlichen 4 war einem parallel
Thread-Pool gewidmet.
Javadoc von delayElements
bestätigt dies:
Signale werden verzögert und im parallelen Standard-Scheduler fortgesetzt
Sie können den gleichen Effekt ohne Verzögerung erzielen, indem Sie .subscribeOn(Schedulers.parallel())
an einer beliebigen Stelle in der Aufrufkette angeben.
Die Verwendung parallel
Schedulers kann die Leistung und Skalierbarkeit verbessern, indem er die gleichzeitige Ausführung mehrerer Aufgaben in verschiedenen Threads ermöglicht, wodurch CPU-Ressourcen besser genutzt und eine große Anzahl gleichzeitiger Anforderungen verarbeitet werden können.
Es kann jedoch auch die Komplexität des Codes und die Speichernutzung erhöhen und möglicherweise zur Erschöpfung des Thread-Pools führen, wenn die maximale Anzahl von Arbeitsthreads überschritten wird. Daher sollte die Entscheidung für die Verwendung parallel
Thread-Pools auf den spezifischen Anforderungen und Kompromissen der Anwendung basieren.
Wir werden eine flatMap
verwenden und eine Wahrsagerin fairer machen. Für jede Karma-Instanz wird die ursprüngliche Anpassung mit 10 multipliziert und die entgegengesetzten Anpassungen generiert, wodurch effektiv eine ausgeglichene Transaktion entsteht, die die ursprüngliche Anpassung ausgleicht.
@GetMapping("/fairKarma") public Flux<Karma> fairKarma() { return delayedKarma() .flatMap(this::makeFair); } private Flux<Karma> makeFair(Karma original) { return Flux.just(new Karma(original.balanceAdjustment() * 10), new Karma(original.balanceAdjustment() * -10)) .subscribeOn(Schedulers.boundedElastic()) .log(); }
Wie Sie sehen, sollte makeFair's
Flux in einem boundedElastic
Thread-Pool abonniert werden. Schauen wir uns an, was wir in den Protokollen der ersten beiden Karmas haben:
Der Reaktor abonniert das erste Element mit balanceAdjustment=9
im E/A-Thread
Dann arbeitet der boundedElastic
Pool an der Karma-Fairness, indem er 90
und -90
Anpassungen im boundedElastic-1
Thread ausgibt
Elemente nach dem ersten werden im parallelen Thread-Pool abonniert (da wir immer noch delayedElements
in der Kette haben)
boundedElastic
-Scheduler?
Standardmäßig der boundedElastic
Thread-Pool eine maximale Größe, die der Anzahl der verfügbaren Prozessoren multipliziert mit 10 entspricht. Sie können ihn jedoch bei Bedarf so konfigurieren, dass eine andere maximale Größe verwendet wird
Durch die Verwendung eines asynchronen Thread-Pools wie boundedElastic
können Sie Aufgaben auf separate Threads auslagern und den Haupt-Thread für die Bearbeitung anderer Anforderungen freigeben. Die begrenzte Beschaffenheit des Thread-Pools kann Thread-Hunger und übermäßige Ressourcennutzung verhindern, während die Elastizität des Pools es ihm ermöglicht, die Anzahl der Arbeitsthreads dynamisch an die Arbeitslast anzupassen.
single
: Dies ist ein Single-Threaded-serialisierter Ausführungskontext, der für die synchrone Ausführung konzipiert ist. Dies ist nützlich, wenn Sie sicherstellen müssen, dass eine Aufgabe in der richtigen Reihenfolge ausgeführt wird und dass nicht zwei Aufgaben gleichzeitig ausgeführt werden.
immediate
: Dies ist eine triviale No-Op-Implementierung eines Schedulers, der Aufgaben im aufrufenden Thread sofort ausführt, ohne dass der Thread gewechselt werden muss.