39

Kubernetes 源码解析 - Informer

 3 years ago
source link: https://mp.weixin.qq.com/s/Jawg5ggZ4Fs2Cv-IEOtcCA
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

上篇扒了 HPA 的源码,但是没深入细节,今天往细节深入。

开局先祭出一张图:

640?wx_fmt=png

为什么要有 Informer?

Kubernetes 中的持久化数据保存在 etcd中,各个组件并不会直接访问 etcd,而是通过 api-server暴露的 RESTful 接口对集群进行访问和控制。

资源的控制器(图中右侧灰色的部分)读取数据也并不会直接从 api-server 中获取资源信息(这样会增加 api-server 的压力),而是从其“本地缓存”中读取。这个“本地缓存”只是表象的存在,加上缓存的同步逻辑就是今天要是说的Informer(灰色区域中的第一个蓝色块)所提供的功能。

从图中可以看到 Informer 的几个组件:

  • Reflector:

    api-server交互,监听资源的变更。

  • Delta FIFO Queue:

    增量的 FIFO 队列,保存 Reflector 监听到的资源变更(简单的封装)。

  • Indexer:

    Informer 的本地缓存,FIFO 队列中的数据根据不同的变更类型,在该缓存中进行操作。

    • Local Store:

上篇 提到了水平自动伸缩的控制器HorizontalController,其构造方法就需要提供 Informer

//pkg/controller/podautoscaler/horizontal.gotype HorizontalController struct {
scaleNamespacer scaleclient.ScalesGetter
hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter
mapper apimeta.RESTMapper
replicaCalc *ReplicaCalculator
eventRecorder record.EventRecorder
downscaleStabilisationWindow time.Duration
hpaLister autoscalinglisters.HorizontalPodAutoscalerLister
hpaListerSynced cache.InformerSynced
podLister corelisters.PodLister
podListerSynced cache.InformerSynced
queue workqueue.RateLimitingInterface
recommendations map[string][]timestampedRecommendation
}func NewHorizontalController(
evtNamespacer v1core.EventsGetter,
scaleNamespacer scaleclient.ScalesGetter,
hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter,
mapper apimeta.RESTMapper,
metricsClient metricsclient.MetricsClient, //从HorizontalPodAutoscalerInformer 获取hpa 实例信息
hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer, //从PodInformer 中获取 pod 信息
podInformer coreinformers.PodInformer,
resyncPeriod time.Duration,
downscaleStabilisationWindow time.Duration,
tolerance float64,
cpuInitializationPeriod,
delayOfInitialReadinessStatus time.Duration,

) *HorizontalController {
......
hpaInformer.Informer().AddEventHandlerWithResyncPeriod( //添加事件处理器
cache.ResourceEventHandlerFuncs{
AddFunc: hpaController.enqueueHPA,
UpdateFunc: hpaController.updateHPA,
DeleteFunc: hpaController.deleteHPA,
},
resyncPeriod,
)
......
}type HorizontalPodAutoscalerInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.HorizontalPodAutoscalerLister
}

HorizontalPodAutoscalerInformer的实例化方法中就出现了今天的正主cache.NewSharedIndexInformer()

//staging/src/k8s.io/client-go/informers/autoscaling/v1/horizontalpodautoscaler.gofunc NewFilteredHorizontalPodAutoscalerInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {    return cache.NewSharedIndexInformer(           //用于 list 和 watch api-server 中的资源。比如用来创建 Reflector
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil {
tweakListOptions(&options)
} //使用 HPA API 获取 HPA资源
return client.AutoscalingV1().HorizontalPodAutoscalers(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil {
tweakListOptions(&options)
} //使用 HPA API 监控 HPA资源
return client.AutoscalingV1().HorizontalPodAutoscalers(namespace).Watch(options)
},
},
&autoscalingv1.HorizontalPodAutoscaler{},
resyncPeriod,
indexers,
)
}

Informer

//staging/src/k8s.io/client-go/tools/cache/index.gotype Indexers map[string]IndexFunctype IndexFunc func(obj interface{}) ([]string, error)

实例化 Indexers cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}

//staging/src/k8s.io/client-go/tools/cache/shared_informer.go// ListerWatcher 用于 list 和watch api-server 上的资源//runtime.Object要监控的资源的运行时对象//time.Duration同步的间隔时间//Indexers 提供不同资源的索引数据的信息查询方法,如 namespace => MetaNamespaceIndexFuncfunc NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), //初始化 Indexer
listerWatcher: lw,
objectType: objType,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
clock: realClock,
} return sharedIndexInformer
}

Indexer

Indexer提供了本地缓存的实现:计算 key 和对数据进行控制(通过调用ThreadSafeStore的接口)

type Indexer interface {
Store
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexedValue string) ([]string, error)
ListIndexFuncValues(indexName string) []string
ByIndex(indexName, indexedValue string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
}

Indexer 的创建

//staging/src/k8s.io/client-go/tools/cache/store.go//keyFunc:key 的生成规则//indexers:提供了索引资源的不同信息的访问方法,如用于查询命名空间的 namespace => MetaNamespaceIndexFuncfunc NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {    return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}

ThreadSafeStore

ThreadSafeStore提供了对存储的并发访问接口

注意事项:不能修改Get或List返回的任何内容,因为它不仅会破坏线程安全,还会破坏索引功能。

//staging/src/k8s.io/client-go/tools/cache/thread_safe_store.gofunc NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {    return &threadSafeMap{
items: map[string]interface{}{},
indexers: indexers,
indices: indices,
}
}type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{} //key => value
indexers Indexers //value 的信息的访问方法
indices Indices //索引}

Reflector

Reflector通过ListerWatcher(API)与api-server交互,对资源进行监控。将资源实例的创建、更新、删除等时间封装后保存在Informer的FIFO 队列中。

//staging/src/k8s.io/client-go/tools/cache/reflector.gofunc NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {    return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}// NewNamedReflector same as NewReflector, but with a specified name for loggingfunc NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
r := &Reflector{
name: name,
listerWatcher: lw,
store: store, //FIFO队列
period: time.Second,
resyncPeriod: resyncPeriod,
clock: &clock.RealClock{},
}
r.setExpectedType(expectedType) return r
}

添加同步事件监听器

通过sharedIndexInformer#AddEventHandlerWithResyncPeriod()注册事件监听器。

以前面的 HorizontalController为例,创建 informer 的时候添加了三个处理方法:AddFuncUpdateFuncDeleteFunc。这三个方法的实现是将对应的元素的 key(固定格式 namespace/name)从workequeue中进行入队、出队的操作。(资源控制器监听了该 workqueue

controller-manager

在通过InformerFactory创建Informer完成后,都会将新建的Informer加入到InformerFactory的一个map中。

controller-manager在完成所有的控制器(各种Controller,包括 CRD)后,会调用InformerFactory#Start()来启动InformerFactorymap中的所有Informer(调用Informer#Run()方法)

sharedIndexInformer#Run()

//staging/src/k8s.io/client-go/tools/cache/shared_informer.gofunc (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {    defer utilruntime.HandleCrash()      //创建一个增量的 FIFO队列:DeltaFIFO
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,

Process: s.HandleDeltas,
} //启动前的初始化,创建 Controller
func() {
s.startedLock.Lock() defer s.startedLock.Unlock()

s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
processorStopCh := make(chan struct{}) var wg wait.Group defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run) //退出时的状态清理
defer func() {
s.startedLock.Lock() defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}() //实行控制逻辑
s.controller.Run(stopCh)
}

controller#Run()

//staging/src/k8s.io/client-go/tools/cache/controller.gofunc (c *controller) Run(stopCh <-chan struct{}) {    defer utilruntime.HandleCrash()    go func() {
<-stopCh
c.config.Queue.Close()
}() //创建一个 Reflector,用于从 api-server list 和 watch 资源
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock //为 controller 指定 Reflector
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock() var wg wait.Group defer wg.Wait() //执行Reflector#Run():会启动一个goroutine开始监控资源,将 watch 到的数据写入到queue(FIFO 队列)中
wg.StartWithChannel(stopCh, r.Run) //持续从 queue(FIFO 队列) 获取数据并进行处理,处理的逻辑在sharedIndexInformer#HandleDeltas()
wait.Until(c.processLoop, time.Second, stopCh)
}

sharedIndexInformer#HandleDeltas()

//staging/src/k8s.io/client-go/tools/cache/shared_informer.gofunc (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest
for _, d := range obj.(Deltas) { //循环处理 FIFO 队列中取出的资源实例
switch d.Type { case Sync, Added, Updated: //同步(后面详细解读)、新增、更新事件
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { //如果 indexer 中已经存在,更掉用 update 方法进行更新
return err
} //更新成功后发送“更新”通知:包含了新、旧资源实例
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else { //如果 indexer 中没有该资源实例,则放入 indexer 中
if err := s.indexer.Add(d.Object); err != nil { return err
} //添加成功后,发送“新增”通知:包含了新加的资源实例
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
} case Deleted: //删除事件
if err := s.indexer.Delete(d.Object); err != nil {//从 indexer 中删除
return err
} //删除成功后,发送“删除通知”:包含了删除的资源实例
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
} return nil}

Informer 的实现不算复杂,却在 Kubernetes 中很常见,每种资源的控制也都通过 Informer 来获取api-server的资源实例的变更。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK