Utilisation d'Azure Service Bus Ă  partir de Java

Bonjour chers collÚgues! Il se trouve que notre application est écrite dans la pile java, mais est hébergée dans Azure. Et nous essayons de tirer le meilleur parti des services de gestion du fournisseur de cloud.



L'un d'eux est Azure Service Bus, et aujourd'hui je souhaite parler des fonctionnalités de son utilisation dans une application Spring Boot standard.



Si vous voulez en savoir plus sur les fonctionnalités du rùteau, faites défiler jusqu'à la fin de l'article



Qu'est-ce qu'Azure Service Bus?



Quelques mots sur Azure Service Bus est un courtier de messages cloud (remplacement cloud de RabbitMQ, ActiveMQ). Prend en charge les files d'attente (le message est remis à un destinataire) et les sujets (le mécanisme de publication / abonnement) - plus en détail ici, le



support est déclaré:



  1. Messages ordonnés - la documentation dit qu'il s'agit d'un FIFO, MAIS il est implémenté en utilisant le concept de sessions de messages - un groupe de messages, pas la file d'attente entiÚre. Si vous devez garantir l'ordre des messages, vous combinez les messages dans un groupe, et maintenant les messages du groupe seront remis en tant que FIFO. Ainsi, Azure Service Bus Queue n'est pas un FIFO - il délivre vos messages aussi aléatoirement qu'il convient
  2. File d'attente de lettres mortes - tout est simple ici, ils n'ont pas réussi à livrer le message aprÚs N tentatives ou une période de temps - déplacé vers DLQ
  3. Livraison planifiée - vous pouvez définir un délai avant la livraison
  4. Report de message - masque les messages dans la file d'attente, le message ne sera pas remis automatiquement, mais il peut ĂȘtre rĂ©cupĂ©rĂ© par ID. Nous devons stocker cet identifiant quelque part


Comment s'intégrer à Azure Service Bus



Azure Service Bus prend en charge AMQP 1.0, ce qui signifie qu'il n'est pas compatible avec les clients RabbitMQ. bunny utilise AMQP 0.9.1



Le seul client "standard" qui peut fonctionner avec le Service Bus est Apache Qpid .



Il existe 3 façons de coupler votre application Spring Boot avec Service Bus:



  1. JMS + QPID — , — QPID — .

    timeout producer — — factory.setCacheProducers(false);
  2. Spring Cloud — Azure Service Bus — , . Service Bus

    ( 1.2.6) — , azure service bus java sdk.



    Spring Integration — , «Scheduled delivery» «Message deferral» .



    sdk, MessageAndSessionPump

  3. azure service bus java sdk — ,


Spring Cloud — Azure Service Bus



Je vais m'attarder sur cette méthode plus en détail et vous parler des fonctionnalités de l'utilisation de l'

exemple d'application se trouvant dans le référentiel officiel, il est donc inutile de dupliquer le code - le référentiel avec un exemple est ici .



Parce que c'est Spring Integration Messaging, tout se résume à Channel, MessageHandler, MessagingGateway, ServiceActivator.



Et puis il y a le ServiceBusQueueTemplate .



Envoi de messages



Nous devons avoir un canal dans lequel nous écrivons le message que nous voulons envoyer, à l'autre extrémité il y a un MessageHandler qui l'envoie au bus de service.



Le MessagHandler est com.microsoft.azure.spring.integration.core.DefaultMessageHandler - il s'agit du connecteur vers le service externe.



Comment le lier Ă  un canal? - ajoutez l'annotation - @ServiceActivator (inputChannel = OUTPUT_CHANNEL) et maintenant notre MessagHandler Ă©coute le canal OUTPUT_CHANNEL .



Ensuite, nous devons en quelque sorte écrire notre message sur le canal - ici encore la magie du printemps - nous annonçons MessagingGateway et le lions au canal par son nom.



Un extrait de l' exemple :



@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
    void send(String text);
}


C'est tout: Gateway -> Channel -> MessagHandler -> ServiceBusQueueTemplate -> ServiceBusMessageConverter .



Dans le code, il reste à injecter notre passerelle et à appeler la méthode send .



J'ai mentionnĂ© ServiceBusMessageConverter dans la chaĂźne d'appels pour une raison - si vous souhaitez ajouter des en-tĂȘtes personnalisĂ©s (par exemple CORRELATION_ID) au message, c'est l'endroit oĂč ils doivent ĂȘtre dĂ©placĂ©s de org.springframework.messaging.MessageHeaders vers le message azure.

La méthode spéciale setCustomHeaders .



Dans ce cas, votre passerelle ressemblera Ă  ceci:



@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
    void send(@Payload String text, @Header("CORRELATION_ID") String correlationId);
}


Recevoir des messages



D'accord, nous savons comment envoyer des messages, comment les obtenir maintenant?



Ici, tout est pareil - MessageProducer -> Channel -> Handler



Le MessageProducer est com.microsoft.azure.spring.integration.servicebus.inbound.ServiceBusQueueInboundChannelAdapter - c'est notre connecteur vers un service externe. À l'intĂ©rieur, il y a le mĂȘme ServiceBusQueueTemplate avec ServiceBusMessageConverter oĂč vous pouvez lire les en-tĂȘtes personnalisĂ©s et les placer dans le message d'intĂ©gration de printemps.



Le canal y est déjà installé à la main:



@Bean
public ServiceBusQueueInboundChannelAdapter queueMessageChannelAdapter(
        @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, 
        ServiceBusQueueOperation queueOperation) {
    queueOperation.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
    ServiceBusQueueInboundChannelAdapter adapter = new ServiceBusQueueInboundChannelAdapter(QUEUE_NAME,
            queueOperation);
    adapter.setOutputChannel(inputChannel);
    return adapter;
}


Mais le Handler lui-mĂȘme est attachĂ© au canal via @ServiceActivator .



@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
    String message = new String(payload);
.......


Vous pouvez immédiatement obtenir la ligne:



@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(String payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
.......


Vous avez peut-ĂȘtre remarquĂ© l'Ă©trange paramĂštre Checkpointer checkpointer , il est utilisĂ© pour accuser rĂ©ception manuellement du traitement des messages.

Si vous dĂ©finissez CheckpointMode.MANUAL lors de la crĂ©ation du ServiceBusQueueInboundChannelAdapter , vous devez envoyer vous-mĂȘme un accusĂ© de rĂ©ception du message. Si vous utilisez CheckpointMode.RECORD, la confirmation sera envoyĂ©e automatiquement - dĂ©tails dans le code ServiceBusQueueTemplate .







Caractéristiques d'utilisation



Donc, la liste des "rakes" et "chips" sur lesquels nous sommes déjà allés.



ReceiveMode.PEEKLOCK



Azure Service Bus prend en charge le mode PEEKLOCK - le consommateur prend un message, il se verrouille dans le bus de service, est inaccessible à quiconque pendant un certain temps (durée de verrouillage), mais n'en est pas supprimé. Si dans le délai imparti le consommateur n'a pas envoyé de confirmation de traitement - succÚs / abandon ou n'a pas prolongé le verrouillage - le message est considéré à nouveau disponible et une nouvelle tentative de livraison sera effectuée.



Fait intéressant, l' abandon réinitialise simplement le verrou et le message devient instantanément disponible pour une redistribution.



ServiceBusQueueTemplate défaut crée QueueClient le mode ReceiveMode.PEEKLOCK .



Si une exception non gérée vole dans notre gestionnaire- aucun accusé de réception ne sera envoyé au serveur et le message restera verrouillé et sera renvoyé avant l'expiration du délai.

Dans ce cas, le compteur de livraison augmentera, ce qui est logique.



Je ne sais pas s'il s'agit d'un bogue ou d'une fonctionnalitĂ© - mais il est trĂšs pratique de faire un dĂ©lai entre les tentatives pour les situations oĂč cela est nĂ©cessaire.



Si le message ne peut pas ĂȘtre traitĂ© mĂȘme avec une nouvelle tentative, il est nĂ©cessaire d'intercepter les exceptions et de marquer le message comme traitĂ© et d'ajouter une logique supplĂ©mentaire Ă  l'application, sinon il sera remis encore et encore jusqu'Ă  ce qu'il atteigne la limite du nombre de re-remise (configurĂ© lors de la crĂ©ation d'une file d'attente dans le bus de service )



Nombre de messages d'accÚs concurrentiel et de prélecture



Comme vous l'avez peut-ĂȘtre devinĂ©, le paramĂštre d' accĂšs concurrentiel est responsable du nombre de gestionnaires de messages parallĂšles et le nombre de messages de prĂ©lecture correspond au nombre de messages que nous allons entrer dans la mĂ©moire tampon Ă  partir du serveur.



Par défaut, le ServiceBusQueueTemplate est configuré automatiquement (AzureServiceBusQueueAutoConfiguration) avec une valeur de 1 pour les deux paramÚtres, c'est-à-dire par défaut, chaque file d'attente aura un thread de traitement, bien que le concept d'un bus de service avec accusé de réception pour chaque message individuel implique de nombreux processeurs simultanés. Ceci est d'autant plus important si vous avez un long traitement de demande.



Malheureusement, ces paramĂštres ne peuvent pas ĂȘtre dĂ©finis via la configuration de l'application (application.yml / application.properties) et ne peuvent ĂȘtre dĂ©finis que dans le code. Mais mĂȘme via le code, vous ne pourrez pas dĂ©finir diffĂ©rents paramĂštres pour diffĂ©rentes files d'attente.



Par conséquent, si vous devez définir des paramÚtres différents, vous devrez créer plusieurs beans ServiceBusQueueTemplate pour chaque ServiceBusQueueInboundChannelAdapter



CompletableFuture à l'intérieur d'Azure Service Bus Java SDK



Le sdk java de bus de service azure lui-mĂȘme est implĂ©mentĂ© autour de l' exĂ©cuteur CompletableFuture et CachedThreadPool - MessagingFactory.INTERNAL_THREAD_POOL, alors soyez prudent avec toutes sortes de threads locaux.



Messages ordonnés



Nous utilisons le bus de service comme file d'attente de travaux - certains travaux dĂ©pendent les uns des autres et doivent donc ĂȘtre exĂ©cutĂ©s dans l'ordre de leur crĂ©ation.



Comme je l'ai mentionnĂ© ci-dessus, les T-shirts utilisent le concept de sessions de messages - lorsque les messages sont regroupĂ©s dans une session par clĂ© (transmis dans l'en-tĂȘte), la session existe tant qu'il y a au moins un message avec la clĂ© de session - en dĂ©tail dans la documentation

Service bus garantit la livraison des messages au sein d'un tel groupe dans l'ordre d'ajout à serveur (c'est-à-dire dans l'ordre dans lequel le serveur de bus de service les a écrits dans le référentiel).



Il convient Ă©galement de mentionner si vous avez crĂ©Ă© une file d'attente pour les sessions activĂ©es - cela signifie que tous les messages doivent avoir un en-tĂȘte avec une clĂ© de session.



Immédiatement, nous avons été trÚs satisfaits de la possibilité du bus de service d'aligner les messages dans une file d'attente FIFO - mais pour un groupe de messages.



Mais aprÚs un certain temps, nous avons commencé à remarquer des problÚmes:



  • certains messages ont commencĂ© Ă  arriver un nombre infini de fois
  • le traitement de la file d'attente a ralenti
  • dans les statistiques du bus de service, la moitiĂ© des demandes sont marquĂ©es comme ayant Ă©chouĂ© et les demandes ayant Ă©chouĂ© apparaissent mĂȘme dans une file d'attente vide lorsqu'elles sont inactives


En regardant dans le code sdk, nous avons découvert la particularité de travailler avec des sessions:



  1. le consommateur capture la session et commence Ă  lire tous les messages disponibles
  2. traité simultanément le nombre de sessions égal au paramÚtre de concurrence
  3. unhandled exception — 1 ( ) — re-delivery ? 0 exception — ttl .
  4. — success abandon. — delay re-delivery

    .. abandon — , delivery counter .

    delivery count


En conséquence, ils ont abandonné cette fonction de bus de service et ont écrit un vélo, et le bus de service agit comme un déclencheur.



DĂšs que la file d'attente des sessions activĂ©es a Ă©tĂ© annulĂ©e, les erreurs dans les statistiques ont disparu; la requĂȘte au bus de service.



Dans le bundle JMS + Qpid - cette fonctionnalité n'est pas disponible.



ProblÚmes potentiels avec des tailles de file d'attente supérieures à 1G



Je n'ai pas encore rencontré, mais j'ai entendu dire qu'il commence à fonctionner instable si la taille de la file d'attente est supérieure à 1G.



Si vous rencontrez ceci ou vice versa, tout fonctionne - Ă©crivez dans les commentaires.



ProblÚmes avec les demandes de traçage



L'agent Azure Application Insights standard ne sait pas comment suivre l'envoi de messages en tant que dépendance et les messages entrants en tant que demandes.



J'ai dĂ» ajouter du code.



RĂ©sultat



Si vous avez besoin d'une file d'attente de travaux avec un temps de traitement des messages long et que vous n'avez pas besoin d'une file d'attente, vous pouvez utiliser.



Si le traitement des messages est rapide - utilisez Azure Event Hub - Kafka standard, le client standard fonctionne correctement.



All Articles