22

xxx

 3 years ago
source link: https://studygolang.com/articles/17987?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

kube-controller-manager

基本流程:

1、构造

2、配置

3、初始化

4、执行

1460000018046826?w=543&h=773

入口函数:/cmd/kube-controller-manager/controller-manager.go

func main() {

    rand.Seed(time.Now().UnixNano())
    
    //构造,配置,初始化command
    command := app.NewControllerManagerCommand()

    logs.InitLogs()
    defer logs.FlushLogs()

    //执行
    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }

}

构造执行器: /cmd/kube-controller-manager/app/controllermanager.go

func NewControllerManagerCommand() *cobra.Command {

    //初始化Controller-manager的配置选项结构
    s, err := options.NewKubeControllerManagerOptions()

    ...

    //创建执行命令结构
    cmd := &cobra.Command{
        Use: "kube-controller-manager",
        Long: `The Kubernetes controller manager is a daemon that embeds...'
        //获取所有控制器
        c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
        ...
    }


    //返回执行器
    return cmd;
}

进入执行:/cmd/kube-controller-manager/app/controllermanager.go

func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
    ...
    //初始化controller manager 的HTTP服务
    var unsecuredMux *mux.PathRecorderMux
    if c.SecureServing != nil {
    ...
    //构造run的执行体
    run := func(stop <-chan struct{}) {
    ...
     //如果只是单节点,直接运行run    if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect {
        run(wait.NeverStop)
        panic("unreachable")
    }
    //非单点,选主后执行run
    //进行选主,并在选为主节点后执行run
    leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
    ...
    //选主完成后执行
    OnStartedLeading: run,
    ...
}

run的执行体:/cmd/kube-controller-manager/app/controllermanager.go >> run()

run := func(stop <-chan struct{}) {
     
    //创建控制器上下文
    ctx, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, stop)
    if err != nil {
    glog.Fatalf("error building controller context: %v", err)
            }
    saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder:     rootClientBuilder}.startServiceAccountTokenController

    //初始化所有控制器
    if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers(ctx.LoopMode)); err != nil {
    glog.Fatalf("error starting controllers: %v", err)
    }

    //启动监听资源的事件
            ctx.InformerFactory.Start(ctx.Stop)
            close(ctx.InformersStarted)

            select {}
    }

选主流程:/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go

//选主主要有client-go工具类完成,选择configmap/endpoint来创建资源,哪个执行单元创建成功了此资源便可获得锁,锁信息存储在此configmap/endpoint中,选主代码如下
func New(lockType string, ns string, name string, client corev1.CoreV1Interface, rlc ResourceLockConfig) (Interface, error) {
    switch lockType {
    case EndpointsResourceLock:
        return &EndpointsLock{
            EndpointsMeta: metav1.ObjectMeta{
                Namespace: ns,
                Name:      name,
            },
            Client:     client,
            LockConfig: rlc,
        }, nil
    case ConfigMapsResourceLock:
        return &ConfigMapLock{
            ConfigMapMeta: metav1.ObjectMeta{
                Namespace: ns,
                Name:      name,
            },
            Client:     client,
            LockConfig: rlc,
        }, nil
    default:
        return nil, fmt.Errorf("Invalid lock-type %s", lockType)
    }
}

初始化所有控制器:/cmd/kube-controller-manager/app/controllermanager.go

func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error {
    ···
    //遍历所有的controller list
    for controllerName, initFn := range controllers {
        if !ctx.IsControllerEnabled(controllerName) {
        glog.Warningf("%q is disabled", controllerName)
        continue
    }
    time.Sleep(wait.Jitter(ctx.ComponentConfig.GenericComponent.ControllerStartInterval.Duration, ControllerStartJitter))
    glog.V(1).Infof("Starting %q", controllerName)
    //执行每个controller的初始化函数
    started, err := initFn(ctx)
    ···
    }

    return nil
}

创建控制器上下文:/cmd/kube-controller-manager/app/controllermanager.go

func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
    //拿到对apiServer资源的操作的句柄
    versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
    sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())

    //确认api Server的健康(最多等待的时间为10s),再获取连接
    if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
    return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
    }
    //创建并返回controllerContext
    ctx := ControllerContext{
        ClientBuilder:      clientBuilder,
        InformerFactory:    sharedInformers,
        ...
    }

    
    return ctx,nil
}

kube-scheduler

基本流程

1、初始化配置

2、构造

3、从队列中获取pod

4、进行绑定

bVbnSR2?w=570&h=765

入口函数:/cmd/kube-scheduler/scheduler.go

func main() {
    rand.Seed(time.Now().UnixNano())
    //构造
    command := app.NewSchedulerCommand()
    pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
    // utilflag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()
    //执行
    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}

注册调度策略:pkg/scheduler/algorithmprovider/defaults/defaults.go

func registerAlgorithmProvider(predSet, priSet sets.String) {
    // Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used
    // by specifying flag.
    factory.RegisterAlgorithmProvider(factory.DefaultProvider, predSet, priSet)
    // Cluster autoscaler friendly scheduling algorithm.
    factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet,
        copyAndReplace(priSet, priorities.LeastRequestedPriority, priorities.MostRequestedPriority))
}

从组件入口:/cmd/kube-scheduler/app/server.go

func NewSchedulerCommand() *cobra.Command {
    //初始化默认的参数
    opts, err := options.NewOptions()
    
    //构造执行命令对象
    cmd := &cobra.Command{
    Use: "kube-scheduler",
    Long: `The Kubernetes ······`,
    Run: func(cmd *cobra.Command, args []string) {
    ...
    }
    //读取配置参数
    opts.AddFlags(cmd.Flags())
    cmd.MarkFlagFilename("config", "yaml", "yml", "json")

    return cmd
}

启动:/cmd/kube-scheduler/app/server.go

func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
    //设置调度算法
    algorithmprovider.ApplyFeatureGates()
    ...
    //初始化schedulerConfig
    schedulerConfig, err := NewSchedulerConfig(c)

    //创建Scheduler对象
    sched := scheduler.NewFromConfig(schedulerConfig)
    
    // 进行健康检查
    if c.InsecureServing != nil {    
    ...
    //是否需要选主
    if c.LeaderElection != nil {
    ...
    //执行调度任务
    run(stopCh)

}

执行:/cmd/kube-scheduler/app/server.go

//开始执行调度任务
func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
    return
    }

    if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
        go sched.config.VolumeBinder.Run(sched.bindVolumesWorker, sched.config.StopEverything)
    }

    //串行执行调度任务
    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

调度pod逻辑:/cmd/kube-scheduler/scheduler.go

func (sched *Scheduler) scheduleOne() {
    //从队列中获取pod
    pod := sched.config.NextPod()
    ...
    //给获取到的pod调度到合适的位置
     suggestedHost, err := sched.schedule(pod)
    ...
    //在缓存中预先绑定主机(调用apiserver的延时问题)
     assumedPod := pod.DeepCopy()
    ...
    //通过apiserver的client进行绑定
    go func() {
    err := sched.bind(assumedPod, &v1.Binding{
                   ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name:     assumedPod.Name, UID: assumedPod.UID}
    ...
}

寻找合适的节点:/pkg/scheduler/core/generic_scheduler.go

func (sched *Scheduler) scheduleOne() {
    //从队列中获取pod
    pod := sched.config.NextPod()
    ...
    //给获取到的pod调度到合适的位置
     suggestedHost, err := sched.schedule(pod)
    ...
    //在缓存中预先绑定主机(调用apiserver的延时问题)
     assumedPod := pod.DeepCopy()
    ...
    //通过apiserver的client进行绑定
    go func() {
    err := sched.bind(assumedPod, &v1.Binding{
                   ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name:     assumedPod.Name, UID: assumedPod.UID}
    ...
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK