Istio Pilot

Pilot

Pilot是Istio的控制中枢,它负责sidecar的生命周期管理并负责向Sidecar下发控制数据。

[TOC]

下面将从以下几个方面来分析Pilot:

  • 整体架构
  • 启动过程
  • Sidecar 初始拉取过程 & 信息下发过程
  • 拓展性

整体架构

Pilot 内部整体架构如下:

  • 实现 Grpc ServerEnvoy提供查询配置以及服务发现服务;
  • 支持配置控制器、服务控制器
  • 配置控制器支持聚合多种类型配置源,如 K8s、基于文件系统的内存配置源、Galley 以及其他的实现MCP协议的拓展配置中心服务;
  • 服务控制器同样支持多种类型服务注册中心,如 K8sConsul以及可以拓展MCP协议实现的注册中心服务
  • 另外,通过ControlZ服务对外暴露 Pilot 内部配置&运行时信息的查询和修改接口

启动分析

首先其入口地址为:istio/pilot/cmd/pilot-discovery
启动前,init方法预先执行,其解析出启动参数如:registries(注册中心配置,若未配置,则默认注册中心为k8s)、meshConfig(mesh的配置文件地址)、httpAddrgrpcAddr 服务器启动http、Grpc端口等。
discoveryCmd.Run方法为启动入口,首先通过bootstrap.NewServer创建Server,然后通过Start方法启动 Server
首先,看bootstrap.NewServer方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
s := &Server{
// 文件监听器组件,用于监听文件更新事件
fileWatcher: filewatcher.NewWatcher(),
}
// 创建k8s client
s.initKubeClient(&args)
// 初始化 mesh配置 根据配置地址从指定位置获取配置,并添加文件监听器监听更新
s.initMesh(&args)
// 初始化Mesh网络配置,根据配置地址从指定位置获取配置,并添加文件监听器监听更新
s.initMeshNetworks(&args)
// 初始化证书控制器
s.initCertController(&args)
// 初始化配置控制器
s.initConfigController(&args)
// 初始化服务控制器
s.initServiceControllers(&args)
// 初始化发现服务
s.initDiscoveryService(&args)
// 初始化监控服务
s.initMonitor(&args)
// 初始化集群注册器,监控远程集群并初始化多集群结构
s.initClusterRegistries(&args)

创建完DiscoveryServer后,通过调用Start方法启动各组件。
上面执行流程大致过程如下:首先创建与k8s的交互客户端,然后根据初始传入配置路径读取配置文件中mesh以及网络的配置,同时监听、维护配置文件更新;其后创建三个控制器:证书、配置、服务控制器分别管理 Secert、config、service信息;再创建发现服务:聚合上面所有控制器的能力对Sidecar提供服务。
下面将分别介绍初始化mesh配置以及网络配置过程,初始化证书、配置以及服务控制器,以及初始化发现服务。

初始化Mesh配置

initMesh方法,首先判断是否配置了args.Mesh.ConfigFile。若是,则从文件中读取配置信息,并添加文件监听器当有更新时回调更新方法(下发配置);若无,则从k8s中获取mesh配置信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// args.Mesh.ConfigFile != ""
meshConfig, err = cmd.ReadMeshConfig(args.Mesh.ConfigFile)
s.addFileWatcher(args.Mesh.ConfigFile, func() {
meshConfig, err = cmd.ReadMeshConfig(args.Mesh.ConfigFile)
if !reflect.DeepEqual(meshConfig, s.mesh) {
s.mesh = meshConfig
if s.EnvoyXdsServer != nil {
s.EnvoyXdsServer.Env.Mesh = meshConfig
//下发配置
s.EnvoyXdsServer.ConfigUpdate(&model.PushRequest{Full: true})
}
}
})
// 从k8s中获取配置信息 namespace:istio-system, name:istio
cfg, err := kube.CoreV1().ConfigMaps(namespace).Get(name, meta_v1.GetOptions{})
cfgYaml, exists := cfg.Data[ConfigMapKey]
meshConfig, err := mesh.ApplyMeshConfigDefaults(cfgYaml)

初始化配置控制器

Pilot 支持对接多配置中心,支持从多个配置中心获取配置值:

  1. 若设置了配置源mesh.ConfigSources,则初始化MCPConfigController
  2. 若设置了配置文件目录Config.FileDir,则创建一个内存配置控制器。并定时(100ms)同步指定文件目录下的配置到内存配置控制器中;
  3. 创建 k8s 配置控制器,每种Istio配置类型对应一种CRD资源,并创建每种资源的informer,构成配置更新机制。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
if len(s.mesh.ConfigSources) > 0 {
s.initMCPConfigController(args)
} else if args.Config.Controller != nil {
// 正常流程不会走到此处
s.configController = args.Config.Controller
} else if args.Config.FileDir != "" {
store := memory.Make(schemas.Istio)
configController := memory.NewController(store)
err := s.makeFileMonitor(args.Config.FileDir, configController)
s.configController = configController
} else {
cfgController, err := s.makeKubeConfigController(args)
s.configController = cfgController
}

随后添加置后 Start方法,在所有组件初始化完成执行(Pilot中所有组件都是如此)。

1
2
3
4
s.addStartFunc(func(stop <-chan struct{}) error {
go s.configController.Run(stop)
return nil
})

完成基本类型配置控制器创建后,会继续判断是否支持 Ingress 模式的配置;若是则对控制器进行包装:添加一种配置类型Ingress,并按类型进行映射:

1
2
3
4
   configController, err := configaggregate.MakeCache([]model.ConfigStoreCache{
s.configController,
ingress.NewController(s.kubeClient, s.mesh, args.Config.ControllerOptions),
})

最后,根据创建的配置控制器创建 IstioConfigStore。其作用就是配置访问层,提供具体配置查询接口,定义如下:

1
2
3
4
5
6
7
type IstioConfigStore interface {
ConfigStore
ServiceEntries() []Config
Gateways(workloadLabels labels.Collection) []Config
EnvoyFilter(workloadLabels labels.Collection) *Config
// ...
}

因为MCP协议 是Istio中最近比较火的概念,下文将详细讲解initMCPConfigController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (s *Server) initMCPConfigController(args *PilotArgs) error {
// ...
for _, configSource := range s.mesh.ConfigSources {
if strings.Contains(configSource.Address, fsScheme+"://") {
// ... 若是,则创建文件配置控制器,上文中有提起此种类型控制器
configStores = append(configStores, configController)
}
// 根据配置创建与MCP server的grpc链接
conn, err := grpc.DialContext(
ctx,//... 配置
)
// 创建MCP控制
s.mcpController(args, conn, reporter, &clients, &configStores)
|- // 基于Grpc链接创建grpc资源客户端,其用于与MCP服务端建立Stream通道,
|- cl := mcpapi.NewResourceSourceClient(conn)
// 创建MCP客户端
|- mcpClient := sink.NewClient(cl, sinkOptions)

// !!!这里很有意思即可以 MCP服务端同时当成 注册中心使用
if resourceContains(configSource.SubscribedResources, meshconfig.Resource_SERVICE_REGISTRY) {
args.Service.Registries = []string{string(serviceregistry.MCPRegistry)}
conn, err := grpc.DialContext(
ctx,
//...配置)
conns = append(conns, conn)
// 合成服务注册中心
s.sseMCPController(args, conn, reporter, &clients, &configStores)
}
}
// 添加启动方法
s.addStartFunc(func(stop <-chan struct{}) error {
for i := range clients {
client := clients[i]
go func() {
client.Run(ctx)
}()
}
}
//将多个配置服务源进行聚合
aggregateMcpController, err := configaggregate.MakeCache(configStores)
}

继续跟进s.mcpController(args, conn, reporter, &clients, &configStores),逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (s *Server) mcpController(args *PilotArgs,
conn *grpc.ClientConn,
reporter monitoring.Reporter,
clients *[]*sink.Client,
configStores *[]model.ConfigStoreCache) {
// CoreDataModel 结构,用于配置临时存储并接收MCP Server下发的变更
mcpController := coredatamodel.NewController(s.mcpOptions)
sinkOptions := &sink.Options{
CollectionOptions: collections,
Updater: mcpController,
ID: clientNodeID,
Reporter: reporter,
}
//创建资源客户端
cl := mcpapi.NewResourceSourceClient(conn)
// 创建Sink客户端,针对MCP而设计的资源信息交互组件,后面我们将分章节讲解MCP整体交互实现。
mcpClient := sink.NewClient(cl, sinkOptions)
configz.Register(mcpClient)
*clients = append(*clients, mcpClient)
*configStores = append(*configStores, mcpController)
}

总结创建 MCPConfigController的执行过程如下:

  • 对于每种配置源,根据configSource.Address以及配置创建与MCP服务端的grpc链接以及stream client;
  • 创建 mcpController 用于接收 MCP Server 的配置变更;
  • 创建 Sink组件,维护与 MCP Server 的交互;
  • 别外,如果若配置源同时也维护 服务资源(充当注册中心的角色),同时会创建 SyntheticServiceEntryController

初始化服务控制器

Pilot同时也支持对接多个注册中心,其同时可以从多个注册中心获取服务注册信息。目前Pilot支持三种类型的注册中心:k8sconsul和基于MCP协议实现的注册中心。
初始化时,创建指定类型Registry,然后加到aggregate.Controller中,其聚合不同注册中心的数据,对sidecar服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (s *Server) initServiceControllers(args *PilotArgs) error {
for _, r := range args.Service.Registries {
switch serviceRegistry {
case serviceregistry.KubernetesRegistry:
s.createK8sServiceControllers(serviceControllers, args)
case serviceregistry.ConsulRegistry:
s.initConsulRegistry(serviceControllers, args)
case serviceregistry.MCPRegistry:
if s.mcpDiscovery != nil {
serviceControllers.AddRegistry(
aggregate.Registry{
Name: serviceregistry.MCPRegistry,
// mcpDiscovery为之前初始化配置服务时创建
ServiceDiscovery: s.mcpDiscovery,
Controller: s.mcpDiscovery,
})
}
}
}
// 将配置中心包装成注册中心,其会将配置数据转换成Sidecar需要的Sercice\Instance数据。
serviceEntryStore := external.NewServiceDiscovery(s.configController, s.istioConfigStore)
serviceEntryRegistry := aggregate.Registry{
Name: "ServiceEntries",
Controller: serviceEntryStore,
ServiceDiscovery: serviceEntryStore,
}
serviceControllers.AddRegistry(serviceEntryRegistry)
}
// 添加开启方法
s.addStartFunc(func(stop <-chan struct{}) error {
go s.ServiceController.Run(stop)
return nil
})

初始化服务发现控制器的过程主要如下:

  • 根据注册器配置列表创建相应类型服务发现管理器;
  • 将配置中心包装成发现服务;
  • 将生成的所有发现服务聚合成aggregate.Controller

再深入Registry服务逻辑,首先看其定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type Registry struct {
// 服务注册类型
Name serviceregistry.ServiceRegistry
// 集群ID,当有多个同种类型注册中心时,用ClusterID来区分它们
ClusterID string
// 控制器:提供接口让依赖方可以加入变更事件处理器
model.Controller
// 定义获取资源的接口
model.ServiceDiscovery
}
type Controller interface {
AppendServiceHandler(f func(*Service, Event)) error
AppendInstanceHandler(f func(*ServiceInstance, Event)) error
Run(stop <-chan struct{})
}
type ServiceDiscovery interface {
Services() ([]*Service, error)
GetService(hostname host.Name) (*Service, error)
InstancesByPort(svc *Service, servicePort int, labels labels.Collection) ([]*ServiceInstance, error)
// ...
}

下面将以k8s为具体发现服务来讲解其工作原理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (s *Server) createK8sServiceControllers(serviceControllers *aggregate.Controller, args *PilotArgs) (err error) {
clusterID := string(serviceregistry.KubernetesRegistry)
args.Config.ControllerOptions.ClusterID = clusterID
// 构建k8s服务发现控制器
kubectl := controller2.NewController(s.kubeClient, args.Config.ControllerOptions)
s.kubeRegistry = kubectl
serviceControllers.AddRegistry(
aggregate.Registry{
Name: serviceregistry.KubernetesRegistry,
ClusterID: clusterID,
ServiceDiscovery: kubectl,
Controller: kubectl,
})
return
}

从上文可以看出controller2.NewControllerk8sRegistry 的核心部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func NewController(client kubernetes.Interface, options Options) *Controller {
// Queue requires a time duration for a retry delay after a handler error
out := &Controller{
domainSuffix: options.DomainSuffix,
client: client,
queue: kube.NewQueue(1 * time.Second),
ClusterID: options.ClusterID,
XDSUpdater: options.XDSUpdater,
servicesMap: make(map[host.Name]*model.Service),
externalNameSvcInstanceMap: make(map[host.Name][]*model.ServiceInstance),
}
// 基于k8s的client创建Informer(与k8s apiserver的交互组件)
sharedInformers := informers.NewSharedInformerFactoryWithOptions(client, options.ResyncPeriod, informers.WithNamespace(options.WatchedNamespace))
// 创建 Service、Endpoint、Node、Pod 的 Informer,并向每个informer中添加事件监听器,监听每种类型的变更事件。当有变更时会通知controller的queue,Queue中会定时`1s`捞事件执行回调,回调什么将从下文中介绍
svcInformer := sharedInformers.Core().V1().Services().Informer()
out.services = out.createCacheHandler(svcInformer, "Services")

epInformer := sharedInformers.Core().V1().Endpoints().Informer()
out.endpoints = out.createEDSCacheHandler(epInformer, "Endpoints")

nodeInformer := sharedInformers.Core().V1().Nodes().Informer()
out.nodes = out.createCacheHandler(nodeInformer, "Nodes")

podInformer := sharedInformers.Core().V1().Pods().Informer()
out.pods = newPodCache(out.createCacheHandler(podInformer, "Pod"), out)
return out
}
  • 首先基于 k8s client创建 ServiceEndpointNodePodk8s中的服务角色) 的 Informer(k8s为了简化各组件之间交互而构建的工具,其即具有缓存的功能同时也有查询与实时变更通知的功能)。
  • 向每种 Informer 添加变更事件回调方法,回调方法会往controllerqueue中Push变更task
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
handler := &kube.ChainHandler{Funcs: []kube.Handler{c.notify}}
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// ...
c.queue.Push(kube.Task{Handler: handler.Apply, Obj: obj, Event: model.EventAdd})
},
UpdateFunc: func(old, cur interface{}) {
// ...
},
DeleteFunc: func(obj interface{}) {
// ...
},
})
handler := &kube.ChainHandler{Funcs: []kube.Handler{c.notify}}
  • queue是一个防抖动的设计,其会缓存一秒的变更对象,然后集中执行Handler方法,而Handler则是一个调用链。其允许添加多个处理器
  • 再看对Controller接口的实现方法,其实就是往queue的调用链中添加处理方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (c *Controller) AppendServiceHandler(f func(*model.Service, model.Event)) error {
c.services.handler.Append(func(obj interface{}, event model.Event) error {
svc, ok := obj.(*v1.Service)
hostname := svc.Name + "." + svc.Namespace
ports := map[string]uint32{}
portsByNum := map[uint32]string{}

for _, port := range svc.Spec.Ports {
ports[port.Name] = uint32(port.Port)
portsByNum[uint32(port.Port)] = port.Name
}

//将
svcConv := kube.ConvertService(*svc, c.domainSuffix, c.ClusterID)
instances := kube.ExternalNameServiceInstances(*svc, svcConv)
// ...
// EDS needs the port mapping.
c.XDSUpdater.SvcUpdate(c.ClusterID, hostname, ports, portsByNum)
// 回调
f(svcConv, event)

return nil
})
return nil
}

初始化发现服务

EnvoyXdsServer 的作用是聚合服务控制器、配置控制器、mesh配置、meshNetworks配置信息,为Sidecar提供服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func (s *Server) initDiscoveryService(args *PilotArgs) error {
environment := &model.Environment{
Mesh: s.mesh,
MeshNetworks: s.meshNetworks,
IstioConfigStore: s.istioConfigStore,
ServiceDiscovery: s.ServiceController,
PushContext: model.NewPushContext(),
}
// 添加 `/v1/registration`路径处理函数,查询所有Service和Endpoint
discovery, err := envoy.NewDiscoveryService(
environment,
args.DiscoveryOptions,
)
s.mux = discovery.RestContainer.ServeMux
// 1. 聚合配置中心、注册中心,并往其中添加各种类型的事件处理器
// 2. 创建配置生成器
s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(environment,
istio_networking.NewConfigGenerator(args.Plugins),
s.ServiceController, s.kubeRegistry, s.configController)
// ...
if s.kubeRegistry != nil {
s.kubeRegistry.Env = environment
s.kubeRegistry.InitNetworkLookup(s.meshNetworks)
// 设置k8s注册器的更新回调
s.kubeRegistry.XDSUpdater = s.EnvoyXdsServer
}

if s.mcpOptions != nil {
// 设置mcp配置更新回调
s.mcpOptions.XDSUpdater = s.EnvoyXdsServer
}
if s.incrementalMcpOptions != nil {
clusterID := args.Config.ControllerOptions.ClusterID
s.incrementalMcpOptions.XDSUpdater = s.EnvoyXdsServer
s.incrementalMcpOptions.ClusterID = clusterID
// 设置服务发现配置更新回调
s.discoveryOptions.XDSUpdater = s.EnvoyXdsServer
s.discoveryOptions.Env = environment
s.discoveryOptions.ClusterID = clusterID
}

s.addStartFunc(func(stop <-chan struct{}) error {
s.EnvoyXdsServer.Start(stop)
return nil
})
// 创建GrpServer,并将 EnvoyXdsServer 服务注册到GrpcServer中
s.initGrpcServer(args.KeepaliveOptions)
// 创建安全端口服务
if args.DiscoveryOptions.SecureGrpcAddr != "" {
s.initSecureGrpcServer(args.KeepaliveOptions)
}

其中,最重要的是创建EnvoyXdsServer,构建参数分别是 Environment 所有数据源、ConfigGenerator配置生成器(其作用是根据已有信息生成Sidecar需要的 ClusterRouteListener,并且会构建相应的filter链)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(environment,
istio_networking.NewConfigGenerator(args.Plugins),
s.ServiceController, s.kubeRegistry, s.configController)
func NewDiscoveryServer(
env *model.Environment,
generator core.ConfigGenerator,
ctl model.Controller,
kubeController *controller.Controller,
configCache model.ConfigStoreCache) *DiscoveryServer {
serviceHandler := func(svc *model.Service, _ model.Event) {
pushReq := &model.PushRequest{
Full: true,
NamespacesUpdated: map[string]struct{}{svc.Attributes.Namespace: {}},
ConfigTypesUpdated: map[string]struct{}{schemas.ServiceEntry.Type: {}},
}
out.ConfigUpdate(pushReq)
}
// 往注册中心添加Service变更处理器
ctl.AppendServiceHandler(serviceHandler)
instanceHandler := func(si *model.ServiceInstance, _ model.Event) {
out.ConfigUpdate(&model.PushRequest{
Full: true,
NamespacesUpdated: map[string]struct{}{si.Service.Attributes.Namespace: {}},
ConfigTypesUpdated: map[string]struct{}{schemas.ServiceEntry.Type: {}},
})
}
ctl.AppendInstanceHandler(instanceHandler)
configHandler := func(c model.Config, _ model.Event) {
pushReq := &model.PushRequest{
Full: true,
ConfigTypesUpdated: map[string]struct{}{c.Type: {}},
}
out.ConfigUpdate(pushReq)
}
// 往配置中心添加配置变更处理器
for _, descriptor := range schemas.Istio {
configCache.RegisterEventHandler(descriptor.Type, configHandler)
}

初始化更新下发流程

创建完 EnvoyXdsServer后,执行其Start方法,开启push通道处理流程:

1
2
3
4
5
6
7
8
func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
// 接收 配置中心、注册中心的配置变更
go s.handleUpdates(stopCh)
// 周期性刷新Metrics信息
go s.periodicRefreshMetrics(stopCh)
// 往下发送变更
go s.sendPushes(stopCh)
}

首先,先来看处理变更流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func (s *DiscoveryServer) handleUpdates(stopCh <-chan struct{}) {
// pushChannel接收上层配置中心、注册中心的更新
debounce(s.pushChannel, stopCh, s.Push)
}
func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, pushFn func(req *model.PushRequest)) {
// ...
var req *model.PushRequest
free := true
freeCh := make(chan struct{}, 1)
push := func(req *model.PushRequest) {
pushFn(req)
freeCh <- struct{}{}
}

pushWorker := func() {
// 距离上一次开始防抖动时间开始
eventDelay := time.Since(startDebounce)
// 距离上次配置更新间隔时间
quietTime := time.Since(lastConfigUpdateTime)
// 大于 10s,100ms
if eventDelay >= DebounceMax || quietTime >= DebounceAfter {
if req != nil {
// ... 打印下发信息
free = false
go push(req)
req = nil
debouncedEvents = 0
}
} else {
timeChan = time.After(DebounceAfter - quietTime)
}
}

for {
select {
case <-freeCh:
free = true
pushWorker()
case r := <-ch:
// ... 对于EDS而言,如果未配置防抖动,且不是全局更新,则立即下发
lastConfigUpdateTime = time.Now()
// 第一次更新事件
if debouncedEvents == 0 {
timeChan = time.After(DebounceAfter)
startDebounce = lastConfigUpdateTime
}
debouncedEvents++
// 合并变更
req = req.Merge(r)
case <-timeChan:
// 时间窗口到来时,下发更新
if free {
pushWorker()
}
case <-stopCh:
return
}
}
}

处理变更处理流程是一个防抖动的设计,处理流程大致如下:

  1. 配置中心 或者 注册中心 有变更时 会发送到 pushChannel
  2. 每个抖动窗口(默认 100msDebounceAfter)第一次接收更新,设置到期 timer,后续不断接收更新,并合并更新;
  3. 当抖动窗口期到期时则调用pushWorker下发更新;
  4. 另外通过 变量freefreeCh来防止上一次没下发完后一次就开始了。

合并请求主要内容如下(push_context.go/L201):

  • 如果任意一个更新是全局 Push,则合并更新即为全局 Push
  • 如果不是全局 Push,则合并 EdsUpdates信息,标识哪些 Service 需要更新;
  • 合并需要更新的目标命名空间;
  • 合并需要更新的配置类型。

pushWorker最后调用的是 EnvoyXdsServerPush方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (s *DiscoveryServer) Push(req *model.PushRequest) {
if !req.Full {
// 部分下发,只下发EDS
req.Push = s.globalPushContext()
go s.AdsPushAll(versionInfo(), req)
return
}

oldPushContext := s.globalPushContext()
if oldPushContext != nil {
oldPushContext.OnConfigChange()
}
t0 := time.Now()
// 创建新的PushContext,并基于已有PushContext和请求以及环境初始化新的PushContext
push := model.NewPushContext()
push.InitContext(s.Env, oldPushContext, req)

// 根据最新的push信息,更新本地缓存
if err := s.updateServiceShards(push); err != nil {
return
}

s.updateMutex.Lock()
s.Env.PushContext = push
s.updateMutex.Unlock()
// 生成版本号
versionLocal := time.Now().Format(time.RFC3339) + "/" + strconv.FormatUint(versionNum.Load(), 10)
versionNum.Inc()
initContextTime := time.Since(t0)
versionMutex.Lock()
version = versionLocal
versionMutex.Unlock()
req.Push = push
go s.AdsPushAll(versionLocal, req)
}

总结其执行流程如下:

  1. 若部分更新即只有EDS更新,则异步调用AdsPushAll下发;
  2. 若全局更新,首先创建出初始化新的Push上下文 PushContext(包含全局信息);
  3. 根据最新的Push上下文更新本地的IstioEndpoint缓存;
  4. 生成新的版本号;
  5. 最后异步执行 AdsPushAl 下发。

继续跟进AdsPushAll方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (s *DiscoveryServer) AdsPushAll(version string, req *model.PushRequest) {
if !req.Full {
// 增量eds更新下发,更新本地缓存。
s.edsIncremental(version, req.Push, req)
return
}
cMap := make(map[string]*EdsCluster, len(edsClusters))
for k, v := range edsClusters {
cMap[k] = v
}
for clusterName, edsCluster := range cMap {
// 更新本地缓存
s.updateCluster(req.Push, clusterName, edsCluster)
}
// 下发将所有与DiscoveryServer建立的链接XdsConnection和下发请求组队,放入 pushQueue。s.pushQueue.Enqueue(p, req)
s.startPush(req)
}

AdsPushAll首先会根据下发请求更新本地缓存,然后再执行 startPush 进行下发。startPush将所有与DiscoveryServer建立的链接XdsConnection 和下发请求组队,放入 pushQueue

1
2
3
4
5
6
7
8
9
func (s *DiscoveryServer) startPush(req *model.PushRequest) {
pending := []*XdsConnection{}
for _, v := range adsClients {
pending = append(pending, v)
}
req.Start = time.Now()
for _, p := range pending {
s.pushQueue.Enqueue(p, req)
}

discovery.go/doSendPushes方法会阻塞等待pushQueue的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {
for {
select {
case <-stopCh:
return
default:
// 控制下发的并发度
semaphore <- struct{}{}
// 阻塞等待下发
client, info := queue.Dequeue()
doneFunc := func() {
queue.MarkDone(client)
<-semaphore
}
go func() {
// ...
select {
// 向每个链接的下发通道中发送事件
case client.pushChannel <- &XdsEvent{
push: info.Push,
edsUpdatedServices: edsUpdates,
done: doneFunc,
start: info.Start,
namespacesUpdated: info.NamespacesUpdated,
configTypesUpdated: info.ConfigTypesUpdated,
noncePrefix: info.Push.Version,
}:
return
case <-client.stream.Context().Done(): // 连接断开 doneFunc()
}
}()
}
}
}

初始化集群注册器

Pilot 支持创建多集群注册中心,通过配置istio/multiCluster=true Secrets 类型,可以Pilot对接多k8s集群。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
func StartSecretController(k8s kubernetes.Interface,
// ...
namespace string) error {
stopCh := make(chan struct{})
clusterStore := newClustersStore()
controller := NewController(k8s, namespace, clusterStore, addCallback, removeCallback)
go controller.Run(stopCh)
}
func NewController(
kubeclientset kubernetes.Interface,
namespace string,
cs *ClusterStore,
addCallback addSecretCallback,
removeCallback removeSecretCallback) *Controller {

// 创建Secret的Informer,监听 带`istio/multiCluster=true`d的Secret资源
secretsInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(opts meta_v1.ListOptions) (runtime.Object, error) {
opts.LabelSelector = MultiClusterSecretLabel + "=true"
return kubeclientset.CoreV1().Secrets(namespace).List(opts)
},
WatchFunc: func(opts meta_v1.ListOptions) (watch.Interface, error) {
opts.LabelSelector = MultiClusterSecretLabel + "=true"
return kubeclientset.CoreV1().Secrets(namespace).Watch(opts)
},
},
&corev1.Secret{}, 0, cache.Indexers{},
)

controller := &Controller{
//...
queue: queue,
addCallback: addCallback,
removeCallback: removeCallback,
}
secretsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
queue.Add(key)
}
// ...
})

return controller
}
func (c *Controller) Run(stopCh <-chan struct{}) {
go c.informer.Run(stopCh)
wait.Until(c.runWorker, 5*time.Second, stopCh)
}
// 创建集群
func (c *Controller) addMemberCluster(secretName string, s *corev1.Secret) {
// ...
}

创建跨集群注册服务流程如下:

  1. 向当前对接k8s集群添加 带有istio/multiCluster=trueSecret资源;
  2. Pilot 获取此配置后,解析配置,并根据配置创建对应k8s集群的client,并基于此创建相应的注册器。

初始化完所有组件后,调用Server.Start方法执行之前注入的StartFunc
到此,Pilot 的初始化过程就结束了。

初始化 ControlZ

Pilot 里挺有意思的一个设计。作用是开启一个端口向外暴露运维查询界面:官方介绍
目前支持:

  • ScopeTopic: 查询和修改日志级别
  • MemTopic: 查询内存统计、强制GC
  • ProcTopic:查询进程执行情况:进程ID、协程数、hostName等
  • ArgsTopic:启动参数
  • MetricsTopic: metric信息

同时该机制支持组件拓展,比如配置服务可以通过该机制对外暴露接口查询信息。目前,McpController以及sseMcpController(组合服务MCP控制器)也是通过它来暴露查询信息接口的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (c *configzTopic) Activate(context fw.TopicContext) {
l := template.Must(context.Layout().Clone())
c.tmpl = template.Must(l.Parse(string(assets.MustAsset("templates/config.html"))))
_ = context.HTMLRouter().StrictSlash(true).NewRoute().Path("/").HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
d := c.collectData()
fw.RenderHTML(w, c.tmpl, d)
})
_ = context.JSONRouter().StrictSlash(true).NewRoute().Methods("GET").Path("/").HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
d := c.collectData()
fw.RenderJSON(w, http.StatusOK, d)
})
}

func (c *configzTopic) collectData() *data {
return &data{
ID: c.topic.ID(),
Metadata: c.topic.Metadata(), // mcp元数据
Collections: c.topic.Collections(), // mcp请求的资源集合包含版本信息
LatestRequests: c.topic.SnapshotRequestInfo(), // 最近一组已知请求结果的快照信息
}
}

CtrolZ提供 接口ctrlz.RegisterTopic(CreateTopic(topic)来拓展信息暴露

总结

总结启动过程大致如下:

  1. 根据参数创建 k8s Client;
  2. 从配置地址中获取 mesh 配置,并监听文件变更;
  3. 从配置地址获取 mesh 网络配置,并监听变更;
  4. 初始化配置控制器,支持文件配置、k8s配置、MCP协议配置服务;
  5. 初始化注册服务控制器,支持k8s、consul、实现MCP协议的拓展注册中心并且将配置控制器也包装成注册中心;
  6. 初始化发现服务,聚合之前初始化的控制器对外提供服务,包括http查询服务以及对sidecar服务的grpc服务;
  7. 初始化集群注册服务,使得pilot可以对接多k8s集群;
  8. 初始化所有组件后,调用初始化过程中添加的StartFunc开启所有组件的执行流程。

信息拉取与下发

上一节介绍过,DiscoveryServer.StreamAggregatedResources用来接收客户端请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
// 从grpc stream上下文中获取 连接信
peerInfo, ok := peer.FromContext(stream.Context())
// 初始化Push上下文,若是第一次,则会根据之前构建的Enviroment属性里面的 注册服务和配置服务 的内容初始化下全局的缓存信息
err := s.globalPushContext().InitContext(s.Env, nil, nil)
// 将每个sidecar对应stream、节点地址包装成XdsConnection,其会在下发时被使用
con := newXdsConnection(peerAddr, stream)
// 开启 异步接收代理(sidecar、gateray)通过Stream发送过来的信息
go receiveThread(con, reqChannel, &receiveError)
// 处理流程的核心部分:
for {
select {
// 对于来自于 Sidecar的请求
case discReq, ok := <-reqChannel:
// 如果是第一次,代理会带上其自身节点信息 Node属性
// 使用解析出来的信息填充 XdsConnection
if discReq.Node != nil && discReq.Node.Id != "" {
err = s.initConnectionNode(discReq.Node, con)
if err != nil {
return err
}
}
switch discReq.TypeUrl {
case ClusterType:
con.CDSWatch = true
// 首先,跟据节点信息 构建envoy.Cluster信息
// 跟据Cluster构建DiscoveryResponse通过Stream返回代理
s.pushCds(con, s.globalPushContext(), versionInfo())
case ListenerType:
con.LDSWatch = true
// 构建Listener信息
// 构建DiscoveryResponse通过Stream返回代理
err := s.pushLds(con, s.globalPushContext(), versionInfo())
case RouteType:
//...
err := s.pushRoute(con, s.globalPushContext(), versionInfo())
case EndpointType:
clusters := discReq.GetResourceNames()
for _, cn := range con.Clusters {
s.removeEdsCon(cn, con.ConID)
}
for _, cn := range clusters {
s.getOrAddEdsCluster(cn, con.ConID, con)
}
err := s.pushEds(s.globalPushContext(), con, versionInfo(), nil)
}
if !con.added {
con.added = true
s.addCon(con.ConID, con)
defer s.removeCon(con.ConID, con)
}
case pushEv := <-con.pushChannel:
// 接收更新下发
err := s.pushConnection(con, pushEv)
pushEv.done()

总结其实现流程如下:

  1. 首先 StreamAggregatedResources 是代理(Envoy, 可以是SiEnvoy角色也可以是Gateway角色)的入口,接收代理的Grpc Stream
  2. 如果全局PushContext未初始化好,则进行一次初始化;
  3. 根据 Stream 信息构建 XdsConnection
  4. 构建一个协程用于接收代理的请求,当请求到达时,丢进reqChannel中;
  5. 构建一个循环,同时接收代理的请求和要下发给代理的XdsEvent
  6. 对于请求:
    • 当是首次接收代理的请求时,代理请求中会携带Node信息,Pilot会解析此信息并根据Controller查询相关信息来填充 XdsConnection 信息(代理的类型(sidecar\gateway)、ip地址、版本号、节点标签、关联的ServiceInstance);
    • 其后根据请求的TypeUrl判断请求类型进行相应处理;
    • 对于每种类型请求
      • 会将对应的XdsConnection.XXXWatch置为True,表示代理对这种类型数据敏感,当有这种类型数据变更时,下发给它;
      • 同时需要注意的是,下发的时候会带上版本号(VersionInfo);
      • 根据 XdsConnection 信息构建相应类型数据;(pushXds
      • 根据数据构建 DiscoveryResponse进行下发。
    • 最后会将XdsConnection 添加到DiscoveryServer中。
  7. 数据下发(之前提到:当信息变更时会捞出DiscoveryServer中保存的所有XdsConnection,然后往其pushChannel中发送XdsEvent
    • 数据下发接口最终会调用pushConnection方法,其处理流程大致如下:
      • 如果只是Eds信息更新,则只下发pushEds
      • 如不是,则会依次下发 CDSEDSLDSRoute

拓展性

提到Pilot的拓展性,首先想到其支持对接实现MCP协议的配置中心和注册中心。下一章将首先介绍MCP协议,然后再结合Galley实例来讲解MCP的工作原理。官方文档

您的支持是我创作源源不断的动力