ETCD事务请求

4. 事务请求

[TOC]

本节我们将讲述 etcd集群的事务请求处理过程。首先我们将以put操作为例来讲解一般性事务请求过程。其次,我们将介绍 etcd特有的线性一致性读请求过程。

put 请求

通过 etcdctl put ty dj 命令,向本地监听2380端口的节点发送 写请求。
该请求为GRPC请求,会中转到quotaKVServer进行处理。

Leader接收请求,并广播消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
quotaKVServer.Put
|- s.qa.check(ctx,r) //检查是否满足quota,即是否有足够的内存,如果没有则忽略请求,并发报警
|- EtcdServer.Put // 调用EtcdServer进行写
|- s.raftRequest(ctx, pb.InternalRaftRequest{Put: r}) // EtcdServer将请求进行包装成 `InternalRaftRequest`
并调用`raftRequest`进行请求,并等待获取结果
|- s.processInternalRaftRequestOnce// 最终会调用此方法
|- if ci > ai+maxGapBetweenApplyAndCommitIndex //首先检查apply索引和 commit索引是否相差太大,若太大则忽略请求(自我保护机制)
|- 生成请求头,并生成请求ID `s.reqIDGen.Next()`
|- r.Marshal() // 对请求进行编码
|- ch := s.w.Register(id) // 注册请求响应回调
|- err = s.r.Propose(cctx, data) // 向 raftNode 发起提议,申请写入数据
|- select { // 等待结果,或超时
case x := <-ch:
return x.(*applyResult), nil
case timeout
}

接下来就进入了主要的处理流程:

1
2
3
4
5
node.Propose
|- n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) // 将请求包装成 `MsgProp`消息
|- n.stepWithWaitOption
|- pm := msgWithResult{m: m} // 将消息再一次封装
|- 将消息传递给 通道`n.propc`

可以看出 将请求两层封装之后传递给通道n.propc
node的循环中,其会监听proc通道,当有消息到来时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
node.start | 360+L
|- m.From = r.id // 将自己的id设置到消息的Form字段里
|- err := r.Step(m) // 再交给raft处理
|- r.step(r, m) //此处就会根据节点的角色不同进行不同处理,若本节点是Follower则会转交给Leader节点进行处理,然后再进行如上的流程。这里假设本节点是主节点 即 stepLeader方法
|- r.appendEntry(m.Entries...) // 将该记录追加到`raftLog`中
|- r.increaseUncommittedSize(es) // 校验是否超过上界
|- r.raftLog.append(es...) // 追加到日志中记录下来(unstable中)
|- r.getProgress(r.id).maybeUpdate(li) //更新本节点的复制进度
|- r.maybeCommit() //对于单节点,尝试更新一次 raftLog中的 commit索引。//大多数节点都到达的索引
|- r.bcastAppend() // 广播消息
|- 对于每个其他成员节点,都执行 `r.sendAppend(id)`
|- pr := r.getProgress(to) //获取节点的复制进程
|- term, errt := r.raftLog.term(pr.Next - 1) // p.Next为该节点下次复制的索引号
|- ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize) // 获取需要复制给对方节点的记录
|- 叫这些信息都封装成`MsgApp`消息,进行发送
|- r.send(m)
|- r.msgs = append(r.msgs, m) // 将消息追加到了r.msgs中

当本节点将需要发送给每个成员节点的消息都放到r.msgs后,node.run会进入下一轮循环。此时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
node.run
|- rd = newReady(r, prevSoftSt, prevHardSt) // 准备好发送的消息结构体 Ready
|- rd := Ready{
Entries: r.raftLog.unstableEntries(),
CommittedEntries: r.raftLog.nextEnts(),
Messages: r.msgs, // 消息在此进行包装
}
|- readyc = n.readyc
|- select {
case readyc <- rd: // 此处即将消息传递到 n.readyc通道中
//完成传递后,会进行一些状态的更新
if index := rd.appliedCursor(); index != 0 {
applyingToI = index // 提交消息的最大索引号
}
advancec = n.advancec // 赋值后,待本次消息发送完成后,进入下次循环。(等待 `raftNode`的通知)
case <-advancec:
// 得到通知后,即知道之前的已提交的消息都已经发送出去了可以应用了
if applyingToI != 0 {
r.raftLog.appliedTo(applyingToI)
applyingToI = 0
}
//...
}

发送到n.readyc通道中的消息,会被raftNode捕获:

1
2
3
4
5
6
7
8
9
10
for {
case rd := <-r.Ready():
// ...
if islead { // 发送消息给相应节点
r.transport.Send(r.processMessages(rd.Messages))
}
// ...
r.Advance() // 这里是通知 node.advancec 可以处理下一个循坏
// ...
}

以上是Leader角色接收到Put请求后的处理逻辑,其中会向 Follower发送MsgApp消息。

Follower接收到Leader的广播,进行处理回复

Follower收到消息后,会执行EtcdServer.Process方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
EtcdServer.Process
|- node.Step
|- node.step
|- n.recvc <- m // 最终传递给node.recvc通道
|- raft.Step // node监听node.recvc,接收到消息后调用 raft执行逻辑
|- stepFollower
|- r.handleAppendEntries(m) //处理追加记录
|- r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) // 尝试追加消息
|- l.matchTerm(index, logTerm) // 校验 term 和index信息是否跟本节点一致
|- l.findConflict(ents) //找出冲突的索引号
|- l.append(ents[ci-offset:]...) // 追加记录到raftLog中(unstable中)
|- l.commitTo(min(committed, lastnewi)) // 更新 commitId
|- r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) // 向 Leader反馈 信息`MsgAppResp`和索引号
|- r.msgs = append(r.msgs, m) // 将消息追加到了r.msgs中

发送的消息最终被 raftNode接收:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
for {
case rd := <-r.Ready():
notifyc := make(chan struct{}, 1)
ap := apply{
entries: rd.CommittedEntries, // raft中已经提交的消息,
snapshot: rd.Snapshot,
notifyc: notifyc,
}

updateCommittedIndex(&ap, rh) // 更新 EtcdServer的CommitId
select {
case r.applyc <- ap: // 将应用信息传递到 raftNode.applyc通道中,其被EtcdServer所监听。后续详解
}
r.storage.Save(rd.HardState, rd.Entries) // 实际调用WAL的Save方法,将记录写入到文件中 // ... wal
r.raftStorage.Append(rd.Entries) // 将消息追加到 MemoryStorage中
if !islead {
notifyc <- struct{}{}
r.transport.Send(msgs)
}
r.Advance() // 这里是通知 node.advancec 可以处理下一个循坏
// ...
}

再看EtcdServer接收到apply消息后的处理:
再来看 EtcdServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
EtcdServer/1020L+-
select {
case ap := <-s.r.apply():
f := func(context.Context) { s.applyAll(&ep, &ap) }
sched.Schedule(f) // 顺序调用`s.appAll`方法
s.applySnapshot(ep, apply)
s.applyEntries(ep, apply) //这里是应用服务最重要的地方
|- 首先筛选出需要应用到此`EtcdServer`的记录
|- s.apply(ents, &ep.confState)//然后将这些记录应用到 `EtcdServer`中
|- 对于正常的记录(`EntryNormal`类型)
|- case raftpb.EntryNormal:
s.applyEntryNormal(&e)
|- s.consistIndex.setConsistentIndex(e.Index)// 设置一致性索引号
|- 解码消息 Unmarshal
|- ar = s.applyV3.Apply(&raftReq) // 应用到应用数据存储中
|- s.w.Trigger(id, ar) // 回调,通知该记录对应的请求
s.setAppliedIndex(e.Index)
s.setTerm(e.Term)

|- <-apply.notifyc // 接到raftNode的通知已经完成发送等操作
|- s.triggerSnapshot(ep) // 检查是否有 SnapshotCount操作,如果有则进行快照
}

总结起来,Follower节点接收到Leader节点的MsgApp应用到其节点中,然后向反馈Leader MsgAppResp消息,并带上自己的最大记录号。

Leader收到Follower的回复进行处理

Leader收到MsgAppResp消息后,经过层层传递到stepLeader方法中:

1
2
3
4
5
6
7
8
9
10
|- pr := r.getProgress(m.From) // 获取该节点的进度
|- switch m.Type {
case pb.MsgAppResp:
pr.RecentActive = true //设置该节点状态
|- pr.maybeUpdate(m.Index) //更新该节点在 主节点中的进度信息(已经复制到什么位置了)
|- 根据该节点的状态进行相应操作
|- if r.maybeCommit() { // 判断是否可以提交
r.bcastAppend() // 若可以提交,则广播消息
}
}

我们先来看判断是否可以提交的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
raft.maybeCommit
mis := r.matchBuf[:len(r.prs)]
idx := 0
// 首先将每个成员节点确认的同步消息索引号放入数组中
for _, p := range r.prs {
mis[idx] = p.Match
idx++
}
// 按索引号排序
sort.Sort(mis)
// 获取 len(mis)-r.quorum()位的索引位置
mci := mis[len(mis)-r.quorum()]
return r.raftLog.maybeCommit(mci, r.Term) //判断是否比raftLog中的CommitId大,若是则表示有数据可以提交
|- if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {
l.commitTo(maxIndex) // 更新commitId
}

r.bcastAppend()的处理过程之前以及介绍过,其会构造一遍Ready数据结构,带上CommitedEntries提交的数据、其他节点未同步的Entries,传递给raftNoderaftNode接收到此消息后,会执行与上面分析的Follower的逻辑类似。

线性一致性读

线性一致性读和一般的读不通的时候,会在进行真正的读请求之前,先与其他节点进行一个check,查看当前集群主是否发生了变化。通过etcdctl get ty命令,向监听本地2380端口的节点发送GRPC请求,会被中转到quotaKVServer上。
get请求在etcdctl中会被转换成Range请求。quotaKVServer最终会调用EtcdServer.Range方法。

Leader节点收到请求后,广播心跳消息

1
2
3
4
5
6
7
8
9
10
quotaKVServer
|- kvServer.Range
|- EtcdServer.Range
|- if !r.Serializable { //是否线性一致性
err = s.linearizableReadNotify(ctx)
if err != nil {
return nil, err
}
resp, err = s.applyV3Base.Range(nil, r)
}

线性一致性读的主要逻辑体现在 s.linearizableReadNotify中。

1
2
3
4
5
6
7
8
s.linearizableReadNotify  // 线性读通知
|- nc := s.readNotifier // 用于通知读协程可以进行读操作
|- select {
case s.readwaitc <- struct{}{}: //通知读等待通道
}
|- select {
case <-nc.c: // 等待读状态通知
}

EtcdServer在启动的时候会启动linearizableReadLoop线性一致性读循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
linearizableReadLoop
|- ctxToSend := make([]byte, 8)
|- id1 := s.reqIDGen.Next()
|- binary.BigEndian.PutUint64(ctxToSend, id1)
|- for ;; {
select {
case <-s.readwaitc: // 当接收到线性一致性读请求后,循环就可以往下走
}
nextnr := newNotifier() // 创建新的读通知器,给下一个读请求使用
nr := s.readNotifier
s.readNotifier = nextnr
err := s.r.ReadIndex(cctx, ctxToSend) // 调用`node.ReadeIndex`发起读索引请求
select {
case rs = <-s.r.readStateC:
done = bytes.Equal(rs.RequestCtx, ctxToSend)
case <-leaderChangedNotifier:
nr.notify(ErrLeaderChanged)
case <-time.After(s.Cfg.ReqTimeout()):
nr.notify(ErrTimeout)
timeout = true
slowReadIndex.Inc()
case <-s.stopping:
return
}
|- nr.notify(nil)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
node.ReadIndex
|- n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}) //
|- raft.Step
|- raft.stepLeader
|- if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term { // 当本任期没有提交任何消息时,拒绝读
return nil
}
switch r.readOnly.option {
case `ReadOnlySafe`:
r.readOnly.addRequest(r.raftLog.committed, m) // 将只读请求添加到`readyOnly`结构里。
r.bcastHeartbeatWithCtx(m.Entries[0].Data)// 带上请求的ID,向所有成员节点发送心跳。
case `ReadOnlyLeaseBased`:
ri := r.raftLog.committed
if m.From == None || m.From == r.id { // from local member
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
} else {
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
}
}

Follower接收到Leader的心跳请求,并返回

Follower接收到Leader的心跳请求,最终会调用:

1
2
r.raftLog.commitTo(m.Commit)
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) // 发送心跳回复

Leader接收Follower回复,通知客户端请求继续

Leader节点接收到Follower的回复后,最终会调用如下逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
pr.RecentActive = true
if pr.Match < r.raftLog.lastIndex() { // 当Follower节点有数据没复制时,进行发送
r.sendAppend(m.From)
}
// 操作readyOnly接收Ack,并返回有多少节点已经确认
ackCount := r.readOnly.recvAck(m)
if ackCount < r.quorum() {
return nil
}
// 返回有哪些请求可以继续往下走
rss := r.readOnly.advance(m)
for _, rs := range rss {
req := rs.req
if req.From == None || req.From == r.id {
// 将可以读的请求信息放到 r.readStates中,等后面提交到 ReadyC结构体,向上提交进行处理。
r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
}
}
func (ro *readOnly) recvAck(m pb.Message) int {
rs, ok := ro.pendingReadIndex[string(m.Context)] // m.Context 为请求ID
if !ok {
return 0
}
rs.acks[m.From] = struct{}{}
return len(rs.acks) + 1 // 返回已经有多少Follower进行了应答
}
// 返回容许读的请求ID列表
func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
var (
i int
found bool
)
ctx := string(m.Context)
rss := []*readIndexStatus{}
for _, okctx := range ro.readIndexQueue {
i++
rs, ok := ro.pendingReadIndex[okctx]
rss = append(rss, rs)
// 找到匹配的读,以及可以触发读的请求队列。
if okctx == ctx {
found = true
break
}
}
if found {
ro.readIndexQueue = ro.readIndexQueue[i:]// 剔除之前的读请求
for _, rs := range rss {
delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
}
return rss
}
return nil
}

raftNode结构体接收到Ready消息时,会检查是否有ReadStates,然后对其进行处理:

1
2
3
4
if len(rd.ReadStates) != 0 {
select {
case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
}

再回到linearizableReadLoop中:

1
2
3
4
5
6
7
8
9
10
11
select {
case rs = <-s.r.readStateC:
case <-leaderChangedNotifier:
nr.notify(ErrLeaderChanged)
case <-time.After(s.Cfg.ReqTimeout()):
nr.notify(ErrTimeout)
case <-s.stopping:
return
}
}
nr.notify(nil) // 通知客户端可以进行查询请求

到此就完成线性一致性读的全过程。

您的支持是我创作源源不断的动力