System.Threading.Channels - Producteur-consommateur haute performance et asynchrone sans allocation et plongée de pile

Rebonjour. Il y a quelque temps, j'ai écrit sur un autre outil peu connu des amateurs de haute performance - System.IO.Pipelines . À la base, le System.Threading.Channels considéré (ci-après «canaux») est construit sur des principes similaires à ceux des pipelines, résolvant le même problème - producteur-consommateur. Cependant, il possède une API beaucoup plus simple qui se fondra gracieusement dans tout type de code d'entreprise. En même temps, il utilise l'asynchronie sans allocations et sans pile-plongée, même dans le cas asynchrone! (Pas toujours, mais souvent).







Table des matières







introduction



Le problème Producteur / Consommateur se rencontre assez souvent sur le chemin des programmeurs et depuis plus d'une dizaine d'années. Edsger Dijkstra lui-même a contribué à résoudre ce problème - il a eu l'idée d'utiliser des sémaphores pour synchroniser les fils lors de l'organisation du travail sur une base producteur / consommateur. Et bien que sa solution dans sa forme la plus simple soit connue et plutôt banale, dans le monde réel, ce modèle (producteur / consommateur) peut se produire sous une forme beaucoup plus compliquée. De plus, les normes de programmation modernes laissent leurs marques, le code est écrit de manière plus simplifiée et est décomposé pour une réutilisation ultérieure. Tout est fait pour abaisser le seuil d'écriture du code qualité et simplifier ce processus. Et l'espace de noms en question - System.Threading.Channels - est une autre étape vers cet objectif.



Je regardais System.IO.Pipelines il y a quelque temps. Là, un travail plus attentif et une compréhension approfondie de la question étaient nécessaires, l'étendue et la mémoire étaient utilisées, et pour un travail efficace, il était nécessaire de ne pas appeler de méthodes évidentes (pour éviter des allocations de mémoire inutiles) et de penser constamment en octets. Pour cette raison, l'interface de programmation de Pipeline n'était ni triviale ni intuitive.



Dans System.Threading.Channels, l'utilisateur est présenté avec une API beaucoup plus simple à utiliser. Il convient de mentionner que malgré la simplicité de l'API, cet outil est hautement optimisé et n'allouera probablement pas de mémoire pendant son travail. Cela est peut-être dû au fait que sous le capot ValueTask est utilisé partout , et même dans le cas d'une asynchronie réelle, IValueTaskSource est utilisé, qui est réutilisé pour d'autres opérations. C'est précisément tout l'intérêt de la mise en œuvre des chaînes.



Les canaux sont généralisés, le type de généralisation, comme vous pouvez le deviner, est le type dont les instances seront produites et consommées. Ce qui est intéressant, c'est que l'implémentation de la classe Channel, qui tient sur 1 ligne (source github ):



namespace System.Threading.Channels
{
    public abstract class Channel<T> : Channel<T, T> { }
}


Ainsi, la classe principale de canaux est paramétrée par 2 types - séparément pour le canal producteur et le canal consommateur. Mais pour les canaux implémentés, cela n'est pas utilisé.

Pour ceux qui sont familiers avec Pipelines, l'approche générale de démarrage semblera familière. À savoir. Nous créons 1 classe centrale à partir de laquelle nous retirons séparément les producteurs ( ChannelWriter ) et les consommateurs ( ChannelReader ). Malgré les noms, il convient de rappeler qu'il s'agit exactement du producteur / consommateur, et non du lecteur / écrivain d'une autre tâche multithreading classique du même nom. ChannelReader modifie l'état du canal général (extrait la valeur), qui n'est plus disponible. Donc, il ne lit pas plutôt, mais consomme. Mais nous apprendrons la mise en œuvre plus tard.



Début des travaux. Canal



La prise en main des canaux commence par la classe abstraite Channel <T> et la classe statique Channel , qui crée l'implémentation la plus appropriée. Plus loin de ce canal commun, vous pouvez obtenir un ChannelWriter pour écrire sur le canal et ChannelReader pour la consommation du canal. Un canal est un référentiel d'informations générales pour ChannelWriter et ChannelReader, ce sont donc toutes les données qui y sont stockées. Et déjà la logique de leur enregistrement ou de leur consommation est dispersée dans le ChannelWriter et le ChannelReader. Classiquement, les canaux peuvent être divisés en 2 groupes - illimités et limités. Les premiers sont plus simples dans la mise en œuvre, vous pouvez y écrire sans limite (tant que la mémoire le permet). Les seconds sont limités par une certaine valeur maximale du nombre d'enregistrements.



C'est là que la nature de l'asynchronie est légèrement différente. Dans les canaux illimités, l'opération d'écriture se terminera toujours de manière synchrone, rien n'interrompt l'écriture sur le canal. La situation est différente pour les chaînes limitées. Avec un comportement standard (qui peut être remplacé), l'opération d'écriture se terminera de manière synchrone tant qu'il y aura de la place pour de nouvelles instances dans le canal. Dès que le canal est plein, l'opération d'écriture ne se termine pas tant que l'espace n'est pas libre (après que le consommateur a consommé le consommé). Par conséquent, ici, l'opération sera vraiment asynchrone avec le changement de flux et les changements associés (ou sans changement, qui sera décrit plus loin).



Pour la plupart, le comportement des lecteurs est le même - s'il y a quelque chose dans le canal, alors le lecteur le lit simplement et se termine en synchronisation. S'il n'y a rien, alors il attend que quelqu'un écrive quelque chose.



La classe statique Channel contient 4 méthodes pour créer les canaux ci-dessus:



Channel<T> CreateUnbounded<T>();
Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);
Channel<T> CreateBounded<T>(int capacity);
Channel<T> CreateBounded<T>(BoundedChannelOptions options);


Si vous le souhaitez, vous pouvez spécifier des options plus précises pour créer un canal qui aidera à l'optimiser pour les besoins spécifiés.



UnboundedChannelOptions contient 3 propriétés dont la valeur par défaut est false:



  1. AllowSynchronousContinuations — , , . -. , . , , , . , , , . , - - , ;
  2. SingleReader — , . , ;
  3. SingleWriter — , ;


BoundedChannelOptions contient les mêmes 3 propriétés et 2 autres en plus



  1. AllowSynchronousContinuations - idem;
  2. SingleReader est le même;
  3. SingleWriter est le même;
  4. Capacité - le nombre d'enregistrements à insérer dans le canal. Ce paramètre est également un paramètre de constructeur;
  5. FullMode - l'énumération BoundedChannelFullMode, qui dispose de 4 options, détermine le comportement lors de la tentative d'écriture sur un canal complet:

    • Attendre - Attend l'espace libre pour terminer l'opération asynchrone
    • DropNewest - un élément enregistrable écrase le plus récent, se termine de manière synchrone
    • DropOldest - un élément enregistrable remplace la plus ancienne des extrémités existantes de manière synchrone
    • DropWrite - l'élément en cours d'écriture n'est pas écrit, il se termine de manière synchrone




En fonction des paramètres passés et la méthode appelée, l' un des 3 mises en œuvre sera créé: SingleConsumerUnboundedChannel , UnboundedChannel , BoundedChannel . Mais ce n'est pas si important, car nous utiliserons le canal via la classe de base Channel <TWrite, TRead>.



Il a 2 propriétés:



  • ChannelReader <TRead> Reader {get; ensemble protégé; }
  • ChannelWriter <TWrite> Writer {get; ensemble protégé; }


Et aussi, 2 opérateurs de type implicite cast en ChannelReader <TRead> et ChannelWriter <TWrite>.



Un exemple de la façon de commencer à travailler avec les canaux:



Channel<int> channel = Channel.CreateUnbounded<int>();
//  
ChannelWriter<int> writer = channel.Writer;
ChannelReader<int> reader = channel.Reader; 
// 
ChannelWriter<int> writer = channel;
ChannelReader<int> reader = channel;


Les données sont stockées dans une file d'attente. Pour 3 types, 3 files d'attente différentes sont utilisées - ConcurrentQueue <T>, Deque <T> et SingleProducerSingleConsumerQueue <T>. À ce stade, il me semblait que j'étais dépassé et que je manquais un tas de nouvelles collections simples. Mais je m'empresse de bouleverser - ils ne sont pas pour tout le monde. Ils sont étiquetés internes, ils ne peuvent donc pas être utilisés. Mais si vous en avez soudainement besoin en vente, vous pouvez les trouver ici (SingleProducerConsumerQueue) et ici (Deque) . La mise en œuvre de ce dernier est très simple. Je vous conseille de vous familiariser, ça s'apprend très vite.



Alors, passons à étudier directement ChannelReader et ChannelWriter, ainsi que des détails d'implémentation intéressants. Ils se résument tous à asynchrones, aucune allocation de mémoire à l'aide de IValueTaskSource.



ChannelReader - consommateur



Lorsqu'un objet consommateur est demandé, l'une des implémentations de la classe abstraite ChannelReader <T> est renvoyée. Encore une fois, contrairement aux pipelines, les API sont simples et il existe peu de méthodes. Il suffit de connaître la liste des méthodes pour comprendre comment l'utiliser dans la pratique.



Méthodes:



  1. Propriété get-only virtuelle Tâche terminée {get; } Un

    objet de type Task qui se termine lorsque le canal est fermé;
  2. Propriété get-only virtuelle int Count {get; }

    Ici, il convient de souligner que le nombre actuel d'objets lisibles est renvoyé;
  3. Propriété get-only virtuelle bool CanCount {get; }

    Indique si la propriété Count est disponible;
  4. bool TryRead(out T item)

    . bool, , . out ( null, );
  5. ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)

    ValueTask true, , . ValueTask false, ( );
  6. ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)

    . , . .



    , TryRead WaitToReadAsync. ( cancelation tokens), — TryRead. , while(true) WaitToReadAsync. true, , TryRead. TryRead , , . — , WaitToReadAsync, , , .

    , , - .




ChannelWriter - producteur



Tout est similaire au consommateur, alors regardez immédiatement les méthodes:



  1. Méthode virtuelle bool TryComplete (Exception? Error = null)

    Tente de marquer le canal comme terminé, c'est-à-dire montrent qu'aucune donnée ne sera écrite dessus. En tant que paramètre facultatif, vous pouvez lever une exception qui a provoqué l'arrêt du canal. Renvoie vrai s'il a pu se terminer, sinon faux (si le canal est déjà terminé ou ne prend pas en charge l'achèvement);
  2. Méthode abstraite bool TryWrite (élément T) Tente

    d'écrire une valeur dans le canal. Renvoie vrai si réussi et faux sinon.
  3. Méthode abstraite ValueTask <bool> WaitToWriteAsync (CancellationToken cancelToken = default)

    Renvoie une ValueTask avec la valeur true, qui se terminera lorsqu'il y aura une place pour l'enregistrement dans le canal. La valeur sera false si les écritures sur le canal ne sont plus autorisées;
  4. Méthode virtuelle ValueTask WriteAsync (élément T, CancellationToken cancelToken = default)

    Écrit de manière asynchrone sur le canal. Par exemple, si le canal est plein, l'opération sera vraiment asynchrone et ne se terminera qu'après avoir libéré de l'espace pour cet enregistrement;
  5. Méthode void Complete (Exception? Error = null)

    Essaie simplement de marquer le canal comme terminé à l'aide de TryComplete et, en cas d'échec, lance une exception.


Un petit exemple de ce qui précède (pour démarrer facilement vos propres expériences):



Channel<int> unboundedChannel = Channel.CreateUnbounded<int>();

//      ,        
ChannelWriter<int> writer = unboundedChannel;
ChannelReader<int> reader = unboundedChannel;

//     
int objectToWriteInChannel = 555;
await writer.WriteAsync(objectToWriteInChannel);
//  ,     ,   ,  
writer.Complete();

//         
int valueFromChannel = await reader.ReadAsync();


Passons maintenant à la partie la plus intéressante.



Asynchronie sans allocations



En train d'écrire et d'étudier le code, je me suis rendu compte qu'il n'y avait presque rien d'intéressant dans la mise en œuvre de toutes ces opérations. En général, il peut être décrit comme suit - en évitant les verrous inutiles à l'aide de collections simultanées et en utilisant abondamment ValueTask, qui est une structure qui économise de la mémoire. Cependant, je m'empresse de vous rappeler que cela ne vaut pas la peine de remplacer rapidement tous les fichiers de votre PC et de remplacer toutes les tâches par ValueTask. Cela n'a de sens que dans les cas où l'opération se termine dans la plupart des cas de manière synchrone. Après tout, comme nous nous en souvenons, avec l'asynchronie, un changement de flux est très probable, ce qui signifie que la pile ne sera pas la même qu'auparavant. Quoi qu'il en soit, un vrai professionnel de la performance le sait - n'optimisez pas avant que des problèmes surviennent.



Une bonne chose est que je ne m'enregistrerai pas en tant que professionnel, et il est donc temps de découvrir quel est le secret de l'écriture de code asynchrone sans allocation de mémoire, ce qui à première vue semble trop beau pour être vrai. Mais ça arrive.



Interface IValueTaskSource



Commençons notre voyage depuis le début - la structure ValueTask , qui a été ajoutée dans .net core 2.0 et modifiée en 2.1. À l'intérieur de cette structure, il y a un champ d'objet _obj délicat. Il est facile de deviner, sur la base du nom explicite, que l'une des 3 choses peut être cachée dans ce champ - null, Task / Task <T> ou IValueTaskSource. En fait, cela découle de la façon dont la ValueTask est créée.



Comme le fabricant l'assure, cette structure ne doit être utilisée que de manière évidente - avec le mot-clé wait. Autrement dit, vous ne devez pas appliquer wait plusieurs fois à la même ValueTask, utiliser des combinateurs, ajouter plusieurs suites, etc. De plus, vous ne devriez pas obtenir le résultat de ValueTask plus d'une fois. Et cela est dû au fait que nous essayons de comprendre - la réutilisation de tout cela sans allouer de mémoire.



J'ai déjà mentionné l'interface IValueTaskSource . C'est lui qui aide à économiser la mémoire. Cela se fait en réutilisant IValueTaskSource lui-même plusieurs fois pour de nombreuses tâches. Mais c'est précisément à cause de cette réutilisation qu'il n'y a aucun moyen de s'essayer à ValueTask.



Donc, IValueTaskSource. Cette interface dispose de 3 méthodes, en implémentant lesquelles vous économiserez avec succès de la mémoire et du temps sur l'allocation de ces octets précieux.



  1. GetResult - Il est appelé une fois, lorsque dans la machine d'état, formée lors de l'exécution pour les méthodes asynchrones, le résultat est nécessaire. ValueTask a une méthode GetResult, qui appelle la méthode d'interface du même nom, qui, comme nous le rappelons, peut être stockée dans le champ _obj.
  2. GetStatus - Appelé par la machine d'état pour déterminer l'état de l'opération. Également via ValueTask.
  3. OnCompleted - Encore une fois, appelé par la machine à états pour ajouter une continuation à une tâche en suspens à ce moment-là.


Mais malgré l'interface simple, la mise en œuvre nécessitera certaines compétences. Et ici, nous pouvons nous souvenir de ce que nous avons commencé - les canaux . Cette implémentation utilise la classe AsyncOperationqui est une implémentation de IValueTaskSource. Cette classe est cachée derrière le modificateur d'accès interne. Mais cela ne s'arrête pas pour comprendre les mécanismes de base. Cela soulève la question, pourquoi ne pas donner l'implémentation de IValueTaskSource aux masses? La première raison (pour le plaisir) est quand un marteau est en main, les clous sont partout, quand une implémentation IValueTaskSource est entre les mains, il y a un travail illettré avec de la mémoire partout. La deuxième raison (plus plausible) est que si l'interface est simple et polyvalente, l'implémentation réelle est optimale lors de l'utilisation de certaines nuances de l'application. Et probablement pour cette raison, il est possible de trouver des implémentations dans diverses parties du grand et puissant .net, telles que AsyncOperation sous le capot des canaux, AsyncIOOperation dans la nouvelle API socket, etc.

Cependant, pour être honnête, il existe encore une mise en œuvre commune -ManualResetValueTaskSourceCore . Mais c'est déjà trop loin du sujet de l'article.



ComparerExchange



Une méthode assez populaire d'une classe populaire qui évite la surcharge des primitives de synchronisation classiques. Je pense que la plupart le connaissent, mais cela vaut toujours la peine de le décrire en 3 mots, car cette construction est assez souvent utilisée dans AsyncOperation.

Dans la littérature traditionnelle, cette fonction est appelée comparer et échanger (CAS). Dans .net, il est disponible dans la classe Interlocked .



La signature est la suivante:



public static T CompareExchange<T>(ref T location1, T value, T comparand) where T : class;


Il existe également des surcharges avec int, long, float, double, IntPtr, object.



La méthode elle-même est atomique, c'est-à-dire qu'elle est exécutée sans interruption. Compare 2 valeurs et, si elles sont égales, affecte la nouvelle valeur à la variable. Ils résolvent le problème lorsque vous devez vérifier la valeur d'une variable et modifier la variable en fonction de celle-ci.



Disons que vous voulez incrémenter une variable si sa valeur est inférieure à 10.



Ensuite, il y a 2 threads.



Flux 1 Flux 2
Vérifie la valeur d'une variable pour une condition (c'est-à-dire inférieure à 10) qui est déclenchée -
Entre vérifier et modifier une valeur Attribue à une variable une valeur qui ne satisfait pas la condition (par exemple, 15)
Modifie la valeur, bien que ce ne soit pas le cas, car la condition n'est plus remplie -




Lorsque vous utilisez cette méthode, vous changez exactement la valeur souhaitée ou ne changez pas, tout en obtenant la valeur réelle de la variable.



location1 est une variable dont nous voulons changer la valeur. Il est comparé à comparand, en cas d'égalité, la valeur est écrite dans location1. Si l'opération réussit, la méthode renvoie la valeur passée de la variable location1. Sinon, la valeur actuelle de location1 sera renvoyée.

Plus profondément, il existe une instruction en langage assembleur, cmpxchg, qui fait cela. C'est elle qui est utilisée sous le capot.



Plongée en pile



En regardant tout ce code, je suis tombé plusieurs fois sur des références à "Stack Dive". C'est une chose très cool et intéressante qui est en fait très indésirable. L'essentiel est qu'avec l'exécution synchrone des continuations, nous pouvons épuiser les ressources de la pile.



Disons que nous avons 10 000 tâches, avec style



//code1
await ...
//code2


Supposons que la première tâche termine l'exécution et libère ainsi la poursuite de la seconde, que nous commençons immédiatement à exécuter de manière synchrone dans ce thread, c'est-à-dire en prenant un morceau de la pile avec le cadre de cette continuation. À son tour, cette continuation débloquera la poursuite de la troisième tâche, que nous commencerons également à effectuer immédiatement. Etc. S'il n'y a plus d'attente dans la suite ou quelque chose qui vide d'une manière ou d'une autre la pile, alors nous consommerons simplement l'espace de la pile jusqu'au bout. Ce qui pourrait provoquer StackOverflow et le blocage de l'application. Dans la revue de code, je mentionnerai comment AsyncOperation combat cela.



AsyncOperation en tant qu'implémentation IValueTaskSource



Code source .



Dans AsyncOperation, il existe un champ _continuation de type Action <object>. Le champ est utilisé pour, croyez-le ou non, des suites. Mais, comme c'est souvent le cas dans un code trop moderne, les champs ont des responsabilités supplémentaires (comme le garbage collector et le dernier bit de la référence de la table de méthode). Le champ _continuation appartient à la même série. Il y a 2 valeurs spéciales qui peuvent être stockées dans ce champ, en plus de la continuation elle-même et null. s_availableSentinel et s_completedSentinel . Ces champs indiquent que l'opération est disponible et terminée en conséquence. Il est disponible uniquement pour une réutilisation pour une opération complètement asynchrone.



AsyncOperation implémente également IThreadPoolWorkItemavec une seule méthode - void Execute () => SetCompletionAndInvokeContinuation (). La méthode SetCompletionAndInvokeContinuation consiste simplement à effectuer la continuation. Et cette méthode est appelée soit directement dans le code AsyncOperation, soit via l'Execute mentionné. Après tout, les types qui implémentent IThreadPoolWorkItem peuvent être jetés dans le pool de threads comme ceci ThreadPool.UnsafeQueueUserWorkItem (this, preferLocal: false).



La méthode Execute sera exécutée par le pool de threads.



Faire la suite en lui-même est assez trivial.



La suite est copiée dans une variable locale et s_completedSentinel est écrit à sa place- un objet de marionnette artificiel (ou une sentinelle, je ne sais pas comment me dire dans notre discours), qui indique que la tâche est terminée. Et puis une copie locale de la suite réelle est simplement exécutée. Avec un ExecutionContext, ces actions sont publiées dans le contexte. Il n'y a pas de secret ici. Ce code peut être appelé directement par la classe - simplement en appelant une méthode qui encapsule ces actions, ou via l'interface IThreadPoolWorkItem dans le pool de threads. Vous pouvez maintenant deviner comment la fonction fonctionne avec l'exécution de continuations de manière synchrone.



La première méthode de l'interface IValueTaskSource est GetResult ( github ).



C'est simple, il:



  1. _currentId.

    _currentId — , . . ;
  2. _continuation - s_availableSentinel. , , AsyncOperation . , (pooled = true);
  3. _result.

    _result TrySetResult .


Méthode TrySetResult ( github ).



La méthode est triviale. - il stocke le paramètre accepté dans _result et signale la complétion, à savoir qu'il appelle la méthode SignalCompleteion , ce qui est assez intéressant.



Méthode SignalCompletion ( github ).



Cette méthode utilise tout ce dont nous avons parlé au début.



Au tout début, si _continuation == null, nous écrivons la marionnette s_completedSentinel.



De plus, la méthode peut être divisée en 4 blocs. Je vais vous le dire tout de suite pour faciliter la compréhension du circuit, le bloc 4 n'est qu'une exécution synchrone de la suite. Autrement dit, l'exécution triviale de la continuation à travers la méthode, comme je l'ai décrit dans le paragraphe sur IThreadPoolWorkItem.



  1. _schedulingContext == null, .. ( if).

    _runContinuationsAsynchronously == true, , — ( if).

    IThreadPoolWorkItem . AsyncOperation . .

    , if ( , ), , 2 3 , — .. 4 ;
  2. _schedulingContext is SynchronizationContext, ( if).

    _runContinuationsAsynchronously = true. . , , . , . 2 , :

    sc.Post(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this);
    


    . , , ( , ), 4 — ;
  3. , 2 . .

    , _schedulingContext TaskScheduler, . , 2, .. _runContinuationsAsynchronously = true TaskScheduler . , Task.Factory.StartNew . .
  4. — . , .


La deuxième méthode de l'interface IValueTaskSource est GetStatus ( github )

Tout comme un âne de Saint-Pétersbourg.



Si _continuation! = _CompletedSentinel, alors retournez ValueTaskSourceStatus.Pending

If error == null, puis renvoyez ValueTaskSourceStatus.Succeeded

Si _error.SourceException est OperationCanceledException, puis retournez ValueTaskSourceStatus.Canceled

Well, puisque beaucoup de choses sont arrivées ici, retournez ValueTaskSourceStatus.Succeeded Si _error.SourceException est OperationCanceledException, puis renvoyez ValueTaskSourceStatus.Canceled Well, puisque beaucoup de choses sont arrivées ici, retournez ValueTaskSourceStatus



troisième et final. , mais la méthode la plus complexe de l'interface IValueTaskSource est OnCompleted ( github )



La méthode ajoute une continuation qui est exécutée à la fin.



Capture ExecutionContext et SynchronizationContext si nécessaire.



Ensuite, Interlocked.CompareExchange décrit ci-dessus est utilisé pour stocker la continuation dans le champ, en la comparant à null. Pour rappel, CompareExchange renvoie la valeur réelle d'une variable.



Si la suite est enregistrée, la valeur qui était dans la variable avant la mise à jour, c'est-à-dire null, est renvoyée. Cela signifie que l'opération n'était pas terminée au moment de l'enregistrement de la suite. Et celui qui le complète lui-même comprendra tout (comme nous l'avons vu plus haut). Et nous n'avons aucun sens pour effectuer des actions supplémentaires. Et c'est là que se termine le travail de la méthode.



Si la valeur n'a pas été enregistrée, c'est-à-dire que quelque chose autre que null a été renvoyé par CompareExchange. Dans ce cas, quelqu'un a réussi à mettre la valeur plus rapidement que nous. Autrement dit, l'une des 2 situations s'est produite - soit la tâche s'est terminée plus rapidement que nous sommes arrivés ici, soit il y a eu une tentative d'écrire plus d'une suite, ce qui ne peut pas être fait.



Ainsi, nous vérifions la valeur retournée, si elle est égale à s_completedSentinel - elle serait écrite si elle était complétée.



  • Si ce n'est pas s_completedSentinel , alors nous n'avons pas été utilisés comme prévu - ils ont essayé d'ajouter plus d'une suite. C'est celui qui a déjà été enregistré et celui que nous écrivons. Et c'est une situation exceptionnelle;
  • s_completedSentinel, , , . , _runContinuationsAsynchronously = false.

    , , OnCompleted, awaiter'. . , AsyncOperation — System.Threading.Channels. , . , . , , ( ) . , awaiter' , , . awaiter'.

    Pour éviter cette situation, vous devez exécuter la continuation de manière asynchrone quoi qu'il arrive. Il est exécuté selon les mêmes schémas que les 3 premiers blocs de la méthode SignalCompleteion - juste dans un pool, dans un contexte ou via une usine et un ordonnanceur


Voici un exemple de suites synchrones:



class Program
    {
        static async Task Main(string[] args)
        {
            Channel<int> unboundedChannel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
            {
                AllowSynchronousContinuations = true
            });

            ChannelWriter<int> writer = unboundedChannel;
            ChannelReader<int> reader = unboundedChannel;

            Console.WriteLine($"Main, before await. Thread id: {Thread.CurrentThread.ManagedThreadId}");

            var writerTask = Task.Run(async () =>
            {
                Thread.Sleep(500);
                int objectToWriteInChannel = 555;
                Console.WriteLine($"Created thread for writing with delay, before await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
                await writer.WriteAsync(objectToWriteInChannel);
                Console.WriteLine($"Created thread for writing with delay, after await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
            });

            //Blocked here because there are no items in channel
            int valueFromChannel = await reader.ReadAsync();
            Console.WriteLine($"Main, after await (will be processed by created thread for writing). Thread id: {Thread.CurrentThread.ManagedThreadId}");

            await writerTask;

            Console.Read();
        }
    }


Sortie:



principale, avant d'attendre. ID de fil: 1

Fil créé pour l'écriture avec délai, avant attente d'écriture. Thread id: 4

Main, after await (sera traité par le thread créé pour l'écriture). ID de fil: 4

Fil créé pour l'écriture avec retard, après attente d'écriture. Identifiant de la discussion: 4



All Articles