2. Informer
[TOC]
在介绍完apiserver
之后,在介绍其他重要组件之前,本章将介绍各组件与apiserver
交互的重要工具Informer
。通过其我们可以轻松List\Get
所有资源对象,同时可以监听资源的变更事件,当变化发生时触发回调函数。
下面将从Informer
的初始化过程、更新变更两个方便来讲解其工作原理。
初始化&监听更新变更流程
首先,k8s
里 Informer
的创建需要借助于SharedInformerFactory
,其可以提供k8s
中已知所有API组版本资源的 informer
,其接口定义如下 :
1 | type SharedInformerFactory interface { |
可以看出其包含所有资源(Api组、版本)操作的接口定义。SharedInformerFactory
的默认实现如下:
1 | type sharedInformerFactory struct { |
其中client
是k8s
中所有API
资源操作(更删改查)的工具(其通过http与apiserver
通信)。
下面,就以PodInformer
为例来讲解其创建过程:
1 | sharedInformerFactory.Core().V1().Pods().Informer() |
podInformer
实际只是包装类,其包含两个方法,其中Informer()
方法用来创建最重要的ShardIndexInformer
,Lister()
用来构建pod
信息查询器:
1 | type PodInformer interface { |
下面来重点讲解Informer()
方法:
1 | func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { |
到此,即看到了pod
的shardIndexInformer
的创建过程,接下来看shardIndexInformer
的运行机制。
sharedIndexInformer
的定义如下:
1 | type sharedIndexInformer struct { |
通过执行Run
方法开启Informer
的运行:
1 | func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { |
从上面的内容可以看出,最关键的内容在Controller.Run
中:
首先创建反射器Reflector
并运行。再异步执行c.processLoop
。
1 | func (c *controller) Run(stopCh <-chan struct{}) { |
Reflector的作用是与k8s
交互,并将获取和更新数据传递到Queue
中,而Queue
获取的事件会回调sharedIndexInformer.HandleDeltas
方法。下面是 Reflector 的主要执行流程:
1 | func (r *Reflector) Run(stopCh <-chan struct{}) { |
下面将详细研究sharedIndexInformer.HandleDeltas
:
1 | func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { |
总结起来:更新流程如下:
重同步
重同步也是Informer
的关键功能之一,k8s
中众多实现基于此来做recheck
,如单次部署失败,重同步可以修复自动修复部署失败的问题;再比如HPA
模块,若单次需要扩容分片数超过自身分片数的2倍时,需要执行多次reconcileAutoscaler
方法才能到达目标状态。
在创建informer
时,会传入参数resyncPeriod
,表示重同步的间隔,每过一个这样的接个,informer
就会捞出相应的所有数据执行相应逻辑。
1 | resyncerrc := make(chan error, 1) |
可以看到会起一个 Timer
来定时重同步,重同步之前ShouldResync
筛选出需要接收本次重同步的Listener
放到syncingListeners
,同步时只会同步这些syncingListeners
。