Spring Integration - flux de données dynamiques

Feu d'artifice, Habr! Aujourd'hui, nous allons analyser un domaine assez spécifique - le streaming de données à l'aide du framework Spring Integration et comment effectuer ces flux au moment de l'exécution sans initialisation préalable dans le contexte de l'application. Un exemple d'application complet se trouve dans la Gita .



introduction



Spring Integration est un cadre d'intégration d'entreprise (EIP) qui utilise sous le capot des mécanismes de messagerie entre les adaptateurs de divers protocoles / systèmes d'intégration basés sur des canaux de messages (files d'attente conditionnelles). Les analogues célèbres sont Camel, Mule, Nifi.



À partir du cas de test, nous devrons - créer un service REST capable de lire les paramètres de requête reçus, accéder à notre base de données, par exemple, postgres, mettre à jour et extraire les données de la table en fonction des paramètres reçus de la source, et renvoyer le résultat à la file d'attente (requête / response), et créez également plusieurs instances avec différents chemins de requête.



Par convention, le diagramme de flux de données ressemblera à ceci:



image



Ensuite, je vais montrer comment vous pouvez simplement le faire sans trop danser avec un tambourin, en utilisant IntegrationFlowContext, avec des points de terminaison de composant / thread contrôlant REST. Tout le code principal du projet sera situé dans le référentiel, ici je n'indiquerai que quelques coupures. Eh bien, qui est intéressé, s'il vous plaît, sous chat.



Outils



Commençons par le bloc de dépendance par défaut. Fondamentalement, nous aurons besoin de projets à démarrage à ressort - pour l'idéologie REST de gestion des flux / composants, intégration à ressort - pour créer notre cas basé sur des canaux et des adaptateurs.



Et nous pensons immédiatement à ce dont nous avons besoin pour reproduire le cas. En plus des dépendances principales, nous avons besoin de sous-projets - integration-http, integration-jdbc, integration-groovy (fournit des transformateurs de données dynamiquement personnalisables basés sur des scripts Goovy). Séparément, je dirai que dans cet exemple, nous n'utiliserons pas le convertisseur groovy comme inutile, mais nous offrirons la possibilité de le personnaliser de l'extérieur.



Liste des dépendances
 <!-- Spring block -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-commons</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-groovy</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-http</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jdbc</artifactId>
        </dependency>

        <!-- Db block -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>

        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
        </dependency>

        <!-- Utility block -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>org.reflections</groupId>
            <artifactId>reflections</artifactId>
            <version>0.9.12</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
            <scope>provided</scope>
        </dependency>




Cuisine interne



Passons à la création des composants système nécessaires (wrappers / modèles). Nous aurons besoin de modèles de canal, bean, httpInboundGateway, handler, jdbcOutboundGateway et résultat.



  • bean - un objet d'aide nécessaire pour les adaptateurs, le fil
  • channel - canal pour livrer des messages vers / depuis les composants de flux
  • httpInboundGateway - point d'accès http auquel nous enverrons ensuite une demande avec des données pour un traitement ultérieur
  • handler - un type générique de handler (transformateurs à rainure, divers adaptateurs, etc.)
  • jdbcOutboundGateway - adaptateur jdbc
  • result - gestionnaire pour envoyer des informations à un canal spécifique


Nous aurons besoin de wrappers pour stocker les paramètres et initialiser correctement les composants d'un flux entier, donc nous créons immédiatement un magasin de composants, add. fonctionnalité des convertisseurs JSON -> Modèle de définition. La cartographie directe des champs à l'aide de jackson et d'objets dans mon cas n'était pas applicable - nous avons un vélo de plus pour un protocole de communication spécifique.



Faisons-le bien tout de suite , en utilisant des annotations :



StreamComponent - est responsable de l'identification des classes en tant que modèle de réglage d'un composant de flux et contient des informations de service - le nom du composant, le type du composant, si le composant est imbriqué et sa description;



SettingClass - responsable des options supplémentaires pour l'analyse du modèle, telles que l'analyse des champs de super classe et l'ignorance des champs lors de l'initialisation des valeurs;



SettingValue - responsable d'identifier le champ de classe comme personnalisable de l'extérieur, avec des paramètres de dénomination dans JSON, une description, un convertisseur de type, un indicateur de champ obligatoire et un indicateur d'objet interne à des fins d'information;



Gestionnaire de stockage de composants



Méthodes d'aide pour travailler avec des modèles pour les contrôleurs REST



Modèle de base - une abstraction avec un ensemble de champs auxiliaires / méthodes de modèle



Modèles de configuration de flux actuels



Mapper JSON -> Modèle de définition



Le principal terrain de travail a été préparé. Passons maintenant à la mise en œuvre, directement, des services qui seront responsables du cycle de vie, du stockage et de l'initialisation des flux et nous poserons immédiatement l'idée que nous pouvons paralléliser 1 flux avec la même dénomination dans plusieurs instances, c'est-à-dire nous devrons créer des identifiants uniques (guids) pour tous les composants du flux, sinon des collisions avec d'autres composants singleton (beans, canaux, etc.) peuvent se produire dans le contexte de l'application. Mais commençons par créer des mappeurs de deux composants - http et jdbc, i.e. l'incrémentation des modèles réalisés précédemment aux composants du flux lui-même (HttpRequestHandlerEndpointSpec et JdbcOutboundGateway). Service de gestion central



HttpRegistry



JdbcRegistry



( StreamDeployingService) exécute les fonctions de stockage des travailleurs / inactifs, en enregistre les nouveaux, démarre, arrête et supprime complètement les threads du contexte de l'application. Une caractéristique importante du service est l'introduction de la dépendance IntegrationFlowBuilderRegistry, qui nous aide à rendre l'application dynamique (peut-être rappelez-vous ces fichiers de configuration xml ou classes DSL pour des kilomètres). Selon la spécification du flux, il doit toujours commencer par un composant ou un canal entrant, nous en tenons donc compte dans l'implémentation de la méthode registerStreamContext.



Et le gestionnaire auxiliaire ( IntegrationFlowBuilderRegistry), qui remplit à la fois la fonction de mappeur de modèles vers les composants de flux et d'initialisation du flux lui-même à l'aide d'IntegrationFlowBuilder. J'ai également implémenté un gestionnaire de journal dans le pipeline de flux, un service de collecte de métriques de canal de flux (une option basculable) et une implémentation possible de convertisseurs de messages de flux basés sur l'implémentation Groovy (si soudainement cet exemple devient la base de la vente, alors la précompilation des scripts groovy doit être effectuée au stade de l'initialisation du flux , car exécutez des tests de charge dans la RAM et quel que soit le nombre de cœurs et la puissance dont vous disposez). En fonction de la configuration des étapes de journalisation et des paramètres de niveau de journalisation du modèle, il sera actif après chaque transmission d'un message d'un composant à un autre. La surveillance est activée et désactivée par un paramètre dans application.yml:



monitoring:
  injectction:
    default: true


Maintenant que nous avons tous les mécanismes pour initialiser les flux de traitement de données dynamiques, nous pouvons en plus écrire des mappeurs pour divers protocoles et adaptateurs tels que RabbitMQ, Kafka, Tcp, Ftp, etc. d'autant plus que dans la plupart des cas, vous n'avez pas besoin d'écrire quoi que ce soit de votre propre main (sauf, bien sûr, les modèles de configuration et les méthodes auxiliaires) - un assez grand nombre de composants sont déjà présents dans le référentiel .



La dernière étape sera la mise en place de contrôleurs pour obtenir des informations sur les composants du système existant, le contrôle de flux et la réception de métriques.



ComponentsController - fournit des informations sur tous les composants dans un modèle lisible par l'homme, et un composant par nom et type.



StreamController - assure une gestion complète des flux, à savoir l'initialisation de nouveaux modèles JSON, le démarrage, l'arrêt, la suppression et l'émission de métriques par identifiant.



Produit final



Nous lançons l'application résultante et décrivons le cas de test au format JSON.



Exemple de flux de données
Script d'initialisation de la base de données:



CREATE TABLE IF NOT EXISTS account_data
(
    id          INT                      NOT NULL,
    accountname VARCHAR(45)              NOT NULL,
    password    VARCHAR(128),
    email       VARCHAR(255),
    last_ip     VARCHAR(15) DEFAULT NULL NOT NULL
);

CREATE UNIQUE INDEX account_data_username_uindex
    ON account_data (accountname);

ALTER TABLE account_data
    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME account_data_id_seq
            START WITH 1
            INCREMENT BY 1
            NO MINVALUE
            NO MAXVALUE
            CACHE 1
        );

ALTER TABLE account_data
    ADD CONSTRAINT account_data_pk
        PRIMARY KEY (id);

CREATE TABLE IF NOT EXISTS account_info
(
    id             INT NOT NULL,
    banned         BOOLEAN  DEFAULT FALSE,
    premium_points INT      DEFAULT 0,
    premium_type   SMALLINT DEFAULT -1
);

ALTER TABLE account_info
    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME account_info_id_seq
            START WITH 1
            INCREMENT BY 1
            NO MINVALUE
            NO MAXVALUE
            CACHE 1
        );

ALTER TABLE account_info
    ADD CONSTRAINT account_info_account_data_id_fk FOREIGN KEY (id) REFERENCES account_data (id)
        ON UPDATE CASCADE ON DELETE CASCADE;

ALTER TABLE account_info
    ADD CONSTRAINT account_info_pk
        PRIMARY KEY (id);



INSERT INTO account_data (accountname, password, email, last_ip)
VALUES ('test', 'test', 'test@test', '127.0.0.1');
INSERT INTO account_info (banned, premium_points, premium_type)
VALUES (false, 1000, 1);


: order — , .. , . ( ). — .



{
  "flowName": "Rest Postgres stream",
  "components": [
    {
      "componentName": "bean",
      "componentType": "other",
      "componentParameters": {
        "id": "pgDataSource",
        "bean-type": "com.zaxxer.hikari.HikariDataSource",
        "property-args": [
          {
            "property-name": "username",
            "property-value": "postgres"
          },
          {
            "property-name": "password",
            "property-value": "postgres"
          },
          {
            "property-name": "jdbcUrl",
            "property-value": "jdbc:postgresql://localhost:5432/test"
          },
          {
            "property-name": "driverClassName",
            "property-value": "org.postgresql.Driver"
          }
        ]
      }
    },
    {
      "componentName": "message-channel",
      "componentType": "source",
      "componentParameters": {
        "id": "jdbcReqChannel",
        "order": 1,
        "channel-type": "direct",
        "max-subscribers": 1000
      }
    },
    {
      "componentName": "message-channel",
      "componentType": "source",
      "componentParameters": {
        "id": "jdbcRepChannel",
        "order": 1,
        "channel-type": "direct"
      }
    },
    {
      "componentName": "http-inbound-gateway",
      "componentType": "source",
      "componentParameters": {
        "order": 2,
        "http-inbound-supported-methods": [
          "POST"
        ],
        "payload-type": "org.genfork.integration.model.request.http.SimpleJdbcPayload",
        "log-stages": true,
        "log-level": "INFO",
        "request-channel": "jdbcReqChannel",
        "reply-channel": "jdbcRepChannel"
      }
    },
    {
      "componentName": "handler",
      "componentType": "processor",
      "componentParameters": {
        "order": 3,
        "handler-definition": {
          "componentName": "jdbc-outbound-adapter",
          "componentType": "app",
          "componentParameters": {
            "data-source": "pgDataSource",
            "query": "SELECT accountname, password, email, last_ip, banned, premium_points, premium_type FROM account_data d INNER JOIN account_info i ON d.id = i.id WHERE d.id = :payload.accountId",
            "update-query": "UPDATE account_info SET banned = true WHERE id = :payload.accountId",
            "jdbc-reply-channel": "jdbcRepChannel",
            "log-stages": true,
            "log-level": "INFO"
          }
        }
      }
    },
    {
      "componentName": "result",
      "componentType": "app",
      "componentParameters": {
        "order": 4,
        "cancel": false,
        "result-channel": "jdbcRepChannel"
      }
    }
  ]
}





Test:



1) Nous initialisons un nouveau flux à l'aide de la méthode



POST / stream / deploy, où notre JSON sera dans le corps de la requête.



En réponse, le système devra envoyer si tout est correct, sinon un message d'erreur sera visible:



{
    "status": "SUCCESS", -  
    "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b" -  
}


2) Nous initions le démarrage en utilisant la méthode:



GET / stream / 2bf65d9d-97c6-4199-86aa-0c808c25071b / start, où nous indiquons l'identifiant du flux initialisé plus tôt.



En réponse, le système devra envoyer si tout est correct, sinon un message d'erreur sera visible:



{
    "status": "SUCCESS", -  
}


3) Appel d'un flux par un identifiant dans le système? Comment, quoi et où - dans le mappeur du modèle HttpRegistry, j'ai écrit la condition



Http.inboundGateway(localPath != null ? localPath : String.format("/stream/%s/call", uuid))


où, le paramètre http-inbound-path est pris en compte, et s'il n'est pas explicitement spécifié dans la configuration du composant, il est ignoré et le chemin de l'appel système est défini. Dans notre cas, ce sera:



POST / stream / ece4d4ac-3b46-4952-b0a6-8cf334074b99 / call - où l'identifiant du flux est présent, avec le corps de la requête:



{
    "accountId": 1
}


En réponse, nous recevrons, si les étapes de traitement de la demande ont fonctionné correctement, nous recevrons une structure plate d'enregistrements des tables account_data et account_info.



{
    "accountname": "test",
    "password": "test",
    "email": "test@test",
    "last_ip": "127.0.0.1",
    "banned": true,
    "premium_points": 1000,
    "premium_type": 1
}


La spécificité de l'adaptateur JdbcOutboundGateway est telle que si vous spécifiez le paramètre update-query, un gestionnaire supplémentaire est enregistré, qui met d'abord à jour les données, puis uniquement extrait par le paramètre de requête.



Si vous spécifiez les mêmes chemins manuellement, la possibilité de lancer des composants avec HttpInboundGateway comme point d'accès à un flux dans plusieurs instances sera supprimée car le système ne permettra pas d'enregistrer un chemin similaire.



4) Regardons les métriques en utilisant la méthode GET / stream / 2bf65d9d-97c6-4199-86aa-0c808c25071b / metrics



Contenu de la réponse
, / , / / :



[
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcReqChannel",
        "sendDuration": {
            "count": 1,
            "min": 153.414,
            "max": 153.414,
            "mean": 153.414,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 153.414,
        "minSendDuration": 153.414,
        "meanSendDuration": 153.414,
        "meanSendRate": 0.001195117818082359,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 1.1102230246251565E-16
    },
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.2bf65d9d-97c6-4199-86aa-0c808c25071b.channel#2",
        "sendDuration": {
            "count": 1,
            "min": 0.1431,
            "max": 0.1431,
            "mean": 0.1431,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 0.1431,
        "minSendDuration": 0.1431,
        "meanSendDuration": 0.1431,
        "meanSendRate": 0.005382436008121413,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 0.0
    },
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcRepChannel",
        "sendDuration": {
            "count": 1,
            "min": 0.0668,
            "max": 0.0668,
            "mean": 0.0668,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 0.0668,
        "minSendDuration": 0.0668,
        "meanSendDuration": 0.0668,
        "meanSendRate": 0.001195118373693797,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 1.1102230246251565E-16
    }
]




Conclusion



Ainsi, il a été montré comment, après avoir passé un peu plus de temps et d'efforts, écrire une application pour l'intégration avec divers systèmes que pour écrire des gestionnaires manuels supplémentaires (pipelines) à chaque fois dans votre application pour l'intégration avec d'autres systèmes, 200 à 500 lignes de code chacun.



Dans l'exemple actuel, vous pouvez paralléliser le travail du même type de threads pour plusieurs instances au moyen d'identificateurs uniques, évitant les collisions dans le contexte global de l'application entre les dépendances de thread (bins, canaux, etc.).



De plus, vous pouvez développer le projet:



  • enregistrer les flux dans la base de données;
  • prendre en charge tous les composants d'intégration que nous fournit la communauté Spring and Spring-Integration;
  • faire des ouvriers qui effectueraient le travail avec des threads selon un horaire;
  • créer une interface utilisateur saine pour configurer les flux avec une "souris et des cubes de composants" conditionnels (d'ailleurs, l'exemple a été partiellement affiné pour le projet github.com/spring-cloud/spring-cloud-dataflow-ui ).


Et encore une fois, je dupliquerai le lien vers le référentiel .



All Articles