Setting up a multi-node Airflow cluster with HDP Ambari and Celery for data pipelines

Airflow is a great choice for data pipelines, that is, for ETL orchestration and scheduling. It is widely used and popular for building future-proof data pipelines. It provides backfilling, versioning and lineage through functional abstraction.













Functional programming is the future.







In Airflow, a workflow is a DAG, a directed acyclic graph of tasks: the nodes are tasks and the edges are the dependencies between them, with no cycles. If you have worked with Apache Spark or Apache Drill, the concept is familiar, because their execution plans are DAGs too. Everything here is a DAG, and that is exactly the model Airflow is built on.
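As a toy illustration of the concept (plain Python with invented ETL-style task names, not the Airflow API), running a DAG of tasks in dependency order is just a topological sort:

```python
from graphlib import TopologicalSorter  # stdlib, Python 3.9+

# Maps each task to the set of tasks it depends on. The task names
# are made-up examples, not Airflow operators.
deps = {
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"transform", "load"},
}

# static_order() yields every task only after all of its dependencies,
# which is exactly the order a scheduler may run them in.
order = list(TopologicalSorter(deps).static_order())
print(order)
```

Airflow's scheduler does the same thing at a much larger scale: it repeatedly picks the tasks whose upstream dependencies are done and hands them to the executor.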














Haskell, Scala, Erlang, Kotlin: functional languages are everywhere you look! Functional programming is the future, and the same functional mindset applies to data pipelines.







A typical ETL / Data Lake / streaming infrastructure is built on a Hadoop / Spark cluster from one of the big distributions: Hortonworks, MapR or Cloudera. Since the Apache Hadoop / Apache Spark cluster is already managed from one place, it is convenient to run the Airflow cluster under the same management tool.







For ETL orchestration there are several options: Oozie, Luigi and Airflow. Oozie makes you describe workflows in XML, in 2019! :) Luigi is a lighter option, and Airflow, born at Airbnb, is the most complete of the three.







Why Airflow rather than Luigi?







  1. Airflow has its own scheduler, while Luigi jobs have to be triggered from cron.







  2. Creating and testing tasks in Luigi is harder.







  3. Luigi's web UI is far more limited than Airflow's.







  4. Re-running a failed Luigi pipeline means waiting for the next cron trigger.







  5. Luigi has no built-in support for backfilling historical runs.







  6. Luigi does not scale out to distributed execution the way Airflow does with Celery.









Both Airflow and Luigi are written in Python and fit naturally into the Python data stack: Scikit-learn, Numpy, Pandas, Theano and so on.







In this post I will show how to set up a multi-node Airflow cluster with Celery and RabbitMQ, managed through Ambari.














Airflow will be installed on the nodes of the Hadoop / Spark cluster itself, so Airflow tasks that launch Spark / Hive / Hadoop MapReduce jobs run right next to the data.







Let's get started!







For the Ambari integration I used airflow-ambari-mpack (an Apache Ambari management pack for Apache Airflow) written by my friend, a FOSS contributor: https://github.com/miho120/ambari-airflow-mpack. Many thanks for his work.







Prerequisite: steps 1 through 4 from the previous article, including a running RabbitMQ, should already be in place.







Setting up Airflow with Celery and RabbitMQ







  1. Install the Apache Ambari MPack for Airflow


# a. Clone the management pack
git clone https://github.com/miho120/ambari-mpack.git
# b. Stop the Ambari server
ambari-server stop
# c. Install the Apache MPack for Airflow on the Ambari server
ambari-server install-mpack --mpack=<path-to-the-mpack-archive>
# d. Start the Ambari server
ambari-server start
      
      





  2. Add the Airflow service in Ambari


Open the Ambari UI in a browser:


http://<HOST_NAME>:8080
      
      





Once the mpack is installed, go to the Ambari UI and click Actions -> Add Service.













HDP Ambari Dashboard







If step 1 was successful, Airflow appears in the list of services that Ambari can install.













The Airflow service in Ambari







Next, choose which component goes to which host: put the Airflow webserver and scheduler on the master node, and tick Install Worker for the data nodes.













Assigning the Airflow master components in Ambari (master / name node)







As you can see, the Airflow webserver and the Airflow scheduler are installed on the name node of the Hadoop / Spark cluster.













After that, install the Airflow worker component on the data nodes.







In this setup, three workers are installed, one on each data node.













Choosing hosts for the Airflow workers in Ambari













Ambari UI: three Airflow workers







If you later want to add or remove workers on particular hosts, that can also be done from the Ambari UI (the '+' button on the host page).







  3. Configure Airflow from Ambari:







Once the Airflow service is added, open its Configs tab in Ambari.













Airflow configs in Ambari







  1. Executor


executor = CeleryExecutor
      
      











In Advanced airflow-core-site, change the executor to CeleryExecutor.
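Under the hood these values end up as plain INI options in airflow.cfg, which you can sanity-check with Python's configparser. A minimal sketch, assuming the Airflow 1.10 layout where these options live in the [core] section (the fragment below mirrors options set in this configuration step):

```python
import configparser

# A minimal airflow.cfg fragment with the options discussed here.
cfg_text = """
[core]
executor = CeleryExecutor
dags_are_paused_at_creation = True
load_examples = False
"""

cfg = configparser.ConfigParser()
cfg.read_string(cfg_text)

executor = cfg.get("core", "executor")
load_examples = cfg.getboolean("core", "load_examples")
print(executor)       # CeleryExecutor
print(load_examples)  # False
```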







  2. SQL Alchemy connection


sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow
      
      











SQL Alchemy Connection







Change the SQL Alchemy connection to point at your PostgreSQL database, replacing {HOSTNAME} with the host where it runs.
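The connection string follows SQLAlchemy's URL format, dialect+driver://user:password@host/database. Here is a small sketch of how the parts break down, using only the standard library (dbhost stands in for your real {HOSTNAME}):

```python
from urllib.parse import urlsplit

def describe_sqlalchemy_url(url: str) -> dict:
    """Split an SQLAlchemy database URL into its parts.
    The dialect+driver pair travels in the URL scheme."""
    parts = urlsplit(url)
    dialect, _, driver = parts.scheme.partition("+")
    return {
        "dialect": dialect,               # e.g. postgresql
        "driver": driver or "(default)",  # e.g. psycopg2
        "user": parts.username,
        "host": parts.hostname,
        "database": parts.path.lstrip("/"),
    }

info = describe_sqlalchemy_url(
    "postgresql+psycopg2://airflow:airflow@dbhost/airflow")
print(info)
```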







  3. Broker URL and Celery result backend


broker_url = pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
celery_result_backend = db+postgresql://airflow:airflow@{HOSTNAME}/airflow
      
      











Broker URL and Celery result backend in the Airflow configs









  4. Pause new DAGs at creation and skip loading the example DAGs:

dags_are_paused_at_creation = True
load_examples = False
      
      











These settings also live in Advanced airflow-core-site.







After all the Airflow settings are saved, restart the service in Ambari and run Service Actions -> InitDB.













Running Airflow InitDB from Ambari







This initializes the airflow metadata database. After that, the Airflow cluster is ready to use.







Some useful web UIs for the Airflow cluster:







  1. RabbitMQ overview:








  2. RabbitMQ connections:








  3. RabbitMQ queues:








  4. Celery Flower


Celery Flower is a web tool for monitoring Celery workers and tasks. It listens on port 5555 by default.













Here you can see that all three workers are up and visible to Celery, with a recent heartbeat from each.







Celery Flower







Note that Celery Flower is not managed by Ambari: you start it yourself, on any node where Airflow and Celery are installed, with the airflow flower command.

For example, to keep Flower running in the background:







nohup airflow flower >> /var/local/airflow/logs/flower.logs &
      
      





That is it: a multi-node Airflow cluster managed from Ambari, running on an HDP Hadoop / Spark cluster.







I hope this post was useful.







Multi-Node Airflow Cluster







Recently I wrote about Apache Airflow in the post «Setting up a Multi-Node Airflow Cluster with HDP Ambari and Celery for Data Pipelines». Along the way I ran into a number of problems. In this part I describe the issues and how I solved them.







Here are the lessons learnt while building the multi-node Airflow cluster.







1. After switching from LocalExecutor to CeleryExecutor, tasks were queued but never executed.







The scheduler could not get task state back from the Celery workers.







The logs were full of errors like this:







AttributeError: ‘DisabledBackend’ object has no attribute ‘_get_task_meta_for’
Apr 10 21:03:52 charlie-prod airflow_control.sh: [2019–04–10 21:03:51,962] {celery_executor.py:112} ERROR — Error syncing the celery executor, ignoring it:
Apr 10 21:03:52 charlie-prod airflow_control.sh: [2019–04–10 21:03:51,962] {celery_executor.py:113} ERROR — ‘DisabledBackend’ object has no attribute ‘_get_task_meta_for’
      
      





Airflow itself looked healthy: the webserver and scheduler were up, and the configuration seemed correct. The real culprit turned out to be the Celery version.







Solution:







Celery 3.3.5 was installed, but Airflow 1.10 requires Celery 4.x.







pip install --upgrade celery
# celery 3.3.5 => 4.3
      
      





2. DAGs that worked fine under the LocalExecutor started failing on the workers under the CeleryExecutor, with the error below.







Apr 11 14:13:13 charlie-prod airflow_control.sh: return load(BytesIO(s))
Apr 11 14:13:13 charlie-prod airflow_control.sh: TypeError: Required argument ‘object’ (pos 1) not found
Apr 11 14:13:13 charlie-prod airflow_control.sh: [2019–04–11 14:13:13,847: ERROR/ForkPoolWorker-6285] Pool process <celery.concurrency.asynpool.Worker object at 0x7f3a88b7b250> error: TypeError(“Required argument ‘object’ (pos 1) not found”,)
Apr 11 14:13:13 charlie-prod airflow_control.sh: Traceback (most recent call last):
Apr 11 14:13:13 charlie-prod airflow_control.sh: File “/usr/lib64/python2.7/site-packages/billiard/pool.py”, line 289, in __call__
Apr 11 14:13:13 charlie-prod airflow_control.sh: sys.exit(self.workloop(pid=pid))
Apr 11 14:13:13 charlie-prod airflow_control.sh: File “/usr/lib64/python2.7/site-packages/billiard/pool.py”, line 347, in workloop
Apr 11 14:13:13 charlie-prod airflow_control.sh: req = wait_for_job()
Apr 11 14:13:13 charlie-prod airflow_control.sh: File “/usr/lib64/python2.7/site-packages/billiard/pool.py”, line 447, in receive
Apr 11 14:13:13 charlie-prod airflow_control.sh: ready, req = _receive(1.0)
Apr 11 14:13:13 charlie-prod airflow_control.sh: File “/usr/lib64/python2.7/site-packages/billiard/pool.py”, line 419, in _recv
Apr 11 14:13:13 charlie-prod airflow_control.sh: return True, loads(get_payload())
Apr 11 14:13:13 charlie-prod airflow_control.sh: File “/usr/lib64/python2.7/site-packages/billiard/common.py”, line 101, in pickle_loads
Apr 11 14:13:13 charlie-prod airflow_control.sh: return load(BytesIO(s))
Apr 11 14:13:13 charlie-prod airflow_control.sh: TypeError: Required argument ‘object’ (pos 1) not found
      
      





Solution:

This post about a similar Airflow error pointed me in the right direction: https://blog.csdn.net/u013492463/article/details/80881260







After a lot of digging it turned out the fix was a one-line configuration change.







Before:







broker_url = amqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
      
      





Switching the scheme to pyamqp solved the problem. I had not paid attention to what the prefixes mean before, so here is the difference:







amqp://

uses the first available client: librabbitmq (a C library) if it is installed,

otherwise it falls back to py-amqp (pure Python).

pyamqp://

librabbitmq://

Use these schemes when you want to pick a particular transport explicitly; pyamqp:// always selects the pure-Python amqp client (http://github.com/celery/py-amqp).
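The selection logic amounts to a tiny fallback rule. Here is an illustrative sketch (my own simplification, not Kombu's actual code):

```python
def resolve_amqp_transport(scheme: str, librabbitmq_installed: bool) -> str:
    """Pick the AMQP client library for a given broker URL scheme."""
    if scheme == "librabbitmq":
        return "librabbitmq"  # the C client, explicitly requested
    if scheme == "pyamqp":
        return "py-amqp"      # the pure-Python client, explicitly requested
    if scheme == "amqp":
        # 'amqp' means: prefer the C client when present, else fall back
        return "librabbitmq" if librabbitmq_installed else "py-amqp"
    raise ValueError(f"unknown broker scheme: {scheme}")

print(resolve_amqp_transport("amqp", librabbitmq_installed=True))    # librabbitmq
print(resolve_amqp_transport("pyamqp", librabbitmq_installed=True))  # py-amqp
```

This is why the one-character change matters: with amqp:// the transport actually used depends on what happens to be installed on each worker, while pyamqp:// pins it down.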







After:







broker_url = pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
      
      





So the only change is the scheme: amqp becomes pyamqp.







And install the transport library:







pip install "celery[pyamqp]"
      
      





3. Errors with the SQL Alchemy connection.







The SQL Alchemy connection string was originally:







sql_alchemy_conn = postgresql://airflow:airflow@{HOST_NAME}:5432/airflow
      
      





Solution:

Specify the psycopg2 driver explicitly in the connection string:







sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow
      
      





Then install psycopg2 with pip (it ships as a pre-built wheel):

pip install psycopg2

Psycopg is the most popular PostgreSQL database adapter for the Python programming language.







4. On HDP 2.6.2 with Ambari, the Install Worker step kept failing.







At some point I needed one more worker node, so that additional Celery workers could pick up DAG tasks.







And then this happened :)







by ‘SSLError(SSLError(“bad handshake: SysCallError(104, ‘ECONNRESET’)”,),)’: /simple/apache-airflow/
 Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by ‘SSLError(SSLError(“bad handshake: SysCallError(104, ‘ECONNRESET’)”,),)’: /simple/apache-airflow/
 Retrying (Retry(total=1, connect=None, read=None, redirect=None, status=None)) after connection broken by ‘SSLError(SSLError(“bad handshake: SysCallError(104, ‘ECONNRESET’)”,),)’: /simple/apache-airflow/
 Retrying (Retry(total=0, connect=None, read=None, redirect=None, status=None)) after connection broken by ‘SSLError(SSLError(“bad handshake: SysCallError(104, ‘ECONNRESET’)”,),)’: /simple/apache-airflow/
 Could not fetch URL https://pypi.org/simple/apache-airflow/: There was a problem confirming the ssl certificate: HTTPSConnectionPool(host=’pypi.org’, port=443): Max retries exceeded with url: /simple/apache-airflow/ (Caused by SSLError(SSLError(“bad handshake: SysCallError(104, ‘ECONNRESET’)”,),)) — skipping
      
      





Solution:







The root of the problem was that pip could not download the Airflow wheels from PyPI on that host, so the worker installation from Ambari kept failing. This is the command the mpack runs to install the wheels from PyPI:







pip install --trusted-host pypi.python.org --trusted-host pypi.org --trusted-host files.pythonhosted.org --upgrade --ignore-installed 'apache-airflow[celery]==1.10.0'
      
      





The connection kept getting reset no matter how many times it retried. The full error:







resource_management.core.exceptions.ExecutionFailed: Execution of 'export SLUGIFY_USES_TEXT_UNIDECODE=yes && pip install --trusted-host pypi.python.org --trusted-host pypi.org --trusted-host files.pythonhosted.org --upgrade --ignore-installed apache-airflow[celery]==1.10.0' returned 1. Collecting apache-airflow[celery]==1.10.0
 Retrying (Retry(total=4, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=3, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=2, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=1, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=0, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Could not find a version that satisfies the requirement apache-airflow[celery]==1.10.0 (from versions: )
No matching distribution found for apache-airflow[celery]==1.10.0
You are using pip version 8.1.2, however version 19.0.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
      
      





Upgrading pip did not help either.







In the end I went for a hack: temporarily comment out the parts of the mpack scripts that install airflow, celery and the other wheels through pip, and install those packages by hand.







It is not pretty, but it worked. These are the lines I commented out: https://github.com/miho120/ambari-airflow-mpack/blob/e1c9ca004adaa3320e35ab7baa7fdb9b9695b635/airflow-service-mpack/common-services/AIRFLOW/1.10.0/package/scripts/airflow_worker_control.py







On the cluster I commented those lines out manually, just temporarily (and restored them later, once the worker was installed successfully), then added the worker from Ambari. It worked like a charm :) and this hack made my day.







After installing a worker on another node, you may need to restart the Airflow service from Ambari. You can read more in my previous blog post: Setting up a Multi-Node Airflow Cluster with HDP Ambari and Celery for Data Pipelines.







