7

Apache Airflow介绍

 3 years ago
source link: http://just4coding.com/2020/06/13/airflow/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Apache Airflow介绍

发表于

2020-06-13

很多业务场景都需要在后台定期执行任务,如数据ETL(Extract-Transform-Load)操作。简单处理可通过crontab来管理。当任务需要在多台机器上执行,或者任务之间有依赖关系时,crontab便不太能满足需求。这种场景下需要分布式任务调度系统来组织任务编排,管理任务依赖,调度任务工作流和监视任务执行状态。

比较优秀的开源解决方案有:

Azkaban和Oozie都更聚集在大数据处理平台上的任务调度,Airflow的应用场景更为通用。本文简单介绍Airflow。

Airflow使用Python开发,它通过DAGs(Directed Acyclic Graph, 有向无环图)来表达一个工作流中所要执行的任务,以及任务之间的关系和依赖。比如,如下的工作流中,任务T1执行完成,T2T3才能开始执行,T2T3都执行完成,T4才能开始执行。

1.png

Airflow使用Code as Configuration的理念,DAG由Python源码文件来定义, 如:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
dag_id='dummy_dag',
schedule_interval=dt.timedelta(days=1),
start_date=days_ago(1)
)

t1 = DummyOperator(task_id='task1', dag=dag)
t2 = DummyOperator(task_id='task2', dag=dag)
t3 = DummyOperator(task_id='task3', dag=dag)
t4 = DummyOperator(task_id='task4', dag=dag)

t1 >> [t2, t3] >> t4

Etsy提供了一个工具,可以从YAML文件来生成DAG文件,如果不习惯使用Python来定义DAG文件,可以使用该工具。

DAG的一次执行叫做DAG run, 包括了本次工作流执行中的任务执行实例。DAG文件定义了工作流逻辑,任务本身的实现由Operator定义。Operator一般是原子任务,两个Operator之间一般不共享资源。DAG保证他们之间的执行顺序,而他们实际完全有可能在不同的机器上运行。如果两个Operator之间确实共享信息,可以使用airflow的XComs机制

Airflow提供了各种Operator实现,可以完成各种任务实现:

  • BashOperator: 执行Bash命令
  • PythonOperator: 执行Python函数
  • EmailOperator: 发送邮件
  • SimpleHttpOperator: 发送HTTP请求

一些情况下,我们需要根据执行结果执行不同的任务,这样工作流会产生分支。如:

2.png

这种需求可以使用BranchPythonOperator来实现。参考:

Airflow的整体架构主要由三部分:

  • WebServer:提供WEB服务,会定时扫描指定目录下的DAG文件
  • Scheduler: 定期检查所有DAG及其中的任务,如果发现符合执行条件,则发起相应的任务供worker接收
  • Worker: 接收任务并执行任务

Worker的具体实现由配置文件中的executor来指定,airflow支持多种Executor:

  • SequentialExecutor: 单进程顺序执行,一般只用来测试
  • LocalExecutor: 本地多进程执行
  • CeleryExecutor: 使用Celery进行分布式任务调度
  • DaskExecutor:使用Dask进行分布式任务调度
  • KubernetesExecutor: 1.10.0新增, 创建临时POD执行每次任务

生产环境一般使用CeleryExecutorKubernetesExecutor

使用CeleryExecutor的架构如图:

3.png

使用KubernetesExecutor的架构如图:

4.png

可以根据实际任务的执行环境要求来进行选择,当然kubernetes平台已经非常成熟和便捷,资源使用弹性更加具备优势。但目前KubernetesExecutor还不太稳定,简单场景可以通过使用LocalExecutor执行kubectl相关操作或者使用KubernetesPodOperator来变相实现在Kubernetes集群中运行任务的需求。

下面介绍在Kubernetes环境中使用Helm来安装Airflow.

Bitnami提供了Apache Airflow Helm Chart

它需要一个GIT仓库地址来拉取DAG文件,需要提前准备好:

export REPOSITORY_URL=https://github.com/flygoast/airflow-dag-examples

在Kubernetes集群安装:

helm install airflow bitnami/airflow \
--set airflow.cloneDagFilesFromGit.enabled=true \
--set airflow.cloneDagFilesFromGit.repository=$REPOSITORY_URL \
--set airflow.cloneDagFilesFromGit.branch=master \
--set airflow.baseUrl=http://127.0.0.1:8080

等待安装完成:

flygoast@bogon devel % kubectl get pods
NAME READY STATUS RESTARTS AGE
airflow-postgresql-0 1/1 Running 0 16m
airflow-redis-master-0 1/1 Running 0 16m
airflow-redis-slave-0 1/1 Running 0 16m
airflow-redis-slave-1 1/1 Running 0 15m
airflow-scheduler-8557d55695-wlkd9 2/2 Running 0 16m
airflow-web-766bb56d6d-8fl7l 2/2 Running 0 16m
airflow-worker-0 2/2 Running 0 16m

配置端口映射:

kubectl port-forward --namespace default svc/airflow 8080:8080

获取密码:

flygoast@bogon devel % echo Password: $(kubectl get secret --namespace default airflow -o jsonpath="{.data.airflow-password}" | base64 --decode)
Password: c9Yq6z17tk

使用user和密码登录http://127.0.0.1:8080, airflow安装完成:

5.png

当我们更新了GIT仓库中的DAG文件后,需要执行helm upgrade:

helm upgrade airflow bitnami/airflow \
--set airflow.cloneDagFilesFromGit.enabled=true \
--set airflow.cloneDagFilesFromGit.repository=$REPOSITORY_URL \
--set airflow.cloneDagFilesFromGit.branch=master \
--set airflow.baseUrl=http://127.0.0.1:8080

参考链接:


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK