4

Apache Airflow:工作流程管理控制台

 2 years ago
source link: https://tech.hahow.in/apache-airflow-%E5%B7%A5%E4%BD%9C%E6%B5%81%E7%A8%8B%E7%AE%A1%E7%90%86%E6%8E%A7%E5%88%B6%E5%8F%B0-4dc8e6fc1a6a
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Apache Airflow:工作流程管理控制台

如果你對以下議題有興趣,歡迎繼續看下去
👉 Airflow 的核心概念和主要架構
👉 如何運行 Airflow
👉 實戰:使用 Airflow 設計 Facebook 廣告資料下載的工作流程

上一篇「淺談 Cloud Dataflow & Apache Beam 處理資料流」介紹了 Hahow 處理資料的工作流程 1.0,搭配 Cloud Dataflow 處理資料管道,快速的去做 PoC 驗證需求。

隨著時間推進,Data Team 陸續收集和統整了內部夥伴的需求,覺得是時候引入完整性、擴充性更好的方案來應對接下來的各種需求,希望新的方案可以做到:

  • 支援 backfilling:陸續有每日更新數據的需求出現,需支援針對特定區間做資料回補
  • 支援單個任務重跑:一個工作流程會包含很多不同的任務,有些流程如果失敗了不需要整個重啟,只需執行特定任務即可,這樣也比較好測試
  • 任務可以重用也可以組合成不同的工作流程:1.0 僅支援直線式的流程,希望可以讓單個任務被不同的流程使用,做到更好的模組化來重用
0*QdDHMk3sRbj10Lvy.png?q=20
apache-airflow-%E5%B7%A5%E4%BD%9C%E6%B5%81%E7%A8%8B%E7%AE%A1%E7%90%86%E6%8E%A7%E5%88%B6%E5%8F%B0-4dc8e6fc1a6a
有兩個任務分別要等 [A, B] 或 [B, C] 完成才會執行
  • (非必要)有更直觀的介面可以操作、監控執行狀態和 logs 等資訊:1.0 都在 command line 操作,如果有 GUI 介面會更好

盤點了需要達到的目標以後,腦海中就馬上浮現在 1.0 前看過相關文件,但在需求驗證期間下沒選擇使用的 Apache Airflow。

導入工作流程管理平台:Airflow

1*FJsMPN5kPMI7JuqhsaP7rA.png?q=20
apache-airflow-%E5%B7%A5%E4%BD%9C%E6%B5%81%E7%A8%8B%E7%AE%A1%E7%90%86%E6%8E%A7%E5%88%B6%E5%8F%B0-4dc8e6fc1a6a
Source: Wikipedia

Apache Airflow(以下簡稱 Airflow)是一個由 Airbnb 開源的工作流程管理平台,於 2016 年成為 Apache Incubator 的項目。

Airflow 讓開發者可以以 configuration as code(Python)的方式建構 ETL ,排程處理資料,且搭配 WebUI 控制排程、監控狀態等,以下會陸續介紹幾個 Airflow 的核心概念。

DAG

在 Airflow 每個工作流程被定義為一個 Directed Acyclic Graph(以下簡稱為 DAG),每個 DAG 包含了各種工作的集合,而 DAG 定義了工作和工作之間的關係、執行順序等。

Operators

DAG 定義的是整個工作流程如何運作,而 Operators 則定義了工作流程裏每個工作的工作內容,比如說 Hahow 比較常用的有:

  • BashOperator: 執行 bash 指令
  • PythonOperator: 執行 Python function
  • GoogleCloudStorageToBigQueryOperator: 從 Google Cloud Storage 匯入資料到 Big Query
  • BigQueryOperator: 執行 Big Query 的 SQL 指令
# 輸出 hello 的工作內容
hello = BashOperator(
task_id='hello',
bash_command='echo hello',
)

如果 Airflow 提供的 Operators 沒辦法滿足你的需求,可以透過繼承自定義 Operator,操作 Hooks 來達成目的。Hooks 可以看作是連結外部服務或資料庫的介面。

此外,Operator 的部分參數是可以透過 Jinja 語法傳遞的,官方有一些預設的變數可以使用,也可以傳遞自己設定的變數。

# 輸出執行時間的日期(YYYY-MM-DD)
today = BashOperator(
task_id='echo_today',
bash_command='echo {{ ds }}'
)

# 輸出透過 params 傳遞的自定義參數
ping = BashOperator(
task_id='pong',
bash_command='echo {{ params.pong }}',
params={
'pong': 'pong'
}
)

Tasks

定義好工作內容後,透過程式碼宣告把工作內容指派為 task(工作),這些工作實體化為 task instance 提供 DAG 去定義它們之間的關係和執行順序等,而工作執行也會有它自己的生命週期。

0*pGGIDSYCxxG2R_jt.png?q=20
apache-airflow-%E5%B7%A5%E4%BD%9C%E6%B5%81%E7%A8%8B%E7%AE%A1%E7%90%86%E6%8E%A7%E5%88%B6%E5%8F%B0-4dc8e6fc1a6a
Task Lifecycle Source: Apache Airflow

Airflow 是可以針對單一工作測試的,因此 Operator 和 Task 在功能切分上設計得越乾淨和功能單一,測試就越簡單。

XComs

XCom 是 Cross-communication 的縮寫,在 Airflow 每個工作都是獨立的實體,所產生的任何資料會隨著工作完成消失,如果想取得其他工作的資料可以透過 XCom 傳遞,但需注意 XCom 的大小是有限制的,僅適合用來傳遞 ID、檔名等可辨識的唯一值,交給其他 Operator 把完整資料捉出來。

# Source: Apache Airflow# MAX XCOM Size is 48KB# https://github.com/apache/airflow/pull/1618#discussion_r68249677[docs]MAX_XCOM_SIZE = 49344

Airflow 動起來!

了解了核心概念,定義好工作流程、工作內容、分配好工作以後,是時候讓 Airflow 開始工作了!Airflow 要可以運轉需要有幾個角色:

  • Web Server:DAG 定義好之後預設都是關閉的,可以透過 WebUI 打開排程。WebUI 會顯示 DAG 的工作狀態、工作流程、運行時間、工作執行的 logs 等資訊。
0*qH3fMCzjn66knmf-.png?q=20
apache-airflow-%E5%B7%A5%E4%BD%9C%E6%B5%81%E7%A8%8B%E7%AE%A1%E7%90%86%E6%8E%A7%E5%88%B6%E5%8F%B0-4dc8e6fc1a6a
工作運行時間的甘特圖
  • Scheduler:負責 DAG 實際的排程和分配工作給 Executor 執行
  • Metadata Database:儲存 DAG 執行的資訊、狀態,Airflow 本身的設定如用戶、連線等設定。Web Server 顯示的資訊就是從 Metadata DB 來的,而 Scheduler 也會更新這些資訊到 DB 讓 Web 和 Scheduler 可以同步。

Scheduler 排程以後,還需要有個機制決定如何分配工作,而那個機制就是 Executor。它會把工作交給 Worker 運行,不同 Executor 也有不同的優缺點,詳細比較可以參考這篇文章,比較常用的有:

  • Local Executor:在單一機器平行運行多個工作,原則上機器規格開得夠大的話初期使用應該就很足夠了,這也是 Hahow 目前還在使用的 Executor
  • Celery Executor:把工作分散到各個不同機器的 Celery Worker,需要有 message broker 作為溝通的管道,因為是分散式的所以適合做水平擴充
0*U85SYY3wFioFRnqC.png?q=20
apache-airflow-%E5%B7%A5%E4%BD%9C%E6%B5%81%E7%A8%8B%E7%AE%A1%E7%90%86%E6%8E%A7%E5%88%B6%E5%8F%B0-4dc8e6fc1a6a
Celery Executor 運作圖 Source: Medium
  • Kubernetes Executor:版本 v1.10 之後才有的,透過 Kubernetes API 為每個工作開啟一個 Pod,工作完成後就會砍掉,可以跟據每個工作分配不同的 CPU、memory、設定等,達到資源利用最大化。

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK