Qui sera intéressé ?
Le réacteur d'aujourd'hui est élégant, à la mode, jeune. Pourquoi sommes-nous si nombreux à pratiquer la programmation réactive ? Peu de gens peuvent répondre à cette question sans équivoque. Bon - si vous comprenez votre gain, mauvais - si le réacteur est imposé par l'organisation comme une donnée. La plupart des arguments « FOR » sont l'utilisation d'une architecture de microservices, qui à son tour oblige les microservices à communiquer souvent et beaucoup entre eux. Pour la communication, dans la plupart des cas, l'interaction HTTP est choisie. HTTP a besoin d'un serveur Web léger, à quoi pensez-vous en premier ? Matou. Ici, il y a des problèmes avec la limite du nombre maximum de sessions, après quoi le serveur Web commence à rejeter les demandes (bien que cette limite ne soit pas si facile à atteindre). Ici, le réacteur vient à la rescousse, qui n'est pas limité par de telles limites, et, par exemple,Netty en tant que serveur Web qui fonctionne avec une réactivité prête à l'emploi. Puisqu'il existe un serveur web réactif, vous avez besoin d'un client web réactif (Spring WebClient ou Reactive Feign), et comme le client est réactif, alors toute cette horreur s'infiltre dans la logique métier, Mono et Flux deviennent vos meilleurs amis (bien qu'au début il n'est que de la haine :) )
Parmi les tâches commerciales, il existe très souvent des procédures sérieuses qui traitent de grandes quantités de données, et nous devons également utiliser un réacteur pour elles. Ici, les surprises commencent, si vous ne savez pas comment faire cuire le réacteur, vous pouvez avoir beaucoup de problèmes. Dépassement de la limite des descripteurs de fichiers sur le serveur, OutOfMemory en raison de la vitesse incontrôlée du code non bloquant, et bien plus encore, dont nous parlerons aujourd'hui. Mes collègues et moi avons connu beaucoup de difficultés dues à des problèmes pour comprendre comment garder le réacteur sous contrôle, mais tout ce qui ne nous tue pas nous rend plus intelligents !
Code bloquant et non bloquant
, . , . , - , - . , , .
- HTTP , , . Reactive Feign Playtika, Spring Boot + WebFlux + Eureka .
-: , , reactive, - :) Hibernate + PostgreSQL - , JavaMail - , IBMMQ - . , , MongoDB - . , , , (Thread.sleep() / Socket.read() ), - . ? , . 2 :
. BlockHound ( )
, , :
Schedulers.boundedElastic()
.publishOn
&subscribeOn
, , !
1
@Test
fun testLevel1() {
val result = Mono.just("")
.map { "123" }
.block()
assertEquals("123", result)
}
, reactor . ? Mono.just
:) map
"123" block
subscribe
.
block
, , , .block
RestController
, .
2
fun nonBlockingMethod1sec(data: String)
= data.toMono().delayElement(Duration.ofMillis(1000))
@Test
fun testLevel2() {
val result = nonBlockingMethod1sec("Hello world")
.flatMap { nonBlockingMethod1sec(it) }
.block()
assertEquals("Hello world", result)
}
, nonBlockingMethod1sec
, - . - , , .
3
fun collectTasks() = (0..99)
@Test
fun testLevel3() {
val result = nonBlockingMethod1sec("Hello world")
.flatMap { businessContext ->
collectTasks()
.toFlux()
.map {
businessContext + it
}
.collectList()
}
.block()!!
assertEquals(collectTasks().toList().size, result.size)
}
- Flux
! collectTasks
, , Flux
- . map. collectList
.
, . " ", .
4
fun collectTasks() = (0..100)
@Test
fun testLevel4() {
val result = nonBlockingMethod1sec("Hello world")
.flatMap { businessContext ->
collectTasks().toFlux()
.flatMap {
Mono.deferContextual { reactiveContext ->
val hash = businessContext + it + reactiveContext["requestId"]
hash.toMono()
}
}.collectList()
}
.contextWrite { it.put("requestId", UUID.randomUUID().toString()) }
.block()!!
assertEquals(collectTasks().toList().size, result.size)
}
. (15)
, (10)
. .
5
fun collectTasks() = (0..1000)
fun doSomethingNonBlocking(data: String)
= data.toMono().delayElement(Duration.ofMillis(1000))
fun doSomethingBlocking(data: String): String {
Thread.sleep(1000); return data
}
val pool = Schedulers.newBoundedElastic(10, Int.MAX_VALUE, "test-pool")
private val logger = getLogger()
@Test
fun testLevel5() {
val counter = AtomicInteger(0)
val result = nonBlockingMethod1sec("Hello world")
.flatMap { _ ->
collectTasks().toFlux()
.parallel()
.runOn(pool)
.flatMap {
Mono.deferContextual { _ ->
doSomethingNonBlocking(it.toString())
.doOnRequest { logger.info("Added task in pool ${counter.incrementAndGet()}") }
.doOnNext { logger.info("Non blocking code finished ${counter.get()}") }
.map { doSomethingBlocking(it) }
.doOnNext { logger.info("Removed task from pool ${counter.decrementAndGet()}") }
}
}.sequential()
.collectList()
}
.block()!!
assertEquals(collectTasks().toList().size, result.size)
}
! , . : doSomethingNonBlocking
(3)
& doSomethingBlocking
(6)
- , . (10)
, (15)
. parallel
(19)
sequential
(29)
. (20)
. , , doOnRequest
( ), doOnNext
( ). - , .
"", , . - , , . , , . , Flux .
. . , ? 100 , 1 , 1 , 10 ? ( senior reactor developer :))
12 . :) , 100 10 , 10 . , . " " .
(26) .map { doSomethingBlocking(it) }
. , , ?
2 ! 1 " " 1 . 100 . 10 ? ? .
collectTasks()
... 1000? 15000? ?
2 ! 1 " " 1 . . . ?
?
? ? ? 30000 , , , , ( web-client feign, ?) , , SSH . , , " ".
. Thread Pool & Reactor
- , - X , X , - . ? :) .
thread pool - . - , .

reactor! ?

, , . ? epoll , . . , , . , " ?", , . . , - , 500 -, . ! , , Schedulers.boundedElastic()
.
"", ?
!
, , , , , , 4-8 production 32 .
parallel
parallelism

parallelism
, rails ( , , ). Prefetch .
parallelism , .
flatMap
( Flux) , maxConcurrency

maxConcurrency
, Integer.MAX_VALUE
( . ?
, , ( http ), ! .

.
:
parallel (parallelism)
flatMap (maxConcurrency)
, .
- * Integer.MAX_VALUE *
, 5 5 . !
val result = nonBlockingMethod1sec("Hello world")
.flatMap { _ ->
collectTasks().toFlux()
.parallel(1)
.runOn(pool, 1)
.flatMap({
Mono.deferContextual { _ ->
doSomethingNonBlocking(it.toString())
}
}, false, 1, 1)
.sequential()
.collectList()
}
.block()!!
, ?
Thread Pool
? . - , , ! ? , :)
, Schedulers.parallel() ? =)
( parallel, ), , , .
. , , , . , , production . .
, round-robin, .

production , , , .

collectList()
, , 1 . , , .
concatMap
flatMap
( , )
, ( )
, ( )
prefetch
( !)
prefetch
flatMap
& runOn
, , , . - 256. 1, "work stealing", , , , .

C'est tout pour moi. Il sera intéressant de lire vos remarques et commentaires, je ne prétends pas être vrai à 100%, mais tous les résultats sont appuyés par des exemples pratiques, sur Spring Boot + Project Reactor 3.4. Merci à tous!