Istio MCP&Galley

MCP协议&Galley

[TOC]

MCP协议

  • MCP协议全称是 Mesh Configuration Protocol
  • Istio历史版本里,基于k8s来存储数据导致其与k8s耦合,限制了Istio的拓展性(非k8s机制则无法使用)。
  • MCP协议的出现即是为了解决与k8s耦合的问题。MCP定义了一套配置订阅与下发的标准协议,PilotMixer 作为MCP client,任何实现了MCP协议的Server(如Galley)通过 MCP协议Pilot下发配置。

MCP中对配置生生产者和消费者进行了如下抽象:

  • source:配置源,即IstioGalley(配置中心),对应的是Grpc中定义的 Service: ResourceSource
  • sink:配置消费者,即Istio中的PilotMixer,对应的Grpc中定义的ResourceSink
  • resource:配置资源的抽象定义。

  1. 通常情况下,source作为Grpc的server,提供ResourceSource服务;sink作为Grpc的client,提供ResourceSink服务,sink主动向source发起连接,请求资源。当有资源更新时source将信息push给sink
  2. MCP也支持一些特殊场景,Source 作为ClientSink Server建立新连接,然后SeverClient发起资源请求(实际还是Sink请求Source,只是Sink作为Server,让Source连上来)。

MCPSinkSource交互接口定义:

1
2
3
4
5
type ResourceSource_EstablishResourceStreamClient interface {
Send(*RequestResources) error
Recv() (*Resources, error)
grpc.ClientStream
}

Galley

前面介绍了 MCP 协议,接下来我们先介绍下Galley以及其工作原理,然后再以其与Pilot的交互流程讲解 MCP 的工作原理。

Galley 主要负责两方面的功能:配置验证(校验配置的正确性)以及配置管理输入、转换、分发

初始化

Galley启动入口位置:galley/cmd/galley/main.go
首先,创建server.Args,通过参数初始化之后调用server.New创建GalleyServer。然后调用其Start方法开启Galley服务。

创建Server的处理过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func New(a *settings.Args) *Server {
// 创建存活探针
liveness := components.NewProbe(&a.Liveness)
s.host.Add(liveness)
// 创建可读探针
readiness := components.NewProbe(&a.Readiness)
s.host.Add(readiness)
// 创建配置校验管理器
validation := components.NewValidation(a.KubeConfig, a.ValidationArgs, liveness.Controller(), readiness.Controller())
s.host.Add(validation)
// 创建配置处理器
if a.UseOldProcessor {
s.p = components.NewProcessing(a)
s.host.Add(s.p)
t := s.p.ConfigZTopic()
topics = append(topics, t)
} else {
s.p2 = components.NewProcessing2(a)
s.host.Add(s.p2)
t := s.p2.ConfigZTopic()
topics = append(topics, t)
}
// 创建监控接口
mon := components.NewMonitoring(a.MonitoringPort)
s.host.Add(mon)
// 与Pilot一样,ControlZ 运维接口
clz := components.NewCtrlz(a.IntrospectionOptions, topics...)
s.host.Add(clz)
}

Galley 让所有组件都实现 Component接口,其定义了Start方法和Stop方法。Server创建所有组件后执行所有组件的Start方法,当Server要退出时执行Stop方法。


配置校验器

Istio 中配置非常之多,而 Istio 配置数据的主要来源是 k8s。在 k8s 中创建各种类型的配置 CRD 资源,Istio 通过监听这种类型资源进行处理转换后下发给 Envoy。配置资源如果任意设置可能会导致意想不到的结果,而 k8s 提供了一种叫Admission WebHooks的拓展,用来支持对配置的验证工作。
工作原理如下:

  • k8s中注入用于准入的 WebHookConfiguration(配置校验服务器信息),在k8s中有配置输入时,会向配置的校验服务器发起准入请求;
  • 校验服务器会拿到请求中的资源信息,根据其配置的规则进行校验,若不通过,则返回告诉k8s拒绝,并输入具体信息;否则返回成功。

初始化

1
2
3
4
5
6
7
8
9
10
webhookServerReady := make(chan struct{})
stopCh := make(chan struct{})
if params.EnableValidation {
//创建与运行校验模块
go validation.RunValidation(webhookServerReady, stopCh, params, kubeConfig, liveness, readiness)
}
if params.EnableReconcileWebhookConfiguration {
//创建ValidatingWebhookConfiguration,并向k8s注入此WebhookConfiguration
go validation.ReconcileWebhookConfiguration(webhookServerReady, stopCh, params, kubeConfig)
}

其过程分为两步:创建并运行校验模块,当校验模块就绪后向 k8s 注入WebhookConfiguration
首先,先看校验模块的初始化过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func RunValidation(ready chan<- struct{}, stopCh chan struct{}, vc *WebhookParameters, kubeConfig string,
livenessProbeController, readinessProbeController probe.Controller) {
// 创建默认的Mixer的校验器
mixerValidator := mixervalidate.NewDefaultValidator(false)
// 创建与k8s交互的客户端
clientset, err := kube.CreateClientset(kubeConfig, "")
vc.MixerValidator = mixerValidator
// 所有Istio的模型
vc.PilotDescriptor = schemas.Istio
vc.Clientset = clientset
// 创建WebHook处理器,并对外输出http操作接口
wh, err := NewWebhook(*vc)
|- wh := &Webhook{
server: &http.Server{
Addr: fmt.Sprintf(":%v", p.Port),
},
descriptor: p.PilotDescriptor,
validator: p.MixerValidator,
clientset: p.Clientset,
|- h.HandleFunc("/admitpilot", wh.serveAdmitPilot)
|- h.HandleFunc("/admitmixer", wh.serveAdmitMixer)
// ...
go wh.Run(ready, stopCh)
|- go wh.server.ListenAndServeTLS("", "") // 监听端口
|- wh.waitForEndpointReady // 等待资源初始化完成
|- store, controller := cache.NewInformer(wh.createInformerEndpointSource(wh.clientset, wh.deploymentAndServiceNamespace, wh.serviceName)...
|- // 等待资源同步完成
|- cache.WaitForCacheSync(stopCh, controller.HasSynced)
|- //检查节点是否准备就绪:
|- ready := endpointReady(store, queue, wh.deploymentAndServiceNamespace, wh.serviceName)
|- ready <- struct{}{} //发送信号标识可以向k8s注册此准入检测点
}
func ReconcileWebhookConfiguration(webhookServerReady, stopCh <-chan struct{},
vc *WebhookParameters, kubeConfig string) {
clientset, err := kube.CreateClientset(kubeConfig, "")
// 创建 WebhookConfigController
whc, err := NewWebhookConfigController(*vc)
<-webhookServerReady // 等待检验模块就绪
whc.reconcile(stopCh)
|- whc.rebuildWebhookConfig() // 跟据配置构建Webhook配置
|- whc.createOrUpdateWebhookConfig() // 向`k8s`发起创建`webhook`

总结其过程:首先创建默认的校验器,其次创建WebHook监听端口并添加指定路径的处理函数,最后通过查询k8s查询到节点可读时告知WebhookControllerk8s注册ValidatingWebhookConfiguration

校验

接下来,继续跟进具体的校验逻辑,对于Pilot配置逻辑校验入口admitPilot;而对于Mixer配置逻辑校验入口为admitMixer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (wh *Webhook) admitPilot(request *AdmissionRequest) *AdmissionResponse {
// 只校验创建和修改操作
switch request.Operation {
case admissionv1beta1.Create, admissionv1beta1.Update:
default:
return &admissionv1beta1.AdmissionResponse{Allowed: true}
}
// 解码,并判断是否是Istio关注的类型,若不是则返回无法识别的类型
var obj crd.IstioKind
yaml.Unmarshal(request.Object.Raw, &obj)
s, exists := wh.descriptor.GetByType(crd.CamelCaseToKebabCase(obj.Kind))
// 进行类型转换
out, err := crd.ConvertObject(s, &obj, wh.domainSuffix)
// 调用Schema中配置的Validate进行实体校验
s.Validate(out.Name, out.Namespace, out.Spec)
// 对属性进行检查,查看必备字段是否无缺失
checkFields(request.Object.Raw, request.Kind.Kind, request.Namespace, obj.Name)
}

总结,Pilot的校验过程较简单,主要检查:

  • 解码是否正常;
  • 是否能转换成Istio类型;
  • 是否符合Schema中配置的Validate函数;
  • 必须属性是否无缺失

Mixer配置校验过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
switch request.Operation {
case admissionv1beta1.Create, admissionv1beta1.Update:
ev.Type = store.Update
var obj unstructured.Unstructured
// 解码
yaml.Unmarshal(request.Object.Raw, &obj)
ev.Value = mixerCrd.ToBackEndResource(&obj)
ev.Key.Name = ev.Value.Metadata.Name
// 检查必备字段是否无缺失
checkFields(request.Object.Raw, request.Kind.Kind, request.Namespace, ev.Key.Name)
case //...
}
if ev.Type == store.Update {
// 执行校验
wh.validator.Validate(ev)
|- _, ok := v.kinds[bev.Key.Kind] // 查看是否关注此类型配置
|- ev, err := store.ConvertValue(*bev, v.kinds) // 根据类型进行转换成mixer定义的对象
|- v.config.ApplyEvent([]*store.Event{&ev}) // 应用该变化
|- v.config.BuildSnapshot() // 基于现有所有配置构建一次快照,若成功代表配置无异常
}

总结其过程如下:

  • 解码,若解码异常则返回失败
  • 必备字段是否无缺失
  • 是否是Mixer关注的类型
  • 是否能转换成Mixer中定义的类型所对应的对象
  • 应用变更
  • 查看是否能成功构建快照成功(之所以这么做的原因是:Mixer中配置之间存在一定的关联性(adaptertemplate…),只校验单个实体是不够的)

配置管理

Galley中负责配置管理组件是Processing,它有两个版本ProcessingProcessing2,下面我们将主要研究Processing2的实现。

1
2
3
4
5
6
7
8
// 资源快照的缓存
// 索引方法:如果不是SyntheticServiceEntry类型的资源则用默认的快照资源,否则是的话则用为SyntheticServiceEntry单独构建的快照。
mcpCache := snapshot.New(groups.IndexFunction)
return &Processing2{
args: a,
mcpCache: mcpCache,
configzTopic: configz.CreateTopic(mcpCache),
}

processing2由以下几个组件组成:

  • Source:配置源,如k8s、fs;
  • Runtime:Galley中最复杂的模块,它将SourceTransformSnapshotterdistributor连接在一起。
    • runtime中创建带有状态机机制的组件 session 来追踪开始、结束事件处理循环
      • inactive:未激活状态;
      • starting:当会话开始时,即转变为开始状态。为了避免在启动阶段阻塞并可以正确处理所有生命周期事件,其是一个很短暂的状态。当所有数据源初始化完成时,它将会被转换成buffering状态。
      • buffering: 此阶段,所有数据源都已经初始化完成,但还没收到meshconfig时间。所有非meshconfig事件都将会被缓存,直到meshconfig到达;
      • processing: 一旦接收到meshconfig后,runtime将开始做配置转换并开始分发缓存的事件到各转换器。当接收到reset事件或者收到meshconfig的变更时,session将转换成inactive状态或者创建新的session
  • snapshot.Cache: 配置快照

processing2 初始化流程实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func (p *Processing2) Start() (err error) {
// 创建本地文件类型的配置源
mesh = meshcfgNewFS(p.args.MeshConfigFile)
// 从本地文件`metadata.yaml`中加载所有元数据信息
m := metadata.MustGet()
// 根据元数据中转换配置以及其他配置构建类型转换器提供者逻辑,即告知输入参数类型、输出参数类型,以及转换函数实现
transformProviders := transforms.Providers(m)
var colsInSnapshots collection.Names
// 默认输入参数中,Snapshots包含Default和SyntheticServiceEntry。从元数据中获取所有在这些Snapshot中的集合
for _, c := range m.AllCollectionsInSnapshots(p.args.Snapshots) {
colsInSnapshots = append(colsInSnapshots, collection.NewName(c))
}
// 过滤出所有需要的k8s资源信息(这里有个判断,如果Galley不充当服务发现角色则过滤掉k8s内置的资源信息,如k8s的Service\Node\Endpoint等)
kubeResources := kuberesource.DisableExcludedKubeResources(m.KubeSource().Resources(), transformProviders,
colsInSnapshots, p.args.ExcludedResourceKinds, p.args.EnableServiceDiscovery)
// 创建 k8s来源的数据源,其会对k8s中所有`crd`资源进行watch
src, updater, err = p.createSourceAndStatusUpdater(kubeResources)
// 分配器
var distributor snapshotter.Distributor = snapshotter.NewMCPDistributor(p.mcpCache)
// 创建运行时,进行一些快照以及分类内容的初始化
p.runtime, err = processorInitialize(processorSettings)
|- //... 初始化快照参数(更新策略,是否防抖动)
|- procProvider := func(o processing.ProcessorOptions) event.Processor {
xforms := settings.TransformProviders.Create(o)
s, err := snapshotter.NewSnapshotter(xforms, options)
return s
}
|- processing.NewRuntime(rtOpt) //创建运行时,并向配置源注册事件处理器
|- r := &Runtime{
options: o.Clone(),
}
|- h := event.HandlerFromFn(r.handle)
|- o.Source.Dispatch(h)
// 创建 grpc sever
p.grpcServer = grpc.NewServer(grpcOptions...)
// 向指定地址服务器发起建立连接,并对对方提供配置服务(galley中特别的设计,client_source模式)
p.callOut, err = newCallout(p.args.SinkAddress, p.args.SinkAuthMode, md, options)
//创建 MCP Server Source
p.mcpSource = source.NewServer(options, serverOptions)
// 将MCP Server注册到grpc server上对外提供服务
mcp.RegisterResourceSourceServer(p.grpcServer, p.mcpSource)
//
p.runtime.Start()
|- go r.run(...) // 异步运行
|- se, done := newSession(sid, r.options) // 创建并初始化配置处理会话
|- r.session.Store(se)
|- se.start()
|- s.transitionTo(starting) //更改状态
|- go s.startSources()
|- s.options.Source.Start() //开始所有配置源组件
|- 当Source初始化成功后,会向runtime发送一个`FullSync`事件。
gs.Serve(l) // 开启grpc服务
p.callOut.run()
|- conn, err := c.pt.grpcDial(c.address, c.do...) // dial
|- client := mcp.NewResourceSinkClient(conn) // 创建Mcp sink client
|- mcpClient := c.pt.sourceNewClient(client, c.so) // 创建mcp client
|- mcpClient.Run(ctx) // 运行 client_source
|- c.sendTriggerResponse(stream) // $triggerCollection(特别记号)
}

总结其流程如下:

  1. 初始化配置源Source: meshSource(文件配置源)、apiServer.Source(k8s crd配置源);
  2. 解析配置文件中的元数据,初始化转换器提供者;
  3. 创建并初始化 Runtime
  4. 创建Callout(将galley作为client方式为目标服务提供配置服务);
  5. 创建 gRpc Service mcpSource,为配置使用方服务;
  6. 启动 Runtime,过程比较复杂,下面将详细介绍;
  7. 启动grpc提供服务;
  8. 启动callout连接到目标服务,对其提供服务。

可以看到RuntimeGalley的核心,对接各种配置源,并进行配置转换。而mcpSource是用来对配置客户端提供服务的,他们之间通过mcpCache进行衔接。Runtime经过一系列处理后构建Snapshot,然后将快照传进mcpCachemcpSourcemcpCache中获取配置信息,同时监听mcpCache的配置变更并push到客户端。

Runtime启动流程大致如下:

总结起来,Galley整体架构如下:

配置处理流程

  1. client首次与gRpcServer建立连接后,会从stream中获取其节点信息PeerInfo,同时从PeerInfo中截取AuthInfo进行校验;
  2. mcpSource调用Processtream方法对Stream进行处理:
    • 首先为每个客户端创建一个connection对象,用来保存其元信息(节点信息、订阅类型以及订阅类型的监听状态等)
    • 然后会同时监听connection的请求通道reuqestC和向其推送信息的队列queue
1
2
3
4
5
6
7
8
9
10
11
type connection struct {
peerAddr string
stream Stream
id int64
streamNonce int64 // 请求-响应记号 // 只有在server端每次发送数据时才会修改该值
requestC chan *mcp.RequestResources
watches map[string]*watch //每种类型配置的监听状态
watcher Watcher // mcpCache
queue *internal.UniqueQueue
// ...
}

初次请求处理流程如下:

变更下发流程如下:
apiServer为配置源,资源 VirtualService 为例来讲解配置下发流程:

可以看出其流程非常之长,但概括起来其过程:

  • Source变更 -> Runtime进行转换 -> Cache更新快照 -> connectin连接下发更新

附件

  1. metadata.yaml 元数据配置文件
您的支持是我创作源源不断的动力