ETCD网络通信

2. 网络通信

在上一节,我们介绍了etcd集群的启动过程。在其中也简单介绍了会初始化与其他成员节点通信的连接,以及启动为其他成员节点提供服务的GRPC服务和Http服务以及启动服务客户端的服务。本章我们将详细介绍通信相关的实现。

2.1 成员节点间通信(peer)

前一章节介绍了peer服务端启动过程。下面,我们来讲解下调用端是如何初始化以及如何调用的。
与成员节点间通信传输的初始化之前介绍过,是调用Transport.AddPeer方法。
前面我们介绍了Transport的作用:向成员节点发送raft消息,并从成员节点获取raft消息。下面将详细描述其作用。
首先来看其初始化和启动过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
tr := &rafthttp.Transport{
DialTimeout: cfg.peerDialTimeout(),
ID: id,
URLs: cfg.PeerURLs,
ClusterID: cl.ID(),
Raft: srv, // EtcdServer
Snapshotter: ss,
ServerStats: sstats,
LeaderStats: lstats,
ErrorC: srv.errorc,
}
tr.Start()
|- t.streamRt = newStreamRoundTripper(t.TLSInfo, t.DialTimeout) // 其作用前面介绍过:可以执行事务性HTTP请求,给定请求获取结果
|- t.pipelineRt = NewRoundTripper(t.TLSInfo, t.DialTimeout) //
|- t.remotes = make(map[types.ID]*remote) // 初始化复制节点映射
|- t.peers = make(map[types.ID]Peer) // 初始化成员节点映射
|- t.pipelineProber = probing.NewProber(t.pipelineRt) // Prober的作用是探测,探测链路监控状态
|- t.streamProber = probing.NewProber(t.streamRt)

接下来,继续了解AddPeer实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Transport/AddPeer
|- startPeer
|- picker := newURLPicker(urls) // 其用于选取通信通道(负载均衡作用)
|- pipeline := &pipeline{...} // 创建pipeline,多通道发送消息。
|- pipeline.start() // 开启多个异步任务,监听 msgC通道,进行数据发送
|- p := &peer{...} // 创建peer结构体,初始化其属性,如recev、propc通道、消息发送器 msgAppV2Writer、writer(StreamWriter)
|- go select {
case mm := <-p.recvc: //监听 recev通道发来的消息,并将其丢给 EtcdServer.`Process`方法处理
if err := r.Process(ctx, mm)
}
|- go select {
case mm := <-p.propc: // 监听 propc 通道发来的消息,将其丢给...
if err := r.Process(ctx, mm);
}
|- 初始化 `p.msgAppV2Reader``p.msgAppReader` 用于读取消息
|- p.msgAppV2Reader.start() // 开启监听消息读取
|- p.msgAppReader.start()
|- addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec) // 将该节点添加到探测器中,进行定期探测状态
|- addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec)

从上面的流程可以看出 peer使用stream模式通信方式。读写分别用不同的协程去监听处理。其中streamWriter负责消息发送,streamReader负责消息接收。见下面详情:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
w := &streamWriter{
peerID: id,
status: status,
fs: fs,
r: r,
msgc: make(chan raftpb.Message, streamBufSize),
connc: make(chan *outgoingConn),
stopc: make(chan struct{}),
done: make(chan struct{}),
}
w.run:
|- select {
case <-heartbeatc:
//发送心跳消息
case m := <-msgc:
// 接收到需要发送的消息,进行编码并使用conn进行发送
case conn := <-cw.connc:
// 监听附加上来的连接通道,进行初始化 编码器、Flusher等
}

streamWriter里比较关键的一点是,这里的conn从哪来?往cw.connc传递conn的只有peer.attachOutgoingConn方法。
而在上面初始化并启动peer的时候是没有发现调用这个方法的 ???哪是哪里进行执行的呢?通过 追踪peer.attachOutgoingConn的调用方,最终发现,其在rafthhtp/http.go/ServeHTTP() L483处调用。而这个方法会在客户端建立连接进行http服务调用的时候执行。所以,在这里调用 peer.attachOutgoingConn 的作用是复用连接。当我们分析完读处理过程后,再来整体看 etcd节点是怎么建立通信链路的。

StreamReader的处理比StreamWriter的处理要简单一些。初始化完其属性后,调用start方法启动监听读请求过程。

1
2
3
4
5
6
StreamReader/start
|- run
|- rc, err := cr.dial(t) // 进行拨号,建立起与peer之间的连接
|- err = cr.decodeLoop(rc, t) // 获取返回结果,进行解码
|- for{ m, err := dec.decode() //循环解码
|- 如果 m 是`MsgProp`类型,则将消息传递到 propc通道,否则到 recv通道

这样每个peer之间的链路就建立了。回过头来,可以看出每对peer之间至少会有两条connection。且他们之间交互交错使用。

peer往对方发送的消息,是通过 对方跟自己建立的连接 来发送的。

最后,我们通过单次请求响应过程,来介绍节点间的通信过程。

当要想集群其他成员节点发送消息时,最终会调用peer.send方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
peer.send
|- writec, name := p.pick(m) // 首先peer会根据m的类型来选择一个发送通道
|- isMsgSnap(m) return p.pipeline.msgc, pipelineMsg // 如果是快照消息,则通过pipeline.msgc来发送消息
|- isMsgApp(m) return p.msgAppV2Writer.writec(),streamAppV2 // 如果是MsgApp消息
|- 如果 p.writer.writec() 可用,return p.writer.writec()
|- 备用 p.pipeline.msgc, pipelineMsg
|- writec <- m: // 即发送数据到 写通道中,`StreamWriter` 中会监听此通道。

StreamWriter.run
|- select {
case m := <-msgc:
|- enc.encode(&m)
|- binary.Write(enc.w, binary.BigEndian, uint64(m.Size())) // 对于 messageEncoder,首先发送字节长度
|- enc.w.Write(pbutil.MustMarshal(m)) //再发送字节内容
//...
}

目标节点的 StreamReaderp.msgAppReader)接收到消息后,通过解析后传递到 EtcdServer(Process),即完成单次通信;对于请求的响应,则通过对方的peer来发送给本节点,本节点的StreamReader来接受响应消息。

2.2 客户端通信

对于客户端通信,因为是GRPC或者HTTP简单的请求响应方式,因此这里就不再介绍了。

您的支持是我创作源源不断的动力