- la contre-pression est cool
- la contre-pression n'est disponible que dans les bibliothèques qui implémentent la spécification des flux réactifs
- cette spécification est si complexe que vous ne devriez même pas essayer de la mettre en œuvre vous-même
Dans cet article, je vais essayer de montrer que:
- la contre-pression est très simple
- pour implémenter une contre-pression asynchrone, il suffit de faire une version asynchrone du sémaphore
- s'il existe une implémentation de sémaphore asynchrone, l'interface org.reactivestreams.Publisher est implémentée en quelques dizaines de lignes de code
La contre-pression est une rétroaction qui ajuste la vitesse du producteur de données pour qu'elle corresponde à la vitesse du consommateur. En l'absence d'une telle connexion, un producteur plus rapide peut déborder du tampon du consommateur ou, si le tampon est sans dimension, épuiser toute la RAM.
Dans la programmation multithread, ce problème a été résolu par Dijkstroy, qui a proposé un nouveau mécanisme de synchronisation - le sémaphore. Un sémaphore peut être considéré comme un compteur de permission. On suppose que le producteur demande l'autorisation du sémaphore avant de commettre une action gourmande en ressources. Si le sémaphore est vide, le thread producteur est bloqué.
Les programmes asynchrones ne peuvent pas bloquer les threads, ils ne peuvent donc pas accéder à un sémaphore vide pour obtenir une autorisation (mais ils peuvent effectuer toutes les autres opérations de sémaphore). Ils doivent bloquer leur exécution d'une autre manière. De cette autre façon, ils quittent simplement le thread de travail sur lequel ils s'exécutaient, mais avant cela, ils s'arrangent pour retourner au travail dès que le sémaphore est plein.
La manière la plus élégante de mettre en pause et de reprendre un programme asynchrone est de le structurer comme un acteur de flux de données avec des ports :
Un modèle de flux de données - acteurs avec des ports, les connexions dirigées entre leurs ports et les jetons initiaux. Tiré de: Une description structurée des acteurs de flux de données et de son application
Il y a des ports d'entrée et de sortie. Les ports d'entrée reçoivent des jetons (messages et signaux) des ports de sortie d'autres acteurs. Si le port d'entrée contient des jetons et que le port de sortie a un emplacement pour placer des jetons, il est considéré comme actif. Si tous les ports de l'acteur sont actifs, il est envoyé pour exécution. Ainsi, lors de la reprise de son travail, le programme acteur peut lire en toute sécurité les jetons des ports d'entrée et écrire le week-end. Toute la sagesse de la programmation asynchrone réside dans ce mécanisme simple. L'allocation des ports en tant que sous-objets séparés des acteurs simplifie grandement le codage des programmes asynchrones et permet d'augmenter leur diversité en combinant des ports de types différents.
L'acteur classique Hewitt contient 2 ports - l'un est visible, avec un tampon pour les messages entrants, l'autre est un binaire caché qui se bloque lorsque l'acteur est envoyé pour exécution et empêche ainsi l'acteur de redémarrer jusqu'à la fin du lancement initial. Le sémaphore asynchrone souhaité est un croisement entre ces deux ports. Comme un tampon de messages, il peut stocker de nombreux jetons, et comme un port caché, ces jetons sont noirs, c'est-à-dire indiscernables, comme dans les réseaux de Petri, et un compteur de jetons suffit pour les stocker.
Au premier niveau de la hiérarchie, nous avons une classe
AbstractActor
avec trois classes imbriquées - base Port
et dérivés AsyncSemaPort
et InPort
, ainsi qu'un mécanisme de lancement d'un acteur pour exécution en l'absence de ports bloqués. En bref, cela ressemble à ceci:
public abstract class AbstractActor {
/** */
private int blocked = 0;
protected synchronized void restart() {
controlPort.unBlock();
}
private synchronized void incBlockCount() {
blocked++;
}
private synchronized void decBlockCount() {
blocked--;
if (blocked == 0) {
controlPort.block();
excecutor.execute(this::run);
}
}
protected abstract void turn() throws Throwable;
/** */
private void run() {
try {
turn();
restart();
} catch (Throwable throwable) {
whenError(throwable);
}
}
}
Il contient un ensemble minimal de classes de port:
Port
- classe de base de tous les ports
protected class Port {
private boolean isBlocked = true;
public Port() {
incBlockCount();
}
protected synchronized void block() {
if (isBlocked) {
return;
}
isBlocked = true;
incBlockCount();
}
protected synchronized void unBlock() {
if (!isBlocked) {
return;
}
isBlocked = false;
decBlockCount();
}
}
Sémaphore asynchrone:
public class AsyncSemaPort extends Port {
private long permissions = 0;
public synchronized void release(long n) {
permissions += n;
if (permissions > 0) {
unBlock();
}
}
public synchronized void aquire(long delta) {
permissions -= delta;
if (permissions <= 0) {
//
// ,
//
block();
}
}
}
InPort
- tampon minimum pour un message entrant:
public class InPort<T> extends Port implements OutMessagePort<T> {
private T item;
@Override
public void onNext(T item) {
this.item = item;
unBlock();
}
public synchronized T poll() {
T res = item;
item = null;
return res;
}
}
La version complète de la classe
AbstractActor
peut être consultée ici.
Au niveau suivant de la hiérarchie, nous avons trois acteurs abstraits avec des ports spécifiques, mais avec des routines de traitement non définies:
- une classe
AbstractProducer
est un acteur avec un port de type sémaphore asynchrone (et un port de contrôle interne, présent dans tous les acteurs par défaut). - la classe
AbstractTransformer
est un acteur Hewitt normal, avec une référence au port d'entrée de l'acteur suivant de la chaîne, où il envoie les jetons convertis. - la classe
AbstractConsumer
est également un acteur ordinaire, mais elle n'envoie nulle part les jetons convertis, alors qu'elle a un lien vers le sémaphore du producteur, et ouvre ce sémaphore après avoir absorbé le jeton d'entrée. Cela maintient le nombre de jetons dans le processus constant et aucun débordement de tampon ne se produit.
Au dernier niveau, déjà dans le répertoire de test, des acteurs spécifiques utilisés dans les tests sont définis :
- la classe
ProducerActor
génère un flux fini d'entiers. - la classe
TransformerActor
prend le numéro suivant du flux et l'envoie dans la chaîne. - class
ConsumerActor
- accepte et imprime les nombres résultants
Maintenant, nous pouvons construire une chaîne de gestionnaires de travail asynchrones et parallèles comme suit: producteur - n'importe quel nombre de transformateurs - consommateur
Ainsi, nous avons mis en œuvre une contre-pression, et même sous une forme plus générale que dans la spécification des flux réactifs - le retour peut couvrir un nombre arbitraire de cascades de traitement, et non seulement les adjacentes, comme dans la spécification.
Pour implémenter la spécification, vous devez définir un port de sortie sensible au nombre d'autorisations qui lui sont transmises à l'aide de la méthode request () - ce sera le cas
Publisher
, et compléter l'existant avec un InPort
appel à cette méthode - ce sera Subscriber
. Autrement dit, nous supposons que les interfaces Publisher
etSubscriber
décrire le comportement des ports et non des acteurs. Mais à en juger par le fait qu'il y a aussi dans la liste des interfaces Processor
, qui ne peut en aucun cas être une interface de port, les auteurs de la spécification considèrent leurs interfaces comme des interfaces d'acteurs. Eh bien, nous pouvons faire des acteurs qui implémentent toutes ces interfaces en déléguant l'exécution des fonctions d'interface aux ports correspondants.
Pour simplifier, laissez le nôtre
Publisher
ne pas avoir son propre tampon et écrira directement dans le tampon Subscriber
. Pour ce faire, vous avez besoin de quelqu'un pour Subscriber
vous abonner et remplir request()
, c'est-à-dire que nous avons 2 conditions et, par conséquent, nous avons besoin de 2 ports - InPort<Subscriber>
et AsyncSemaPort
. Aucun d'entre eux ne convient comme base de mise en œuvrePublisher
'a, car il contient des méthodes inutiles, nous allons donc faire de ces ports des variables internes:
public class ReactiveOutPort<T> implements Publisher<T>, Subscription, OutMessagePort<T> {
protected AbstractActor.InPort<Subscriber<? super T>> subscriber;
protected AbstractActor.AsyncSemaPort sema;
public ReactiveOutPort(AbstractActor actor) {
subscriber = actor.new InPort<>();
sema = actor.new AsyncSemaPort();
}
}
Cette fois, nous
ReactiveOutPort
n'avons pas défini la classe comme imbriquée, il fallait donc un paramètre de constructeur, une référence à l'acteur englobant, pour instancier les ports définis comme des classes imbriquées.
La méthode
subscribe(Subscriber subscriber)
se résume à enregistrer l'abonné et à appeler subscriber.onSubscribe()
:
public synchronized void subscribe(Subscriber<? super T> subscriber) {
if (subscriber == null) {
throw new NullPointerException();
}
if (this.subscriber.isFull()) {
subscriber.onError(new IllegalStateException());
return;
}
this.subscriber.onNext(subscriber);
subscriber.onSubscribe(this);
}
ce qui se traduit généralement par un appel
Publisher.request()
qui se résume à élever le sémaphore avec un appel AsyncSemaPort.release()
:
public synchronized void request(long n) {
if (subscriber.isEmpty()) {
return; // this spec requirement
}
if (n <= 0) {
subscriber.current().onError(new IllegalArgumentException());
return;
}
sema.release(n);
}
Et maintenant il ne nous reste pas à oublier de baisser le sémaphore en utilisant un appel
AsyncSemaPort.aquire()
au moment de l'utilisation des ressources:
public synchronized void onNext(T item) {
Subscriber<? super T> subscriber = this.subscriber.current();
if (subscriber == null) {
throw new IllegalStateException();
}
sema.aquire();
subscriber.onNext(item);
}
Le projet AsyncSemaphore a été spécialement conçu pour cet article. Il est volontairement rendu le plus compact possible pour ne pas fatiguer le lecteur. En conséquence, il contient des limitations importantes:
-
Publisher
'Subscriber
' -
Subscriber
' 1
De plus,
AsyncSemaPort
ce n'est pas un analogue complet d'un sémaphore synchrone - un seul client peut effectuer l'opération aquire()
y AsyncSemaPort
(c'est-à-dire l'acteur englobant). Mais ce n'est pas un inconvénient - AsyncSemaPort
il remplit bien son rôle. En principe, vous pouvez le faire différemment - prenez-le java.util.concurrent.Semaphore
et complétez-le avec une interface d'abonnement asynchrone (voir AsyncSemaphore.java du projet DF4J ). Un tel sémaphore peut lier des acteurs et des fils d'exécution dans n'importe quel ordre.
En général, chaque type d'interaction synchrone (bloquante) a sa propre contrepartie asynchrone (non bloquante). Donc, dans le même projet DF4J il y a une implémentation
BlockingQueue
, complété par une interface asynchrone. Cela ouvre la possibilité d'une transformation étape par étape d'un programme multithread en un programme asynchrone, en remplaçant en partie les threads par des acteurs.