Overclocking REACTEUR

Qui sera intéressé ?

Le rĂ©acteur d'aujourd'hui est Ă©lĂ©gant, Ă  la mode, jeune. Pourquoi sommes-nous si nombreux Ă  pratiquer la programmation rĂ©active ? Peu de gens peuvent rĂ©pondre Ă  cette question sans Ă©quivoque. Bon - si vous comprenez votre gain, mauvais - si le rĂ©acteur est imposĂ© par l'organisation comme une donnĂ©e. La plupart des arguments « FOR Â» sont l'utilisation d'une architecture de microservices, qui Ă  son tour oblige les microservices Ă  communiquer souvent et beaucoup entre eux. Pour la communication, dans la plupart des cas, l'interaction HTTP est choisie. HTTP a besoin d'un serveur Web lĂ©ger, Ă  quoi pensez-vous en premier ? Matou. Ici, il y a des problèmes avec la limite du nombre maximum de sessions, après quoi le serveur Web commence Ă  rejeter les demandes (bien que cette limite ne soit pas si facile Ă  atteindre). Ici, le rĂ©acteur vient Ă  la rescousse, qui n'est pas limitĂ© par de telles limites, et, par exemple,Netty en tant que serveur Web qui fonctionne avec une rĂ©activitĂ© prĂŞte Ă  l'emploi. Puisqu'il existe un serveur web rĂ©actif, vous avez besoin d'un client web rĂ©actif (Spring WebClient ou Reactive Feign), et comme le client est rĂ©actif, alors toute cette horreur s'infiltre dans la logique mĂ©tier, Mono et Flux deviennent vos meilleurs amis (bien qu'au dĂ©but il n'est que de la haine :) )





Parmi les tâches commerciales, il existe très souvent des procédures sérieuses qui traitent de grandes quantités de données, et nous devons également utiliser un réacteur pour elles. Ici, les surprises commencent, si vous ne savez pas comment faire cuire le réacteur, vous pouvez avoir beaucoup de problèmes. Dépassement de la limite des descripteurs de fichiers sur le serveur, OutOfMemory en raison de la vitesse incontrôlée du code non bloquant, et bien plus encore, dont nous parlerons aujourd'hui. Mes collègues et moi avons connu beaucoup de difficultés dues à des problèmes pour comprendre comment garder le réacteur sous contrôle, mais tout ce qui ne nous tue pas nous rend plus intelligents !





Code bloquant et non bloquant

, . , . , - , - . , , .





- HTTP , , . Reactive Feign Playtika, Spring Boot + WebFlux + Eureka .





-: , , reactive, - :) Hibernate + PostgreSQL - , JavaMail - , IBMMQ - . , , MongoDB - . , , , (Thread.sleep() / Socket.read() ), - . ? , . 2 :





  • . BlockHound ( )





  • , , : Schedulers.boundedElastic()



    . publishOn



    & subscribeOn







, , !





1





    @Test
    fun testLevel1() {
        val result = Mono.just("")
            .map { "123" }
            .block()

        assertEquals("123", result)
    }
      
      



, reactor . ? Mono.just



:) map



"123" block



subscribe



.





block



, , , . block



RestController



, .





2





    fun nonBlockingMethod1sec(data: String) 
    = data.toMono().delayElement(Duration.ofMillis(1000))

    @Test
    fun testLevel2() {
        val result = nonBlockingMethod1sec("Hello world")
            .flatMap { nonBlockingMethod1sec(it) }
            .block()

        assertEquals("Hello world", result)
    }
      
      



, nonBlockingMethod1sec



, - . - , , .





3





    fun collectTasks() = (0..99)

    @Test
    fun testLevel3() {
        val result = nonBlockingMethod1sec("Hello world")
            .flatMap { businessContext ->
                collectTasks()
                    .toFlux()
                    .map {
                        businessContext + it
                    }
                    .collectList()
            }
            .block()!!

        assertEquals(collectTasks().toList().size, result.size)
    }
      
      



- Flux



! collectTasks



, , Flux



- . map. collectList



.





, . " ", .





4





    fun collectTasks() = (0..100)
    
    @Test
    fun testLevel4() {
        val result = nonBlockingMethod1sec("Hello world")
            .flatMap { businessContext ->
                collectTasks().toFlux()
                    .flatMap {
                        Mono.deferContextual { reactiveContext ->
                            val hash = businessContext + it + reactiveContext["requestId"]
                            hash.toMono()
                        }
                    }.collectList()
            }
            .contextWrite { it.put("requestId", UUID.randomUUID().toString()) }
            .block()!!

        assertEquals(collectTasks().toList().size, result.size)
    }
      
      



. (15)



, (10)



. .





5





    fun collectTasks() = (0..1000)
    
    fun doSomethingNonBlocking(data: String)
        = data.toMono().delayElement(Duration.ofMillis(1000))
    
    fun doSomethingBlocking(data: String): String {
        Thread.sleep(1000); return data
    }

    val pool = Schedulers.newBoundedElastic(10, Int.MAX_VALUE, "test-pool")
    private val logger = getLogger()

    @Test
    fun testLevel5() {
        val counter = AtomicInteger(0)
        val result = nonBlockingMethod1sec("Hello world")
            .flatMap { _ ->
                collectTasks().toFlux()
                    .parallel()
                    .runOn(pool)
                    .flatMap {
                        Mono.deferContextual { _ ->
                            doSomethingNonBlocking(it.toString())
                                .doOnRequest { logger.info("Added task in pool ${counter.incrementAndGet()}") }
                                .doOnNext { logger.info("Non blocking code finished ${counter.get()}") }
                                .map { doSomethingBlocking(it) }
                                .doOnNext { logger.info("Removed task from pool ${counter.decrementAndGet()}") }
                        }
                    }.sequential()
                    .collectList()
            }
            .block()!!

        assertEquals(collectTasks().toList().size, result.size)
    }
      
      



! , . : doSomethingNonBlocking



(3)



& doSomethingBlocking



(6)



- , . (10)



, (15)



. parallel



(19)



sequential



(29)



. (20)



. , , doOnRequest



( ), doOnNext



( ). - , .





"", , . - , , . , , . , Flux .





. . , ? 100 , 1 , 1 , 10 ? ( senior reactor developer :))





12 . :) , 100 10 , 10 . , . " " .





(26) .map { doSomethingBlocking(it) }



. , , ?





2 ! 1 " " 1 . 100 . 10 ? ? .





collectTasks()



... 1000? 15000? ?





2 ! 1 " " 1 . . . ?





?

? ? ? 30000 , , , , ( web-client feign, ?) , , SSH . , , " ".





. Thread Pool & Reactor

- , - X , X , - . ? :) .





thread pool - . - , .





reactor! ?





, , . ? epoll , . . , , . , " ?", , . . , - , 500 -, . ! , , Schedulers.boundedElastic()



.





"", ?





!

, , , , , , 4-8 production 32 .





parallel



parallelism







parallelism



, rails ( , , ). Prefetch .





parallelism , .





flatMap



( Flux) , maxConcurrency







maxConcurrency



, Integer.MAX_VALUE



( . ?





, , ( http ), ! .





.





:





  • parallel (parallelism)





  • flatMap (maxConcurrency)









, .





- * Integer.MAX_VALUE *







, 5 5 . !





        val result = nonBlockingMethod1sec("Hello world")
            .flatMap { _ ->
                collectTasks().toFlux()
                    .parallel(1)
                    .runOn(pool, 1)
                    .flatMap({
                        Mono.deferContextual { _ ->
                            doSomethingNonBlocking(it.toString())
                        }
                    }, false, 1, 1)
                    .sequential()
                    .collectList()
            }
            .block()!!
      
      



, ?





Thread Pool

? . - , , ! ? , :)





, Schedulers.parallel() ? =) ( parallel, ) , , , .





. , , , . , , production . .





, round-robin, .





Réacteur bien chargé (les tâches sont uniformément réparties).  54 tâches bloquantes (1 sec chacune), distribution à tour de rôle sur 6 rails
( ). 54 ( 1),
 round-robin 6

production , , , .





Pool mal chargé (les tâches ne sont pas uniformément réparties) 54 tâches bloquantes (chacune pendant 1 seconde sauf pendant 2 secondes), répartition en round-robin sur 6 rails
( ) 54 ( 1 2),
 round-robin 6

collectList()



, , 1 . , , .









  • concatMap



    flatMap



    ( , )





  • , ( )





  • , ( )





  • prefetch



    ( !)





prefetch



flatMap



& runOn



, , , . - 256. 1, "work stealing", , , , .





Pool bien chargé (les tâches sont réparties uniformément) 54 tâches bloquantes (chacune pendant 1 seconde à l'exception de 2 secondes), distribution en round-robin sur 6 rails Prefetch !
( ) 54 ( 1 2),
 round-robin 6 Prefetch !

C'est tout pour moi. Il sera intéressant de lire vos remarques et commentaires, je ne prétends pas être vrai à 100%, mais tous les résultats sont appuyés par des exemples pratiques, sur Spring Boot + Project Reactor 3.4. Merci à tous!








All Articles