L'audit chez Dodo n'est pas sur papier: l'auditeur dispose d'une tablette où l'auditeur note tous les produits et crée des rapports. Mais jusqu'en 2020, la révision des pizzerias était effectuée précisément sur des morceaux de papier - simplement parce que c'était plus facile et plus facile ainsi. Ceci, bien sûr, a conduit à des données inexactes, des erreurs et des pertes - les gens font des erreurs, des morceaux de papier sont perdus et il y en a bien d'autres. Nous avons décidé de résoudre ce problème et d'améliorer la manière de la tablette. L'implémentation a décidé d'utiliser DDD. Comment nous l'avons fait, nous vous en dirons plus.
Tout d'abord, brièvement sur les processus métier afin de comprendre le contexte. Examinons le schéma du mouvement des produits et où se trouvent les révisions, puis passons aux détails techniques, qui seront nombreux.
Schéma du mouvement des produits et pourquoi un audit est nécessaire
Il y a plus de 600 pizzerias dans notre réseau (et ce nombre continuera d'augmenter). Chaque jour, il y a un mouvement de matières premières dans chacune d'entre elles: de la préparation et de la vente des produits, de la radiation des ingrédients par date d'expiration, au mouvement des matières premières vers d'autres pizzerias de la chaîne. Le solde de la pizzeria contient constamment environ 120 articles nécessaires à la production de produits, et en plus beaucoup de consommables, de produits ménagers et de produits chimiques pour garder la pizzeria propre. Tout cela nécessite une «comptabilité» pour savoir quelles matières premières sont en abondance et lesquelles font défaut.
«Comptabilité» décrit tout mouvement de matières premières dans les pizzerias. La livraison est un plus sur le bilan et la radiation est un moins. Par exemple, lorsque nous commandons une pizza, le caissier accepte la commande et l'envoie pour traitement. La pâte est ensuite étalée et farcie avec des ingrédients tels que du fromage, de la sauce tomate et du pepperoni. Tous ces produits entrent en production - sont radiés. En outre, une radiation peut se produire lorsque la date d'expiration se termine.
À la suite des livraisons et des radiations, des «soldes d'entrepôt» sont formés. Il s'agit d'un rapport qui reflète la quantité de matières premières figurant au bilan en fonction des opérations du système d'information. Tout cela est le «solde de règlement». Mais il y a une «valeur réelle» - la quantité de matière première actuellement en stock.
Révisions
Pour calculer la valeur réelle, des «révisions» sont utilisées (elles sont également appelées «inventaires»).
Les audits permettent de calculer avec précision la quantité de matières premières à acheter. Trop d'achats gèleront le fonds de roulement et le risque de radiation de produits excédentaires augmentera, ce qui entraînera également des pertes. Non seulement le surplus de matières premières est dangereux, mais aussi sa pénurie - cela peut entraîner un arrêt de la production de certains produits, ce qui entraînera une diminution des revenus. Les audits aident à voir combien de bénéfices une entreprise perd en raison de pertes enregistrées et non comptabilisées de matières premières, et à réduire les coûts.
Les révisions partagent leurs données en tenant dûment compte du traitement ultérieur, par exemple, des rapports de construction.
Problèmes dans le processus de révision ou Ancienneté des révisions
Les révisions sont un processus laborieux. Il prend beaucoup de temps et se compose de plusieurs étapes: comptage et fixation des restes de matières premières, synthèse des résultats des matières premières par zones de stockage, saisie des résultats dans le système d'information Dodo IS.
Auparavant, les audits étaient effectués avec un stylo et un formulaire papier, sur lequel figurait une liste de matières premières. Lors de la synthèse, de la conciliation et du transfert manuels des résultats vers Dodo IS, il existe un risque d'erreur. Dans un audit complet, plus de 100 articles de matières premières sont comptés, et le calcul lui-même est souvent effectué en fin de soirée ou tôt le matin, dont la concentration peut souffrir.
Comment résoudre le problème
Notre équipe Game of Threads développe la comptabilité dans les pizzerias. Nous avons décidé de lancer un projet appelé «tablette de l'auditeur», qui simplifiera l'audit des pizzerias. Nous avons décidé de tout faire dans notre propre système d'information Dodo IS, dans lequel les principaux composants de la comptabilité sont implémentés, nous n'avons donc pas besoin d'intégrations avec des systèmes tiers. De plus, tous les pays de notre présence pourront utiliser l'outil sans recourir à des intégrations supplémentaires.
Avant même de commencer à travailler sur le projet, nous, dans l'équipe, avons discuté du désir d'appliquer le DDD dans la pratique. Heureusement, l'un des projets a déjà appliqué avec succès cette approche, nous avons donc eu un exemple que vous pouvez regarder - c'est le projet « caisse ».
Dans cet article, je parlerai des modèles DDD tactiques que nous avons utilisés dans le développement: agrégats, commandes, événements de domaine, service d'application et intégration de contextes limités. Nous ne décrirons pas les schémas stratégiques et les fondamentaux de DDD, sinon l'article sera très long. Nous en avons déjà parlé dans l'article « Que pouvez-vous apprendre sur Domain Driven Design en 10 minutes? "
Nouvelle version des révisions
Avant de commencer l'audit, vous devez savoir exactement ce qu'il faut compter. Pour cela, nous avons besoin de modèles de révision . Ils sont configurés par le rôle "office manager". Le modèle de révision est une entité InventoryTemplate. Il contient les champs suivants:
- identifiant du modèle;
- ID de la pizzeria;
- nom du modèle;
- catégorie de révision: mensuelle, hebdomadaire, quotidienne;
- unités;
- zones de stockage et matières premières dans cette zone de stockage
Pour cette entité, une fonctionnalité CRUD a été implémentée et nous ne nous attarderons pas dessus en détail.
Une fois que l'auditeur a une liste de modèles, il peut démarrer l' audit . Cela se produit généralement lorsque la pizzeria est fermée. En ce moment, il n'y a pas de commandes et les matières premières ne bougent pas - vous pouvez obtenir de manière fiable des données sur les soldes.
A partir de l'audit, l'auditeur sélectionne une zone, par exemple un réfrigérateur, et va y compter les matières premières. Dans le réfrigérateur, il voit 5 paquets de fromage de 10 kg chacun, entre 10 kg * 5 dans la calculatrice, appuie sur "Entrer plus". Puis il remarque 2 autres packs sur l'étagère du haut, et clique sur "Ajouter". En conséquence, il a 2 mesures - 50 et 20 kg chacune.
Mesurenous appelons la quantité de matières premières saisie par l'inspecteur dans une certaine zone, mais pas nécessairement le total. L'auditeur peut saisir deux mesures d'un kilogramme ou seulement deux kilogrammes en une seule mesure - n'importe quelle combinaison peut l'être. L'essentiel est que l'auditeur lui-même soit clair.
Interface de la calculatrice.
Ainsi, étape par étape, l'auditeur examine toutes les matières premières en 1 à 2 heures, puis termine l'audit.
L'algorithme des actions est assez simple:
- l'auditeur peut démarrer l'audit;
- l'auditeur peut ajouter des mesures dans la révision commencée;
- l'auditeur peut terminer l'audit.
À partir de cet algorithme, les exigences commerciales du système sont formées.
Implémentation de la première version de l'agrégat, commandes et événements du domaine
Tout d'abord, définissons les termes inclus dans l'ensemble de modèles tactiques DDD. Nous y ferons référence dans cet article.
Modèles DDD tactiques
L'agrégat est un cluster d'objets d'entité et de valeur. Les objets d'un cluster sont une seule entité en termes de modification des données. Chaque agrégat a un élément racine à travers lequel les entités et les valeurs sont accessibles. Les unités ne doivent pas être conçues trop grandes. Ils consommeront beaucoup de mémoire et la probabilité d'une transaction réussie diminuera.
La limite d'agrégation est un ensemble d'objets qui doivent être cohérents dans une seule transaction: tous les invariants de ce cluster doivent être respectés.
Les invariants sont des règles métier qui ne peuvent pas être incohérentes.
CommanderEst une sorte d'action sur l'unité. À la suite de cette action, l'état de l'agrégat peut être modifié et un ou plusieurs événements de domaine peuvent être générés.
Un événement de domaine est une notification d'un changement d'état d'un agrégat, nécessaire pour maintenir la cohérence. L'agrégat assure la cohérence transactionnelle: toutes les données doivent être modifiées ici et maintenant. La cohérence qui en résulte garantit la cohérence à long terme - les données changeront, mais pas ici et maintenant, mais après une période indéfinie. Cet intervalle dépend de nombreux facteurs: l'encombrement des files d'attente de messages, la disponibilité des services externes à traiter ces messages, le réseau.
Élément racineEst une entité avec un identifiant global unique. Les éléments enfants ne peuvent avoir une identité locale que dans un agrégat entier. Ils peuvent se référer les uns aux autres et ne peuvent référencer que leur élément racine.
Équipes et événements
Décrivons l'exigence commerciale en équipe. Les commandes ne sont que des DTO avec des champs descriptifs.
La commande "ajouter une mesure" comporte les champs suivants:
- valeur de mesure - la quantité de matières premières dans une certaine unité de mesure, peut être nulle si la mesure a été supprimée;
- version - la mesure peut être modifiée, une version est donc nécessaire;
- identifiant de la matière première;
- unité de mesure: kg / g, l / ml, pièces;
- identifiant de la zone de stockage.
Mesure en ajoutant un code de commande
public sealed class AddMeasurementCommand
{
// ctor
public double? Value { get; }
public int Version { get; }
public UUId MaterialTypeId { get; }
public UUId MeasurementId { get; }
public UnitOfMeasure UnitOfMeasure { get; }
public UUId InventoryZoneId { get; }
}
Nous avons également besoin d'un événement qui résultera de l'exécution de ces commandes. Nous marquons l'événement avec une interface
IPublicInventoryEvent
- nous en aurons besoin pour l'intégration avec les consommateurs externes à l'avenir.
Dans l'événement "mesure" les champs sont les mêmes que dans la commande "Ajouter mesure", sauf que l'événement stocke également l'identifiant de l'unité sur laquelle il s'est produit et sa version.
Code d'événement "gelé"
public class MeasurementEvent : IPublicInventoryEvent
{
public UUId MaterialTypeId { get; set; }
public double? Value { get; set; }
public UUId MeasurementId { get; set; }
public int MeasurementVersion { get; set; }
public UUId AggregateId { get; set; }
public int Version { get; set; }
public UnitOfMeasure UnitOfMeasure { get; set; }
public UUId InventoryZoneId { get; set; }
}
Lorsque nous avons décrit les commandes et les événements, nous pouvons implémenter l'agrégat
Inventory
.
Mise en œuvre de l'agrégat d'inventaire
Inventaire du diagramme d'agrégation UML.
L'approche est la suivante: le début de la révision initie la création de l'agrégat
Inventory
, pour cela nous utilisons la méthode factory Create
et démarrons la révision avec la commande StartInventoryCommand
.
Chaque commande modifie l'état de l'agrégat et enregistre les événements dans la liste
changes
, qui seront envoyés au stockage pour enregistrement. En outre, sur la base de ces changements, des événements pour le monde extérieur seront générés.
Lorsque l'agrégat a
Inventory
été créé, nous pouvons le restaurer pour chaque demande ultérieure de changement d'état.
- Les modifications (
changes
) sont enregistrées depuis la dernière restauration de l'unité.
- L'état est restauré par une méthode
Restore
qui lit tous les événements précédents, triés par version, sur l'instance actuelle de l'agrégatInventory
.
C'est la mise en œuvre de l'idée
Event Sourcing
au sein de l'unité. Nous Event Sourcing
parlerons de la manière de mettre en œuvre l'idée dans le cadre du référentiel un peu plus tard. Il y a une belle illustration du livre de Vaughn Vernon: L'
état de l'unité est restauré en appliquant les événements dans l'ordre dans lequel ils se produisent.
Ensuite, plusieurs mesures sont effectuées par l'équipe
AddMeasurementCommand
. L'audit se termine par une commande FinishInventoryCommand
. L'agrégat valide son état dans des méthodes de mutation pour se conformer à ses invariants.
Il est important de noter que l'unité est
Inventory
entièrement versionnée, ainsi que chaque mesure. Les mesures sont plus difficiles - vous devez résoudre les conflits dans la méthode de gestion des événements When(MeasurementEvent e)
. Dans le code, je montrerai uniquement le traitement de la commande AddMeasurementCommand
.
Code d'inventaire global
public sealed class Inventory : IEquatable<Inventory>
{
private readonly List<IInventoryEvent> _changes = new List<IInventoryEvent>();
private readonly List<InventoryMeasurement> _inventoryMeasurements = new List<InventoryMeasurement>();
internal Inventory(UUId id, int version, UUId unitId, UUId inventoryTemplateId,
UUId startedBy, InventoryState state, DateTime startedAtUtc, DateTime? finishedAtUtc)
: this(id)
{
Version = version;
UnitId = unitId;
InventoryTemplateId = inventoryTemplateId;
StartedBy = startedBy;
State = state;
StartedAtUtc = startedAtUtc;
FinishedAtUtc = finishedAtUtc;
}
private Inventory(UUId id)
{
Id = id;
Version = 0;
State = InventoryState.Unknown;
}
public UUId Id { get; private set; }
public int Version { get; private set; }
public UUId UnitId { get; private set; }
public UUId InventoryTemplateId { get; private set; }
public UUId StartedBy { get; private set; }
public InventoryState State { get; private set; }
public DateTime StartedAtUtc { get; private set; }
public DateTime? FinishedAtUtc { get; private set; }
public ReadOnlyCollection<IInventoryEvent> Changes => _changes.AsReadOnly();
public ReadOnlyCollection<InventoryMeasurement> Measurements => _inventoryMeasurements.AsReadOnly();
public static Inventory Restore(UUId inventoryId, IInventoryEvent[] events)
{
var inventory = new Inventory(inventoryId);
inventory.ReplayEvents(events);
return inventory;
}
public static Inventory Restore(UUId id, int version, UUId unitId, UUId inventoryTemplateId,
UUId startedBy, InventoryState state, DateTime startedAtUtc, DateTime? finishedAtUtc,
InventoryMeasurement[] measurements)
{
var inventory = new Inventory(id, version, unitId, inventoryTemplateId,
startedBy, state, startedAtUtc, finishedAtUtc);
inventory._inventoryMeasurements.AddRange(measurements);
return inventory;
}
public static Inventory Create(UUId inventoryId)
{
if (inventoryId == null)
{
throw new ArgumentNullException(nameof(inventoryId));
}
return new Inventory(inventoryId);
}
public void ReplayEvents(params IInventoryEvent[] events)
{
if (events == null)
{
throw new ArgumentNullException(nameof(events));
}
foreach (var @event in events.OrderBy(e => e.Version))
{
Mutate(@event);
}
}
public void AddMeasurement(AddMeasurementCommand command)
{
if (command == null)
{
throw new ArgumentNullException(nameof(command));
}
Apply(new MeasurementEvent
{
AggregateId = Id,
Version = Version + 1,
UnitId = UnitId,
Value = command.Value,
MeasurementVersion = command.Version,
MaterialTypeId = command.MaterialTypeId,
MeasurementId = command.MeasurementId,
UnitOfMeasure = command.UnitOfMeasure,
InventoryZoneId = command.InventoryZoneId
});
}
private void Apply(IInventoryEvent @event)
{
Mutate(@event);
_changes.Add(@event);
}
private void Mutate(IInventoryEvent @event)
{
When((dynamic) @event);
Version = @event.Version;
}
private void When(MeasurementEvent e)
{
var existMeasurement = _inventoryMeasurements.SingleOrDefault(x => x.MeasurementId == e.MeasurementId);
if (existMeasurement is null)
{
_inventoryMeasurements.Add(new InventoryMeasurement
{
Value = e.Value,
MeasurementId = e.MeasurementId,
MeasurementVersion = e.MeasurementVersion,
PreviousValue = e.PreviousValue,
MaterialTypeId = e.MaterialTypeId,
UserId = e.By,
UnitOfMeasure = e.UnitOfMeasure,
InventoryZoneId = e.InventoryZoneId
});
}
else
{
if (!existMeasurement.Value.HasValue)
{
throw new InventoryInvalidStateException("Change removed measurement");
}
if (existMeasurement.MeasurementVersion == e.MeasurementVersion - 1)
{
existMeasurement.Value = e.Value;
existMeasurement.MeasurementVersion = e.MeasurementVersion;
existMeasurement.UnitOfMeasure = e.UnitOfMeasure;
existMeasurement.InventoryZoneId = e.InventoryZoneId;
}
else if (existMeasurement.MeasurementVersion < e.MeasurementVersion)
{
throw new MeasurementConcurrencyException(Id, e.MeasurementId, e.Value);
}
else if (existMeasurement.MeasurementVersion == e.MeasurementVersion &&
existMeasurement.Value != e.Value)
{
throw new MeasurementConcurrencyException(Id, e.MeasurementId, e.Value);
}
else
{
throw new NotChangeException();
}
}
}
// Equals
// GetHashCode
}
Lorsque l'événement «Mesuré» se produit, la présence d'une mesure existante avec cet identifiant est vérifiée. Si ce n'est pas le cas, une nouvelle mesure est ajoutée.
Si tel est le cas, des vérifications supplémentaires sont nécessaires:
- vous ne pouvez pas modifier une mesure à distance;
- la version entrante doit être plus grande que la précédente.
Si les conditions sont remplies, nous pouvons définir une nouvelle valeur et une nouvelle version pour la mesure existante. Si la version est plus petite, il s'agit d'un conflit. Pour cela, nous lançons une exception
MeasurementConcurrencyException
. Si la version correspond et que les valeurs diffèrent, il s'agit également d'une situation de conflit. Eh bien, si la version et la valeur correspondent à la fois, aucun changement ne s'est produit. De telles situations ne se produisent généralement pas.
L'entité «mesure» contient exactement les mêmes champs que la commande «Ajouter une mesure».
Code d'entité "gelé"
public class InventoryMeasurement
{
public UUId MeasurementId { get; set; }
public UUId MaterialTypeId { get; set; }
public UUId UserId { get; set; }
public double? Value { get; set; }
public int MeasurementVersion { get; set; }
public UnitOfMeasure UnitOfMeasure { get; set; }
public UUId InventoryZoneId { get; set; }
}
L'utilisation de méthodes d'agrégats publiques est bien démontrée par les tests unitaires.
Code de test unitaire "ajout d'une mesure après le début de la révision"
[Fact]
public void WhenAddMeasurementAfterStartInventory_ThenInventoryHaveOneMeasurement()
{
var inventoryId = UUId.NewUUId();
var inventory = Domain.Inventories.Entities.Inventory.Create(inventoryId);
var unitId = UUId.NewUUId();
inventory.StartInventory(Create.StartInventoryCommand()
.WithUnitId(unitId)
.Please());
var materialTypeId = UUId.NewUUId();
var measurementId = UUId.NewUUId();
var measurementVersion = 1;
var value = 500;
var cmd = Create.AddMeasurementCommand()
.WithMaterialTypeId(materialTypeId)
.WithMeasurement(measurementId, measurementVersion)
.WithValue(value)
.Please();
inventory.AddMeasurement(cmd);
inventory.Measurements.Should().BeEquivalentTo(new InventoryMeasurement
{
MaterialTypeId = materialTypeId,
MeasurementId = measurementId,
MeasurementVersion = measurementVersion,
Value = value,
UnitOfMeasure = UnitOfMeasure.Quantity
});
}
Rassembler tout cela: commandes, événements, agrégat d'inventaire
Cycle de vie agrégé de l'inventaire lors de l'exécution de l'inventaire final.
Le diagramme montre le processus de traitement des commandes
FinishInventoryCommand
. Avant le traitement, il est nécessaire de restaurer l'état de l'unité Inventory
au moment de l'exécution de la commande. Pour ce faire, nous chargeons dans la mémoire tous les événements qui ont été exécutés sur cet appareil et les lisons (p. 1).
Au moment de l'achèvement de la révision, nous avons déjà les événements suivants - le début de la révision et l'ajout de trois mesures. Ces événements sont apparus à la suite du traitement des commandes
StartInventoryCommand
et AddMeasurementCommand
, en conséquence. Dans la base de données, chaque ligne de la table contient l'ID de révision, la version et le corps de l'événement lui-même.
A ce stade, nous exécutons la commande
FinishInventoryCommand
(p. 2). Cette commande vérifiera d'abord la validité de l'état actuel de l'unité - que la révision est dans un état InProgress
, puis générera un nouveau changement d'état, en ajoutant un événement FinishInventoryEvent
à la liste changes
(élément 3).
Une fois la commande terminée, toutes les modifications seront enregistrées dans la base de données. En conséquence, une nouvelle ligne avec l'événement
FinishInventoryEvent
et la dernière version de l'unité apparaîtra dans la base de données (p. 4).
Type
Inventory
(révision) - élément agrégé et racine par rapport à leurs entités imbriquées. Ainsi, le type Inventory
définit les limites de l'unité. Les limites d'agrégation incluent une liste d'entités de type Measurement
(mesure) et une liste de tous les événements exécutés sur l'agrégat ( changes
).
Implémentation de l'ensemble de la fonctionnalité
Par fonctionnalités, nous entendons la mise en œuvre d'une exigence métier spécifique. Dans notre exemple, nous considérerons la fonction Ajouter une mesure. Pour implémenter cette fonctionnalité, nous devons comprendre le concept de "service d'application" (
ApplicationService
).
Un service d'application est un client direct du modèle de domaine. Les services d'application garantissent les transactions lors de l'utilisation de la base de données ACID, garantissant que les transitions d'état sont préservées atomiques. En outre, les services d'application répondent également aux problèmes de sécurité.
Nous avons déjà une unité
Inventory
... Pour implémenter l'ensemble de la fonctionnalité, nous utiliserons entièrement le service d'application. Dans celui-ci, vous devez vérifier la présence de toutes les entités connectées, ainsi que les droits d'accès de l'utilisateur. Ce n'est qu'après que toutes les conditions sont remplies, qu'il est possible de sauvegarder l'état actuel de l'unité et d'envoyer des événements au monde extérieur. Pour implémenter un service d'application, nous utilisons MediatR
.
Code de fonction "ajout d'une mesure"
public class AddMeasurementChangeHandler
: IRequestHandler<AddMeasurementChangeRequest, AddMeasurementChangeResponse>
{
// dependencies
// ctor
public async Task<AddMeasurementChangeResponse> Handle(
AddMeasurementChangeRequest request,
CancellationToken ct)
{
var inventory =
await _inventoryRepository.GetAsync(request.AddMeasurementChange.InventoryId, ct);
if (inventory == null)
{
throw new NotFoundException($"Inventory {request.AddMeasurementChange.InventoryId} is not found");
}
var user = await _usersRepository.GetAsync(request.UserId, ct);
if (user == null)
{
throw new SecurityException();
}
var hasPermissions =
await _authPermissionService.HasPermissionsAsync(request.CountryId, request.Token, inventory.UnitId, ct);
if (!hasPermissions)
{
throw new SecurityException();
}
var unit = await _unitRepository.GetAsync(inventory.UnitId, ct);
if (unit == null)
{
throw new InvalidRequestDataException($"Unit {inventory.UnitId} is not found");
}
var unitOfMeasure =
Enum.Parse<UnitOfMeasure>(request.AddMeasurementChange.MaterialTypeUnitOfMeasure);
var addMeasurementCommand = new AddMeasurementCommand(
request.AddMeasurementChange.Value,
request.AddMeasurementChange.Version,
request.AddMeasurementChange.MaterialTypeId,
request.AddMeasurementChange.Id,
unitOfMeasure,
request.AddMeasurementChange.InventoryZoneId);
inventory.AddMeasurement(addMeasurementCommand);
await HandleAsync(inventory, ct);
return new AddMeasurementChangeResponse(request.AddMeasurementChange.Id, user.Id, user.GetName());
}
private async Task HandleAsync(Domain.Inventories.Entities.Inventory inventory, CancellationToken ct)
{
await _inventoryRepository.AppendEventsAsync(inventory.Changes, ct);
try
{
await _localQueueDataService.Publish(inventory.Changes, ct);
}
catch (Exception ex)
{
_logger.LogError(ex, "error occured while handling action");
}
}
}
Sourcing événementiel
Lors de la mise en œuvre, nous avons décidé de choisir l'approche ES pour plusieurs raisons:
- Dodo a des exemples d'utilisation réussie de cette approche.
- ES facilite la compréhension du problème lors d'un incident - toutes les actions de l'utilisateur sont stockées.
- Si vous adoptez l'approche traditionnelle, vous ne pourrez pas passer à ES.
L'idée de mise en œuvre est assez simple - nous ajoutons tous les nouveaux événements apparus à la suite de commandes dans la base de données. Pour restaurer l'agrégat, nous récupérons tous les événements et les lisons sur l'instance. Afin de ne pas avoir un grand lot d'événements à chaque fois, nous supprimons les états tous les N événements et lisons le reste de cet instantané.
ID de magasin d'agrégation d'inventaire
internal sealed class InventoryRepository : IInventoryRepository
{
// dependencies
// ctor
static InventoryRepository()
{
EventTypes = typeof(IEvent)
.Assembly.GetTypes().Where(x => typeof(IEvent).IsAssignableFrom(x))
.ToDictionary(t => t.FullName, x => x);
}
public async Task AppendAsync(IReadOnlyCollection<IEvent> events, CancellationToken ct)
{
using (var session = await _dbSessionFactory.OpenAsync())
{
if (events.Count == 0) return;
try
{
foreach (var @event in events)
{
await session.ExecuteAsync(Sql.AppendEvent,
new
{
@event.AggregateId,
@event.Version,
@event.UnitId,
Type = @event.GetType().FullName,
Data = JsonConvert.SerializeObject(@event),
CreatedDateTimeUtc = DateTime.UtcNow
}, cancellationToken: ct);
}
}
catch (MySqlException e)
when (e.Number == (int) MySqlErrorCode.DuplicateKeyEntry)
{
throw new OptimisticConcurrencyException(events.First().AggregateId, "");
}
}
}
public async Task<Domain.Models.Inventory> GetInventoryAsync(
UUId inventoryId,
CancellationToken ct)
{
var events = await GetEventsAsync(inventoryId, 0, ct);
if (events.Any()) return Domain.Models.Inventory.Restore(inventoryId, events);
return null;
}
private async Task<IEvent[]> GetEventsAsync(
UUId id,
int snapshotVersion,
CancellationToken ct)
{
using (var session = await _dbSessionFactory.OpenAsync())
{
var snapshot = await GetInventorySnapshotAsync(session, inventoryId, ct);
var version = snapshot?.Version ?? 0;
var events = await GetEventsAsync(session, inventoryId, version, ct);
if (snapshot != null)
{
snapshot.ReplayEvents(events);
return snapshot;
}
if (events.Any())
{
return Domain.Inventories.Entities.Inventory.Restore(inventoryId, events);
}
return null;
}
}
private async Task<Inventory> GetInventorySnapshotAsync(
IDbSession session,
UUId id,
CancellationToken ct)
{
var record =
await session.QueryFirstOrDefaultAsync<InventoryRecord>(Sql.GetSnapshot, new {AggregateId = id},
cancellationToken: ct);
return record == null ? null : Map(record);
}
private async Task<IInventoryEvent[]> GetEventsAsync(
IDbSession session,
UUId id,
int snapshotVersion,
CancellationToken ct)
{
var rows = await session.QueryAsync<EventRecord>(Sql.GetEvents,
new
{
AggregateId = id,
Version = snapshotVersion
}, cancellationToken: ct);
return rows.Select(Map).ToArray();
}
private static IEvent Map(EventRecord e)
{
var type = EventTypes[e.Type];
return (IEvent) JsonConvert.DeserializeObject(e.Data, type);
}
}
internal class EventRecord
{
public string Type { get; set; }
public string Data { get; set; }
}
Après plusieurs mois de fonctionnement, nous nous sommes rendu compte que nous n'avons pas un grand besoin de stocker toutes les actions des utilisateurs sur l'instance d'unité. L'entreprise n'utilise en aucun cas ces informations. Cela étant dit, le maintien de cette approche comporte des frais généraux. Après avoir évalué tous les avantages et inconvénients, nous prévoyons de passer de l'approche ES à l'approche traditionnelle - de remplacer le signe
Events
par Inventories
et Measurements
.
Intégration avec des contextes délimités externes
C'est ainsi que le contexte borné interagit
Inventory
avec le monde extérieur.
Interaction du contexte de révision avec d'autres contextes. Le diagramme montre les contextes, les services et leur appartenance les uns aux autres.
Dans le cas de
Auth
, Inventory
et Datacatalog
, il y a un contexte borné pour chaque service. Le monolithe remplit plusieurs fonctions, mais nous ne nous intéressons plus qu'à la fonctionnalité comptable des pizzerias. En plus des révisions, la comptabilité comprend également les mouvements de matières premières dans les pizzerias: entrées, transferts, radiations.
HTTP
Le service
Inventory
interagit avec Auth
HTTP. Tout d'abord, l'utilisateur est confronté à Auth
, ce qui l'invite à choisir l'un des rôles dont il dispose.
- Le système a un rôle «auditeur», que l'utilisateur choisit lors de l'audit.
- .
- .
À la dernière étape, l'utilisateur dispose d'un jeton de
Auth
. Le service de révision doit vérifier ce jeton, il demande donc Auth
une vérification. Auth
vérifiera si la durée de vie du jeton a expiré, s'il appartient au propriétaire ou s'il dispose des droits d'accès nécessaires. Si tout va bien, il Inventory
enregistre les tampons dans les cookies - ID utilisateur, connexion, ID de pizzeria et définit la durée de vie des cookies.
Remarque .
Auth
Nous avons décrit plus en détail le fonctionnement du service dans l'article " Subtilités d'autorisation: un aperçu de la technologie OAuth 2.0 ".
Il
Inventory
interagit avec d' autres services via des files d'attente de messages. La société utilise RabbitMQ comme courtier de messages, ainsi que la liaison ci-dessus - MassTransit.
RMQ: événements de consommation
Service d'annuaire -
Datacatalog
- fournira Inventory
toutes les entités nécessaires: matières premières pour la comptabilité, pays, divisions et pizzerias.
Sans entrer dans les détails de l'infrastructure, je décrirai l'idée de base de la consommation d'événements. Du côté du service d'annuaire, tout est déjà prêt pour la publication d'événements, regardons l'exemple de l'entité matière première.
Code de contrat de l'événement Datacatalog
namespace Dodo.DataCatalog.Contracts.Products.v1
{
public class MaterialType
{
public UUId Id { get; set; }
public int Version { get; set; }
public int CountryId { get; set; }
public UUId DepartmentId { get; set; }
public string Name { get; set; }
public MaterialCategory Category { get; set; }
public UnitOfMeasure BasicUnitOfMeasure { get; set; }
public bool IsRemoved { get; set; }
}
public enum UnitOfMeasure
{
Quantity = 1,
Gram = 5,
Milliliter = 7,
Meter = 8,
}
public enum MaterialCategory
{
Ingredient = 1,
SemiFinishedProduct = 2,
FinishedProduct = 3,
Inventory = 4,
Packaging = 5,
Consumables = 6
}
}
Cet article est publié dans
exchange
. Chaque service peut créer son propre bundle exchange-queue
pour consommer des événements.
Schéma de publication d'un événement et de sa consommation via les primitives RMQ.
En fin de compte, il existe une file d'attente pour chaque entité à laquelle le service peut s'abonner. Il ne reste plus qu'à enregistrer la nouvelle version dans la base de données.
Code consommateur d'événement de Datacatalog
public class MaterialTypeConsumer : IConsumer<Dodo.DataCatalog.Contracts.Products.v1.MaterialType>
{
private readonly IMaterialTypeRepository _materialTypeRepository;
public MaterialTypeConsumer(IMaterialTypeRepository materialTypeRepository)
{
_materialTypeRepository = materialTypeRepository;
}
public async Task Consume(ConsumeContext<Dodo.DataCatalog.Contracts.Products.v1.MaterialType> context)
{
var materialType = new AddMaterialType(context.Message.Id,
context.Message.Name,
(int)context.Message.Category,
(int)context.Message.BasicUnitOfMeasure,
context.Message.CountryId,
context.Message.DepartmentId,
context.Message.IsRemoved,
context.Message.Version);
await _materialTypeRepository.SaveAsync(materialType, context.CancellationToken);
}
}
RMQ: Publication d'événements
La partie comptable du monolithe consomme des données
Inventory
pour prendre en charge le reste de la fonctionnalité qui nécessite des données de révision. Tous les événements dont nous voulons informer les autres services, nous les avons marqués avec l'interface IPublicInventoryEvent
. Lorsqu'un événement de ce type se produit, nous les isolons du changelog ( changes
) et les envoyons à la file d'attente de distribution. Pour cela, deux tableaux sont utilisés publicqueue
et publicqueue_archive
.
Pour garantir la livraison des messages, nous utilisons un modèle que nous appelons généralement "file d'attente locale", c'est-à-dire
Transactional outbox pattern
. L'enregistrement de l'état de l'agrégat Inventory
et l'envoi d'événements à la file d'attente locale se produisent en une seule transaction. Dès que la transaction est validée, nous essayons immédiatement d'envoyer des messages au courtier.
Si le message a été envoyé, il est supprimé de la file d'attente
publicqueue
. Sinon, une tentative d'envoi du message sera effectuée ultérieurement. Les abonnés au monolithe et aux pipelines de données consomment ensuite des messages. La table publicqueue_archive
stocke pour toujours les données pour une redistribution pratique des événements si cela est nécessaire à un moment donné.
Code de publication d'événements sur le courtier de messages
internal sealed class BusDataService : IBusDataService
{
private readonly IPublisherControl _publisherControl;
private readonly IPublicQueueRepository _repository;
private readonly EventMapper _eventMapper;
public BusDataService(
IPublicQueueRepository repository,
IPublisherControl publisherControl,
EventMapper eventMapper)
{
_repository = repository;
_publisherControl = publisherControl;
_eventMapper = eventMapper;
}
public async Task ConsumePublicQueueAsync(int batchEventSize, CancellationToken cancellationToken)
{
var events = await _repository.GetAsync(batchEventSize, cancellationToken);
await Publish(events, cancellationToken);
}
public async Task Publish(IEnumerable<IPublicInventoryEvent> events, CancellationToken ct)
{
foreach (var @event in events)
{
var publicQueueEvent = _eventMapper.Map((dynamic) @event);
await _publisherControl.Publish(publicQueueEvent, ct);
await _repository.DeleteAsync(@event, ct);
}
}
}
Nous envoyons des événements au monolithe pour les rapports. Le rapport de perte et de surplus vous permet de comparer deux révisions l'une avec l'autre. En outre, il existe un rapport important "soldes des entrepôts", qui a déjà été mentionné précédemment.
Pourquoi envoyer des événements au pipeline de données? Tout de même - pour les rapports, mais uniquement sur de nouveaux rails. Auparavant, tous les rapports vivaient dans un monolithe, mais maintenant ils sont supprimés. Il partage deux responsabilités - le stockage et le traitement des données de production et d'analyse: OLTP et OLAP. C'est important à la fois en termes d'infrastructure et de développement.
Conclusion
En suivant les principes et les pratiques de la conception pilotée par domaine, nous avons réussi à créer un système fiable et flexible qui répond aux besoins commerciaux des utilisateurs. Nous avons non seulement un produit décent, mais aussi un bon code facile à modifier. Nous espérons que dans vos projets, il y aura une place pour l'utilisation de la conception pilotée par domaine.
Vous pouvez trouver plus d'informations sur DDD sur notre communauté DDDevotion et sur la chaîne Youtube DDDevotion . Vous pouvez discuter de l'article dans Telegram sur le chat Dodo Engineering .