40

Airflow 使用及原理分析

 4 years ago
source link: https://www.tuicool.com/articles/BrmiQvM
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

什么是 Airflow?

Airflow 是一个使用 Python 语言编写的 Data Pipeline 调度和监控工作流的平台。

Airflow 是通过 DAG(Directed acyclic graph 有向无环图)来管理任务流程的任务调度工具,不需要知道业务数据的具体内容,设置任务的依赖关系即可实现任务调度。

这个平台拥有和 Hive、Presto、MySQL、HDFS、Postgres 等数据源之间交互的能力,并且提供了钩子(hook)使其拥有很好地扩展性。

除了一个命令行界面,该工具还提供了一个基于 Web 的用户界面可以可视化管道的依赖关系、监控进度、触发任务等。

Airflow 的架构

在一个可扩展的生产环境中,Airflow 含有以下组件:

  • 元数据库:这个数据库存储有关任务状态的信息。
  • 调度器:Scheduler 是一种使用 DAG 定义结合元数据中的任务状态来决定哪些任务需要被执行以及任务执行优先级的过程。调度器通常作为服务运行。
  • 执行器:Executor 是一个消息队列进程,它被绑定到调度器中,用于确定实际执行每个任务计划的工作进程。有不同类型的执行器,每个执行器都使用一个指定工作进程的类来执行任务。例如,LocalExecutor 使用与调度器进程在同一台机器上运行的并行进程执行任务。其他像 CeleryExecutor 的执行器使用存在于独立的工作机器集群中的工作进程执行任务。
  • Workers:这些是实际执行任务逻辑的进程,由正在使用的执行器确定。

BFveyei.jpg!web

Airflow 解决哪些问题

通常,在一个运维系统,数据分析系统,或测试系统等大型系统中,我们会有各种各样的依赖需求。包括但不限于:

  • 时间依赖:任务需要等待某一个时间点触发。
  • 外部系统依赖:任务依赖外部系统需要调用接口去访问。
  • 任务间依赖:任务 A 需要在任务 B 完成后启动,两个任务互相间会产生影响。
  • 资源环境依赖:任务消耗资源非常多, 或者只能在特定的机器上执行。

Crontab 可以很好地处理定时执行任务的需求,但仅能管理时间上的依赖。

Airflow 是一种 WMS,即:它将任务以及它们的依赖看作代码,按照那些计划规范任务执行,并在实际工作进程之间分发需执行的任务。

Airflow 提供了一个用于显示当前活动任务和过去任务状态的优秀 UI,并允许用户手动管理任务的执行和状态。

Airflow 中的工作流是具有方向性依赖的任务集合。

具体说就是 Airflow 的核心概念 DAG(有向无环图)—— 来表现工作流。

dag 中的每个节点都是一个任务,dag 中的边表示的是任务之间的依赖(强制为有向无环,因此不会出现循环依赖,从而导致无限执行循环)。

Airflow 安装并运用

# 默认目录在 ~/airflow,也可以使用以下命令来指定目录

export AIRFLOW_HOME={yourpath}/airflow



pip install apache-airflow



# 配置文件中的 sql_alchemy_conn

vim airflow/airflow.cfg



# 初始化数据库

airflow initdb

定义第一个 DAG

在 $AIRFLOW_HOME 目录下新建 dags 文件夹,后面的所有 dag 文件都要存储在这个目录。

新建 dag 文件 demo.py,语句含义见注释:

from datetime import datetime, timedelta



from airflow import DAG

from airflow.utils.dates import days_ago

from airflow.utils.helpers import chain

from airflow.operators.bash_operator import BashOperator

from airflow.operators.python_operator import PythonOperator





def default_options():

default_args = {

    'owner': 'airflow',  # 拥有者名称

    'start_date': days_ago(1),  # 第一次开始执行的时间,为 UTC 时间

    'retries': 1,  # 失败重试次数

    'retry_delay': timedelta(seconds=5)  # 失败重试间隔

}

return default_args





# 定义 DAG

def test1(dag):

t = "pwd"

# Operator 支持多种类型, 这里使用 BashOperator

task = BashOperator(

    task_id='test1',  # task_id

    bash_command=t,  # 指定要执行的命令

    dag=dag  # 指定归属的 dag

)

return task





def hello_world_1():

current_time = str(datetime.today())

print('hello world at {}'.format(current_time))





def test2(dag):

# PythonOperator

task = PythonOperator(

    task_id='test2',

    python_callable=hello_world_1,  # 指定要执行的函数

    dag=dag)

return task





def test3(dag):

t = "date"

task = BashOperator(

    task_id='test3',

    bash_command=t,

    dag=dag)

return task





with DAG(

    'test_task',  # dag_id

    default_args=default_options(),  # 指定默认参数

    schedule_interval="20 8 * * *"  # 执行周期

) as d:

task1 = test1(d)

task2 = test2(d)

task3 = test3(d)

chain(task1, task2, task3)  # 指定执行顺序

写完后执行 python $AIRFLOW_HOME/dags/demo.py 检查是否有错误,如果命令行没有报错,就表示没问题。

命令行输入 airflow list_dags -sd $AIRFLOW_HOME/dags 查看生效的 dags:

-------------------------------------------------------------------

DAGS

-------------------------------------------------------------------

test_task

也可以用位位移指定任务执行顺序

可以使用位移符号:

task1 >> task2 >> task3

等价于:

task1.set_downstream(task2)

task2.set_downstream(task23)

Web UI

启动命令 airflow webserver

任务图视图:

2iIVZvA.jpg!web

任务树视图:

YJZBBvU.jpg!web

常用命令

# 测试任务,格式:airflow test dag_id task_id execution_time

airflow test test_task test1 2019-09-10



# 开始运行任务(这一步也可以在 Web 界面点 trigger 按钮)

airflow trigger_dag test_task



# 守护进程运行 webserver,默认端口为 8080,也可以通过`-p`来指定

airflow webserver -D  



# 守护进程运行调度器     

airflow scheduler -D   



# 守护进程运行调度器    

airflow worker -D          



# 暂停任务

airflow pause dag_id      



# 取消暂停,等同于在 Web 管理界面打开 off 按钮

airflow unpause dag_id     



# 查看 task 列表

airflow list_tasks dag_id  查看 task 列表



# 清空任务状态

airflow clear dag_id       



# 运行task

airflow run dag_id task_id execution_date       

Airflow 核心原理分析

概念及发展

  • JOB:最上层的工作。分为 SchedulerJob、BackfillJob 和 LocalTaskJob。SchedulerJob 由 Scheduler 创建,BackfillJob 由 Backfill 创建,LocalTaskJob 由前面两种 Job 创建。
  • DAG:有向无环图,用来表示工作流。
  • DAG Run:工作流实例,表示某个工作流的一次运行(状态)。
  • Task:任务,工作流的基本组成部分。
  • TaskInstance:任务实例,表示某个任务的一次运行(状态)。

在早期版本 Airflow 中,DAG 执行主要有两种完全独立的执行途径:SchedulerJob 和 BackfillJob。在一次较大的重构中增加了 DagRun 方式,以跟踪 DAG 的执行状态。

结构关系图:

fa2Q7nV.png!web

DagRun 执行流程描述

DagRuns 表示某个时间点 DAG 的状态(也称为 DagInstances)。

要运行 DAG 或管理 DAG 的执行,必须首先创建一个 DagRun 实例。

但是仅创建 DagRun 不足以实际运行 DAG(就像创建 TaskInstance 与实际运行任务并不一样)。

因此需要一种机制来实现上述流程。结构相当简单,维护一组要执行的 DagRuns 集合,并循环遍历该集合,直到所有 DagRuns 成功或失败为止。

基本的 DagRuns 循环如下所示:

  • 刷新 dags
  • 收集新的 DagRuns
  • 执行 DagRuns(包括更新 DagRuns 的状态为成功或失败)
  • 唤醒 executor/心跳检查

Scheduler 的调度逻辑

调度器实际上就是一个 airflow.jobs.SchedulerJob 实例 Job 持续运行 run 方法。job.run() 在开始时将自身的信息加入到 job 表中,并维护状态和心跳,预期能够正常结束,将结束时间也更新到表中。

但是实际上往往因为异常中断,导致结束时间为空。不管是如何进行的退出,SchedulerJob 退出时会关闭所有子进程。

这里主要介绍下 Scheduler 的调度逻辑:

  • 遍历dags路径下的所有 dag 文件, 启动一定数量的进程(进程池),并且给每个进程指派一个 dag 文件。
    每个 DagFileProcessor 解析分配给它的 dag 文件,并根据解析结果在 DB 中创建 DagRuns 和 TaskInstance。
  • 在 scheduler_loop 中,检查与活动 DagRun 关联的 TaskInstance 的状态,解析 TaskInstance 之间的任何依赖,标识需要被执行的 TaskInstance,然后将它们添加至 executor 队列,将新排列的 TaskInstance 状态更新为 QUEUED 状态。
  • 每个可用的 executor 从队列中取一个 TaskInstance,然后开始执行它,将此 TaskInstance 的数据库记录更新为 SCHEDULED
  • 当一个 TaskInstance 完成运行,关联的 executor 就会报告到队列并更新数据库中的 TaskInstance 的状态(例如“完成”、“失败”等)。
  • 一旦所有的dag处理完毕后,就会进行下一轮循环处理。这里还有一个细节就是上一轮的某个dag的处理时间可能很长,导致到下一轮处理的时候这个dag还没有处理完成。

Airflow 的处理逻辑是在这一轮不为这个 dag 创建进程,这样就不会阻塞进程去处理其余dag。

yMnuE3z.png!web

文档原文:

  • Enumerate the all the files in the DAG directory.
  • Start a configurable number of processes and for each one, assign a DAG file to process.
  • In each child process, parse the DAG file, create the necessary DagRuns given the state of the DAG's task instances, and for all the task instances that should run, create a TaskInstance (with the SCHEDULED state) in the ORM.
  • Back in the main scheduler process, query the ORM for task instances in the SCHEDULED state.
    If any are found, send them to the executor and set the task instance state to QUEUED .
  • If any of the child processes have finished, create another process to work on the next file in the series, provided that the number of running processes is less than the configured limit.
  • Once a process has been launched for all of the files in the DAG directory, the cycle is repeated.
    If the process to parse a particular DAG file is still running when the file's turn comes up in the next cycle, a new process is not launched and a process for the next file in the series is launched instead.
    This way, a DAG file that takes a long time to parse does not necessarily block the processing of other DAGs.

Scheduler 模块代码结构

DagFileProcessor 在子进程中解析 DAG 定义文件。对于发现的 DAG,检查 DagRun 和 TaskInstance 的状态。如果有 TaskInstance 可以运行,将状态标记为 SCHEDULED

为每个 dag 文件分配一个进程,同时在 DagFileProcessorManager 中保存有 dag 和 processor 的映射表。在 dag 没有被任何 processor 处理的时候,才会给它创建新的处理进程。

DagFileProcessorManager 控制 DagFileProcessors 如何启动。它追踪哪些文件应该被处理并且确保一旦有一个 DagFileProcessor 完成解析,下一个dag文件应该得到处理。并且控制 DagFileProcessors 的数量。

SchedulerJob 通过agent获取manager的 DAG 定义文件解析结果,并且将 SCHEDULED 状态的 TaskInstance 发送给 executor 执行。

DagFileProcessorAgent 作为一个采集代理,scheduler可以借助agent获取manager获取到的 DAG 解析结果,并且可以控制manager的行为。

核心类分析

Dag

  • following_schedule() 计算当前 dag 的下一次调度时间
  • previous_schedule() 计算当前 dag 的上一次调度时间
  • get_dagrun() 返回给定执行日期的 dagrun(如果存在)
  • create_dagrun() 创建一个包括与此 dag 相关任务的 dagrun
  • ckear() 清除指定日期范围内与当前 dag 相关的一组任务实例
  • run() 实例化为 BackfillJob 同时调用 job.run()

DagRun

ID_PREFIX = 'scheduled__'

ID_FORMAT_PREFIX = ID_PREFIX + '{0}'



id = Column(Integer, primary_key=True)

dag_id = Column(String(ID_LEN))

execution_date = Column(UtcDateTime, default=timezone.utcnow)

start_date = Column(UtcDateTime, default=timezone.utcnow)

end_date = Column(UtcDateTime)

_state = Column('state', String(50), default=State.RUNNING)

run_id = Column(String(ID_LEN))

external_trigger = Column(Boolean, default=True)

conf = Column(PickleType)
  • get_dag() 返回与当前 DagRun 相关的 Dag
  • get_task_instances() 返回与当前 DagRun 的所有 TaskInstances
  • update_state() 根据 TaskInstances 的状态确定 DagRun 的总体状态
  • get_latest_runs() 返回每个 Dag 的最新一次 DagRun

TaskInstance

__tablename__ = "task_instance"



task_id = Column(String(ID_LEN), primary_key=True)

dag_id = Column(String(ID_LEN), primary_key=True)

execution_date = Column(UtcDateTime, primary_key=True)

start_date = Column(UtcDateTime)

end_date = Column(UtcDateTime)

duration = Column(Float)

state = Column(String(20))

_try_number = Column('try_number', Integer, default=0)

max_tries = Column(Integer)

hostname = Column(String(1000))

unixname = Column(String(1000))

job_id = Column(Integer)

pool = Column(String(50), nullable=False)

queue = Column(String(256))

priority_weight = Column(Integer)

operator = Column(String(1000))

queued_dttm = Column(UtcDateTime)

pid = Column(Integer)

executor_config = Column(PickleType(pickler=dill))
  • get_dagrun() 返回当前 TaskInstance 的 DagRun
  • run() TaskInstance run
  • get_template_context() 通过 Jinja2 模板获取上下文
  • xcom_push() 创建一个 XCom 可用于 task 发送参数
  • xcom_pull() 创建一个 XCom 可用于 task 接收参数

SchedulerJob

def _execute(self):

"""

The actual scheduler loop. The main steps in the loop are:

    #. Harvest DAG parsing results through DagFileProcessorAgent

    #. Find and queue executable tasks

        #. Change task instance state in DB

        #. Queue tasks in executor

    #. Heartbeat executor

        #. Execute queued tasks in executor ake_aware(execution_date,

                                                 self.task.dag.timezone)

"""

self.processor_agent = DagFileProcessorAgent()  # 通过检查当前 processor 数量来控制进程个数

self.executor.start()



# Start after resetting orphaned tasks to avoid stressing out DB.

self.processor_agent.start()  # 在解析 dag 文件时,只会对最近修改过的文件进行解析

execute_start_time = timezone.utcnow()



# For the execute duration, parse and schedule DAGs

while (timezone.utcnow() - execute_start_time).total_seconds() < \

        self.run_duration or self.run_duration < 0:

    # Starting Loop...



    self.processor_agent.heartbeat()  # 控制 DagFileProcessor 解析 DAG 文件的速度



    # Harvesting DAG parsing results

    simple_dags = self.processor_agent.harvest_simple_dags()



    if len(simple_dags) > 0:

        self._execute_task_instances()

    ...



    # Call heartbeats

    self.executor.heartbeat()

    # heartbeat()中根据 parallelism 得出当前可用的 slots 数量,

    # 决定 execute_async 多少个 task



    # Process events from the executor

    self._process_executor_events(simple_dag_bag)



    # Ran scheduling loop for all tasks done

    ...



# Stop any processors

self.processor_agent.terminate()



# Verify that all files were processed, and if so, deactivate DAGs that

# haven't been touched by the scheduler as they likely have been

# deleted.

...



self.executor.end()
  • create_dag_run() 根据调度周期检查是否需要为 DAG 创建新的 DagRun。如果已调度,则返回 DagRun,否则返回 None
  • process_file() 解析 DAG 定义文件
  • _execute_task_instances() 尝试执行调度器调度过的 TaskInstances
    There are three steps:
    1. Pick TaskInstances by priority with the constraint that they are in the expected states
      and that we do exceed max_active_runs or pool limits.
    2. Change the state for the TaskInstances above atomically.
    3. Enqueue the TaskInstances in the executor.
  • reduce_in_chunks() 用来进行小的分批处理

总结

本文在第一部分着重介绍了 Airflow 的理念、使用场景及其一般架构。

提供了相对简单易懂的安装及操作命令,并附带了一个使用案例用来介绍代码如何编排以及 WebUI 的使用。

在第二部分开篇介绍了 Airflow 任务创建、调度和管理的一些基础概念,以及 Airflow 版本迭代的一些重要变化。

Airflow 目前还是处于快速开发中,当前版本有很多遗留问题,版本升级也不是向后兼容的,变动很大。

Scheduler 毫无疑问是整个 Airflow 的核心模块,逻辑结构复杂。

本文从 Scheduler 模块的主要逻辑入手,分析了控制循环和代码结构,重点分析了从 dag.py 代码文件到可调度执行的 TaskInstances 所经历的阶段;以及介绍了并发控制的实现和性能优化。

最后结合源码介绍了 Airflow 核心类的模型定义和主要方法,以了解各个类所扮演的角色及其实现的功能。

参考


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK