k8s-apiserver

1. kube-apiserver(@v1.17.2

[TOC]

ApiServerK8S服务调度的核心,其同时负责与外界的交互、集群内其他组件的交互,也是K8S中唯一与持久化存储Etcd交互的组件。

下面将从以下几点来解析 ApiSever

  • 启动过程
    • Scheme构建过程
    • 路由构建过程
    • 认证、授权、准入构建过程
  • 请求过程
    • 读写请求过程
    • watch机制
  • 其他组件与其交互过程
    • informerFactory机制

1. 启动过程

1.1 启动主流程

ApiServer启动位置:cmd/kube-apiserver/apiserver.go,启动先需要先启动一个Etcd。调试启动参数如下:

–secure-port=6445 –storage-backend=etcd3 –etcd-servers=http://127.0.0.1:2379 –enable-swagger-ui=true

首先创建ApiServerCommond,然后执行命令:(整个k8s代码体系都引用CLI框架Cobra)
创建命令的过程:

  1. 创建默认服务启动参数 ServerRunOption
  2. 通过Cobra获取程序启动参数并应用到ServerRunOption

执行的过程,如下:

  1. 首先设置默认参数:apiserverAdvertiseAddress(ip),apiserverSericeIp、集群里所有ServiceIp范围、授权配置、服务账户配置、etcd缓存配置、api开关配置;
  2. 校验启动参数:如未配置Etcd服务节点信息,则返回异常,终止启动。
  3. 基于ServerRunOption运行服务(核心流程)
    1. 创建服务链:创建APiServerApiExtensionServerAggregatorServer,建立非安全服务端口(默认8080)提供服务;
    2. 预运行:(安装健康检查探针、存活探针、就绪探针。装配 OpenAPI/openapi/v2路径));
    3. 运行:运行审计后台(AuditBackend),建立安全服务端口(默认6445)监听提供服务。

因此,启动过程最重要的内容在:CreateServerChain创建服务链中。
其过程如下:

  1. 创建节点拨号器组件NodeTunneler),设置SSH等信息,对于不同的云服务提供商其区别即在于AddSSHKeyToAllInstancessshKey分发函数,将SSHkey分发到所有节点上;
  2. 创建APIServer配置CreateKubeAPIServerConfig):创建为运行APIServer所需要的资源。将ServerRunOption中配置转换成其所需要的资源。
    1. 构建apiserver配置genericapiserver.Config(legacyscheme.Codecs)
      • legacyscheme.CodecsApiServer核心设计,其用于编解码API资源,后面讲重点讲解
      • 设置BuildHandlerChainFunc,其是ApiServer的拓展机制。每次请求到达后台处理逻辑之前,会先运行处理链逻辑,目前其默认实现server.DefaultBuildHandlerChain见 ^附件1 。其内部包含认证授权审计准入等处理逻辑。
    2. 设置MergedResourceConfig,其用于表示配置哪些groupVersion被开启,哪个资源被启用和禁用;
    3. 将服务启动ServerRunOption的部分参数应用到genericapiserver.Config中;
    4. 根据ServerRunOption中的InsecureServingSecureServingAuthenticationFeaturesAPIEnablement应用到genericapiserver.Config中;
    5. 创建OpenAPI配置;
    6. 创建存储工厂storageFactory,并聚合Etcd的配置创建genericapiserver.Config需要的组件RESTOptionsGetter(其用于构建RESTStorage,用来访问存储层)
      • storageFactory的创建过程存在很多细节:
        • StorageConfig:存储配置
        • Overrides:覆盖设置,根据EtcdServersOverrides转换而得,其作用是对于某种特定资源转换到其他etcd服务上;
        • ResourceEncodingConfig:用来判断对于某种资源使用什么版本标识(存储的版本、内存中表示的版本);
        • DefaultSerializer:序列化工具legacyscheme.Codecs
        • APIResoureceConfigSource:存储哪些版本哪些资源可用或被禁用。
    7. 创建Clientset(包含请求所有资源的Client),其用来请求自己来获取各种类型资源;
    8. 创建SharedInformerFactory,其用来为所有已知api组版本的资源提供Informerk8s体系中与apiserver交互的重要组件);
    9. 构建认证器Authenticator
      • 跟据ServerRunOption创建authenticatorConfig
      • 创建ServiceAccountTokenGetter,用于获取ServiceAccountSecretsPod
      • 创建BootstrapTokenAuthenticator,;
      • 构建多个认证器,并包装成一个联合认证插件,目前k8s支持如下认证插件(authenticator/config.New):
        • basic auth
        • X509证书
        • ServiceAccount
        • 静态 Token 文件
        • 引导 Token
        • OpenID
        • WebHook
    10. 创建授权工具Authorizer,目前k8s支持6种授权模式:
      • ModeNode:显示kubelet只能访问其自身所在的Node
      • ModeRBAC:基于角色的访问控制
      • ModeABAC:基于属性(Attribute)的访问控制
      • ModeWebhook:通过webhook的访问控制
      • ModeAlwaysDeny:拒绝所有
      • ModeAlwaysAllow:允许所有
    11. 创建服务调节器ServiceResolver 给定命名空间、名称、端口返回Service的URL;
    12. 创建审计策略检测器AuditPolicyChecker,审计后台AuditBackend
    13. 创建准入控制AdmissionControl
      • k8s目前支持13种准入插件:NamespaceLifecycle,LimitRanger,ServiceAccount,TaintNodesByCondition,Priority,DefaultTolerationSeconds,DefaultStorageClass,StorageObjectInUseProtection,PersistentVolumeClaimResize,MutatingAdmissionWebhook,ValidatingAdmissionWebhook,RuntimeClass,ResourceQuota
      • 插件初始化时会用PluginInitializers来对所有的插件进行填充,这也是k8s设计比较巧妙的点;
        • 插件需要什么属性,实现相应接口即可。
    14. 解析ServiceIpRangeapiServerServiceIp
    15. 创建master.Config,并进行赋值返回。
  3. 构建配置APIExtensionsConfig,为创建ApiExtensionServer做准备,ApiExtensionServerKubeAPIServer的区别在于,ApiExtensionServerk8s的拓展服务,其负责API:CustomResourceDefinitions(简称CRD)的管理;
  4. 创建apiExtensionsServer,首先填充参数再创建:
    1. 创建GenericAPIServer
      1. 创建APIServerHandlerapiserver/pkg/server/handler.go),其是众多Hanlder的包装器和分发器
        1. 其包含属性:FullHandlerChain(过滤链)、GoRestfulContainerNOGoRestfulMuxDirector
        2. 其处理请求的顺序:FullHandlerChain -> Director -> {GoRestfulContainer,NonGoRestfulMux}
      2. 创建GenericAPIServer,填充参数
    2. 创建ApiGroupInfo,标识API组,包含组内元素的所有数据,其包含主要信息如下:
      • PrioritizedVersions:标识该API组有哪些版本;
      • VersionedResourcesStorageMap:存储API组中每个版本的每种资源的存储API;
      • Scheme:包含该组内所有类型,并且可以做不同类型之间的相互转换,同时可以转换来自组外的对象;
      • NegotiatedSerializerAPI组内数据编解码;
      • ParameterCodec:查询参数转换
    3. 填充apiGroupInfo
      • 填充VersionedResourcesStorageMap
    4. 安装APIGroupInfo
      • 安装APIResources
        • 对于每个版本,首先创建APIGroupVersion(用于将存储对象暴露为http.Handlershelper)(每个APIGroupVersion是一个WebService)
        • 调用apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)方法安装Restful API;
        • 创建APIInstaller执行Install方法进行安装APIResource
          • 对于APIGroupVersion中每种资源执行registerResourceHandlers方法进行 最核心的转换rest.Storage-> http.Handler),详细见1.3
    5. 创建KubeApiServer,其创建过程与创建apiextensionServer类似
      1. 创建GenericAPIServer,但与上面不一样的地方是:其将apiextensionServer带入其中当nofoundHandler,当kubeApiServer不能处理请求时,会将请求转发到apiextensionServer
      2. 安装api/路径的核心资源(LeagyAPI)(endpointnodepod)。其内部安装逻辑与上面过程类似,则将不累述;
      3. 安装apis/路径其他拓展资源;
    6. 创建APIAggregator,聚合服务,其是apiserver重要拓展机制,用户可以自定义API服务,然后以APIService形式告知apiserver
      1. 同样先创建GenericAPIServer,传入KubeApiServer作为nofoundHandler
      2. 安装apiregistrationAPI组;
      3. 初始化apiserviceRegistrationController,其用于管理APIService。当有APIService更新时,其会将其转换成proxyHandler然后注册到APIAggregatorNonGoRestfulMux中,用于将请求转发到APIService中定义的自定义API服务
    7. 构建InsecureHandlerChain,并监听insecureServer端口,接收请求

总结其运行图如下:

k8s现阶段,API一共分为13个Group:Core、apps、authentication、authorization、autoscaling、batch、certificates、componentconfig、extensions、imagepolicy、policy、rbac、storage。

1.2 Scheme

Schemek8s中最重要的组件。其存储API资源信息(类型信息、版本信息等),支持API资源的序列化、反序列化、版本转化。
首先,我们将来分析其结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type Scheme struct {
// GroupVersionKind到具体反射类型的映射
gvkToType map[schema.GroupVersionKind]reflect.Type
// 反射类型到GroupVersionKind列表的映射
typeToGVK map[reflect.Type][]schema.GroupVersionKind
// 不带版本的类型
unversionedTypes map[reflect.Type]schema.GroupVersionKind
// 可以在任意组合版本中创建的类型
unversionedKinds map[string]reflect.Type
// 类型,属性标签转换函数
fieldLabelConversionFuncs map[schema.GroupVersionKind]FieldLabelConversionFunc
// 默认初始化函数
defaulterFuncs map[reflect.Type]func(interface{})
// 转换函数
converter *conversion.Converter
// 版本优先级
versionPriority map[string][]string
//记录类型注入时版本的顺序
observedVersions []schema.GroupVersion
schemeName string
}

Scheme内容的注册时机发生在创建ApiServerCommand的时候,其引入k8s.io/kubernetes/pkg/master包,而这个包中包含import_known_versions.go类,其引入众多install包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import (
// These imports are the API groups the API server will support.
_ "k8s.io/kubernetes/pkg/apis/admission/install"
_ "k8s.io/kubernetes/pkg/apis/admissionregistration/install"
_ "k8s.io/kubernetes/pkg/apis/apps/install"
_ "k8s.io/kubernetes/pkg/apis/auditregistration/install"
_ "k8s.io/kubernetes/pkg/apis/authentication/install"
_ "k8s.io/kubernetes/pkg/apis/authorization/install"
_ "k8s.io/kubernetes/pkg/apis/autoscaling/install"
_ "k8s.io/kubernetes/pkg/apis/batch/install"
_ "k8s.io/kubernetes/pkg/apis/certificates/install"
_ "k8s.io/kubernetes/pkg/apis/coordination/install"
_ "k8s.io/kubernetes/pkg/apis/core/install"
_ "k8s.io/kubernetes/pkg/apis/discovery/install"
_ "k8s.io/kubernetes/pkg/apis/events/install"
_ "k8s.io/kubernetes/pkg/apis/extensions/install"
_ "k8s.io/kubernetes/pkg/apis/flowcontrol/install"
_ "k8s.io/kubernetes/pkg/apis/imagepolicy/install"
_ "k8s.io/kubernetes/pkg/apis/networking/install"
_ "k8s.io/kubernetes/pkg/apis/node/install"
_ "k8s.io/kubernetes/pkg/apis/policy/install"
_ "k8s.io/kubernetes/pkg/apis/rbac/install"
_ "k8s.io/kubernetes/pkg/apis/scheduling/install"
_ "k8s.io/kubernetes/pkg/apis/settings/install"
_ "k8s.io/kubernetes/pkg/apis/storage/install"
)

而,Scheme内容的注入则是在这些install中进行。下面以k8s.io/kubernetes/pkg/apis/core/install为例来介绍其注册过程:

1
2
3
4
5
6
7
8
func init() {
Install(legacyscheme.Scheme)
}
func Install(scheme *runtime.Scheme) {
utilruntime.Must(core.AddToScheme(scheme))
utilruntime.Must(v1.AddToScheme(scheme))
utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion))
}

首先,会将core包下的API资源注册到Scheme,再将v1包下的API资源注册到Scheme中,设置版本的优先级。
core包下的注册内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func addKnownTypes(scheme *runtime.Scheme) error {
// 添加converter的忽略转换类型
if err := scheme.AddIgnoredConversionType(&metav1.TypeMeta{}, &metav1.TypeMeta{}); err != nil {
return err
}
scheme.AddKnownTypes(SchemeGroupVersion,
&Pod{},
&PodList{},
&PodStatusResult{},
&PodTemplate{},
&PodTemplateList{},
&ReplicationControllerList{},
&ReplicationController{},
&ServiceList{},
&Service{},
&ServiceProxyOptions{},
&NodeList{},
&Node{},
&NodeProxyOptions{},
&Endpoints{},
&EndpointsList{},
&Binding{},
&Event{},
&EventList{},
&List{}
...
)
return nil
}

core包中关键API资源加入到Scheme中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (s *Scheme) AddKnownTypes(gv schema.GroupVersion, types ...Object) {
s.addObservedVersion(gv)
for _, obj := range types {
t := reflect.TypeOf(obj)
t = t.Elem()
s.AddKnownTypeWithName(gv.WithKind(t.Name()), obj)
}
}
func (s *Scheme) AddKnownTypeWithName(gvk schema.GroupVersionKind, obj Object) {
s.addObservedVersion(gvk.GroupVersion())
t := reflect.TypeOf(obj)
t = t.Elem()
// 设置`GroupVersionKind`到类型反射的映射
s.gvkToType[gvk] = t
for _, existingGvk := range s.typeToGVK[t] {
if existingGvk == gvk {
return
}
}
// 建立类型反射到`GroupVersionKind`列表的映射
s.typeToGVK[t] = append(s.typeToGVK[t], gvk)
}

再看v1.AddToScheme,向Scheme中添加类型初始化函数v1.addDefaultingFuncs,向Scheme中添加转换函数v1.addConversionFuncs

1.3 rest.Storage -> http.Handler

rest.Storagehttp.Handler的转换是apiServer中的重要部分。也是其抽象的精髓。位于apiserver/pkg/endpoints/installer.go/registerResourceHandlers。下面将详细阐述其实现过程:

  1. 通过SchemeGroupVersionStorage 获取类型信息,GroupVersionKind
  2. 通过SchemeGroupVersionKind创建类型实例;
  3. 判断 rest.Storage实现了哪些接口:是否有创建接口(isCreater)、是否可列举(isLister)、是否可查询(isGetter)。判断的意义在于:确定该类型可以生成的Http行为
  4. 区分是否对命名空间敏感,形成不同的资源路径;其次,跟据前面的判断结果集生成可以生成httpHandler的actions。如 LISTPOSTGETWATCH…;
  5. 针对每种action,为其创建一个route(go-restful),同时为其构建一个处理函数RouteFunction
  6. 需要特别说明的一点是:对于修改操作,同时会将Admit传入RouteFunction中,为了在操作前进行准入的控制;
  7. 最后,将每种route加入到WebService中。

单次请求过程

下面本节将以创建一个deployment为例来讲解请求执行过程:
首先通过kubectlk8s-apiserver发起创建deployment命令,kubectl create -f nginx-deployment.yaml
其执行流程如下图:

总结起来,其过程如下:

  1. 首先经过过滤链进行处理,包括认证、准入等通用的功能链;
  2. 开始依次经过aggregator、apiserver的go-restfulContainer,查看是否匹配注册的规则;
  3. 本次请求你deploymenetappsAPI组中的资源,因此分发到对应的apps/v1WebService下POSTroute进行处理;
  4. route对应function是一个包装的handlerhandlers.createHandlers,其是整个请求核心逻辑模块,首先利用解码请求体(利用Scheme保障的ParamterCocdec),再通过准入控制插件(13种)的检测以及修改,调用rest.Storage进行操作etcd写入数据。

附件

1. 处理链

1
2
3
4
5
6
7
8
9
10
11
12
13
handler := genericapifilters.WithAuthorization(apiHandler, c.Authorization.Authorizer, c.Serializer)
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
failedHandler := genericapifilters.Unauthorized(c.Serializer, c.Authentication.SupportsBasicAuth)
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker)
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
handler = genericfilters.WithPanicRecovery(handler)
return handler

拓展阅读:

您的支持是我创作源源不断的动力