Utilisation d'Atomics.wait (), Atomics.notify () et Atomics.waitAsync ()

Les méthodes statiques Atomics.wait () et Atomics.notify () sont des primitives de synchronisation de bas niveau qui peuvent être utilisées pour implémenter des mutex et d'autres mécanismes similaires. Mais, comme la méthode Atomics.wait()est bloquante, elle ne peut pas être appelée sur le thread principal (si vous essayez de le faire, une erreur sera générée TypeError).



Le moteur V8, depuis la version 8.7, prend en charge une option non bloquante Atomics.wait()appelée Atomics.waitAsync () . Cette nouvelle méthode peut être utilisée sur le thread principal. Aujourd'hui, nous allons vous montrer comment utiliser ces API de bas niveau pour créer un mutex qui peut s'exécuter à la fois de manière synchrone (sur les threads de travail) et de manière asynchrone (sur les threads de travail ou sur le thread principal).











Atomics.wait () et Atomics.waitAsync ()



Méthodes Atomics.wait()et Atomics.waitAsync()prenez les paramètres suivants:



  • buffer: un tableau de type Int32Arrayou BigInt64Array, basé sur SharedArrayBuffer.
  • index: l'index réel de l'élément dans le tableau.
  • expectedValue: la valeur que l'on s'attend à représenter en mémoire à l'emplacement décrit avec bufferet index.
  • timeout: délai en millisecondes (facultatif, par défaut Infinity).


Atomics.wait()renvoie une chaîne. Si la valeur attendue n'est pas trouvée dans l'emplacement mémoire spécifié, il Atomics.wait()se ferme immédiatement, renvoyant une chaîne not-equal. Sinon, le fil est bloqué. L'un des événements suivants doit se produire pour que le verrou soit libéré. Le premier est un appel depuis un autre thread d'une méthode Atomics.notify()avec une indication de la place en mémoire qui intéresse la méthode Atomics.wait(). Le second est l'expiration du délai. Dans le premier cas, il Atomics.wait()renverra une chaîne ok, dans le second - une valeur de chaîne timed-out.



La méthode Atomics.notify()prend les paramètres suivants:



  • typedArray: un tableau de type Int32Arrayou BigInt64Array, basé sur SharedArrayBuffer.
  • index: l'index réel de l'élément dans le tableau.
  • count: nombre d'agents en attente de notification (paramètre optionnel, défini par défaut Infinity).


La méthode Atomics.notify()notifie le nombre spécifié d'agents en attente de notification à l'adresse décrite typedArrayet indexen les contournant dans l'ordre FIFO. Si plusieurs appels ont été effectués Atomics.wait()ou Atomics.waitAsync()regardent le même endroit en mémoire, ils se retrouvent tous dans la même file d'attente.



Contrairement à une méthode Atomics.wait(), une méthode Atomics.waitAsync()renvoie immédiatement une valeur à l'emplacement de l'appel. Il peut s'agir de l'une des valeurs suivantes:



  • { async: false, value: 'not-equal' } - si l'emplacement mémoire spécifié ne contient pas la valeur attendue.
  • { async: false, value: 'timed-out' } - uniquement lorsque le délai d'expiration est défini sur 0.
  • { async: true, value: promise } - dans d'autres cas.


Une promesse, après un certain temps, peut être résolue avec succès par une valeur de chaîne ok(si une méthode a été appelée Atomics.notify(), à laquelle des informations sur la place en mémoire qui ont été transmises ont été transmises Atomics.waitAsync()). Il peut être résolu avec une valeur timed-out. Cette promesse n'est jamais rejetée.



L'exemple suivant montre les bases de l'utilisation Atomics.waitAsync():



const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
//                                     |  |  ^ - ()
//                                     |  ^  
//                                     ^ 

if (result.value === 'not-equal') {
  //   SharedArrayBuffer   .
} else {
  result.value instanceof Promise; // true
  result.value.then(
    (value) => {
      if (value == 'ok') { /*   */ }
      else { /*  - */ }
    });
}

//      :
Atomics.notify(i32a, 0);


Voyons maintenant comment créer un mutex qui peut être utilisé à la fois en modes synchrone et asynchrone. Il est à noter que l'implémentation de la version synchrone du mutex a été discutée précédemment. Par exemple - dans ce matériau.



Dans cet exemple, nous n'utiliserons pas le paramètre timeoutlors de l'appel Atomics.wait()et Atomics.waitAsync(). Ce paramètre peut être utilisé pour implémenter des conditions liées au délai d'expiration.



Notre classe AsyncLockreprésentant un mutex fonctionne avec un tampon SharedArrayBufferet implémente les méthodes suivantes:



  • lock(): bloque le thread jusqu'à ce que nous ayons la possibilité de capturer le mutex (applicable uniquement dans le thread de travail).
  • unlock(): libère le mutex (celui-ci est le contraire lock()).
  • executeLocked(callback): essaie d'acquérir le verrou sans bloquer le thread. Cette méthode peut être utilisée sur le thread principal. Il prévoit d'exécuter le rappel au moment où nous pouvons acquérir le verrou.


Voyons comment ces méthodes peuvent être implémentées. La déclaration de classe comprend des constantes et un constructeur qui prend un tampon SharedArrayBuffer.



class AsyncLock {
  static INDEX = 0;
  static UNLOCKED = 0;
  static LOCKED = 1;

  constructor(sab) {
    this.sab = sab;
    this.i32a = new Int32Array(sab);
  }

  lock() {
    /* … */
  }

  unlock() {
    /* … */
  }

  executeLocked(f) {
    /* … */
  }
}


Ici, l'élément i32a[0]contient la valeur LOCKEDou UNLOCKED. Il, en plus, représente la place dans la mémoire qui intéresse Atomics.wait()et Atomics.waitAsync(). La classe AsyncLockfournit les fonctionnalités de base suivantes:



  1. i32a[0] == LOCKEDet le thread est dans un état d'attente (après avoir été appelé Atomics.wait()ou Atomics.waitAsync()), en regardant i32a[0], il sera finalement notifié.
  2. Une fois le thread notifié, il essaiera d'acquérir le verrou. S'il réussit, alors, quand il libère le verrou, il appellera Atomics.notify().


Capture et déverrouillage synchrones



Considérez le code d'une méthode lock()qui ne peut être appelée qu'à partir d'un thread de travail.



lock() {
  while (true) {
    const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
                        /*   >>> */  AsyncLock.UNLOCKED,
                        /*   >>> */  AsyncLock.LOCKED);
    if (oldValue == AsyncLock.UNLOCKED) {
      return;
    }
    Atomics.wait(this.i32a, AsyncLock.INDEX,
                 AsyncLock.LOCKED); // <<< ,    
  }
}


Lorsqu'une méthode est appelée à partir d'un thread lock(), elle essaie d'abord d'acquérir le verrou, en l'utilisant Atomics.compareExchange()pour changer l'état du verrou de UNLOCKEDà LOCKED. La méthode Atomics.compareExchange()tente d'effectuer une opération atomique de modification de l'état de verrouillage, elle retourne la valeur d'origine située dans la zone mémoire spécifiée. Si la valeur d'origine était UNLOCKED, alors nous savons que le changement d'état a réussi et que le thread a acquis le verrou. Vous n'avez rien d'autre à faire.



S'il Atomics.compareExchange()n'a pas pu changer l'état du verrou, cela signifie qu'un autre thread tient le verrou. En conséquence, le thread à partir duquel la méthode est appelée lock()essaie d'utiliser la méthodeAtomics.wait()afin d'attendre que le verrou soit libéré par un autre thread. Si la valeur attendue est toujours stockée dans la zone mémoire d'intérêt (dans notre cas - AsyncLock.LOCKED), l'appel Atomics.wait()bloquera le thread. Le retour de Atomics.wait()ne se produira que lorsqu'un autre thread appelle Atomics.notify().



La méthode unlock()libère le verrou en le définissant dans l'état UNLOCKEDet l'appelle Atomics.notify()pour notifier les agents qui attendent que le verrou soit libéré. On suppose qu'une opération de changement d'état de verrouillage réussit toujours. Cela est dû au fait que le thread effectuant cette opération tient un verrou. Par conséquent, rien d'autre ne doit appeler la méthode pour le moment unlock().



unlock() {
  const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
                      /*   >>> */  AsyncLock.LOCKED,
                      /*   >>> */  AsyncLock.UNLOCKED);
  if (oldValue != AsyncLock.LOCKED) {
    throw new Error('Tried to unlock while not holding the mutex');
  }
  Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}


Dans un cas typique, tout se passe comme ceci: le verrou est libre et le thread T1 le capture en changeant son état à l'aide de Atomics.compareExchange(). Le thread T2 essaie d'acquérir le verrou en l'appelant Atomics.compareExchange(), mais ne peut pas changer son état. Puis T2 appelle Atomics.wait(), cet appel bloquera le thread. Après un certain temps, le thread T1 libère le verrou et appelle Atomics.notify(). Cela provoque le Atomics.wait()retour de l'appel à T2 oket le thread T2 pour quitter le verrou. T2 tente alors d'acquérir à nouveau le verrou. Cette fois, il réussit.



Il y a deux cas particuliers ici. Leur analyse vise à en démontrer les raisons Atomics.wait()et à Atomics.waitAsync()rechercher une valeur spécifique à l'index spécifié de l'élément du tableau. Voici les cas:



  • T1 , T2 . T2 , Atomics.compareExchange(), . T1 , T2 Atomics.wait(). T2 Atomics.wait(), not-equal. T2 .
  • T1 , T2 Atomics.wait() . T1 , T2 ( Atomics.wait()) Atomics.compareExchange() . , T3, . . Atomics.compareExchange() T2 . T2 Atomics.wait() , T3 .


Le dernier cas particulier démontre le fait que notre mutex ne fonctionne pas correctement. Il peut arriver que le thread T2 attendait la libération du verrou, mais T3 a réussi à l'acquérir immédiatement après sa libération. Une implémentation de verrouillage plus adaptée à une utilisation dans le monde réel peut utiliser plusieurs états de verrouillage existants pour distinguer les situations dans lesquelles la serrure a simplement été «acquise» et dans lesquelles «il y a eu un conflit lors de l'acquisition».



Capture de verrouillage asynchrone



Une méthode non bloquante executeLocked()peut, contrairement à une méthode lock(), être appelée depuis le thread principal. Il reçoit, comme seul paramètre, un rappel et planifie le rappel après avoir acquis avec succès le verrou.



executeLocked(f) {
  const self = this;

  async function tryGetLock() {
    while (true) {
      const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
                          /*   >>> */  AsyncLock.UNLOCKED,
                          /*   >>> */  AsyncLock.LOCKED);
      if (oldValue == AsyncLock.UNLOCKED) {
        f();
        self.unlock();
        return;
      }
      const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
                                       AsyncLock.LOCKED);
                                   //  ^ ,    
      await result.value;
    }
  }

  tryGetLock();
}


La fonction interne tryGetLock()essaie d'abord d'acquérir le verrou avec Atomics.compareExchange(). Si l'appel de cette méthode aboutit à un changement d'état de verrouillage réussi, la fonction peut appeler un rappel, puis libérer le verrou et quitter.



Si l'appel Atomics.compareExchange()n'a pas permis d'acquérir le verrou, nous devons essayer de le refaire, au moment où le verrou sera probablement libre. Mais nous ne pouvons pas bloquer le thread et attendre que le verrou soit libéré. Au lieu de cela, nous Atomics.waitAsync()planifions une nouvelle tentative d'acquisition du verrou à l'aide de la méthode et de la promesse qu'elle renvoie.



Si nous réussissons à exécuter la méthode Atomics.waitAsync(), la promesse renvoyée par cette méthode est résolue lorsque le thread qui détenait le verrou appelleAtomics.notify()... Après cela, le thread qui voulait acquérir le verrou, comme auparavant, essaie de le faire à nouveau.



Ici, ces cas particuliers sont possibles qui sont caractéristiques de la version synchrone (le verrou est libéré entre les appels Atomics.compareExchange()et Atomics.waitAsync(); le verrou est capturé par un autre thread, faisant cela entre les moments de résolution de la promesse et l'appel Atomics.compareExchange()). Par conséquent, dans un code similaire applicable dans les projets réels, cela doit être pris en compte.



Résultat



Dans cet article, nous avons parlé des primitives de synchronisation de bas niveau Atomics.wait(), Atomics.waitAsync()et Atomics.notify(). Nous avons analysé un exemple de création d'un mutex basé sur eux, qui peut être utilisé à la fois dans le thread principal et dans les threads de travail.



Atomics.wait (), Atomics.waitAsync () et Atomics.notify () seront-ils utiles dans vos projets?



All Articles