
KubeController Asynchronous Event Handling: the Informer Implementation

source link: https://ninokop.github.io/2018/01/10/kubecontroller/

Posted on 2018-01-10 | Edited on 2019-05-27 | In Kubernetes

A couple of years ago I used the Kube-Informer framework to write a route-controller that forwarded application routes between gorouter and a Kubernetes cluster, as well as a stack-controller for AOS stack management; back then I took notes against the 1.5 and 1.7 code. Since 1.9 the informer has been fully migrated into client-go, so this post walks through the framework's implementation again based on the 1.9 code. As an asynchronous event-handling framework, the informer carries out two stages: watching for events and dispatching them to handlers.

informer

  1. Watching is done by the Reflector inside the controller. The previous post covered how the Reflector writes the events obtained from its ListWatcher into a Store; here the informer's Store is a DeltaFIFO, which lets events be dispatched strictly in the order in which they occurred.
  2. The events produced by the Reflector are ultimately consumed by the processor. The processor Pops events off the queue, updates the informer's local indexer cache, and distributes each event to all listeners.
  3. The processor's listeners are registered from the outside via AddEventHandler; each listener provides AddFunc, UpdateFunc and DeleteFunc. Internally a listener adds a layer of buffering that holds pendingNotifications. The listeners perform the final dispatch, and each event ends up being handled by the registered handlers; a minimal usage sketch follows this list.

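To make the flow concrete, here is a minimal usage sketch that wires these pieces together with client-go. It is an illustration rather than code from the post: the function and variable names (watchPods, client, stopCh) are invented for the example.

package example

import (
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
)

func watchPods(client kubernetes.Interface, stopCh <-chan struct{}) error {
    factory := informers.NewSharedInformerFactory(client, 30*time.Second)
    podInformer := factory.Core().V1().Pods()

    // Register a listener; the processor fans every event out to it.
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { fmt.Println("add:", obj.(*v1.Pod).Name) },
        UpdateFunc: func(oldObj, newObj interface{}) { fmt.Println("update:", newObj.(*v1.Pod).Name) },
        DeleteFunc: func(obj interface{}) { fmt.Println("delete") },
    })

    // Start the Reflector/DeltaFIFO/processor machinery and wait until the
    // initial list has been replayed into the local cache.
    factory.Start(stopCh)
    if !cache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced) {
        return fmt.Errorf("timed out waiting for the pod cache to sync")
    }

    // Lister reads are served from the local indexer, not from the apiserver.
    pods, err := podInformer.Lister().Pods("default").List(labels.Everything())
    if err != nil {
        return err
    }
    fmt.Println("pods in cache:", len(pods))
    return nil
}
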
InformerFactory

When the DeploymentController is initialized it uses the PodInformer's Lister() and Informer(), and registers its event handlers on the Informer via AddEventHandler. All informers used by the controllers are obtained from the SharedInformerFactory, looked up by GroupVersionResource, and the informers are also started from the factory.

func Run(c *config.CompletedConfig) error {
    stopCh := make(chan struct{}) // simplified: the real code wires this to signals/leader election
    sharedInformerFactory := informers.NewSharedInformerFactory(
        verClient, ResyncPeriod)
    // startController
    // podInformer := sharedInformerFactory.Core().V1().Pods()
    sharedInformerFactory.Start(stopCh)
    select {}
}

Informer Register

Informers are constructed through the interfaces provided by the client-go/tools/cache package, and each informer registers itself with the factory through its Informer() method. In fact the registration is complete as soon as podInformer.Informer() is called, which happens inside startController; after that, informerFactory.Start can launch all of the registered informers.

func NewFilteredPodInformer(client kubernetes.Interface, namespace string,
    resyncPeriod time.Duration, indexers cache.Indexers,
    /*tweakListOptions*/) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                return client.CoreV1().Pods(namespace).List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                return client.CoreV1().Pods(namespace).Watch(options)
            },
        },
        &core_v1.Pod{},
        resyncPeriod,
        indexers,
    )
}

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    if exists {
        return informer
    }
    informer = newFunc(f.client, f.defaultResync)
    f.informers[informerType] = informer

    return informer
}
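
For context, this is roughly how the generated podInformer glues the two functions above together (condensed from client-go's generated informer code, so names and details may differ slightly across versions): calling Informer() hands a construction function to InformerFor, which is why simply touching the informer in startController registers it.

// Condensed sketch of the generated core/v1 pod informer.
func (f *podInformer) defaultInformer(client kubernetes.Interface,
    resyncPeriod time.Duration) cache.SharedIndexInformer {
    return NewFilteredPodInformer(client, f.namespace, resyncPeriod,
        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
}

// Informer registers (or reuses) the shared pod informer in the factory.
func (f *podInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&core_v1.Pod{}, f.defaultInformer)
}

// Lister is backed by the same informer's indexer, so reads hit the cache.
func (f *podInformer) Lister() corelisters.PodLister {
    return corelisters.NewPodLister(f.Informer().GetIndexer())
}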

Informer Run

Finally the informerFactory starts every informer that has been registered with it; once started, an informer's job is exactly the two stages above, watching and dispatching. cache.WaitForCacheSync polls all the informers to see whether each has received its initial data; the HasSynced answer ultimately comes from the DeltaFIFO, as the condensed excerpt after the next code block shows.

type sharedInformerFactory struct {
    client        clientset.Interface
    lock          sync.Mutex
    defaultResync time.Duration

    informers        map[reflect.Type]cache.SharedIndexInformer
    startedInformers map[reflect.Type]bool
}

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}
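
The HasSynced chain bottoms out in the DeltaFIFO; the sharedIndexInformer and its controller simply delegate downwards. Condensed from the cache package:

// HasSynced returns true once every item delivered by the initial list
// has been popped and processed.
func (f *DeltaFIFO) HasSynced() bool {
    f.lock.Lock()
    defer f.lock.Unlock()
    // populated is set by the first Replace/Add/Update/Delete;
    // initialPopulationCount counts the items of the initial Replace that
    // have not been processed yet (the Pop excerpt later in this post
    // decrements it).
    return f.populated && f.initialPopulationCount == 0
}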

SharedInformer

As an asynchronous event-handling framework, the informer covers both stages, watching and dispatch. Among its fields, indexer is a Store that caches a full copy of the data; the Lister the informer exposes is served from this Store, i.e. the Lister never reads from etcd (through the apiserver) directly.

type sharedIndexInformer struct {
    indexer    Indexer
    controller Controller

    processor             *sharedProcessor
    cacheMutationDetector CacheMutationDetector

    listerWatcher ListerWatcher
    objectType    runtime.Object

    resyncCheckPeriod               time.Duration
    defaultEventHandlerResyncPeriod time.Duration
    clock                           clock.Clock

    started, stopped bool
    startedLock      sync.Mutex

    blockDeltas sync.Mutex
}

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,
        Process:          s.HandleDeltas,
    }
    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()

    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run)

    s.controller.Run(stopCh)
}

Controller

controller.Run is essentially a producer-consumer loop: the Reflector is the producer, and the controller's Process function, s.HandleDeltas, is the consumer. The controller's processLoop keeps calling Pop on the Reflector's store to consume events, and each popped item ends up being handled by sharedIndexInformer.HandleDeltas.

func (c *controller) Run(stopCh <-chan struct{}) {
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    var wg wait.Group
    defer wg.Wait()
    wg.StartWithChannel(stopCh, r.Run)
    wait.Until(c.processLoop, time.Second, stopCh)
}

func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        // handle err
    }
}

DeltaFIFO

Within the informer framework the DeltaFIFO acts as the Reflector's Store: the Reflector Adds/Updates/Deletes into it according to the list-watch results. The two most important fields are items and queue. items caches essentially every event added to the FIFO, stored per object key as a []Delta, while queue holds the keys and determines the FIFO processing order.

Unlike an UndeltaStore, which pushes the complete current state out on every change, the DeltaFIFO records each individual change and preserves the order in which the changes happened.

type DeltaFIFO struct {
    lock sync.RWMutex
    cond sync.Cond

    // We depend on the property that items in the set are in
    // the queue and vice versa, and that all Deltas in this
    // map have at least one Delta.
    items map[string]Deltas
    queue []string

    populated              bool
    initialPopulationCount int

    keyFunc      KeyFunc
    knownObjects KeyListerGetter

    closed     bool
    closedLock sync.Mutex
}
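
For reference, the Delta entries stored in items look roughly like this in the cache package (condensed here to make the []Delta shape concrete):

type DeltaType string

const (
    Added   DeltaType = "Added"
    Updated DeltaType = "Updated"
    Deleted DeltaType = "Deleted"
    // Sync is produced on resync/replace rather than by a watch event.
    Sync DeltaType = "Sync"
)

// Delta pairs the type of change with the object state after that change
// (or the last known state, for deletions).
type Delta struct {
    Type   DeltaType
    Object interface{}
}

// Deltas is the ordered list of changes for a single object key.
type Deltas []Delta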

Every add/update/delete on the Store goes through the function below. It enqueues the object's key and appends the event to the items cache; even delete events stay in items until they are processed. Whenever new content is added to the DeltaFIFO, f.cond.Broadcast is called so that every goroutine blocked in f.cond.Wait gets a chance to try to acquire the lock.

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    if actionType == Sync && f.willObjectBeDeletedLocked(id) {
        return nil
    }

    newDeltas := append(f.items[id], Delta{actionType, obj})
    newDeltas = dedupDeltas(newDeltas)
    _, exists := f.items[id]
    if len(newDeltas) > 0 {
        if !exists {
            f.queue = append(f.queue, id)
        }
        f.items[id] = newDeltas
        f.cond.Broadcast()
    } else if exists {
        delete(f.items, id)
    }
    return nil
}

The DeltaFIFO's producer is the Reflector, and its consumer is whoever calls DeltaFIFO.Pop(). Every caller of Pop waits in cond.Wait, and only the one that actually acquires the lock returns from cond.Wait. It then takes the first key off queue, looks up all the Delta events for that key in items, and hands them to the PopProcessFunc. An item whose processing fails may be put back into the queue.

If processing fails but newer deltas for the same key have already accumulated in the cache, the failed item is simply discarded (addIfNotPresent only re-queues a key that is absent).

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            if f.IsClosed() {
                return nil, FIFOClosedError
            }

            f.cond.Wait()
        }
        id := f.queue[0]
        f.queue = f.queue[1:]
        item, ok := f.items[id]
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        if !ok {
            // Item may have been deleted subsequently.
            continue
        }
        delete(f.items, id)
        err := process(item)
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }

        return item, err
    }
}

ResourceEventHandler

The handler functions added to an informer all satisfy the interface below. cache.ResourceEventHandlerFuncs implements exactly this set of methods, so it is enough to register an AddFunc, an UpdateFunc and a DeleteFunc; the condensed adapter follows the interface.

type ResourceEventHandler interface {
    OnAdd(obj interface{})
    OnUpdate(oldObj, newObj interface{})
    OnDelete(obj interface{})
}
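
For completeness, this is how the ResourceEventHandlerFuncs adapter in the cache package satisfies the interface (condensed): each OnXxx method simply forwards to the corresponding func field when it is set.

type ResourceEventHandlerFuncs struct {
    AddFunc    func(obj interface{})
    UpdateFunc func(oldObj, newObj interface{})
    DeleteFunc func(obj interface{})
}

func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
    if r.AddFunc != nil {
        r.AddFunc(obj)
    }
}

func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
    if r.UpdateFunc != nil {
        r.UpdateFunc(oldObj, newObj)
    }
}

func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
    if r.DeleteFunc != nil {
        r.DeleteFunc(obj)
    }
}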

When AddEventHandler is called on an informer, what really happens is that a listener is added to the informer's processor. This processorListener provides event dispatch through three basic methods: add, run and pop. Below is a trimmed-down version of the registration.

func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler,
    resyncPeriod time.Duration) {
    s.startedLock.Lock()
    defer s.startedLock.Unlock()

    listener := newProcessListener(handler, resyncPeriod,
        determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod),
        s.clock.Now(), initialBufferSize)

    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    s.processor.addListener(listener)
}
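
One detail the trimmed version above drops: if the informer is already running, the real AddEventHandlerWithResyncPeriod also replays the current contents of the indexer to the newly added listener, so a late-registered handler still receives an Add for every object already in the cache. Roughly (condensed from client-go):

// Executed only when the informer has already started; blockDeltas keeps
// HandleDeltas from delivering new events while the replay happens.
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()

s.processor.addListener(listener)
for _, item := range s.indexer.List() {
    listener.add(addNotification{newObj: item})
}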

sharedProcessor

The Process function registered with the controller is HandleDeltas, whose main job is to fan events out through the processor. Besides dispatching, it also updates the informer's local store (the indexer).

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()
    for _, d := range obj.(Deltas) {
        switch d.Type {
        case Sync, Added, Updated:
            isSync := d.Type == Sync
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                s.indexer.Update(d.Object)
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                s.indexer.Add(d.Object)
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
            }
        case Deleted:
            s.indexer.Delete(d.Object)
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

When an event is consumed, the informer's processor distributes it. The handlers the processor dispatches to are added from the outside through AddEventHandler, which calls addListener on the processor. addListener merely appends another listener for the processor to manage; at distribution time the processor walks its listeners and sends the event to every one of them.

type sharedProcessor struct {
    listenersStarted bool
    listenersLock    sync.RWMutex
    listeners        []*processorListener
    syncingListeners []*processorListener
    clock            clock.Clock
    wg               wait.Group
}

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    for _, listener := range p.listeners {
        listener.add(obj)
    }
}

The processor's run method makes sure every listener starts running, and on exit makes sure every listener's channel is closed.

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
    }()
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    for _, listener := range p.listeners {
        // Tell .pop() to stop. .pop() will tell .run() to stop
        close(listener.addCh)
    }
    p.wg.Wait() // Wait for all .pop() and .run() to stop
}

processorListener

pendingNotifications holds every notification that has not yet been dispatched; it is a growable ring buffer created with buffer.NewRingGrowing. When the processor starts to distribute an event it calls the listener's add method, which sends the notification onto addCh.

type processorListener struct {
    nextCh chan interface{}
    addCh  chan interface{}

    handler              ResourceEventHandler
    pendingNotifications buffer.RingGrowing

    requestedResyncPeriod time.Duration
    resyncPeriod          time.Duration

    nextResync time.Time
    resyncLock sync.Mutex
}

func (p *processorListener) add(notification interface{}) {
    p.addCh <- notification
}

The listener's pop goroutine keeps pulling notifications off addCh, either stashing them in the local pendingNotifications buffer or passing them on to nextCh; what gets sent on nextCh comes either from the local buffer or straight from addCh. The run method then consumes nextCh and dispatches each notification to the handler. run retries with exponential backoff and is restarted if it ever exits.

func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop

    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification:
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok {
                nextCh = nil
            }
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil {
                notification = notificationToAdd
                nextCh = p.nextCh
            } else {
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}

func (p *processorListener) run() {
    stopCh := make(chan struct{})
    wait.Until(func() {
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            for next := range p.nextCh {
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                }
            }
            return true, nil
        })
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}

NewController

kube-controller-manager is a collection of controllers. It implements a generic framework for asynchronous event notification, and all the controllers are built around it. Taking the DeploymentController as an example, here is how a controller is put together.

func startDeploymentController(ctx ControllerContext) (bool, error) {
    go deployment.NewDeploymentController(
        ctx.InformerFactory.Extensions().V1beta1().Deployments(),
        ctx.InformerFactory.Extensions().V1beta1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("deployment-controller"),
    ).Run(int(ctx.Options.ConcurrentDeploymentSyncs), ctx.Stop)
    return true, nil
}

In the DeploymentController struct, dListerSynced returning true means the corresponding lister has at least started working; the Lister is the channel for reading data out of the informers' caches, and rsControl provides a set of operations on ReplicaSets. Only part of the code is excerpted here.

type DeploymentController struct {
    rsControl     controller.RSControlInterface
    client        clientset.Interface
    eventRecorder record.EventRecorder

    syncHandler       func(dKey string) error
    enqueueDeployment func(deployment *extensions.Deployment)

    dLister       extensionslisters.DeploymentLister
    dListerSynced cache.InformerSynced
    queue         workqueue.RateLimitingInterface
}

func NewDeploymentController(dInformer extensionsinformers.DeploymentInformer,
    rsInformer extensionsinformers.ReplicaSetInformer,
    podInformer coreinformers.PodInformer,
    client clientset.Interface) *DeploymentController {
    dc := &DeploymentController{
        client: client,
        queue: workqueue.NewNamedRateLimitingQueue(
            workqueue.DefaultControllerRateLimiter(), "deployment"),
    }
    dc.rsControl = controller.RealRSControl{
        KubeClient: client,
        Recorder:   dc.eventRecorder,
    }
    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addDeployment,
        UpdateFunc: dc.updateDeployment,
        DeleteFunc: dc.deleteDeployment,
    })

    dc.syncHandler = dc.syncDeployment
    dc.enqueueDeployment = dc.enqueue
    dc.dLister = dInformer.Lister()
    dc.dListerSynced = dInformer.Informer().HasSynced
    return dc
}

Controller.Run first waits for all the informers to sync, then starts N goroutines to process the work concurrently. Each goroutine loops doing one thing: take a key from the queue and hand it to syncHandler.

When does the queue exit? Run defers dc.queue.ShutDown(), so the queue is shut down when Run returns; that makes queue.Get report quit, processNextWorkItem returns false, and the workers stop.

func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer dc.queue.ShutDown()

    if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced,
        dc.rsListerSynced, dc.podListerSynced) {
        return
    }
    for i := 0; i < workers; i++ {
        go wait.Until(dc.worker, time.Second, stopCh)
    }
    <-stopCh
}

func (dc *DeploymentController) worker() {
    for dc.processNextWorkItem() {
    }
}

func (dc *DeploymentController) processNextWorkItem() bool {
    key, quit := dc.queue.Get()
    if quit {
        return false
    }
    defer dc.queue.Done(key)
    err := dc.syncHandler(key.(string))
    dc.handleErr(err, key)
    return true
}
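
processNextWorkItem hands the error to handleErr, which is where the rate-limiting queue actually comes into play. Condensed from the deployment controller, roughly:

const maxRetries = 15

func (dc *DeploymentController) handleErr(err error, key interface{}) {
    if err == nil {
        // Success: reset the rate limiter's failure count for this key.
        dc.queue.Forget(key)
        return
    }
    if dc.queue.NumRequeues(key) < maxRetries {
        // Re-enqueue after a delay chosen by the rate limiter
        // (per-key exponential backoff plus an overall token bucket).
        dc.queue.AddRateLimited(key)
        return
    }
    // Too many failures: drop the key until a new event enqueues it again.
    utilruntime.HandleError(err)
    dc.queue.Forget(key)
}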

How do keys get into the queue? Through the eventHandlers registered on the informer. For example, when a new deployment is created, the handler eventually enqueues that deployment's key.

func (dc *DeploymentController) addDeployment(obj interface{}) {
    d := obj.(*extensions.Deployment)
    dc.enqueueDeployment(d)
}

func (dc *DeploymentController) enqueue(deployment *extensions.Deployment) {
    key, err := controller.KeyFunc(deployment)
    // process error
    dc.queue.Add(key)
}

Questions

  1. Why does the kubelet use an UndeltaStore while the controllers use informers?
  2. Is the "shared" in sharedInformer about the indexer cache being shared?
  3. What is the buffer inside the listener for?
  4. How does the queue inside a controller do rate limiting?

Reference

Notes on the source code of sync.Cond

