Utilisation de la liaison Spring Cloud Stream avec Kafka Message Broker

Bonjour à tous! Je m'appelle Vitaly, je suis développeur chez Web3Tech. Dans cet article, je présenterai les concepts et les constructions de base du framework Spring Cloud Stream pour prendre en charge et travailler avec les courtiers de messages Kafka, avec une boucle complète de leurs tests unitaires contextuels. Nous utilisons un tel schéma dans notre projet de vote électronique entièrement russe sur la plateforme blockchain Waves Enterprise .





Dans le cadre de l'équipe de projet Spring Cloud, Spring Cloud Stream est basé sur Spring Boot et utilise Spring Integration pour fournir une communication avec les courtiers de messages. Cependant, il s'intègre facilement à divers courtiers de messages et nécessite une configuration minimale pour créer des microservices basés sur des événements ou des messages.





Configuration et dépendances

Tout d'abord, nous devons ajouter la dépendance spring-cloud-starter-stream-kafka à build.gradle :





dependencies {
   implementation(kotlin("stdlib"))
   implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
   implementation("com.fasterxml.jackson.module:jackson-module-kotlin")

   implementation("org.springframework.boot:spring-boot-starter-web")
   implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")

   testImplementation("org.springframework.boot:spring-boot-starter-test")
   testImplementation("org.springframework.cloud:spring-cloud-stream-test-support")
   testImplementation("org.springframework.kafka:spring-kafka-test:springKafkaTestVersion")
}
      
      



Dans la configuration du projet Spring Cloud Stream, vous devez inclure l'URL du courtier Kafka, le nom de la file d'attente (sujet) et d'autres paramètres de liaison. Voici un exemple de configuration YAML pour le service application.yaml :





spring:
 application:
   name: cloud-stream-binding-kafka-app
 cloud:
   stream:
     kafka:
       binder:
         brokers: 0.0.0.0:8080
         configuration:
           auto-offset-reset: latest
     bindings:
       customChannel:                   #Channel name
         destination: 0.0.0.0:8080      #Destination to which the message is sent (topic)
         group: input-group-N
         contentType: application/json
         consumer:
           max-attempts: 1
           autoCommitOffset: true
           autoCommitOnError: false
      
      



Concept et classes

, , Spring Cloud Stream, , (SpringCloudStreamBindingKafkaApp.kt):





@EnableBinding(ProducerBinding::class)

@SpringBootApplication
 
 class SpringCloudStreamBindingKafkaApp

 fun main(args: Array<String>) {

    SpringApplication.run(SpringCloudStreamBindingKafkaApp::class.java, *args)

 }
      
      



@EnableBinding , .





.





Binding — , .

Binder — middleware .

Channel — middleware .

StreamListeners — (beans), , MessageConverter middleware “DTO”.

Message Schema — , . .





send/receive, producer consumer. , Spring Cloud Stream.





Producer Kafka, (ProducerBinding.kt):





interface ProducerBinding {

   @Output(BINDING_TARGET_NAME)
   fun messageChannel(): MessageChannel
}
      
      



onsumer Kafka .





ConsumerBinding.kt:





interface ConsumerBinding {

   companion object {
       const val BINDING_TARGET_NAME = "customChannel"
   }

   @Input(BINDING_TARGET_NAME)
   fun messageChannel(): MessageChannel
}
      
      



Consumer.kt:





@EnableBinding(ConsumerBinding::class)
class Consumer(val messageService: MessageService) {

   @StreamListener(target = ConsumerBinding.BINDING_TARGET_NAME)
   fun process(
       @Payload message: Map<String, Any?>,
       @Header(value = KafkaHeaders.OFFSET, required = false) offset: Int?
   ) {
       messageService.consume(message)
   }
}
      
      



Kafka . Kafka, spring-kafka-test.





MessageCollector

, . ProducerBinding payload ProducerTest.kt:





@SpringBootTest
class ProducerTest {

   @Autowired
   lateinit var producerBinding: ProducerBinding

   @Autowired
   lateinit var messageCollector: MessageCollector

   @Test
   fun `should produce somePayload to channel`() {
       // ARRANGE
       val request = mapOf(1 to "foo", 2 to "bar", "three" to 10101)

       // ACT
producerBinding.messageChannel().send(MessageBuilder.withPayload(request).build())
       val payload = messageCollector.forChannel(producerBinding.messageChannel())
           .poll()
           .payload

       // ASSERT
       val payloadAsMap = jacksonObjectMapper().readValue(payload.toString(), Map::class.java)
       assertTrue(request.entries.stream().allMatch { re ->
           re.value == payloadAsMap[re.key.toString()]
       })

       messageCollector.forChannel(producerBinding.messageChannel()).clear()
   }
}
      
      



Embedded Kafka

@ClassRule . Kafka Zookeeper , . Kafka Zookeper (ConsumerTest.kt):





@SpringBootTest
@ActiveProfiles("test")
@EnableAutoConfiguration(exclude = [TestSupportBinderAutoConfiguration::class])
@EnableBinding(ProducerBinding::class)
class ConsumerTest {

   @Autowired
   lateinit var producerBinding: ProducerBinding

   @Autowired
   lateinit var objectMapper: ObjectMapper

   @MockBean
   lateinit var messageService: MessageService

   companion object {
       @ClassRule @JvmField
       var embeddedKafka = EmbeddedKafkaRule(1, true, "any-name-of-topic")
   }

   @Test
   fun `should consume via txConsumer process`() {
       // ACT
       val request = mapOf(1 to "foo", 2 to "bar")
       producerBinding.messageChannel().send(MessageBuilder.withPayload(request)
           .setHeader("someHeaderName", "someHeaderValue")
           .build())

       // ASSERT
       val requestAsMap = objectMapper.readValue<Map<String, Any?>>(objectMapper.writeValueAsString(request))
       runBlocking {
           delay(20)
           verify(messageService).consume(requestAsMap)
       }
   }
}
      
      



Dans cet article, j'ai démontré les capacités de Spring Cloud Stream et comment l'utiliser avec Kafka. Spring Cloud Stream offre une interface conviviale avec des nuances simplifiées de configuration de courtier, est rapidement mis en œuvre, fonctionne de manière stable et prend en charge les courtiers modernes populaires tels que Kafka. En conséquence, j'ai donné un certain nombre d'exemples avec des tests unitaires basés sur EmbeddedKafkaRule à l'aide de MessageCollector.





Toutes les sources peuvent être trouvées sur Github . Merci d'avoir lu!








All Articles