Dans cet article, nous expliquerons comment implémenter un orchestrateur de tâches infini à l'aide de files d'attente. Comme objectif ultime, nous devons mettre en œuvre un système capable de gérer les tâches avec une longue durée de vie, un système distribué, où un groupe de tâches est hébergé sur un serveur spécifique, et si ce serveur échoue, les tâches sont automatiquement redistribuées vers les libres.
Dans la plupart des cas, tout développement d'entreprise se résume à remplir les mêmes exigences: une application est créée, selon le type d'application, elle a une sorte de cycle de vie, à la fin de la vie de l'application, nous recevons (... ou ne recevons pas) ce que nous voulons. Par application, on peut vouloir dire n'importe quoi, de l'achat en ligne d'un produit, d'un mandat, ou du calcul de la trajectoire d'un missile balistique. Chaque application a son propre mode de vie et il est important de noter la durée de vie , et plus cette durée est courte, mieux c'est. En d'autres termes, plus tôt mon virement bancaire est terminé, mieux c'est. Les exigences sont également similaires, plus d' opérations RPC par seconde, moins de latence , le système devrait être tolérant aux pannes, évolutif et devrait être prêt hier... Il existe un million d'outils, des centaines de bases de données, différentes approches et modèles. Et tout est écrit depuis longtemps, il suffit d'utiliser correctement les technologies toutes faites dans nos projets.
Le sujet de l'orchestration des tâches n'est pas nouveau, mais à ma grande surprise, il n'y a tout simplement pas de solutions toutes faites pour gérer des tâches infinies (dont la durée de vie est illimitée), avec la possibilité de redistribuer les tâches entre les serveurs actifs. Par conséquent, nous mettrons en œuvre notre propre solution. Mais tout d'abord….
, . — (Job), , . . , “”, . : , . , , , . “”- WebSocket , connected. , , , , . , “” Observer , , .
, , . :
, , .
, , .
, , . , , , , .
/, , ( , RAM ..), .
: N , . , , .
3 . #, . , C# .Net.
Task. . Task “”.
Schedulers. , . , , .
, , . , . RabbitMq, Framework - MassTransit, . .
Task
Task. , ( , ).
. , “Hello Word” :
public async Task SendEmailAsync(Email email, CancellationToken token)
{
//
}
, , await SendEmailAsync.
foreach (var email in emails
{
if(token.IsCancellationRequested)
break;
_emailSender.SendEmailAsync(email, token); // await
}
:
.
FireAndForget Exception .
.
, , .
await- , async/await .
email, , CancellationToken. , , , , . RetryPolicy , ?! , .
Schedulers
.NET , .
. , ( , — , , instance ) /Tasks. Hangfire, - UI, , . .
, Hangfire. BackgroundJob.Enqueue(Expression<Action> methodCall).
var jobIds = new List<string>();
foreach (var email in emails)
{
if(token.IsCancellationRequested)
break;
jobIds.Add(BackgroundJob.Enqueue(
async () => await _emailSender.SendEmailAsync(email, token)));
}
, , . RetryPolicy , . , , .
, . , “” :
_observer.DoWork(observerArg, new CancellationToken())
- , . BackgroundJobClient.
var client = new BackgroundJobClient(JobStorage.Current);
// , .
var state = new EnqueuedState(“unique-queue-name”);
client.Create(() =>_observer.DoWork(observerArg, new CancellationToken()), state);
, . - unique-queue-name.
// instance hangfire .
_server = new BackgroundJobServer(new BackgroundJobServerOptions()
{
WorkerCount = 10,
Queues = new[] { “unique-queue-name” },
ServerName = _serverOptions.ServerName
});
WorkerCount - , . , .
, , . : . , . Hangfire , , .
_monitoringApi = JobStorage.Current.GetMonitoringApi();
:
Observer-service - , , ( HangFilre WorkerCount ).
Observer-manager - , ... . , , . .
Scheduler common db – - , Hangfire MsSql, PostgreSql Redis.
— . “”.
, , , , , , .
, , . , . Hangfire. :
1) . , , .
2) . . , , , .
3) . custom-id, . - .
4) , “default” . , , . job-filters . , .
5) , , , . , , , framework .
, . , , , , .
, ,
, , , . . ? , — , , . , ? , . , , . , .
? “”. - PrefetchCount .
Ready.
Conumer , Unacked. Consumer .
, _Error .
acknowledged, Consumer.
- PrefetchCount , ( ), WorkerCount, Hangfire.
:
Observer-services, . PrefetchCount 1
. , . , , Unacked.
"”, :
Observer-services , , Round-robin.
msg1 . , “Observer 1”. Unacked , .
msg2 . “Observer 1” , , “Observer 2”.
, “Observer-service 1” , ( - “ ... ?”).
, , acknowledgement Unacked Ready. . , , .
- , . _Error, RetryPolicy. , .
RetryPolicy :
1000 .
5 1,4,10... .
int.MaxValue .
? “”, /. PrefetchCount, 10, 10 , . - , , . , 10 , 5 “”, , , 11- , .
? ? , , ... ?! , , "" , CancellationToken.
Manager. . , , . , . , , :
Id () - Guid .
Name (), , , .
CreatedAt/ModifyAt ( / ).
WorkersCount, PrefetchCount - , .
Manager .
Id |
Name |
WorkerCount |
CreatedAt |
ModifyAt |
IsDeleted |
{Unique id} |
Observer service 1 |
10 |
{some date} |
null |
false |
{Unique id} |
Observer service 2 |
10 |
{some date} |
null |
false |
{Unique id} |
Observer service 3 |
10 |
{some date} |
null |
false |
. , , 3 - .
, , , N . IsDeleted=true.
, (Kill –9, ). , Docker. , , . “”, , , . , - ….
“” API. ( , “State queue” ). “” , , , , - .
, , “”. , , , , .
, , “” Created.
Id |
Name |
CreatedAt |
ModifyAt |
ServiceId |
Status |
{Observer id} |
My_new_observer |
{created date} |
null |
null |
Created |
, , , Processing .
Id |
Name |
CreatedAt |
ModifyAt |
ServiceId |
Status |
{Observer id} |
My_new_observer |
{created date} |
{modify date} |
{Observer service 1 id} |
Processing |
“” .
:
Created
Processing
OnDeleting
Deleted
"", :
1) , CancellationToken.
2) , FanOut. , “” , .
, — , ... “ ”.
Observer-service , . , “” CancellationToken. “” .
“” . , id . , .
Created, “” . - , “”.
OnDeleting Deleted, - “” , .
Processing, “” OnDeleting . . , “”, CancellationToken “state queue”. , OnDeleting Deleted.
Id |
Name |
CreatedAt |
ModifyAt |
ServiceId |
Status |
{Observer id} |
My_new_observer |
{created date} |
{modify date} |
{Observer service 1 id} |
Deleted |
:
1) .
, . , - MsSql, RabbitMq, Kafka, Kubernetes , , SLA . , . - , .
2) blackout, .
, - , , , , , “”, . “”, . ( , .)
3) .
, "”, . "”, , .
4) . , "”.
. - , , .
5) “”, , .
, , “” . . . , , , , , .
, . , , - Unacked, - Ready. , , polling , . - "”, . , , , PrefetchCount. , , .