Les Apis sont décrits comme des classes, par exemple
class Categories(JsonEndpoint):
url = "http://127.0.0.1:8888/categories"
params = {"page": range(100), "language": "en"}
headers = {"User-Agent": get_user_agent}
results_key = "*.slug"
categories = Categories()
class Posts(JsonEndpoint):
url = "http://127.0.0.1:8888/categories/{category}/posts"
params = {"page": range(100), "language": "en"}
url_params = {"category": categories.iter_results()}
results_key = "posts"
async def comments(self, post):
comments = Comments(
self.session,
url_params={"category": post.url.params["category"], "id": post["id"]},
)
return [comment async for comment in comments]
posts = Posts()
Les paramètres et url_params peuvent contenir des fonctions (comme ici get_user_agent - retourne un useragent aléatoire), range, itérateurs, itérateurs attendables et asynchrones (vous pouvez donc les lier ensemble).
Les paramètres en-têtes et cookies peuvent également contenir des fonctions et à attendre.
L'api de catégorie dans l'exemple ci-dessus renvoie un tableau d'objets qui ont un slug, l'itérateur retournera exactement ceux-ci. En glissant cet itérateur dans les url_params des articles, l'itérateur parcourra récursivement toutes les catégories et toutes les pages de chacune. Il s'interrompra lorsqu'il rencontrera une erreur 404 ou une autre erreur et passera à la catégorie suivante.
Et les référentiels ont un exemple de serveur aiohttp pour ces classes afin que tout puisse être testé.
En plus d'obtenir des paramètres, vous pouvez les transmettre en tant que données ou json et définir une autre méthode.
results_key est pointé et essaiera d'extraire les clés des résultats. Par exemple, "comments. *. Text" renverra le texte de chaque commentaire du tableau à l'intérieur des commentaires.
Les résultats sont enveloppés dans un wrapper qui a des propriétés url et params. url est dérivée d'une chaîne qui a également des paramètres. Ainsi, vous pouvez savoir quels paramètres ont été utilisés pour obtenir ce résultat, comme le montre la méthode des commentaires.
Il existe également une classe Sink de base pour gérer les résultats. Par exemple, les plier dans mq ou une base de données. Il fonctionne dans des tâches distinctes et reçoit des données via asyncio.Queue.
class LoggingSink(Sink):
def transform(self, obj):
return repr(obj)
async def init(self):
from loguru import logger
self.logger = logger
async def process(self, obj):
self.logger.info(obj)
return True
sink = LoggingSink(num_tasks=1)
Un exemple de l'évier le plus simple. La méthode transform nous permet de manipuler l'objet et de renvoyer None si cela ne nous convient pas. ceux. dans les thèmes, vous pouvez également faire la validation.
Sink est un gestionnaire de contexte asynchrone qui, une fois sorti, attendra en théorie que tous les objets de la file d'attente soient traités, puis annulera ses tâches.
Et enfin, pour lier le tout, j'ai créé une classe Ouvrière. Il accepte un point de terminaison et plusieurs puits. Par exemple,
worker = Worker(endpoint=posts, sinks=[loggingsink, mongosink])
worker.run()
run exécutera asyncio.run_until_complete pour le pipeline de travail. Il a également une méthode de transformation.
Il existe également une classe WorkerGroup qui vous permet de créer plusieurs nœuds de calcul à la fois et de créer un asyncio.gather pour eux.
Le code contient un exemple de serveur qui génère des données via faker et des gestionnaires pour ses points de terminaison. Je pense que c'est le plus évident.
Tout cela est à un stade précoce de développement et jusqu'à présent, j'ai souvent changé d'API. Mais maintenant, il semble être venu à quoi il devrait ressembler. Je vais simplement fusionner les demandes et les commentaires dans mon code.