
K8s-based workflows

source link: https://qiankunli.github.io/2022/11/09/workflow.html

Kruise Rollout: a flexible, pluggable progressive-delivery framework. Every Pod in K8s is managed by a workload, and the two most common workloads are Deployment and StatefulSet. For upgrades, Deployment provides the maxUnavailable and maxSurge parameters, but in essence it only supports a streaming, one-shot rollout and gives the user no control over batches. StatefulSet does support batching, but it is still a long way from the progressive-delivery capability we want.
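
For reference, here is a minimal sketch of the two mechanisms being compared: a Deployment's rolling-update parameters and a StatefulSet's partition field. The names and the image are illustrative (the image is borrowed from the examples later in this post), and only the update-related fields carry comments.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: echoserver
spec:
  replicas: 10
  selector:
    matchLabels:
      app: echoserver
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 25%           # extra Pods allowed above replicas during the update
      maxUnavailable: 25%     # Pods allowed to be unavailable during the update
  template:
    metadata:
      labels:
        app: echoserver
    spec:
      containers:
        - name: echoserver
          image: crccheck/hello-world   # placeholder image
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: echoserver-sts
spec:
  replicas: 10
  serviceName: echoserver-sts
  selector:
    matchLabels:
      app: echoserver-sts
  updateStrategy:
    type: RollingUpdate
    rollingUpdate:
      partition: 7            # only ordinals >= 7 are updated; lowering it releases the next batch
  template:
    metadata:
      labels:
        app: echoserver-sts
    spec:
      containers:
        - name: echoserver
          image: crccheck/hello-world   # placeholder image

Once a Deployment rollout starts it runs to completion on its own, while a StatefulSet at least lets you hold back Pods below the partition ordinal and lower the value step by step, which is the crude batching referred to above.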

Progressive delivery

Argo Rollouts and Flagger

Argo Rollouts is a workload introduced by the Argo project. Its approach is to define a new workload similar to Deployment: it implements Deployment's original capabilities and extends them with rollout-related ones. Its advantage is that the rollout capability is built into the workload, so configuration and implementation are simple, and the feature set is already very rich, covering various release strategies, traffic canarying and metrics analysis; it is a fairly mature project. But it also has problems: since it is itself a workload, it cannot be applied to the community Deployment, so companies that already deploy with Deployment need a one-time online migration of their workloads.

The other community project is Flagger, whose approach is completely different from Argo Rollouts. It does not implement a separate workload; instead it extends the existing Deployment with traffic canarying and batched release. Flagger's strength is that it supports the native Deployment and is compatible with community tooling such as Helm and Argo CD. But it also has issues. First, there is the double-Deployment resource problem during a release: it first upgrades the Deployment the user deployed and then upgrades the primary, so twice the Pod resources are needed in the meantime. Second, self-built container platforms need extra integration work, because Flagger's approach is to copy every resource the user deploys and change its name and labels. PS: back in the day I also tried implementing batched releases with two Deployments.

Kruise Rollout

Kruise Rollout: progressive delivery for every application workload

There were several goals in designing Rollout:

  1. Non-intrusive: do not modify the native workload controllers or the user-defined application YAML in any way, keeping native resources clean and consistent
  2. Extensible: support K8s native workloads, custom workloads, and multiple traffic-scheduling mechanisms such as Nginx and Istio in a pluggable way
  3. Easy to use: out of the box for users, and easy to combine with community GitOps tooling or a self-built PaaS

apiVersion: rollouts.kruise.io/v1alpha1
kind: Rollout
spec:
  strategy:
    objectRef:    # the workload that this Kruise Rollout acts on, e.g. the Deployment name
      workloadRef:
        apiVersion: apps/v1
        # Deployment, CloneSet, Advanced DaemonSet, etc.
        kind: Deployment 
        name: echoserver
    canary:       # defines the rollout process
      steps:
        # route 5% of traffic to the new version
      - weight: 5
        # manual confirmation; the remaining steps are released only after approval
        pause: {}
        # optional: the number of replicas released in this step; if not set, 'weight' is used (5% here)
        replicas: 1
      - weight: 40
        # wait 600s, then continue with the remaining steps
        pause: {duration: 600}
      - weight: 60
        pause: {duration: 600}
      - weight: 80
        pause: {duration: 600}
        # the last batch needs no configuration
      trafficRoutings:      # the resources needed for traffic canarying, e.g. Service, Ingress or Gateway API
        # echoserver service name
      - service: echoserver
        # nginx ingress
        type: nginx
        # echoserver ingress name
        ingress:
          name: echoserver

(Figure: openkruise_rollout.png)

How Kruise Rollout works

  1. First, the user performs a release through the container platform (a release is essentially applying K8s resources to the cluster).
  2. Kruise Rollout contains a webhook component that intercepts the user's release request and pauses the workload controller by modifying the workload strategy.
  3. Then, according to the user's Rollout definition, it dynamically adjusts workload parameters such as partition so that the workload is released in batches.
  4. Once a batch is released, it adjusts the ingress and Service configuration to route a specific share of traffic to the new version (see the sketch after this list).
  5. Finally, Kruise Rollout can also use business metrics in Prometheus to judge whether the release is healthy; for a web/HTTP service, for example, it can check that HTTP status codes are normal.
  6. That completes the first canary batch, and the following batches work the same way. After the whole rollout finishes, kruise restores the configuration of the workload and the other resources. The whole rollout is therefore a collaboration with the existing workload's capabilities: it reuses them as much as possible while remaining zero-intrusion outside the rollout.
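
As an illustration of step 4: with the nginx trafficRouting from the example above, the controller can shift a percentage of traffic by managing a canary Ingress carrying the standard ingress-nginx canary annotations. The manifest below is a hand-written sketch of that idea rather than the exact object Kruise Rollout generates; the canary Service name and the port are assumptions.

apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: echoserver-canary                              # assumed name for the canary Ingress
  annotations:
    nginx.ingress.kubernetes.io/canary: "true"         # mark this Ingress as the canary
    nginx.ingress.kubernetes.io/canary-weight: "5"     # 5% of traffic, matching the first step's weight
spec:
  ingressClassName: nginx
  rules:
    - http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: echoserver-canary                # assumed Service selecting only new-version Pods
                port:
                  number: 8080                         # assumed port

As the steps progress, the controller only has to bump canary-weight (40, 60, 80) and finally clean up the canary resources when the rollout completes.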

kubevela workflow

Control the delivery process of multiple applications

apiVersion: core.oam.dev/v1alpha1
kind: WorkflowRun
metadata:
  name: apply-applications
  namespace: default
  annotations:
    workflowrun.oam.dev/debug: "true"
spec:
  workflowSpec:
    steps:
      - name: check-app-exist     ## read the webservice-app Application
        type: read-app
        properties:
          name: webservice-app
      - name: apply-app1  
        type: apply-app
        if: status["check-app-exist"].message == "Application not found"    # if webservice-app does not exist, deploy it
        properties:
          data:
            apiVersion: core.oam.dev/v1beta1
            kind: Application
            metadata:
              name: webservice-app
            spec:
              components:
                - name: express-server
                  type: webservice
                  properties:
                    image: crccheck/hello-world
                    ports:
                      - port: 8000
      - name: suspend       # pause until manually resumed with: vela workflow resume apply-applications
        type: suspend
        timeout: 24h
      - name: apply-app2
        type: apply-app
        properties:
          ref:
            name: my-app
            key: application
            type: configMap

kubevela workflow source code analysis

workflow
  /cmd
    /main.go
  /controllers
    /workflowrun_controller.go
  /pkg
    /cue          # CUE-related processing logic
    /executor     # the executor engine
    /tasks        # built-in and custom steps
      /builtin
      /custom
      /template
      /discover.go

Reconcile logic

It all starts from the Reconcile of a WorkflowRun: the WorkflowRun is first converted into a WorkflowInstance, taskRunners are then generated from the WorkflowInstance, and finally the executor runs the taskRunners.

// workflow/controllers/workflowrun_controller.go
func (r *WorkflowRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
  run := new(v1alpha1.WorkflowRun)
  r.Get(ctx, client.ObjectKey{Name: req.Name, Namespace: req.Namespace}, run)
  instance, err := generator.GenerateWorkflowInstance(ctx, r.Client, run)
  runners, err := generator.GenerateRunners(logCtx, instance, types.StepGeneratorOptions{
    PackageDiscover: r.PackageDiscover,
    Client:          r.Client,
  })
  executor := executor.New(instance, r.Client)
  state, err := executor.ExecuteRunners(logCtx, runners)
  
  isUpdate = isUpdate && instance.Status.Message == ""
  run.Status = instance.Status
  run.Status.Phase = state
  switch state {
  case v1alpha1.WorkflowStateSuspending:
  case v1alpha1.WorkflowStateFailed:
  case v1alpha1.WorkflowStateTerminated:
  case v1alpha1.WorkflowStateExecuting:
  case v1alpha1.WorkflowStateSucceeded:
  case v1alpha1.WorkflowStateSkipped:
  }
  return ctrl.Result{}, nil
}

Task generation

// workflow/pkg/types/types.go
type WorkflowInstance struct {
	WorkflowMeta
	OwnerInfo []metav1.OwnerReference
	Debug     bool
	Context   map[string]interface{}
	Mode      *v1alpha1.WorkflowExecuteMode
	Steps     []v1alpha1.WorkflowStep
	Status    v1alpha1.WorkflowRunStatus
}
// workflow/pkg/types/types.go
type TaskRunner interface {
	Name() string
	Pending(ctx monitorContext.Context, wfCtx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus)
	Run(ctx wfContext.Context, options *TaskRunOptions) (v1alpha1.StepStatus, *Operation, error)
}

A workflow consists of multiple steps. Each step has attributes such as name/type/if/timeout/dependsOn/inputs/outputs/properties (corresponding to the WorkflowStep struct); each WorkflowStep is then turned into a TaskRunner, which the executor drives.
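
To make those fields concrete, here is a hypothetical step list in the style of the demo above. The step types and the if expression are taken from the demo; the outputs/inputs keys (the valueFrom path and parameterKey) are made up purely to show the shape of the fields.

steps:
  - name: check-app-exist
    type: read-app
    timeout: 5m                      # fail the step if it runs longer than 5 minutes
    properties:
      name: webservice-app
    outputs:
      - name: app-message            # export a value under a name other steps can reference
        valueFrom: output.message    # assumed path into this step's result
  - name: apply-app1
    type: apply-app
    dependsOn:
      - check-app-exist              # run only after check-app-exist has finished
    if: status["check-app-exist"].message == "Application not found"
    inputs:
      - from: app-message            # consume the exported value
        parameterKey: message        # assumed parameter key in the apply-app template
    properties:
      ref:
        name: my-app
        key: application
        type: configMap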

// workflow/pkg/generator/generator.go
func GenerateRunners(ctx monitorContext.Context, instance *types.WorkflowInstance,...) ([]types.TaskRunner, error) {
	taskDiscover := tasks.NewTaskDiscover(ctx, options)
	var tasks []types.TaskRunner
	for _, step := range instance.Steps {
		opt := &types.TaskGeneratorOptions{...}
		task, err := generateTaskRunner(ctx, instance, step, taskDiscover, opt, options)
		tasks = append(tasks, task)
	}
	return tasks, nil
}
func generateTaskRunner(ctx context.Context,instance *types.WorkflowInstance,step v1alpha1.WorkflowStep,...) (types.TaskRunner, error) {
	if step.Type == types.WorkflowStepTypeStepGroup {
    ...
	}
	genTask, err := taskDiscover.GetTaskGenerator(ctx, step.Type)
	task, err := genTask(step, options)
	return task, nil
}

Generating a task from a custom CUE template

The read-app/apply-app/suspend in the demo yaml are each a task type; some are built in, others are custom extensions. The built-in ones are simple: they implement the TaskRunner interface methods directly and are registered when taskDiscover is initialized. Custom ones are managed by TaskLoader.

// workflow/pkg/tasks/discover.go
type taskDiscover struct {
	builtin            map[string]types.TaskGenerator
	customTaskDiscover *custom.TaskLoader
}
func NewTaskDiscover(ctx monitorContext.Context, options types.StepGeneratorOptions) types.TaskDiscover {
	return &taskDiscover{
		builtin: map[string]types.TaskGenerator{
			types.WorkflowStepTypeSuspend:   builtin.Suspend,
			types.WorkflowStepTypeStepGroup: builtin.StepGroup,
		},
		customTaskDiscover: custom.NewTaskLoader(options.TemplateLoader.LoadTemplate, options.PackageDiscover, ...),
	}
}
func (td *taskDiscover) GetTaskGenerator(ctx context.Context, name string) (types.TaskGenerator, error) {
	tg, ok := td.builtin[name]          // if it is a built-in type, return it directly
	if ok {
		return tg, nil
	}
	if td.customTaskDiscover != nil {   // otherwise look it up among the custom types
		tg, err = td.customTaskDiscover.GetTaskGenerator(ctx, name)
		return tg, nil
	}
	return nil, errors.Errorf("can't find task generator: %s", name)
}

For a custom task type, the template files under the static directory are scanned first; if no match is found there, the CUE template is fetched from the WorkflowStepDefinition of that type.

TaskLoader.GetTaskGenerator(ctx, name)  # here name is step.Type
  templ, err := WorkflowStepLoader.LoadTemplate
    files, err := templateFS.ReadDir(templateDir)
    staticFilename := name + ".cue"
    for _, file := range files {      # find the matching cue template file
      if staticFilename == file.Name() {  
        fileName := fmt.Sprintf("%s/%s", templateDir, file.Name())
        content, err := templateFS.ReadFile(fileName)
        return string(content), err
      }
    }
    getDefinitionTemplate
      definition := &unstructured.Unstructured{}
	    definition.SetAPIVersion("core.oam.dev/v1beta1")
	    definition.SetKind("WorkflowStepDefinition")
      cli.Get(ctx, types.NamespacedName{Name: definitionName, Namespace: ns}, definition)
      d := new(def)
      runtime.DefaultUnstructuredConverter.FromUnstructured(definition.Object, d)
      d.Spec.Schematic.CUE.Template
  TaskLoader.makeTaskGenerator(templ)
    return func(...)(TaskRunner,error){
      paramsStr, err := GetParameterTemplate(wfStep)  # get the user parameters from the WorkflowStep
      tRunner := new(taskRunner)
		  tRunner.name = wfStep.Name
      tRunner.checkPending = ...
      tRunner.run =  ...
      return tRunner, nil
    }

A workflowSpec.step carries parameters such as type and properties, and the WorkflowStepDefinition for that type holds a CUE template file. Combining the two, the parameters in the CUE template are first substituted with the real values, and the taskRunner's Run method is then generated from the CUE. This touches on details of the cue package that I have not studied in depth yet, but the core intent is clear: express the intent in CUE code (which feels like another kind of DSL) plus a WorkflowStepDefinition in order to support custom step types. PS: presumably extending step types through Go code was felt to be too slow.
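
For a sense of where that CUE template lives, below is a rough sketch of a WorkflowStepDefinition for a read-app-like step. The spec.schematic.cue.template path matches the d.Spec.Schematic.CUE.Template field read in the trace above; the CUE body (the vela/op import and the op.#Read action) is illustrative and may differ from the real read-app definition.

apiVersion: core.oam.dev/v1beta1
kind: WorkflowStepDefinition
metadata:
  name: read-app
  namespace: vela-system
spec:
  schematic:
    cue:
      template: |
        import ("vela/op")
        // the step's properties from the WorkflowRun are bound to parameter
        parameter: {
          name:      string
          namespace: *"default" | string
        }
        // read the Application object; the executor compiles this into the taskRunner's Run method
        read: op.#Read & {
          value: {
            apiVersion: "core.oam.dev/v1beta1"
            kind:       "Application"
            metadata: {
              name:      parameter.name
              namespace: parameter.namespace
            }
          }
        }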

// workflow/pkg/tasks/custom/task.go
func(ctx wfContext.Context, options *types.TaskRunOptions) (stepStatus v1alpha1.StepStatus, operations *types.Operation, rErr error) {
  basicVal, basicTemplate, err := MakeBasicValue(tracer, ctx, t.pd, wfStep.Name, exec.wfStatus.ID, paramsStr, options.PCtx)
  var taskv *value.Value
  for _, hook := range options.PreCheckHooks {...}
  for _, hook := range options.PreStartHooks {...}
  // refresh the basic template to get inputs value involved
  basicTemplate, err = basicVal.String()
  taskv, err = value.NewValue(strings.Join([]string{templ, basicTemplate}, "\n"), t.pd, "", value.ProcessScript, value.TagFieldOrder)
  exec.doSteps(tracer, ctx, taskv)
  return exec.status(), exec.operation(), nil
}

The executor framework

It first checks whether all tasks are finished (Done/Succeeded/Failed); if so it returns directly, otherwise it creates a new engine to run them.

// workflow/pkg/executor/workflow.go
func (w *workflowExecutor) ExecuteRunners(ctx monitorContext.Context, taskRunners []types.TaskRunner) (v1alpha1.WorkflowRunPhase, error) {
	allTasksDone, allTasksSucceeded := w.allDone(taskRunners)
  if checkWorkflowTerminated(status, allTasksDone) {
    if isTerminatedManually(status) {
      return v1alpha1.WorkflowStateTerminated, nil
    }
    return v1alpha1.WorkflowStateFailed, nil
  }
  if checkWorkflowSuspended(status) {
    return v1alpha1.WorkflowStateSuspending, nil
  }
  if allTasksSucceeded {
    return v1alpha1.WorkflowStateSucceeded, nil
  }
  e := newEngine(ctx, wfCtx, w, status)
  err = e.Run(ctx, taskRunners, dagMode)
  allTasksDone, allTasksSucceeded = w.allDone(taskRunners)
  if status.Terminated {
    ...
  }
  if status.Suspend {
    wfContext.CleanupMemoryStore(e.instance.Name, e.instance.Namespace)
    return v1alpha1.WorkflowStateSuspending, nil
  }
  if allTasksSucceeded {
    return v1alpha1.WorkflowStateSucceeded, nil
  }
  return v1alpha1.WorkflowStateExecuting, nil
}  

The engine executes the taskRunners in order and records the results. Each pass walks the taskRunners from the beginning: if a task is already finished, move on to the next; otherwise call the task's Pending method to check whether it has to wait, and if it is ready, call its Run method. A task/step's status (WorkflowStepPhase) is one of succeeded/failed/skipped/running/pending.

// workflow/pkg/executor/workflow.go
func (e *engine) Run(ctx monitorContext.Context, taskRunners []types.TaskRunner, dag bool) error {
	var err error
	if dag {
		err = e.runAsDAG(ctx, taskRunners, false)
	} else {
		err = e.steps(ctx, taskRunners, dag)
	}
	e.checkFailedAfterRetries()
	e.setNextExecuteTime()
	return err
}
func (e *engine) steps(ctx monitorContext.Context, taskRunners []types.TaskRunner, dag bool) error {
	for index, runner := range taskRunners {
		if status, ok := e.stepStatus[runner.Name()]; ok {
			if types.IsStepFinish(status.Phase, status.Reason) {
				continue
			}
		}
		if pending, status := runner.Pending(ctx, wfCtx, e.stepStatus); pending {
			wfCtx.IncreaseCountValueInMemory(types.ContextPrefixBackoffTimes, status.ID)
			e.updateStepStatus(status)
			if dag {
				continue
			}
			return nil
		}
		options := e.generateRunOptions(e.findDependPhase(taskRunners, index, dag))
		status, operation, err := runner.Run(wfCtx, options)
		e.updateStepStatus(status)

		e.failedAfterRetries = e.failedAfterRetries || operation.FailedAfterRetries
		e.waiting = e.waiting || operation.Waiting
		// for the suspend step with duration, there's no need to increase the backoff time in reconcile when it's still running
		if !types.IsStepFinish(status.Phase, status.Reason) && !isWaitSuspendStep(status) {
			if err := handleBackoffTimes(wfCtx, status, false); err != nil {
				return err
			}
			if dag {
				continue
			}
			return nil
		}
		// clear the backoff time when the step is finished
		if err := handleBackoffTimes(wfCtx, status, true); err != nil {
			return err
		}
		e.finishStep(operation)
		if dag {
			continue
		}
		if e.needStop() {
			return nil
		}
	}
	return nil
}

Resume logic

When the user runs vela workflow resume xx, any suspend step that is in the running state is set to succeeded; the WorkflowRun's status is then updated, which triggers a WorkflowRun reconcile and, in turn, executor.ExecuteRunners.

// workflow/pkg/utils/operation.go
func (wo workflowRunOperator) Resume(ctx context.Context) error {
  run := wo.run
	if run.Status.Terminated {
		return fmt.Errorf("can not resume a terminated workflow")
	}
	if run.Status.Suspend {
		ResumeWorkflow(ctx, wo.cli, run)
	}
	return wo.writeOutputF("Successfully resume workflow: %s\n", run.Name)
}
func ResumeWorkflow(ctx context.Context, cli client.Client, run *v1alpha1.WorkflowRun) error {
	run.Status.Suspend = false
	steps := run.Status.Steps
	for i, step := range steps {
		if step.Type == wfTypes.WorkflowStepTypeSuspend && step.Phase == v1alpha1.WorkflowStepPhaseRunning {
			steps[i].Phase = v1alpha1.WorkflowStepPhaseSucceeded
		}
		for j, sub := range step.SubStepsStatus {
			if sub.Type == wfTypes.WorkflowStepTypeSuspend && sub.Phase == v1alpha1.WorkflowStepPhaseRunning {
				steps[i].SubStepsStatus[j].Phase = v1alpha1.WorkflowStepPhaseSucceeded
			}
		}
	}
	retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		return cli.Status().Patch(ctx, run, client.Merge)
	})
	return nil
}
