通信组件 Informer

2. Informer

[TOC]

在介绍完apiserver之后,在介绍其他重要组件之前,本章将介绍各组件与apiserver交互的重要工具Informer。通过其我们可以轻松List\Get所有资源对象,同时可以监听资源的变更事件,当变化发生时触发回调函数。
下面将从Informer的初始化过程、更新变更两个方便来讲解其工作原理。

初始化&监听更新变更流程

首先,k8sInformer 的创建需要借助于SharedInformerFactory,其可以提供k8s中已知所有API组版本资源的 informer ,其接口定义如下 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
Admissionregistration() admissionregistration.Interface
Apps() apps.Interface
Auditregistration() auditregistration.Interface
Autoscaling() autoscaling.Interface
Batch() batch.Interface
Certificates() certificates.Interface
Coordination() coordination.Interface
Core() core.Interface
Discovery() discovery.Interface
Events() events.Interface
Extensions() extensions.Interface
Flowcontrol() flowcontrol.Interface
Networking() networking.Interface
Node() node.Interface
Policy() policy.Interface
Rbac() rbac.Interface
Scheduling() scheduling.Interface
Settings() settings.Interface
Storage() storage.Interface
}

type internalinterfaces.SharedInformerFactory interface {
Start(stopCh <-chan struct{})
InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}

可以看出其包含所有资源(Api组、版本)操作的接口定义。SharedInformerFactory的默认实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type sharedInformerFactory struct {
client kubernetes.Interface
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
lock sync.Mutex
//重同步间隔
defaultResync time.Duration
// 自定义特定类型重同步时间
customResync map[reflect.Type]time.Duration
// 指定类型的Informer
informers map[reflect.Type]cache.SharedIndexInformer
// 指定类型的Informer是否开启
startedInformers map[reflect.Type]bool
}

其中clientk8s中所有API资源操作(更删改查)的工具(其通过http与apiserver通信)。
下面,就以PodInformer为例来讲解其创建过程:

1
2
3
4
sharedInformerFactory.Core().V1().Pods().Informer()
|- core.New(f, f.namespace, f.tweakListOptions)
|- v1.New(g.factory, g.namespace, g.tweakListOptions)
|- podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}

podInformer实际只是包装类,其包含两个方法,其中Informer()方法用来创建最重要的ShardIndexInformerLister()用来构建pod信息查询器:

1
2
3
4
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}

下面来重点讲解Informer()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
// 创建ShardInformer
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func (f *podInformer) Informer() cache.SharedIndexInformer {
// 向`ShardInformerFactory`中注册`Pod`的Informer
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
// 创建 SharedIndexInformer
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
// 处理器,维护事件变更监听
processor: &sharedProcessor{clock: realClock},
// 存储索引,支持通过多个索引函数来查询对象
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
// 查询监听器,用时实时请求apiserver查询监听数据
listerWatcher: lw,
// 资源对象类型 pod
objectType: objType,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
clock: realClock,
}
return sharedIndexInformer
}

到此,即看到了podshardIndexInformer的创建过程,接下来看shardIndexInformer的运行机制。

sharedIndexInformer定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type sharedIndexInformer struct {
// 存储索引,支持通过多个索引函数来查询对象
indexer Indexer
// 通用控制器
controller Controller
// 处理器,事件监听维护者
processor *sharedProcessor
// 变更探测器
cacheMutationDetector MutationDetector
// 查询监听器,用时实时请求apiserver查询监听数据
listerWatcher ListerWatcher
// 资源对象类型
objectType runtime.Object
// 重同步验证周期
resyncCheckPeriod time.Duration
defaultEventHandlerResyncPeriod time.Duration
started, stopped bool
startedLock sync.Mutex
blockDeltas sync.Mutex
}

通过执行Run方法开启Informer的运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
// 创建 DeltaFIFO 先进先出队列,并且允许处理删除操作;其是一个生产者-消费者队列,Reflector 是生产者,调用`Pop`方法的都是消费者
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
//处理变更
Process: s.HandleDeltas,
}
// 创建控制器
s.controller = New(cfg)
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
// 运行处理器,接收变更运行处理
wg.StartWithChannel(processorStopCh, s.processor.run)
// 运行控制器
s.controller.Run(stopCh)
}
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
for _, listener := range p.listeners {
// 接收处理之后的变更通知
p.wg.Start(listener.run)
// 接收外部的变更事件,通过ringbuffer(无限大)进行缓存变更事件
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}

从上面的内容可以看出,最关键的内容在Controller.Run中:
首先创建反射器Reflector并运行。再异步执行c.processLoop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (c *controller) Run(stopCh <-chan struct{}) {
// Reflector:监听资源,并应用到DeltaFIFO
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
c.reflector = r
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
}
func (c *controller) processLoop() {
for {
// 当Queue中有数据时,执行c.config.Process方法实际则是`sharedIndexInformer.HandleDeltas`
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
}
}

Reflector的作用是与k8s交互,并将获取和更新数据传递到Queue中,而Queue获取的事件会回调sharedIndexInformer.HandleDeltas方法。下面是 Reflector 的主要执行流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (r *Reflector) Run(stopCh <-chan struct{}) {
r.ListAndWatch(stopCh)
}
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// 查询资源所有数据
list, err := r.listerWatcher.List(options)
listMetaInterface, err := meta.ListAccessor(list)
resourceVersion = listMetaInterface.GetResourceVersion()
items, err := meta.ExtractList(list)
// 同步资源到`Queue`
r.syncWith(items, resourceVersion)
r.setLastSyncResourceVersion(resourceVersion)
for {
w, err := r.listerWatcher.Watch(options)
// 监听资源更新
r.watchHandler(w, &resourceVersion, resyncerrc, stopCh)
}
}
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
for {
select {
case event, ok := <-w.ResultChan():
// ...校验
meta, err := meta.Accessor(event.Object)
newResourceVersion := meta.GetResourceVersion()
//将更新对象添加到Queue中
switch event.Type {
case watch.Added:
err := r.store.Add(event.Object)
case watch.Modified:
err := r.store.Update(event.Object)
case watch.Deleted:
err := r.store.Delete(event.Object) }
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
}
}
}

下面将详细研究sharedIndexInformer.HandleDeltas

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
isSync := d.Type == Sync
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
//更新索引中数据
s.indexer.Update(d.Object)
// 分发更新通知,通知注册到Informer上的EventHandler
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
// 向索引中添加数据
s.indexer.Add(d.Object)
// 分发新增通知
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
case Deleted:
// 从索引中删除数据
s.indexer.Delete(d.Object)
// 分发删除通知
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
}

总结起来:更新流程如下:

重同步

重同步也是Informer的关键功能之一,k8s中众多实现基于此来做recheck,如单次部署失败,重同步可以修复自动修复部署失败的问题;再比如HPA模块,若单次需要扩容分片数超过自身分片数的2倍时,需要执行多次reconcileAutoscaler方法才能到达目标状态。
在创建informer时,会传入参数resyncPeriod,表示重同步的间隔,每过一个这样的接个,informer就会捞出相应的所有数据执行相应逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
// 重同步Timer
resyncCh, cleanup := r.resyncChan()
for {
select {
case <-resyncCh:
if r.ShouldResync() {
r.store.Resync()
}
}

cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
func (f *DeltaFIFO) Resync() error {
keys := f.knownObjects.ListKeys()
for _, k := range keys {
f.syncKeyLocked(k)
}
}
func (p *sharedProcessor) shouldResync() bool {
p.syncingListeners = []*processorListener{}
resyncNeeded := false
now := p.clock.Now()
for _, listener := range p.listeners {
if listener.shouldResync(now) {
resyncNeeded = true
p.syncingListeners = append(p.syncingListeners, listener)
listener.determineNextResync(now)
}
}
return resyncNeeded
}

可以看到会起一个 Timer 来定时重同步,重同步之前ShouldResync筛选出需要接收本次重同步的Listener放到syncingListeners,同步时只会同步这些syncingListeners

您的支持是我创作源源不断的动力