Ecrire un service web en Python avec FastAPI

image



Je sais, je sais, vous pensez probablement "quoi, encore?!"



Oui, sur Habré, ils ont déjà écrit à plusieurs reprises sur le framework FastAPI . Mais je propose de considérer cet outil un peu plus en détail et d'écrire l'API de votre propre mini Habr sans karma et sans notes, mais avec blackjack et avec des tests, authentification, migrations et travail asynchrone avec la base de données.

Schéma de base de données et migrations



Tout d'abord, en utilisant SQLAlchemy Expression Language , nous décrirons le schéma de la base de données. Créons un fichier models / users.py :



import sqlalchemy
from sqlalchemy.dialects.postgresql import UUID

metadata = sqlalchemy.MetaData()


users_table = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("email", sqlalchemy.String(40), unique=True, index=True),
    sqlalchemy.Column("name", sqlalchemy.String(100)),
    sqlalchemy.Column("hashed_password", sqlalchemy.String()),
    sqlalchemy.Column(
        "is_active",
        sqlalchemy.Boolean(),
        server_default=sqlalchemy.sql.expression.true(),
        nullable=False,
    ),
)


tokens_table = sqlalchemy.Table(
    "tokens",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column(
        "token",
        UUID(as_uuid=False),
        server_default=sqlalchemy.text("uuid_generate_v4()"),
        unique=True,
        nullable=False,
        index=True,
    ),
    sqlalchemy.Column("expires", sqlalchemy.DateTime()),
    sqlalchemy.Column("user_id", sqlalchemy.ForeignKey("users.id")),
)


Et le fichier models / posts.py :



import sqlalchemy

from .users import users_table

metadata = sqlalchemy.MetaData()


posts_table = sqlalchemy.Table(
    "posts",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("user_id", sqlalchemy.ForeignKey(users_table.c.id)),
    sqlalchemy.Column("created_at", sqlalchemy.DateTime()),
    sqlalchemy.Column("title", sqlalchemy.String(100)),
    sqlalchemy.Column("content", sqlalchemy.Text()),
)


Pour automatiser les migrations de bases de données, installez alembic :



$ pip install alembic


Pour initialiser Alembic, exécutez:



$ alembic init migrations


Cette commande créera dans le répertoire courant un fichier alembic.ini et un répertoire de migrations contenant:



  • le répertoire des versions dans lequel les fichiers de migration seront stockés
  • script env.py qui s'exécute lorsque l'alambic est appelé
  • un fichier script.py.mako contenant un modèle pour les nouvelles migrations.


Nous indiquerons l'url de notre base de données, pour cela, dans le fichier alembic.ini, ajoutez la ligne:



sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASS)s@%(DB_HOST)s:5432/%(DB_NAME)s


Le format de % (variable_name) nous permet de définir différentes valeurs de variables en fonction de l'environnement, en les remplaçant dans le fichier env.py comme ceci:



from os import environ
from alembic import context
from app.models import posts, users

# Alembic Config   
#     alembic.ini
config = context.config

section = config.config_ini_section
config.set_section_option(section, "DB_USER", environ.get("DB_USER"))
config.set_section_option(section, "DB_PASS", environ.get("DB_PASS"))
config.set_section_option(section, "DB_NAME", environ.get("DB_NAME"))
config.set_section_option(section, "DB_HOST", environ.get("DB_HOST"))

fileConfig(config.config_file_name)

target_metadata = [users.metadata, posts.metadata]


Ici, nous récupérons les valeurs de DB_USER, DB_PASS, DB_NAME et DB_HOST à ​​partir des variables d'environnement. De plus, le fichier env.py spécifie les métadonnées de notre base de données dans l'attribut target_metadata , sans lequel Alembic ne pourra pas déterminer quelles modifications doivent être apportées à la base de données.



Tout est prêt et nous pouvons générer des migrations et mettre à jour la base de données:




$ alembic revision --autogenerate -m "Added required tables"
$ alembic upgrade head


Nous lançons l'application et connectons la base de données



Créons un fichier main.py :



from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def read_root():
    return {"Hello": "World"}


Et lancez l'application en exécutant la commande:



$ uvicorn main:app --reload


Assurons-nous que tout fonctionne comme il se doit. Ouvrez http://127.0.0.1:8000/ dans le navigateur et consultez
{"Hello": "World"}


Pour nous connecter à la base de données, nous utiliserons le module de bases de données , qui nous permet d'exécuter des requêtes de manière asynchrone.



Nous allons configurer le démarrage et shutdhown événements de notre service, au cours de laquelle la connexion et la déconnexion de la base de données se produit. Éditons le fichier main.py :



from os import environ

import databases

#      
DB_USER = environ.get("DB_USER", "user")
DB_PASSWORD = environ.get("DB_PASSWORD", "password")
DB_HOST = environ.get("DB_HOST", "localhost")
DB_NAME = "async-blogs"
SQLALCHEMY_DATABASE_URL = (
    f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
)
#   database,      
database = databases.Database(SQLALCHEMY_DATABASE_URL)


app = FastAPI()


@app.on_event("startup")
async def startup():
    #       
    await database.connect()


@app.on_event("shutdown")
async def shutdown():
    #       
    await database.disconnect()


@app.get("/")
async def read_root():
    #    ,      
    query = (
        select(
            [
                posts_table.c.id,
                posts_table.c.created_at,
                posts_table.c.title,
                posts_table.c.content,
                posts_table.c.user_id,
                users_table.c.name.label("user_name"),
            ]
        )
        .select_from(posts_table.join(users_table))
        .order_by(desc(posts_table.c.created_at))
    )
    return await database.fetch_all(query)


Nous ouvrons http://127.0.0.1:8000/ et si nous voyons une liste vide [] dans la réponse , alors tout s'est bien passé et nous pouvons continuer.



Validation des demandes et des réponses



Nous mettrons en œuvre la possibilité d'enregistrement des utilisateurs. Pour ce faire, nous devons valider les requêtes et réponses HTTP. Pour résoudre ce problème, nous utiliserons la bibliothèque pydantic :



pip install pydantic


Créez un fichier schemas / users.py et ajoutez un modèle chargé de valider le corps de la requête:



from pydantic import BaseModel, EmailStr

class UserCreate(BaseModel):
    """  sign-up  """
    email: EmailStr
    name: str
    password: str


Notez que les types de champ sont définis à l'aide de l'annotation de type. En plus des types de données intégrés tels que int et str , pydantic propose un grand nombre de types qui fournissent une validation supplémentaire. Par exemple, le type EmailStr vérifie que la valeur reçue est un e-mail valide. Pour utiliser le type EmailStr , vous devez installer le module email-validator :



pip install email-validator


Le corps de la réponse doit contenir ses propres champs spécifiques, par exemple id et access_token , ajoutons donc les modèles responsables de la génération de la réponse au fichier schemas / users.py :



from typing import Optional
from pydantic import UUID4, BaseModel, EmailStr, Field, validator


class UserCreate(BaseModel):
    """  sign-up  """
    email: EmailStr
    name: str
    password: str


class UserBase(BaseModel):
    """       """
    id: int
    email: EmailStr
    name: str


class TokenBase(BaseModel):
    token: UUID4 = Field(..., alias="access_token")
    expires: datetime
    token_type: Optional[str] = "bearer"

    class Config:
        allow_population_by_field_name = True

    @validator("token")
    def hexlify_token(cls, value):
        """  UUID  hex  """
        return value.hex


class User(UserBase):
    """         """
    token: TokenBase = {}


Pour chaque champ du modèle, vous pouvez écrire un validateur personnalisé . Par exemple, hexlify_token convertit la valeur UUID en une chaîne hexadécimale. Il convient de noter que vous pouvez utiliser la classe Field lorsque vous devez remplacer le comportement par défaut d'un champ de modèle. Par exemple, token: UUID4 = Field (..., alias = "access_token") définit l'alias access_token pour le champ token . Pour indiquer que le champ est obligatoire, une valeur spéciale - ... ( points de suspension ) est transmise comme premier paramètre .



Ajoutons le fichier utils / users.py , dans lequel nous allons créer les méthodes nécessaires pour écrire un utilisateur dans la base de données:



import hashlib
import random
import string
from datetime import datetime, timedelta
from sqlalchemy import and_

from app.models.database import database
from app.models.users import tokens_table, users_table
from app.schemas import users as user_schema

def get_random_string(length=12):
    """   ,    """
    return "".join(random.choice(string.ascii_letters) for _ in range(length))


def hash_password(password: str, salt: str = None):
    """     """
    if salt is None:
        salt = get_random_string()
    enc = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 100_000)
    return enc.hex()


def validate_password(password: str, hashed_password: str):
    """ ,         """
    salt, hashed = hashed_password.split("$")
    return hash_password(password, salt) == hashed


async def get_user_by_email(email: str):
    """     """
    query = users_table.select().where(users_table.c.email == email)
    return await database.fetch_one(query)


async def get_user_by_token(token: str):
    """       """
    query = tokens_table.join(users_table).select().where(
        and_(
            tokens_table.c.token == token,
            tokens_table.c.expires > datetime.now()
        )
    )
    return await database.fetch_one(query)


async def create_user_token(user_id: int):
    """       user_id """
    query = (
        tokens_table.insert()
        .values(expires=datetime.now() + timedelta(weeks=2), user_id=user_id)
        .returning(tokens_table.c.token, tokens_table.c.expires)
    )

    return await database.fetch_one(query)


async def create_user(user: user_schema.UserCreate):
    """      """
    salt = get_random_string()
    hashed_password = hash_password(user.password, salt)
    query = users_table.insert().values(
        email=user.email, name=user.name, hashed_password=f"{salt}${hashed_password}"
    )
    user_id = await database.execute(query)
    token = await create_user_token(user_id)
    token_dict = {"token": token["token"], "expires": token["expires"]}

    return {**user.dict(), "id": user_id, "is_active": True, "token": token_dict}




Créez un fichier routers / users.py et ajoutez une route d' inscription , indiquant qu'il attend un modèle CreateUser dans la demande et renvoie un modèle utilisateur :

from fastapi import APIRouter
from app.schemas import users
from app.utils import users as users_utils


router = APIRouter()


@router.post("/sign-up", response_model=users.User)
async def create_user(user: users.UserCreate):
    db_user = await users_utils.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return await users_utils.create_user(user=user)


Il ne reste plus qu'à connecter les routes à partir du fichier routers / users.py . Pour ce faire, ajoutez les lignes suivantes à main.py :



from app.routers import users
app.include_router(users.router)


Authentification et contrôle d'accès



Maintenant que nous avons des utilisateurs dans notre base de données, nous sommes prêts à configurer l'authentification des applications. Ajoutons un point de terminaison qui prend un nom d'utilisateur et un mot de passe et renvoie un jeton. Mettons à jour le fichier routers / users.py en ajoutant:



from fastapi import Depends
from fastapi.security import OAuth2PasswordRequestForm


@router.post("/auth", response_model=users.TokenBase)
async def auth(form_data: OAuth2PasswordRequestForm = Depends()):
    user = await users_utils.get_user_by_email(email=form_data.username)

    if not user:
        raise HTTPException(status_code=400, detail="Incorrect email or password")

    if not users_utils.validate_password(
        password=form_data.password, hashed_password=user["hashed_password"]
    ):
        raise HTTPException(status_code=400, detail="Incorrect email or password")

    return await users_utils.create_user_token(user_id=user["id"])


En même temps, nous n'avons pas besoin de décrire le modèle de requête nous-mêmes, Fastapi fournit une classe de dépendance spéciale OAuth2PasswordRequestForm , qui fait que l'itinéraire attend deux champs nom d'utilisateur et mot de passe.



Pour restreindre l'accès à certaines routes pour les utilisateurs non authentifiés, nous écrirons une méthode de dépendance. Il vérifie que le jeton fourni appartient à l'utilisateur actif et renvoie les détails de l'utilisateur. Cela nous permettra d'utiliser les informations utilisateur sur toutes les routes nécessitant une authentification. Créons un fichier utils / dependecies.py :



from app.utils import users as users_utils
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth")


async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = await users_utils.get_user_by_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    if not user["is_active"]:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
        )
    return user


Veuillez noter qu'une dépendance peut à son tour dépendre d'une autre dépendance. Par exemple, OAuth2PasswordBearer est une dépendance qui indique clairement à FastAPI que l'itinéraire actuel nécessite une authentification.



Pour vérifier que tout fonctionne comme prévu, ajoutez la route / users / me , qui renvoie les détails de l'utilisateur actuel. Ajoutez les lignes à routers / users.py :



from app.utils.dependencies import get_current_user


@router.get("/users/me", response_model=users.UserBase)
async def read_users_me(current_user: users.User = Depends(get_current_user)):
    return current_user


Nous avons maintenant la route / users / me , à laquelle seuls les utilisateurs authentifiés ont accès.



Tout est prêt pour enfin ajouter la possibilité pour les utilisateurs de créer et d'éditer des publications:



utils / posts.py
from datetime import datetime

from app.models.database import database
from app.models.posts import posts_table
from app.models.users import users_table
from app.schemas import posts as post_schema
from sqlalchemy import desc, func, select


async def create_post(post: post_schema.PostModel, user):
    query = (
        posts_table.insert()
        .values(
            title=post.title,
            content=post.content,
            created_at=datetime.now(),
            user_id=user["id"],
        )
        .returning(
            posts_table.c.id,
            posts_table.c.title,
            posts_table.c.content,
            posts_table.c.created_at,
        )
    )
    post = await database.fetch_one(query)

    # Convert to dict and add user_name key to it
    post = dict(zip(post, post.values()))
    post["user_name"] = user["name"]
    return post


async def get_post(post_id: int):
    query = (
        select(
            [
                posts_table.c.id,
                posts_table.c.created_at,
                posts_table.c.title,
                posts_table.c.content,
                posts_table.c.user_id,
                users_table.c.name.label("user_name"),
            ]
        )
        .select_from(posts_table.join(users_table))
        .where(posts_table.c.id == post_id)
    )
    return await database.fetch_one(query)


async def get_posts(page: int):
    max_per_page = 10
    offset1 = (page - 1) * max_per_page
    query = (
        select(
            [
                posts_table.c.id,
                posts_table.c.created_at,
                posts_table.c.title,
                posts_table.c.content,
                posts_table.c.user_id,
                users_table.c.name.label("user_name"),
            ]
        )
        .select_from(posts_table.join(users_table))
        .order_by(desc(posts_table.c.created_at))
        .limit(max_per_page)
        .offset(offset1)
    )
    return await database.fetch_all(query)


async def get_posts_count():
    query = select([func.count()]).select_from(posts_table)
    return await database.fetch_val(query)


async def update_post(post_id: int, post: post_schema.PostModel):
    query = (
        posts_table.update()
        .where(posts_table.c.id == post_id)
        .values(title=post.title, content=post.content)
    )
    return await database.execute(query)





routeurs / posts.py
from app.schemas.posts import PostDetailsModel, PostModel
from app.schemas.users import User
from app.utils import posts as post_utils
from app.utils.dependencies import get_current_user
from fastapi import APIRouter, Depends, HTTPException, status

router = APIRouter()


@router.post("/posts", response_model=PostDetailsModel, status_code=201)
async def create_post(post: PostModel, current_user: User = Depends(get_current_user)):
    post = await post_utils.create_post(post, current_user)
    return post


@router.get("/posts")
async def get_posts(page: int = 1):
    total_cout = await post_utils.get_posts_count()
    posts = await post_utils.get_posts(page)
    return {"total_count": total_cout, "results": posts}


@router.get("/posts/{post_id}", response_model=PostDetailsModel)
async def get_post(post_id: int):
    return await post_utils.get_post(post_id)


@router.put("/posts/{post_id}", response_model=PostDetailsModel)
async def update_post(
    post_id: int, post_data: PostModel, current_user=Depends(get_current_user)
):
    post = await post_utils.get_post(post_id)
    if post["user_id"] != current_user["id"]:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to modify this post",
        )

    await post_utils.update_post(post_id=post_id, post=post_data)
    return await post_utils.get_post(post_id)





Connectons de nouvelles routes en ajoutant à main.py

from app.routers import posts
app.include_router(posts.router)


Essai



Nous écrirons des tests dans pytest :



$ pip install pytest


Pour tester les points de terminaison, FastAPI fournit un outil spécial TestClient .



Écrivons un test de point de terminaison qui ne nécessite pas de connexion à la base de données:



from app.main import app
from fastapi.testclient import TestClient

client = TestClient(app)


def test_health_check():
    response = client.get("/")
    assert response.status_code == 200
    assert response.json() == {"Hello": "World"}


Comme vous pouvez le voir, tout est assez simple. Vous devez initialiser TestClient et l'utiliser pour tester les requêtes HTTP.



Pour tester le reste des points de terminaison, vous devez créer une base de données de test. Éditons le fichier main.py , en y ajoutant la configuration de base de test:



from os import environ

import databases

DB_USER = environ.get("DB_USER", "user")
DB_PASSWORD = environ.get("DB_PASSWORD", "password")
DB_HOST = environ.get("DB_HOST", "localhost")

TESTING = environ.get("TESTING")

if TESTING:
    #      
    DB_NAME = "async-blogs-temp-for-test"
    TEST_SQLALCHEMY_DATABASE_URL = (
        f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
    )
    database = databases.Database(TEST_SQLALCHEMY_DATABASE_URL)
else:
    DB_NAME = "async-blogs"
    SQLALCHEMY_DATABASE_URL = (
        f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
    )
    database = databases.Database(SQLALCHEMY_DATABASE_URL)


Nous utilisons toujours la base de données "async-blogs" pour notre application. Mais si la valeur de la variable d'environnement TESTING est définie, la base de données "async-blogs-temp-for-test" est utilisée .



Pour créer automatiquement la base de données "async-blogs-temp-for-test" lors de l'exécution des tests et la supprimer après les avoir exécutés, créez un fixture dans le fichier tests / conftest.py :



import os

import pytest

#  `os.environ`,    
os.environ['TESTING'] = 'True'

from alembic import command
from alembic.config import Config
from app.models import database
from sqlalchemy_utils import create_database, drop_database


@pytest.fixture(scope="module")
def temp_db():
    create_database(database.TEST_SQLALCHEMY_DATABASE_URL) #  
    base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    alembic_cfg = Config(os.path.join(base_dir, "alembic.ini")) #   alembic 
    command.upgrade(alembic_cfg, "head") #  

    try:
        yield database.TEST_SQLALCHEMY_DATABASE_URL
    finally:
        drop_database(database.TEST_SQLALCHEMY_DATABASE_URL) #  


Pour créer et supprimer la base de données, nous utiliserons la bibliothèque sqlalchemy_utils .



En utilisant le fixture temp_db dans nos tests, nous pouvons tester tous les points de terminaison de notre application:



def test_sign_up(temp_db):
    request_data = {
        "email": "vader@deathstar.com",
        "name": "Darth Vader",
        "password": "rainbow"
    }
    with TestClient(app) as client:
        response = client.post("/sign-up", json=request_data)
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["email"] == "vader@deathstar.com"
    assert response.json()["name"] == "Darth"
    assert response.json()["token"]["expires"] is not None
    assert response.json()["token"]["access_token"] is not None


tests / test_posts.py
import asyncio

from app.main import app
from app.schemas.users import UserCreate
from app.utils.users import create_user, create_user_token
from fastapi.testclient import TestClient


def test_create_post(temp_db):
    user = UserCreate(
        email="vader@deathstar.com",
        name="Darth",
        password="rainbow"
    )
    request_data = {
      "title": "42",
      "content": "Don't panic!"
    }
    with TestClient(app) as client:
        # Create user and use his token to add new post
        loop = asyncio.get_event_loop()
        user_db = loop.run_until_complete(create_user(user))
        response = client.post(
            "/posts",
            json=request_data,
            headers={"Authorization": f"Bearer {user_db['token']['token']}"}
        )
    assert response.status_code == 201
    assert response.json()["id"] == 1
    assert response.json()["title"] == "42"
    assert response.json()["content"] == "Don't panic!"


def test_create_post_forbidden_without_token(temp_db):
    request_data = {
      "title": "42",
      "content": "Don't panic!"
    }
    with TestClient(app) as client:
        response = client.post("/posts", json=request_data)
    assert response.status_code == 401


def test_posts_list(temp_db):
    with TestClient(app) as client:
        response = client.get("/posts")
    assert response.status_code == 200
    assert response.json()["total_count"] == 1
    assert response.json()["results"][0]["id"] == 1
    assert response.json()["results"][0]["title"] == "42"
    assert response.json()["results"][0]["content"] == "Don't panic!"


def test_post_detail(temp_db):
    post_id = 1
    with TestClient(app) as client:
        response = client.get(f"/posts/{post_id}")
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["title"] == "42"
    assert response.json()["content"] == "Don't panic!"


def test_update_post(temp_db):
    post_id = 1
    request_data = {
      "title": "42",
      "content": "Life? Don't talk to me about life."
    }
    with TestClient(app) as client:
        # Create user token to add new post
        loop = asyncio.get_event_loop()
        token = loop.run_until_complete(create_user_token(user_id=1))
        response = client.put(
            f"/posts/{post_id}",
            json=request_data,
            headers={"Authorization": f"Bearer {token['token']}"}
        )
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["title"] == "42"
    assert response.json()["content"] == "Life? Don't talk to me about life."


def test_update_post_forbidden_without_token(temp_db):
    post_id = 1
    request_data = {
      "title": "42",
      "content": "Life? Don't talk to me about life."
    }
    with TestClient(app) as client:
        response = client.put(f"/posts/{post_id}", json=request_data)
    assert response.status_code == 401




tests / test_users.py
import asyncio
import pytest

from app.main import app
from app.schemas.users import UserCreate
from app.utils.users import create_user, create_user_token
from fastapi.testclient import TestClient


def test_sign_up(temp_db):
    request_data = {
        "email": "vader@deathstar.com",
        "name": "Darth",
        "password": "rainbow"
    }
    with TestClient(app) as client:
        response = client.post("/sign-up", json=request_data)
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["email"] == "vader@deathstar.com"
    assert response.json()["name"] == "Darth"
    assert response.json()["token"]["expires"] is not None
    assert response.json()["token"]["token"] is not None


def test_login(temp_db):
    request_data = {"username": "vader@deathstar.com", "password": "rainbow"}
    with TestClient(app) as client:
        response = client.post("/auth", data=request_data)
    assert response.status_code == 200
    assert response.json()["token_type"] == "bearer"
    assert response.json()["expires"] is not None
    assert response.json()["access_token"] is not None


def test_login_with_invalid_password(temp_db):
    request_data = {"username": "vader@deathstar.com", "password": "unicorn"}
    with TestClient(app) as client:
        response = client.post("/auth", data=request_data)
    assert response.status_code == 400
    assert response.json()["detail"] == "Incorrect email or password"


def test_user_detail(temp_db):
    with TestClient(app) as client:
        # Create user token to see user info
        loop = asyncio.get_event_loop()
        token = loop.run_until_complete(create_user_token(user_id=1))
        response = client.get(
            "/users/me",
            headers={"Authorization": f"Bearer {token['token']}"}
        )
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["email"] == "vader@deathstar.com"
    assert response.json()["name"] == "Darth"


def test_user_detail_forbidden_without_token(temp_db):
    with TestClient(app) as client:
        response = client.get("/users/me")
    assert response.status_code == 401


@pytest.mark.freeze_time("2015-10-21")
def test_user_detail_forbidden_with_expired_token(temp_db, freezer):
    user = UserCreate(
        email="sidious@deathstar.com",
        name="Palpatine",
        password="unicorn"
    )
    with TestClient(app) as client:
        # Create user and use expired token
        loop = asyncio.get_event_loop()
        user_db = loop.run_until_complete(create_user(user))
        freezer.move_to("'2015-11-10'")
        response = client.get(
            "/users/me",
            headers={"Authorization": f"Bearer {user_db['token']['token']}"}
        )
    assert response.status_code == 401




Sources PS



C'est tout, le référentiel source de la publication peut être consulté sur GitHub .



All Articles