7

用Python操作Kubernetes的Job

 3 years ago
source link: https://note.qidong.name/2020/08/python-k8s-job/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

用Python操作Kubernetes的Job

2020-08-14 21:22:39 +08  字数:2554  标签: Python K8s

关于Kubernetes的Python SDK,几乎只有官方项目的examples。 关于Job的基本增删改查操作,可以参考job_crud.py。 但是,这只是基本用法,缺乏一些实用细节。

本文给出Python SDK操作Kubernetes Job的更多示例代码,以及相关解释。

pip install kubernetes

初始化

from kubernetes.client import BatchV1Api
from kubernetes.config import load_kube_config

load_kube_config()
batch = BatchV1Api()

load_kube_config是从默认位置,也就是~/.kube/config加载配置。 如果在其它位置,可以通过第一个参数传入其路径。

BatchV1Api()可以当做Job的客户端来用。 命名上,Batch和Job是类似的概念,前者强调批量。

创建Job

以下来自官方样例job_crud.py

def create_job_object():
    # Configureate Pod template container
    container = client.V1Container(
        name="pi",
        image="perl",
        command=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"])
    # Create and configurate a spec section
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels={"app": "pi"}),
        spec=client.V1PodSpec(restart_policy="Never", containers=[container]))
    # Create the specification of deployment
    spec = client.V1JobSpec(
        template=template,
        backoff_limit=4)
    # Instantiate the job object
    job = client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=client.V1ObjectMeta(name=JOB_NAME),
        spec=spec)

    return job


def create_job(api_instance, job):
    api_response = api_instance.create_namespaced_job(
        body=job,
        namespace="default")
    print("Job created. status='%s'" % str(api_response.status))

虽然,根据官方教程这样的写法,也能得到可用的V1Job,拿去执行创建操作。 但还是过于陌生和偏门,不如主流、常见的YAML方便、易读写。

这里该出两种更方便的做法。

直接使用YAML

---
apiVersion: batch/v1
kind: Job
metadata:
  name: hello
spec:
  template:
    spec:
      containers:
        - name: echo
          image: alpine:3.11
          args:
            - 'echo'
            - 'Hello world!'

以上是一个最精简的Job配置样例,

通过读取文件为dict,可以直接拿去使用。

from kubernetes.client import V1Job
import yaml

with open('job.yaml') as file:
    cfg = yaml.safe_load(file)
job = batch.create_namespaced_job(namespace='default', body=cfg)
assert isinstance(job, V1Job)

create_namespaced_job同样接受字典作为body输入,因此YAML配置可以读出后直接传入。

这里返回的V1Job只是创建时的状态,但是会包含更多集群中的信息。

使用dict

由于create_namespaced_job接受字典作为body输入,因此直接使用dict也是可行的。

cfg = {
    'apiVersion': 'batch/v1',
    'kind': 'Job',
    'metadata': {
        'name': 'hello'
    },
    'spec': {
        'template': {
            'spec': {
                'restartPolicy':
                'Never',
                'containers': [{
                    'name': 'upload',
                    'image': 'alpine:3.11',
                    'args': ['echo', 'Hello world!']
                }]
            }
        }
    }
}
batch.create_namespaced_job(namespace='default', body=cfg)

由于dict结构与YAML相同,而又没有类的束缚,所以也很灵活方便。

此外,从YAML读出为dict后,也可以通过修改部分字段,达到动态变化的效果。 这种结合YAML和dict的使用方式,是对官方用法的最佳替代。

监控Job运行

在创建Job后,通常需要监控Job的运行,做一些外围处理。 轮询当然是下下策,而Kubernetes提供了一个Watch机制,通过接收Event,实现对状态变化的掌控。 Event只有在状态变化时才会有,所以是非常理想的回调。

from kubernetes.client import V1Job
from kubernetes.watch import Watch

job_name = 'hello'
watcher = Watch()
for event in watcher.stream(
        batch.list_namespaced_job,
        namespace='default',
        label_selector=f'job-name={job_name}',
):  # yapf: disable
    assert isinstance(event, dict)
    job = event['object']
    assert isinstance(job, V1Job)
    # handle job.status

Watch().stream就是前面说的理想回调,它第一个参数是列出类的函数,这里选择list_namespaced_job。 后面的参数,都是list_namespaced_job的参数。 除了必备的namespace以外,label_selector也是一个常用参数,可以避免关注无关的Job。 每个Job在创建后,都会自动带一个f'job-name={job_name}'的Label,可以借此筛选。 job_name就是metadata里设置的name,如这里job-name=hello

event是一个dict,只有三个值。 其中event['raw_object']只是event['object']dict形式,没有太大意义。 event['type']常见三个值,对应增删改。

  • ADDED,创建时的信息,和create_namespaced_job的返回值通常没有区别。
  • MODIFIED,Job状态变化时的信息。
  • DELETED,Job删除时的信息。

以上三个状态值,对其它类型的资源也是通用的,比如Pod、Deployment等。

V1Job的使用

对于具体的V1Job实例,其它字段都是和创建时的配置差不多的,只是多一些集群中的具体信息。 所以,常用的还是.status字段。

>>> from kubernetes.client import V1JobStatus
>>> isinstance(job.status, V1JobStatus)
True
>>> print(job.status)
{'active': None,
 'completion_time': datetime.datetime(2020, 8, 10, 9, 49, 38, tzinfo=tzutc()),
 'conditions': [{'last_probe_time': datetime.datetime(2020, 8, 10, 9, 49, 38, tzinfo=tzutc()),
   'last_transition_time': datetime.datetime(2020, 8, 10, 9, 49, 38, tzinfo=tzutc()),
   'message': None,
   'reason': None,
   'status': 'True',
   'type': 'Complete'}],
 'failed': None,
 'start_time': datetime.datetime(2020, 8, 10, 9, 49, 32, tzinfo=tzutc()),
 'succeeded': 1}

直接使用job.status.succeeded,可以得到成功的Container数量。 下面几乎每一级都有特定的类,可以连续使用.操作符。 如果有特殊需要,也可以用job.to_dict()转换成字典来用。

其它字段,作用基本上也和名称相关,不难推测。

列出Job

列出所有Job的list_job_for_all_namespaces不常用,一般只列出指定Namespace的Job。

from kubernetes.client import V1JobList, V1Job

job_list = batch.list_namespaced_job(namespace='default')
assert isinstance(job_list, V1JobList)

assert isinstance(job_list.items, list)
for job in job_list.items:
    assert isinstance(job, V1Job)

与监控的示例相比,这里去掉了label_selector,可以获取Namespace中所有的Job。 如果有需要,可以通过自定义Label把所有Job分类,并使用label_selector获取指定类型的Job。

读取Job

如果知道Job的name,可以直接通过read_*系列接口,获得指定的V1Job

from kubernetes.client import V1Job

job = batch.read_namespaced_job(name='hello', namespace='default')
assert isinstance(job, V1Job)

如果更看重状态,可以改用read_namespaced_job_status。 虽然访问的API不同,但在Python的V1Job这个结果层面,没有本质差异。

列出一个Job的Pod

Pod是Kubernetes调度的最小单元,也是最常用的一种资源。

from typing import List

from kubernetes.client import CoreV1Api, V1Pod


def get_pods_by(job_name: str) -> List[V1Pod]:
    core = CoreV1Api()
    pods = core.list_namespaced_pod(
        namespace='default',
        label_selector=f'job-name={job_name}',
        limit=1,
    )
    return pods.items

这里的get_pods_by,可以用job_name获取对应的Pod。 limit=1是在已知Pod只有一个的情况下做出的优化,可按需调整或去掉。

删除Job

删除一个Job:

from kubernetes.client import V1Status

status = batch.delete_namespaced_job(
    namespace='default',
    name=job_name,
    propagation_policy='Background',
)
assert isinstance(status, V1Status)

其中,propagation_policy='Background'是不可省略的关键,否则默认是Orphan,其Pod不会被删除。 这属于API设计的一个失误,与kubectl的默认行为不符合。 而且,应该没有人在删除了Job之后,还要保留Pod的吧。 这里也可以选择'Foreground',阻塞等待相关资源的删除完毕。

删除多个、或所有Job:

status = batch.delete_collection_namespaced_job(
    namespace='default',
    propagation_policy='Background',
    label_selector='some-label=your-value',
)
assert isinstance(status, V1Status)

如果没有label_selector,那就是删除一个Namespace中的所有Job。

更新Job

这个比较少用,因为一般都是建新的。 用法其实和create_namespaced_job差不多,参考官方样例即可。

def update_job(api_instance, job):
    # Update container image
    job.spec.template.spec.containers[0].image = "perl"
    api_response = api_instance.patch_namespaced_job(
        name=JOB_NAME,
        namespace="default",
        body=job)
    print("Job updated. status='%s'" % str(api_response.status))

总结

用Python操作Kubernetes的Job,总体上还是比较方便的,虽然有一些坑。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK