Surveillance des processus commerciaux Camunda



Salut, Habr.



Je m'appelle Anton et je suis responsable technique chez DomClick . Je crée et maintiens des microservices qui permettent à l'infrastructure DomClick d'échanger des données avec les services internes de Sberbank.



Ceci est la suite d'une série d'articles sur notre expérience d'utilisation du moteur de diagramme de processus métier Camunda . L'article précédent était consacré au développement d'un plugin pour Bitbucket qui vous permet de visualiser les changements dans les schémas BPMN. Aujourd'hui, je vais parler de la surveillance des projets qui utilisent Camunda, de l'utilisation d'outils tiers (dans notre cas, il s'agit de la pile Elasticsearch de Kibana et Grafana ), ainsi que le "natif" de Camunda - Cockpit . Je décrirai les difficultés rencontrées lors de l'utilisation de Cockpit et nos solutions.



Lorsque vous avez beaucoup de microservices, vous voulez tout savoir sur leur travail et leur Ă©tat actuel: plus il y a de surveillance, plus vous vous sentez confiant Ă  la fois dans des situations normales et d'urgence, lors de la libĂ©ration, etc. Nous utilisons la pile Elasticsearch: Kibana et Grafana comme outils de surveillance. Dans Kibana, nous examinons les journaux et dans Grafana - les mĂ©triques. La base de donnĂ©es contient Ă©galement des donnĂ©es historiques sur les processus Camunda. Il semblerait que cela devrait suffire pour comprendre si le service fonctionne normalement, et sinon, pourquoi. Le hic, c'est que vous devez examiner les donnĂ©es Ă  trois endroits diffĂ©rents et qu'elles n'ont pas toujours un lien clair les unes avec les autres. L'analyse et l'analyse d'un incident peuvent prendre du temps. En particulier, pour l'analyse des donnĂ©es de la base de donnĂ©es: Camunda a un schĂ©ma de donnĂ©es loin d'ĂȘtre Ă©vident, il stocke certaines variables sous une forme sĂ©rialisĂ©e. En thĂ©orie,Cockpit, un outil Camunda de surveillance des processus mĂ©tier, peut vous faciliter la tĂąche.





Interface du cockpit.



Le problĂšme principal est que Cockpit ne peut pas fonctionner avec une URL personnalisĂ©e. Il y a beaucoup de demandes Ă  ce sujet sur leur forum, mais jusqu'Ă  prĂ©sent, il n'y a pas de telles fonctionnalitĂ©s prĂȘtes Ă  l'emploi. La seule issue est de le faire vous-mĂȘme. Cockpit dispose d'une configuration automatique Sring Boot CamundaBpmWebappAutoConfiguration



, vous devez donc le remplacer par le vÎtre. Nous nous intéressons au CamundaBpmWebappInitializer



bean principal qui initialise les filtres Web et les servlets du Cockpit.



Nous devons transmettre au filtre principal ( LazyProcessEnginesFilter



) des informations sur l'URL Ă  laquelle il fonctionnera, ainsi que des ResourceLoadingProcessEnginesFilter



informations sur l'URL Ă  laquelle il servira les ressources JS et CSS.



Pour ce faire, dans notre implémentation, CamundaBpmWebappInitializer



changez la ligne:



registerFilter("Engines Filter", LazyProcessEnginesFilter::class.java, "/api/*", "/app/*")

      
      





sur:



registerFilter("Engines Filter", CustomLazyProcessEnginesFilter::class.java, singletonMap("servicePath", servicePath), *urlPatterns)

      
      





servicePath



Est notre URL personnalisĂ©e. Dans le mĂȘme nous indiquons CustomLazyProcessEnginesFilter



notre mise en Ɠuvre ResourceLoadingProcessEnginesFilter



:



class CustomLazyProcessEnginesFilter:
       LazyDelegateFilter<ResourceLoaderDependingFilter>
       (CustomResourceLoadingProcessEnginesFilter::class.java)

      
      





En CustomResourceLoadingProcessEnginesFilter



plus servicePath



de tous les liens vers des ressources que nous prévoyons de donner cÎté client:



override fun replacePlaceholder(
       data: String,
       appName: String,
       engineName: String,
       contextPath: String,
       request: HttpServletRequest,
       response: HttpServletResponse
) = data.replace(APP_ROOT_PLACEHOLDER, "$contextPath$servicePath")
           .replace(BASE_PLACEHOLDER,
                   String.format("%s$servicePath/app/%s/%s/", 
contextPath, appName, engineName))
           .replace(PLUGIN_PACKAGES_PLACEHOLDER,
                   createPluginPackagesString(appName, contextPath))
           .replace(PLUGIN_DEPENDENCIES_PLACEHOLDER,
                   createPluginDependenciesString(appName))

      
      





Nous pouvons maintenant dire Ă  notre Cockpit Ă  quelle URL il doit Ă©couter les requĂȘtes et fournir des ressources.



Mais ça ne peut pas ĂȘtre aussi simple, n'est-ce pas? Dans notre cas, Cockpit n'est pas en mesure de fonctionner immĂ©diatement sur plusieurs instances de l'application (par exemple, dans les pods Kubernetes), car au lieu d'OAuth2 et de JWT, le bon vieux jsessionid est utilisĂ©, qui est stockĂ© dans le cache local. Cela signifie que si vous essayez de vous connecter Ă  Cockpit connectĂ© Ă  Camunda, lancĂ© dans plusieurs instances Ă  la fois, ayant le mĂȘme jsessionid Ă©mis, alors Ă  chaque demande de ressources du client, vous pouvez obtenir une erreur 401 avec probabilitĂ© x, oĂč x = (1 - 1 / number_pods). Que peux-tu y faire? Le cockpit a le mĂȘme CamundaBpmWebappInitializer



votre filtre d'authentification est dĂ©clarĂ©, dans lequel tout le travail avec les jetons a lieu; vous devez le remplacer par le vĂŽtre. Dans celui-ci, nous prenons jsessionid du cache de session, l'enregistrons dans la base de donnĂ©es s'il s'agit d'une demande d'autorisation ou vĂ©rifions sa validitĂ© par rapport Ă  la base de donnĂ©es dans d'autres cas. C'est fait, nous pouvons maintenant surveiller les incidents par processus mĂ©tier via l'interface graphique pratique du Cockpit, oĂč vous pouvez immĂ©diatement voir les erreurs et les variables de stacktrace que le processus avait au moment de l'incident.



Et dans les cas oĂč la cause de l'incident est claire Ă  partir du stacktrace de l'exception, Cockpit vous permet de rĂ©duire le temps d'analyse de l'incident Ă  3-5 minutes: je suis entrĂ©, j'ai regardĂ© les incidents dans le processus, regardĂ© le stacktrace, les variables, et voilĂ  - l'incident a Ă©tĂ© rĂ©glĂ©, nous avons mis un bogue dans JIRA et a continuĂ©. Mais que se passe-t-il si la situation est un peu plus compliquĂ©e, que le stacktrace n'est qu'une consĂ©quence d'une erreur antĂ©rieure, ou si le processus s'est terminĂ© sans crĂ©er d'incident du tout (c'est-Ă -dire que techniquement tout s'est bien passĂ©, mais, du point de vue de la logique mĂ©tier, les mauvaises donnĂ©es ont Ă©tĂ© transfĂ©rĂ©es, ou le processus est allĂ© dans la mauvaise branche schĂšme). Dans ce cas, vous devez retourner Ă  Kibana, consulter les journaux et essayer de les connecter aux processus Camunda, ce qui prend encore beaucoup de temps. Bien sĂ»r, vous pouvez ajouter l'UUID du processus en cours et l'ID de l'Ă©lĂ©ment de schĂ©ma BPMN actuel (activityId) Ă  chaque journal, mais cela nĂ©cessite beaucoup de travail manuel,encombre la base de code, complique la rĂ©vision du code. L'ensemble de ce processus peut ĂȘtre automatisĂ©.



Le projet Sleuth permet de tracer les journaux avec un identifiant unique (dans notre cas, l'UUID du processus). La configuration du contexte Sleuth est décrite en détail dans la documentation, ici je vais seulement vous montrer comment le démarrer dans Camunda.



Tout d'abord, vous devez vous inscrire customPreBPMNParseListeners



auprĂšs de la processEngine



Camunda actuelle . Dans l'écouteur, remplacez les méthodes parseStartEvent



(ajoutez un écouteur à l'événement de début du processus de niveau supérieur) et parseServiceTask



(ajoutez un écouteur à l'événement de début ServiceTask



).



Dans le premier cas, nous créons un contexte Sleuth:



customContext[X_B_3_TRACE_ID] = businessKey
customContext[X_B_3_SPAN_ID] = businessKeyHalf
customContext[X_B_3_PARENT_SPAN_ID] = businessKeyHalf
customContext[X_B_3_SAMPLED] = "0" 
val contextFlags: TraceContextOrSamplingFlags = tracing.propagation()
       .extractor(OrcGetter())
       .extract(customContext)
val newSpan: Span = tracing.tracer().nextSpan(contextFlags)
tracing.currentTraceContext().newScope(newSpan.context())

      
      





... et enregistrez-le dans une variable de processus métier:



execution.setVariable(TRACING_CONTEXT, sleuthService.tracingContextHeaders)

      
      





Dans le second cas, on le restaure Ă  partir de cette variable:



val storedContext = execution
       .getVariableTyped<ObjectValue>(TRACING_CONTEXT)
       .getValue(HashMap::class.java) as HashMap<String?, String?>
val contextFlags: TraceContextOrSamplingFlags = tracing.propagation()
       .extractor(OrcGetter())
       .extract(storedContext)
val newSpan: Span = tracing.tracer().nextSpan(contextFlags)
tracing.currentTraceContext().newScope(newSpan.context())

      
      





Nous devons tracer les journaux avec des paramÚtres supplémentaires tels que activityId



(ID de l'élément BPMN actuel), activityName



(son nom commercial) et scenarioId



(ID du diagramme de processus métier). Cette fonctionnalité n'est apparue qu'avec la sortie de Sleuth 3.



Pour chaque paramÚtre, vous devez déclarer BaggageField



:



companion object {
   val HEADER_BUSINESS_KEY = BaggageField.create("HEADER_BUSINESS_KEY")
   val HEADER_SCENARIO_ID = BaggageField.create("HEADER_SCENARIO_ID")
   val HEADER_ACTIVITY_NAME = BaggageField.create("HEADER_ACTIVITY_NAME")
   val HEADER_ACTIVITY_ID = BaggageField.create("HEADER_ACTIVITY_ID")
}

      
      





Déclarez ensuite trois beans pour gérer ces champs:



@Bean
open fun propagateBusinessProcessLocally(): BaggagePropagationCustomizer =
       BaggagePropagationCustomizer { fb ->
           fb.add(SingleBaggageField.local(HEADER_BUSINESS_KEY))
           fb.add(SingleBaggageField.local(HEADER_SCENARIO_ID))
           fb.add(SingleBaggageField.local(HEADER_ACTIVITY_NAME))
           fb.add(SingleBaggageField.local(HEADER_ACTIVITY_ID))
       }

/** [BaggageField.updateValue] now flushes to MDC  */
@Bean
open fun flushBusinessProcessToMDCOnUpdate(): CorrelationScopeCustomizer =
       CorrelationScopeCustomizer { builder ->
           builder.add(SingleCorrelationField.newBuilder(HEADER_BUSINESS_KEY).flushOnUpdate().build())
           builder.add(SingleCorrelationField.newBuilder(HEADER_SCENARIO_ID).flushOnUpdate().build())
           builder.add(SingleCorrelationField.newBuilder(HEADER_ACTIVITY_NAME).flushOnUpdate().build())
           builder.add(SingleCorrelationField.newBuilder(HEADER_ACTIVITY_ID).flushOnUpdate().build())
       }

/** [.BUSINESS_PROCESS] is added as a tag only in the first span.  */
@Bean
open fun tagBusinessProcessOncePerProcess(): SpanHandler =
       object : SpanHandler() {
           override fun end(context: TraceContext, span: MutableSpan, cause: Cause): Boolean {
               if (context.isLocalRoot && cause == Cause.FINISHED) {
                   Tags.BAGGAGE_FIELD.tag(HEADER_BUSINESS_KEY, context, span)
                   Tags.BAGGAGE_FIELD.tag(HEADER_SCENARIO_ID, context, span)
                   Tags.BAGGAGE_FIELD.tag(HEADER_ACTIVITY_NAME, context, span)
                   Tags.BAGGAGE_FIELD.tag(HEADER_ACTIVITY_ID, context, span)
               }
               return true
           }
       }

      
      





Ensuite, nous pouvons enregistrer des champs supplémentaires dans le contexte Sleuth:



HEADER_BUSINESS_KEY.updateValue(businessKey)
HEADER_SCENARIO_ID.updateValue(scenarioId)
HEADER_ACTIVITY_NAME.updateValue(activityName)
HEADER_ACTIVITY_ID.updateValue(activityId)

      
      





Lorsque nous pouvons voir les journaux séparément pour chaque processus métier par sa clé, l'analyse des incidents est beaucoup plus rapide. Certes, vous devez toujours basculer entre Kibana et Cockpit, ce serait les combiner dans une seule interface utilisateur.



Et il y a une telle opportunité. Cockpit prend en charge les extensions personnalisées - plugins, Kibana dispose d'une API Rest et de deux bibliothÚques clientes pour l' utiliser : elasticsearch-rest-low-level-client et elasticsearch-rest-high-level-client .



Le plugin est un projet Maven hérité de l'artefact camunda-release-parent, avec un backend Jax-RS et un frontend AngularJS. Oui, AngularJS, pas Angular.



Le cockpit a détaillé documentation sur la façon d'écrire des plugins pour cela.



Je préciserai seulement que pour afficher les logs sur le frontend, nous nous intéressons au panneau d'onglets sur la page d'informations de définition de processus (cockpit.processDefinition.runtime.tab) et à la page de vue Process Instance (cockpit.processInstance.runtime.tab). Nous enregistrons nos composants pour eux:



ViewsProvider.registerDefaultView('cockpit.processDefinition.runtime.tab', {
   id: 'process-definition-runtime-tab-log',
   priority: 20,
   label: 'Logs',
   url: 'plugin://log-plugin/static/app/components/process-definition/processDefinitionTabView.html'
});

ViewsProvider.registerDefaultView('cockpit.processInstance.runtime.tab', {
   id: 'process-instance-runtime-tab-log',
   priority: 20,
   label: 'Logs',
   url: 'plugin://log-plugin/static/app/components/process-instance/processInstanceTabView.html'
});

      
      





Cockpit a un composant d'interface utilisateur pour afficher les informations sous forme de tableau, cependant, aucune documentation ne le dit, les informations Ă  son sujet et son utilisation ne peuvent ĂȘtre trouvĂ©es qu'en lisant le code source de Cockpit. En bref, l'utilisation du composant ressemble Ă  ceci:



<div cam-searchable-area (1)
    config="searchConfig" (2)
    on-search-change="onSearchChange(query, pages)" (3)
    loading-state="’Loading...’" (4)
    text-empty="Not found"(5)
    storage-group="'ANU'"
    blocked="blocked">
   <div class="col-lg-12 col-md-12 col-sm-12">
       <table class="table table-hover cam-table">
           <thead cam-sortable-table-header (6)
                  default-sort-by="time"
                  default-sort-order="asc" (7)
                  sorting-id="admin-sorting-logs"
                  on-sort-change="onSortChanged(sorting)"
                  on-sort-initialized="onSortInitialized(sorting)" (8)>
           <tr>
               <!-- headers -->
           </tr>
           </thead>
           <tbody>
           <!-- table content -->
           </tbody>
       </table>
   </div>
</div>

      
      





  1. Attribut pour déclarer le composant de recherche.
  2. Configuration des composants. Ici, nous avons la structure suivante:



    tooltips = { //     , 
                       //         
       'inputPlaceholder': 'Add criteria',
       'invalid': 'This search query is not valid',
       'deleteSearch': 'Remove search',
       'type': 'Type',
       'name': 'Property',
       'operator': 'Operator',
       'value': 'Value'
    },
    operators =  { //,   ,    
         'string': [
           {'key': 'eq',  'value': '='},
           {'key': 'like','value': 'like'}
       ]
    },
    types = [// ,     ,    businessKey
       {
           'id': {
               'key': 'businessKey',
               'value': 'Business Key'
           },
           'operators': [
               {'key': 'eq', 'value': '='}
           ],
           enforceString: true
       }
    ]
    
          
          





  3. La fonction de recherche de données est utilisée à la fois lors de la modification des paramÚtres de recherche et lors du téléchargement initial.
  4. Quel message afficher lors du chargement des données.
  5. Quel message afficher si rien n'a été trouvé.
  6. Attribut pour déclarer la table de mappage de données de recherche.
  7. Champ et type de tri par défaut.
  8. Fonctions de tri.


Sur le backend, vous devez configurer le client pour qu'il fonctionne avec l'API Kibana. Pour ce faire, utilisez simplement RestHighLevelClient de la bibliothÚque elasticsearch-rest-high-level-client. Là, spécifiez le chemin d'accÚs à Kibana, les données d'authentification: login et mot de passe, et si le protocole de cryptage est utilisé, vous devez spécifier l'implémentation X509TrustManager appropriée.



Pour former une requĂȘte de recherche, nous l'utilisons QueryBuilders.boolQuery()



, cela vous permet de composer des requĂȘtes complexes de la forme:



val boolQueryBuilder = QueryBuilders.boolQuery();

KibanaConfiguration.ADDITIONAL_QUERY_PARAMS.forEach((key, value) ->
       boolQueryBuilder.filter()
               .add(QueryBuilders.matchPhraseQuery(key, value))
);
if (!StringUtils.isEmpty(businessKey)) {
   boolQueryBuilder.filter()
           .add(QueryBuilders.matchPhraseQuery(KibanaConfiguration.BUSINESS_KEY, businessKey));
}
if (!StringUtils.isEmpty(procDefKey)) {
   boolQueryBuilder.filter()
           .add(QueryBuilders.matchPhraseQuery(KibanaConfiguration.SCENARIO_ID, procDefKey));
}
if (!StringUtils.isEmpty(activityId)) {
   boolQueryBuilder.filter()
           .add(QueryBuilders.matchPhraseQuery(KibanaConfiguration.ACTIVITY_ID, activityId));
}

      
      





Désormais, directement depuis Cockpit, nous pouvons afficher les journaux séparément pour chaque processus et pour chaque activité. Cela ressemble à ceci:





Onglet de visualisation des logs dans l'interface Cockpit.



Mais on ne peut pas s'arrĂȘter lĂ , dans les plans de l'idĂ©e pour le dĂ©veloppement du projet. Tout d'abord, dĂ©veloppez vos capacitĂ©s de recherche. Souvent, au dĂ©but de l'analyse d'un incident, il n'y a pas de processus de clĂ© mĂ©tier disponible, mais il y a des informations sur d'autres paramĂštres clĂ©s, et il serait bien d'ajouter la possibilitĂ© de personnaliser la recherche pour eux. De plus, le tableau dans lequel les informations sur les journaux sont affichĂ©es n'est pas interactif: il n'y a aucun moyen d'accĂ©der Ă  l'instance de processus requise en cliquant dans la ligne correspondante du tableau. Bref, il y a place pour le dĂ©veloppement. (DĂšs que le week-end sera terminĂ©, je publierai un lien vers le Github du projet et j'inviterai toutes les personnes intĂ©ressĂ©es.)



All Articles