Zookeeper 初始化

2. 初始化过程

下面,将从客户端初始化和服务端初始化两个方面来讲解ZK的初始化过程:

客户端初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
// 观察者管理器
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
hostProvider = aHostProvider;

// ClientCnxn: client的通讯组件,负责socket通讯(发送请求等,且支持透明地切换到不同的Server)
// ClientCnxnSocket : 真正通讯的组件,ClientCnxnSocketNIO、ClientCnxnSocketNetty
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),// 客户端隔离命名空间,相当于试图模式。只对某个path的进行操作
hostProvider,// 服务器提供者列表
sessionTimeout,//会话超时时间
this,
watchManager, // 监听管理器
getClientCnxnSocket(),// 客户端套接字通讯工具 ClientCnxnSocket 比如 ClientCnxnSocketNetty
canBeReadOnly);
// 初始化
cnxn.start();
}

从上面可以看到最主要的即是 ClientCnxn,深入进去可以看到:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
// 发送请求的线程
sendThread = new SendThread(clientCnxnSocket);
// 事件线程,当注册的节点发生变化时,server端会通知client,EventThread用来获取通知,并触发相应的Watcher
eventThread = new EventThread();
this.clientConfig = zooKeeper.getClientConfig();
}

public void start() {
sendThread.start();
eventThread.start();
}

SendThread是负责发送请求,而EventThread负责接收并处理事件。
ZK Client发送请求的过程是非常简单的:将请求进行包装成Packet,放进双向阻塞队列outgoingQueue中。然后SendThread不停地从其中拿出请求向Server发送请求。
其过程是在SendThreadrun()方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public void run() {
// 引入 outgoingQueue
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
// 10 秒发一次 Ping 操作
final int MAX_SEND_PING_INTERVAL = 10000;
InetSocketAddress serverAddress = null;
while (state.isAlive()) {
// 链接断开时,重连
if (!clientCnxnSocket.isConnected()) {
if (closing) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
//如果所有的host都试了一遍,则停顿 1s
serverAddress = hostProvider.next(1000);
}
// 链接服务器,首次连接时同时
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
// ...
if (state.isConnected()) {
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
//当有 MAX_SEND_PING_INTERVAL秒没发送请求,则发送 ping 心跳
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
//对于只读的则重试ping到读写服务上
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
// clientCnxnSocket 从 outgoingQueue中取请求发送
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
}
// ...
}

SendThread中不仅有调用clientCnxnSocket发送请求的逻辑,同时也维护着心跳逻辑,10秒内如果没有发送请求则发送一次心跳。
另外,值得一提的是,通过startConnect(serverAddress)完成与服务端的连接后,会触发一次sendThread.primeConnection(),它的作用是向服务端发起一次创建Session请求或者续联。另外,如果配置了zookeeper.disableAutoWatchReset = false会将本地的Watcher重新向连接的服务端注册一遍。

当服务端返回时,则由ClientCnxnSocketNetty.ZKClientHandler进行处理,而其最终会调用SendThread.readResponse。在readResponse会根据返回的内容进行相应处理。如果返回的Xid为-1时,表示事件通知,反序列化后触发相应的Wacther

服务端初始化

服务端分为两种模式,单机模式和集群模式。后文重点介绍集群模式。
ZK中集群模式下,单实例启动类为QuorumPeerMain
首先调用initializeAndRun进行配置参数解析,然后开启数据定时清理任务,定时删除过期的快照文件等;
其次调用runFromConfig方法,进行实例化QuorumPeer,并初始化其参数。最后调用QuorumPeer.start方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void runFromConfig(QuorumPeerConfig config)
throws IOException, AdminServerException {
ServerCnxnFactory cnxnFactory = null;
//创建 服务端通讯组件 NIOServerCnxnFactory、NettyServerCnxnFactory
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns(),
false);
}
// ...
quorumPeer = getQuorumPeer();
// 文件存储(事务日志与文件存储)
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
// 选举算法
quorumPeer.setElectionType(config.getElectionAlg());
// sid
quorumPeer.setMyid(config.getServerId());
//tickTime 时钟,最小时钟单位
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier()!=null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
quorumPeer.initConfigInZKDatabase();
quorumPeer.setCnxnFactory(cnxnFactory);
// PARTICIPANT, OBSERVER
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
// 法定人数是否监听所有ip
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();
// ...
// 初始化
quorumPeer.start();
quorumPeer.join();
}

下面来重点看QuorumPeer的初始化过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public synchronized void start() {
// 恢复数据,从快照里读,读完后再从事务日志中读;
// 读取纪元,并做校验
loadDataBase();
//绑定端口,开启面向client端的服务
startServerCnxnFactory();
//初始化选举算法,并初始化选举通讯组件
startLeaderElection();
super.start();
}

private void loadDataBase() {
// 恢复数据,从快照里读,读完后再从事务日志中读
zkDb.loadDataBase();
// load the epochs
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
// 获取纪元
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
}

其过程主要分为以下几步:

  1. 从快照文件中恢复原始数据,并读取 Epoch信息
  2. 绑定端口,开始接收请求;
  3. 初始化选举算法,并初始化选举通讯组件;
  4. 运行QuorumPeer.run方法启动服务:选举以及确定角色后提供服务。

对于 1 ,将在存储章节详细讲解。另外,因为选举算法是 ZK 的核心内容,也将另起一章进行介绍。下文将重点分析 选举的初始化工作以及选举后的服务器角色初始化工作。

首先来看选举初始化:

初始化过程会实例化类 QuorumCnxManager.Listener ,其会起一个 ServerSocekt 进行监听端口,接收其他 Participant 发给自己的选举相关请求。随后初始化FastLeaderElection对象,其内部会初始化两个阻塞队列sendqueue(发送队列)、recvqueue(接收队列)。同时实例化类Messenger,其内部会开启两个线程WorkerSenderWorkerReceiver,其工作原理如下:

  1. 发送:当选举算法中有发送请求时,会将请求放入sendqueue中,而WorkerSender会从sendqueue中拉取请求,然后交给QuorumCnxManagerQuorumCnxManagersid 调用QuorumCnxManagersendQueue队列中的发送请求ToSend按sid丢给queueSendMap.get(sid)对应的队列中,若服务器与sid之间没有创建连接则创建连接,并同时启动对应的RecvWorkerSendWorkerSendWorker从阻塞队列queueSendMap.get(sid)中获取请求进行socket发送;
  2. 接收:刚介绍过QuorumCnxManagerRecvWroker 用于接收请求,将请求包装进阻塞队列recvQueue中,而后FastLeaderElection.WorkReceiver线程会从中取请求进行消费。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
protected Election createElectionAlgorithm(int electionAlgorithm) {
Election le = null;
// 默认为 3
switch (electionAlgorithm) {
// ...
case 3:
// QuorumCnxManager 连接管理器,为每对服务器维持有且仅有一个连接,sid大的向sid小的建立连接。
qcm = createCnxnManager();
QuorumCnxManager.Listener listener = qcm.listener;
// 监听服务端请求,接收请求
listener.start();
FastLeaderElection fle = new FastLeaderElection(this, qcm);
//
fle.start();
le = fle;
break;
// ...
return le;
}
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
this.stop = false;
this.manager = manager;
starter(self, manager);
}
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
proposedZxid = -1;
//发送队列
sendqueue = new LinkedBlockingQueue<>();
//接收队列
recvqueue = new LinkedBlockingQueue<>();
this.messenger = new Messenger(manager);
}

初始化完选举算法后,服务开始正式启动运行。
服务启动开始时,是LOOKING状态。选举完成后就会变成OBSERVING|FOLLOWING|LEADING状态。
LOOKING阶段,如果配置了readonlymode.enabled则在选举完成前开启只读服务ReadOnlyZooKeeperServer,当选举完成后卸载只读服务。
调用 makeLEStrategy().lookForLeader()开启选举。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
public void run() {
updateThreadName();
//... 向 jmx注册 服务节点
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
// readOnly开启状态下,允许在初始化完成前接收读请求
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
// 先开启个readOnly服务提供读服务
final ReadOnlyZooKeeperServer roZk =
new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
// 开始运行选举算法
setCurrentVote(makeLEStrategy().lookForLeader());
} finally {
// 选主结束后关闭 读服务
roZkMgr.interrupt();
roZk.shutdown();
}
} else {
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
}
break;
case OBSERVING:
try {
setObserver(makeObserver(logFactory));
observer.observeLeader();
} finally {
observer.shutdown();
setObserver(null);
updateServerState();
}
break;
case FOLLOWING:
try {
setFollower(makeFollower(logFactory));
follower.followLeader();
} finally {
follower.shutdown();
setFollower(null);
updateServerState();
}
break;
case LEADING:
// 设置为leading开启
try {
setLeader(makeLeader(logFactory));
//开启acceptor接收follwer和observer的请求
leader.lead();
setLeader(null);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
updateServerState();
}
break;
}
start_fle = Time.currentElapsedTime();
}
//...
}

当选举完成后,每个服务器就知道自己的角色,是OBSERVERFOLLOWER还是LEADER

  1. Observer:当服务器角色是Observer时,其首先创建Observer以及ObserverZooKeeperServer,然后调用observeLeaderLeader建立连接。一开始发送 OBSERVERINFO 获取LeadernewLeaderZxid,然后开始与Leader的同步syncWithLeader,数据同步分为三种类型DIFFSNAPTUNC,具体后文将详细介绍。完成同步后,接收到Leader发来的UPTODATE命令,将ObserverZooKeeperServer与之前的ServerCnxnFactory绑定上(当有客户端请求来时,会将请求转发到ObserverZooKeeperServer,其他角色也是同样的道理)。最后调用xxZooKeeperServer.startup()建立起每个Server的处理器链路(每个角色最大差别即表现在处理器链上),具体见ZK角色介绍
  2. Follower:其与Observer的过程大致相同,不同点在于一开始其向Leader发送的命令是Follwer
  3. Leader:在创建Leader时,会开启一个SeverSocket,并监听端口接收请求。Leader.lead方法则开始初始化Leader,首先加载数据库,然后开启线程LearnerCnxAcceptor,接收来自Learner的请求。当接收到一半以上的NEWLEADER后才开始启动服务对外服务。一样也会调用LeaderZookeeperServer.startup建立起处理链。最后会开启一个ping服务,每两个tick一个周期。
    • 对于每个连接上来的LearnerLeader都会创建一个LearnHandler。其负责与Learner间的交互。数据同步等。
    • 初始化完成后,就开始接收Learner发送的请求。

每个角色最大的不同即是其所对应的ZookeeperServer不同,而每个ZookeeperServer的本质不同在于其处理器链的不同。

  • Leader的处理链:

  • Follower的处理链:

  • Observer的处理链

每个处理器的作用如下:

Reference:

  • 《从Paxos到ZooKeeper 分布式一致性原理与实践》
您的支持是我创作源源不断的动力