Before the Source Code, There Are No Secrets: Dissecting the Kubernetes Scheduler

Original article: http://legendtkl.com/2020/06/06/kubernetes-scheduler-sourcecode/

This article walks through the source code of kube-scheduler, the default Kubernetes scheduler. Kubernetes code version: v1.18.4-rc.0.

0. Entry point

The entry function lives at kubernetes/cmd/kube-scheduler/scheduler.go#main(), shown below:

func main() {
	rand.Seed(time.Now().UnixNano())

	command := app.NewSchedulerCommand()

	// TODO: once we switch everything over to Cobra commands, we can go back to calling
	// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
	// normalize func and add the go flag set by hand.
	pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
	// utilflag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}

The core logic is simple: 1. create a SchedulerCommand (command := app.NewSchedulerCommand()); 2. take the command-line arguments and execute it (command.Execute()). Let's first look at how the SchedulerCommand is created.

// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	opts, err := options.NewOptions()
	if err != nil {
		klog.Fatalf("unable to initialize command options: %v", err)
	}

	cmd := &cobra.Command{
		Use: "kube-scheduler",
		Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
workload-specific function that significantly impacts availability, performance,
and capacity. The scheduler needs to take into account individual and collective
resource requirements, quality of service requirements, hardware/software/policy
constraints, affinity and anti-affinity specifications, data locality, inter-workload
interference, deadlines, and so on. Workload-specific requirements will be exposed
through the API as necessary. See [scheduling](https://kubernetes.io/docs/concepts/scheduling/)
for more information about scheduling and the kube-scheduler component.`,
		Run: func(cmd *cobra.Command, args []string) {
			if err := runCommand(cmd, args, opts, registryOptions...); err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}
		},
	}
	fs := cmd.Flags()
	
	...

	return cmd
}

First, note that NewSchedulerCommand takes a variadic parameter, registryOptions. As the name suggests, these options act on a Registry, which is the structure Kubernetes uses to manage scheduler plugins.

// Registry is a collection of all available plugins. The framework uses a
// registry to enable and initialize configured plugins.
// All plugins must be in the registry before initializing the framework.
type Registry map[string]PluginFactory

The options in registryOptions are an instance of the functional options pattern. The pattern, popularized by Rob Pike, wraps each optional parameter as a function and passes those functions to the target function, which then initializes itself by invoking them one by one. We will see this again when the RegistryOptions are applied. If you are interested in the pattern, see my earlier article: http://legendtkl.com/2016/11/05/code-scalability/
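
To make the pattern concrete, here is a minimal, self-contained sketch. Registry, Option, and WithPlugin below are simplified stand-ins rather than the real kube-scheduler types, but the loop that applies the options has the same shape as the one we will see in Run later.

package main

import "fmt"

// PluginFactory and Registry are simplified stand-ins for the real framework
// types: a registry is just a map from plugin name to a factory function.
type PluginFactory func() string
type Registry map[string]PluginFactory

// Option is a function that mutates a Registry. Each optional parameter is
// wrapped in such a function and applied inside the callee.
type Option func(Registry) error

// WithPlugin returns an Option that registers a single plugin factory.
func WithPlugin(name string, factory PluginFactory) Option {
	return func(r Registry) error {
		if _, exists := r[name]; exists {
			return fmt.Errorf("plugin %q is already registered", name)
		}
		r[name] = factory
		return nil
	}
}

// NewRegistry builds an empty registry and applies every Option in order.
func NewRegistry(opts ...Option) (Registry, error) {
	r := make(Registry)
	for _, opt := range opts {
		if err := opt(r); err != nil {
			return nil, err
		}
	}
	return r, nil
}

func main() {
	r, err := NewRegistry(
		WithPlugin("my-plugin-a", func() string { return "plugin a" }),
		WithPlugin("my-plugin-b", func() string { return "plugin b" }),
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(r), "plugins registered")
}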

Next is cmd, a CLI handler built on cobra.Command. Command-line input is handled by the anonymous function assigned to Run, which calls runCommand to start the scheduler process. With some less important logic stripped away, runCommand essentially builds the scheduler configuration and then starts the scheduler via Run.

// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options, registryOptions ...Option) error {
	...
	
	// build the scheduler configuration
	c, err := opts.Config()
	if err != nil {
		return err
	}

	// Get the completed config
	// fill in the remaining fields and defaults
	cc := c.Complete()

	// Configz registration.
	if cz, err := configz.New("componentconfig"); err == nil {
		cz.Set(cc.ComponentConfig)
	} else {
		return fmt.Errorf("unable to register configz: %s", err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	return Run(ctx, cc, registryOptions...)
}

The main logic of the Run function is:

  1. Initialize the Registry; the loop over outOfTreeRegistryOptions at the top of the function is the functional options pattern in action.
  2. Create the scheduler instance.
  3. Perform other initialization, including the EventBroadcaster, health checks, metrics, and so on.
  4. Start the Pod informer to watch Pods.
  5. Run the scheduler, with or without leader election; either way the method that eventually runs is sched.Run.

// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTreeRegistryOptions ...Option) error {
	// To help debugging, immediately log version
	klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())

	outOfTreeRegistry := make(framework.Registry)
	for _, option := range outOfTreeRegistryOptions {
		if err := option(outOfTreeRegistry); err != nil {
			return err
		}
	}

	recorderFactory := getRecorderFactory(&cc)
	// Create the scheduler.
	sched, err := scheduler.New(cc.Client,
		cc.InformerFactory,
		cc.PodInformer,
		recorderFactory,
		ctx.Done(),
		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
		scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
		scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
		scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
	)
	if err != nil {
		return err
	}

	// Prepare the event broadcaster.
	if cc.Broadcaster != nil && cc.EventClient != nil {
		cc.Broadcaster.StartRecordingToSink(ctx.Done())
	}
	if cc.CoreBroadcaster != nil && cc.CoreEventClient != nil {
		cc.CoreBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
	}
	// Setup healthz checks.
	var checks []healthz.HealthChecker
	if cc.ComponentConfig.LeaderElection.LeaderElect {
		checks = append(checks, cc.LeaderElection.WatchDog)
	}

	// Start up the healthz server.
	if cc.InsecureServing != nil {
		separateMetrics := cc.InsecureMetricsServing != nil
		handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
		if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
			return fmt.Errorf("failed to start healthz server: %v", err)
		}
	}
	if cc.InsecureMetricsServing != nil {
		handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
		if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {
			return fmt.Errorf("failed to start metrics server: %v", err)
		}
	}
	if cc.SecureServing != nil {
		handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
		// TODO: handle stoppedCh returned by c.SecureServing.Serve
		if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
			// fail early for secure handlers, removing the old error loop from above
			return fmt.Errorf("failed to start secure server: %v", err)
		}
	}

	// Start all informers.
	go cc.PodInformer.Informer().Run(ctx.Done())
	cc.InformerFactory.Start(ctx.Done())

	// Wait for all caches to sync before scheduling.
	cc.InformerFactory.WaitForCacheSync(ctx.Done())

	// If leader election is enabled, runCommand via LeaderElector until done and exit.
	if cc.LeaderElection != nil {
		cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
			OnStartedLeading: sched.Run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		}
		leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
		if err != nil {
			return fmt.Errorf("couldn't create leader elector: %v", err)
		}

		leaderElector.Run(ctx)

		return fmt.Errorf("lost lease")
	}

	// Leader election is disabled, so runCommand inline until done.
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}

The scheduler instance

First, let's look at the definition of Scheduler, which lives in pkg/scheduler/scheduler.go:

// Scheduler watches for Pods that have not been scheduled, finds a suitable Node for each of them, and writes the binding back to the API server.
type Scheduler struct {
	// the scheduler cache
	SchedulerCache internalcache.Cache

	Algorithm core.ScheduleAlgorithm
	// PodConditionUpdater is used only in case of scheduling errors. If we succeed
	// with scheduling, PodScheduled condition will be updated in apiserver in /bind
	// handler so that binding and setting PodCondition it is atomic.
	podConditionUpdater podConditionUpdater
	// used during preemption to evict pods and to update the preemptor's 'NominatedNode' field
	podPreemptor podPreemptor

	// NextPod returns the next Pod to schedule and blocks if there is none. A channel is
	// deliberately not used here: scheduling can take a while, and the designers did not want
	// Pods sitting inside a channel during that time. Note: although not stated officially,
	// another possible reason is that a channel cannot persist its data.
	NextPod func() *framework.PodInfo

	// Error is called if there is an error. It is passed the pod in
	// question, and the error
	Error func(*framework.PodInfo, error)

	// An empty-struct channel used to signal shutdown; an idiomatic Go pattern.
	StopEverything <-chan struct{}

	// handles PVC/PV binding
	VolumeBinder scheduling.SchedulerVolumeBinder

	// whether Pod preemption is disabled
	DisablePreemption bool

	// The scheduling queue: every Pod waiting to be scheduled sits in this queue, which is
	// internally implemented as a priority queue.
	SchedulingQueue internalqueue.SchedulingQueue

	// Profiles are the scheduling profiles.
	Profiles profile.Map
	
	scheduledPodsHasSynced func() bool
}
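
A side note on the NextPod comment above: the scheduling queue blocks its Pop on a condition variable rather than a channel (the matching cond.Broadcast() shows up in flushBackoffQCompleted later in this article). The following is a minimal, self-contained sketch of that idea, not the actual scheduler code.

package main

import (
	"fmt"
	"sync"
	"time"
)

// blockingQueue sketches how a Pop can block without a channel:
// a mutex-protected slice plus a sync.Cond that is broadcast on every Add.
type blockingQueue struct {
	lock  sync.Mutex
	cond  *sync.Cond
	items []string
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.lock)
	return q
}

// Add appends an item and wakes up any goroutine blocked in Pop.
func (q *blockingQueue) Add(item string) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.items = append(q.items, item)
	q.cond.Broadcast()
}

// Pop blocks until an item is available, similar in spirit to NextPod.
func (q *blockingQueue) Pop() string {
	q.lock.Lock()
	defer q.lock.Unlock()
	for len(q.items) == 0 {
		q.cond.Wait()
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item
}

func main() {
	q := newBlockingQueue()
	go func() {
		time.Sleep(100 * time.Millisecond)
		q.Add("default/nginx")
	}()
	fmt.Println("popped:", q.Pop()) // blocks until the Add above happens
}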

Running the scheduler

Now let's look at the scheduler's Run method:

// Run begins watching and scheduling. It waits for cache to be synced, then starts scheduling and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
	if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
		return
	}
	sched.SchedulingQueue.Run()
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}

The scheduler's Run function does three things:

  1. Wait for the scheduler cache to sync (the scheduler has just started, so this is effectively a cold start).
  2. Run the scheduling queue's Run function.
  3. Run the scheduler's scheduleOne function in a loop.

The scheduling queue

The scheduling queue's Run function is a little puzzling the first time you see it: does a queue really need to be started? It does. If the scheduling queue were just a priority queue, there would be nothing to start, but the scheduling queue in Kubernetes is actually composed of three queues (a simplified sketch follows the list):

  • activeQueue: the queue of pods waiting to be scheduled; the scheduler watches this queue.
  • backoffQueue: in Kubernetes, a failed scheduling attempt counts as a backoff, and the backoffQueue holds pods that are currently backing off. There is typically a backoffLimit that caps how many backoffs are tolerated, and the backoff interval doubles after each attempt.
  • unschedulableQueue: the queue for pods whose scheduling attempt was rejected as unschedulable.
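
The sketch below is only a mental model and ignores locking, heap ordering, and timing details; in the real implementation activeQ and podBackoffQ are heaps and unschedulableQ is a map.

package main

import "fmt"

// schedulingQueueSketch is a simplified stand-in for the real PriorityQueue,
// showing only the three sub-queues it is composed of.
type schedulingQueueSketch struct {
	activeQ        []string            // pods ready to be scheduled; the scheduler pops from here
	podBackoffQ    []string            // pods waiting out an exponential backoff after a failed attempt
	unschedulableQ map[string]struct{} // pods previously deemed unschedulable
}

// flush mimics what PriorityQueue.Run does periodically: move eligible pods
// back into activeQ (the time/backoff checks are omitted for brevity).
func (q *schedulingQueueSketch) flush() {
	q.activeQ = append(q.activeQ, q.podBackoffQ...)
	q.podBackoffQ = nil
	for pod := range q.unschedulableQ {
		q.activeQ = append(q.activeQ, pod)
		delete(q.unschedulableQ, pod)
	}
}

func main() {
	q := &schedulingQueueSketch{
		podBackoffQ:    []string{"default/pod-a"},
		unschedulableQ: map[string]struct{}{"default/pod-b": {}},
	}
	q.flush()
	fmt.Println(q.activeQ) // [default/pod-a default/pod-b]
}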

What the scheduling queue's Run function does is periodically move pods from the backoffQueue and the unschedulableQueue into the activeQueue.

// Run starts the goroutine to pump from podBackoffQ to activeQ
func (p *PriorityQueue) Run() {
	go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
	go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

Here wait.Until is essentially a periodic timer, similar to a cron job; we won't go into its implementation details here.

// Until loops until stop channel is closed, running f every period.
//
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
// with sliding = true (which means the timer for period starts after the f
// completes).
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
	JitterUntil(f, period, 0.0, true, stopCh)
}
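
As a quick illustration, here is a minimal standalone use of wait.Until (assuming k8s.io/apimachinery/pkg/util/wait is available in your module), doing the same thing PriorityQueue.Run does for its flush functions.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stop := make(chan struct{})

	// Run the function once per second until stop is closed, just like
	// PriorityQueue.Run does with flushBackoffQCompleted.
	go wait.Until(func() {
		fmt.Println("tick", time.Now().Format(time.RFC3339))
	}, 1*time.Second, stop)

	time.Sleep(3 * time.Second)
	close(stop)
}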

Now let's look at the two flush functions. First, flushBackoffQCompleted(), along with its helpers getBackoffTime() and calculateBackoffDuration():

// flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ
func (p *PriorityQueue) flushBackoffQCompleted() {
	p.lock.Lock()
	defer p.lock.Unlock()
	for {
		rawPodInfo := p.podBackoffQ.Peek()
		if rawPodInfo == nil {
			return
		}
		pod := rawPodInfo.(*framework.PodInfo).Pod
		boTime := p.getBackoffTime(rawPodInfo.(*framework.PodInfo))
		if boTime.After(p.clock.Now()) {
			return
		}
		_, err := p.podBackoffQ.Pop()
		if err != nil {
			klog.Errorf("Unable to pop pod %v from backoff queue despite backoff completion.", nsNameForPod(pod))
			return
		}
		p.activeQ.Add(rawPodInfo)
		metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
		defer p.cond.Broadcast()
	}
}

// getBackoffTime returns the time that podInfo completes backoff
func (p *PriorityQueue) getBackoffTime(podInfo *framework.PodInfo) time.Time {
	duration := p.calculateBackoffDuration(podInfo)
	backoffTime := podInfo.Timestamp.Add(duration)
	return backoffTime
}

// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.PodInfo) time.Duration {
	duration := p.podInitialBackoffDuration
	for i := 1; i < podInfo.Attempts; i++ {
		duration = duration * 2
		if duration > p.podMaxBackoffDuration {
			return p.podMaxBackoffDuration
		}
	}
	return duration
}
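
To make the doubling concrete, here is a small standalone reproduction of calculateBackoffDuration. The 1s initial and 10s maximum used below are assumptions that match the scheduler's usual defaults (podInitialBackoffSeconds and podMaxBackoffSeconds).

package main

import (
	"fmt"
	"time"
)

// backoffDuration mirrors calculateBackoffDuration: start at the initial
// duration and double it for every additional attempt, capped at the maximum.
func backoffDuration(attempts int, initial, maxBackoff time.Duration) time.Duration {
	duration := initial
	for i := 1; i < attempts; i++ {
		duration *= 2
		if duration > maxBackoff {
			return maxBackoff
		}
	}
	return duration
}

func main() {
	// With 1s initial / 10s max: attempt 1 -> 1s, 2 -> 2s, 3 -> 4s, 4 -> 8s, 5+ -> 10s (capped).
	for attempts := 1; attempts <= 6; attempts++ {
		fmt.Printf("attempt %d: backoff %v\n", attempts, backoffDuration(attempts, time.Second, 10*time.Second))
	}
}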

Next, let's see how pods in the unschedulableQueue are flushed, i.e. the logic of flushUnschedulableQLeftover. It is very simple: any pod that has stayed in the unschedulableQueue for more than 60 seconds is moved back to the activeQueue (or to the backoff queue, via movePodsToActiveOrBackoffQueue).

// flushUnschedulableQLeftover moves pod which stays in unschedulableQ longer than the unschedulableQTimeInterval
// to activeQ.
func (p *PriorityQueue) flushUnschedulableQLeftover() {
	p.lock.Lock()
	defer p.lock.Unlock()

	var podsToMove []*framework.PodInfo
	currentTime := p.clock.Now()
	for _, pInfo := range p.unschedulableQ.podInfoMap {
		lastScheduleTime := pInfo.Timestamp
		if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
			podsToMove = append(podsToMove, pInfo)
		}
	}

	if len(podsToMove) > 0 {
		p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
	}
}

const (
	// If the pod stays in unschedulableQ longer than the unschedulableQTimeInterval,
	// the pod will be moved from unschedulableQ to activeQ.
	unschedulableQTimeInterval = 60 * time.Second

	queueClosed = "scheduling queue is closed"
)
