
A Brief Look at the client-go Informer in Kubernetes - Cylon

source link: https://www.cnblogs.com/Cylon/p/16311233.html

The previous article covered the architecture of client-go, namely the concepts under tools/cache; this one analyzes the informer.

Controller

The client-go informer architecture contains a controller. This is not a Kubernetes controller component; it is a concept in tools/cache that sits below the informer and above the Reflector (see tools/cache/controller.go).

Config

Strictly speaking, the controller is used inside a sharedInformer. It is constructed from a Config, and the Reflector fills a slot inside the controller. The Config holds all of the controller's settings:

type Config struct {
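	Queue // the DeltaFIFO
	ListerWatcher // used to list and watch the resource
	Process ProcessFunc // what to do with each item popped from the DeltaFIFO
	ObjectType runtime.Object // the type of object the controller handles, i.e. a Kubernetes resource
	FullResyncPeriod time.Duration // period of full resyncs
	ShouldResync ShouldResyncFunc // called periodically by the Reflector to decide whether to resync
	RetryOnError bool // if true, an item whose processing failed is re-enqueued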
}

controller

The controller in turn is the layer directly above the Reflector:

type controller struct {
	config         Config
	reflector      *Reflector 
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}

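type Controller interface {
	// Run does two things:
	// 1. builds and runs a Reflector that pumps objects from the ListerWatcher into the Queue (the DeltaFIFO);
	// 2. repeatedly Pop()s items from the Queue and handles them with Process.
	// Both goroutines keep running until stopCh is closed.
	Run(stopCh <-chan struct{})
	HasSynced() bool // delegated to Queue.HasSynced(): true once the first batch from the initial list has been popped
	LastSyncResourceVersion() string
}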

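Besides the constructor New(), the only controller method that does real work is Run(); HasSynced() and LastSyncResourceVersion() only report state. In other words, the controller is just an abstraction: a workflow that ties the Reflector and the DeltaFIFO together.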

[Figure: controller workflow]
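The sketch below illustrates that workflow using only the standard library. It assumes a heavily simplified model: a "reflector" goroutine pushes the results of an initial list into a channel that stands in for the DeltaFIFO, and a processLoop pops each item and hands it to process. Both run until stopCh is closed. miniController, listFunc and process are hypothetical names, not client-go API.

```go
package main

import (
	"fmt"
	"sync"
)

// miniController is a hypothetical stand-in for tools/cache.controller.
type miniController struct {
	listFunc func() []string         // stands in for the ListerWatcher
	process  func(item string) error // stands in for Config.Process
	queue    chan string             // stands in for the DeltaFIFO
}

// Run starts the "reflector" and the process loop and returns once both have finished.
func (c *miniController) Run(stopCh <-chan struct{}) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // "reflector": initial list, then (in a real Reflector) a watch until stop
		defer wg.Done()
		for _, item := range c.listFunc() {
			c.queue <- item
		}
		<-stopCh       // a real Reflector would keep watching here
		close(c.queue) // like Queue.Close(): makes the process loop return
	}()
	go func() { // processLoop: Pop and Process until the queue is closed
		defer wg.Done()
		for item := range c.queue {
			if err := c.process(item); err != nil {
				fmt.Println("process error:", err)
			}
		}
	}()
	wg.Wait()
}

func main() {
	var seen []string
	c := &miniController{
		listFunc: func() []string { return []string{"pod-a", "pod-b"} },
		process:  func(item string) error { seen = append(seen, item); return nil },
		queue:    make(chan string),
	}
	stopCh := make(chan struct{})
	close(stopCh) // stop right after the initial list has been processed
	c.Run(stopCh)
	fmt.Println(seen) // [pod-a pod-b]
}
```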

The layer above the controller is the SharedInformer.

Queue

The queue here can be understood as a Store with a Pop() capability, and popping is driven by the controller. In other words, Queue is an extended Store; a plain Store cannot pop.

type Queue interface {
	Store
	// Pop blocks until there is an item to pop (or the queue is closed), then removes it and processes it
	Pop(PopProcessFunc) (interface{}, error)

	// AddIfNotPresent puts the given accumulator into the Queue (in
	// association with the accumulator's key) if and only if that key
	// is not already associated with a non-empty accumulator.
	AddIfNotPresent(interface{}) error

	// HasSynced returns true if the first batch of keys have all been
	// popped.  The first batch of keys are those of the first Replace
	// operation if that happened before any Add, Update, or Delete;
	// otherwise the first batch is empty.
	HasSynced() bool
	Close() // closes the queue
}
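Here is a minimal sketch of the idea that a Queue is a Store that can also pop. It uses hypothetical Store/Queue interfaces reduced to string keys and values. The client-go interfaces store arbitrary objects and derive keys with a KeyFunc, and DeltaFIFO appends to a per-key list of deltas rather than replacing the value as this plain FIFO does.

```go
package main

import (
	"errors"
	"fmt"
)

// Store is a hypothetical, reduced version of cache.Store.
type Store interface {
	Add(key, obj string)
	Get(key string) (string, bool)
}

// Queue extends Store with Pop, mirroring how cache.Queue embeds cache.Store.
type Queue interface {
	Store
	Pop(process func(obj string) error) (string, error)
}

var errEmpty = errors.New("queue is empty") // the real Pop blocks instead

// fifo keeps values in a map (the Store part) plus an ordered key list (the queue part).
type fifo struct {
	items map[string]string
	order []string
}

func newFIFO() *fifo { return &fifo{items: map[string]string{}} }

func (f *fifo) Add(key, obj string) {
	if _, exists := f.items[key]; !exists {
		f.order = append(f.order, key) // only new keys join the queue
	}
	f.items[key] = obj // an existing key just gets its value replaced
}

func (f *fifo) Get(key string) (string, bool) {
	obj, ok := f.items[key]
	return obj, ok
}

// Pop removes the oldest key, deletes its value, and runs process on it.
func (f *fifo) Pop(process func(obj string) error) (string, error) {
	if len(f.order) == 0 {
		return "", errEmpty
	}
	key := f.order[0]
	f.order = f.order[1:]
	obj := f.items[key]
	delete(f.items, key)
	return obj, process(obj)
}

func main() {
	var q Queue = newFIFO()
	q.Add("default/a", "a@v1")
	q.Add("default/b", "b@v1")
	q.Add("default/a", "a@v2") // same key: value replaced, queue position kept
	for {
		obj, err := q.Pop(func(o string) error { return nil })
		if err != nil {
			break // errEmpty: a real Queue would block here instead
		}
		fmt.Println("popped", obj)
	}
}
```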

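Popping is driven by the controller's processLoop(), which ends up calling into the DeltaFIFO.

processLoop() calls Pop() in an endless loop. Pop() is not a busy wait: it blocks on a condition variable (f.cond.Wait()) until an item is available, then hands the item to the PopProcessFunc (the Config's Process) while still holding the lock, before returning it.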

func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

DeltaFIFO.Pop()

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.IsClosed() {
				return nil, ErrFIFOClosed
			}

			f.cond.Wait()
		}
		id := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// Item may have been deleted subsequently.
			continue
		}
		delete(f.items, id)
		err := process(item) // process the item while the lock is held
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item) // process asked for a requeue (ErrRequeue): put the item back
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}

Informer

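Looking at Reflector, Store, Queue, ListerWatcher, ProcessFunc and so on, the functionality wrapped by the controller is not enough on its own. It moves changes from the API into a queue and pops them, but nothing in it keeps a local cache up to date from those changes or notifies anyone about them. This gap is what the informer fills (strictly speaking, the sharedInformer).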

func newInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
	clientState Store,
) Controller {
	// This will hold incoming changes. Note how we pass clientState in as a
	// KeyLister, that way resync operations will result in the correct set
	// of update/delete deltas.
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          clientState,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,

		Process: func(obj interface{}) error {
			// from oldest to newest
			for _, d := range obj.(Deltas) {
				switch d.Type {
				case Sync, Replaced, Added, Updated:
					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
						if err := clientState.Update(d.Object); err != nil {
							return err
						}
						h.OnUpdate(old, d.Object)
					} else {
						if err := clientState.Add(d.Object); err != nil {
							return err
						}
						h.OnAdd(d.Object)
					}
				case Deleted:
					if err := clientState.Delete(d.Object); err != nil {
						return err
					}
					h.OnDelete(d.Object)
				}
			}
			return nil
		},
	}
	return New(cfg)
}

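newInformer lives in tools/cache/controller.go. There is no separate informer type here. As the comments show, newInformer is an informer that provides both storage and event notifications. Its queue is a DeltaFIFO, and it is assembled from controller concepts such as ProcessFunc and Store. The exported entry point is NewInformer():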

func NewInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
) (Store, Controller) {
	// This will hold the client state, as we know it.
	clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)

	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
}

type ResourceEventHandler interface {
	OnAdd(obj interface{})
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
}

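So NewInformer() is just a controller with a Store attached. From this we can infer that an informer is a controller plus the logic that dispatches each operation popped from the queue to the matching event handler.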
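The Process closure in newInformer boils down to two steps: apply each delta to the local store, then call the matching handler. The stdlib-only sketch below follows that logic using hypothetical string-based Delta and handlerFuncs types. The real cache.Deltas and cache.ResourceEventHandler carry runtime objects, and the Replaced type is omitted here for brevity. One detail worth noticing is that an Added delta for a key already in the store produces OnUpdate, not OnAdd.

```go
package main

import "fmt"

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Sync    DeltaType = "Sync"
)

type Delta struct {
	Type DeltaType
	Key  string
	Obj  string
}

// handlerFuncs is a hypothetical string-based version of ResourceEventHandlerFuncs.
type handlerFuncs struct {
	OnAdd    func(obj string)
	OnUpdate func(oldObj, newObj string)
	OnDelete func(obj string)
}

// process applies deltas oldest-to-newest to store, then notifies h,
// following the same branching as newInformer's Process.
func process(store map[string]string, h handlerFuncs, deltas []Delta) {
	for _, d := range deltas {
		switch d.Type {
		case Sync, Added, Updated:
			if old, exists := store[d.Key]; exists {
				store[d.Key] = d.Obj
				h.OnUpdate(old, d.Obj)
			} else {
				store[d.Key] = d.Obj
				h.OnAdd(d.Obj)
			}
		case Deleted:
			delete(store, d.Key)
			h.OnDelete(d.Obj)
		}
	}
}

func main() {
	store := map[string]string{}
	var events []string
	h := handlerFuncs{
		OnAdd:    func(o string) { events = append(events, "add "+o) },
		OnUpdate: func(o, n string) { events = append(events, "update "+o+"->"+n) },
		OnDelete: func(o string) { events = append(events, "delete "+o) },
	}
	process(store, h, []Delta{
		{Added, "default/a", "a@v1"},
		{Updated, "default/a", "a@v2"},
		{Deleted, "default/a", "a@v2"},
	})
	fmt.Println(events) // [add a@v1 update a@v1->a@v2 delete a@v2]
}
```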

SharedIndexInformer

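A SharedInformer gives clients a local cache of objects that is eventually consistent with the apiserver, and it supports multiple event handlers. SharedIndexInformer extends SharedInformer with indexing.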

type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
	// period.  Events to a single handler are delivered sequentially, but there is no coordination
	// between different handlers.
	AddEventHandler(handler ResourceEventHandler)
	// AddEventHandlerWithResyncPeriod adds an event handler to the
	// shared informer with the requested resync period; zero means
	// this handler does not care about resyncs.  The resync operation
	// consists of delivering to the handler an update notification
	// for every object in the informer's local cache; it does not add
	// any interactions with the authoritative storage.  Some
	// informers do no resyncs at all, not even for handlers added
	// with a non-zero resyncPeriod.  For an informer that does
	// resyncs, and for each handler that requests resyncs, that
	// informer develops a nominal resync period that is no shorter
	// than the requested period but may be longer.  The actual time
	// between any two resyncs may be longer than the nominal period
	// because the implementation takes time to do work and there may
	// be competing load and scheduling noise.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// GetStore returns the informer's local cache as a Store.
	GetStore() Store
	// GetController is deprecated, it does nothing useful
	GetController() Controller
	// Run starts and runs the shared informer, returning after it stops.
	// The informer will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has been
	// informed by at least one full LIST of the authoritative state
	// of the informer's object collection.  This is unrelated to "resync".
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string
}

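sharedIndexInformer (lowercase) is the implementation of the SharedIndexInformer interface, which extends SharedInformer. Its struct shows that it has roughly three responsibilities: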

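  • an indexed local cache
  • a controller, which lists and watches the API and pushes the changes into the DeltaFIFO
  • event handling

type sharedIndexInformer struct {
	indexer    Indexer // local cache with indexes
	controller Controller // the controller

	processor             *sharedProcessor // the set of registered event handlers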
	cacheMutationDetector MutationDetector

	listerWatcher ListerWatcher
	objectType runtime.Object
	resyncCheckPeriod time.Duration
	defaultEventHandlerResyncPeriod time.Duration
	clock clock.Clock
	started, stopped bool
	startedLock      sync.Mutex
	blockDeltas sync.Mutex
}
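The indexed local cache is an Indexer: a thread-safe store plus named index functions that map an object to one or more index values. For example, client-go provides a namespace index function, MetaNamespaceIndexFunc. The reduced sketch below represents objects with a hypothetical pod struct. miniIndexer is not the client-go type: its ByIndex returns keys rather than objects, and it does not clean up stale index entries when an object is re-added.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

type pod struct{ Namespace, Name, Node string }

// indexFunc maps an object to the index values it should be filed under.
type indexFunc func(p pod) []string

// miniIndexer keeps objects by key plus, per index name, value -> set of keys.
type miniIndexer struct {
	mu      sync.RWMutex
	items   map[string]pod
	funcs   map[string]indexFunc
	indices map[string]map[string]map[string]struct{}
}

func newMiniIndexer(funcs map[string]indexFunc) *miniIndexer {
	idx := &miniIndexer{
		items:   map[string]pod{},
		funcs:   funcs,
		indices: map[string]map[string]map[string]struct{}{},
	}
	for name := range funcs {
		idx.indices[name] = map[string]map[string]struct{}{}
	}
	return idx
}

// key builds a namespace/name key, like MetaNamespaceKeyFunc does for real objects.
func key(p pod) string { return p.Namespace + "/" + p.Name }

// Add stores p and files its key under every index value it produces.
func (i *miniIndexer) Add(p pod) {
	i.mu.Lock()
	defer i.mu.Unlock()
	k := key(p)
	i.items[k] = p
	for name, fn := range i.funcs {
		for _, v := range fn(p) {
			if i.indices[name][v] == nil {
				i.indices[name][v] = map[string]struct{}{}
			}
			i.indices[name][v][k] = struct{}{}
		}
	}
}

// ByIndex returns the sorted keys of objects whose index value matches.
func (i *miniIndexer) ByIndex(name, value string) []string {
	i.mu.RLock()
	defer i.mu.RUnlock()
	var keys []string
	for k := range i.indices[name][value] {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	idx := newMiniIndexer(map[string]indexFunc{
		"namespace": func(p pod) []string { return []string{p.Namespace} },
		"node":      func(p pod) []string { return []string{p.Node} },
	})
	idx.Add(pod{"default", "a", "node-1"})
	idx.Add(pod{"default", "b", "node-2"})
	idx.Add(pod{"kube-system", "c", "node-1"})
	fmt.Println(idx.ByIndex("namespace", "default")) // [default/a default/b]
	fmt.Println(idx.ByIndex("node", "node-1"))       // [default/a kube-system/c]
}
```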

The run procedure of sharedIndexInformer is in tools/cache/shared_informer.go:

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas, // how each item popped from the DeltaFIFO is handled
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run) // start the event handlers (listeners)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh) // start the controller, which runs the Reflector and the DeltaFIFO Pop() loop
}

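HandleDeltas is the Process function that handles each item popped from the DeltaFIFO. For every delta, it first updates the indexer and then dispatches the change to the event handlers registered with the informer: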

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}

				isSync := false
				switch {
				case d.Type == Sync:
					isSync = true
				case d.Type == Replaced:
					if accessor, err := meta.Accessor(d.Object); err == nil {
						if oldAccessor, err := meta.Accessor(old); err == nil {
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
				// dispatch the event
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				// dispatch the event
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}
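The isSync flag matters because resync notifications should only reach handlers that asked for resyncs. A Sync delta is always a resync. A Replaced delta (produced by a relist) counts as one only if the object's resourceVersion did not change. The decision can be isolated into a small function; the types below are hypothetical simplifications that reduce objects to their resourceVersion strings.

```go
package main

import "fmt"

type DeltaType string

const (
	Sync     DeltaType = "Sync"
	Replaced DeltaType = "Replaced"
	Updated  DeltaType = "Updated"
)

// isSyncUpdate mirrors the switch in HandleDeltas: Sync is always a resync, and
// Replaced counts as one only if the resourceVersion did not change.
func isSyncUpdate(t DeltaType, oldRV, newRV string) bool {
	switch t {
	case Sync:
		return true
	case Replaced:
		return oldRV == newRV
	default:
		return false
	}
}

func main() {
	fmt.Println(isSyncUpdate(Sync, "10", "10"))     // true
	fmt.Println(isSyncUpdate(Replaced, "10", "10")) // true: the relist saw no change
	fmt.Println(isSyncUpdate(Replaced, "10", "11")) // false: a real update
	fmt.Println(isSyncUpdate(Updated, "10", "10"))  // false
}
```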

Event handlers: processor

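Starting the informer also starts the event handlers registered with it; processor (a *sharedProcessor) is what holds them.

For each registered listener, sharedProcessor.run() starts two goroutines: listener.run, which calls the business event-handler functions, and listener.pop, which receives and buffers incoming notifications.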

wg.StartWithChannel(processorStopCh, s.processor.run)

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run) 
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}
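The shutdown chain in sharedProcessor.run works as follows. Closing addCh makes pop() return. pop()'s deferred close(nextCh) then ends run()'s range loop, and the WaitGroup waits for both goroutines. The stripped-down sketch below shows only this chain: pop() here forwards without buffering, and listener and runProcessor are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

type listener struct {
	addCh  chan string
	nextCh chan string
	got    []string
}

// pop forwards from addCh to nextCh; when addCh is closed it closes nextCh.
func (l *listener) pop() {
	defer close(l.nextCh) // tells run() to stop
	for n := range l.addCh {
		l.nextCh <- n
	}
}

// run consumes notifications until nextCh is closed.
func (l *listener) run() {
	for n := range l.nextCh {
		l.got = append(l.got, n)
	}
}

// runProcessor starts run and pop for every listener and shuts them down on stop.
func runProcessor(listeners []*listener, stopCh <-chan struct{}) {
	var wg sync.WaitGroup
	for _, l := range listeners {
		wg.Add(2)
		go func(l *listener) { defer wg.Done(); l.run() }(l)
		go func(l *listener) { defer wg.Done(); l.pop() }(l)
	}
	<-stopCh
	for _, l := range listeners {
		close(l.addCh) // .pop() stops, which makes .run() stop
	}
	wg.Wait()
}

func main() {
	l := &listener{addCh: make(chan string), nextCh: make(chan string)}
	stopCh := make(chan struct{})
	done := make(chan struct{})
	go func() { runProcessor([]*listener{l}, stopCh); close(done) }()
	l.addCh <- "add pod-a"
	l.addCh <- "delete pod-a"
	close(stopCh)
	<-done
	fmt.Println(l.got) // [add pod-a delete pod-a]
}
```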

processorListener.run() takes each notification it receives and calls the matching handler function registered with the informer:

func (p *processorListener) run() {
	stopCh := make(chan struct{})
	wait.Until(func() {
		for next := range p.nextCh { // consume notifications
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}

Event design in the informer

Having seen how the informer handles events, let's look at how its event system, processorListener, is designed.

Adding events

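HandleDeltas dispatches concrete notifications through distribute(), which calls add() on each listener; add() sends the notification into that listener's addCh: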

// dispatch the event
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
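distribute() itself is short. Every listener gets the notification, except that resync notifications (isSync == true) only go to listeners that are due for a resync. In the version quoted here, that set is computed by shouldResync() from each listener's resync period. The simplified sketch below represents "resync due" as a boolean field and uses a slice in place of addCh; all names are hypothetical.

```go
package main

import "fmt"

type listener struct {
	resyncDue bool     // hypothetical: in client-go, shouldResync() decides this per resync period
	received  []string // stands in for the listener's addCh
}

func (l *listener) add(n string) { l.received = append(l.received, n) }

// distribute sends n to every listener, or only to listeners due for a resync when sync is true.
func distribute(listeners []*listener, n string, sync bool) {
	for _, l := range listeners {
		if sync && !l.resyncDue {
			continue
		}
		l.add(n)
	}
}

func main() {
	a := &listener{resyncDue: true}
	b := &listener{} // e.g. registered with resyncPeriod 0: does not care about resyncs
	ls := []*listener{a, b}
	distribute(ls, "add pod-x", false)
	distribute(ls, "resync pod-x", true)
	fmt.Println(a.received) // [add pod-x resync pod-x]
	fmt.Println(b.received) // [add pod-x]
}
```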

Each listener's event pump, pop(), then handles the notifications it receives:

// run() starts one event pump per listener
p.wg.Start(listener.pop)

func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // tells run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		// Blocks until run() receives. While nextCh is nil, select never picks this case.
		case nextCh <- notification:
			var ok bool
			// Notification handed off; fetch the next one buffered by the addCh case below.
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		// Receive a notification that distribute() sent into addCh
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			// No notification in hand (the very first one, or the buffer was drained):
			if notification == nil {
				notification = notificationToAdd
				nextCh = p.nextCh // arm the send case above
			} else {
				// A notification is still waiting to be sent: buffer the new one
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

The flow of a notification through this mechanism:

[Figure: notification flow through processorListener]

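A small standalone program makes it easier to study this notification mechanism in client-go (it depends on k8s.io/utils for the ring buffer):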

package main

import (
	"fmt"
	"time"

	"k8s.io/utils/buffer"
)

// nextCh1 plays the role of p.nextCh, addCh of p.addCh,
// and pending of p.pendingNotifications.
var nextCh1 = make(chan interface{})
var addCh = make(chan interface{})
var stopper = make(chan struct{}) // never closed: main blocks forever
var pending = buffer.NewRingGrowing(2)

func main() {
	// pop: mirrors processorListener.pop()
	go func() {
		var nextCh chan<- interface{}
		var notification interface{}
		for {
			fmt.Println("entry select", notification)
			select {
			// nextCh starts out nil. A send on a nil channel blocks forever,
			// so select never chooses this case until nextCh is set
			// (with no other case this would be a deadlock).
			case nextCh <- notification:
				fmt.Println("entry nextCh", notification)
				var ok bool
				// Take the next buffered notification; if there is none,
				// disable the send case again.
				notification, ok = pending.ReadOne()
				if !ok {
					fmt.Println("unactive nextCh")
					nextCh = nil
				}
			// Receive dispatched events; this also blocks until the producer sends.
			case notificationToAdd, ok := <-addCh:
				if !ok {
					return
				}
				fmt.Println(notificationToAdd, notification)
				if notification == nil {
					// Nothing in hand: hold this notification and arm the send
					// case above by pointing the local nextCh at nextCh1.
					fmt.Println("first notification nil")
					notification = notificationToAdd
					nextCh = nextCh1
				} else {
					// A notification is still waiting for the subscriber:
					// buffer the new one in the ring.
					fmt.Println("into ring", notificationToAdd)
					pending.WriteOne(notificationToAdd)
				}
			}
		}
	}()
	// producer: every fifth message is followed by a long pause
	go func() {
		i := 0
		for {
			i++
			if i%5 == 0 {
				addCh <- fmt.Sprintf("thread 2 inner -- %d", i)
				time.Sleep(time.Millisecond * 9000)
			} else {
				addCh <- fmt.Sprintf("thread 2 outer -- %d", i)
				time.Sleep(time.Millisecond * 500)
			}
		}
	}()
	// subscriber: a slow consumer of nextCh1
	go func() {
		for next := range nextCh1 {
			time.Sleep(time.Millisecond * 300)
			fmt.Println("consumer", next)
		}
	}()
	<-stopper
}

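In summary, the mechanism resembles the critical-section algorithms used in concurrent programming. The critical section is the handoff through nextCh, and notification ensures that the loop can always make progress on at least one side: either dispatching an event or accepting a new one. nextCh is a local channel and nextCh1 a global one. A nil (uninitialized) channel blocks forever, so its select case is effectively switched off. When there is a notification to deliver, the global nextCh1 is assigned to the local nextCh, which arms the dispatch case. The ring buffer holds notifications that arrive while one is still waiting to be delivered. Once the buffer is empty, ReadOne() resets notification to nil and nextCh is set back to nil, closing the dispatch path until the next notification arrives. Because the buffer is unbounded, the producer never has to wait for a slow handler. The example is mainly an exercise in how channels are used in Go.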
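The core trick in this summary, that a nil channel disables its case in a select, can be checked in isolation. Below is a tiny stdlib demo; trySend is our own illustrative helper, not part of client-go.

```go
package main

import "fmt"

// trySend reports whether a non-blocking send on ch succeeds.
// With a nil channel the send case can never be chosen, so default runs.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	var disabled chan<- int // nil: the case is switched off
	buffered := make(chan int, 1)
	fmt.Println(trySend(disabled, 1)) // false
	fmt.Println(trySend(buffered, 1)) // true
	fmt.Println(trySend(buffered, 2)) // false: buffer already full
}
```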

