Microservices avec communication via Axon

Dans ce tutoriel simple, nous allons créer quelques microservices sur Spring Boot et organiser la communication entre eux via le framework Axon.










Disons que nous avons une telle tâche.



Il existe une source de transactions sur le marché boursier. Cette source nous envoie des transactions via l'interface Rest.



Nous devons obtenir ces transactions, les enregistrer dans la base de données et créer un stockage en mémoire pratique.



Ce référentiel doit remplir les fonctions suivantes:



  • retourner une liste de métiers;
  • retourne la position complète, c'est-à-dire tableau "instrument" - "nombre actuel de titres";
  • renvoie une position pour un instrument donné.


Comment abordons-nous cette tâche?



Selon les préceptes de la mode des microservices, nous devons diviser la tâche en composants microservices:



  • réception d'une transaction par Rest;
  • enregistrer la transaction dans la base de données;
  • stockage en mémoire pour présenter des données de position.


Faisons les premier et troisième services dans le cadre de ce tutoriel, et laissons le second pour la deuxième partie (écrivez dans les commentaires si c'est intéressant).



Donc, nous avons deux microservices.



Le premier reçoit des données de l'extérieur.



Le second traite ces données et répond aux demandes entrantes.



Bien sûr, nous voulons obtenir une mise à l'échelle horizontale, des mises à jour non-stop et d'autres avantages des microservices.



Quelle est la tâche très difficile qui nous attend?



Il y en a en fait beaucoup, mais parlons maintenant de la manière dont les données circuleront entre ces microservices. Vous pouvez également faire du repos entre eux, vous pouvez mettre une sorte de file d'attente, vous pouvez trouver beaucoup de choses avec leurs avantages et leurs inconvénients.



Examinons une approche possible: la communication asynchrone via le framework Axon .



Quels sont les avantages de cette solution?



Premièrement, la communication asynchrone augmente la flexibilité (oui, il y a un inconvénient ici, mais nous ne parlons que des pros pour l'instant).



Deuxièmement, nous obtenons directement l' approvisionnement en événements et le CQRS .

Troisièmement, Axon fournit une infrastructure prête à l'emploi et nous devons uniquement nous concentrer sur le développement de la logique métier.



Commençons.



Nous aurons le projet sur gradle. Il comportera trois modules:



  • commun. module avec des structures de données communes (nous n'aimons pas le copier-coller);
  • tradeCreator. module avec microservice pour accepter les transactions au repos;
  • tradeQueries. module avec microservices pour l'affichage de la position.


Prenons Spring Boot comme base et connectons le démarreur Axon.



Axon fonctionne bien sans Spring, mais nous les utiliserons ensemble.



Ici, nous devons nous arrêter et vous dire quelques mots sur Axon.



C'est un système client-serveur. Il y a un serveur - c'est une application séparée, nous l'exécuterons dans docker.



Et il y a des clients qui s'intègrent dans des microservices.

Voici la photo. Tout d'abord, le serveur Axon (dans docker) est lancé, puis nos microservices.



Au démarrage, les microservices recherchent un serveur et commencent à interagir avec lui. L'interaction peut être conditionnellement divisée en deux types: technique et commerciale.



La technique est l'échange de messages «Je suis vivant» (ces messages peuvent être vus en mode de journalisation de débogage).



Les affaires sont obscurcies par des messages comme "nouvelle offre".



Une caractéristique importante, après le démarrage du microservice, il peut demander au serveur Axon "ce qui s'est passé" et le serveur envoie les événements accumulés au microservice. Ainsi, le microservice peut être redémarré de manière relativement sûre sans perdre de données.

Avec ce schéma d'échange, nous pouvons très facilement exécuter de nombreuses instances de microservices,

et sur différents hôtes.



Oui, une instance du serveur Axon n'est pas fiable, mais jusqu'à présent.



Nous travaillons dans les paradigmes Event Sourcing et CQRS. Cela signifie que nous devons avoir des «équipes», des «événements» et des «échantillons».



Nous aurons une commande: «créer un accord», un événement «accord créé» et trois sélections: «afficher toutes les offres», «afficher la position», «afficher la position d'un instrument».



Le schéma de travail est le suivant:



  1. Le microservice TradeCreator accepte une transaction Rest.
  2. Le microservice tradeCreator crée une commande "create trade" et l'envoie au serveur Axon.
  3. Le serveur Axon reçoit la commande et envoie la commande au destinataire intéressé, dans notre cas c'est le microservice tradeCreator.
  4. Le microservice tradeCreator reçoit une commande, génère un événement "transaction créée" et l'envoie au serveur Axon.
  5. Le serveur Axon reçoit l'événement et le transmet aux abonnés intéressés.
  6. Désormais, nous n'avons qu'un seul destinataire intéressé: le microservice tradeQueries.
  7. Le microservice tradeQueries reçoit l'événement et met à jour les données internes.


(Il est important qu'au moment de la formation de l'événement, le microservice tradeQueries puisse ne pas être disponible, mais dès qu'il démarre, il recevra immédiatement l'événement).



Oui, le serveur axone est au centre de la communication, tous les messages y transitent.



Passons au codage.



Afin de ne pas encombrer le message de code, ci-dessous je ne donnerai que des fragments, le lien vers l'ensemble de l'exemple sera ci-dessous.



Commençons par le module commun.



Dans celui-ci, les parties communes sont l'événement (classe CreatedTradeEvent). Faites attention au nom, en fait, c'est le nom de l'équipe qui a généré cet événement, mais au passé. Dans le passé, parce que tout d'abord, la commande qui entraîne la création de l'événement apparaît.



D'autres structures courantes incluent des classes pour décrire une position (position de classe), un métier (classe Trade) et un côté d'un métier (enum Side), c.-à-d. acheter ou vendre.



Passons au module tradeCreator.



Ce module dispose d'une interface Rest (classe TradeController) pour accepter les trades.

La commande "créer une transaction" est formée à partir de la transaction reçue et envoyée au serveur axon.



    @PostMapping("/trade")
    public ResponseEntity<String> create(@RequestBody Trade trade) {
        var createTradeCommand = CreateTradeCommand.builder()
                .tradeId(trade.getTradeId())
	...
                .build();
        var result = commandGateway.sendAndWait(createTradeCommand, 3, TimeUnit.SECONDS);
        return ResponseEntity.ok(result.get().toString());
    }


Pour traiter la commande, la classe TradeAggregate est utilisée.

Pour qu'Axon le trouve, nous ajoutons l'annotation @Aggregate.

La méthode de traitement de la commande ressemble à ceci (avec une abréviation):



    @CommandHandler
    public TradeAggregate(CreateTradeCommand command) {
        log.info("command: {}", command);
        var event = CreatedTradeEvent.builder()
                .tradeId(command.tradeId())
		....
                .build();
        AggregateLifecycle.apply(event);
    }


Un événement est généré à partir de la commande et envoyé au serveur.

La commande est dans la classe CreateTradeCommand.



Jetons maintenant un coup d'œil au dernier module tradeQueries.



Les sélections sont décrites dans le package de requêtes.

Ce module a également une interface de

classe publique TradeController Rest .



Par exemple, voyons le traitement de la requête: "afficher toutes les transactions".



    @GetMapping("/trade/all")
    public List<Trade> findAllTrades() {
        return queryGateway.query(new FindAllTradesQuery(),
                ResponseTypes.multipleInstancesOf(Trade.class)).join();
    }


Une demande de récupération est créée et envoyée au serveur.



La classe TradesEventHandler est utilisée pour traiter la demande de récupération.

Il a une méthode annotée



   @QueryHandler
    public List<Position> handleFindCurrentPositionQuery(FindCurrentPositionQuery query)


C'est lui qui est chargé de récupérer les données du stockage en mémoire.



La question se pose de savoir comment les informations sont mises à jour dans ce magasin.



Pour commencer, il ne s'agit que d'une collection de ConcurrentHashMaps adaptées à des sélections spécifiques.

Pour les mettre à jour, la méthode est appliquée:



    @EventHandler
    public void on(CreatedTradeEvent event) {
        log.info("event:{}", event);

        var trade = Trade.builder()
	...
                .build();
        trades.put(event.tradeId(), trade);
        position.merge(event.shortName(), event.size(),
                (oldValue, value) -> event.side() == Side.BUY ? oldValue + value : oldValue - value);
    }


Il reçoit l'événement «accord créé» et met à jour les cartes.



Ce sont les points forts du développement des microservices.



Qu'en est-il des lacunes d'Axon?



Tout d'abord, c'est la complication de l'infrastructure, un point de défaillance est apparu - le serveur Axon, toutes les communications passent par lui.



Deuxièmement, l'inconvénient de ces systèmes distribués se manifeste très clairement - une incohérence temporaire des données. Dans notre cas, un temps inacceptable peut s'écouler entre la réception d'une nouvelle offre et la mise à jour des données des échantillons.



Que reste-t-il dans les coulisses?



Rien n'est dit du tout sur Event Sourcing et CQRS, de quoi il s'agit et à quoi cela sert.

Sans divulguer ces concepts, certains points pourraient ne pas être clairs.



Peut-être que certains fragments de code nécessitent également des clarifications.



Nous en avons parlé lors d' un webinaire ouvert .



Exemple complet .



All Articles