Atomics.wait()
est bloquante, elle ne peut pas être appelée sur le thread principal (si vous essayez de le faire, une erreur sera générée TypeError
).
Le moteur V8, depuis la version 8.7, prend en charge une option non bloquante
Atomics.wait()
appelée Atomics.waitAsync () . Cette nouvelle méthode peut être utilisée sur le thread principal.
Aujourd'hui, nous allons vous montrer comment utiliser ces API de bas niveau pour créer un mutex qui peut s'exécuter à la fois de manière synchrone (sur les threads de travail) et de manière asynchrone (sur les threads de travail ou sur le thread principal).
Atomics.wait () et Atomics.waitAsync ()
Méthodes
Atomics.wait()
et Atomics.waitAsync()
prenez les paramètres suivants:
buffer
: un tableau de typeInt32Array
ouBigInt64Array
, basé surSharedArrayBuffer
.index
: l'index réel de l'élément dans le tableau.expectedValue
: la valeur que l'on s'attend à représenter en mémoire à l'emplacement décrit avecbuffer
etindex
.timeout
: délai en millisecondes (facultatif, par défautInfinity
).
Atomics.wait()
renvoie une chaîne. Si la valeur attendue n'est pas trouvée dans l'emplacement mémoire spécifié, il Atomics.wait()
se ferme immédiatement, renvoyant une chaîne not-equal
. Sinon, le fil est bloqué. L'un des événements suivants doit se produire pour que le verrou soit libéré. Le premier est un appel depuis un autre thread d'une méthode Atomics.notify()
avec une indication de la place en mémoire qui intéresse la méthode Atomics.wait()
. Le second est l'expiration du délai. Dans le premier cas, il Atomics.wait()
renverra une chaîne ok
, dans le second - une valeur de chaîne timed-out
.
La méthode
Atomics.notify()
prend les paramètres suivants:
typedArray
: un tableau de typeInt32Array
ouBigInt64Array
, basé surSharedArrayBuffer
.index
: l'index réel de l'élément dans le tableau.count
: nombre d'agents en attente de notification (paramètre optionnel, défini par défautInfinity
).
La méthode
Atomics.notify()
notifie le nombre spécifié d'agents en attente de notification à l'adresse décrite typedArray
et index
en les contournant dans l'ordre FIFO. Si plusieurs appels ont été effectués Atomics.wait()
ou Atomics.waitAsync()
regardent le même endroit en mémoire, ils se retrouvent tous dans la même file d'attente.
Contrairement à une méthode
Atomics.wait()
, une méthode Atomics.waitAsync()
renvoie immédiatement une valeur à l'emplacement de l'appel. Il peut s'agir de l'une des valeurs suivantes:
{ async: false, value: 'not-equal' }
- si l'emplacement mémoire spécifié ne contient pas la valeur attendue.{ async: false, value: 'timed-out' }
- uniquement lorsque le délai d'expiration est défini sur 0.{ async: true, value: promise }
- dans d'autres cas.
Une promesse, après un certain temps, peut être résolue avec succès par une valeur de chaîne
ok
(si une méthode a été appelée Atomics.notify()
, à laquelle des informations sur la place en mémoire qui ont été transmises ont été transmises Atomics.waitAsync()
). Il peut être résolu avec une valeur timed-out
. Cette promesse n'est jamais rejetée.
L'exemple suivant montre les bases de l'utilisation
Atomics.waitAsync()
:
const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
// | | ^ - ()
// | ^
// ^
if (result.value === 'not-equal') {
// SharedArrayBuffer .
} else {
result.value instanceof Promise; // true
result.value.then(
(value) => {
if (value == 'ok') { /* */ }
else { /* - */ }
});
}
// :
Atomics.notify(i32a, 0);
Voyons maintenant comment créer un mutex qui peut être utilisé à la fois en modes synchrone et asynchrone. Il est à noter que l'implémentation de la version synchrone du mutex a été discutée précédemment. Par exemple - dans ce matériau.
Dans cet exemple, nous n'utiliserons pas le paramètre
timeout
lors de l'appel Atomics.wait()
et Atomics.waitAsync()
. Ce paramètre peut être utilisé pour implémenter des conditions liées au délai d'expiration.
Notre classe
AsyncLock
représentant un mutex fonctionne avec un tampon SharedArrayBuffer
et implémente les méthodes suivantes:
lock()
: bloque le thread jusqu'à ce que nous ayons la possibilité de capturer le mutex (applicable uniquement dans le thread de travail).unlock()
: libère le mutex (celui-ci est le contrairelock()
).executeLocked(callback)
: essaie d'acquérir le verrou sans bloquer le thread. Cette méthode peut être utilisée sur le thread principal. Il prévoit d'exécuter le rappel au moment où nous pouvons acquérir le verrou.
Voyons comment ces méthodes peuvent être implémentées. La déclaration de classe comprend des constantes et un constructeur qui prend un tampon
SharedArrayBuffer
.
class AsyncLock {
static INDEX = 0;
static UNLOCKED = 0;
static LOCKED = 1;
constructor(sab) {
this.sab = sab;
this.i32a = new Int32Array(sab);
}
lock() {
/* … */
}
unlock() {
/* … */
}
executeLocked(f) {
/* … */
}
}
Ici, l'élément
i32a[0]
contient la valeur LOCKED
ou UNLOCKED
. Il, en plus, représente la place dans la mémoire qui intéresse Atomics.wait()
et Atomics.waitAsync()
. La classe AsyncLock
fournit les fonctionnalités de base suivantes:
i32a[0] == LOCKED
et le thread est dans un état d'attente (après avoir été appeléAtomics.wait()
ouAtomics.waitAsync()
), en regardanti32a[0]
, il sera finalement notifié.- Une fois le thread notifié, il essaiera d'acquérir le verrou. S'il réussit, alors, quand il libère le verrou, il appellera
Atomics.notify()
.
Capture et déverrouillage synchrones
Considérez le code d'une méthode
lock()
qui ne peut être appelée qu'à partir d'un thread de travail.
lock() {
while (true) {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* >>> */ AsyncLock.UNLOCKED,
/* >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
return;
}
Atomics.wait(this.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED); // <<< ,
}
}
Lorsqu'une méthode est appelée à partir d'un thread
lock()
, elle essaie d'abord d'acquérir le verrou, en l'utilisant Atomics.compareExchange()
pour changer l'état du verrou de UNLOCKED
à LOCKED
. La méthode Atomics.compareExchange()
tente d'effectuer une opération atomique de modification de l'état de verrouillage, elle retourne la valeur d'origine située dans la zone mémoire spécifiée. Si la valeur d'origine était UNLOCKED
, alors nous savons que le changement d'état a réussi et que le thread a acquis le verrou. Vous n'avez rien d'autre à faire.
S'il
Atomics.compareExchange()
n'a pas pu changer l'état du verrou, cela signifie qu'un autre thread tient le verrou. En conséquence, le thread à partir duquel la méthode est appelée lock()
essaie d'utiliser la méthodeAtomics.wait()
afin d'attendre que le verrou soit libéré par un autre thread. Si la valeur attendue est toujours stockée dans la zone mémoire d'intérêt (dans notre cas - AsyncLock.LOCKED
), l'appel Atomics.wait()
bloquera le thread. Le retour de Atomics.wait()
ne se produira que lorsqu'un autre thread appelle Atomics.notify()
.
La méthode
unlock()
libère le verrou en le définissant dans l'état UNLOCKED
et l'appelle Atomics.notify()
pour notifier les agents qui attendent que le verrou soit libéré. On suppose qu'une opération de changement d'état de verrouillage réussit toujours. Cela est dû au fait que le thread effectuant cette opération tient un verrou. Par conséquent, rien d'autre ne doit appeler la méthode pour le moment unlock()
.
unlock() {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* >>> */ AsyncLock.LOCKED,
/* >>> */ AsyncLock.UNLOCKED);
if (oldValue != AsyncLock.LOCKED) {
throw new Error('Tried to unlock while not holding the mutex');
}
Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}
Dans un cas typique, tout se passe comme ceci: le verrou est libre et le thread T1 le capture en changeant son état à l'aide de
Atomics.compareExchange()
. Le thread T2 essaie d'acquérir le verrou en l'appelant Atomics.compareExchange()
, mais ne peut pas changer son état. Puis T2 appelle Atomics.wait()
, cet appel bloquera le thread. Après un certain temps, le thread T1 libère le verrou et appelle Atomics.notify()
. Cela provoque le Atomics.wait()
retour de l'appel à T2 ok
et le thread T2 pour quitter le verrou. T2 tente alors d'acquérir à nouveau le verrou. Cette fois, il réussit.
Il y a deux cas particuliers ici. Leur analyse vise à en démontrer les raisons
Atomics.wait()
et à Atomics.waitAsync()
rechercher une valeur spécifique à l'index spécifié de l'élément du tableau. Voici les cas:
- T1 , T2 . T2 ,
Atomics.compareExchange()
, . T1 , T2Atomics.wait()
. T2Atomics.wait()
,not-equal
. T2 . - T1 , T2
Atomics.wait()
. T1 , T2 (Atomics.wait()
)Atomics.compareExchange()
. , T3, . .Atomics.compareExchange()
T2 . T2Atomics.wait()
, T3 .
Le dernier cas particulier démontre le fait que notre mutex ne fonctionne pas correctement. Il peut arriver que le thread T2 attendait la libération du verrou, mais T3 a réussi à l'acquérir immédiatement après sa libération. Une implémentation de verrouillage plus adaptée à une utilisation dans le monde réel peut utiliser plusieurs états de verrouillage existants pour distinguer les situations dans lesquelles la serrure a simplement été «acquise» et dans lesquelles «il y a eu un conflit lors de l'acquisition».
Capture de verrouillage asynchrone
Une méthode non bloquante
executeLocked()
peut, contrairement à une méthode lock()
, être appelée depuis le thread principal. Il reçoit, comme seul paramètre, un rappel et planifie le rappel après avoir acquis avec succès le verrou.
executeLocked(f) {
const self = this;
async function tryGetLock() {
while (true) {
const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
/* >>> */ AsyncLock.UNLOCKED,
/* >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
f();
self.unlock();
return;
}
const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED);
// ^ ,
await result.value;
}
}
tryGetLock();
}
La fonction interne
tryGetLock()
essaie d'abord d'acquérir le verrou avec Atomics.compareExchange()
. Si l'appel de cette méthode aboutit à un changement d'état de verrouillage réussi, la fonction peut appeler un rappel, puis libérer le verrou et quitter.
Si l'appel
Atomics.compareExchange()
n'a pas permis d'acquérir le verrou, nous devons essayer de le refaire, au moment où le verrou sera probablement libre. Mais nous ne pouvons pas bloquer le thread et attendre que le verrou soit libéré. Au lieu de cela, nous Atomics.waitAsync()
planifions une nouvelle tentative d'acquisition du verrou à l'aide de la méthode et de la promesse qu'elle renvoie.
Si nous réussissons à exécuter la méthode
Atomics.waitAsync()
, la promesse renvoyée par cette méthode est résolue lorsque le thread qui détenait le verrou appelleAtomics.notify()
... Après cela, le thread qui voulait acquérir le verrou, comme auparavant, essaie de le faire à nouveau.
Ici, ces cas particuliers sont possibles qui sont caractéristiques de la version synchrone (le verrou est libéré entre les appels
Atomics.compareExchange()
et Atomics.waitAsync()
; le verrou est capturé par un autre thread, faisant cela entre les moments de résolution de la promesse et l'appel Atomics.compareExchange()
). Par conséquent, dans un code similaire applicable dans les projets réels, cela doit être pris en compte.
Résultat
Dans cet article, nous avons parlé des primitives de synchronisation de bas niveau
Atomics.wait()
, Atomics.waitAsync()
et Atomics.notify()
. Nous avons analysé un exemple de création d'un mutex basé sur eux, qui peut être utilisé à la fois dans le thread principal et dans les threads de travail.
Atomics.wait (), Atomics.waitAsync () et Atomics.notify () seront-ils utiles dans vos projets?