Nous allons vous présenter les nouvelles fonctionnalités de KubernetesExecutor 2.0. Alerte spoil !!! Le processus est plus rapide, plus flexible et plus facile à comprendre.
Avec Airflow 2.0, nous sommes heureux de présenter un KubernetesExecutor entièrement repensé. Cette nouvelle architecture est plus rapide, plus flexible et plus facile à comprendre que KubernetesExecutor 1.10. Dans un premier temps, nous aimerions vous présenter les nouvelles fonctionnalités de KubernetesExecutor 2.0!
Qu'est-ce que KubernetesExecutor?
En 2018, nous avons introduit KubernetesExecutor basé sur les idées d'autoscaling et de flexibilité. Airflow n'avait pas encore de concept clair pour l'autoscaling des Celery Workers (bien que notre récent travail avec KEDA à cet égard ait été très réussi), nous voulions donc créer un système qui pourrait répondre aux besoins de l'utilisateur. À la suite de cette recherche, un système a été créé qui utilise l'API Kubernetes pour exécuter une tâche pod par flux d'air. Un effet secondaire précieux de ce système basé sur l'API Kubernetes est qu'il a permis aux utilisateurs d'ajouter des modules complémentaires et des contraintes uniques pour chaque tâche.
À l'aide de l'API Kubernetes et de KubernetesExecutor, les utilisateurs d'Airflow peuvent déterminer que certaines tâches ont accès à certains secrets ou qu'une tâche ne peut être effectuée que sur un nœud existant dans l'Union européenne (ce qui peut être utile pour la gestion des données). Les utilisateurs peuvent également spécifier le nombre de ressources consommées par une tâche, ce qui peut varier considérablement en fonction de ce que fait la tâche (par exemple, l'accès aux GPU est nécessaire pour exécuter un script TensorFlow). Avec cette API, KubernetesExecutor permet aux ingénieurs de données d'avoir un contrôle beaucoup plus fin sur la façon dont Airflow effectue ses tâches qu'ils ne le feraient simplement utiliser les files d'attente Celery existantes.
, KubernetesExecutor . pod , , Celery ( , ). , CeleryExecutor , . , CeleryExecutor, KubernetesExecutor Airflow, Airflow 2.0 , CeleryKubernetesExecutor, !
KubernetesExecutor
podtemplate
Airflow 1.10.12 pod_template_file
. Kubernetes KubernetesExecutor. , Airflow API Kubernetes .
pod_template_files
Airflow. pod_template_file
, , , CeleryExecutor .
pod pod_template_files
, 2.0 , , pod Kubernetes, . pod , Celery. — KubernetesExecutor.
Execitor_config
Airflow 2.0 executor_config
, . , Python , API Kubernetes. executor_config
podOverride
. , .
, executeor_config
- Airflow 2.0, . , .
podmutationhook
1.10.12, pod_mutation_hook
Kubernetes V1Pod Airflow pod Kubernetes API , Airflow pod. pod, KubernetesExecutor, pod, KubernetesPodOperator.
KubernetesExecutor. , pod_template_file
pod, Kubernetes pod_override
pod_mutation_hook
pod. , .
, KubernetesExecutor.
, , , . Pod , . .
.
. pod, . V1pod, .
Airflow DevOps, .
, DAG, , executor_config
podOverride. , Kubernetes DAG, , KubernetesPodOperator . KubernetesPodOperator Docker , . , executeor_config
, Kubernetes API podOverride , , , , . . , .
, , , , Python pod, . executeor_config
podOverride , PythonOperator API TaskFlow. DAG :
from airflow.decorators import dag, task from datetime import datetime import os import json import requests from kubernetes.client import models as k8s new_config ={ "pod_override": k8s.V1Pod( metadata=k8s.V1ObjectMeta(labels={"purpose": "pod-override-example"}), spec=k8s.V1PodSpec( containers=[ k8s.V1Container( name="base", env=[ k8s.V1EnvVar(name="STATE", value="wa") ], ) ] ) ) } default_args = { 'start_date': datetime(2021, 1, 1) } @dag('k8s_executor_example', schedule_interval='@daily', default_args=default_args, catchup=False) def taskflow(): @task(executor_config=new_config) def get_testing_increase(): """ Gets totalTestResultsIncrease field from Covid API for given state and returns value """ url = 'https://covidtracking.com/api/v1/states/' res = requests.get(url+'{0}/current.json'.format(os.environ['STATE'])) return{'testing_increase': json.loads(res.text)['totalTestResultsIncrease']} get_testing_increase() dag = taskflow()
new_config
, pod Kubernetes API. DAG , API Covid . , podOverride. Airflow Kubernetes.
KubernetesExecutor
KubernetesExecutor, . , — .
YAML. DAG, DAG git DAG Kubernetes Volume.
, airflow.cfg YAML . YAML .
La meilleure partie de ces trois nouvelles fonctionnalités est qu'elles sont toutes disponibles dans Airflow 1.10.13. Vous pouvez démarrer immédiatement le processus de migration et profiter des avantages et de l'accélération de cette conception plus simple. Nous attendons vos commentaires avec impatience et n'hésitez pas à nous contacter pour toute question, demande de fonctionnalité ou documentation!