Zookeeper 选举算法

3. 选举算法与实现

第2节中,介绍了在做选举之前做的各服务器之间的通讯初始化。本章将在此基础上,介绍选举的过程。

其大体过程如下:

  1. 投票节点:首先将节点状态转变成LOOKING状态;

  2. 投票节点:将自己作为(SID、ZXID)广播给其他节点;

  3. 其他节点:接收其他节点的选举投票;

  4. 其他节点:如果接收的选举纪元大于本地的选举轮数则更新本地选举轮数,并将接收的投票跟本地投票比较后更新本地投票,再广播出去。否则执行4

  5. 其他节点:判断外部投票是否优于本地,如果是则更新本地提议再广播出去;

    1. 比较纪元大小(最新日志记录的纪元,并不是当前选举纪元);

    2. 纪元相等的情况,比较 Zxid

    3. Zxid相等的情况下 则比较 Sid

      return ((newEpoch > curEpoch) ||

      ((newEpoch == curEpoch) &&
      ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
  6. 投票节点:收集投票提议,并进行统计,若有节点收集的投票大于一半以上则认为该节点为Leader节点;

  7. 投票节点:投票选定节点是该节点则转变自己的服务器状态为LEADING,若不是则转换成FOLLOWING | OBSERVING

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public Vote lookForLeader() throws InterruptedException {
// sid 2 vote
// 用于收集LOOKING、FOLLOWING、LEADING状态下的server的投票
Map<Long, Vote> recvset = new HashMap<Long, Vote>();
// 用于收集FOLLOWING、LEADING状态下的server的投票(能够收集到这种状态下的投票,说明leader选举已经完成)
// sid 2 vote
Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this) {
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
// 向所有节点广播投票信息(投自己)
sendNotifications();
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)) {
// 接收消息
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
// 超时没有收到任何消息
if(n == null) {
if(manager.haveDelivered()){
// 再次发送广播
sendNotifications();
} else {
//尝试连接
manager.connectAll();
}
int tmpTimeOut = notTimeout * 2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
continue;
}
if (validVoter(n.sid) && validVoter(n.leader)) {
switch (n.state) {
case LOOKING:
// ...
//如果收到的投票纪元大于本地的选举轮数那么更新本机
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
//发送一次广播
sendNotifications();
// 小于本机的则不做处理
} else if (n.electionEpoch < logicalclock.get()) {
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
//如果 收到的投票信息大于本机的投票则更新本机且继续发送
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
//收集投票
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//判断接收的消息里对 proposedLeader 的认可度是否大于一般,是则结束选举
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {

// 继续收集投票信息,以防万一
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null) {
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
// 设置本地服务的角色
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
// ...
}
} //...
}
return null;
}

再来详细看下,ZK是按什么判断投票胜出的:

1
2
3
4
5
6
7
8
9
10
11
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
// 首先 比较 纪元
// 纪元相等的情况,比较 Zxid
// zxid相等的情况下 则比较sid
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}

以上介绍的是整个集群处于启动阶段的选举过程。当集群已经存在一个Leader,而本地机器刚刚进入Leader选举,则会收到其他节点发来的非LOOKING状态通知:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public Vote lookForLeader() {
// ...
case FOLLOWING:
case LEADING:
//当其他节点都已经完成 选举,则给本地返回的消息state是 FOLLOWING 和 LEADING
if(n.electionEpoch == logicalclock.get()) {
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());

Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
outofelection.put(n.sid, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
if (termPredicate(outofelection, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, IGNOREVALUE)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
}
您的支持是我创作源源不断的动力