kube-controller-manager

3. controller-manager

controller-manager是整个k8s的大脑,
其由kube-conrtoller-managercloud-controller-manager组成。其中clound-controller-manager用来配合云服务提供商的控制,因此本章将不对其做详细描述。
controller-manager中包含众多控制器见附件^1,通过这些控制器来完成集群管理的作用。监控集群状态,并确保集群处于预期的状态。
本节,将从如下几个方面来讲解controller-manager的工作原理:

  1. 启动过程
  2. 单次部署的执行过程
  3. pod自动横向伸缩工作原理

3.1 启动过程

controller-manager启动入口cmd/kube-controller-manager/controller-manager.go,与apiserver一样其也是基于CLI框架Cobra来实现。
其初始化过程如下:

  1. 首先创建一个带有默认参数的KubeControllerManagerOption,用于构建控制器管理器;
    • 其中带有各控制器创建的配置参数
  2. 通过cobra将输入参数应用到KubeControllerManagerOption上,运行前,会先对参数进行校验;
  3. 跟据KubeControllerManagerOptions构建创建众多控制器的配置参数kubecontrollerconfig.Config
    1. 跟据输入参数构建与apiserver交互的client
    2. 创建用于选主的通信客户端leaderElectionClient
    3. 创建一个事件记录器,用于记录事件并将事件以v1.Event发送给apiserver
    4. KubeControllerManagerOptions中的配置应用到kubecontrollerconfig.Config
  4. 开启运行控制器管理器。
    1. controller-manager创建两个http服务,分别用来做健康检查、配置查询;
    2. 若配置需要选主,则先选主(没有那么复杂, 只是先到先得的原则)。若选主成功则开始初始化其内部组件,众多控制器;
    3. 运行组件
      • 创建控制器上下文ControllerContext,实际是提前构建一些通用的组件
        1. 分别创建两个Informer工厂,shared-informers以及metadata-informers
        2. 等待apiserver就绪,最多等待10s
        3. 查询apiserver中所有可用的资源;
        4. 创建云提供商工具
          • 跟据控制器上下文来开启控制器
          1. 开启ServiceAccount控制器;
          2. app/controllermanager.go.NewControllerInitializer方法中保存了所有控制器的初始化函数,遍历并执行所有控制器的初始化函数,开启这些控制器的执行。比如DeploymentControllerReplicaSetController等;
          • 开始两个Informer工厂,开始同步并监听资源更新。

到此就完成了controller-manager的初始化过程。
下面我们以DeploymentController来讲解其初始化详细过程,DeploymentController的初始化函数是startDeploymentController

1
2
3
4
5
6
7
dc, err := deployment.NewDeploymentController(
ctx.InformerFactory.Apps().V1().Deployments(),
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("deployment-controller"),
)
go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)

可以看到,其关注监听DeploymentReplicaSetsPod资源。再继续查看DeploymentController的创建过程,其关注这三种资源,并且添加事件监听器,当Deployment资源更新时回调addDeplotment\updateDeployment\deleteDeployment方法,ReplicaSetsPod的更新也会回调DeploymentController的相应方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
dc := &DeploymentController{
client: client,
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
}
dc.rsControl = controller.RealRSControl{
KubeClient: client,
Recorder: dc.eventRecorder,
}

dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addDeployment,
UpdateFunc: dc.updateDeployment,
DeleteFunc: dc.deleteDeployment,
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addReplicaSet,
UpdateFunc: dc.updateReplicaSet,
DeleteFunc: dc.deleteReplicaSet,
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: dc.deletePod,
})

创建完成后,调用Run方法开始运行DeploymentController,当然运行前需要这三种资源都从apiserver同步到其本地,然后开启多个协程执行worker逻辑。
workerController自身的queue中阻塞式等待数据,然后执行syncDeploymet方法,这里则是其的核心处理逻辑。

3.2 pod单次部署执行过程

单次执行Deployment进行部署时,其经历主过程如下:DeploymenetController接收创建Deployment的请求,进行解析创建ReplicaSet资源,而后ReplicaSetController接收到创建ReplicaSet的请求则创建响应的Pod资源信息。
除了简单部署之外,各控制器组件同时需要保障部署运行实情与期望一致。以下就是部署过程的简要流程图:

下面将详细介绍其执行流程:

  1. kubectl执行create deployment.yml命令后,apiserver接收请求,并将其保存到etcd中;
  2. 由于controller-manager中的DeploymentController,所以变更会推送到DeploymentControllerdeploymentInformer中,而deploymentInformer则会回调DeploymentController.addDeployment方法;
  3. addDeployment方法并不会立即处理Deployment,而是将该Deployment生成的唯一key 入队列queue(该队列支持限流);
  4. DeploymentController.processBexrWorkItem会阻塞等待队列中的资源更新(系统可以启动多个工作协程同时监听该队列,以提升吞吐),最后会调用核心方法syncDeployment进行处理;
  5. DeploymentController.syncDeployment方法不仅处理初次部署的情况,同时也负责更新以及由其发起的部署集群发生变化的处理逻辑:
    1. 通过key获取发生变更的Deployment
    2. 获取该Deployment创建的ReplicaSet,这里因为是新创建的Deployment所以查询不到ReplicaSet
    3. 获取该Deployment间接创建的所有Pod
    4. Deployment的属性DeletionTimestamp不为空时表示该Deployment被删除,则只同步状态;
    5. 检查Deployment是否被暂停或者恢复,并设置相应的状态,若被暂停则进一步进行清理;
    6. 若该Deployment是进行回滚操作,则执行rollback回滚相应逻辑;
    7. 检查Deployment是否需要进行扩缩容(通过检查其对应的ReplicaSet所有的分片数与理想状态是否一致来判断)
    8. 若是扩缩容事件,则执行同步方法sync
    9. 若不是,则是初次创建,部署时有两种策略:RecreateRollingUpdate(针对升级的情况,默认是RollingUpdate),这里我们就以RollingUpdate为例来查看其执行过程(DeploymentController.rolloutRolling):
      1. 首先会获取所有新老ReplicaSet。当不存在ReplicaSet时,则跟据Deployment属性创建一个ReplicaSet,并发送到apiserverDeploymentController.getNewReplicaSet);
      2. 更新Deployment的状态
  6. ReplicaSetController监听ReplicaSet的更新,上面DeploymentController创建ReplicaSet后,ReplicaSetController会受到通知(与DeploymentController执行方式一样,也通过queue和多个worker来处理更新)并最终调用ReplicaSetController.syncReplicaSet方法:
    1. 获取同一命名空间内的所有存活的Pod
    2. 并通过ReplicaSetSelector筛选出由其创建的Pod
    3. manageReplicas方法管理ReplicaSet,计算已经创建的Pod数和ReplicaSet的预期的Pod差值
      • Pod数不足时,则依次跟据ReplicaSet的配置PodTemplateSepc创建Pod,同时设置上PodOwnerRefernce(用于gc);
      • Pod数过多时,则并发删除多余的Pod
    4. 更新ReplicaSet的状态。

到此就完成了Pod的创建,但创建的Pod还不能工作,需要被调度到某个Node上才能运行。至于应该被调度到哪个节点上,这就涉及到调度器的实现逻辑,将在下节scheduler中介绍。

3.3 pod自动横向伸缩

k8s中负责pod自动横向伸缩的控制器叫HorizontalController。首先看其控制器初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
func startHPAController(ctx ControllerContext) (http.Handler, bool, error) {
// 若此项为true,则表示从组合API服务获取指标数据,而不是通过apiserver
if ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerUseRESTClients {
return startHPAControllerWithRESTClient(ctx)
}
return startHPAControllerWithLegacyClient(ctx)
}
func startHPAControllerWithRESTClient(ctx ControllerContext) (http.Handler, bool, error) {
// ...
// 创建获取指标的客户端
metricsClient := metrics.NewRESTMetricsClient(
resourceclient.NewForConfigOrDie(clientConfig),
custom_metrics.NewForConfig(clientConfig, ctx.RESTMapper, apiVersionsGetter),
external_metrics.NewForConfigOrDie(clientConfig),
)
return startHPAControllerWithMetricsClient(ctx, metricsClient)
}
func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (http.Handler, bool, error) {
// scale类型解析器,用于给定特定资源解析出奇`Scacle`子资源类型
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery())
scaleClient, err := scale.NewForConfig(hpaClientConfig, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil {
return nil, false, err
}
// 创建控制器,并异步运行
go podautoscaler.NewHorizontalController(
hpaClient.CoreV1(),
scaleClient,
hpaClient.AutoscalingV1(),
ctx.RESTMapper,
metricsClient,
ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,
).Run(ctx.Stop)
return nil, true, nil
}
func NewHorizontalController(
evtNamespacer v1core.EventsGetter,
scaleNamespacer scaleclient.ScalesGetter,
hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter,
mapper apimeta.RESTMapper,
metricsClient metricsclient.MetricsClient,
hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer,
podInformer coreinformers.PodInformer,
resyncPeriod time.Duration,
downscaleStabilisationWindow time.Duration,
tolerance float64,
cpuInitializationPeriod,
delayOfInitialReadinessStatus time.Duration,

) *HorizontalController {
hpaController := &HorizontalController{
eventRecorder: recorder,
scaleNamespacer: scaleNamespacer,
hpaNamespacer: hpaNamespacer,
downscaleStabilisationWindow: downscaleStabilisationWindow,
queue: workqueue.NewNamedRateLimitingQueue(NewDefaultHPARateLimiter(resyncPeriod), "horizontalpodautoscaler"),
mapper: mapper,
recommendations: map[string][]timestampedRecommendation{},
}
// 监听资源更新并回调相应的处理函数
hpaInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: hpaController.enqueueHPA,
UpdateFunc: hpaController.updateHPA,
DeleteFunc: hpaController.deleteHPA,
},
resyncPeriod,
)
// 创建分片计算器,基于目标资源利用率来计算预期的分片数
replicaCalc := NewReplicaCalculator(
metricsClient,
hpaController.podLister,
tolerance,
cpuInitializationPeriod,
delayOfInitialReadinessStatus,
)
hpaController.replicaCalc = replicaCalc
return hpaController
}

HorizontalController的初始化过程流程主要做了以下几件事:

  1. 创建Metrics查询客户端;
  2. 创建分片计算器;
  3. 创建hpa资源informer,监听该资源变更并回调相应的函数enqueueHPAupdateHPAdeleteHPA

下面将来详细介绍其运行原理:
通过kubectl执行autoscale命令kubectl autoscale deployment nginx-deployment --cpu-percent=30 --min=7 --max=8后,HorizontalController会接收到HorizontalPodAutoscaler新增消息,最终会执行HorizontalController.reconcileAutoscaler方法。该方法是控制器的核心逻辑,其执行过程如下:

  1. 通过hpa的属性获取对应资源(可以是Deployment也可以是其他)的Scale中的分片数信息;
  2. 进行逻辑判断:
    1. 若当前分片数为0但最小分片数不为0,则不进行自动扩容;
    2. 若当前分片数大于hpa定义的最大分片数则将目标分片数(desiredReplicas)设置成最大分片数;
    3. 若当前分片数小于hpa定义的最小分片数则将目标分片数设置成该最小分片数;
    4. 都不满足的话则执行第三步跟据Metrics信息计算需要部署的分片数
  3. 执行HorizontalController.computeReplicasForMetrics方法计算目标分片数:
    1. 定义hpa时可以定义多个指标维度的扩缩容策略,比如cpu等。因此这里会按每个指标信息依次计算目标分片数,最后取最大值作为最终的目标值;
    2. 对于指标(MetricSpec),k8s对齐进行了分类,主要分为四类,没类的计算方式也不同:
      1. Object: 描述 k8s 对象,如hits-per-second
      2. Pods: 描述目标中每个Pod信息,如transactions-processed-per-second,而这些值在比较前会进行求平均;
      3. Resource:k8s中一个知名度量信息,在requestlimit中进行定义的资源;
      4. External:拓展指标信息,来自于k8s集群之外的信息。
        1. 下面以Resource为例,来讲解其计算过程(HorizontalController.computeStatusForResourceMetric):
      5. 最终调用的是ReplicaCalculator分片计算器的GetRawResourceReplicas方法;
      6. 首先,通过metricsClient查询每个PodMetrics信息;
      7. 进行以下条件判断和计算目标分片数
        • 首先计算当前测量的metric的平均值和目标值的比例(usageRatio);
        • 若没有未就绪的pod且当前指标大于目标值时:
          • 当差值在10%(默认可容忍值)之内,则直接返回原值;
          • 若大于10%时,则返回分片数:usageRatio乘以当前Pod数;
        • 如有Pod未收集到指标信息
          • usageRatio小于0,将未收集到的指标设置零时为目标值
          • usageRatio大于0,将未收集的指标设置零时为0
            • usageRatio大于0,将所有未就绪的Pod指标设置为0
            • 对修改过的数据重新计算usageRatio
            • 当新的usageRatio<1.1(0.1是容忍度)或者oldUageRatio<1&&newUsageRatio>1或者oldUageRatio>1&&newUsageRatio<1时,直接返回原值;
            • 否则返回newUsageRatio乘以所有pod数量(也及以newUsageRatio比例来扩容)
      8. 并不是计算出来就用这个值,还需要对伸缩速率做一次调整(normalizeDesiredReplicas):
        • 根据历史值来调整该值stabilizeRecommendation):
          • HPA中保存每次计算结果和时间戳,timestampedRecommendation
          • HPA启动时会设置一个窗口期downscaleStabilisationWindow(默认5min
          • 找到窗口期之内历史计算结果,若历史推荐分片数比当前推荐的大,则覆盖当前的
        • 速率限制convertDesiredReplicasWithRules
          • 防止扩容过快,限制最大不能超过当前实例数的两倍;
            1. 获取目标分片数时,若与原值不相等则通过Scales接口将分片数修改成目标值(实际,Scales只是一个包装,其实质是更新DeploymentReplica只)
            2. DeploymentController接收到更新后,就会引发其扩缩容操作,上一小节中已经介绍。
              到此,HPA的单次操作执行流程就介绍完了,下面以一张图来简要描述该过程:

附件

1. 控制器列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
controllers := map[string]InitFunc{}
controllers["endpoint"] = startEndpointController
controllers["endpointslice"] = startEndpointSliceController
controllers["replicationcontroller"] = startReplicationController
controllers["podgc"] = startPodGCController
controllers["resourcequota"] = startResourceQuotaController
controllers["namespace"] = startNamespaceController
controllers["serviceaccount"] = startServiceAccountController
controllers["garbagecollector"] = startGarbageCollectorController
controllers["daemonset"] = startDaemonSetController
controllers["job"] = startJobController
controllers["deployment"] = startDeploymentController
controllers["replicaset"] = startReplicaSetController
controllers["horizontalpodautoscaling"] = startHPAController
controllers["disruption"] = startDisruptionController
controllers["statefulset"] = startStatefulSetController
controllers["cronjob"] = startCronJobController
controllers["csrsigning"] = startCSRSigningController
controllers["csrapproving"] = startCSRApprovingController
controllers["csrcleaner"] = startCSRCleanerController
controllers["ttl"] = startTTLController
controllers["bootstrapsigner"] = startBootstrapSignerController
controllers["tokencleaner"] = startTokenCleanerController
controllers["nodeipam"] = startNodeIpamController
controllers["nodelifecycle"] = startNodeLifecycleController
if loopMode == IncludeCloudLoops {
controllers["service"] = startServiceController
controllers["route"] = startRouteController
controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
}
controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
controllers["attachdetach"] = startAttachDetachController
controllers["persistentvolume-expander"] = startVolumeExpandController
controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
controllers["pvc-protection"] = startPVCProtectionController
controllers["pv-protection"] = startPVProtectionController
controllers["ttl-after-finished"] = startTTLAfterFinishedController
controllers["root-ca-cert-publisher"] = startRootCACertPublisher
您的支持是我创作源源不断的动力