Pilot
Pilot
是Istio的控制中枢,它负责sidecar
的生命周期管理并负责向Sidecar
下发控制数据。
[TOC]
下面将从以下几个方面来分析Pilot
:
- 整体架构
- 启动过程
Sidecar
初始拉取过程 & 信息下发过程- 拓展性
整体架构
Pilot
内部整体架构如下:
- 实现 Grpc Server 对
Envoy
提供查询配置以及服务发现服务; - 支持配置控制器、服务控制器
- 配置控制器支持聚合多种类型配置源,如
K8s
、基于文件系统的内存配置源、Galley
以及其他的实现MCP协议的拓展配置中心服务; - 服务控制器同样支持多种类型服务注册中心,如
K8s
、Consul
以及可以拓展MCP协议实现的注册中心服务 - 另外,通过
ControlZ
服务对外暴露Pilot
内部配置&运行时信息的查询和修改接口
启动分析
首先其入口地址为:istio/pilot/cmd/pilot-discovery
。
启动前,init
方法预先执行,其解析出启动参数如:registries
(注册中心配置,若未配置,则默认注册中心为k8s
)、meshConfig
(mesh的配置文件地址)、httpAddr
、grpcAddr
服务器启动http、Grpc端口等。discoveryCmd.Run
方法为启动入口,首先通过bootstrap.NewServer
创建Server,然后通过Start
方法启动 Server。
首先,看bootstrap.NewServer
方法:
1 | s := &Server{ |
创建完DiscoveryServer
后,通过调用Start
方法启动各组件。
上面执行流程大致过程如下:首先创建与k8s
的交互客户端,然后根据初始传入配置路径读取配置文件中mesh
以及网络的配置,同时监听、维护配置文件更新;其后创建三个控制器:证书、配置、服务控制器分别管理 Secert、config、service信息;再创建发现服务:聚合上面所有控制器的能力对Sidecar提供服务。
下面将分别介绍初始化mesh
配置以及网络配置过程,初始化证书、配置以及服务控制器,以及初始化发现服务。
初始化Mesh
配置
initMesh方法,首先判断是否配置了args.Mesh.ConfigFile
。若是,则从文件中读取配置信息,并添加文件监听器当有更新时回调更新方法(下发配置);若无,则从k8s
中获取mesh配置信息。
1 | // args.Mesh.ConfigFile != "" |
初始化配置控制器
Pilot 支持对接多配置中心,支持从多个配置中心获取配置值:
- 若设置了配置源
mesh.ConfigSources
,则初始化MCPConfigController
; - 若设置了配置文件目录
Config.FileDir
,则创建一个内存配置控制器。并定时(100ms
)同步指定文件目录下的配置到内存配置控制器中; - 创建 k8s 配置控制器,每种Istio配置类型对应一种
CRD
资源,并创建每种资源的informer,构成配置更新机制。
1 | if len(s.mesh.ConfigSources) > 0 { |
随后添加置后 Start方法,在所有组件初始化完成执行(Pilot中所有组件都是如此)。
1 | s.addStartFunc(func(stop <-chan struct{}) error { |
完成基本类型配置控制器创建后,会继续判断是否支持 Ingress
模式的配置;若是则对控制器进行包装:添加一种配置类型Ingress
,并按类型进行映射:
1 | configController, err := configaggregate.MakeCache([]model.ConfigStoreCache{ |
最后,根据创建的配置控制器创建 IstioConfigStore
。其作用就是配置访问层,提供具体配置查询接口,定义如下:
1 | type IstioConfigStore interface { |
因为MCP协议 是Istio
中最近比较火的概念,下文将详细讲解initMCPConfigController
:
1 | func (s *Server) initMCPConfigController(args *PilotArgs) error { |
继续跟进s.mcpController(args, conn, reporter, &clients, &configStores)
,逻辑:
1 | func (s *Server) mcpController(args *PilotArgs, |
总结创建 MCPConfigController的执行过程如下:
- 对于每种配置源,根据
configSource.Address
以及配置创建与MCP服务端的grpc链接以及stream client; - 创建
mcpController
用于接收 MCP Server 的配置变更; - 创建
Sink
组件,维护与 MCP Server 的交互; - 别外,如果若配置源同时也维护 服务资源(充当注册中心的角色),同时会创建
SyntheticServiceEntryController
。
初始化服务控制器
Pilot
同时也支持对接多个注册中心,其同时可以从多个注册中心获取服务注册信息。目前Pilot
支持三种类型的注册中心:k8s
、consul
和基于MCP协议
实现的注册中心。
初始化时,创建指定类型Registry
,然后加到aggregate.Controller
中,其聚合不同注册中心的数据,对sidecar
服务。
1 | func (s *Server) initServiceControllers(args *PilotArgs) error { |
初始化服务发现控制器的过程主要如下:
- 根据注册器配置列表创建相应类型服务发现管理器;
- 将配置中心包装成发现服务;
- 将生成的所有发现服务聚合成
aggregate.Controller
再深入Registry
服务逻辑,首先看其定义:
1 | type Registry struct { |
下面将以k8s
为具体发现服务来讲解其工作原理。
1 | func (s *Server) createK8sServiceControllers(serviceControllers *aggregate.Controller, args *PilotArgs) (err error) { |
从上文可以看出controller2.NewController
是 k8sRegistry 的核心部分。
1 | func NewController(client kubernetes.Interface, options Options) *Controller { |
- 首先基于
k8s
client创建Service
、Endpoint
、Node
、Pod
(k8s
中的服务角色) 的Informer
(k8s为了简化各组件之间交互而构建的工具,其即具有缓存的功能同时也有查询与实时变更通知的功能)。 - 向每种
Informer
添加变更事件回调方法,回调方法会往controller
的queue
中Push变更task
1 | handler := &kube.ChainHandler{Funcs: []kube.Handler{c.notify}} |
queue
是一个防抖动的设计,其会缓存一秒的变更对象,然后集中执行Handler
方法,而Handler
则是一个调用链。其允许添加多个处理器- 再看对
Controller
接口的实现方法,其实就是往queue
的调用链中添加处理方法:
1 | func (c *Controller) AppendServiceHandler(f func(*model.Service, model.Event)) error { |
初始化发现服务
EnvoyXdsServer
的作用是聚合服务控制器、配置控制器、mesh
配置、meshNetworks
配置信息,为Sidecar
提供服务。
1 | func (s *Server) initDiscoveryService(args *PilotArgs) error { |
其中,最重要的是创建EnvoyXdsServer
,构建参数分别是 Environment
所有数据源、ConfigGenerator
配置生成器(其作用是根据已有信息生成Sidecar
需要的 Cluster
、Route
、Listener
,并且会构建相应的filter链)。
1 | s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(environment, |
初始化更新下发流程
创建完 EnvoyXdsServer
后,执行其Start
方法,开启push
通道处理流程:
1 | func (s *DiscoveryServer) Start(stopCh <-chan struct{}) { |
首先,先来看处理变更流程:
1 | func (s *DiscoveryServer) handleUpdates(stopCh <-chan struct{}) { |
处理变更处理流程是一个防抖动的设计,处理流程大致如下:
- 当
配置中心
或者注册中心
有变更时 会发送到 pushChannel; - 每个抖动窗口(默认
100ms
,DebounceAfter)第一次接收更新,设置到期timer
,后续不断接收更新,并合并更新; - 当抖动窗口期到期时则调用
pushWorker
下发更新; - 另外通过 变量
free
和freeCh
来防止上一次没下发完后一次就开始了。
合并请求主要内容如下(push_context.go/L201
):
- 如果任意一个更新是全局 Push,则合并更新即为全局 Push;
- 如果不是全局 Push,则合并 EdsUpdates信息,标识哪些 Service 需要更新;
- 合并需要更新的目标命名空间;
- 合并需要更新的配置类型。
pushWorker
最后调用的是 EnvoyXdsServer
的Push
方法:
1 | func (s *DiscoveryServer) Push(req *model.PushRequest) { |
总结其执行流程如下:
- 若部分更新即只有
EDS
更新,则异步调用AdsPushAll
下发; - 若全局更新,首先创建出初始化新的Push上下文
PushContext
(包含全局信息); - 根据最新的Push上下文更新本地的
IstioEndpoint
缓存; - 生成新的版本号;
- 最后异步执行
AdsPushAl
下发。
继续跟进AdsPushAll
方法中:
1 | func (s *DiscoveryServer) AdsPushAll(version string, req *model.PushRequest) { |
AdsPushAll
首先会根据下发请求更新本地缓存,然后再执行 startPush
进行下发。startPush
将所有与DiscoveryServer
建立的链接XdsConnection
和下发请求组队,放入 pushQueue
:
1 | func (s *DiscoveryServer) startPush(req *model.PushRequest) { |
discovery.go/doSendPushes
方法会阻塞等待pushQueue的内容:
1 | func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) { |
初始化集群注册器
Pilot
支持创建多集群注册中心,通过配置istio/multiCluster=true
Secrets 类型,可以Pilot
对接多k8s
集群。
1 | func StartSecretController(k8s kubernetes.Interface, |
创建跨集群注册服务流程如下:
- 向当前对接k8s集群添加 带有
istio/multiCluster=true
的Secret资源; - Pilot 获取此配置后,解析配置,并根据配置创建对应
k8s
集群的client
,并基于此创建相应的注册器。
初始化完所有组件后,调用Server.Start
方法执行之前注入的StartFunc
。
到此,Pilot 的初始化过程就结束了。
初始化 ControlZ
Pilot
里挺有意思的一个设计。作用是开启一个端口向外暴露运维查询界面:官方介绍。
目前支持:
- ScopeTopic: 查询和修改日志级别
- MemTopic: 查询内存统计、强制GC
- ProcTopic:查询进程执行情况:进程ID、协程数、hostName等
- ArgsTopic:启动参数
- MetricsTopic: metric信息
- …
同时该机制支持组件拓展,比如配置服务可以通过该机制对外暴露接口查询信息。目前,McpController
以及sseMcpController
(组合服务MCP控制器)也是通过它来暴露查询信息接口的:
1 | func (c *configzTopic) Activate(context fw.TopicContext) { |
CtrolZ
提供 接口ctrlz.RegisterTopic(CreateTopic(topic)
来拓展信息暴露
总结
总结启动过程大致如下:
- 根据参数创建
k8s
Client; - 从配置地址中获取 mesh 配置,并监听文件变更;
- 从配置地址获取 mesh 网络配置,并监听变更;
- 初始化配置控制器,支持文件配置、k8s配置、MCP协议配置服务;
- 初始化注册服务控制器,支持k8s、consul、实现MCP协议的拓展注册中心并且将配置控制器也包装成注册中心;
- 初始化发现服务,聚合之前初始化的控制器对外提供服务,包括http查询服务以及对
sidecar
服务的grpc服务; - 初始化集群注册服务,使得pilot可以对接多k8s集群;
- 初始化所有组件后,调用初始化过程中添加的
StartFunc
开启所有组件的执行流程。
信息拉取与下发
上一节介绍过,DiscoveryServer.StreamAggregatedResources
用来接收客户端请求。
1 | func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { |
总结其实现流程如下:
- 首先
StreamAggregatedResources
是代理(Envoy
, 可以是SiEnvoy
角色也可以是Gateway
角色)的入口,接收代理的Grpc Stream
; - 如果全局
PushContext
未初始化好,则进行一次初始化; - 根据
Stream
信息构建XdsConnection
- 构建一个协程用于接收代理的请求,当请求到达时,丢进
reqChannel
中; - 构建一个循环,同时接收代理的请求和要下发给代理的
XdsEvent
; - 对于请求:
- 当是首次接收代理的请求时,代理请求中会携带
Node
信息,Pilot
会解析此信息并根据Controller
查询相关信息来填充XdsConnection
信息(代理的类型(sidecar\gateway
)、ip地址、版本号、节点标签、关联的ServiceInstance
); - 其后根据请求的
TypeUrl
判断请求类型进行相应处理; - 对于每种类型请求
- 会将对应的
XdsConnection.XXXWatch
置为True,表示代理对这种类型数据敏感,当有这种类型数据变更时,下发给它; - 同时需要注意的是,下发的时候会带上版本号(
VersionInfo
); - 根据
XdsConnection
信息构建相应类型数据;(pushXds
) - 根据数据构建
DiscoveryResponse
进行下发。
- 会将对应的
- 最后会将
XdsConnection
添加到DiscoveryServer
中。
- 当是首次接收代理的请求时,代理请求中会携带
- 数据下发(之前提到:当信息变更时会捞出
DiscoveryServer
中保存的所有XdsConnection
,然后往其pushChannel
中发送XdsEvent
)- 数据下发接口最终会调用
pushConnection
方法,其处理流程大致如下:- 如果只是
Eds
信息更新,则只下发pushEds
; - 如不是,则会依次下发
CDS
、EDS
、LDS
、Route
- 如果只是
- 数据下发接口最终会调用
拓展性
提到Pilot
的拓展性,首先想到其支持对接实现MCP
协议的配置中心和注册中心。下一章将首先介绍MCP
协议,然后再结合Galley
实例来讲解MCP
的工作原理。官方文档