Bonjour, Habr! Karma a été épuisé à cause d'un commentaire imprudent sous l'article holivar, ce qui signifie que vous devez écrire un article intéressant (j'espère) et vous réhabiliter.
J'utilise un client serveur de télégramme en php depuis plusieurs années. Et comme de nombreux utilisateurs - fatigués de la croissance constante de la consommation de mémoire. Certaines sessions peuvent prendre de 1 à 8 gigaoctets de RAM! Le support des bases de données est promis depuis longtemps, mais aucun progrès n'a été réalisé dans ce sens. J'ai dû résoudre le problème moi-même :) La popularité du projet open source a imposé des exigences intéressantes sur la pull request:
- Rétrocompatibilité . Toutes les sessions existantes doivent continuer à fonctionner dans la nouvelle version (session est une instance sérialisée de l'application dans un fichier);
- Liberté de choix de la base de données . La possibilité de changer le type de stockage sans perdre de données et à tout moment, car les utilisateurs ont différentes configurations de l'environnement;
- Extensibilité . Facilité d'ajouter de nouveaux types de bases de données;
- Enregistrer l'interface . Le code d'application qui manipule les données ne doit pas changer;
- Asynchronie . Le projet utilise amphp, donc toutes les opérations de base de données doivent être non bloquantes;
Pour plus de détails j'invite tout le monde sous chat.
Que allons-nous transférer
La plupart de la mémoire de MadelineProto est occupée par les chats, les utilisateurs et les fichiers. Par exemple, dans le cache des pairs, j'ai plus de 20 000 entrées. Ce sont tous les utilisateurs que le compte a jamais vus (y compris les membres de tous les groupes), ainsi que les canaux, les robots et les groupes. Plus le compte est ancien et actif, plus il y aura de données en mémoire. Ce sont des dizaines et des centaines de mégaoctets, et la plupart d'entre eux ne sont pas utilisés. Mais vous ne pouvez pas vider tout le cache, car les télégrammes restreindront immédiatement sévèrement le compte lorsque vous tenterez de recevoir les mêmes données plusieurs fois. Par exemple, après avoir recréé la session sur mon serveur de démonstration public, les télégrammes dans une semaine ont répondu à la plupart des demandes avec l'erreur FLOOD_WAIT et rien n'a vraiment fonctionné. Une fois le cache réchauffé, tout est revenu à la normale.
Du point de vue du code, ces données sont stockées sous forme de tableaux dans les propriétés d'une paire de classes.
Architecture
Sur la base des exigences, un schéma est né:
- Tous les tableaux «lourds» sont remplacés par des objets qui implémentent ArrayAccess;
- Pour chaque type de base de données, nous créons nos propres classes qui héritent de celle de base;
- Les objets sont créés et écrits dans les propriétés pendant __consrtuct et __awake;
- La fabrique abstraite sélectionne la classe souhaitée pour l'objet, en fonction de la base de données sélectionnée dans les paramètres de l'application;
- Si l'application dispose déjà d'un autre type de stockage, nous lisons toutes les données à partir de là et écrivons le tableau dans le nouveau stockage.
Problèmes mondiaux asynchrones
La première chose que j'ai faite a été de créer des interfaces et une classe pour stocker des tableaux en mémoire. C'était le comportement par défaut, identique à celui de l'ancienne version du programme. Le premier soir, j'étais très enthousiasmé par le succès du prototype. Le code était simple et agréable. Jusqu'à présent, il n'a pas été découvert qu'il était impossible d'utiliser des générateurs à l'intérieur des méthodes de l'interface Iterator et à l'intérieur des méthodes responsables de unset et isset.
Il convient de clarifier ici que amphp utilise la syntaxe du générateur pour implémenter async en php. Le rendement devient analogue à async ... attendez de js. Si une méthode utilise l'asynchronie, alors pour en obtenir un résultat, vous devez attendre ce résultat dans le code en utilisant yield. Par exemple:
<?php
include 'vendor/autoload.php';
$MadelineProto = new \danog\MadelineProto\API('session.madeline');
$MadelineProto->async(true);
$MadelineProto->loop(function() use($MadelineProto) {
$myAsyncFunction = function() use($MadelineProto): \Generator {
$me = yield $MadelineProto->start();
yield $MadelineProto->echo(json_encode($me, JSON_PRETTY_PRINT|JSON_UNESCAPED_UNICODE));
};
yield $myAsyncFunction();
});
Si de la chaîne
yield $myAsyncFunction();
remove yield, l'application se terminera avant l'exécution de ce code. Nous n'obtiendrons pas le résultat.
L'ajout de yield avant d'appeler des méthodes et des fonctions n'est pas très difficile. Mais comme l'interface ArrayAccess est utilisée, les méthodes ne sont pas appelées directement. Par exemple, unset () appelle offsetUnset () et isset () appelle offsetIsset (). La situation est similaire avec les itérateurs foreach lors de l'utilisation de l'interface Iterator.
L'ajout de rendement devant les méthodes intégrées génère une erreur car ces méthodes ne sont pas conçues pour fonctionner avec des générateurs. Un peu plus dans les commentaires: ici et ici .
J'ai dû faire des compromis et réécrire le code pour utiliser mes propres méthodes. Heureusement, il y avait très peu de tels endroits. Dans la plupart des cas, les tableaux étaient utilisés pour la lecture ou l'écriture par clé. Cette fonctionnalité a fait de grands amis avec les générateurs.
L'interface résultante est:
<?php
use Amp\Producer;
use Amp\Promise;
interface DbArray extends DbType, \ArrayAccess, \Countable
{
public function getArrayCopy(): Promise;
public function isset($key): Promise;
public function offsetGet($offset): Promise;
public function offsetSet($offset, $value);
public function offsetUnset($offset): Promise;
public function count(): Promise;
public function getIterator(): Producer;
/**
* @deprecated
* @internal
* @see DbArray::isset();
*
* @param mixed $offset
*
* @return bool
*/
public function offsetExists($offset);
}
Exemples d'utilisation des données
<?php
...
//
$existingChat = yield $this->chats[$user['id']];
//.
yield $this->chats[$user['id']] = $user;
// yield, .
$this->chats[$user['id']] = $user;
//unset
yield $this->chats->offsetUnset($id);
//foreach
$iterator = $this->chats->getIterator();
while (yield $iterator->advance()) {
[$key, $value] = $iterator->getCurrent();
//
}
Stockage de données
La manière la plus simple de stocker des données est la sérialisation. J'ai dû abandonner l'utilisation de json pour prendre en charge des objets. Le tableau comporte deux colonnes principales: clé et valeur.
Un exemple de requête SQL pour créer une table:
CREATE TABLE IF NOT EXISTS `{$this->table}`
(
`key` VARCHAR(255) NOT NULL,
`value` MEDIUMBLOB NULL,
`ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`key`)
)
ENGINE = InnoDB
CHARACTER SET 'utf8mb4'
COLLATE 'utf8mb4_general_ci'
Chaque fois que l'application démarre, nous essayons de créer une table pour chaque propriété. Il n'est pas recommandé aux clients Telegram de redémarrer plus d'une fois toutes les quelques heures, nous n'aurons donc pas plusieurs demandes de création de tables par seconde :)
Étant donné que la clé primaire ne s'incrémente pas automatiquement, l'insertion et la mise à jour des données peuvent être effectuées avec une seule requête, comme dans un tableau normal:
INSERT INTO `{$this->table}`
SET `key` = :index, `value` = :value
ON DUPLICATE KEY UPDATE `value` = :value
Une table avec un nom au format% account_id% _% class% _% variable_name% est créée pour chaque variable. Mais lorsque vous démarrez l'application pour la première fois, il n'y a pas encore de compte. Dans ce cas, vous devez générer un identifiant temporaire aléatoire avec le préfixe tmp. A chaque lancement, la classe de chaque variable vérifie si l'identifiant du compte est apparu. Si id est présent, les tables seront renommées.
Index
La structure de la base de données est aussi simple que possible afin que de nouvelles propriétés soient ajoutées automatiquement à l'avenir. Il n'y a pas de connexions. Seuls les index de clé PRIMARY sont utilisés. Mais il y a des situations où vous devez rechercher dans d'autres domaines.
Par exemple, il existe un tableau / table de discussions. La clé est l'identifiant de chat. Mais souvent, vous devez rechercher par nom d'utilisateur. Lorsque l'application stockait des données dans des tableaux, la recherche par nom d'utilisateur était effectuée comme d'habitude en itérant sur le tableau dans foreach. Cette recherche a fonctionné à une vitesse acceptable en mémoire, mais pas dans la base de données. Par conséquent, une autre table / tableau a été créée et une propriété correspondante dans la classe. La clé est le nom d'utilisateur, la valeur est l'identifiant du chat. Le seul inconvénient de cette approche est que vous devez écrire du code supplémentaire pour synchroniser les deux tables.
Mise en cache
Le mysql local est rapide, mais un peu de cache ne fait jamais de mal. Surtout si la même valeur est utilisée plusieurs fois de suite. Par exemple, nous vérifions d'abord la présence d'un chat dans la base de données, puis nous en obtenons des données.
Un simple trait de
<?php
namespace danog\MadelineProto\Db;
use Amp\Loop;
use danog\MadelineProto\Logger;
trait ArrayCacheTrait
{
/**
* Values stored in this format:
* [
* [
* 'value' => mixed,
* 'ttl' => int
* ],
* ...
* ].
* @var array
*/
protected array $cache = [];
protected string $ttl = '+5 minutes';
private string $ttlCheckInterval = '+1 minute';
protected function getCache(string $key, $default = null)
{
$cacheItem = $this->cache[$key] ?? null;
$result = $default;
if (\is_array($cacheItem)) {
$result = $cacheItem['value'];
$this->cache[$key]['ttl'] = \strtotime($this->ttl);
}
return $result;
}
/**
* Save item in cache.
*
* @param string $key
* @param $value
*/
protected function setCache(string $key, $value): void
{
$this->cache[$key] = [
'value' => $value,
'ttl' => \strtotime($this->ttl),
];
}
/**
* Remove key from cache.
*
* @param string $key
*/
protected function unsetCache(string $key): void
{
unset($this->cache[$key]);
}
protected function startCacheCleanupLoop(): void
{
Loop::repeat(\strtotime($this->ttlCheckInterval, 0) * 1000, fn () => $this->cleanupCache());
}
/**
* Remove all keys from cache.
*/
protected function cleanupCache(): void
{
$now = \time();
$oldKeys = [];
foreach ($this->cache as $cacheKey => $cacheValue) {
if ($cacheValue['ttl'] < $now) {
$oldKeys[] = $cacheKey;
}
}
foreach ($oldKeys as $oldKey) {
$this->unsetCache($oldKey);
}
Logger::log(
\sprintf(
"cache for table:%s; keys left: %s; keys removed: %s",
$this->table,
\count($this->cache),
\count($oldKeys)
),
Logger::VERBOSE
);
}
}
Je voudrais porter une attention particulière à startCacheCleanupLoop. Grâce à la magie d'amphp, invalider le cache est aussi simple que possible. Le rappel commence à l'intervalle spécifié, effectue une boucle sur toutes les valeurs et examine le champ ts, qui stocke l'horodatage du dernier appel à cet élément. Si l'appel remonte à plus de 5 minutes (configurable dans les paramètres), alors l'élément est supprimé. Il est très facile d'implémenter un analogique ttl depuis redis ou memcache en utilisant amphp. Tout cela se passe en arrière-plan et ne bloque pas le thread principal.
Avec l'aide du cache et de l'asynchronie, non seulement les lectures sont accélérées, mais aussi les écritures.
Voici le code source de la méthode qui écrit les données dans la base de données.
/**
* Set value for an offset.
*
* @link https://php.net/manual/en/arrayiterator.offsetset.php
*
* @param string $index <p>
* The index to set for.
* </p>
* @param $value
*
* @throws \Throwable
*/
public function offsetSet($index, $value): Promise
{
if ($this->getCache($index) === $value) {
return call(fn () =>null);
}
$this->setCache($index, $value);
$request = $this->request(
"
INSERT INTO `{$this->table}`
SET `key` = :index, `value` = :value
ON DUPLICATE KEY UPDATE `value` = :value
",
[
'index' => $index,
'value' => \serialize($value),
]
);
//Ensure that cache is synced with latest insert in case of concurrent requests.
$request->onResolve(fn () => $this->setCache($index, $value));
return $request;
}
$ this-> request crée une Promise qui écrit les données de manière asynchrone. Et les opérations avec le cache se produisent de manière synchrone. Autrement dit, vous ne pouvez pas attendre une écriture dans la base de données et en même temps être sûr que les opérations de lecture commenceront immédiatement à renvoyer de nouvelles données.
La méthode onResolve d'amphp s'est avérée très utile. Une fois l'insertion terminée, les données seront à nouveau écrites dans le cache. Si une opération d'écriture est en retard et que le cache et la base commencent à différer, le cache sera mis à jour avec la valeur écrite en dernier dans la base. Ceux. notre cache redeviendra cohérent avec la base.
La source
→ Lien vers la demande d'extraction
Et juste comme ça, un autre utilisateur a ajouté le support de postgre. Il n'a fallu que 5 minutes pour rédiger les instructions .
La quantité de code pourrait être réduite en déplaçant les méthodes dupliquées dans la classe abstraite générale SqlArray.
Encore une chose
Il a été remarqué que lors du téléchargement de fichiers multimédias depuis un télégramme, le ramasse-miettes php standard ne gère pas le travail et des morceaux du fichier restent en mémoire. En règle générale, les fuites avaient la même taille que le fichier. Cause possible: le ramasse-miettes est automatiquement déclenché lorsque 10 000 liens s'accumulent. Dans notre cas, il y avait peu de liens (des dizaines), mais chacun pouvait faire référence à des mégaoctets de données en mémoire. C'était très paresseux d'étudier des milliers de lignes de code avec l'implémentation mtproto. Pourquoi ne pas essayer la béquille élégante avec \ gc_collect_cycles (); d'abord?
Étonnamment, cela a résolu le problème. Cela signifie qu'il suffit de configurer le démarrage périodique du nettoyage. Heureusement, amphp fournit des outils simples pour l'exécution en arrière-plan à des intervalles spécifiés.
Effacer la mémoire à chaque seconde semblait trop facile et peu efficace. J'ai opté pour un algorithme qui vérifie le gain de mémoire depuis le dernier nettoyage. L'effacement se produit si le gain est supérieur au seuil.
<?php
namespace danog\MadelineProto\MTProtoTools;
use Amp\Loop;
use danog\MadelineProto\Logger;
class GarbageCollector
{
/**
* Ensure only one instance of GarbageCollector
* when multiple instances of MadelineProto running.
* @var bool
*/
public static bool $lock = false;
/**
* How often will check memory.
* @var int
*/
public static int $checkIntervalMs = 1000;
/**
* Next cleanup will be triggered when memory consumption will increase by this amount.
* @var int
*/
public static int $memoryDiffMb = 1;
/**
* Memory consumption after last cleanup.
* @var int
*/
private static int $memoryConsumption = 0;
public static function start(): void
{
if (static::$lock) {
return;
}
static::$lock = true;
Loop::repeat(static::$checkIntervalMs, static function () {
$currentMemory = static::getMemoryConsumption();
if ($currentMemory > static::$memoryConsumption + static::$memoryDiffMb) {
\gc_collect_cycles();
static::$memoryConsumption = static::getMemoryConsumption();
$cleanedMemory = $currentMemory - static::$memoryConsumption;
Logger::log("gc_collect_cycles done. Cleaned memory: $cleanedMemory Mb", Logger::VERBOSE);
}
});
}
private static function getMemoryConsumption(): int
{
$memory = \round(\memory_get_usage()/1024/1024, 1);
Logger::log("Memory consumption: $memory Mb", Logger::ULTRA_VERBOSE);
return (int) $memory;
}
}