6

深入解析kubernetes controller-runtime - Cylon

 1 year ago
source link: https://www.cnblogs.com/Cylon/p/16417725.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Overview#

controller-runtime 是 Kubernetes 社区提供可供快速搭建一套 实现了controller 功能的工具,无需自行实现Controller的功能了;在 KubebuilderOperator SDK 也是使用 controller-runtime 。本文将对 controller-runtime 的工作原理以及在不同场景下的使用方式进行简要的总结和介绍。

controller-runtime structure#

controller-runtime 主要组成是需要用户创建的 ManagerReconciler 以及 Controller Runtime 自己启动的 CacheController

  • Manager:是用户在初始化时创建的,用于启动 Controller Runtime 组件
  • Reconciler:是用户需要提供来处理自己的业务逻辑的组件(即在通过 code-generator 生成的api-like而实现的controller中的业务处理部分)。
  • Cache:一个缓存,用来建立 InformerApiServer 的连接来监听资源并将被监听的对象推送到queue中。
  • Controller: 一方面向 Informer 注册 eventHandler,另一方面从队列中获取数据。controller 将从队列中获取数据并执行用户自定义的 Reconciler 功能。

image

图:controller-runtime structure

image

图:controller-runtime flowchart

由图可知,Controller会向 Informer 注册一些列eventHandler;然后Cache启动Informer(informer属于cache包中),与ApiServer建立监听;当Informer检测到资源变化时,将对象加入queue,Controller 将元素取出并在用户端执行 Reconciler。

Controller引入#

我们从 controller-rumtime项目的 example 进行引入看下,整个架构都是如何实现的。

可以看到 example 下的实际上实现了一个 reconciler 的结构体,实现了 Reconciler 抽象和 Client 结构体

type reconciler struct {
	client.Client
	scheme *runtime.Scheme
}

那么来看下 抽象的 Reconciler 是什么,可以看到就是抽象了 Reconcile 方法,这个是具体处理的逻辑过程

type Reconciler interface {
	Reconcile(context.Context, Request) (Result, error)
}

下面在看下谁来实现了这个 Reconciler 抽象

type Controller interface {
	reconcile.Reconciler // 协调的具体步骤,通过ns/name\
    // 通过predicates来评估来源数据,并加入queue中(放入队列的是reconcile.Requests)
	Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
    // 启动controller,类似于自定义的Run()
	Start(ctx context.Context) error
	GetLogger() logr.Logger
}

controller structure#

controller-runtime\pkg\internal\controller\controller.go 中实现了这个 Controller

type Controller struct {
	Name string // controller的标识
    
	MaxConcurrentReconciles int // 并发运行Reconciler的数量,默认1
	// 实现了reconcile.Reconciler的调节器, 默认DefaultReconcileFunc
	Do reconcile.Reconciler
	// makeQueue会构建一个对应的队列,就是返回一个限速队列
	MakeQueue func() workqueue.RateLimitingInterface
	// MakeQueue创造出来的,在出入队列就是操作的这个
	Queue workqueue.RateLimitingInterface

	// 用于注入其他内容
    // 已弃用
	SetFields func(i interface{}) error

	mu sync.Mutex
	// 标识开始的状态
	Started bool
	// 在启动时传递的上下文,用于停止控制器
	ctx context.Context
	// 等待缓存同步的时间 默认2分钟
	CacheSyncTimeout time.Duration

	// 维护了eventHandler predicates,在控制器启动时启动
	startWatches []watchDescription

	// 日志构建器,输出入日志
	LogConstructor func(request *reconcile.Request) logr.Logger

	// RecoverPanic为是否对reconcile引起的panic恢复
	RecoverPanic bool
}

看完了controller的structure,接下来看看controller是如何使用的

injection#

Controller.Watch 实现了注入的动作,可以看到 watch() 通过参数将 对应的事件函数传入到内部

func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// 使用SetFields来完成注入操作
	if err := c.SetFields(src); err != nil {
		return err
	}
	if err := c.SetFields(evthdler); err != nil {
		return err
	}
	for _, pr := range prct {
		if err := c.SetFields(pr); err != nil {
			return err
		}
	}

	// 如果Controller还未启动,那么将这些动作缓存到本地
	if !c.Started {
		c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
		return nil
	}

	c.LogConstructor(nil).Info("Starting EventSource", "source", src)
	return src.Start(c.ctx, evthdler, c.Queue, prct...)
}

启动操作实际上为informer注入事件函数

type Source interface {
	// start 是Controller 调用,用以向 Informer 注册 EventHandler, 将 reconcile.Requests(一个入队列的动作) 排入队列。
	Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}

func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
	prct ...predicate.Predicate) error {
	// Informer should have been specified by the user.
	if is.Informer == nil {
		return fmt.Errorf("must specify Informer.Informer")
	}

	is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
	return nil
}

我们知道对于 eventHandler,实际上应该是一个 onAddonUpdate 这种类型的函数,queue则是workqueue,那么 Predicates 是什么呢?

通过追踪可以看到定义了 Predicate 抽象,可以看出Predicate 是Watch到的事件时什么类型的,当对于每个类型的事件,对应的函数就为 true,在 eventHandler 中,这些被用作,事件的过滤。

// Predicate filters events before enqueuing the keys.
type Predicate interface {
	// Create returns true if the Create event should be processed
	Create(event.CreateEvent) bool

	// Delete returns true if the Delete event should be processed
	Delete(event.DeleteEvent) bool

	// Update returns true if the Update event should be processed
	Update(event.UpdateEvent) bool

	// Generic returns true if the Generic event should be processed
	Generic(event.GenericEvent) bool
}

在对应的动作中,可以看到这里作为过滤操作

func (e EventHandler) OnAdd(obj interface{}) {
	c := event.CreateEvent{}

	// Pull Object out of the object
	if o, ok := obj.(client.Object); ok {
		c.Object = o
	} else {
		log.Error(nil, "OnAdd missing Object",
			"object", obj, "type", fmt.Sprintf("%T", obj))
		return
	}

	for _, p := range e.Predicates {
		if !p.Create(c) {
			return
		}
	}

	// Invoke create handler
	e.EventHandler.Create(c, e.Queue)
}

上面就看到了,对应是 EventHandler.Create 进行添加的,那么这些动作具体是在做什么呢?

在代码 pkg/handler ,可以看到这些操作,类似于create,这里将ns/name放入到队列中。

func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
	if evt.Object == nil {
		enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
		return
	}
	q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
		Name:      evt.Object.GetName(),
		Namespace: evt.Object.GetNamespace(),
	}})
}

unqueue#

上面看到了,入队的动作实际上都是将 ns/name 加入到队列中,那么出队列时又做了些什么呢?

通过 controller.Start() 可以看到controller在启动后都做了些什么动作

func (c *Controller) Start(ctx context.Context) error {
	c.mu.Lock()
	if c.Started {
		return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
	}

	c.initMetrics()

	// Set the internal context.
	c.ctx = ctx

	c.Queue = c.MakeQueue() // 初始化queue
	go func() { // 退出时,让queue关闭
		<-ctx.Done()
		c.Queue.ShutDown()
	}()

	wg := &sync.WaitGroup{}
	err := func() error {
		defer c.mu.Unlock()
		defer utilruntime.HandleCrash()

		// 启动informer前,将之前准备好的 evnetHandle predictates source注册
		for _, watch := range c.startWatches {
			c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))
				// 上面我们看过了,start就是真正的注册动作
			if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
				return err
			}
		}

		// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
		c.LogConstructor(nil).Info("Starting Controller")
		 // startWatches上面我们也看到了,是evnetHandle predictates source被缓存到里面,
        // 这里是拿出来将其启动
		for _, watch := range c.startWatches {
			syncingSource, ok := watch.src.(source.SyncingSource)
			if !ok {
				continue
			}

			if err := func() error {
				// use a context with timeout for launching sources and syncing caches.
				sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
				defer cancel()

				// WaitForSync waits for a definitive timeout, and returns if there
				// is an error or a timeout
				if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
					err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
					c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync")
					return err
				}

				return nil
			}(); err != nil {
				return err
			}
		}

		// which won't be garbage collected if we hold a reference to it.
		c.startWatches = nil

		// Launch workers to process resources
		c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
		wg.Add(c.MaxConcurrentReconciles)
        // 启动controller消费端的线程
		for i := 0; i < c.MaxConcurrentReconciles; i++ {
			go func() {
				defer wg.Done()
				for c.processNextWorkItem(ctx) {
				}
			}()
		}

		c.Started = true
		return nil
	}()
	if err != nil {
		return err
	}

	<-ctx.Done() // 阻塞,直到上下文关闭
	c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish")
	wg.Wait() // 等待所有线程都关闭
	c.LogConstructor(nil).Info("All workers finished")
	return nil
}

通过上面的分析,可以看到,每个消费的worker线程,实际上调用的是 processNextWorkItem 下面就来看看他究竟做了些什么?

func (c *Controller) processNextWorkItem(ctx context.Context) bool {
	obj, shutdown := c.Queue.Get() // 从队列中拿取数据
	if shutdown {
		return false
	}

	defer c.Queue.Done(obj)
	// 下面应该是prometheus指标的一些东西
	ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
	defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
	// 获得的对象通过reconcileHandler处理
	c.reconcileHandler(ctx, obj)
	return true
}

那么下面看看 reconcileHandler 做了些什么

func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
	// Update metrics after processing each item
	reconcileStartTS := time.Now()
	defer func() {
		c.updateMetrics(time.Since(reconcileStartTS))
	}()

	// 检查下取出的数据是否为reconcile.Request,在之前enqueue时了解到是插入的这个类型的值
	req, ok := obj.(reconcile.Request)
	if !ok {
		// 如果错了就忘记
		c.Queue.Forget(obj)
		c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
		return
	}

	log := c.LogConstructor(&req)

	log = log.WithValues("reconcileID", uuid.NewUUID())
	ctx = logf.IntoContext(ctx, log)

	// 这里调用了自己在实现controller实现的Reconcile的动作
	result, err := c.Reconcile(ctx, req)
	switch {
	case err != nil:
		c.Queue.AddRateLimited(req)
		ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
		log.Error(err, "Reconciler error")
	case result.RequeueAfter > 0:
		c.Queue.Forget(obj)
		c.Queue.AddAfter(req, result.RequeueAfter)
		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
	case result.Requeue:
		c.Queue.AddRateLimited(req)
		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
	default:
		c.Queue.Forget(obj)
		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()
	}
}

通过对example中的 Reconcile 查找其使用,可以看到,调用他的就是上面我们说道的 reconcileHandler ,到这里我们就知道了,controller 的运行流为 Controller.Start() > Controller.processNextWorkItem > Controller.reconcileHandler > Controller.Reconcile 最终到达了我们自定义的业务逻辑处理 Reconcile

image

Manager#

在上面学习 controller-runtime 时了解到,有一个 Manager 的组件,这个组件是做什么呢?我们来分析下。

Manager 是用来创建与启动 controller 的(允许多个 controller 与 一个 manager 关联),Manager会启动分配给他的所有controller,以及其他可启动的对象。

example 看到,会初始化一个 ctrl.NewManager

func main() {
   ctrl.SetLogger(zap.New())

   mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
   if err != nil {
      setupLog.Error(err, "unable to start manager")
      os.Exit(1)
   }

   // in a real controller, we'd create a new scheme for this
   err = api.AddToScheme(mgr.GetScheme())
   if err != nil {
      setupLog.Error(err, "unable to add scheme")
      os.Exit(1)
   }

   err = ctrl.NewControllerManagedBy(mgr).
      For(&api.ChaosPod{}).
      Owns(&corev1.Pod{}).
      Complete(&reconciler{
         Client: mgr.GetClient(),
         scheme: mgr.GetScheme(),
      })
   if err != nil {
      setupLog.Error(err, "unable to create controller")
      os.Exit(1)
   }

   err = ctrl.NewWebhookManagedBy(mgr).
      For(&api.ChaosPod{}).
      Complete()
   if err != nil {
      setupLog.Error(err, "unable to create webhook")
      os.Exit(1)
   }

   setupLog.Info("starting manager")
   if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
      setupLog.Error(err, "problem running manager")
      os.Exit(1)
   }
}

这个 manager 就是 controller-runtime\pkg\manager\manager.go 下的 Manager, Manager 通过初始化 Caches 和 Clients 等共享依赖,并将它们提供给 Runnables。

type Manager interface {
	// 提供了与APIServer交互的方式,如incluster,indexer,cache等
	cluster.Cluster

    // Runnable 是任意可允许的cm中的组件,如 webhook,controller,Caches,在new中调用时,
    // 可以看到是传入的是一个controller,这里可以启动的是带有Start()方法的,通过调用Start()
    // 来启动组件
    Add(Runnable) error
    
    // 实现选举方法。当elected关闭,则选举为leader
	Elected() <-chan struct{}

	// 这为一些列健康检查和指标的方法,和我们关注的没有太大关系
	AddMetricsExtraHandler(path string, handler http.Handler) error
	AddHealthzCheck(name string, check healthz.Checker) error
	AddReadyzCheck(name string, check healthz.Checker) error

	// Start将启动所有注册进来的控制器,直到ctx取消。如果有任意controller报错,则立即退出
    // 如果使用了 LeaderElection,则必须在此返回后立即退出二进制文件,
	Start(ctx context.Context) error

	// GetWebhookServer returns a webhook.Server
	GetWebhookServer() *webhook.Server

	// GetLogger returns this manager's logger.
	GetLogger() logr.Logger

	// GetControllerOptions returns controller global configuration options.
	GetControllerOptions() v1alpha1.ControllerConfigurationSpec
}

controller-manager#

controllerManager 则实现了这个manager的抽象

type controllerManager struct {
	sync.Mutex
	started bool

	stopProcedureEngaged *int64
	errChan              chan error
	runnables            *runnables
	
	cluster cluster.Cluster

	// recorderProvider 用于记录eventhandler source predictate
	recorderProvider *intrec.Provider

	// resourceLock forms the basis for leader election
	resourceLock resourcelock.Interface

	// 在退出时是否关闭选举租约
	leaderElectionReleaseOnCancel bool
	// 一些指标性的,暂时不需要关注
	metricsListener net.Listener
	metricsExtraHandlers map[string]http.Handler
	healthProbeListener net.Listener
	readinessEndpointName string
	livenessEndpointName string
	readyzHandler *healthz.Handler
	healthzHandler *healthz.Handler

	// 有关controller全局参数
	controllerOptions v1alpha1.ControllerConfigurationSpec

	logger logr.Logger

	// 用于关闭 LeaderElection.Run(...) 的信号
	leaderElectionStopped chan struct{}

    // 取消选举,在失去选举后,必须延迟到gracefulShutdown之后os.exit()
	leaderElectionCancel context.CancelFunc

	// leader取消选举
	elected chan struct{}

	port int
	host string
	certDir string
	webhookServer *webhook.Server
	webhookServerOnce sync.Once
	// 非leader节点强制leader的等待时间
	leaseDuration time.Duration
	// renewDeadline is the duration that the acting controlplane will retry
	// refreshing leadership before giving up.
	renewDeadline time.Duration
	// LeaderElector重新操作的时间
	retryPeriod time.Duration
	// gracefulShutdownTimeout 是在manager停止之前让runnables停止的持续时间。
	gracefulShutdownTimeout time.Duration

	// onStoppedLeading is callled when the leader election lease is lost.
	// It can be overridden for tests.
	onStoppedLeading func()

	shutdownCtx context.Context
	internalCtx    context.Context
	internalCancel context.CancelFunc
	internalProceduresStop chan struct{}
}

workflow#

了解完ControllerManager之后,我们通过 example 来看看 ControllerManager 的workflow

func main() {
   ctrl.SetLogger(zap.New())
   // New一个manager
   mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
   if err != nil {
      setupLog.Error(err, "unable to start manager")
      os.Exit(1)
   }

   // in a real controller, we'd create a new scheme for this
   err = api.AddToScheme(mgr.GetScheme())
   if err != nil {
      setupLog.Error(err, "unable to add scheme")
      os.Exit(1)
   }

   err = ctrl.NewControllerManagedBy(mgr).
      For(&api.ChaosPod{}).
      Owns(&corev1.Pod{}).
      Complete(&reconciler{
         Client: mgr.GetClient(),
         scheme: mgr.GetScheme(),
      })
   if err != nil {
      setupLog.Error(err, "unable to create controller")
      os.Exit(1)
   }

   err = ctrl.NewWebhookManagedBy(mgr).
      For(&api.ChaosPod{}).
      Complete()
   if err != nil {
      setupLog.Error(err, "unable to create webhook")
      os.Exit(1)
   }

   setupLog.Info("starting manager")
   if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
      setupLog.Error(err, "problem running manager")
      os.Exit(1)
   }
}
  • 通过 manager.New() 初始化一个manager,这里面会初始化一些列的manager的参数
  • 通过 ctrl.NewControllerManagedBy 注册 controller 到manager中
    • ctrl.NewControllerManagedBy 是 builder的一个别名,构建出一个builder类型的controller
    • builder 中的 ctrl 就是 controller
  • 启动manager

builder#

下面看来看下builder在构建时做了什么

// Builder builds a Controller.
type Builder struct {
	forInput         ForInput
	ownsInput        []OwnsInput
	watchesInput     []WatchesInput
	mgr              manager.Manager
	globalPredicates []predicate.Predicate
	ctrl             controller.Controller
	ctrlOptions      controller.Options
	name             string
}

我们看到 example 中是调用了 For() 动作,那么这个 For() 是什么呢?

通过注释,我们可以看到 For() 提供了 调解对象类型,ControllerManagedBy 通过 reconciling object 来相应对应create/delete/update 事件。调用 For() 相当于调用了 Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{})

func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {
	if blder.forInput.object != nil {
		blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation")
		return blder
	}
	input := ForInput{object: object}
	for _, opt := range opts {
		opt.ApplyToFor(&input) //最终把我们要监听的对象每个 opts注册进去
	}

	blder.forInput = input
	return blder
}

接下来是调用的 Owns()Owns() 看起来和 For() 功能是类似的。只是说属于不同,是通过Owns方法设置的

func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
	input := OwnsInput{object: object}
	for _, opt := range opts {
		opt.ApplyToOwns(&input)
	}

	blder.ownsInput = append(blder.ownsInput, input)
	return blder
}

最后到了 Complete(),Complete 是完成这个controller的构建

// Complete builds the Application Controller.
func (blder *Builder) Complete(r reconcile.Reconciler) error {
	_, err := blder.Build(r)
	return err
}

// Build 创建控制器并返回
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
	if r == nil {
		return nil, fmt.Errorf("must provide a non-nil Reconciler")
	}
	if blder.mgr == nil {
		return nil, fmt.Errorf("must provide a non-nil Manager")
	}
	if blder.forInput.err != nil {
		return nil, blder.forInput.err
	}
	// Checking the reconcile type exist or not
	if blder.forInput.object == nil {
		return nil, fmt.Errorf("must provide an object for reconciliation")
	}

	// Set the ControllerManagedBy
	if err := blder.doController(r); err != nil {
		return nil, err
	}

	// Set the Watch
	if err := blder.doWatch(); err != nil {
		return nil, err
	}

	return blder.ctrl, nil
}

这里面可以看到,会完成 doController 和 doWatch

doController会初始化好这个controller并返回

func (blder *Builder) doController(r reconcile.Reconciler) error {
	globalOpts := blder.mgr.GetControllerOptions()

	ctrlOptions := blder.ctrlOptions
	if ctrlOptions.Reconciler == nil {
		ctrlOptions.Reconciler = r
	}

	// 通过检索GVK获得默认的名称
	gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
	if err != nil {
		return err
	}

	// 设置并发,如果最大并发为0则找到一个
    // 追踪下去看似是对于没有设置时,例如会根据 app group中的 ReplicaSet设定
    // 就是在For()传递的一个类型的数量来确定并发的数量
	if ctrlOptions.MaxConcurrentReconciles == 0 {
		groupKind := gvk.GroupKind().String()

		if concurrency, ok := globalOpts.GroupKindConcurrency[groupKind]; ok && concurrency > 0 {
			ctrlOptions.MaxConcurrentReconciles = concurrency
		}
	}

	// Setup cache sync timeout.
	if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout != nil {
		ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout
	}
	// 给controller一个name,如果没有初始化传递,则使用Kind做名称
	controllerName := blder.getControllerName(gvk)

	// Setup the logger.
	if ctrlOptions.LogConstructor == nil {
		log := blder.mgr.GetLogger().WithValues(
			"controller", controllerName,
			"controllerGroup", gvk.Group,
			"controllerKind", gvk.Kind,
		)

		lowerCamelCaseKind := strings.ToLower(gvk.Kind[:1]) + gvk.Kind[1:]

		ctrlOptions.LogConstructor = func(req *reconcile.Request) logr.Logger {
			log := log
			if req != nil {
				log = log.WithValues(
					lowerCamelCaseKind, klog.KRef(req.Namespace, req.Name),
					"namespace", req.Namespace, "name", req.Name,
				)
			}
			return log
		}
	}

	// 这里就是构建一个新的控制器了,也就是前面说到的  manager.New()
	blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)
	return err
}

manager.New()

start Manager#

接下来是manager的启动,也就是对应的 start()doWatch()

通过下述代码我们可以看出来,对于 doWatch() 就是把 compete() 前的一些资源的事件函数都注入到controller 中

func (blder *Builder) doWatch() error {
	// 调解类型,这也也就是对于For的obj来说,我们需要的是什么结构的,如非结构化数据或metadata-only
    // metadata-only就是配置成一个GVK schema.GroupVersionKind
	typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
	if err != nil {
		return err
    }&source.Kind{}
    // 一些准备工作,将对象封装为&source.Kind{}
    // 
	src := &source.Kind{Type: typeForSrc}
	hdler := &handler.EnqueueRequestForObject{} // 就是包含obj的一个事件队列
	allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
	// 这里又到之前说过的controller watch了
    // 将一系列的准备动作注入到cache 如 source eventHandler predicate
    if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
		return err
	}

	// 再重复 ownsInput 动作
	for _, own := range blder.ownsInput {
		typeForSrc, err := blder.project(own.object, own.objectProjection)
		if err != nil {
			return err
		}
		src := &source.Kind{Type: typeForSrc}
		hdler := &handler.EnqueueRequestForOwner{
			OwnerType:    blder.forInput.object,
			IsController: true,
		}
		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
		allPredicates = append(allPredicates, own.predicates...)
		if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
			return err
		}
	}

	// 在对 ownsInput 进行重复的操作
	for _, w := range blder.watchesInput {
		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
		allPredicates = append(allPredicates, w.predicates...)

		// If the source of this watch is of type *source.Kind, project it.
		if srckind, ok := w.src.(*source.Kind); ok {
			typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
			if err != nil {
				return err
			}
			srckind.Type = typeForSrc
		}

		if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
			return err
		}
	}
	return nil
}

由于前两部 builder 的操作将 mgr 指针传入到 builder中,并且操作了 complete() ,也就是操作了 build() ,这代表了对 controller 完成了初始化,和事件注入(watch)的操作,所以 Start(),就是将controller启动

func (cm *controllerManager) Start(ctx context.Context) (err error) {
	cm.Lock()
	if cm.started {
		cm.Unlock()
		return errors.New("manager already started")
	}
	var ready bool
	defer func() {
		if !ready {
			cm.Unlock()
		}
	}()

	// Initialize the internal context.
	cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

	// 这个channel代表了controller的停止
	stopComplete := make(chan struct{})
	defer close(stopComplete)
	// This must be deferred after closing stopComplete, otherwise we deadlock.
	defer func() {
		stopErr := cm.engageStopProcedure(stopComplete)
		if stopErr != nil {
			if err != nil {
				err = kerrors.NewAggregate([]error{err, stopErr})
			} else {
				err = stopErr
			}
		}
	}()

	// Add the cluster runnable.
	if err := cm.add(cm.cluster); err != nil {
		return fmt.Errorf("failed to add cluster to runnables: %w", err)
	}
    // 指标类
	if cm.metricsListener != nil {
		cm.serveMetrics()
	}
	if cm.healthProbeListener != nil {
		cm.serveHealthProbes()
	}
	if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
		if !errors.Is(err, wait.ErrWaitTimeout) {
			return err
		}
	}

	// 等待informer同步完成
	if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
		if !errors.Is(err, wait.ErrWaitTimeout) {
			return err
		}
	}

	// 非选举模式,runnable将在cache同步完成后启动
	if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
		if !errors.Is(err, wait.ErrWaitTimeout) {
			return err
		}
	}

	// Start the leader election and all required runnables.
	{
		ctx, cancel := context.WithCancel(context.Background())
		cm.leaderElectionCancel = cancel
		go func() {
			if cm.resourceLock != nil {
				if err := cm.startLeaderElection(ctx); err != nil {
					cm.errChan <- err
				}
			} else {
				// Treat not having leader election enabled the same as being elected.
				if err := cm.startLeaderElectionRunnables(); err != nil {
					cm.errChan <- err
				}
				close(cm.elected)
			}
		}()
	}

	ready = true
	cm.Unlock()
	select {
	case <-ctx.Done():
		// We are done
		return nil
	case err := <-cm.errChan:
		// Error starting or running a runnable
		return err
	}
}

可以看到上面启动了4种类型的runnable,实际上就是对这runnable进行启动,例如 controller,cache等。

回顾一下,我们之前在使用code-generator 生成,并自定义controller时,我们也是通过启动 informer.Start() ,否则会报错。

最后可以通过一张关系图来表示,client-go与controller-manager之间的关系

image

Reference

diving controller runtime


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK