Zookeeper 数据存储

4. 数据与存储

ZkDatabaseZK的内存数据库,其负责ZK的所有会话、DataTree存储和事务日志。其会定时向磁盘 dump 快照数据,同时在 ZK 启动的时候,会通过磁盘上的事务日志和快照数据恢复内存数据。
ZK 中的数据存储分为两部分:内存数据存储、磁盘数据存储。其中内存数据中最核心的及时DataTree,而磁盘数据包括 快照数据和事务日志。

内存数据

DataTree是一个“树”形结构,按路径形成一个树形结构。其内部结构如下:

1
2
3
4
5
6
7
8
ConcurrentHashMap<String, DataNode> nodes =
new ConcurrentHashMap<String, DataNode>();
// sessionId 2 path Set
Map<Long, HashSet<String>> ephemerals =
new ConcurrentHashMap<Long, HashSet<String>>();
WatchManager dataWatches = new WatchManager();
WatchManager childWatches = new WatchManager();
ReferenceCountedACLCache aclCache = new ReferenceCountedACLCache();

nodes用来保存所有path到节点的映射;
ephemerals来保存所有零时节点集合;
dataWatcheschildWatches分别是监听器,当数据节点发生变化时触发相应的watch
aclCache保存所有的访问控制信息。

DataNode的数据结构如下:

1
2
3
4
5
6
7
byte data[];
// 访问控制主键
Long acl;
// 磁盘存储对象
public StatPersisted stat;
// 子节点路径字符串
private Set<String> children;

其创建节点的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public void createNode(final String path, byte data[], List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat) {
int lastSlash = path.lastIndexOf('/');
String parentName = path.substring(0, lastSlash);
String childName = path.substring(lastSlash + 1);
//实例化 持久化对象
StatPersisted stat = new StatPersisted();
stat.setCtime(time);
stat.setMtime(time);
stat.setCzxid(zxid);
stat.setMzxid(zxid);
stat.setPzxid(zxid);
stat.setVersion(0);
stat.setAversion(0);
stat.setEphemeralOwner(ephemeralOwner);
//获取父节点
DataNode parent = nodes.get(parentName);
if (parent == null) {
throw new KeeperException.NoNodeException();
}
synchronized (parent) {
Set<String> children = parent.getChildren();
if (children.contains(childName)) {
throw new KeeperException.NodeExistsException();
}
// 更新 父节点的版本
if (parentCVersion == -1) {
parentCVersion = parent.stat.getCversion();
parentCVersion++;
}
parent.stat.setCversion(parentCVersion);
parent.stat.setPzxid(zxid);
//存储 ACL信息
Long longval = aclCache.convertAcls(acl);
//创建节点
DataNode child = new DataNode(data, longval, stat);
parent.addChild(childName);
//存储Node
nodes.put(path, child);
EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);
if (ephemeralType == EphemeralType.CONTAINER) {
containers.add(path);
} else if (ephemeralType == EphemeralType.TTL) {
ttls.add(path);
} else if (ephemeralOwner != 0) {
HashSet<String> list = ephemerals.get(ephemeralOwner);
if (list == null) {
list = new HashSet<String>();
ephemerals.put(ephemeralOwner, list);
}
synchronized (list) {
list.add(path);
}
}
if (outputStat != null) {
child.copyStat(outputStat);
}
}
// ...
// 触发 监听管理器
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
Event.EventType.NodeChildrenChanged);
}

磁盘数据

ZK中,FileTxnSnapLog是磁盘存储的核心类。

1
2
3
4
5
6
7
8
9
10
public class FileTxnSnapLog {
// 事务日志文件目录
private final File dataDir;
// 快照文件目录
private final File snapDir;
// 事务日志管理器 FileTxnLog
private TxnLog txnLog;
// 快照管理器 FileSnap
private SnapShot snapLog;
}

磁盘文件分为两部分:快照文件事务日志文件。其中快照文件将定期保存内存数据库内容,事务日志则保存所有事务记录。
恢复时,首先选择一个可用的最近保存的快照文件进行恢复,其后再读取事务日志进行恢复。

下面重点来看看其 写入日志和数据快照的过程服务启动恢复的过程

日志写入

服务端负责日志写入的是SyncRequestProcessor
其过程如下:

  1. 首先向事务日志中追加一条记录;
  2. 然后判断追加的记录数量是否大于 snapCount / 2 + randRoll(其中snapCount默认为100000,randRoll为随机数,之所以这么做是为了确保所有服务器不是同一时间做快照),如果大于则首先滚动日志文件(将FileTxnLog.logStream设置为null),然后起异步线程 snapInProcess 进行快照。
  3. 默认当积累的事务记录超过1000个则进行flush到磁盘。

下面分别来看追加事务记录和快照的过程:

  1. 追加事务记录:其最终会调用FileTxnLog.append方法,其内部执行过程如下:
    1. 首先判断logStream==null,若是则根据请求的Zxid创建新的事务日志文件,并为文件预分配 64MB 的空间,填充 0
    2. 其后序列化事务头和事务体,然后据此生成CheckSum,写入文件中。
  2. 快照:其最终会调用ZookeeperServer.takeSnapshot方法:
1
2
3
4
5
6
txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
public void save(DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts, boolean syncSnap) throws IOException {
long lastZxid = dataTree.lastProcessedZxid;
File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
}

跟据DataTree.lastProcessedZxidsnapDir目录下生成一个快照文件。然后序列化DataTreeSession存入文件中。

数据恢复

当服务器重启时,会调用ZkDatabase.loadDataBase进行数据恢复。
其最终会调用FileTxnSnapLog.restore方法:

  1. 首先找到最近可用的快照文件恢复DataTreeSession
    1. 近可能多找出可用的100个快照文件,按文件名进行逆序排序;
    2. 选取最近的快照文件进行反序列化恢复DataTreeSession,并做一次校验。
  2. 根据快照文件恢复后的DataTree对应的lastProcessZxod获取事务记录,然后将事务应用到DataTreeSession上。(FileTxnSnapLog.fastForwardFromEdits
  3. 需要特别提出的是,在事务记录恢复的过程中,会回调ZKDatabase.commitProposalPlaybackListener,将事务加入到committedLog中,并保存minCommitedLogmaxCommitedLog,其在选举后LearnerLeader见的数据同步过程中起着重要作用。

数据同步

在前面《选举算法与实现》章节中,提到完成选举后,Leader与各Learner会进行一次数据同步,有三种模式:DIFFSNAPTURNC。这里令Learner发送到LeaderZxidpeerLastZxid

  1. peerLastZxidminCommitedLogmaxCommitedLog之间时,执行DIFF操作,直接从LeaderZkDataabse.committedLog中消费;
  2. peerLastZxid小于minCommitedLog,则进行全量数据同步;
  3. peerLastZxid大于maxCommitedLogLearner则需要进行回退操作。
您的支持是我创作源源不断的动力