ETCD汇总

0. ETCD简介

本文将从以下几个方面来分析 ETCD (v3.3.12)。

  1. 整体架构
  2. 启动过程
  3. 数据存储
  4. 通信方式
  5. TTL实现原理
  6. Lease实现原理
  7. 单次事务过程
  8. 线性一致性读过程
  9. Watch机制

在介绍上面所有过程之前,我们先来介绍下 ETCD的整体架构以及相关名词术语。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
- Node:一个Raft状态机节点;
- Proxy:etcd的一种模式,为etcd集群提供反向代理服务;
- Member: etcd集群中的一个节点。它可以与其他节点进行交互且为客户端提供服务;
- Cluster:由多个Member组成的etcd集群;
- Peer:对处在相同集群中其他节点的称呼;
- Client: 请求客户端;
- Candidate 候选人
- Leader 领导者
- Follower 跟随者
- Term 选举任期,每次选举之后递增1
- Index:索引号,Raft中通过Term和Index来定位数据。
- Vote 选举投票(的ID)
- Commit 提交
- Propose 提议
- WAL:预写式日志
- SoftState:软状态,软状态易变且不需要保存在WAL日志中的状态数据,包括:集群leader、节点的当前状态
- HardState:硬状态,与软状态相反,需要写入持久化存储中,包括:节点当前Term、Vote、Commit
- ReadStates:用于读一致性的数据,后续会详细介绍
- Entries:在向其他集群发送消息之前需要先写入持久化存储的日志数据
- Snapshot:需要写入持久化存储中的快照数据
- CommittedEntries:需要输入到状态机中的数据,这些数据之前已经被保存到持久化存储中了
- Messages:在entries被写入持久化存储中以后,需要发送出去的数据

peer间通信消息类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
- MsgHup            // 不用于节点间通信,仅用于发送给本节点让本节点进行选举
- MsgBeat // 心跳消息
- MsgProp // raft库使用者提议(propose)数据
- MsgApp // 用于leader向集群中其他节点同步数据的消息
- MsgAppResp // 消息同步回复
- MsgVote // 请求投票
- MsgVoteResp // 投票反馈
- MsgSnap // 用于leader向follower同步数据用的快照消息
- MsgHeartbeat // 心跳消息
- MsgHeartbeatResp // 心跳回复消息
- MsgTransferLeader // 转移主节点
- MsgReadIndex // 用于线性一致性读
- MsgReadIndexResp
- MsgPreVote // 请求预先投票
- MsgPreVoteResp

Etcd整体架构图如下:

下面将简单介绍下:

  1. etcd面向clientpeer节点开放http服务以及grpc服务,对于像watch机制就是基于grpcstream通信模式实现的;
  2. EtcdServeretcd上层结构体,其负责对外提供服务,且负责应用层的实现,比如操作应用层存储器,管理leassorwatch
  3. raftNode负责上层与raft层的衔接。其负责将应用的需求传递到raft中进行处理(通过Step函数)、在消息发送到其他节点前将消息保存到WAL中、调用传输器发送消息;
  4. raftraft协议的承载者;
  5. raftLog用于存储状态机信息:memoryStorge保存稳定的记录,unstable保存不稳定的记录。

1. Etcd初始化流程解析

Etcd的启动类为 父目录的main.go文件。其启动过程调用如下:

1
2
3
4
5
6
main.go
|- etcdmain/main.go(暂且忽略`gateway``proxy`模式启动)
|- checkSupportArch // 检查是否是支持的处理器架构
|- startEtcdOrProxyV2 // 解析参数并根据参数决定启动etcd节点还是按Proxy模式启动(这里按etcd节点形式启动)
|- 生成默认参数`newConfig()`解析参数 `cfg.parse(os.Args[1:])`解析命令行启动参数
|- startEtcd(&cfg.ec)// 启动过程最核心的地方

startEtcd中执行etcd启动的主要过程:

1
2
3
4
5
6
7
8
9
10
11
12
embed.startEtcd(inCfg *Config)
|- inCfg.Validate() //校验配置,检查url是否是以ip地址开头的,否则报错终止流程
|- configurePeerListeners(cfg) // 根据配置初始化peerListener结构体(为`peer`服务的服务器配置)
|- configureClientListeners(cfg) // 根据配置初始化clientListener结构体,(为`client`服务的服务器配置)
|- 通过判断是否有 `wal`文件来判断是否已经有其他节点信息
|- cfg.PeerURLsMapAndToken("etcd")方法,其用于解析出其他节点的信息。
|- etcd集群模式有三种启动方式,其具体实现即实现在其内部。(`后续我们将详细分析`
|- etcdserver.NewServer(srvcfg) // 执行etcdServer初始化
|- e.Server.Start() // 启动 etcdServer
|- e.servePeers() // 启动 监听etcd节点间请求的GRPC服务
|- e.serveClients() // 启动监听客户端请求的GRPC服务
|- e.serveMetrics() // 启动Metrics Http服务,供外部查询Metrics信息

通过 cfg.PeerURLsMapAndToken(“etcd”) 逻辑,了解到etcd有三种方式来获取集群中其他节点信息:

1
2
3
4
5
switch {
case cfg.Durl != "":// etcd自发现模式:配置 “discovery”参数设置。这里没有真正获取集群所有节点。比较trick的作用,等后面来处理。
case cfg.DNSCluster != "":// 通过DNS自发现模式,配置”discovery-srv“
default:// 默认的静态配置方式,通过参数 "initial-cluster"进行设置
}

准备好创建etcd节点后开始初始化节点信息:

1
2
3
4
5
6
etcdserver/NewServer(srvcfg)
|- fileutil.TouchDirAll(cfg.DataDir) //检查是否可以获取目录权限:DataDir:"集群名.etcd"
|- fileutil.TouchDirAll(cfg.SnapDir()) // 检查是佛偶可以获取快照目录权限 ”集群名.etcd/member/snap“
|- snap.New(cfg.Logger, cfg.SnapDir()) //创建快照管理器
|- be := openBackend(cfg) // 创建数据库后端,其底层使用boltDB存储数据,然后在其之上进行了一次封装:包括批量提交事务。(关于存储的内容,我们后面讲单独讲解)
|- rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout()) // 创建一个RoundTripper,其作用是封装一个具有执行一次http事务,为一个http request获取response的对象

初始化快照管理器和数据库后端后,就会根据一系列条件来决定怎样启动节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
switch {
case !haveWAL && !cfg.NewCluster:// 非新集群且也没有WAL文件
// 首先进行一些校验工作:比如判断本地集群成员列表是否与远程其他节点的成本列表配置是否相同
// 判断集群每个节点的版本是否兼容
id, n, s, w = startNode(cfg, cl, nil)
case !haveWAL && cfg.NewCluster: // 新集群,且没有WAL文件
// 此处就是接着 PeerURLsMapAndToken的处理,如果配置`Discovery`参数,etcd则进行自发现流程 `v2discovery.JoinCluster`
id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
case haveWAL: // 非新集群,且有WAL文件
// 首先读取快照文件
// 从快照文件中恢复数据库后端
snapshot, err = ss.Load()
// 调用restartNode方法重启节点
id, cl, n, s, w = restartNode(cfg, snapshot)
// 其再从wal内进一步恢复内容
default: //异常情况
}

创建完raft.Node并绑定相应raft后,继续初始化:

1
2
3
4
5
6
7
8
9
10
11
12
|- stats.NewServerStats // 初始化统计计数
|- stats.NewLeaderStats
|- heartbeat := time.Duration(cfg.TickMs) * time.Millisecond // 初始化心跳参数
|- 创建 `EtcdServer`:同时创建 `RaftNode`、初始化ID生成器
|- newRaftNode
|- lease.NewLessor //创建或恢复租约管理器
|- srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex) // 创建KV存储器,其封装了backend、TreeIndex、Lessor,另外其内部也运行着 Watch 机制。(后续独立章节详细解析)
|- srv.applyV3Base = srv.newApplierV3Backend() // 创建 ApplierV3 接口实现,其用于处理v3 raft消息,为应用层的操作。
|- restoreAlarms // 恢复报警,如要是节点存储的报警:如空间不足、崩溃时替换`applyV3`做配置限制或拒绝请求
|- tr := &rafthttp.Transport{...} // 创建Transporter,其用于向其他节点发送raft消息,并从其他节点获取raft消息。
|- 每个remotes执行:tr.AddRemote(m.ID, m.PeerURLs) // 初始化与每个Learner的通信
|- 每个Members执行:tr.AddPeer(m.ID, m.PeerURLs) // 初始化与每个Member节点的通信

下面来看raftNode的详细创建过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
newRaftNode(
raftNodeConfig{
Node: n,
heartbeat: heartbeat,
raftStorage: s,
storage: NewStorage(w, ss),
},
)
func newRaftNode(cfg raftNodeConfig) *raftNode {
r := &raftNode{
lg: cfg.lg,
tickMu: new(sync.Mutex),
raftNodeConfig: cfg,
td: contention.NewTimeoutDetector(2 * cfg.heartbeat),
readStateC: make(chan raft.ReadState, 1),
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
applyc: make(chan apply),
stopped: make(chan struct{}),
done: make(chan struct{}),
}
// 开始心跳 Ticker
r.ticker = time.NewTicker(r.heartbeat)
return r
}

可以看出raftNoderaft.node之间的关系。通过raftNode可以直接访问raft.node的所有公有方法。

回到 startNode 方法,我们以新集群且没有WAL文件的场景来了解下startNode的处理过程:

1
2
3
4
5
6
7
8
9
10
11
etcdserver/raft.go/startNode
|- wal.Create() // 创建 WAL文件
|- raft.NewMemoryStorage() // 创建raft的数据存储器,这里为内存储存
|- n = raft/node.go/StartNode(c, peers) // 开始节点
|- r := newRaft(c) // 为当前节点创建raft对象
|- r.becomeFollower(1, None) //初始化当前节点成为 follower角色
|- 对每个节点,追加一条`ConfChangeAddNode`配置更改记录到 raftLog中
|- r.raftLog.committed = r.raftLog.lastIndex() // 更新raftLog的提交索引
|- for each peer r.addNode(peer.ID) //为每个peer在Raft中创建一个Progress结构体(没有创建时,若创建了则设置状态)来,来保存该peer的数据复制状态
|- 创建一个raft/node节点 // 创建node结构体,初始化各种通道
|- 异步执行 node.run(r *raft)方法// 其内部主要执行事项包括:轮训raft内需要周知其他节点的信息进行发送,监听node的各种通道(请求和响应)作相应处理,和任务超时通知通道

下面我们来详细了解下newRaft的内容:

1
2
3
4
5
6
7
8
newRaft(c *Config) *raft
|- c.validate() // 相关参数的校验,比如选举超时时间设置必须大于心跳超时时间设置
|- raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady) // 创建raftLog结构体,其用于保存raft状态机信息,比如当前节点事务提交的最大log位置、已经应用到应用的最大索引位置、所有未提交不稳定的记录
|- `创建raft结构体`
|- 对于每个决策节点 `peers`,设置复制初始`Next`位置、复制滑动串口器
|- 对于非决策节点 `learnerPrs`,也进行初始化
|- r.becomeFollower(r.Term, None) // 将自身设置为 Follower角色,对于一个刚启动的节点,这里Term为0
|-

接下来,再来看 becomeFollower 方法,其设置了 step 方法和 tick 方法、设置了 raft所在任期以及raft的角色状态。我们都知道raft协议中共有三个角色FollowerCandidatesLeaderetcd中通过不同角色设置不同的step来区分开每个角色的处理逻辑,设置不同tick方法来设置超时任务(对于Follower角色,其超时后会发起新一轮选举,而对于Leader角色,则广播一次心跳消息… )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (r *raft) becomeFollower(term uint64, lead uint64) {
r.step = stepFollower
r.reset(term)
r.tick = r.tickElection
r.lead = lead
r.state = StateFollower
}
func (r *raft) becomeCandidate() {
r.step = stepCandidate
r.reset(r.Term + 1)
r.tick = r.tickElection
r.Vote = r.id
r.state = StateCandidate
}
func (r *raft) becomeLeader() {
r.step = stepLeader
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.state = StateLeader
r.prs[r.id].becomeReplicate()
r.pendingConfIndex = r.raftLog.lastIndex()
emptyEnt := pb.Entry{Data: nil}
r.reduceUncommittedSize([]pb.Entry{emptyEnt})
}

到此,就完成了 EtcdServer的创建。

接下来,再来看EtcdServer的开始方法Start:

1
2
3
4
5
6
7
8
9
EtcdServer/Start
s.start() // 启动
s.goAttach(func() { s.adjustTicks() }) // 调整频率,启动时加快选举
s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) }) // 发布节点属性信息,以遍其他节点可查询
s.goAttach(s.purgeFile) // 启动异步任务做文件的合并操作:db文件\snap文件\WAL文件
s.goAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) }) // 监控文件句柄数不能超过系统限制的80%
s.goAttach(s.monitorVersions)
s.goAttach(s.linearizableReadLoop) // 线性一致性读异步任务(后续详细讲解)
s.goAttach(s.monitorKVHash)

EtcdServer.start()方法,首先进行一系列通道的初始化,然后异步执行EtcdServer.run()方法:

1
2
3
4
5
6
7
EtcdServer.run
|- sched := schedule.NewFIFOScheduler() //首先创建一个先进先出的异步调度器
|- rh := &raftReadyHandler{...} // 创建一个raftReadHanlder用于处理一些节点信息操作的回调。
|- s.raftNode.start(rh) // 启动raftNode的异步处理流程
|- 开启循环,监听 raftNode的 applyC通道 以及租约过期通道 `s.lessor.expiredC`
|- 对于 `applyC`,当从其中获取消息时,通过先进先出异步调度器顺序执行 applyAll方法
|- 对于

展开 raftNode.start(rh)的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
select {
case <-r.ticker.C: // 监听超时通知通道
r.tick()
case rd := <-r.Ready(): // 监听 raft.node的readyC通道,其为需要发送给其他节点或广播的消息
// 根据消息更新自身状态
// 然后应用到状态机中,最后发送给其他节点(非主节点时)或广播给其他节点(主节点)
if rd.SoftState != nil {
// 判断是否需要更新主节点信息
}
if len(rd.ReadStates) != 0 {
// 对于线性一致性读,进行通知回调
}

notifyc := make(chan struct{}, 1)
// 通过rh来更新提交索引号
updateCommittedIndex(&ap, rh)
select {
case r.applyc <- ap:
case <-r.stopped:
return
}
if islead {
r.transport.Send(r.processMessages(rd.Messages))
}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
// ...
}
//...
r.raftStorage.Append(rd.Entries)
if !islead {
msgs := r.processMessages(rd.Messages)
// ...
r.transport.Send(msgs)
} else {
notifyc <- struct{}{}
}
r.Advance()
case <-r.stopped:
return
}

完成etcdServer的启动后,开始http/grpc服务对外提供服务(peer间的服务以及对client开放的服务)。
我们以servePeers()来讲解启动服务过程。

1
2
3
4
5
6
7
8
servePeers()
|- etcdhttp.NewPeerHandler //创建 `http.Handler`,用于处理 节点间 以`raft`、`raft/`、`/leases`、`/leases/internal`为前缀的http请求
|- 对于每个 `e.Peers`:执行以下操作
|- gs := v3rpc.Server(e.Server, peerTLScfg) // 创建grpcServer,然后注册服务描述信息,如kv、watch、lease、cluster、auth、maintenance。
|- m := cmux.New(p.Listener) // 创建连接多路转接器,用于转发连接到不同的服务里去处理,其工作原理后续讲解
|- go gs.Serve(m.Match(cmux.HTTP2())) // 筛选出http2的链接,并对其服务
|- go srv.Serve(m.Match(cmux.Any())) //对于剩下的连接,用http服务进行处理
|- 对每个 `e.Peers`.serve() 启动监听服务

当新连接到达时,处理流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
cMux.Server()
|- c, err := m.root.Accept() // 通过最原始的方式获取到达的连接
|- m.serve(c, m.donec, &wg) // 执行serve进行分发
|- for _, sl := range m.sls {// sl包装了 其自身的匹配器列表,和转发通道
for _, s := range sl.ss {// s为通过cMux.Match方法创建的匹配器
matched := s(muc.Conn, muc.startSniffing())
if matched { // 若该连接匹配上了,则将该连接通过连接发送到sl的连接接收通道里
//...
select {
case sl.l.connc <- muc:
case <-donec:
_ = c.Close()
}
return
}
}
}

到此整个初始化过程就完成了。其后开始进行选举,那么选举是哪里出发的呢?
回到创建raftNode的地方 r.ticker = time.NewTicker(r.heartbeat) 开启了ticker。当时间到达时,tickder.C中得到通知。而其正在被 raftNodestart方法中的循环监听着。进一步就触发了raftNoe.tick()方法。

1
2
3
4
5
6
7
8
9
raftNoe.tick()
|- node.Tick()方法, 之前介绍过 raftNode与node之间的关系
|- n.tickc <- struct{}{} 通知 node.tickc通道
|- node的run循环中监听此通道,进一步调用 `raft.tick()`方法。此方法在 `becomeFollower`时设置成了 `tickElection`
|- r.Step(pb.Message{From: r.id, Type: pb.MsgHup}) // 到此开始发起选举
|- r.campaign(campaignElection) // 开始角逐 主节点角色
|- r.becomeCandidate // 更改节点角色为 候选者
|- r.step = stepCandidate...
|- r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: rL.lastIndex, LogTerm: r.raftLog.lastTerm(), Context: ctx}) // 对每个节点发送 `pb.MsgVote` 消息 携带本节点的任期,索引

当其他接收到该节点的投票请求时:

1
2
3
4
5
6
7
8
9
peer.go/startPeer(180L)
|- etcdServer.Pocess // 转发到ectdServer进行处理
|- node.Step // 调用 raftNode,并间接调用 raft.node
|- node.stepWithWaitOption
|- raft.Step
|- 首先进行判断是否可投票(是否已经投给了这个需要投票的人|| 没投票,且没有主|| 消息的任期比本节点大)
|- 判断是否要投票给请求投票的节点(请求节点任期比本节点大,或者任期相等时索引Id是否比本节点大)
|- 上面条件都满足时,返回`MsgVoteResp`消息,告诉请求者,其同意投票给它
|- 但上述条件不满足时,则返回 拒绝消息给请求者

当节点收到 其他的投票反馈消息时,最终会调用 raft.go/stepCandidate方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// poll方法传进去本消息的投票,返回已经有多少赞成票
gr := r.poll(m.From, m.Type, !m.Reject)
switch r.quorum() {// 当达到quorum个时,进行角色转变
case gr:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
// 转换为主节点
r.becomeLeader()
// 广播消息
r.bcastAppend()
}
case len(r.votes) - gr:
r.becomeFollower(r.Term, None)
}

通过上面的逻辑,可以看出,当投票数达到quorum数时,转变角色为主节点。同时向所有其他节点广播本节点状态以及记录信息(MsgApp),其他节点接收到此消息后,自动转变为 follower角色,整个集群初始化完成。

对于etcd的选举,还需要说明的是,etcd为了某些网络分区的问题了设置PreVote流程(只需要启动节点的时候 设置 pre-vote 参数)。即在进行真正的选举之前 先进行PreVote得到大多数节点同意选举之后才进行真正的选举。可以解决如下问题:

  • 对于网络分区的节点,在重新加入集群的时候不会中断集群;(因为获取不了大部分节点的许可,索引其Term无法增大,所以赢不了选举主节点)。

到此,etcd的启动到建立集群、完成选举的整个过程就介绍完了。

附加图:
下图为 EtcdServerraftNoderaft.noderaft间的联系。

最后补充说明下etcdproxy模式:
etcd可以通过命令./etcd –proxy on –listen-client-urls的形式启动代理模式。代理模式下,它的作用是一个反向代理,接收客户端请求,然后转发到etcd集群。
代理模式有2种运行形式:readwritereadonly,默认情况下为readwrite,即会将读写请求都进行转发,而readonly形式下,则只转发读请求,写请求将报5xx错误,

IDEA中启动ETCD方式:

1
2
3
4
debug方式运行三个终端程序 `etcd/main.go` 并设置如下参数:
--name infra1 --listen-client-urls http://127.0.0.1:2379 --advertise-client-urls http://127.0.0.1:2379 --listen-peer-urls http://127.0.0.1:12380 --initial-advertise-peer-urls http://127.0.0.1:12380 --initial-cluster-token etcd-cluster-1 --initial-cluster infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380 --initial-cluster-state new --enable-pprof --logger=zap --log-outputs=stderr
--name infra2 --listen-client-urls http://127.0.0.1:22379 --advertise-client-urls http://127.0.0.1:22379 --listen-peer-urls http://127.0.0.1:22380 --initial-advertise-peer-urls http://127.0.0.1:22380 --initial-cluster-token etcd-cluster-1 --initial-cluster infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380 --initial-cluster-state new --enable-pprof --logger=zap --log-outputs=stderr
--name infra3 --listen-client-urls http://127.0.0.1:32379 --advertise-client-urls http://127.0.0.1:32379 --listen-peer-urls http://127.0.0.1:32380 --initial-advertise-peer-urls http://127.0.0.1:32380 --initial-cluster-token etcd-cluster-1 --initial-cluster infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380 --initial-cluster-state new --enable-pprof --logger=zap --log-outputs=stderr

3. 数据存储

Etcd的存储部分,可以分两部分来讲解。一部分是其应用层的数据存储方式,另一部分是raft相关数据的存储。Etcd应用层的数据存储从v3版本开始就延用boltDB,其也是CoreOS的产品boltDB。PS:本文主要聚焦于v3版本,对于v2版本不作解读。

下面将分别介绍这两部分内容:

3.1 raft数据存储

首先我们来介绍下raft相关的数据存储:
raft中有两个比较重要的组件:

  • raftLog:用来保存状态机相关信息的,包括当前任期、索引号、不稳定记录项等;
  • WAL:预写日志器,用于以顺序形式写入操作记录,以便故障时数据恢复;
  • Snapshot:数据快照,一般用于启动时快速恢复数据。

首先来看raftLog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type raftLog struct {
// 包含所有稳定的记录 MemeoryStorge
storage Storage
// 包含所有不稳定的记录
unstable unstable
// 提交记录索引
committed uint64
// 应用记录索引
applied uint64
}
type MemoryStorage struct {
hardState pb.HardState
// 快照信息(保存 v2版存储中的保存的数据快照、任期、索引、ConfState[集群节点信息])。快照生成一般有个条件:距上次提交的次数大于`SnapshotCount`(默认10000)
snapshot pb.Snapshot
ents []pb.Entry
}
// 从名字也可以看出其用途
type unstable struct {
// 快照信息,这里只会在节点加入集群,主节点向其发送`MsgSnap`消息的时候才会有
snapshot *pb.Snapshot
entries []pb.Entry
offset uint64
}

下图描述了数据从客户端请求到落地各个阶段与以前存储结构的关系:

其中,8’、9、11 是涉及 I/O 的操作,其他均为内存操作。
WAL的操作在每次写事务操作中都会存在,因此其是制约etcd写性能的一个重要因素。接下来,将重点介绍WAL的工作原理。

1
2
3
4
5
6
7
8
9
10
11
12
type WAL struct {
dir string // WAL文件所在目录
dirFile *os.File // 目录文件句柄
metadata []byte // 元数据,记录在每个wal文件头
state raftpb.HardState // 硬状态(任期、索引号),记录在每个文件头
start walpb.Snapshot // snapshot to start reading
decoder *decoder // 解码器
enti uint64 // 保存到WAL的最大索引号
encoder *encoder // 编码器
locks []*fileutil.LockedFile // 文件锁
fp *filePipeline // 文件创建工具,预先创建文件
}

首先,来看其创建过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
wal.Create
|- tmpdirpath := filepath.Clean(dirpath) + ".tmp"
|- os.RemoveAll(tmpdirpath)
|- fileutil.CreateDirAll(tmpdirpath)
|- p := filepath.Join(tmpdirpath, walName(0, 0))
|- f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
|- f.Seek(0, io.SeekEnd)
|- fileutil.Preallocate(f.File, SegmentSizeBytes, true) // 预分配空间
|- w := &WAL{
lg: lg,
dir: dirpath,
metadata: metadata,
}
|- w.encoder, err = newFileEncoder(f.File, 0) // 编码器,编码器编码的同时会将结果写到文件中
|- w.locks = append(w.locks, f)
|- w.saveCrc(0) w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata})// 写文件头
return nil, err
}
|- w.SaveSnapshot(walpb.Snapshot{})
|- w.renameWAL(tmpdirpath) // 更改零时文件名称为waldir,创建FilePipeline(用于预先创建文件)
|-pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
|- fileutil.Fsync(pdir)

创建WAL时,会初始化编码器以及FilePipeline。下面再以其Save方法来介绍保存记录的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
WAL.Save
|- mustSync := raft.MustSync(st, w.state, len(ents)) // 判断是否需要文件同步,写入磁盘
|- for each ents: saveEntry
|- b := pbutil.MustMarshal(e) rec := &walpb.Record{Type: entryType, Data: b}
|- w.encoder.encode(rec) // 编码后写入文件
|- w.saveState(&st) // 保存状态
|- curOff, err := w.tail().Seek(0, io.SeekCurrent)
|- if curOff < SegmentSizeBytes & mustSync w.sync() 如果文件还没大于SegmentSizeBytes,且需要同步,则进行文件同步
|- 如果大于,则进行切割文件 `w.cut`
|- fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))
|- newTail, err := w.fp.Open() // 通过filePipeline获取一个新的零时文件
|- 进行初始化 w.encoder, err = newFileEncoder(w.tail().File, prevCrc) ...
|- os.Rename(newTail.Name(), fpath) // 重命名
|- w.sync()

随着记录的增加,wal文件会越来越多,入股不做处理的话会导致磁盘被占满。那么etcd是怎么做的呢?
其实是由两步构成的:

  1. etcd每次进行执行快照的实时,会进行wal.ReleaseLockTo(snap.Metadata.Index)释放文件锁的操作。(释放快照对应索引号之前的所有WAL文件句柄)
  2. 之前在EtcdServer启动章节介绍过,其启动后会启动一个定时任务purgeFile。其会针对snap.dbsnapwal文件做30秒一次的fileutil.PurgeFile任务:
    1. 任务带有参数 MaxWalFiles,获取指定wal.dir下所有文件,然后按文件名排序,从小到大进行遍历:尝试锁文件。如果成功,则进行删除,否则的话说明依然被etcd锁占用。

3.2 应用数据存储

在解析etcd应用层数据存储结构前,先来介绍下etcd的数据存储形式。etcd对数据的存储并不是直接存储key-value对,而是引入了一种带版本号revision的存储方式:以数据的revisonkey,键值对为值。
revision由两部分组成:main-revision.sub-revisionmain-revision为事务ID,sub-revision为事务中一次操作ID。

举例来说:
系统刚启动后,在一个事务中执行put ty dj \n put dj ty两个操作,实际存储的是

  1. {1,0} key=ty val=dj
  2. {1,1} key=dj val=ty

紧接着执行第二次操作:put ty dj90 \n put dj ty92,那么存储中会追加如下信息:

  1. {2,0} key=ty val=dj90
  2. {2,1} key=dj val=ty92

而为了支持这种存储形式快速查询,etcd建立了treeIndex结构,用于建立keyrevision间的关系。随之,通过key查询val的过程如下:

treeIndex是一个b-ree,其存储这keyIndex信息。KeyIndex的结构如下:

1
2
3
4
5
6
key         []byte
modified revision // 最后一次更改版本信息
generations []generation // 代:每一代记录着键值对从创建到删除的过程
|- ver int64 // 存放了多少次修改
created revision // 创建此generation的第一个版本
revs []revision

keyIndex中,需要特别说明的是generation数据内部,保存的revs,如果最后一项为tombstone,则表示在这个代中被删除了。被tombstonegeneration是可以被删除的。针对此,keyIndex有个专门的函数compactcompact(n)可以将主版本小于n的数据。

将完了其存储结构和存储格式,下面将从启动和执行一次操作两个流程来讲解其的工作原理。对boltDB不了解的读者建议先去了解下 boltDBboltDB学习

3.2.1 启动过程

etcd应用层存储创建过程如下:
首先创建backend,其是对boltDB的封装,加入一些批量提交逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
bepath := cfg.backendPath()
beExist := fileutil.Exist(bepath)
be := openBackend(cfg) // 创建
|- newBackend
|- bolt.Open(bcfg.Path, 0600, bopts)
|- b := &backend{
db: db,
batchInterval: bcfg.BatchInterval,
batchLimit: bcfg.BatchLimit ...
}
|- b.run() // 定时任务,批量周期进行提交
|- t := time.NewTimer(b.batchInterval)
|- select {
case <-t.C:
b.batchTx.CommitAndStop()
return
}
if b.batchTx.safePending() != 0 {
b.batchTx.Commit()
}

有了backend后,会再基于此作一层封装:mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex),其内部包含watcher处理机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
mvcc.New
|- s := &watchableStore{
store: NewStore(lg, b, le, ig),
victimc: make(chan struct{}, 1),
unsynced: newWatcherGroup(),
synced: newWatcherGroup(),
}
|- s.store.ReadView = &readView{s}
s.store.WriteView = &writeView{s}
if s.le != nil {
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
}
|- go s.syncWatchersLoop() // 用于watch机制的异步任务
|- go s.syncVictimsLoop()

这里我们比较关注的是NewStore逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
mvcc.NewStore
|- s := &store{
b: b,
kvindex: newTreeIndex(lg), // 创建 treeIndex
le: le,
currentRev: 1, // 最近一次事务的版次
compactMainRev: -1,//最近一次事务的主版次
bytesBuf8: make([]byte, 8),
fifoSched: schedule.NewFIFOScheduler(),
stopc: make(chan struct{}),
}
|- s.ReadView = &readView{s}
s.WriteView = &writeView{s}
if s.le != nil {
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
}
|- tx.UnsafeCreateBucket(keyBucketName) // 创建bucket-"key"用来存储kv数据,
|- tx.UnsafeCreateBucket(metaBucketName) //创建Bucket-”meta“用来存储元数据
|- s.restore() // 恢复存储
// TreeIndex 是b树
func newTreeIndex(lg *zap.Logger) index {
return &treeIndex{
tree: btree.New(32),
lg: lg,
}
}
s.restore
|- rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)// 构建tree-index方法,返回两个通道,用于向内传入数据
|- for {
// 分页提取key-val
keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
if len(keys) == 0 {
break
}
// 将key-val进行解码,然后传递给`rkvc`通道,在restoreIntoIndex中监听`rkvc`通道,进行构建`treeIndex`
restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
if len(keys) < restoreChunkKeys {
break
}
newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
newMin.sub++
revToBytes(newMin, min)
}
|- ...

由此就完成了treeIndexboltDB的初始化。
最后,etcd又对mvcc.watchableStore进行了一次封装srv.newApplierV3Backend(),其用于衔接存储和raft消息请求。

3.2.2 请求应用到存储

put ty dj请求通过raft协议提交决策后,最终会调用到applierV3backend.put方法进行应用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
applierV3backend.put
|- txn = a.s.KV().Write()
|- txn.Put(p.Key, val, leaseID)
|- storeTxnWrite.put
|- rev := tw.beginRev + 1
c := rev
|- _, created, ver, err := tw.s.kvindex.Get(key, rev) // 如果key存在则获取
if err == nil {
c = created.main
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
}
|- ibytes := newRevBytes()
|- idxRev := revision{main: rev, sub: int64(len(tw.changes))}
|- revToBytes(idxRev, ibytes)
|- ver = ver + 1
|- kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(leaseID),
}
|- d, err := kv.Marshal()
|- tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) // 存储到boltDB中,key为`revision`
|- tw.s.kvindex.Put(key, idxRev) // 将key到 revision的映射存储到treeIndex中

到此就完成了存储模块的讲解。

2. 网络通信

在上一节,我们介绍了etcd集群的启动过程。在其中也简单介绍了会初始化与其他成员节点通信的连接,以及启动为其他成员节点提供服务的GRPC服务和Http服务以及启动服务客户端的服务。本章我们将详细介绍通信相关的实现。

2.1 成员节点间通信(peer)

前一章节介绍了peer服务端启动过程。下面,我们来讲解下调用端是如何初始化以及如何调用的。
与成员节点间通信传输的初始化之前介绍过,是调用Transport.AddPeer方法。
前面我们介绍了Transport的作用:向成员节点发送raft消息,并从成员节点获取raft消息。下面将详细描述其作用。
首先来看其初始化和启动过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
tr := &rafthttp.Transport{
DialTimeout: cfg.peerDialTimeout(),
ID: id,
URLs: cfg.PeerURLs,
ClusterID: cl.ID(),
Raft: srv, // EtcdServer
Snapshotter: ss,
ServerStats: sstats,
LeaderStats: lstats,
ErrorC: srv.errorc,
}
tr.Start()
|- t.streamRt = newStreamRoundTripper(t.TLSInfo, t.DialTimeout) // 其作用前面介绍过:可以执行事务性HTTP请求,给定请求获取结果
|- t.pipelineRt = NewRoundTripper(t.TLSInfo, t.DialTimeout) //
|- t.remotes = make(map[types.ID]*remote) // 初始化复制节点映射
|- t.peers = make(map[types.ID]Peer) // 初始化成员节点映射
|- t.pipelineProber = probing.NewProber(t.pipelineRt) // Prober的作用是探测,探测链路监控状态
|- t.streamProber = probing.NewProber(t.streamRt)

接下来,继续了解AddPeer实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Transport/AddPeer
|- startPeer
|- picker := newURLPicker(urls) // 其用于选取通信通道(负载均衡作用)
|- pipeline := &pipeline{...} // 创建pipeline,多通道发送消息。
|- pipeline.start() // 开启多个异步任务,监听 msgC通道,进行数据发送
|- p := &peer{...} // 创建peer结构体,初始化其属性,如recev、propc通道、消息发送器 msgAppV2Writer、writer(StreamWriter)
|- go select {
case mm := <-p.recvc: //监听 recev通道发来的消息,并将其丢给 EtcdServer.`Process`方法处理
if err := r.Process(ctx, mm)
}
|- go select {
case mm := <-p.propc: // 监听 propc 通道发来的消息,将其丢给...
if err := r.Process(ctx, mm);
}
|- 初始化 `p.msgAppV2Reader``p.msgAppReader` 用于读取消息
|- p.msgAppV2Reader.start() // 开启监听消息读取
|- p.msgAppReader.start()
|- addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec) // 将该节点添加到探测器中,进行定期探测状态
|- addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec)

从上面的流程可以看出 peer使用stream模式通信方式。读写分别用不同的协程去监听处理。其中streamWriter负责消息发送,streamReader负责消息接收。见下面详情:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
w := &streamWriter{
peerID: id,
status: status,
fs: fs,
r: r,
msgc: make(chan raftpb.Message, streamBufSize),
connc: make(chan *outgoingConn),
stopc: make(chan struct{}),
done: make(chan struct{}),
}
w.run:
|- select {
case <-heartbeatc:
//发送心跳消息
case m := <-msgc:
// 接收到需要发送的消息,进行编码并使用conn进行发送
case conn := <-cw.connc:
// 监听附加上来的连接通道,进行初始化 编码器、Flusher等
}

streamWriter里比较关键的一点是,这里的conn从哪来?往cw.connc传递conn的只有peer.attachOutgoingConn方法。
而在上面初始化并启动peer的时候是没有发现调用这个方法的 ???哪是哪里进行执行的呢?通过 追踪peer.attachOutgoingConn的调用方,最终发现,其在rafthhtp/http.go/ServeHTTP() L483处调用。而这个方法会在客户端建立连接进行http服务调用的时候执行。所以,在这里调用 peer.attachOutgoingConn 的作用是复用连接。当我们分析完读处理过程后,再来整体看 etcd节点是怎么建立通信链路的。

StreamReader的处理比StreamWriter的处理要简单一些。初始化完其属性后,调用start方法启动监听读请求过程。

1
2
3
4
5
6
StreamReader/start
|- run
|- rc, err := cr.dial(t) // 进行拨号,建立起与peer之间的连接
|- err = cr.decodeLoop(rc, t) // 获取返回结果,进行解码
|- for{ m, err := dec.decode() //循环解码
|- 如果 m 是`MsgProp`类型,则将消息传递到 propc通道,否则到 recv通道

这样每个peer之间的链路就建立了。回过头来,可以看出每对peer之间至少会有两条connection。且他们之间交互交错使用。

peer往对方发送的消息,是通过 对方跟自己建立的连接 来发送的。

最后,我们通过单次请求响应过程,来介绍节点间的通信过程。

当要想集群其他成员节点发送消息时,最终会调用peer.send方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
peer.send
|- writec, name := p.pick(m) // 首先peer会根据m的类型来选择一个发送通道
|- isMsgSnap(m) return p.pipeline.msgc, pipelineMsg // 如果是快照消息,则通过pipeline.msgc来发送消息
|- isMsgApp(m) return p.msgAppV2Writer.writec(),streamAppV2 // 如果是MsgApp消息
|- 如果 p.writer.writec() 可用,return p.writer.writec()
|- 备用 p.pipeline.msgc, pipelineMsg
|- writec <- m: // 即发送数据到 写通道中,`StreamWriter` 中会监听此通道。

StreamWriter.run
|- select {
case m := <-msgc:
|- enc.encode(&m)
|- binary.Write(enc.w, binary.BigEndian, uint64(m.Size())) // 对于 messageEncoder,首先发送字节长度
|- enc.w.Write(pbutil.MustMarshal(m)) //再发送字节内容
//...
}

目标节点的 StreamReaderp.msgAppReader)接收到消息后,通过解析后传递到 EtcdServer(Process),即完成单次通信;对于请求的响应,则通过对方的peer来发送给本节点,本节点的StreamReader来接受响应消息。

2.2 客户端通信

对于客户端通信,因为是GRPC或者HTTP简单的请求响应方式,因此这里就不再介绍了。

4. 事务请求

[TOC]

本节我们将讲述 etcd集群的事务请求处理过程。首先我们将以put操作为例来讲解一般性事务请求过程。其次,我们将介绍 etcd特有的线性一致性读请求过程。

put 请求

通过 etcdctl put ty dj 命令,向本地监听2380端口的节点发送 写请求。
该请求为GRPC请求,会中转到quotaKVServer进行处理。

Leader接收请求,并广播消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
quotaKVServer.Put
|- s.qa.check(ctx,r) //检查是否满足quota,即是否有足够的内存,如果没有则忽略请求,并发报警
|- EtcdServer.Put // 调用EtcdServer进行写
|- s.raftRequest(ctx, pb.InternalRaftRequest{Put: r}) // EtcdServer将请求进行包装成 `InternalRaftRequest`
并调用`raftRequest`进行请求,并等待获取结果
|- s.processInternalRaftRequestOnce// 最终会调用此方法
|- if ci > ai+maxGapBetweenApplyAndCommitIndex //首先检查apply索引和 commit索引是否相差太大,若太大则忽略请求(自我保护机制)
|- 生成请求头,并生成请求ID `s.reqIDGen.Next()`
|- r.Marshal() // 对请求进行编码
|- ch := s.w.Register(id) // 注册请求响应回调
|- err = s.r.Propose(cctx, data) // 向 raftNode 发起提议,申请写入数据
|- select { // 等待结果,或超时
case x := <-ch:
return x.(*applyResult), nil
case timeout
}

接下来就进入了主要的处理流程:

1
2
3
4
5
node.Propose
|- n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) // 将请求包装成 `MsgProp`消息
|- n.stepWithWaitOption
|- pm := msgWithResult{m: m} // 将消息再一次封装
|- 将消息传递给 通道`n.propc`

可以看出 将请求两层封装之后传递给通道n.propc
node的循环中,其会监听proc通道,当有消息到来时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
node.start | 360+L
|- m.From = r.id // 将自己的id设置到消息的Form字段里
|- err := r.Step(m) // 再交给raft处理
|- r.step(r, m) //此处就会根据节点的角色不同进行不同处理,若本节点是Follower则会转交给Leader节点进行处理,然后再进行如上的流程。这里假设本节点是主节点 即 stepLeader方法
|- r.appendEntry(m.Entries...) // 将该记录追加到`raftLog`中
|- r.increaseUncommittedSize(es) // 校验是否超过上界
|- r.raftLog.append(es...) // 追加到日志中记录下来(unstable中)
|- r.getProgress(r.id).maybeUpdate(li) //更新本节点的复制进度
|- r.maybeCommit() //对于单节点,尝试更新一次 raftLog中的 commit索引。//大多数节点都到达的索引
|- r.bcastAppend() // 广播消息
|- 对于每个其他成员节点,都执行 `r.sendAppend(id)`
|- pr := r.getProgress(to) //获取节点的复制进程
|- term, errt := r.raftLog.term(pr.Next - 1) // p.Next为该节点下次复制的索引号
|- ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize) // 获取需要复制给对方节点的记录
|- 叫这些信息都封装成`MsgApp`消息,进行发送
|- r.send(m)
|- r.msgs = append(r.msgs, m) // 将消息追加到了r.msgs中

当本节点将需要发送给每个成员节点的消息都放到r.msgs后,node.run会进入下一轮循环。此时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
node.run
|- rd = newReady(r, prevSoftSt, prevHardSt) // 准备准备好发送的消息结构体 Ready
|- rd := Ready{
Entries: r.raftLog.unstableEntries(),
CommittedEntries: r.raftLog.nextEnts(),
Messages: r.msgs, // 消息在此进行包装
}
|- readyc = n.readyc
|- select {
case readyc <- rd: // 此处即将消息传递到 n.readyc通道中
//完成传递后,会进行一些状态的更新
if index := rd.appliedCursor(); index != 0 {
applyingToI = index // 提交消息的最大索引号
}
advancec = n.advancec // 赋值后,待本次消息发送完成后,进入下次循环。(等待 `raftNode`的通知)
case <-advancec:
// 得到通知后,即知道之前的已提交的消息都已经发送出去了可以应用了
if applyingToI != 0 {
r.raftLog.appliedTo(applyingToI)
applyingToI = 0
}
//...
}

发送到n.readyc通道中的消息,会被raftNode捕获:

1
2
3
4
5
6
7
8
9
10
for {
case rd := <-r.Ready():
// ...
if islead { // 发送消息给相应节点
r.transport.Send(r.processMessages(rd.Messages))
}
// ...
r.Advance() // 这里是通知 node.advancec 可以处理下一个循坏
// ...
}

以上是Leader角色接收到Put请求,然后自身进行处理,然后向 Follower发送MsgApp消息。

Follower接收到Leader的广播,进行处理回复

Follower收到消息后,会执行EtcdServer.Process方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
EtcdServer.Process
|- node.Step
|- node.step
|- n.recvc <- m // 最终传递给node.recvc通道
|- raft.Step // node监听node.recvc,接收到消息后调用 raft执行逻辑
|- stepFollower
|- r.handleAppendEntries(m) //处理追加记录
|- r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) // 尝试追加消息
|- l.matchTerm(index, logTerm) // 校验 term 和index信息是否跟本节点一致
|- l.findConflict(ents) //找出冲突的索引号
|- l.append(ents[ci-offset:]...) // 追加记录到raftLog中(unstable中)
|- l.commitTo(min(committed, lastnewi)) // 更新 commitId
|- r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) // 向 Leader反馈 信息`MsgAppResp`和索引号
|- r.msgs = append(r.msgs, m) // 将消息追加到了r.msgs中

发送的消息最终被 raftNode接收:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
for {
case rd := <-r.Ready():
notifyc := make(chan struct{}, 1)
ap := apply{
entries: rd.CommittedEntries, // raft中已经提交的消息,
snapshot: rd.Snapshot,
notifyc: notifyc,
}
// 将Committed数据应用到
updateCommittedIndex(&ap, rh) // 更新 EtcdServer的CommitId
select {
case r.applyc <- ap: // 将应用信息传递到 raftNode.applyc通道中,其被EtcdServer所监听。后续详解
}
r.storage.Save(rd.HardState, rd.Entries) // 实际调用WAL的Save方法,将记录写入到文件中 // ... wal
r.raftStorage.Append(rd.Entries) // 将消息追加到 MemoryStorage中
if !islead {
notifyc <- struct{}{}
r.transport.Send(msgs)
}
r.Advance() // 这里是通知 node.advancec 可以处理下一个循坏
// ...
}

再看EtcdServer接收到apply消息后的处理:
再来看 EtcdServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
EtcdServer/1020L+-
select {
case ap := <-s.r.apply():
f := func(context.Context) { s.applyAll(&ep, &ap) }
sched.Schedule(f) // 顺序调用`s.appAll`方法
s.applySnapshot(ep, apply)
s.applyEntries(ep, apply) //这里是应用服务最重要的地方
|- 首先筛选出需要应用到此`EtcdServer`的记录
|- s.apply(ents, &ep.confState)//然后将这些记录应用到 `EtcdServer`中
|- 对于正常的记录(`EntryNormal`类型)
|- case raftpb.EntryNormal:
s.applyEntryNormal(&e)
|- s.consistIndex.setConsistentIndex(e.Index)// 设置一致性索引号
|- 解码消息 Unmarshal
|- ar = s.applyV3.Apply(&raftReq) // 应用到应用数据存储中
|- s.w.Trigger(id, ar) // 回调,通知该记录对应的请求
s.setAppliedIndex(e.Index)
s.setTerm(e.Term)

|- <-apply.notifyc // 接到raftNode的通知已经完成发送等操作
|- s.triggerSnapshot(ep) // 检查是否有 SnapshotCount操作,如果有则进行快照
}

总结起来,Follower节点接收到Leader节点的MsgApp应用到其节点中,然后向反馈Leader MsgAppResp消息,并带上自己的最大记录号。

Leader收到Follower的回复进行处理

Leader收到MsgAppResp消息后,经过层层传递到stepLeader方法中:

1
2
3
4
5
6
7
8
9
10
|- pr := r.getProgress(m.From) // 获取该节点的进度
|- switch m.Type {
case pb.MsgAppResp:
pr.RecentActive = true //设置该节点状态
|- pr.maybeUpdate(m.Index) //更新该节点在 主节点中的进度信息(已经复制到什么位置了)
|- 根据该节点的状态进行相应操作
|- if r.maybeCommit() { // 判断是否可以提交
r.bcastAppend() // 若可以提交,则广播消息
}
}

我们先来看判断是否可以提交的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
raft.maybeCommit
mis := r.matchBuf[:len(r.prs)]
idx := 0
// 首先将每个成员节点确认的同步消息索引号放入数组中
for _, p := range r.prs {
mis[idx] = p.Match
idx++
}
// 按索引号排序
sort.Sort(mis)
// 获取 len(mis)-r.quorum()位的索引位置
mci := mis[len(mis)-r.quorum()]
return r.raftLog.maybeCommit(mci, r.Term) //判断是否比raftLog中的CommitId大,若是则表示有数据可以提交
|- if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {
l.commitTo(maxIndex) // 更新commitId
}

r.bcastAppend()的处理过程之前以及介绍过,其会构造一遍Ready数据结构,带上CommitedEntries提交的数据、其他节点未同步的Entries,传递给raftNoderaftNode接收到此消息后,会执行与上面分析的Follower的逻辑类似。

线性一致性读

线性一致性读和一般的读不通的时候,会在进行真正的读请求之前,先与其他节点进行一个check,查看当前集群主是否发生了变化。通过etcdctl get ty命令,向监听本地2380端口的节点发送GRPC请求,会被中转到quotaKVServer上。
get请求在etcdctl中会被转换成Range请求。quotaKVServer最终会调用EtcdServer.Range方法。

Leader节点收到请求后,广播心跳消息

1
2
3
4
5
6
7
8
9
10
quotaKVServer
|- kvServer.Range
|- EtcdServer.Range
|- if !r.Serializable { //是否线性一致性
err = s.linearizableReadNotify(ctx)
if err != nil {
return nil, err
}
resp, err = s.applyV3Base.Range(nil, r)
}

线性一致性读的主要逻辑体现在 s.linearizableReadNotify中。

1
2
3
4
5
6
7
8
s.linearizableReadNotify  // 线性读通知
|- nc := s.readNotifier // 用于通知读协程可以进行读操作
|- select {
case s.readwaitc <- struct{}{}: //通知读等待通道
}
|- select {
case <-nc.c: // 等待读状态通知
}

EtcdServer在启动的时候会启动linearizableReadLoop线性一致性读循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
linearizableReadLoop
|- ctxToSend := make([]byte, 8)
|- id1 := s.reqIDGen.Next()
|- binary.BigEndian.PutUint64(ctxToSend, id1)
|- for ;; {
select {
case <-s.readwaitc: // 当接收到线性一致性读请求后,循环就可以往下走
}
nextnr := newNotifier() // 创建新的读通知器,给下一个读请求使用
nr := s.readNotifier
s.readNotifier = nextnr
err := s.r.ReadIndex(cctx, ctxToSend) // 调用`node.ReadeIndex`发起读索引请求
select {
case rs = <-s.r.readStateC:
done = bytes.Equal(rs.RequestCtx, ctxToSend)
case <-leaderChangedNotifier:
nr.notify(ErrLeaderChanged)
case <-time.After(s.Cfg.ReqTimeout()):
nr.notify(ErrTimeout)
timeout = true
slowReadIndex.Inc()
case <-s.stopping:
return
}
|- nr.notify(nil)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
node.ReadIndex
|- n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}) //
|- raft.Step
|- raft.stepLeader
|- if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term { // 当本任期没有提交任何消息时,拒绝读
return nil
}
switch r.readOnly.option {
case `ReadOnlySafe`:
r.readOnly.addRequest(r.raftLog.committed, m) // 将只读请求添加到`readyOnly`结构里。
r.bcastHeartbeatWithCtx(m.Entries[0].Data)// 带上请求的ID,向所有成员节点发送心跳。
case `ReadOnlyLeaseBased`:
ri := r.raftLog.committed
if m.From == None || m.From == r.id { // from local member
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
} else {
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
}
}

Follower接收到Leader的心跳请求,并返回

Follower接收到Leader的心跳请求,最终会调用:

1
2
r.raftLog.commitTo(m.Commit)
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) // 发送心跳回复

Leader接收Follower回复,通知客户端请求继续

Leader节点接收到Follower的回复后,最终会调用如下逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
pr.RecentActive = true
if pr.Match < r.raftLog.lastIndex() { // 当Follower节点有数据没复制时,进行发送
r.sendAppend(m.From)
}
// 操作readyOnly接收Ack,并返回有多少节点已经确认
ackCount := r.readOnly.recvAck(m)
if ackCount < r.quorum() {
return nil
}
// 返回有哪些请求可以继续往下走
rss := r.readOnly.advance(m)
for _, rs := range rss {
req := rs.req
if req.From == None || req.From == r.id {
// 将可以读的请求信息放到 r.readStates中,等后面提交到 ReadyC结构体,向上提交进行处理。
r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
}
}
func (ro *readOnly) recvAck(m pb.Message) int {
rs, ok := ro.pendingReadIndex[string(m.Context)] // m.Context 为请求ID
if !ok {
return 0
}
rs.acks[m.From] = struct{}{}
return len(rs.acks) + 1 // 返回已经有多少Follower进行了应答
}
// 返回容许读的请求ID列表
func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
var (
i int
found bool
)
ctx := string(m.Context)
rss := []*readIndexStatus{}
for _, okctx := range ro.readIndexQueue {
i++
rs, ok := ro.pendingReadIndex[okctx]
rss = append(rss, rs)
// 找到匹配的读,以及可以触发读的请求队列。
if okctx == ctx {
found = true
break
}
}
if found {
ro.readIndexQueue = ro.readIndexQueue[i:]// 剔除之前的读请求
for _, rs := range rss {
delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
}
return rss
}
return nil
}

raftNode结构体接收到Ready消息时,会检查是否有ReadStates,然后对其进行处理:

1
2
3
4
if len(rd.ReadStates) != 0 {
select {
case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
}

再回到linearizableReadLoop中:

1
2
3
4
5
6
7
8
9
10
11
select {
case rs = <-s.r.readStateC:
case <-leaderChangedNotifier:
nr.notify(ErrLeaderChanged)
case <-time.After(s.Cfg.ReqTimeout()):
nr.notify(ErrTimeout)
case <-s.stopping:
return
}
}
nr.notify(nil) // 通知客户端可以进行查询请求

到此就完成线性一致性读的全过程。

5. TTL & Lease 实现

etcd中用来实现TTL的机制叫LeaseLease可以用来绑定多个key。Lease的主要用法如下:

1
2
3
4
5
6
7
8
9
10
1. etcdctl lease grant 1900 // 申请一个租约,返回租约ID
- lease 326969472b0c5d05 granted with TTL(1900s)
2. etcdctl lease revoke 326969472b0c5d05 // 取消租约
- lease 326969472b0c5d05 revoked
3. etcdctl lease timetolive 326969472b0c5d08 // 查询剩余多长时间
- lease 326969472b0c5d08 granted with TTL(190s), remaining(182s)
4. etcdctl lease keep-alive 326969472b0c5d05 // 到期续约
- lease 326969472b0c5d05 keepalived with TTL(1900)
5. etcdctl lease keep-alive --once=true 326969472b0c5d0c // 单次续约
- lease 326969472b0c5d0c keepalived with TTL(200)

下面将解析其工作原理,首先将介绍其初始化过程,然后介绍创建、以及绑定key以及过期的操作。

5.1 初始化

Lessor是在创建EtcdServer是进行初始化的:lease.NewLessor(...)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat // 首先计算最小TTL单元,为选举跳数的1.5倍*心跳间隔时间
srv.lessor = lease.NewLessor(log, srv.be, lease.LessorConfig{MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), CheckpointInterval: cfg.LeaseCheckpointInterval}) // 创建`Lessor`,传入最小租约TTL时间,和租约周期性检测时间间隔。租约的TTL时间最小不能小于最小租约时间(秒级别)
|- if checkpointInterval == 0 { // 设置默认时间为5分钟
checkpointInterval = 5 * time.Minute
}
|- l := &lessor{
leaseMap: make(map[LeaseID]*Lease),
itemMap: make(map[LeaseItem]LeaseID),
leaseHeap: make(LeaseQueue, 0), // 根据租约到期时间排序的小根堆
leaseCheckpointHeap: make(LeaseQueue, 0),
b: b,
minLeaseTTL: cfg.MinLeaseTTL,
checkpointInterval: checkpointInterval,
expiredC: make(chan []*Lease, 16),
stopC: make(chan struct{}),
doneC: make(chan struct{}),
}
|- l.initAndRecover() // 初始化并恢复
|- vs := tx.UnsafeRange(leaseBucketName...) // 从`boltDB`中,查询`lease` bucket中所有数组
|- 对于每个记录,解码`lpb.Unmarshal(vs[i])`,然后插进`le.leaseMap`
|- le.leaseMap[ID] = &Lease{
ID: ID,
ttl: lpb.TTL,
itemSet: make(map[LeaseItem]struct{}),
expiry: forever,
revokec: make(chan struct{}),
}
|- heap.Init(&le.leaseHeap) //初始化堆
|- heap.Init(&le.leaseCheckpointHeap)
|- go l.runLoop() // 运行500ms一次的任务,撤销过期的租约,周期性调度`Lease`检测。后续将详细分析其执行过程

完成了Lessor的初始化后,随着进入kvstore.restore的初始化,会伴随着恢复keylease的映射关系。

1
2
3
4
5
6
当从 bucket`key`中获取所有key-val后,执行`restoreChunk`方法
restoreChunk
|- 其会检测kv是否带`Lease`,若有则加入到`keyToLease`map
|- 对`keyToLease`map中的每个entry
|- s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
|- 将`key`绑定到`lease`中,并建立`key``lease`的映射

5.2 Lease创建

创建Lease,最终会调用EtcdServer.LeaseGrant方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
EtcdServer.LeaseGrant
|- r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1)) // 首先创建 leaseId
|- s.raftRequestOnce 发起提议
|- 当集群确认提交此提议后,会进行应用。调用`lessor.Grant`
|- l := &Lease{ // 创建 Lease
ID: id,
ttl: ttl,
itemSet: make(map[LeaseItem]struct{}),
revokec: make(chan struct{}),
}
|- if l.ttl < le.minLeaseTTL { // 保证租约ttl不小于`minLeaseTTL`
l.ttl = le.minLeaseTTL
}
|- l.refresh(0) // 设置 `Lease`的 `expiry`时间
|- le.leaseMap[id] = l // 放入 `leaseId`与`lease`的映射
|- item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()} // 过期时间戳、leaseId item
|- heap.Push(&le.leaseHeap, item) // 存入堆中
|- l.persistTo(le.b) // 将lease信息存储 boltDB中
|- le.scheduleCheckpointIfNeeded(l)
|- if lease.RemainingTTL() > int64(le.checkpointInterval.Seconds()) { // 若`lease`的剩余时间比`检测点间隔`大时,则存进`le.leaseCheckpointHeap`中
|- heap.Push(&le.leaseCheckpointHeap, &LeaseWithTime{
id: lease.ID,
time: time.Now().Add(le.checkpointInterval).UnixNano(),
})

5.3 Lease绑定key

put操作时通过添加参数--lease=326969472b0c5d08即将key绑定到lease上。直接定位storeTxnWrite.put方法:

1
2
3
4
5
6
7
8
9
10
11
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)}) // 获取key之前绑定的`lease`
if oldLease != lease.NoLease {
err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}}) // 剔除`key`与老`lease`的映射关系
}
if leaseID != lease.NoLease {
err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
|- for _, it := range items {
l.itemSet[it] = struct{}{}
le.itemMap[it] = id
}
}

5.4 Lease过期

上文中介绍过在初始化Lease后,会启动lessor.runLoop任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
for {
// 注销过期的租约
le.revokeExpiredLeases()
// 检测调度的租约
le.checkpointScheduledLeases()
select {
// 500ms 一次
case <-time.After(500 * time.Millisecond):
case <-le.stopC:
return
}
}
le.revokeExpiredLeases
|- revokeLimit := leaseRevokeRate / 2 // 限流
|- ls = le.findExpiredLeases(revokeLimit) // 查询出过期的租约
|- select {
case le.expiredC <- ls:
}
|- `EtcdServer` 会监听`lessor.expiredC`通道:
|- 异步串行执行:`s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})`
|- 提交Revoke提议,最后进行应用:
|- lessor.Revoke
|- keys := l.Keys()
sort.StringSlice(keys).Sort()
for _, key := range keys { // 删除key
txn.DeleteRange([]byte(key), nil)
}
|- delete(le.leaseMap, l.ID)
|- le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID))) // 从`lease`bucket中删除租约

6. Watch机制

watch用于监听指定key或指定key前缀的键值对的变动。其 api 如下:

1
2
3
etcdctl watch --prefix[=false] // 带前缀
--prev-kv[=false]// 是否获取之前的key-val对
--rev=0 // 开始监听的主Revision

下面将详细解析其工作原理。首先将介绍其初始化过程,其次介绍创建watch过程以及相关键值对修改时的操作。

6.1 初始化过程

在初始化存储的时候,提到了newWatchableStore方法,其对boltDB进行了一次封装。

1
2
3
4
5
6
7
8
9
10
s := &watchableStore{
store: NewStore(lg, b, le, ig),
victimc: make(chan struct{}, 1),
unsynced: newWatcherGroup(), // 包含所有未同步的watchers,有事件发生需要进行同步
synced: newWatcherGroup(), // 包含与存储进程同步的所有同步了的watchers,map的key即为watcher监听的key。
stopc: make(chan struct{}),
}

go s.syncWatchersLoop()// 启动每100ms一次的同步未同步映射中的监听者
go s.syncVictimsLoop()// 同步预先发送未成功的watchers

首先来看下WatcherGroup的数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
type watcherGroup struct {
// key到监听此key的watcher集合
keyWatchers watcherSetByKey
// 监听范围的监听者 (红黑树)
ranges adt.IntervalTree
// watcher 集合
watchers watcherSet
}
watchableStore.syncWatchers
|- curRev := s.store.currentRev
|- compactionRev := s.store.compactMainRev
|- wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev) // 从未同步的监听组中获取watcher集合
|- wg.chooseAll // 选择所有watcher中最小`Revision`
|- for w := range wg.watchers {
if w.minRev < compactRev { // 当小于当前压缩版本的时候直接做一次响应
select {
case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
w.compacted = true
wg.delete(w)
default:
// retry next time
}
}
if minRev > w.minRev {
minRev = w.minRev
}
}
|- revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) // 查询大于revision的所有 key-val
|- evs = kvsToEvents(s.store.lg, wg, revs, vs) // 转变成 事件
|- for i, v := range vals {
|- kv.Unmarshal(v)
|- if !wg.contains(string(kv.Key)) { // 查询是否有监听此key的watcher
continue
}
|- ty := mvccpb.PUT
if isTombstone(revs[i]) {
ty = mvccpb.DELETE
kv.ModRevision = bytesToRev(revs[i]).main
}
|- evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty})
}
|- newWatcherBatch(wg, evs)// 将变更事件与关注该事件的watcher建立映射 map[*watcher]*eventBatch
|- 对`WatcherBatch`中的每一项作如下处理:
|- 如果一次性未发送完成,则设置下次发送的Rev `w.minRev = eb.moreRev`
|- if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) { // 发送变更响应。将消息传递到`watcher.ch`中
} else {
if victims == nil {
victims = make(watcherBatch)
}
w.victim = true
}
|- if w.victim { // 若发送未成功,则将要发送的`WatcherBatch`存储到`victims`中
victims[w] = eb
} else { //发送成功的话,则
if eb.moreRev != 0 {
continue
}
s.synced.add(w)
}
|- s.unsynced.delete(w)
|- s.addVictim(victims)

总结整体流程如下:

  1. 首先从未同步watcherGroup中获取最多maxWatchersPerSyncwatcher对应的关注最小的Revision,并删除已经被压缩的watcher
  2. 使用上面得到的最小Revision去获取所有key-val对;
  3. 对所有key-val对,去寻找关注其变化的watcher最终生成WactherBatch
  4. 发送给客户端,然后将watcher从unsync WatcherGroup中移动到sync WatcherGroup中。对于未发送完成的消息添加到victims中,在syncVictimsLoop中重试。

接下来即系了解syncVictimsLoop的实现逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
for { // 循环执行moveVictims方法
for s.moveVictims() != 0 {
}
isEmpty := len(s.victims) == 0
var tickc <-chan time.Time
if !isEmpty {
tickc = time.After(10 * time.Millisecond)
}
select {
case <-tickc:
case <-s.victimc:
case <-s.stopc:
return
}
}
s.moveVictims
|- 对victims中每一项:
|- 首先尝试进行发送`w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev})`。不成功的话,则加到`newVictim`
|- 其后根据当前集群情况将其加入到`unsynced WatcherGroup`中(w.minRev <= curRev)或者`synced WatcherGroup`

6.2 创建watch

当通过客户端执行如下命令etcdctl watch /ty/dj --prefix命令时,客户端与服务端建立grpc双工通道进行交互,其最终会调用watchServer.Watch方法,在其中建立双工通道交互方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
sws := serverWatchStream{
clusterID: ws.clusterID,
memberID: ws.memberID,
maxRequestBytes: ws.maxRequestBytes,
sg: ws.sg,
watchable: ws.watchable,
ag: ws.ag,
gRPCStream: stream,
watchStream: ws.watchable.NewWatchStream(),
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
progress: make(map[mvcc.WatchID]bool), // 记录watcher是否需要发送精度
prevKV: make(map[mvcc.WatchID]bool), // 是否需要查询历史值
fragment: make(map[mvcc.WatchID]bool), // 是否需要分片
closec: make(chan struct{}),
}
go sws.sendLoop() // 异步建立起发送消息的过程
go sws.recvLoop() // 异步接收请求,并进行处理

首先来看sws.recvLoop处理过程,通过grpcstream通道获取请求,然后调用watchStream.Watch方法创建watch,并注册到相应的WatcherGroup上。

1
2
3
4
5
6
7
8
for ;;
req, err := sws.gRPCStream.Recv() // 接收请求
switch uv := req.RequestUnion.(type) {
case *pb.WatchRequest_CreateRequest:
id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...) // 注册一个Watch
|- watchableStore.watch
|- synced := startRev > s.store.currentRev || startRev == 0 //查看需要同步
|- 如果不需要,则放入`synced WatcherGroup`中,否则放入`unsynced WatcherGroup`

再来看发送流程:
通过watcher.send发送变更消息的时候,实际上是传递到watcherch通道上,而这个通道则是serverWatchStream的发送通道:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
select {
case wresp, ok := <-sws.watchStream.Chan():
evs := wresp.Events
events := make([]*mvccpb.Event, len(evs))
sws.mu.RLock()
needPrevKV := sws.prevKV[wresp.WatchID]
for i := range evs {
events[i] = &evs[i]
if needPrevKV { // 对于需要查询历史版本的数据
opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
r, err := sws.watchable.Range(evs[i].Kv.Key, nil, opt)
if err == nil && len(r.KVs) != 0 {
events[i].PrevKv = &(r.KVs[0])
}
}
}
//是否需要分割发送
fragmented, ok := sws.fragment[wresp.WatchID]
if !fragmented && !ok { //不需要是,则直接发送
serr = sws.gRPCStream.Send(wr)
} else {
serr = sendFragments(wr, sws.maxRequestBytes, sws.gRPCStream.Send)
}
sws.mu.Lock()
if len(evs) > 0 && sws.progress[wresp.WatchID] {
// elide next progress update if sent a key update
sws.progress[wresp.WatchID] = false
}
sws.mu.Unlock()
//...
}

6.3 键值对更改

建设有两个客户端A,B
A客户端先执行了操作etcdctl watch /ty/dj;B客户端随后执行了操作etcdctl put /ty/dj hello。那么在B执行该操作时watcher机制是如何work的?
对于put操作,会执行storeTxnWrite.put方法,会将更新的数据添加到 tw.changes中,最后会执行watchableStoreTxnWrite.End方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
changes := tw.Changes()
rev := tw.Rev() + 1
evs := make([]mvccpb.Event, len(changes))
for i, change := range changes { // 将变更转换为事件
evs[i].Kv = &changes[i]
if change.CreateRevision == 0 {
evs[i].Type = mvccpb.DELETE
evs[i].Kv.ModRevision = rev
} else {
evs[i].Type = mvccpb.PUT
}
}
tw.s.notify(rev, evs) // 调用 watchableStore.notify方法
|- for w, eb := range newWatcherBatch(&s.synced, evs) {
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) { // 发送更新
} else {
// move slow watcher to victims
w.minRev = rev + 1
if victim == nil {
victim = make(watcherBatch)
}
w.victim = true
victim[w] = eb
s.synced.delete(w)
slowWatcherGauge.Inc()
}
}
s.addVictim(victim)

总结过程如下:在进行put操作时会将变更添加到changes中。当put操作结束时,执行watchableStoreTxnWrite.End方法,转换成newWatcherBatch事件,然后调用watchableStore.notify方法进行发送更新。

7. 集群管理

7.1 集群间数据同步

7.2 变更节点

7.2.1 新增节点

7.2.2 下线节点

7.3 节点宕机

7.3.1 主节点宕机

7.3.2 follower节点宕机

7.4 节点迁移、替换

您的支持是我创作源源不断的动力