Spring WebFlux là một khung web phản ứng, không chặn để xây dựng các ứng dụng web hiện đại, có thể mở rộng trong Java. Nó là một phần của Spring Framework và nó sử dụng thư viện Reactor để triển khai lập trình phản ứng trong Java.
Thuật ngữ “phản ứng” đề cập đến các mô hình lập trình được xây dựng xung quanh việc phản ứng với sự thay đổi — các thành phần mạng phản ứng với các sự kiện I/O, bộ điều khiển giao diện người dùng phản ứng với các sự kiện chuột và những thứ khác. Theo nghĩa đó, không chặn là phản ứng, bởi vì, thay vì bị chặn, chúng tôi hiện đang ở chế độ phản ứng với các thông báo khi hoạt động hoàn tất hoặc dữ liệu có sẵn.
Trong Spring WebFlux (và các máy chủ không chặn nói chung), giả định rằng các ứng dụng không chặn. Do đó, các máy chủ không chặn sử dụng một nhóm luồng nhỏ, có kích thước cố định (nhân viên vòng lặp sự kiện) để xử lý các yêu cầu.
Mặc dù quá trình xử lý yêu cầu WebFlux hơi khác một chút:
Chúng tôi cần một ứng dụng khá tối giản được tạo bởi . Mã có sẵn trong .
Tất cả các chủ đề liên quan đến luồng đều phụ thuộc rất nhiều vào CPU. Thông thường, số luồng xử lý xử lý các yêu cầu có liên quan đến số lượng lõi CPU . Đối với mục đích giáo dục, bạn có thể dễ dàng thao tác số lượng luồng trong nhóm bằng cách giới hạn CPU khi chạy bộ chứa Docker:
docker run --cpus=1 -d --rm --name webflux-threading -p 8081:8080 local/webflux-threading
Ứng dụng của chúng tôi là một thầy bói đơn giản. Bằng cách gọi /karma
điểm cuối, bạn sẽ nhận được 5 bản ghi với balanceAdjustment
. Mỗi lần điều chỉnh là một số nguyên đại diện cho một nghiệp được ban cho bạn. Vâng, chúng tôi rất hào phóng vì ứng dụng chỉ tạo ra các số dương. Không còn xui xẻo nữa!
@GetMapping("/karma") public Flux<Karma> karma() { return prepareKarma() .map(Karma::new) .log(); } private Flux<Integer> prepareKarma() { Random random = new Random(); return Flux.fromStream( Stream.generate(() -> random.nextInt(10)) .limit(5)); }
log
là một điều quan trọng ở đây. Nó quan sát tất cả các tín hiệu Luồng phản ứng và theo dõi chúng vào các bản ghi dưới cấp INFO.
Đầu ra nhật ký trên curl localhost:8081/karma
như sau:
Như chúng ta có thể thấy, quá trình xử lý đang diễn ra trên nhóm luồng IO. Tên chủ đề ctor-http-nio-2
là viết tắt của reactor-http-nio-2
. Các tác vụ được thực thi ngay lập tức trên một chuỗi đã gửi chúng. Reactor không thấy bất kỳ hướng dẫn nào để lên lịch cho chúng trên một nhóm khác.
@GetMapping("/delayedKarma") public Flux<Karma> delayedKarma() { return karma() .delayElements(Duration.ofMillis(100)); }
Chúng ta không cần thêm phương thức log
ở đây vì nó đã được khai báo trong lệnh gọi karma()
ban đầu.
Lần này chỉ nhận được phần tử đầu tiên trên luồng IO reactor-http-nio-4
. Việc xử lý 4 phần còn lại được dành riêng cho nhóm luồng song parallel
.
Javadoc của delayElements
xác nhận điều này:
Tín hiệu bị trì hoãn và tiếp tục trên Bộ lập lịch mặc định song song
Bạn có thể đạt được hiệu quả tương tự mà không bị chậm trễ bằng cách chỉ định .subscribeOn(Schedulers.parallel())
ở bất kỳ đâu trong chuỗi cuộc gọi.
Sử dụng bộ lập lịch parallel
có thể cải thiện hiệu suất và khả năng mở rộng bằng cách cho phép nhiều tác vụ được thực thi đồng thời trên các luồng khác nhau, điều này có thể sử dụng tốt hơn tài nguyên CPU và xử lý một số lượng lớn yêu cầu đồng thời.
Tuy nhiên, nó cũng có thể làm tăng độ phức tạp của mã và mức sử dụng bộ nhớ, đồng thời có khả năng dẫn đến cạn kiệt nhóm luồng nếu vượt quá số lượng luồng công nhân tối đa. Do đó, quyết định sử dụng nhóm luồng parallel
phải dựa trên các yêu cầu cụ thể và sự đánh đổi của ứng dụng.
Chúng tôi sẽ sử dụng flatMap
và làm cho một thầy bói trở nên công bằng hơn. Đối với mỗi phiên bản Karma, nó sẽ nhân số điều chỉnh ban đầu lên 10 và tạo ra các điều chỉnh ngược lại, tạo ra một giao dịch cân bằng bù đắp cho giao dịch ban đầu một cách hiệu quả.
@GetMapping("/fairKarma") public Flux<Karma> fairKarma() { return delayedKarma() .flatMap(this::makeFair); } private Flux<Karma> makeFair(Karma original) { return Flux.just(new Karma(original.balanceAdjustment() * 10), new Karma(original.balanceAdjustment() * -10)) .subscribeOn(Schedulers.boundedElastic()) .log(); }
Như bạn thấy, Flux makeFair's
phải được đăng ký vào nhóm chủ đề boundedElastic
. Hãy kiểm tra những gì chúng ta có trong nhật ký của hai Karma đầu tiên:
Lò phản ứng đăng ký phần tử đầu tiên với balanceAdjustment=9
trên luồng IO
Sau đó, nhóm boundedElastic
hoạt động dựa trên sự công bằng của Karma bằng cách đưa ra các điều chỉnh 90
và -90
trên luồng boundedElastic-1
Các phần tử sau phần tử đầu tiên được đăng ký trên nhóm luồng song song (vì chúng tôi vẫn có delayedElements
trong chuỗi)
boundedElastic
là gì ?
Theo mặc định, nhóm luồng boundedElastic
kích thước tối đa bằng số lượng bộ xử lý khả dụng nhân với 10, nhưng bạn có thể định cấu hình nó để sử dụng kích thước tối đa khác nếu cần
Bằng cách sử dụng nhóm luồng không đồng bộ như boundedElastic
, bạn có thể giảm tải các tác vụ cho các luồng riêng biệt và giải phóng luồng chính để xử lý các yêu cầu khác. Bản chất có giới hạn của nhóm luồng có thể ngăn chặn tình trạng thiếu luồng và sử dụng tài nguyên quá mức, trong khi tính linh hoạt của nhóm cho phép nó điều chỉnh động số lượng luồng công nhân dựa trên khối lượng công việc.
single
: Đây là ngữ cảnh thực thi tuần tự, đơn luồng được thiết kế để thực thi đồng bộ. Nó rất hữu ích khi bạn cần đảm bảo rằng một tác vụ được thực hiện theo thứ tự và không có hai tác vụ nào được thực hiện đồng thời.
immediate
: Đây là một triển khai tầm thường, không cần thao tác của bộ lập lịch thực hiện ngay các tác vụ trên luồng đang gọi mà không cần bất kỳ chuyển đổi luồng nào.