Pilot
Pilot是Istio的控制中枢,它负责sidecar的生命周期管理并负责向Sidecar下发控制数据。
[TOC]
下面将从以下几个方面来分析Pilot:
- 整体架构
- 启动过程
Sidecar初始拉取过程 & 信息下发过程- 拓展性
整体架构
Pilot 内部整体架构如下:
- 实现 Grpc Server 对
Envoy提供查询配置以及服务发现服务; - 支持配置控制器、服务控制器
- 配置控制器支持聚合多种类型配置源,如
K8s、基于文件系统的内存配置源、Galley以及其他的实现MCP协议的拓展配置中心服务; - 服务控制器同样支持多种类型服务注册中心,如
K8s、Consul以及可以拓展MCP协议实现的注册中心服务 - 另外,通过
ControlZ服务对外暴露Pilot内部配置&运行时信息的查询和修改接口

启动分析
首先其入口地址为:istio/pilot/cmd/pilot-discovery。
启动前,init方法预先执行,其解析出启动参数如:registries(注册中心配置,若未配置,则默认注册中心为k8s)、meshConfig(mesh的配置文件地址)、httpAddr、grpcAddr 服务器启动http、Grpc端口等。discoveryCmd.Run方法为启动入口,首先通过bootstrap.NewServer创建Server,然后通过Start方法启动 Server。
首先,看bootstrap.NewServer方法:
1 | s := &Server{ |
创建完DiscoveryServer后,通过调用Start方法启动各组件。
上面执行流程大致过程如下:首先创建与k8s的交互客户端,然后根据初始传入配置路径读取配置文件中mesh以及网络的配置,同时监听、维护配置文件更新;其后创建三个控制器:证书、配置、服务控制器分别管理 Secert、config、service信息;再创建发现服务:聚合上面所有控制器的能力对Sidecar提供服务。
下面将分别介绍初始化mesh配置以及网络配置过程,初始化证书、配置以及服务控制器,以及初始化发现服务。
初始化Mesh配置
initMesh方法,首先判断是否配置了args.Mesh.ConfigFile。若是,则从文件中读取配置信息,并添加文件监听器当有更新时回调更新方法(下发配置);若无,则从k8s中获取mesh配置信息。
1 | // args.Mesh.ConfigFile != "" |
初始化配置控制器
Pilot 支持对接多配置中心,支持从多个配置中心获取配置值:
- 若设置了配置源
mesh.ConfigSources,则初始化MCPConfigController; - 若设置了配置文件目录
Config.FileDir,则创建一个内存配置控制器。并定时(100ms)同步指定文件目录下的配置到内存配置控制器中; - 创建 k8s 配置控制器,每种Istio配置类型对应一种
CRD资源,并创建每种资源的informer,构成配置更新机制。
1 | if len(s.mesh.ConfigSources) > 0 { |
随后添加置后 Start方法,在所有组件初始化完成执行(Pilot中所有组件都是如此)。
1 | s.addStartFunc(func(stop <-chan struct{}) error { |
完成基本类型配置控制器创建后,会继续判断是否支持 Ingress 模式的配置;若是则对控制器进行包装:添加一种配置类型Ingress,并按类型进行映射:
1 | configController, err := configaggregate.MakeCache([]model.ConfigStoreCache{ |
最后,根据创建的配置控制器创建 IstioConfigStore。其作用就是配置访问层,提供具体配置查询接口,定义如下:
1 | type IstioConfigStore interface { |
因为MCP协议 是Istio中最近比较火的概念,下文将详细讲解initMCPConfigController:
1 | func (s *Server) initMCPConfigController(args *PilotArgs) error { |
继续跟进s.mcpController(args, conn, reporter, &clients, &configStores),逻辑:
1 | func (s *Server) mcpController(args *PilotArgs, |
总结创建 MCPConfigController的执行过程如下:
- 对于每种配置源,根据
configSource.Address以及配置创建与MCP服务端的grpc链接以及stream client; - 创建
mcpController用于接收 MCP Server 的配置变更; - 创建
Sink组件,维护与 MCP Server 的交互; - 别外,如果若配置源同时也维护 服务资源(充当注册中心的角色),同时会创建
SyntheticServiceEntryController。
初始化服务控制器
Pilot同时也支持对接多个注册中心,其同时可以从多个注册中心获取服务注册信息。目前Pilot支持三种类型的注册中心:k8s、consul和基于MCP协议实现的注册中心。
初始化时,创建指定类型Registry,然后加到aggregate.Controller中,其聚合不同注册中心的数据,对sidecar服务。
1 | func (s *Server) initServiceControllers(args *PilotArgs) error { |
初始化服务发现控制器的过程主要如下:
- 根据注册器配置列表创建相应类型服务发现管理器;
- 将配置中心包装成发现服务;
- 将生成的所有发现服务聚合成
aggregate.Controller
再深入Registry服务逻辑,首先看其定义:
1 | type Registry struct { |
下面将以k8s为具体发现服务来讲解其工作原理。
1 | func (s *Server) createK8sServiceControllers(serviceControllers *aggregate.Controller, args *PilotArgs) (err error) { |
从上文可以看出controller2.NewController是 k8sRegistry 的核心部分。
1 | func NewController(client kubernetes.Interface, options Options) *Controller { |
- 首先基于
k8sclient创建Service、Endpoint、Node、Pod(k8s中的服务角色) 的Informer(k8s为了简化各组件之间交互而构建的工具,其即具有缓存的功能同时也有查询与实时变更通知的功能)。 - 向每种
Informer添加变更事件回调方法,回调方法会往controller的queue中Push变更task
1 | handler := &kube.ChainHandler{Funcs: []kube.Handler{c.notify}} |
queue是一个防抖动的设计,其会缓存一秒的变更对象,然后集中执行Handler方法,而Handler则是一个调用链。其允许添加多个处理器- 再看对
Controller接口的实现方法,其实就是往queue的调用链中添加处理方法:
1 | func (c *Controller) AppendServiceHandler(f func(*model.Service, model.Event)) error { |
初始化发现服务
EnvoyXdsServer 的作用是聚合服务控制器、配置控制器、mesh配置、meshNetworks配置信息,为Sidecar提供服务。
1 | func (s *Server) initDiscoveryService(args *PilotArgs) error { |
其中,最重要的是创建EnvoyXdsServer,构建参数分别是 Environment 所有数据源、ConfigGenerator配置生成器(其作用是根据已有信息生成Sidecar需要的 Cluster、Route、Listener,并且会构建相应的filter链)。
1 | s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(environment, |
初始化更新下发流程
创建完 EnvoyXdsServer后,执行其Start方法,开启push通道处理流程:
1 | func (s *DiscoveryServer) Start(stopCh <-chan struct{}) { |
首先,先来看处理变更流程:
1 | func (s *DiscoveryServer) handleUpdates(stopCh <-chan struct{}) { |
处理变更处理流程是一个防抖动的设计,处理流程大致如下:
- 当
配置中心或者注册中心有变更时 会发送到 pushChannel; - 每个抖动窗口(默认
100ms,DebounceAfter)第一次接收更新,设置到期timer,后续不断接收更新,并合并更新; - 当抖动窗口期到期时则调用
pushWorker下发更新; - 另外通过 变量
free和freeCh来防止上一次没下发完后一次就开始了。
合并请求主要内容如下(push_context.go/L201):
- 如果任意一个更新是全局 Push,则合并更新即为全局 Push;
- 如果不是全局 Push,则合并 EdsUpdates信息,标识哪些 Service 需要更新;
- 合并需要更新的目标命名空间;
- 合并需要更新的配置类型。
pushWorker最后调用的是 EnvoyXdsServer的Push方法:
1 | func (s *DiscoveryServer) Push(req *model.PushRequest) { |
总结其执行流程如下:
- 若部分更新即只有
EDS更新,则异步调用AdsPushAll下发; - 若全局更新,首先创建出初始化新的Push上下文
PushContext(包含全局信息); - 根据最新的Push上下文更新本地的
IstioEndpoint缓存; - 生成新的版本号;
- 最后异步执行
AdsPushAl下发。
继续跟进AdsPushAll方法中:
1 | func (s *DiscoveryServer) AdsPushAll(version string, req *model.PushRequest) { |
AdsPushAll首先会根据下发请求更新本地缓存,然后再执行 startPush 进行下发。startPush将所有与DiscoveryServer建立的链接XdsConnection 和下发请求组队,放入 pushQueue:
1 | func (s *DiscoveryServer) startPush(req *model.PushRequest) { |
discovery.go/doSendPushes方法会阻塞等待pushQueue的内容:
1 | func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) { |
初始化集群注册器
Pilot 支持创建多集群注册中心,通过配置istio/multiCluster=true Secrets 类型,可以Pilot对接多k8s集群。
1 | func StartSecretController(k8s kubernetes.Interface, |
创建跨集群注册服务流程如下:
- 向当前对接k8s集群添加 带有
istio/multiCluster=true的Secret资源; - Pilot 获取此配置后,解析配置,并根据配置创建对应
k8s集群的client,并基于此创建相应的注册器。
初始化完所有组件后,调用Server.Start方法执行之前注入的StartFunc。
到此,Pilot 的初始化过程就结束了。
初始化 ControlZ
Pilot 里挺有意思的一个设计。作用是开启一个端口向外暴露运维查询界面:官方介绍。
目前支持:
- ScopeTopic: 查询和修改日志级别
- MemTopic: 查询内存统计、强制GC
- ProcTopic:查询进程执行情况:进程ID、协程数、hostName等
- ArgsTopic:启动参数
- MetricsTopic: metric信息
- …
同时该机制支持组件拓展,比如配置服务可以通过该机制对外暴露接口查询信息。目前,McpController以及sseMcpController(组合服务MCP控制器)也是通过它来暴露查询信息接口的:
1 | func (c *configzTopic) Activate(context fw.TopicContext) { |
CtrolZ提供 接口ctrlz.RegisterTopic(CreateTopic(topic)来拓展信息暴露
总结
总结启动过程大致如下:
- 根据参数创建
k8sClient; - 从配置地址中获取 mesh 配置,并监听文件变更;
- 从配置地址获取 mesh 网络配置,并监听变更;
- 初始化配置控制器,支持文件配置、k8s配置、MCP协议配置服务;
- 初始化注册服务控制器,支持k8s、consul、实现MCP协议的拓展注册中心并且将配置控制器也包装成注册中心;
- 初始化发现服务,聚合之前初始化的控制器对外提供服务,包括http查询服务以及对
sidecar服务的grpc服务; - 初始化集群注册服务,使得pilot可以对接多k8s集群;
- 初始化所有组件后,调用初始化过程中添加的
StartFunc开启所有组件的执行流程。
信息拉取与下发
上一节介绍过,DiscoveryServer.StreamAggregatedResources用来接收客户端请求。
1 | func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { |
总结其实现流程如下:
- 首先
StreamAggregatedResources是代理(Envoy, 可以是SiEnvoy角色也可以是Gateway角色)的入口,接收代理的Grpc Stream; - 如果全局
PushContext未初始化好,则进行一次初始化; - 根据
Stream信息构建XdsConnection - 构建一个协程用于接收代理的请求,当请求到达时,丢进
reqChannel中; - 构建一个循环,同时接收代理的请求和要下发给代理的
XdsEvent; - 对于请求:
- 当是首次接收代理的请求时,代理请求中会携带
Node信息,Pilot会解析此信息并根据Controller查询相关信息来填充XdsConnection信息(代理的类型(sidecar\gateway)、ip地址、版本号、节点标签、关联的ServiceInstance); - 其后根据请求的
TypeUrl判断请求类型进行相应处理; - 对于每种类型请求
- 会将对应的
XdsConnection.XXXWatch置为True,表示代理对这种类型数据敏感,当有这种类型数据变更时,下发给它; - 同时需要注意的是,下发的时候会带上版本号(
VersionInfo); - 根据
XdsConnection信息构建相应类型数据;(pushXds) - 根据数据构建
DiscoveryResponse进行下发。
- 会将对应的
- 最后会将
XdsConnection添加到DiscoveryServer中。
- 当是首次接收代理的请求时,代理请求中会携带
- 数据下发(之前提到:当信息变更时会捞出
DiscoveryServer中保存的所有XdsConnection,然后往其pushChannel中发送XdsEvent)- 数据下发接口最终会调用
pushConnection方法,其处理流程大致如下:- 如果只是
Eds信息更新,则只下发pushEds; - 如不是,则会依次下发
CDS、EDS、LDS、Route
- 如果只是
- 数据下发接口最终会调用
拓展性
提到Pilot的拓展性,首先想到其支持对接实现MCP协议的配置中心和注册中心。下一章将首先介绍MCP协议,然后再结合Galley实例来讲解MCP的工作原理。官方文档