Qu'est-ce que l'asynchronie devrait être en Python

Au cours des dernières années, le mot async- clé et la sémantique de programmation asynchrone ont imprégné de nombreux langages de programmation populaires: JavaScript , Rust , C # et bien d'autres . Bien sûr, Python l'a aussi async/await, il a été introduit dans Python 3.5.



Dans cet article, je souhaite discuter des problèmes du code asynchrone, spéculer sur les alternatives et proposer une nouvelle approche pour prendre en charge à la fois les applications synchrones et asynchrones.



Couleur de fonction



Lorsque des fonctions asynchrones sont incluses dans un langage de programmation, elles se divisent essentiellement en deux. Des fonctions rouges apparaissent (ou asynchrones) et certaines fonctions restent bleues (synchrones).



Le problème principal est que les fonctions bleues ne peuvent pas appeler les fonctions rouges, mais les fonctions rouges peuvent potentiellement provoquer des fonctions bleues. En Python, par exemple, cela est partiellement vrai: les fonctions asynchrones ne peuvent appeler que des fonctions synchrones non bloquantes. Mais il est impossible de déterminer à partir de la description si la fonction est bloquante ou non. Python est un langage de script.



Cette scission conduit à la division du langage en deux sous-ensembles: synchrone et asynchrone. Python 3.5 a été publié il y a plus de cinq ans, mais asyncil n'est toujours pas aussi bien pris en charge que les capacités synchrones de Python.



Vous pouvez en savoir plus sur les couleurs de fonction dans cet excellent article .



Dupliquer le code



Différentes couleurs de fonctions signifient une duplication de code dans la pratique.



Imaginez que vous développiez un outil CLI pour récupérer la taille d'une page Web et que vous souhaitiez conserver des méthodes synchrones et asynchrones de le faire. Par exemple, cela est nécessaire si vous écrivez une bibliothèque et que vous ne savez pas comment votre code sera utilisé. Et il ne s'agit pas seulement des bibliothèques PyPI, mais aussi de nos propres bibliothèques avec une logique commune pour divers services, écrites, par exemple, en Django et aiohttp. Bien que, bien sûr, les applications indépendantes soient principalement écrites de manière synchrone ou asynchrone.



Commençons par le pseudocode synchrone:



def fetch_resource_size(url: str) -> int:
    response = client_get(url)
    return len(response.content)


À l'air cool. Regardons maintenant l'analogue asynchrone:



async def fetch_resource_size(url: str) -> int:
    response = await client_get(url)
    return len(response.content)


En général, c'est le même code, mais avec l'ajout des mots asyncet await. Et je ne l'ai pas inventé - comparez les exemples de code dans le tutoriel sur httpx:





Il y a exactement la même image.



Abstraction et composition



Il s'avère que vous devez réécrire tout le code synchrone et organiser ici et là asyncet awaitpour que le programme devienne asynchrone.



Deux principes peuvent aider à résoudre ce problème. Tout d'abord, réécrivons le pseudocode impératif en fonctionnel. Cela vous permettra de voir l'image plus clairement.



def fetch_resource_size(url: str) -> Abstraction[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


Vous demandez quelle est cette méthode .map, que fait-elle. C'est ainsi que la composition d'abstractions complexes et de fonctions pures se produit dans un style fonctionnel. Cela vous permet de créer une nouvelle abstraction avec un nouvel état à partir d'un état existant. Supposons qu'il client_get(url)retourne initialement Abstraction[Response]et que l'appel .map(lambda response: len(response.content))convertit la réponse en instance requise Abstraction[int].



Il devient clair ce qu'il faut faire ensuite. Remarquez avec quelle facilité nous sommes passés de plusieurs étapes indépendantes à des appels de fonction séquentiels. De plus, nous avons changé le type de réponse: maintenant la fonction renvoie une abstraction.



Réécrivons le code pour travailler avec la version asynchrone:



def fetch_resource_size(url: str) -> AsyncAbstraction[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


La seule chose qui est différente est le type de retour - AsyncAbstraction. Le reste du code est exactement le même. Vous n'avez plus besoin d'utiliser des mots clés asyncet await. awaitn'est pas du tout utilisé ( pour le plaisir de cela, tout a été commencé ), et sans cela, il n'y a aucun intérêt async.



La dernière chose à faire est de décider du client dont nous avons besoin: asynchrone ou synchrone.



def fetch_resource_size(
    client_get: Callable[[str], AbstactionType[Response]],
    url: str,
) -> AbstactionType[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


client_getest maintenant un argument de type appelable qui prend une chaîne d'URL en entrée et renvoie un type AbstractionTypesur l'objet Response. AbstractionType- soit Abstractionou à AsyncAbstractionpartir des exemples précédents.



Lorsque nous passons Abstraction, le code s'exécute de manière synchrone, lorsque AsyncAbstraction- le même code démarre automatiquement de manière asynchrone.



IOResult et FutureResult



Heureusement, les dry-python/returnsabstractions correctes sont déjà en place .



Permettez-moi de vous présenter un outil entièrement Python sécurisé, compatible avec mypy, indépendant du framework. Il a des abstractions impressionnantes, confortables et merveilleuses qui peuvent être utilisées dans absolument n'importe quel projet.



Option synchrone



Tout d'abord, mettons les dépendances pour obtenir un exemple reproductible.



pip install returns httpx anyio


Ensuite, transformons le pseudocode en code Python fonctionnel. Commençons par l'option synchrone.



from typing import Callable
 
import httpx
 
from returns.io import IOResultE, impure_safe
 
def fetch_resource_size(
    client_get: Callable[[str], IOResultE[httpx.Response]],
    url: str,
) -> IOResultE[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )
 
print(fetch_resource_size(
    impure_safe(httpx.get),
    'https://sobolevn.me',
))
# => <IOResult: <Success: 27972>>


Il a fallu quelques changements pour obtenir un code fonctionnel:



  • L'utilisation IOResultEest un moyen fonctionnel de gérer les erreurs d'E / S synchrones (les exceptions ne fonctionnent pas toujours ). Les types basés sur Resultvous permettent de simuler des exceptions, mais avec des valeurs distinctes Failure(). Les sorties réussies sont ensuite encapsulées dans un type Success. Habituellement, personne ne se soucie des exceptions, mais nous le faisons.
  • Utilisez httpxqui peut gérer les demandes synchrones et asynchrones.
  • Utilisez une fonction impure_safepour convertir le type de retour httpx.geten abstraction IOResultE.


Option asynchrone



Essayons de faire la même chose en code asynchrone.



from typing import Callable
 
import anyio
import httpx
 
from returns.future import FutureResultE, future_safe
 
def fetch_resource_size(
    client_get: Callable[[str], FutureResultE[httpx.Response]],
    url: str,
) -> FutureResultE[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )
 
page_size = fetch_resource_size(
    future_safe(httpx.AsyncClient().get),
    'https://sobolevn.me',
)
print(page_size)
print(anyio.run(page_size.awaitable))
# => <FutureResult: <coroutine object async_map at 0x10b17c320>>
# => <IOResult: <Success: 27972>>


Vous voyez: le résultat est exactement le même, mais maintenant le code s'exécute de manière asynchrone. Cependant, sa partie principale n'a pas changé. Cependant, vous devez faire attention aux points suivants:



  • Simultané IOResultEchangé en asynchrone FutureResultE, impure_safe- activé future_safe. Il fonctionne de la même, mais renvoie l'autre abstraction: FutureResultE.
  • Utilisé à AsyncClientpartir de httpx.
  • La valeur résultante FutureResultdoit être exécutée car les fonctions rouges ne peuvent pas s'appeler elles-mêmes.
  • L'utilitaire anyioest utilisé pour montrer que cette approche fonctionne avec une bibliothèque asynchrone: asyncio, trio, curio.


Deux en un



Je vais vous montrer comment combiner les versions synchrones et asynchrones dans une API de type sécurisé.



Les types plus élevés et une classe de type pour travailler avec IO n'ont pas encore été publiés (ils apparaîtront dans la version 0.15.0), je vais donc illustrer dans l'habituel @overload:



from typing import Callable, Union, overload
 
import anyio
import httpx
 
from returns.future import FutureResultE, future_safe
from returns.io import IOResultE, impure_safe
 
@overload
def fetch_resource_size(
    client_get: Callable[[str], IOResultE[httpx.Response]],
    url: str,
) -> IOResultE[int]:
    """Sync case."""
 
@overload
def fetch_resource_size(
    client_get: Callable[[str], FutureResultE[httpx.Response]],
    url: str,
) -> FutureResultE[int]:
    """Async case."""
 
def fetch_resource_size(
    client_get: Union[
        Callable[[str], IOResultE[httpx.Response]],
        Callable[[str], FutureResultE[httpx.Response]],
    ],
    url: str,
) -> Union[IOResultE[int], FutureResultE[int]]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


Nous utilisons des décorateurs pour @overloaddécrire les données d'entrée autorisées et le type de valeur de retour. Vous @overloadpouvez en savoir plus sur le décorateur dans mon autre article .



Un appel de fonction avec un client synchrone ou asynchrone ressemble à ceci:



# Sync:
print(fetch_resource_size(
    impure_safe(httpx.get),
    'https://sobolevn.me',
))
# => <IOResult: <Success: 27972>>
 
# Async:
page_size = fetch_resource_size(
    future_safe(httpx.AsyncClient().get),
    'https://sobolevn.me',
)
print(page_size)
print(anyio.run(page_size.awaitable))
# => <FutureResult: <coroutine object async_map at 0x10b17c320>>
# => <IOResult: <Success: 27972>>


Comme vous pouvez le voir, fetch_resource_sizedans la variante synchrone, il la renvoie immédiatement IOResultet l'exécute. Alors que dans la version asynchrone, une boucle d'événement est nécessaire, comme pour une coroutine classique. anyioutilisé pour afficher les résultats.



Dans mypyce code, il n'y a pas de commentaires:



» mypy async_and_sync.py
Success: no issues found in 1 source file


Voyons ce qui se passe si quelque chose ne va pas.



---lambda response: len(response.content),
+++lambda response: response.content,


mypy trouve facilement de nouvelles erreurs:



» mypy async_and_sync.py
async_and_sync.py:33: error: Argument 1 to "map" of "IOResult" has incompatible type "Callable[[Response], bytes]"; expected "Callable[[Response], int]"
async_and_sync.py:33: error: Argument 1 to "map" of "FutureResult" has incompatible type "Callable[[Response], bytes]"; expected "Callable[[Response], int]"
async_and_sync.py:33: error: Incompatible return value type (got "bytes", expected "int")


Tour de passe-passe et pas de magie: l'écriture de code asynchrone avec les abstractions correctes ne nécessite qu'une vieille composition. Mais le fait que nous ayons la même API pour différents types est vraiment génial. Par exemple, il vous permet d'abstraire du fonctionnement des requêtes HTTP: de manière synchrone ou asynchrone.



Espérons que cet exemple a montré à quel point les programmes asynchrones peuvent être géniaux. Et si vous essayez dry-python / Returns , vous trouverez beaucoup plus de choses intéressantes. Dans la nouvelle version, nous avons déjà créé les primitives nécessaires pour travailler avec les types supérieurs et toutes les interfaces nécessaires. Le code ci-dessus peut maintenant être réécrit comme ceci:



from typing import Callable, TypeVar

import anyio
import httpx

from returns.future import future_safe
from returns.interfaces.specific.ioresult import IOResultLike2
from returns.io import impure_safe
from returns.primitives.hkt import Kind2, kinded

_IOKind = TypeVar('_IOKind', bound=IOResultLike2)

@kinded
def fetch_resource_size(
    client_get: Callable[[str], Kind2[_IOKind, httpx.Response, Exception]],
    url: str,
) -> Kind2[_IOKind, int, Exception]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


# Sync:
print(fetch_resource_size(
    impure_safe(httpx.get),
    'https://sobolevn.me',
))
# => <IOResult: <Success: 27972>>

# Async:
page_size = fetch_resource_size(
    future_safe(httpx.AsyncClient().get),
    'https://sobolevn.me',
)
print(page_size)
print(anyio.run(page_size.awaitable))
# => <FutureResult: <coroutine object async_map at 0x10b17c320>>
# => <IOResult: <Success: 27972>>


Voir la branche `master`, elle y fonctionne déjà.



Plus de fonctionnalités de Dry-Python



Voici quelques autres fonctionnalités utiles de dry-python dont je suis le plus fier.





from returns.curry import curry, partial
 
def example(a: int, b: str) -> float:
    ...
 
reveal_type(partial(example, 1))
# note: Revealed type is 'def (b: builtins.str) -> builtins.float'
 
reveal_type(curry(example))
# note: Revealed type is 'Overload(def (a: builtins.int) -> def (b: builtins.str) -> builtins.float, def (a: builtins.int, b: builtins.str) -> builtins.float)'


Cela vous permet d'utiliser @curry, par exemple, comme ceci:



@curry
def example(a: int, b: str) -> float:
    return float(a + len(b))
 
assert example(1, 'abc') == 4.0
assert example(1)('abc') == 4.0




À l'aide d'un plugin mypy personnalisé, vous pouvez créer des pipelines fonctionnels qui renvoient des types.



from returns.pipeline import flow
assert flow(
    [1, 2, 3],
    lambda collection: max(collection),
    lambda max_number: -max_number,
) == -3


Habituellement, dans le code typé, il est très peu pratique de travailler avec des lambdas, car leurs arguments sont toujours de type Any. L'inférence mypyrésout ce problème.



Avec son aide, nous savons maintenant de quel lambda collection: max(collection)type Callable[[List[int]], int], mais lambda max_number: -max_numbersimple Callable[[int], int]. In flowpeut transmettre n'importe quel nombre d'arguments, et ils fonctionneront très bien. Tout cela grâce au plugin.





L'abstraction ci FutureResult- dessus , dont nous avons parlé plus tôt, peut être utilisée pour passer explicitement des dépendances à des programmes asynchrones dans un style fonctionnel.



Projets pour le futur



Avant de publier enfin la version 1.0, nous avons plusieurs tâches importantes à résoudre:



  • Implémentez les types de types supérieurs ou leur émulation ( problème ).
  • Ajoutez les classes de type appropriées pour implémenter les abstractions requises ( problème ).
  • Essayez peut-être un compilateur mypyc, qui permettra potentiellement aux programmes Python annotés typés d'être compilés en binaire. Ensuite, le code avec dry-python/returnsfonctionnera plusieurs fois plus rapidement ( problème ).
  • Explorez de nouvelles façons d'écrire du code fonctionnel en Python, telles que la "do-notation" .


conclusions



Tout problème peut être résolu par la composition et l'abstraction. Dans cet article, nous avons examiné comment résoudre le problème des couleurs de fonction et écrire un code simple, lisible et flexible qui fonctionne. Et faites une vérification de type.



Essayez dry-python / Returns et rejoignez la Semaine russe de Python : lors de la conférence, le développeur principal de dry-python Pablo Aguilar organisera un atelier sur l'utilisation de dry-python pour écrire la logique métier.



All Articles