2

如何使用 Kubernetes 和 GIT 部署 Airflow ?

 1 year ago
source link: https://www.jdon.com/62418
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

如何使用 Kubernetes 和 GIT 部署 Airflow ?


Apache Airflow 给我留下了深刻的印象。引擎快如闪电,编写管道真的很容易。
另一个很棒的功能是它与源代码控制同步。这样,我就知道在环境中执行了哪些内容。
一个好的提示:拥有从主分支读取的暂存环境和从发布分支读取的生产环境很有用。

什么是Apache Airflow
Apache Airflow 是一个构建数据和 ML 管道的工具。这并不完全正确。您可以构建运行任务的工作流。每个任务可以是一个 shell 脚本、python 程序或其他一些执行逻辑的任务。
您可以使用 Apache Airflow 进行数据处理,但它本身并不是一个数据处理工具。相反,它确保任务按照您在一台机器或多台机器(如有必要)上指定的顺序执行。它使用一种称为非对称有向图 (DAG) 的原理。它是任务的依赖关系图。
Apache Airflow 是一个强大的工具,因为您可以很好地了解工作流中执行的任务。它足够灵活,可以在单个工作流程中处理多种类型的任务。
您不能真正在任务之间流动数据。如果您在下一个任务中需要数据,则需要将数据存储在某处。但是,Apache Airflow 可以流动元数据,因此您可以传递数据可用的位置。
现在您已经了解了 Apache Airflow 可以做什么,让我们来看看在 Kubernetes 上部署它。

在 Kubernetes 上设置 Airflow
您可以使用 Python 在本地运行 Apache Airflow。这非常适合测试工作流程。但是,您将需要更多的计算能力来用于生产场景。

要在生产环境中运行 Apache Airflow,您需要设置多个虚拟机或使用 Kubernetes 之类的工具来托管这些工具。

出于本文的目的,我将使用 Kubernetes 作为托管平台。

Apache Airflow 提供了一个 helm 图表来在 Kubernetes 上部署所需的组件。

Helm 是 Kubernetes 的包管理器。它允许您在 Kubernetes 上安装各种应用程序,而无需手动安装数十或数百个清单。

如果你以前没有使用过 Helm,你可以在他们的网站上找到一个很棒的入门指南 。

在使用 Helm 安装 Airflow 之前,您需要告诉 Helm Apache Airflow Helm 存储库。使用以下命令配置正确的 Helm 存储库:

helm repo add apache-airflow https://airflow.apache.org
helm repo update

配置 Helm 存储库后,您可以在终端中使用另一个命令安装 Airflow 组件:

helm upgrade airflow apache-airflow/airflow --namespace airflow

此命令将使用一组默认配置设置安装 Airflow。这对开发来说很好,但我们需要更多的生产。

让 Airflow 为生产做好准备
当您不指定任何设置时,您会发现密码和密钥相当弱。因此,您需要在 Airflow 安装中添加一些配置。

让我们创建一个名为的新文件values-secrets.yml并向其中添加以下设置:

webserverSecretKey: <random-string>
defaultUser.password: <your-password>
data:
  metadataConnection:
    user: postgres
    pass: <your-password>

我们在这个文件中配置了一组密钥:

  • 首先,我们配置webserverSecretKey使会话使用唯一签名进行签名。这可以防止用户在 Airflow 的不同安装之间转移会话。
  • 接下来,我们为默认用户配置密码admin。你会想在这里设置一些强大的东西。
  • 然后,我们配置数据库连接的用户名和密码。Airflow 的舵图配置了数据库服务器和气流服务器。Helm 会自动为数据库服务器和气流服务器设置正确的密码,以便它可以连接。

设置机密后,您可以使用以下命令更新 Airflow 安装:

helm upgrade airflow apache-airflow/airflow --namespace airflow \
    -f values-secrets.yml

以前,我们只指定安装的名称和安装哪个包。现在我们添加了一组值以使用我们存储在 values-secrets.yml文件中的值。

在 Kubernetes 中安装 Airflow 后,您可以使用以下命令将端口转发到 Web 服务器来访问它:

kubectl port-forward -n airflow svc/airflow-webserver 8080:8080

打开网络浏览器并使用您存储在机密文件中的凭据登录。

如果一切按预期进行,您现在应该会看到 Airflow 的用户界面。让我们配置它以从存储库中加载一些 DAG。

将 GIT 存储库连接到 Airflow
Airflow 将 DAG 存储在一个名为dags. 您需要将 DAG 文件上传到此文件夹或通过其他方式同步它们。

我们很幸运,因为 Airflow 可以选择将 DAG 文件从 GIT 同步到正确的位置,以便 Airflow 运行它们。

我们需要创建另一个名为的文件values-sync.yml并向其中添加以下内容:

dags:
    gitSync:
        repo: <repo-url>
        branch: main
        depth: 1
        enabled: true
        subPath: dags
        sshKeySecret: airflow-ssh-secret
extraSecrets:
  airflow-ssh-secret:
    data: |
      gitSshKey: '<your-key>'

您需要配置 GIT 存储库的 URL。我在示例代码中使用了 Github 存储库。

接下来,您必须告诉 Airflow DAG 文件在存储库中的位置。我已将它们存储在子文件夹中,但您可以将它们存储在根文件夹中。如果要将文件存储在根文件夹中,则需要清除subPath.

如果您使用的是私有存储库,则需要将 SSH 公钥部署到要链接的 Github 存储库,并将私钥存储在gitSshKey 设置中。确保将私钥编码为 Base64,否则同步过程无法读取它。

配置 GIT 同步的设置后,您可以使用以下命令更新安装:

helm upgrade airflow apache-airflow/airflow --namespace airflow \
    -f values-secrets.yml -f values-override.yml

安装完成更新后,您可以将您的第一个 DAG 提交到源代码管理。让我们快速浏览一下构建示例 DAG 以确保完整性。

为 Apache Airflow 构建 DAG
如果您已经在 Apache Airflow 中编写 DAG,那么接下来的部分就很无聊了。但是,如果您以前没有看过 DAG,我认为最好看一下 DAG。

Apache Airflow 有两种使用 DAG 构建工作流的方法。您可以完全使用在操作员上运行的任务来构建它。您还可以使用任务流 API。

我将展示 Taskflow API,因为它是上手最快的 API。

例如,我创建了一个非常基本的 DAG,它不会做任何具体的事情。在这里向您展示 DAG 的基本原理:

from airflow.decorators import dag, task
import pendulum


@dag(
    schedule_interval=None,
    start_date=pendulum.datetime(2021, 1, 1, tz='UTC'),
    catchup=False,
    tags=['sample']
)
def sample_etl():
    @task()
    def extract():
        pass

    @task()
    def transform():
        pass

    @task()
    def load():
        pass

    extract() >> transform() >> load()

sample_dag = sample_etl()

我们需要两件事来构建 DAG。首先,我们需要创建一个标记为的函数@dag(),它将构建任务流。每个任务都是一个标有 的函数@task()。

我们将 设置schedule_interval为None所以 DAG 不会自动执行。您可以将间隔设置为每天、每小时或自定义设置。

开始日期告诉 Airflow 应该何时首次安排 DAG。我们已将此设置为过去的日期。但是您可以将其设置为未来的日期。

我们有三个任务,提取、转换和加载。这些任务必须按顺序执行。我们可以通过将每个任务转发到下一个来告诉运行时。

请注意,我们在这里调用任务,但不会执行它们。它只是告诉调度程序何时执行哪个任务。

最后,我们调用 DAG 函数来告诉运行时我们有一个新的 DAG 要加载。

它看起来像普通的 Python,但事实并非如此。每个函数都打包到一个 Kubernetes pod 中,并在 Airflow 调度 DAG 时执行。

当您将新 DAG 提交到链接存储库时,它将在您推送更改后一分钟可用。


对代码感兴趣?你可以在这里得到它并自己尝试一下


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK