Orchestrateur de tâches infinies

Dans cet article, nous expliquerons comment implĂ©menter un orchestrateur de tâches infini Ă  l'aide de files d'attente. Comme objectif ultime, nous devons mettre en Ĺ“uvre un système capable de gĂ©rer les tâches avec une longue durĂ©e de vie, un système distribuĂ©, oĂą un groupe de tâches est hĂ©bergĂ© sur un serveur spĂ©cifique, et si ce serveur Ă©choue, les tâches sont automatiquement redistribuĂ©es vers les libres. 





Dans la plupart des cas, tout dĂ©veloppement d'entreprise se rĂ©sume Ă  remplir les mĂŞmes exigences: une application est crĂ©Ă©e, selon le type d'application, elle a une sorte de cycle de vie, Ă  la fin de la vie de l'application, nous recevons (... ou ne recevons pas) ce que nous voulons. Par application, on peut vouloir dire n'importe quoi, de l'achat en ligne d'un produit, d'un mandat, ou du calcul de la trajectoire d'un missile balistique. Chaque application a son propre mode de vie et il est important de noter  la durĂ©e de vie , et plus cette durĂ©e est courte, mieux c'est. En d'autres termes, plus tĂ´t mon virement bancaire est terminĂ©, mieux c'est. Les exigences sont Ă©galement similaires, plus d'   opĂ©rations RPC par seconde, moins de  latence , le système devrait ĂŞtre tolĂ©rant aux pannes, Ă©volutif et devrait ĂŞtre prĂŞt  hier... Il existe un million d'outils, des centaines de bases de donnĂ©es, diffĂ©rentes approches et modèles. Et tout est Ă©crit depuis longtemps, il suffit d'utiliser correctement les technologies toutes faites dans nos projets. 





Le sujet de l'orchestration des tâches n'est pas nouveau, mais Ă  ma grande surprise, il n'y a tout simplement pas de solutions toutes faites pour gĂ©rer des tâches infinies (dont la durĂ©e de vie est illimitĂ©e), avec la possibilitĂ© de redistribuer les tâches entre les serveurs actifs. Par consĂ©quent, nous mettrons en Ĺ“uvre notre propre solution. Mais tout d'abord…. 






, . — (Job), , . . , “”, . : , . , , , . “”-  WebSocket ,  connected. , , , , . , “”  Observer  , , . 





, , .     : 





  • , , . 





  • , , . 





  • , , . , , , , . 





  • /, , ( , RAM ..), . 





: N , .  , , . 





3 . #, . , C# .Net. 





  1.  Task. .  Task  “”.  





  2. Schedulers. , . , , . 





  3. , , . ,    .  RabbitMq,  Framework - MassTransit, . . 





 Task 

 Task. , ( , ). 





. ,   “Hello Word”   : 





public async Task SendEmailAsync(Email email, CancellationToken token) 
{ 
	   //   
}
      
      



, ,  await   SendEmailAsync. 





foreach (var email in emails 
{ 
   if(token.IsCancellationRequested) 
        break; 
    _emailSender.SendEmailAsync(email, token); // await 
} 
      
      



:









  • FireAndForget   Exception  . 









  • , ,    .  





 await- ,    async/await  .





 email, ,  CancellationToken. , , , , .  RetryPolicy  , ?! , .





Schedulers

.NET , . 





  • HangFire 





  • Quartz.net 





  .  , ( , — , ,  instance )  /Tasks.  Hangfire, - UI, , . .





,  Hangfire.  BackgroundJob.Enqueue(Expression<Action> methodCall). 





var jobIds = new List<string>(); 
foreach (var email in emails) 
{ 
   if(token.IsCancellationRequested) 
      break; 
   jobIds.Add(BackgroundJob.Enqueue( 
      async () => await _emailSender.SendEmailAsync(email, token))); 
} 
      
      



, , .  RetryPolicy , . , , . 





, . , “” :





_observer.DoWork(observerArg, new CancellationToken())







- , .   BackgroundJobClient





var client = new BackgroundJobClient(JobStorage.Current);

//  ,    .
var state = new EnqueuedState(“unique-queue-name”); 
client.Create(() =>_observer.DoWork(observerArg, new CancellationToken()), state);
      
      



, . - unique-queue-name





//  instance hangfire . 
_server = new BackgroundJobServer(new BackgroundJobServerOptions() 
{   
    WorkerCount = 10, 
    Queues = new[] { “unique-queue-name” }, 
    ServerName = _serverOptions.ServerName 
}); 
      
      



WorkerCount - , . , . 





, , . : . , .  Hangfire  , , . 





_monitoringApi = JobStorage.Current.GetMonitoringApi(); 
      
      







Observer-service  - , , ( HangFilre  WorkerCount ). 





Observer-manager - , ... .  , , . . 





Scheduler common db â€“ -  , Hangfire   MsSql,  PostgreSql   Redis.





—    . “”.  





, , , , , , . 





, , . , .    Hangfire. :





1)   . , , . 





2)  . . , , , . 





3)  .  custom-id, . - . 





4)  , “default” . , , .  job-filters   . , . 





5)  , , , .  , , ,  framework  .  





6)  ,  Hangfire   MsSql,  Redis, . 





, .  , , , , . 





, ,

, , , . . ? , — , , . , ? , . , , . , . 





? “”.  - PrefetchCount  .  





  •  Ready. 





  •  Conumer  ,  Unacked.  Consumer  .  





  • , _Error .  





  •  acknowledged,  Consumer.  





- PrefetchCount  , ( ),  WorkerCount,  Hangfire. 









 Observer-services, . PrefetchCount  1



. , . , ,  Unacked. 





"”, : 





 Observer-services  , ,  Round-robin





  • msg1  .  , “Observer 1”.  Unacked  ,  . 





  • msg2  . “Observer 1”  ,  ,  â€śObserver 2”. 





, “Observer-service 1”  , ( - “ ... ?”). 





 , ,  acknowledgement   Unacked  Ready.  . , , . 





-  , . _Error,  RetryPolicy. ,    .  

 RetryPolicy  : 





  • 1000 . 





  • 5 1,4,10...  . 





  •  int.MaxValue . 





? “”, /.  PrefetchCount, 10, 10 , .    - , ,  . , 10 , 5 “”, , ,    11- , .





 ? ? , , ... ?! , , "" ,  CancellationToken.  





 Manager. . , , . , . , , : 





  • Id () - Guid  . 





  • Name (), , , . 





  • CreatedAt/ModifyAt ( / ). 





  • WorkersCount,  PrefetchCount - , . 





Manager  . 





Id





Name





WorkerCount





CreatedAt





ModifyAt





IsDeleted





{Unique id} 





Observer service 1





10





{some date}





null





false





{Unique id} 





Observer service 2





10





{some date}





null





false





{Unique id} 





Observer service 3





10





{some date}





null





false





 . , , 3 - . 





,  ,   , N .  IsDeleted=true





, (Kill â€“9, ). ,  Docker. , , . “”, , , . , - …. 





“” API. ( , “State queue” ). “” , , , , - .





, , “”. , , , , . 





,  ,  “”  Created.





Id





Name





CreatedAt





ModifyAt





ServiceId





Status





{Observer id} 





My_new_observer





{created date}





null





null





Created





, , ,  Processing  .  





Id





Name





CreatedAt





ModifyAt





ServiceId





Status





{Observer id} 





My_new_observer





{created date}





{modify date} 





{Observer service 1 id} 





Processing 





“” . 









  • Created  





  • Processing 





  • OnDeleting





  • Deleted 





"", : 





1) ,  CancellationToken. 





2) ,  FanOut. ,  “” , . 





, — , ... “ ”.  





 Observer-service  , . , “”  CancellationToken. “” . 





“” . ,  id . , .  





  •  Created, “” . - , “”. 





  •  OnDeleting  Deleted, - “” , . 





  •  Processing, “”  OnDeleting  .    . , “”,  CancellationToken  “state queue”. ,  OnDeleting  Deleted





Id





Name





CreatedAt





ModifyAt





ServiceId





Status





{Observer id} 





My_new_observer 





{created date}





{modify date} 





{Observer service 1 id} 





Deleted 





: 





1)





  ,  . , - MsSql, RabbitMq, Kafka,  Kubernetes  , , SLA . , . - , .  





2)  blackout, . 





, - , , , , , “”, . “”, .  ( , .) 





3)





, "”, . "”, , . 





4) . , "”. 





. - , , . 





5) “”, , . 





, , “” . . . , , , , , . 





, . , , - Unacked, - Ready. , ,  polling , . - "”, . , , ,  PrefetchCount. , , .








All Articles