
Kubernetes: A Walkthrough of the kube-scheduler Source Code

Source: https://www.maideliang.com/index.php/archives/34/


2019.02.12 · Original article

kube-scheduler is a core component running on the Kubernetes master node. Its job is to compute, according to the scheduling algorithm, a suitable node for each Pod and bind the Pod to that node. How does the scheduler actually do this? No document is more direct than the source code itself. The code base is large and my understanding is limited, so corrections are welcome.


Kubernetes version: 1.13
Source download: go get k8s.io/kubernetes

The scheduler's entry point:
kubernetes/cmd/kube-scheduler/scheduler.go

func main() {
    rand.Seed(time.Now().UnixNano())

    command := app.NewSchedulerCommand()

    pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)

    logs.InitLogs()
    defer logs.FlushLogs()

    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}

The scheduler starts by calling app.NewSchedulerCommand() to build the scheduler command. So what does NewSchedulerCommand do? Let's look at part of its code:
kubernetes/cmd/kube-scheduler/app/server.go

func NewSchedulerCommand() *cobra.Command {
    opts, err := options.NewOptions() // read and initialize the command-line options
    if err != nil {
        klog.Fatalf("unable to initialize command options: %v", err)
    }

    cmd := &cobra.Command{
        Use: "kube-scheduler",
        Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
workload-specific function that significantly impacts availability, performance,
and capacity. The scheduler needs to take into account individual and collective
resource requirements, quality of service requirements, hardware/software/policy
constraints, affinity and anti-affinity specifications, data locality, inter-workload
interference, deadlines, and so on. Workload-specific requirements will be exposed
through the API as necessary.`,
        Run: func(cmd *cobra.Command, args []string) {
            if err := runCommand(cmd, args, opts); err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }
        },
    }
    fs := cmd.Flags()
    namedFlagSets := opts.Flags()
    verflag.AddFlags(namedFlagSets.FlagSet("global"))
    globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
    for _, f := range namedFlagSets.FlagSets {
        fs.AddFlagSet(f)
    }

    usageFmt := "Usage:\n  %s\n"
    cols, _, _ := apiserverflag.TerminalSize(cmd.OutOrStdout())
    cmd.SetUsageFunc(func(cmd *cobra.Command) error {
        fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
        apiserverflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols)
        return nil
    })
    cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
        fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
        apiserverflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
    })
    cmd.MarkFlagFilename("config", "yaml", "yml", "json")

    return cmd
}

In NewSchedulerCommand:
1. NewOptions reads and instantiates the command options; the default values are defined in kubernetes/vendor/k8s.io/kube-scheduler/config/v1alpha1/types.go.

The full set of flags and their defaults can also be inspected with kube-scheduler --help.

2. runCommand starts the scheduler. Let's look at the body of runCommand:

func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
    verflag.PrintAndExitIfRequested()
    utilflag.PrintFlags(cmd.Flags())

    if len(args) != 0 {
        fmt.Fprint(os.Stderr, "arguments are not supported\n")
    }

    if errs := opts.Validate(); len(errs) > 0 {
        fmt.Fprintf(os.Stderr, "%v\n", utilerrors.NewAggregate(errs))
        os.Exit(1)
    }

    if len(opts.WriteConfigTo) > 0 {
        if err := options.WriteConfigFile(opts.WriteConfigTo, &opts.ComponentConfig); err != nil {
            fmt.Fprintf(os.Stderr, "%v\n", err)
            os.Exit(1)
        }
        klog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
    }

    c, err := opts.Config()
    if err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }

    stopCh := make(chan struct{})

    // Get the completed config
    cc := c.Complete()

    // To help debugging, immediately log version
    klog.Infof("Version: %+v", version.Get())

    // Apply algorithms based on feature gates.
    // TODO: make configurable?
    algorithmprovider.ApplyFeatureGates()

    // Configz registration.
    if cz, err := configz.New("componentconfig"); err == nil {
        cz.Set(cc.ComponentConfig)
    } else {
        return fmt.Errorf("unable to register configz: %s", err)
    }

    return Run(cc, stopCh)
}

runCommand mainly validates and completes the scheduler's startup configuration, then calls Run with the completed config and propagates whatever error Run returns. Next, let's see what Run does:

func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
    // Create the scheduler.
    sched, err := scheduler.New(cc.Client,
        cc.InformerFactory.Core().V1().Nodes(),
        cc.PodInformer,
        cc.InformerFactory.Core().V1().PersistentVolumes(),
        cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
        cc.InformerFactory.Core().V1().ReplicationControllers(),
        cc.InformerFactory.Apps().V1().ReplicaSets(),
        cc.InformerFactory.Apps().V1().StatefulSets(),
        cc.InformerFactory.Core().V1().Services(),
        cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
        cc.InformerFactory.Storage().V1().StorageClasses(),
        cc.Recorder,
        cc.ComponentConfig.AlgorithmSource,
        stopCh,
        scheduler.WithName(cc.ComponentConfig.SchedulerName),
        scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
        scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
        scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
    if err != nil {
        return err
    }

    // Prepare the event broadcaster.
    if cc.Broadcaster != nil && cc.EventClient != nil {
        cc.Broadcaster.StartLogging(klog.V(6).Infof)
        cc.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cc.EventClient.Events("")})
    }

    // Setup healthz checks.
    var checks []healthz.HealthzChecker
    if cc.ComponentConfig.LeaderElection.LeaderElect {
        checks = append(checks, cc.LeaderElection.WatchDog)
    }

    // Start up the healthz server.
    if cc.InsecureServing != nil {
        separateMetrics := cc.InsecureMetricsServing != nil
        handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
        if err := cc.InsecureServing.Serve(handler, 0, stopCh); err != nil {
            return fmt.Errorf("failed to start healthz server: %v", err)
        }
    }
    if cc.InsecureMetricsServing != nil {
        handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
        if err := cc.InsecureMetricsServing.Serve(handler, 0, stopCh); err != nil {
            return fmt.Errorf("failed to start metrics server: %v", err)
        }
    }
    if cc.SecureServing != nil {
        handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
        if err := cc.SecureServing.Serve(handler, 0, stopCh); err != nil {
            // fail early for secure handlers, removing the old error loop from above
            return fmt.Errorf("failed to start healthz server: %v", err)
        }
    }

    // Start all informers.
    go cc.PodInformer.Informer().Run(stopCh)
    cc.InformerFactory.Start(stopCh)

    // Wait for all caches to sync before scheduling.
    cc.InformerFactory.WaitForCacheSync(stopCh)
    controller.WaitForCacheSync("scheduler", stopCh, cc.PodInformer.Informer().HasSynced)

    // Prepare a reusable runCommand function.
    run := func(ctx context.Context) {
        sched.Run()
        <-ctx.Done()
    }

    ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
    defer cancel()

    go func() {
        select {
        case <-stopCh:
            cancel()
        case <-ctx.Done():
        }
    }()

    // If leader election is enabled, runCommand via LeaderElector until done and exit.
    if cc.LeaderElection != nil {
        cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
            OnStartedLeading: run,
            OnStoppedLeading: func() {
                utilruntime.HandleError(fmt.Errorf("lost master"))
            },
        }
        leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
        if err != nil {
            return fmt.Errorf("couldn't create leader elector: %v", err)
        }

        leaderElector.Run(ctx)

        return fmt.Errorf("lost lease")
    }

    // Leader election is disabled, so runCommand inline until done.
    run(ctx)
    return fmt.Errorf("finished without leader elect")
}

Run works through the completed config as follows:
1. scheduler.New instantiates a new scheduler and returns it as sched.
2. It wires up the event broadcaster, the health checks, and the metrics endpoint (scraped by Prometheus): cc.InsecureServing.Serve (by default InsecureServing listens on port 10251).
3. It starts all informers; the PodInformer is run separately via go cc.PodInformer.Informer().Run(stopCh) and provides lister access to Pod objects.
PS: Informer is a core package of client-go. At its most basic, an informer lets you List/Get Kubernetes objects from a local cache that is kept up to date by watch events; a minimal sketch follows this list.
4. Before scheduling, controller.WaitForCacheSync("scheduler", ...) waits for the Pod informer cache to sync.
5. Leader election for scheduler high availability (--leader-elect): when the scheduler is deployed in HA mode a leader must be elected; the flag defaults to true.
6. Finally, the real scheduling loop is started: run(ctx) calls sched.Run(), which schedules Pods.
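
Here is a minimal, self-contained sketch of the informer pattern the scheduler relies on, using plain client-go. The kubeconfig path, resync period, and namespace are assumptions made purely for illustration:

package main

import (
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Build a client from a local kubeconfig (the path is an assumption for this sketch).
    config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(config)

    stopCh := make(chan struct{})
    defer close(stopCh)

    // Shared informer factory; 30s is an arbitrary resync period.
    factory := informers.NewSharedInformerFactory(client, 30*time.Second)
    podInformer := factory.Core().V1().Pods()

    // Event handlers fire on watch events, much like the scheduler's
    // addPodToCache / addPodToSchedulingQueue callbacks.
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod := obj.(*v1.Pod)
            fmt.Printf("pod added: %s/%s\n", pod.Namespace, pod.Name)
        },
    })

    // Start all informers registered on the factory and wait for the initial List.
    factory.Start(stopCh)
    factory.WaitForCacheSync(stopCh)

    // The lister reads from the local cache, not from the API server.
    pods, err := podInformer.Lister().Pods("default").List(labels.Everything())
    if err != nil {
        panic(err)
    }
    fmt.Printf("%d pods in the default namespace\n", len(pods))
}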

Next, let's see how step 1, scheduler.New, works; after that we will come back to how the final step, sched.Run(), actually schedules. Jump to the scheduler.New code:
kubernetes/pkg/scheduler/scheduler.go

func New(client clientset.Interface,
    nodeInformer coreinformers.NodeInformer,
    podInformer coreinformers.PodInformer,
    pvInformer coreinformers.PersistentVolumeInformer,
    pvcInformer coreinformers.PersistentVolumeClaimInformer,
    replicationControllerInformer coreinformers.ReplicationControllerInformer,
    replicaSetInformer appsinformers.ReplicaSetInformer,
    statefulSetInformer appsinformers.StatefulSetInformer,
    serviceInformer coreinformers.ServiceInformer,
    pdbInformer policyinformers.PodDisruptionBudgetInformer,
    storageClassInformer storageinformers.StorageClassInformer,
    recorder record.EventRecorder,
    schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
    stopCh <-chan struct{},
    opts ...func(o *schedulerOptions)) (*Scheduler, error) {

    options := defaultSchedulerOptions
    for _, opt := range opts {
        opt(&options)
    }

    // Set up the configurator which can create schedulers from configs.
    configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
        SchedulerName:                  options.schedulerName,
        Client:                         client,
        NodeInformer:                   nodeInformer,
        PodInformer:                    podInformer,
        PvInformer:                     pvInformer,
        PvcInformer:                    pvcInformer,
        ReplicationControllerInformer:  replicationControllerInformer,
        ReplicaSetInformer:             replicaSetInformer,
        StatefulSetInformer:            statefulSetInformer,
        ServiceInformer:                serviceInformer,
        PdbInformer:                    pdbInformer,
        StorageClassInformer:           storageClassInformer,
        HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
        DisablePreemption:              options.disablePreemption,
        PercentageOfNodesToScore:       options.percentageOfNodesToScore,
        BindTimeoutSeconds:             options.bindTimeoutSeconds,
    })
    var config *factory.Config
    source := schedulerAlgorithmSource
    switch {
    case source.Provider != nil:
        // Create the config from a named algorithm provider.
        sc, err := configurator.CreateFromProvider(*source.Provider)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
        }
        config = sc
    case source.Policy != nil:
        // Create the config from a user specified policy source.
        policy := &schedulerapi.Policy{}
        switch {
        case source.Policy.File != nil:
            if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
                return nil, err
            }
        case source.Policy.ConfigMap != nil:
            if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
                return nil, err
            }
        }
        sc, err := configurator.CreateFromConfig(*policy)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
        }
        config = sc
    default:
        return nil, fmt.Errorf("unsupported algorithm source: %v", source)
    }
    // Additional tweaks to the config produced by the configurator.
    config.Recorder = recorder
    config.DisablePreemption = options.disablePreemption
    config.StopEverything = stopCh
    // Create the scheduler.
    sched := NewFromConfig(config)
    return sched, nil
}

1. From the arguments passed to New, factory.NewConfigFactory builds the configurator. NewConfigFactory registers callbacks on a series of informers; the most important are the two sets of PodInformer callbacks, which put already-scheduled Pods into the scheduler cache and not-yet-scheduled Pods into the scheduling queue.

// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
    stopEverything := args.StopCh
    if stopEverything == nil {
        stopEverything = wait.NeverStop
    }
    schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)

    // storageClassInformer is only enabled through VolumeScheduling feature gate
    var storageClassLister storagelisters.StorageClassLister
    if args.StorageClassInformer != nil {
        storageClassLister = args.StorageClassInformer.Lister()
    }
    c := &configFactory{
        client:                         args.Client,
        podLister:                      schedulerCache,
        podQueue:                       internalqueue.NewSchedulingQueue(stopEverything),
        nodeLister:                     args.NodeInformer.Lister(),
        pVLister:                       args.PvInformer.Lister(),
        pVCLister:                      args.PvcInformer.Lister(),
        serviceLister:                  args.ServiceInformer.Lister(),
        controllerLister:               args.ReplicationControllerInformer.Lister(),
        replicaSetLister:               args.ReplicaSetInformer.Lister(),
        statefulSetLister:              args.StatefulSetInformer.Lister(),
        pdbLister:                      args.PdbInformer.Lister(),
        storageClassLister:             storageClassLister,
        schedulerCache:                 schedulerCache,
        StopEverything:                 stopEverything,
        schedulerName:                  args.SchedulerName,
        hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
        disablePreemption:              args.DisablePreemption,
        percentageOfNodesToScore:       args.PercentageOfNodesToScore,
    }

    c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
    // scheduled pod cache
    args.PodInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return assignedPod(t)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return assignedPod(pod)
                    }
                    runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod for filtering scheduledPod in %T", obj, c))
                    return false
                default:
                    runtime.HandleError(fmt.Errorf("unable to handle object for filtering scheduledPod in %T: %T", c, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addPodToCache,
                UpdateFunc: c.updatePodInCache,
                DeleteFunc: c.deletePodFromCache,
            },
        },
    )
    // unscheduled pod queue
    args.PodInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return !assignedPod(t) && responsibleForPod(t, args.SchedulerName)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return !assignedPod(pod) && responsibleForPod(pod, args.SchedulerName)
                    }
                    runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod for filtering unscheduledPod in %T", obj, c))
                    return false
                default:
                    runtime.HandleError(fmt.Errorf("unable to handle object for filtering unscheduledPod in %T: %T", c, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addPodToSchedulingQueue,
                UpdateFunc: c.updatePodInSchedulingQueue,
                DeleteFunc: c.deletePodFromSchedulingQueue,
            },
        },
    )
    // ScheduledPodLister is something we provide to plug-in functions that
    // they may need to call.
    c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}

    args.NodeInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    c.addNodeToCache,
            UpdateFunc: c.updateNodeInCache,
            DeleteFunc: c.deleteNodeFromCache,
        },
    )

    args.PvInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            // MaxPDVolumeCountPredicate: since it relies on the counts of PV.
            AddFunc:    c.onPvAdd,
            UpdateFunc: c.onPvUpdate,
        },
    )

    // This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
    args.PvcInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    c.onPvcAdd,
            UpdateFunc: c.onPvcUpdate,
        },
    )

    // This is for ServiceAffinity: affected by the selector of the service is updated.
    args.ServiceInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    c.onServiceAdd,
            UpdateFunc: c.onServiceUpdate,
            DeleteFunc: c.onServiceDelete,
        },
    )

    // Setup volume binder
    c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)

    args.StorageClassInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc: c.onStorageClassAdd,
        },
    )

    // Setup cache debugger
    debugger := cachedebugger.New(
        args.NodeInformer.Lister(),
        args.PodInformer.Lister(),
        c.schedulerCache,
        c.podQueue,
    )
    debugger.ListenForSignal(c.StopEverything)

    go func() {
        <-c.StopEverything
        c.podQueue.Close()
    }()

    return c
}

2. Depending on whether the algorithm source is a named provider or a policy (loaded from a file or a ConfigMap), the corresponding scheduler config is created, and the environment and policy are initialized.
3. Finally, NewFromConfig builds a Scheduler instance from this config and returns it as sched.
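
Back in NewConfigFactory, the FilterFunc callbacks decide whether a Pod event goes to the cache handlers or to the queue handlers, and the split hinges on whether the Pod already has a node assigned. Roughly, the helper behind that check amounts to the following (a sketch; the real helper lives alongside NewConfigFactory):

// assignedPod reports whether the Pod has already been bound to a node,
// i.e. whether spec.nodeName is non-empty (pod is *v1.Pod from k8s.io/api/core/v1).
func assignedPod(pod *v1.Pod) bool {
    return len(pod.Spec.NodeName) != 0
}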

The code for sched.Run:

// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
        return
    }

    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

After the caches have synced, the Scheduler uses wait.Until() to call Scheduler.scheduleOne() again and again until the stop channel is closed.
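
wait.Until comes from k8s.io/apimachinery/pkg/util/wait; with a period of 0 it re-invokes the function as soon as it returns, until the stop channel closes. A standalone sketch of that behaviour (the work function and timings are made up for illustration):

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})

    // Close the stop channel after three seconds to end the loop,
    // playing the role StopEverything plays for the scheduler.
    go func() {
        time.Sleep(3 * time.Second)
        close(stopCh)
    }()

    // With period 0, wait.Until calls the function again immediately
    // after it returns -- exactly how scheduleOne is driven.
    wait.Until(func() {
        fmt.Println("scheduling one pod...")
        time.Sleep(500 * time.Millisecond) // stand-in for the real scheduling work
    }, 0, stopCh)

    fmt.Println("stop signal received, scheduling loop exited")
}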

Now let's look at scheduleOne, the function where a single Pod is finally scheduled:

// scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
    plugins := sched.config.PluginSet
    // Remove all plugin context data at the beginning of a scheduling cycle.
    if plugins.Data().Ctx != nil {
        plugins.Data().Ctx.Reset()
    }

    pod := sched.config.NextPod()
    // pod could be nil when schedulerQueue is closed
    if pod == nil {
        return
    }
    if pod.DeletionTimestamp != nil {
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        return
    }

    klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

    // Synchronously attempt to find a fit for the pod.
    start := time.Now()
    scheduleResult, err := sched.schedule(pod)
    if err != nil {
        // schedule() may have failed because the pod would not fit on any host, so we try to
        // preempt, with the expectation that the next time the pod is tried for scheduling it
        // will fit due to the preemption. It is also possible that a different pod will schedule
        // into the resources that were preempted, but this is harmless.
        if fitError, ok := err.(*core.FitError); ok {
            if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
                klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
                    " No preemption is performed.")
            } else {
                preemptionStartTime := time.Now()
                sched.preempt(pod, fitError)
                metrics.PreemptionAttempts.Inc()
                metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
                metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
            }
            // Pod did not fit anywhere, so it is counted as a failure. If preemption
            // succeeds, the pod should get counted as a success the next time we try to
            // schedule it. (hopefully)
            metrics.PodScheduleFailures.Inc()
        } else {
            klog.Errorf("error selecting node for pod: %v", err)
            metrics.PodScheduleErrors.Inc()
        }
        return
    }
    metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
    // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
    // This allows us to keep scheduling without waiting on binding to occur.
    assumedPod := pod.DeepCopy()

    // Assume volumes first before assuming the pod.
    //
    // If all volumes are completely bound, then allBound is true and binding will be skipped.
    //
    // Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
    //
    // This function modifies 'assumedPod' if volume binding is required.
    allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost)
    if err != nil {
        klog.Errorf("error assuming volumes: %v", err)
        metrics.PodScheduleErrors.Inc()
        return
    }

    // Run "reserve" plugins.
    for _, pl := range plugins.ReservePlugins() {
        if err := pl.Reserve(plugins, assumedPod, scheduleResult.SuggestedHost); err != nil {
            klog.Errorf("error while running %v reserve plugin for pod %v: %v", pl.Name(), assumedPod.Name, err)
            sched.recordSchedulingFailure(assumedPod, err, SchedulerError,
                fmt.Sprintf("reserve plugin %v failed", pl.Name()))
            metrics.PodScheduleErrors.Inc()
            return
        }
    }
    // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
    err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
    if err != nil {
        klog.Errorf("error assuming pod: %v", err)
        metrics.PodScheduleErrors.Inc()
        return
    }
    // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
    go func() {
        // Bind volumes first before Pod
        if !allBound {
            err := sched.bindVolumes(assumedPod)
            if err != nil {
                klog.Errorf("error binding volumes: %v", err)
                metrics.PodScheduleErrors.Inc()
                return
            }
        }

        // Run "prebind" plugins.
        for _, pl := range plugins.PrebindPlugins() {
            approved, err := pl.Prebind(plugins, assumedPod, scheduleResult.SuggestedHost)
            if err != nil {
                approved = false
                klog.Errorf("error while running %v prebind plugin for pod %v: %v", pl.Name(), assumedPod.Name, err)
                metrics.PodScheduleErrors.Inc()
            }
            if !approved {
                sched.Cache().ForgetPod(assumedPod)
                var reason string
                if err == nil {
                    msg := fmt.Sprintf("prebind plugin %v rejected pod %v.", pl.Name(), assumedPod.Name)
                    klog.V(4).Infof(msg)
                    err = errors.New(msg)
                    reason = v1.PodReasonUnschedulable
                } else {
                    reason = SchedulerError
                }
                sched.recordSchedulingFailure(assumedPod, err, reason, err.Error())
                return
            }
        }

        err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: scheduleResult.SuggestedHost,
            },
        })
        metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
        if err != nil {
            klog.Errorf("error binding pod: %v", err)
            metrics.PodScheduleErrors.Inc()
        } else {
            klog.V(2).Infof("pod %v/%v is bound successfully on node %v, %d nodes evaluated, %d nodes were found feasible", assumedPod.Namespace, assumedPod.Name, scheduleResult.SuggestedHost, scheduleResult.EvaluatedNodes, scheduleResult.FeasibleNodes)
            metrics.PodScheduleSuccesses.Inc()
        }
    }()
}

scheduleOne schedules serially, one Pod per iteration, onto a suitable node:
1. sched.config.NextPod() pops the next Pod from the scheduling queue (podQueue). If the Pod is being deleted, it is skipped.
2.1 sched.schedule runs the scheduling algorithm for this Pod. If scheduling fails (for example, no node has enough resources), the failure is recorded; if preemption is enabled, the scheduler attempts to preempt lower-priority Pods so that the Pod is likely to fit on the next attempt. Success and failure counts, as well as scheduling latency, are recorded as metrics.

// schedule implements the scheduling algorithm and returns the suggested result(host,
// evaluated nodes number,feasible nodes number).
func (sched *Scheduler) schedule(pod *v1.Pod) (core.ScheduleResult, error) {
    result, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
    if err != nil {
        pod = pod.DeepCopy()
        sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error())
        return core.ScheduleResult{}, err
    }
    return result, err
}

2.2 sched.config.Algorithm.Schedule runs the scheduling algorithm over the node list; on success it returns the name of the chosen node, otherwise it returns the corresponding error.

// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) {
    trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
    defer trace.LogIfLong(100 * time.Millisecond)

    if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
        return result, err
    }

    nodes, err := nodeLister.List()
    if err != nil {
        return result, err
    }
    if len(nodes) == 0 {
        return result, ErrNoNodesAvailable
    }

    if err := g.snapshot(); err != nil {
        return result, err
    }

    trace.Step("Computing predicates")
    startPredicateEvalTime := time.Now()
    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
    if err != nil {
        return result, err
    }

    if len(filteredNodes) == 0 {
        return result, &FitError{
            Pod:              pod,
            NumAllNodes:      len(nodes),
            FailedPredicates: failedPredicateMap,
        }
    }
    metrics.SchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPredicateEvalTime))
    metrics.SchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))

    trace.Step("Prioritizing")
    startPriorityEvalTime := time.Now()
    // When only one node after predicate, just use it.
    if len(filteredNodes) == 1 {
        metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
        return ScheduleResult{
            SuggestedHost:  filteredNodes[0].Name,
            EvaluatedNodes: 1 + len(failedPredicateMap),
            FeasibleNodes:  1,
        }, nil
    }

    metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
    if err != nil {
        return result, err
    }
    metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
    metrics.SchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))

    trace.Step("Selecting host")

    host, err := g.selectHost(priorityList)
    return ScheduleResult{
        SuggestedHost:  host,
        EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
        FeasibleNodes:  len(filteredNodes),
    }, err
}

Inside the Schedule call above:
2.2.1 Get the list of available nodes.
2.2.2 Run the Predicates phase (findNodesThatFit) to filter out the nodes that fit the Pod (filteredNodes). Here is the predicate filtering code:

func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
    var filtered []*v1.Node
    failedPredicateMap := FailedPredicateMap{}

    if len(g.predicates) == 0 {
        filtered = nodes
    } else {
        allNodes := int32(g.cache.NodeTree().NumNodes())
        numNodesToFind := g.numFeasibleNodesToFind(allNodes)

        // Create filtered list with enough space to avoid growing it
        // and allow assigning.
        filtered = make([]*v1.Node, numNodesToFind)
        errs := errors.MessageCountMap{}
        var (
            predicateResultLock sync.Mutex
            filteredLen         int32
        )

        ctx, cancel := context.WithCancel(context.Background())

        // We can use the same metadata producer for all nodes.
        meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)

        checkNode := func(i int) {
            nodeName := g.cache.NodeTree().Next()
            fits, failedPredicates, err := podFitsOnNode(
                pod,
                meta,
                g.cachedNodeInfoMap[nodeName],
                g.predicates,
                g.schedulingQueue,
                g.alwaysCheckAllPredicates,
            )
            if err != nil {
                predicateResultLock.Lock()
                errs[err.Error()]++
                predicateResultLock.Unlock()
                return
            }
            if fits {
                length := atomic.AddInt32(&filteredLen, 1)
                if length > numNodesToFind {
                    cancel()
                    atomic.AddInt32(&filteredLen, -1)
                } else {
                    filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
                }
            } else {
                predicateResultLock.Lock()
                failedPredicateMap[nodeName] = failedPredicates
                predicateResultLock.Unlock()
            }
        }

        // Stops searching for more nodes once the configured number of feasible nodes
        // are found.
        workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

        filtered = filtered[:filteredLen]
        if len(errs) > 0 {
            return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
        }
    }

    if len(filtered) > 0 && len(g.extenders) != 0 {
        for _, extender := range g.extenders {
            if !extender.IsInterested(pod) {
                continue
            }
            filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
            if err != nil {
                if extender.IsIgnorable() {
                    klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
                        extender, err)
                    continue
                } else {
                    return []*v1.Node{}, FailedPredicateMap{}, err
                }
            }

            for failedNodeName, failedMsg := range failedMap {
                if _, found := failedPredicateMap[failedNodeName]; !found {
                    failedPredicateMap[failedNodeName] = []predicates.PredicateFailureReason{}
                }
                failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
            }
            filtered = filteredList
            if len(filtered) == 0 {
                break
            }
        }
    }
    return filtered, failedPredicateMap, nil
}
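
findNodesThatFit stops searching as soon as it has found "enough" feasible nodes, and numFeasibleNodesToFind decides how many that is, based on the --percentage-of-nodes-to-score option. The following is a rough sketch of that calculation rather than a verbatim copy of the 1.13 code:

// minFeasibleNodesToFind mirrors the lower bound the scheduler uses so that
// small clusters are always searched exhaustively.
const minFeasibleNodesToFind = 100

// numFeasibleNodesToFind returns how many nodes passing the predicates are
// enough to stop searching; a percentage of 0 or >=100 means "check all nodes".
func numFeasibleNodesToFind(numAllNodes, percentageOfNodesToScore int32) int32 {
    if numAllNodes < minFeasibleNodesToFind ||
        percentageOfNodesToScore <= 0 || percentageOfNodesToScore >= 100 {
        return numAllNodes
    }
    numNodes := numAllNodes * percentageOfNodesToScore / 100
    if numNodes < minFeasibleNodesToFind {
        return minFeasibleNodesToFind
    }
    return numNodes
}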

2.2.3 numFeasibleNodesToFind (sketched above) decides how many feasible nodes are enough before the search stops.
2.2.4 podFitsOnNode checks whether the Pod fits on a given node. TODO: look into how scheduler extenders are used.
2.2.5 If more than one node passes the predicates, the Priorities phase (PrioritizeNodes) scores and ranks the remaining nodes:

func PrioritizeNodes(
    pod *v1.Pod,
    nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
    meta interface{},
    priorityConfigs []algorithm.PriorityConfig,
    nodes []*v1.Node,
    extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {
    // If no priority configs are provided, then the EqualPriority function is applied
    // This is required to generate the priority list in the required format
    if len(priorityConfigs) == 0 && len(extenders) == 0 {
        result := make(schedulerapi.HostPriorityList, 0, len(nodes))
        for i := range nodes {
            hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
            if err != nil {
                return nil, err
            }
            result = append(result, hostPriority)
        }
        return result, nil
    }

    var (
        mu   = sync.Mutex{}
        wg   = sync.WaitGroup{}
        errs []error
    )
    appendError := func(err error) {
        mu.Lock()
        defer mu.Unlock()
        errs = append(errs, err)
    }

    results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))

    // DEPRECATED: we can remove this when all priorityConfigs implement the
    // Map-Reduce pattern.
    for i := range priorityConfigs {
        if priorityConfigs[i].Function != nil {
            wg.Add(1)
            go func(index int) {
                defer wg.Done()
                var err error
                results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)
                if err != nil {
                    appendError(err)
                }
            }(i)
        } else {
            results[i] = make(schedulerapi.HostPriorityList, len(nodes))
        }
    }

    workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
        nodeInfo := nodeNameToInfo[nodes[index].Name]
        for i := range priorityConfigs {
            if priorityConfigs[i].Function != nil {
                continue
            }

            var err error
            results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
            if err != nil {
                appendError(err)
                results[i][index].Host = nodes[index].Name
            }
        }
    })

    for i := range priorityConfigs {
        if priorityConfigs[i].Reduce == nil {
            continue
        }
        wg.Add(1)
        go func(index int) {
            defer wg.Done()
            if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
                appendError(err)
            }
            if klog.V(10) {
                for _, hostPriority := range results[index] {
                    klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
                }
            }
        }(i)
    }
    // Wait for all computations to be finished.
    wg.Wait()
    if len(errs) != 0 {
        return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
    }

    // Summarize all scores.
    result := make(schedulerapi.HostPriorityList, 0, len(nodes))

    for i := range nodes {
        result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
        for j := range priorityConfigs {
            result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
        }
    }

    if len(extenders) != 0 && nodes != nil {
        combinedScores := make(map[string]int, len(nodeNameToInfo))
        for i := range extenders {
            if !extenders[i].IsInterested(pod) {
                continue
            }
            wg.Add(1)
            go func(extIndex int) {
                defer wg.Done()
                prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
                if err != nil {
                    // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
                    return
                }
                mu.Lock()
                for i := range *prioritizedList {
                    host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
                    if klog.V(10) {
                        klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score)
                    }
                    combinedScores[host] += score * weight
                }
                mu.Unlock()
            }(i)
        }
        // wait for all go routines to finish
        wg.Wait()
        for i := range result {
            result[i].Score += combinedScores[result[i].Host]
        }
    }

    if klog.V(10) {
        for i := range result {
            klog.Infof("Host %s => Score %d", result[i].Host, result[i].Score)
        }
    }
    return result, nil
}

2.2.6 selectHost(): if several nodes end up with the same top score, one of them is picked.
3. Once a node has been chosen, pod.DeepCopy() is recorded in the cache as an assumed Pod (assumedPod).
4. If the Pod uses volumes, the volumes are bound before the Pod itself is bound.
5. The Pod is then bound asynchronously to the chosen node by calling the kube-apiserver binding API; kube-apiserver writes the result to etcd. A sketch of that call follows.
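
Step 5 ultimately comes down to a Bind call against the API server. The following is a hedged sketch of what such a call looks like with the client-go that ships with Kubernetes 1.13 (newer client-go versions add a context and options argument to Bind); the kubeconfig path and the pod/node names are placeholders:

package main

import (
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

// bindPodToNode mimics the scheduler's bind step: it posts a Binding object
// whose target is the chosen node, and the API server then sets spec.nodeName
// on the Pod and persists the change to etcd.
func bindPodToNode(client kubernetes.Interface, namespace, podName, nodeName string) error {
    binding := &v1.Binding{
        ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: podName},
        Target: v1.ObjectReference{
            Kind: "Node",
            Name: nodeName,
        },
    }
    return client.CoreV1().Pods(namespace).Bind(binding)
}

func main() {
    // The kubeconfig path is an assumption for this sketch.
    config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(config)

    if err := bindPodToNode(client, "default", "my-pod", "node-1"); err != nil {
        panic(err)
    }
}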
Figure: rough flow of the predicate filtering and priority scoring process (original image 绘图1.png not reproduced here).

Summary:
kube-scheduler runs as a standalone process on the Kubernetes master and provides the scheduling service. Through the informers' list-watch mechanism it fetches data from the apiserver and caches it locally.

Once a pending Pod has been obtained, the Schedule method is executed. The whole scheduling process consists of two key phases: Predicates and Priorities. (The policy used for filtering and scoring can be customized in JSON via the --policy-config-file startup flag; the policy in the original flow diagram was taken from the example parameters in the official code, and the default scheduling policy is defaultProvider with defaultPredicates and defaultPriorities.) In the end, the node that best fits the Pod is selected, the Pod's state in the SchedulerCache is updated (AssumePod), the Pod is marked as scheduled, and the NodeInfo is updated accordingly.

