
Creating DAG in Apache Airflow



Reading Time: 4 minutes

If you are reading this blog, I assume you are already familiar with the Apache Airflow basics. If not, please visit "Introduction to Apache-Airflow".
Before proceeding further, let's understand what a DAG is.

What is a DAG?

DAG stands for Directed Acyclic Graph. In simple terms, it is a graph with nodes, directed edges, and no cycles.

[Figure: two example graphs, the first acyclic (a DAG), the second containing a cycle (A → B → C → A)]

In the above example, the first graph is a DAG, while the second graph is NOT a DAG because it contains a cycle (Node A → Node B → Node C → Node A).

In Apache Airflow, a "DAG" means a "data pipeline". It is authored in the Python programming language. Whenever a DAG is triggered, a DAGRun is created. A DAGRun is an instance of the DAG with an execution date in Airflow.
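As an illustration, a DAGRun can also be created programmatically. The sketch below is hypothetical and assumes Airflow 2's stable REST API is enabled, with basic-auth credentials and a DAG named example_dag already in place:

import requests

# A sketch of creating a DAGRun through Airflow 2's stable REST API.
# The URL, credentials, and dag_id below are illustrative assumptions.
response = requests.post(
    "http://localhost:8080/api/v1/dags/example_dag/dagRuns",
    auth=("airflow", "airflow"),
    json={"conf": {}},  # optional run-level configuration
)
print(response.json())  # metadata of the new DAGRun, including its execution date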

Steps for writing a DAG file:

  • Importing Modules
  • Defining default arguments
  • Instantiating the DAG
  • Defining the tasks
  • Defining dependencies

Step 1: Importing modules

Import Python dependencies needed for the workflow.
To create a DAG in Airflow, you always have to import the DAG class, i.e. from airflow import DAG.
The next imports are for the operators you plan to use, such as BashOperator, PythonOperator, BranchPythonOperator, etc.

Example:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator

Step 2: Defining default arguments

This step defines default and DAG-specific arguments. If a dictionary of default_args is passed to a DAG, it applies them to all of its operators. This makes it easy to apply a common parameter to many operators without having to type it many times.

Example

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'queue': 'bash_queue',
    'pool': 'backfill',
    'priority_weight': 10,
    'end_date': datetime(2016, 1, 1),
    'wait_for_downstream': False,
    'sla': timedelta(hours=2),
    'execution_timeout': timedelta(seconds=300),
    'on_failure_callback': some_function,        # placeholder callables,
    'on_success_callback': some_other_function,  # assumed to be defined elsewhere
    'on_retry_callback': another_function,
    'sla_miss_callback': yet_another_function,
    'trigger_rule': 'all_success'
}

Step 3: Instantiating the DAG

Give the DAG a name (it should be unique), configure the schedule, and set the other DAG settings.

Here are a couple of options you can use for your schedule_interval. You can choose a preset or a cron-like expression:

| Preset   | Meaning                                                         | Cron      |
|----------|-----------------------------------------------------------------|-----------|
| None     | Don't schedule; use for exclusively "externally triggered" DAGs |           |
| @once    | Schedule once and only once                                     |           |
| @hourly  | Run once an hour at the beginning of the hour                   | 0 * * * * |
| @daily   | Run once a day at midnight                                      | 0 0 * * * |
| @weekly  | Run once a week at midnight on Sunday morning                   | 0 0 * * 0 |
| @monthly | Run once a month at midnight on the first day of the month      | 0 0 1 * * |
| @yearly  | Run once a year at midnight of January 1                        | 0 0 1 1 * |

Example

  • Monthly schedule:
    • schedule_interval='@monthly'
    • schedule_interval='0 0 1 * *'
  • Hourly schedule:
    • schedule_interval='@hourly'
    • schedule_interval='0 * * * *'
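Putting Steps 1-3 together, instantiating the DAG might look like this minimal sketch (the dag_id, description, and dates are illustrative):

dag = DAG(
    dag_id='example_dag',               # must be unique across your Airflow instance
    default_args=default_args,          # the dictionary from Step 2
    description='A simple example DAG',
    schedule_interval='@daily',         # or a cron expression such as '0 0 * * *'
    start_date=datetime(2021, 1, 1),
    catchup=False,                      # don't backfill runs before the current date
)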

Step 4: Defining the tasks

The next step is to lay out all the tasks in the workflow.

A node in the DAG represents a task. Tasks are arranged into DAGs and then have upstream and downstream dependencies set between them to express the order in which they should run.

There are three basic kinds of tasks:

  • Operators – predefined task templates that you can string together quickly to build most parts of your DAGs.
  • Sensors – a special subclass of Operators which are entirely about waiting for an external event to happen.
  • TaskFlow – a custom Python function decorated with @task and packaged up as a Task (see the sketch after the examples below).

Example:

task_1 = PythonOperator(
    task_id='task_1',
    python_callable=method_1,   # a Python function defined elsewhere
    dag=dag,
)

task_2 = BranchPythonOperator(
    task_id='task_2',
    python_callable=method_2,   # returns the task_id of the branch to follow
    dag=dag,
)

task_3 = BashOperator(
    task_id='task_3',
    bash_command="echo 'This is Task 3'",
    dag=dag,
)
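And the TaskFlow style mentioned above, as a minimal sketch assuming Airflow 2.x (the function name and return value are illustrative):

from airflow.decorators import task

@task
def extract():
    # an ordinary Python function packaged up as a Task;
    # its return value is passed downstream via XCom
    return {'records': 42}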

Step 5: Defining dependencies

The key part of using Tasks is defining how they relate to each other: their dependencies, or, as we say in Airflow, their upstream and downstream tasks. First declare your Tasks, then declare the dependencies between them.

There are two ways of declaring dependencies – using the >> and << (bitshift) operators:

first_task >> second_task >> [third_task, fourth_task]

Or the more explicit set_downstream and set_upstream methods:

first_task.set_downstream(second_task)
third_task.set_upstream(second_task)
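For longer pipelines, Airflow also provides a chain() helper; a brief sketch using the same tasks:

from airflow.models.baseoperator import chain

# equivalent to: first_task >> second_task >> [third_task, fourth_task]
chain(first_task, second_task, [third_task, fourth_task])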

Example:

  • task_2 and task_3 depend on task_1
    • task_1.set_downstream([task_2, task_3])
    • task_1 >> [task_2, task_3]
    • [task_2, task_3] << task_1

Putting all the steps together, below is a complete example of DAG creation in Apache-Airflow.

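The sketch below is illustrative; the callables, ids, and branch target are assumptions rather than the original screenshot's exact code:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def method_1():
    print('This is Task 1')

def method_2():
    # return the task_id of the branch to follow
    return 'task_3'

with DAG(
    dag_id='example_dag',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    task_1 = PythonOperator(task_id='task_1', python_callable=method_1)
    task_2 = BranchPythonOperator(task_id='task_2', python_callable=method_2)
    task_3 = BashOperator(task_id='task_3', bash_command="echo 'This is Task 3'")

    task_1 >> task_2 >> task_3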

Viewing DAG in Airflow

After running the code, go to your browser and open localhost:8080.
You will see the various DAGs already created by Airflow.
Click on your DAG.

[Screenshot: the DAGs list in the Airflow UI]

After clicking, you will get a detailed view of the tasks.

hEdIHWfnZ2F6oTyOIXKCRGYPvYETmz37wzcqs3LsCxzhVFx0nZDR5udM7JnYLEnUP-947VoD6jAV1WlvSSlXRJmImCxrXJtcIsfDztuBREuabqsUhDpABHEtWBL26WqQldTgEuT9

You can also check the Graph View for a better visualization of the tasks and their dependencies.

[Screenshot: the Graph View of the DAG]

Similarly, you can check other views such as Calendar, Task Duration, Gantt, Code, etc.

I hope you are now able to understand DAG creation in Apache Airflow. Stay tuned for the next part.

Read the Apache-Airflow documentation for more details.

For more information, visit Knoldus Blogs.

