ETCD初始化

1. Etcd冷启动

1.1 初始化主流程

Etcd的启动类为 父目录的main.go文件。其启动过程调用如下:

1
2
3
4
5
6
main.go
|- etcdmain/main.go(暂且忽略`gateway``proxy`模式启动)
|- checkSupportArch // 检查是否是支持的处理器架构
|- startEtcdOrProxyV2 // 解析参数并根据参数决定启动etcd节点还是按Proxy模式启动(这里按etcd节点形式启动)
|- 生成默认参数`newConfig()`解析参数 `cfg.parse(os.Args[1:])`解析命令行启动参数
|- startEtcd(&cfg.ec)// 启动过程最核心的地方

startEtcd中执行etcd启动的主要过程:

1
2
3
4
5
6
7
8
9
10
11
12
embed.startEtcd(inCfg *Config)
|- inCfg.Validate() //校验配置,检查url是否是以ip地址开头的,否则报错终止流程
|- configurePeerListeners(cfg) // 根据配置初始化peerListener结构体(为`peer`服务的服务器配置)
|- configureClientListeners(cfg) // 根据配置初始化clientListener结构体,(为`client`服务的服务器配置)
|- 通过判断是否有 `wal`文件来判断是否已经有其他节点信息
|- cfg.PeerURLsMapAndToken("etcd")方法,其用于解析出其他节点的信息。
|- etcd集群模式有三种启动方式,其具体实现即实现在其内部。(`后续我们将详细分析`
|- etcdserver.NewServer(srvcfg) // 执行etcdServer初始化
|- e.Server.Start() // 启动 etcdServer
|- e.servePeers() // 启动 监听etcd节点间请求的GRPC服务
|- e.serveClients() // 启动监听客户端请求的GRPC服务
|- e.serveMetrics() // 启动Metrics Http服务,供外部查询Metrics信息

通过 cfg.PeerURLsMapAndToken(“etcd”) 逻辑,了解到etcd有三种方式来获取集群中其他节点信息:

1
2
3
4
5
switch {
case cfg.Durl != "":// etcd自发现模式:配置 “discovery”参数设置。这里没有真正获取集群所有节点。比较trick的作用,等后面来处理。
case cfg.DNSCluster != "":// 通过DNS自发现模式,配置”discovery-srv“
default:// 默认的静态配置方式,通过参数 "initial-cluster"进行设置
}

准备好创建etcd节点后开始初始化节点信息:

1
2
3
4
5
6
etcdserver/NewServer(srvcfg)
|- fileutil.TouchDirAll(cfg.DataDir) //检查是否可以获取目录权限:DataDir:"集群名.etcd"
|- fileutil.TouchDirAll(cfg.SnapDir()) // 检查是佛偶可以获取快照目录权限 ”集群名.etcd/member/snap“
|- snap.New(cfg.Logger, cfg.SnapDir()) //创建快照管理器
|- be := openBackend(cfg) // 创建数据库后端,其底层使用boltDB存储数据,然后在其之上进行了一次封装:包括批量提交事务。(关于存储的内容,我们后面讲单独讲解)
|- rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout()) // 创建一个RoundTripper,其作用是封装一个具有执行一次http事务,为一个http request获取response的对象

初始化快照管理器和数据库后端后,就会根据一系列条件来决定怎样启动节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
switch {
case !haveWAL && !cfg.NewCluster:// 非新集群且也没有WAL文件
// 首先进行一些校验工作:比如判断本地集群成员列表是否与远程其他节点的成本列表配置是否相同
// 判断集群每个节点的版本是否兼容
id, n, s, w = startNode(cfg, cl, nil)
case !haveWAL && cfg.NewCluster: // 新集群,且没有WAL文件
// 此处就是接着 PeerURLsMapAndToken的处理,如果配置`Discovery`参数,etcd则进行自发现流程 `v2discovery.JoinCluster`
id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
case haveWAL: // 非新集群,且有WAL文件
// 首先读取快照文件
// 从快照文件中恢复数据库后端
snapshot, err = ss.Load()
// 调用restartNode方法重启节点
id, cl, n, s, w = restartNode(cfg, snapshot)
// 其再从wal内进一步恢复内容
default: //异常情况
}

创建完raft.Node并绑定相应raft后,继续初始化:

1
2
3
4
5
6
7
8
9
10
11
12
|- stats.NewServerStats // 初始化统计计数
|- stats.NewLeaderStats
|- heartbeat := time.Duration(cfg.TickMs) * time.Millisecond // 初始化心跳参数
|- 创建 `EtcdServer`:同时创建 `RaftNode`、初始化ID生成器
|- newRaftNode
|- lease.NewLessor //创建或恢复租约管理器
|- srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex) // 创建KV存储器,其封装了backend、TreeIndex、Lessor,另外其内部也运行着 Watch 机制。(后续独立章节详细解析)
|- srv.applyV3Base = srv.newApplierV3Backend() // 创建 ApplierV3 接口实现,其用于处理v3 raft消息,为应用层的操作。
|- restoreAlarms // 恢复报警,如要是节点存储的报警:如空间不足、崩溃时替换`applyV3`做配置限制或拒绝请求
|- tr := &rafthttp.Transport{...} // 创建Transporter,其用于向其他节点发送raft消息,并从其他节点获取raft消息。
|- 每个remotes执行:tr.AddRemote(m.ID, m.PeerURLs) // 初始化与每个Learner的通信
|- 每个Members执行:tr.AddPeer(m.ID, m.PeerURLs) // 初始化与每个Member节点的通信

下面来看raftNode的详细创建过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
newRaftNode(
raftNodeConfig{
Node: n,
heartbeat: heartbeat,
raftStorage: s,
storage: NewStorage(w, ss),
},
)
func newRaftNode(cfg raftNodeConfig) *raftNode {
r := &raftNode{
lg: cfg.lg,
tickMu: new(sync.Mutex),
raftNodeConfig: cfg,
td: contention.NewTimeoutDetector(2 * cfg.heartbeat),
readStateC: make(chan raft.ReadState, 1),
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
applyc: make(chan apply),
stopped: make(chan struct{}),
done: make(chan struct{}),
}
// 开始心跳 Ticker
r.ticker = time.NewTicker(r.heartbeat)
return r
}

可以看出raftNoderaft.node之间的关系。通过raftNode可以直接访问raft.node的所有公有方法。

回到 startNode 方法,我们以新集群且没有WAL文件的场景来了解下startNode的处理过程:

1
2
3
4
5
6
7
8
9
10
11
etcdserver/raft.go/startNode
|- wal.Create() // 创建 WAL文件
|- raft.NewMemoryStorage() // 创建raft的数据存储器,这里为内存储存
|- n = raft/node.go/StartNode(c, peers) // 开始节点
|- r := newRaft(c) // 为当前节点创建raft对象
|- r.becomeFollower(1, None) //初始化当前节点成为 follower角色
|- 对每个节点,追加一条`ConfChangeAddNode`配置更改记录到 raftLog中
|- r.raftLog.committed = r.raftLog.lastIndex() // 更新raftLog的提交索引
|- for each peer r.addNode(peer.ID) //为每个peer在Raft中创建一个Progress结构体(没有创建时,若创建了则设置状态)来,来保存该peer的数据复制状态
|- 创建一个raft/node节点 // 创建node结构体,初始化各种通道
|- 异步执行 node.run(r *raft)方法// 其内部主要执行事项包括:轮训raft内需要周知其他节点的信息进行发送,监听node的各种通道(请求和响应)作相应处理,和任务超时通知通道

下面我们来详细了解下newRaft的内容:

1
2
3
4
5
6
7
8
newRaft(c *Config) *raft
|- c.validate() // 相关参数的校验,比如选举超时时间设置必须大于心跳超时时间设置
|- raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady) // 创建raftLog结构体,其用于保存raft状态机信息,比如当前节点事务提交的最大log位置、已经应用到应用的最大索引位置、所有未提交不稳定的记录
|- `创建raft结构体`
|- 对于每个决策节点 `peers`,设置复制初始`Next`位置、复制滑动串口器
|- 对于非决策节点 `learnerPrs`,也进行初始化
|- r.becomeFollower(r.Term, None) // 将自身设置为 Follower角色,对于一个刚启动的节点,这里Term为0
|-

接下来,再来看 becomeFollower 方法,其设置了 step 方法和 tick 方法、设置了 raft所在任期以及raft的角色状态。我们都知道raft协议中共有三个角色FollowerCandidatesLeaderetcd中通过不同角色设置不同的step来区分开每个角色的处理逻辑,设置不同tick方法来设置超时任务(对于Follower角色,其超时后会发起新一轮选举,而对于Leader角色,则广播一次心跳消息… )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (r *raft) becomeFollower(term uint64, lead uint64) {
r.step = stepFollower
r.reset(term)
r.tick = r.tickElection
r.lead = lead
r.state = StateFollower
}
func (r *raft) becomeCandidate() {
r.step = stepCandidate
r.reset(r.Term + 1)
r.tick = r.tickElection
r.Vote = r.id
r.state = StateCandidate
}
func (r *raft) becomeLeader() {
r.step = stepLeader
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.state = StateLeader
r.prs[r.id].becomeReplicate()
r.pendingConfIndex = r.raftLog.lastIndex()
emptyEnt := pb.Entry{Data: nil}
r.reduceUncommittedSize([]pb.Entry{emptyEnt})
}

到此,就完成了 EtcdServer的创建。

接下来,再来看EtcdServer的开始方法Start:

1
2
3
4
5
6
7
8
9
EtcdServer/Start
s.start() // 启动
s.goAttach(func() { s.adjustTicks() }) // 调整频率,启动时加快选举
s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) }) // 发布节点属性信息,以遍其他节点可查询
s.goAttach(s.purgeFile) // 启动异步任务做文件的合并操作:db文件\snap文件\WAL文件
s.goAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) }) // 监控文件句柄数不能超过系统限制的80%
s.goAttach(s.monitorVersions)
s.goAttach(s.linearizableReadLoop) // 线性一致性读异步任务(后续详细讲解)
s.goAttach(s.monitorKVHash)

EtcdServer.start()方法,首先进行一系列通道的初始化,然后异步执行EtcdServer.run()方法:

1
2
3
4
5
6
EtcdServer.run
|- sched := schedule.NewFIFOScheduler() //首先创建一个先进先出的异步调度器
|- rh := &raftReadyHandler{...} // 创建一个raftReadHanlder用于处理一些节点信息操作的回调。
|- s.raftNode.start(rh) // 启动raftNode的异步处理流程
|- 开启循环,监听 raftNode的 applyC通道 以及租约过期通道 `s.lessor.expiredC`
|- 对于 `applyC`,当从其中获取消息时,通过先进先出异步调度器顺序执行 applyAll方法

展开 raftNode.start(rh)的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
select {
case <-r.ticker.C: // 监听超时通知通道
r.tick()
case rd := <-r.Ready(): // 监听 raft.node的readyC通道,其为需要发送给其他节点或广播的消息
// 根据消息更新自身状态
// 然后应用到状态机中,最后发送给其他节点(非主节点时)或广播给其他节点(主节点)
if rd.SoftState != nil {
// 判断是否需要更新主节点信息
}
if len(rd.ReadStates) != 0 {
// 对于线性一致性读,进行通知回调
}

notifyc := make(chan struct{}, 1)
// 通过rh来更新提交索引号
updateCommittedIndex(&ap, rh)
select {
case r.applyc <- ap:
case <-r.stopped:
return
}
if islead {
r.transport.Send(r.processMessages(rd.Messages))
}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
// ...
}
//...
r.raftStorage.Append(rd.Entries)
if !islead {
msgs := r.processMessages(rd.Messages)
// ...
r.transport.Send(msgs)
} else {
notifyc <- struct{}{}
}
r.Advance()
case <-r.stopped:
return
}

完成etcdServer的启动后,开始http/grpc服务对外提供服务(peer间的服务以及对client开放的服务)。
我们以servePeers()来讲解启动服务过程。

1
2
3
4
5
6
7
8
servePeers()
|- etcdhttp.NewPeerHandler //创建 `http.Handler`,用于处理 节点间 以`raft`、`raft/`、`/leases`、`/leases/internal`为前缀的http请求
|- 对于每个 `e.Peers`:执行以下操作
|- gs := v3rpc.Server(e.Server, peerTLScfg) // 创建grpcServer,然后注册服务描述信息,如kv、watch、lease、cluster、auth、maintenance。
|- m := cmux.New(p.Listener) // 创建连接多路转接器,用于转发连接到不同的服务里去处理,其工作原理后续讲解
|- go gs.Serve(m.Match(cmux.HTTP2())) // 筛选出http2的链接,并对其服务
|- go srv.Serve(m.Match(cmux.Any())) //对于剩下的连接,用http服务进行处理
|- 对每个 `e.Peers`.serve() 启动监听服务

当新连接到达时,处理流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
cMux.Server()
|- c, err := m.root.Accept() // 通过最原始的方式获取到达的连接
|- m.serve(c, m.donec, &wg) // 执行serve进行分发
|- for _, sl := range m.sls {// sl包装了 其自身的匹配器列表,和转发通道
for _, s := range sl.ss {// s为通过cMux.Match方法创建的匹配器
matched := s(muc.Conn, muc.startSniffing())
if matched { // 若该连接匹配上了,则将该连接通过连接发送到sl的连接接收通道里
//...
select {
case sl.l.connc <- muc:
case <-donec:
_ = c.Close()
}
return
}
}
}

到此整个初始化过程就完成了。其后开始进行选举,那么选举是哪里出发的呢?

1.2 选举

回到创建raftNode的地方 r.ticker = time.NewTicker(r.heartbeat) 开启了ticker。当时间到达时,tickder.C中得到通知。而其正在被 raftNodestart方法中的循环监听着。进一步就触发了raftNoe.tick()方法。

1
2
3
4
5
6
7
8
9
raftNoe.tick()
|- node.Tick()方法, 之前介绍过 raftNode与node之间的关系
|- n.tickc <- struct{}{} 通知 node.tickc通道
|- node的run循环中监听此通道,进一步调用 `raft.tick()`方法。此方法在 `becomeFollower`时设置成了 `tickElection`
|- r.Step(pb.Message{From: r.id, Type: pb.MsgHup}) // 到此开始发起选举
|- r.campaign(campaignElection) // 开始角逐 主节点角色
|- r.becomeCandidate // 更改节点角色为 候选者
|- r.step = stepCandidate...
|- r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: rL.lastIndex, LogTerm: r.raftLog.lastTerm(), Context: ctx}) // 对每个节点发送 `pb.MsgVote` 消息 携带本节点的任期,索引

当其他接收到该节点的投票请求时:

1
2
3
4
5
6
7
8
9
peer.go/startPeer(180L)
|- etcdServer.Pocess // 转发到ectdServer进行处理
|- node.Step // 调用 raftNode,并间接调用 raft.node
|- node.stepWithWaitOption
|- raft.Step
|- 首先进行判断是否可投票(之前投票给它了 || 没投票,且没有主 || 节点的任期比本节点大)
|- 判断是否要投票给请求投票的节点(请求节点`日志任期` 比本节点大,或者任期相等时 `日职索引Id` 是否比本节点大)
|- 上面条件都满足时,返回`MsgVoteResp`消息,告诉请求者,其同意投票给它
|- 但上述条件不满足时,则返回 拒绝消息给请求者

当节点收到 其他的投票反馈消息时,最终会调用 raft.go/stepCandidate方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// poll方法传进去本消息的投票,返回已经有多少赞成票
gr := r.poll(m.From, m.Type, !m.Reject)
switch r.quorum() {// 当达到quorum个时,进行角色转变
case gr:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
// 转换为主节点
r.becomeLeader()
// 广播消息
r.bcastAppend()
}
case len(r.votes) - gr:
r.becomeFollower(r.Term, None)
}

通过上面的逻辑,可以看出,当投票数达到quorum数时,转变角色为主节点。同时向所有其他节点广播本节点状态以及记录信息(MsgApp),其他节点接收到此消息后,自动转变为 follower角色,整个集群初始化完成。

总结选举过程如下:

  1. 每个节点启动时作为Follwer角色,经过一个ElectionTimeout(启动时随时生成的 默认150ms~250ms)后进入Candidate状态,并向其他节点发送消息MsgVote消息(本节点的Term、最新的日志Index,最新日志的LogTerm);
  2. 其他节点收到投标后,进行两步判断:
    1. 预判断,:
      1. r.Vote==m.Form(是否已经投标给该节点)
      2. 当前节点未投标且无leader节点;
      3. 对于预投标(PreVote),消息的任期比当前节点大;
    2. 任何一个条件判断则可以进入正式判断
      1. 若消息中日志Term 对当前节点大则投给消息发送方,若日志Term相等,Index比当前节点大则也将票投给消息发送方r.raftLog.isUpToDate(m.Index, m.LogTerm)
    3. 若上面的判断失败,则返回拒绝
  3. 当发起投标方收到半数以上的头条,则转换自己的角色becomeLader (raft.go/stepCandidate),并广播一条空消息给其他所有节点。

对于etcd的选举,还需要说明的是,etcd为了解决网络分区的情况下某些节点不停加入集群导致抖动的情况,设置PreVote流程(只需要启动节点的时候 设置 pre-vote 参数)。即在进行真正的选举之前 先进行PreVote得到大多数节点同意选举之后才进行真正的选举。可以解决如下问题:

  • 对于网络分区的节点,在重新加入集群的时候不会中断集群;(因为获取不了大部分节点的许可,索引其Term无法增大,所以赢不了选举主节点)。

到此,etcd的启动到建立集群、完成选举的整个过程就介绍完了。

附加图:
下图为 EtcdServerraftNoderaft.noderaft间的联系。

最后补充说明下etcdproxy模式:
etcd可以通过命令./etcd –proxy on –listen-client-urls的形式启动代理模式。代理模式下,它的作用是一个反向代理,接收客户端请求,然后转发到etcd集群。
代理模式有2种运行形式:readwritereadonly,默认情况下为readwrite,即会将读写请求都进行转发,而readonly形式下,则只转发读请求,写请求将报5xx错误,

IDEA中启动ETCD方式:

1
2
3
4
debug方式运行三个终端程序 `etcd/main.go` 并设置如下参数:
--name infra1 --listen-client-urls http://127.0.0.1:2379 --advertise-client-urls http://127.0.0.1:2379 --listen-peer-urls http://127.0.0.1:12380 --initial-advertise-peer-urls http://127.0.0.1:12380 --initial-cluster-token etcd-cluster-1 --initial-cluster infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380 --initial-cluster-state new --enable-pprof --logger=zap --log-outputs=stderr
--name infra2 --listen-client-urls http://127.0.0.1:22379 --advertise-client-urls http://127.0.0.1:22379 --listen-peer-urls http://127.0.0.1:22380 --initial-advertise-peer-urls http://127.0.0.1:22380 --initial-cluster-token etcd-cluster-1 --initial-cluster infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380 --initial-cluster-state new --enable-pprof --logger=zap --log-outputs=stderr
--name infra3 --listen-client-urls http://127.0.0.1:32379 --advertise-client-urls http://127.0.0.1:32379 --listen-peer-urls http://127.0.0.1:32380 --initial-advertise-peer-urls http://127.0.0.1:32380 --initial-cluster-token etcd-cluster-1 --initial-cluster infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380 --initial-cluster-state new --enable-pprof --logger=zap --log-outputs=stderr
您的支持是我创作源源不断的动力