Bonjour, Habr! J'ai décidé de m'éloigner de Scala, Idris et autres FP pendant un moment et de parler un peu de l'Event Store - une base de données dans laquelle les événements peuvent être enregistrés dans des flux d'événements. Comme dans le bon vieux livre, nous avons aussi Mousquetaires en fait 4 et le quatrième est DDD. Tout d'abord, j'utilise Event Storming pour sélectionner les commandes, les événements et les entités qui leur sont associés. Ensuite, sur leur base, je vais enregistrer l'état de l'objet et le restaurer. Dans cet article, je vais faire une TodoList régulière. Pour plus de détails, bienvenue sous chat.
Contenu
- The Three Musketeers - Event Sourcing, Event Storming et Event Store - Entrez dans la bataille: Partie 1 - Essayer le DB Event Store
Liens
Sources
Images docker image
Event Store
Event Soucing
Event Storming
En fait, le magasin d'événements est une base de données conçue pour stocker des événements. Elle sait également créer des abonnements à des événements afin qu'ils puissent être traités d'une manière ou d'une autre. Il existe également des projections qui réagissent également aux événements et, sur leur base, accumulent certaines données. Par exemple, pendant l'événement TodoCreated, vous pouvez augmenter une sorte de compteur Count dans la projection. Pour l'instant, dans cette partie, j'utiliserai le magasin d'événements comme base de données en lecture et en écriture. Plus loin dans les articles suivants, je créerai une base de données séparée pour la lecture dans laquelle les données seront écrites en fonction des événements stockés dans la base de données pour l'écriture dans le magasin d'événements. Il y aura également un exemple de la façon de faire un "Voyage dans le temps" en ramenant le système à l'état qu'il avait dans le passé.
Et alors commençons Event Stroming. Habituellement, pour sa mise en œuvre, toutes les personnes et experts intéressés sont rassemblés pour dire quels événements dans le domaine que le logiciel va simuler. Par exemple, pour les logiciels d'usine - ProductManufactured. Pour le jeu - Dommages subis. Pour les logiciels financiers - Argent crédité sur le compte et ainsi de suite. Puisque notre domaine est aussi simple que TodoList, nous aurons peu d'événements. Et donc, écrivons les événements de notre domaine (domaine) au tableau.
Ajoutons maintenant les commandes qui déclenchent ces événements.
Ensuite, regroupons ces événements et commandes autour de l'entité avec un changement d'état auquel ils sont associés.
Mes commandes se transformeront simplement en noms de méthodes de service. Passons à la mise en œuvre.
Tout d'abord, décrivons les événements dans le code.
public interface IDomainEvent
{
// . id Event Strore
Guid EventId { get; }
// . Event Store
long EventNumber { get; set; }
}
public sealed class TodoCreated : IDomainEvent
{
//Id Todo
public Guid Id { get; set; }
// Todo
public string Name { get; set; }
public Guid EventId => Id;
public long EventNumber { get; set; }
}
public sealed class TodoRemoved : IDomainEvent
{
public Guid EventId { get; set; }
public long EventNumber { get; set; }
}
public sealed class TodoCompleted: IDomainEvent
{
public Guid EventId { get; set; }
public long EventNumber { get; set; }
}
Maintenant, notre noyau est une entité:
public sealed class Todo : IEntity<TodoId>
{
private readonly List<IDomainEvent> _events;
public static Todo CreateFrom(string name)
{
var id = Guid.NewGuid();
var e = new List<IDomainEvent>(){new TodoCreated()
{
Id = id,
Name = name
}};
return new Todo(new TodoId(id), e, name, false);
}
public static Todo CreateFrom(IEnumerable<IDomainEvent> events)
{
var id = Guid.Empty;
var name = String.Empty;
var completed = false;
var ordered = events.OrderBy(e => e.EventNumber).ToList();
if (ordered.Count == 0)
return null;
foreach (var @event in ordered)
{
switch (@event)
{
case TodoRemoved _:
return null;
case TodoCreated created:
name = created.Name;
id = created.Id;
break;
case TodoCompleted _:
completed = true;
break;
default: break;
}
}
if (id == default)
return null;
return new Todo(new TodoId(id), new List<IDomainEvent>(), name, completed);
}
private Todo(TodoId id, List<IDomainEvent> events, string name, bool isCompleted)
{
Id = id;
_events = events;
Name = name;
IsCompleted = isCompleted;
Validate();
}
public TodoId Id { get; }
public IReadOnlyList<IDomainEvent> Events => _events;
public string Name { get; }
public bool IsCompleted { get; private set; }
public void Complete()
{
if (!IsCompleted)
{
IsCompleted = true;
_events.Add(new TodoCompleted()
{
EventId = Guid.NewGuid()
});
}
}
public void Delete()
{
_events.Add(new TodoRemoved()
{
EventId = Guid.NewGuid()
});
}
private void Validate()
{
if (Events == null)
throw new ApplicationException(" ");
if (string.IsNullOrWhiteSpace(Name))
throw new ApplicationException(" ");
if (Id == default)
throw new ApplicationException(" ");
}
}
Nous nous connectons à l'Event Store:
services.AddSingleton(sp =>
{
// TCP .
// . .
var con = EventStoreConnection.Create(new Uri("tcp://admin:changeit@127.0.0.1:1113"), "TodosConnection");
con.ConnectAsync().Wait();
return con;
});
Et donc, la partie principale. Stockage et lecture des événements à partir du magasin d'événements lui-même:
public sealed class EventsRepository : IEventsRepository
{
private readonly IEventStoreConnection _connection;
public EventsRepository(IEventStoreConnection connection)
{
_connection = connection;
}
public async Task<long> Add(Guid collectionId, IEnumerable<IDomainEvent> events)
{
var eventPayload = events.Select(e => new EventData(
//Id
e.EventId,
//
e.GetType().Name,
// Json (True|False)
true,
//
Encoding.UTF8.GetBytes(JsonSerializer.Serialize((object)e)),
//
Encoding.UTF8.GetBytes((string)e.GetType().FullName)
));
//
var res = await _connection.AppendToStreamAsync(collectionId.ToString(), ExpectedVersion.Any, eventPayload);
return res.NextExpectedVersion;
}
public async Task<List<IDomainEvent>> Get(Guid collectionId)
{
var results = new List<IDomainEvent>();
long start = 0L;
while (true)
{
var events = await _connection.ReadStreamEventsForwardAsync(collectionId.ToString(), start, 4096, false);
if (events.Status != SliceReadStatus.Success)
return results;
results.AddRange(Deserialize(events.Events));
if (events.IsEndOfStream)
return results;
start = events.NextEventNumber;
}
}
public async Task<List<T>> GetAll<T>() where T : IDomainEvent
{
var results = new List<IDomainEvent>();
Position start = Position.Start;
while (true)
{
var events = await _connection.ReadAllEventsForwardAsync(start, 4096, false);
results.AddRange(Deserialize(events.Events.Where(e => e.Event.EventType == typeof(T).Name)));
if (events.IsEndOfStream)
return results.OfType<T>().ToList();
start = events.NextPosition;
}
}
private List<IDomainEvent> Deserialize(IEnumerable<ResolvedEvent> events) =>
events
.Where(e => IsEvent(e.Event.EventType))
.Select(e =>
{
var result = (IDomainEvent)JsonSerializer.Deserialize(e.Event.Data, ToType(e.Event.EventType));
result.EventNumber = e.Event.EventNumber;
return result;
})
.ToList();
private static bool IsEvent(string eventName)
{
return eventName switch
{
nameof(TodoCreated) => true,
nameof(TodoCompleted) => true,
nameof(TodoRemoved) => true,
_ => false
};
}
private static Type ToType(string eventName)
{
return eventName switch
{
nameof(TodoCreated) => typeof(TodoCreated),
nameof(TodoCompleted) => typeof(TodoCompleted),
nameof(TodoRemoved) => typeof(TodoRemoved),
_ => throw new NotImplementedException(eventName)
};
}
}
Le magasin d'entités semble très simple. Nous récupérons les événements d'entité de l'EventStore et les restaurons à partir d'eux, ou nous enregistrons simplement les événements d'entité.
public sealed class TodoRepository : ITodoRepository
{
private readonly IEventsRepository _eventsRepository;
public TodoRepository(IEventsRepository eventsRepository)
{
_eventsRepository = eventsRepository;
}
public Task SaveAsync(Todo entity) => _eventsRepository.Add(entity.Id.Value, entity.Events);
public async Task<Todo> GetAsync(TodoId id)
{
var events = await _eventsRepository.Get(id.Value);
return Todo.CreateFrom(events);
}
public async Task<List<Todo>> GetAllAsync()
{
var events = await _eventsRepository.GetAll<TodoCreated>();
var res = await Task.WhenAll(events.Where(t => t != null).Where(e => e.Id != default).Select(e => GetAsync(new TodoId(e.Id))));
return res.Where(t => t != null).ToList();
}
}
Le service dans lequel s'effectue le travail avec le référentiel et l'entité:
public sealed class TodoService : ITodoService
{
private readonly ITodoRepository _repository;
public TodoService(ITodoRepository repository)
{
_repository = repository;
}
public async Task<TodoId> Create(TodoCreateDto dto)
{
var todo = Todo.CreateFrom(dto.Name);
await _repository.SaveAsync(todo);
return todo.Id;
}
public async Task Complete(TodoId id)
{
var todo = await _repository.GetAsync(id);
todo.Complete();
await _repository.SaveAsync(todo);
}
public async Task Remove(TodoId id)
{
var todo = await _repository.GetAsync(id);
todo.Delete();
await _repository.SaveAsync(todo);
}
public async Task<List<TodoReadDto>> GetAll()
{
var todos = await _repository.GetAllAsync();
return todos.Select(t => new TodoReadDto()
{
Id = t.Id.Value,
Name = t.Name,
IsComplete = t.IsCompleted
}).ToList();
}
public async Task<List<TodoReadDto>> Get(IEnumerable<TodoId> ids)
{
var todos = await Task.WhenAll(ids.Select(i => _repository.GetAsync(i)));
return todos.Where(t => t != null).Select(t => new TodoReadDto()
{
Id = t.Id.Value,
Name = t.Name,
IsComplete = t.IsCompleted
}).ToList();
}
}
Eh bien, en fait, pour l'instant rien d'impressionnant. Dans le prochain article, lorsque j'ajoute une base de données séparée pour la lecture, tout scintillera de différentes couleurs. Cela nous accrochera immédiatement la cohérence dans le temps. Event Store et SQL DB sur une base maître-esclave. Un ES blanc et de nombreux MS SQL noirs à partir desquels lire les données.
Digression lyrique. À la lumière des événements récents, je n'ai pas pu m'empêcher de plaisanter sur le maître esclave et les noirs blancs. Ehe, l'ère s'en va, nous dirons à nos petits-enfants que nous vivions à une époque où les bases lors de la réplication étaient appelées maître et esclave.
Dans les systèmes où il y a beaucoup de lecture et peu d'écriture de données (la plupart d'entre eux), cela augmentera la vitesse de travail. En fait, la réplication du maître esclave lui-même, elle vise le fait que votre écriture ralentisse (ainsi qu'avec les index), mais en retour, la lecture est accélérée en répartissant la charge sur plusieurs bases de données.