Je souhaite utiliser Reactor pour effectuer un traitement de blocage dans plusieurs threads et résumer les résultats. La méthode de montage spécifique est décrite dans le manuel.
https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
Cependant, si vous l'implémentez réellement de cette façon, Schedulers.elastic () générera beaucoup de threads, qui peuvent consommer des ressources.
public static void main(String[] args) {
final var webClient = WebClient.builder()
.baseUrl("http://example.com")
.build();
Flux.fromStream(IntStream.range(1, 100).boxed())
.flatMap(i -> Mono.fromCallable(() -> webClient.get()
.retrieve()
.bodyToMono(String.class)
.block()) //J'ose bloquer pour l'explication
.subscribeOn(Schedulers.elastic()))
.blockLast();
}
Journal d'exécution
・
・
02:24:57.014 [elastic-77] DEBUG o.s.w.r.f.client.ExchangeFunctions - [11ca9c18] HTTP GET http://example.com
02:24:57.017 [elastic-15] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3ee270fc] HTTP GET http://example.com
02:24:57.017 [elastic-61] DEBUG o.s.w.r.f.client.ExchangeFunctions - [272cc048] HTTP GET http://example.com
02:24:57.015 [elastic-45] DEBUG o.s.w.r.f.client.ExchangeFunctions - [12f4ca28] HTTP GET http://example.com
02:24:57.014 [elastic-17] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3d44ed66] HTTP GET http://example.com
02:24:57.017 [elastic-57] DEBUG o.s.w.r.f.client.ExchangeFunctions - [6b5899a3] HTTP GET http://example.com
02:24:57.017 [elastic-92] DEBUG o.s.w.r.f.client.ExchangeFunctions - [7ec595f3] HTTP GET http://example.com
02:24:57.015 [elastic-94] DEBUG o.s.w.r.f.client.ExchangeFunctions - [70f26d87] HTTP GET http://example.com
・
・
public static void main(String[] args) {
final var webClient = WebClient.builder()
.baseUrl("http://example.com")
.build();
Flux.fromStream(IntStream.range(1, 100).boxed())
.flatMap(i -> Mono.fromCallable(() -> webClient.get()
.retrieve()
.bodyToMono(String.class)
.block()) //J'ose bloquer pour l'explication
.subscribeOn(Schedulers.elastic())
, 10) //Spécifiez 10 pour la concurrence
.blockLast();
}
Journal d'exécution
02:28:06.020 [elastic-4] DEBUG o.s.w.r.f.client.ExchangeFunctions - [1b6144e3] HTTP GET http://example.com
02:28:06.020 [elastic-9] DEBUG o.s.w.r.f.client.ExchangeFunctions - [64d61eb3] HTTP GET http://example.com
02:28:06.020 [elastic-5] DEBUG o.s.w.r.f.client.ExchangeFunctions - [1b00ce18] HTTP GET http://example.com
02:28:06.020 [elastic-2] DEBUG o.s.w.r.f.client.ExchangeFunctions - [590d0628] HTTP GET http://example.com
02:28:06.021 [elastic-3] DEBUG o.s.w.r.f.client.ExchangeFunctions - [504a226f] HTTP GET http://example.com
02:28:06.021 [elastic-6] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3ace12f2] HTTP GET http://example.com
02:28:06.021 [elastic-10] DEBUG o.s.w.r.f.client.ExchangeFunctions - [4135ca0a] HTTP GET http://example.com
02:28:06.021 [elastic-8] DEBUG o.s.w.r.f.client.ExchangeFunctions - [badf622] HTTP GET http://example.com
02:28:06.020 [elastic-7] DEBUG o.s.w.r.f.client.ExchangeFunctions - [2dfed701] HTTP GET http://example.com
02:28:06.021 [elastic-11] DEBUG o.s.w.r.f.client.ExchangeFunctions - [753526d8] HTTP GET http://example.com
02:28:06.673 [reactor-http-nio-8] DEBUG o.s.w.r.f.client.ExchangeFunctions - [2dfed701] Response 200 OK
02:28:06.673 [reactor-http-nio-9] DEBUG o.s.w.r.f.client.ExchangeFunctions - [504a226f] Response 200 OK
02:28:06.687 [reactor-http-nio-7] DEBUG o.s.w.r.f.client.ExchangeFunctions - [590d0628] Response 200 OK
02:28:06.757 [reactor-http-nio-6] DEBUG o.s.w.r.f.client.ExchangeFunctions - [753526d8] Response 200 OK
・
・
Le nombre de threads dans le planificateur élastique est limité à 10.
En fait, même en appelant flatMap (), la concurrence est spécifiée en interne. Cependant, parce que la valeur est Large Cela ressemblait à un journal sans restrictions.
De plus, comme Executor peut être spécifié pour subscribeOn, il est possible d'effectuer le même traitement en utilisant FixedThreadPool ou WorkStealingPool (ForkJoinPool), bien qu'il y ait des différences mineures dans l'algorithme. Vous pouvez spécifier .subscribeOn (Schedulers.fromExecutor (executorService))) au lieu de.subscribeOn (Schedulers.elastic ()).
En outre, si vous souhaitez arrêter sûrement ces threads de travail lorsque l'application est arrêtée, il semble efficace d'utiliser ExecutorService.