Zamperini


  • 首页

  • 关于

  • 标签23

  • 分类11

  • 归档56

RocketMQ 整体架构

发表于 2018-03-17 | 分类于 rocketmq

1. RocketMQ 整体架构

[TOC]

整体架构

RocketMQ主要由四部分组成,NameServer、Broker、Producer、Consumer。其中NameServer、Broker是独立部署集群,而Producer、Consumer一般是以SDK的形式提供给业务方,嵌在业务集群内。另外还有一个FilterSrv,可选择性使用,用于消费者自定义消息过滤。

NameServer

类似于服务治理框架里的注册中心。管理Broker集群的注册信息并提供心跳来检测他们是否可用,持有关于broker集群和队列的全部路由信息;

Broker

负责消息的存储传递,客户端查询并保证高可用。此外,代理提供了灾难恢复、丰富的度量统计和警报机制。主要功能如下;
set up-w600

  1. Remoting Module:服务入口,接收来自客户端的请求并转发;
  2. Client Manager:客户端管理,管理客户端(生产者/消费者)还有维护消费者主题订阅;
  3. Store Service:信息存储和查询的api服务;
  4. HA Service: 提供主从broker的数据同步;
  5. Index Service:为消息建立索引提供消息快速查询。

Producer

阅读全文 »

Zookeeper 事务请求

发表于 2018-03-06 | 分类于 zookeeper

5. 事务请求

下面将介绍在Zookeeper中,一次事务请求的整个过程。对于非事务请求,其处理过程比较简单,则不进行介绍。对于客户端来说,其过程是一样的,客户端将请求包装进队列中,然后发往服务端。所以这里的重点是分析服务端的处理过程:
每个Client一般都是和一个Server保持长连,Server 一般是 Follower、Observer。当Leader配置了leaderServes=true,则客户端也可以连接到Leader。

Server 接收到请求后,会间接交给ZookeeperServer处理,为了将整个过程都涵盖到,因此下面假定Client连接的是Follower,并向Follwer发送 createNode 的请求。

下图是事务请求的整个过程:

整体流程如下:

  1. FollowerRequestProcessor 接收到Client的请求后,会将请求交给 CommitRequestProcessor,其会判断请求是否要提交:如果需要则将请求放进针对每个sessionId区分的 pendingRequests 中等待commit;
  2. 在将请求提交给 CommitRequestProcessor 后,FollowerRequestProcessor 会将事务请求通过之前与Leader建立的连接 交给Leader;
  3. Leader 在接收到请求后,交给 PrepRequestProcessor 进行处理,其会在ZK的outstandingChanges和outstandingCHangesForPath 里添加一条更改记录ChangeRecord。其后将请求交给 ProposalRequestProcessor 进行处理;
  4. ProposalRequestProcessor首先将请求提交给下一个处理器CommitRequestProcessor,然后调用 Leader.propose 来发起事务提议,也即是给每个与自己相连的Follower发从事务提议 Proposal。完成发送后调用 SyncRequestProcessor 添加事务记录,再将请求发送给AckRequestProcessor,其向Leader做出Ack应答;
  5. Follower接收到来自Leader的提议后,交给SyncRequestProcessor添加事务记录,再将请求传递给下一个处理器SendAckRequestProcessor,其则向Leader返回ACK消息;
  6. Leader收到ACK信息后,会执行Leader.processAck方法,首先会取出之前提交的Proposal,然后进行判断是否有超过一半的Follower返回了ACK,若是则将Proposal放进 toBeApplied 队列中,并向所有Follower发送 COMMIT提交事务(若提交的事务与本地不一致则系统会停机),向Observer发送 INFORM 并带上数据。最后执行CommitRequestProcessor.commit进行本地提交,本地提交最后执行的是Leader.processRequest方法,其将请求最终交给FinalRequestProcessor进行处理;
  7. FinalRequestProcessor则将事务请求应用到内存数据库中,并做一些收尾工作;
  8. Follower收到事务的COMMIT消息,提交事务,执行CommitRequestProcessor.commit将事务变更应用到内存数据库中。
阅读全文 »

Zookeeper 会话管理

发表于 2018-03-06 | 分类于 zookeeper

6. 会话管理

  1. leader负责session的过期机制;
  2. 每个follower 或者learner本地会接受 心跳信息,不会立即同步到leader,follower会在leader发来的心跳信息中返回其内部保存的客户端心跳激活信息集合
  3. 创建session和删除session是一个事务操作;
阅读全文 »

Zookeeper 数据存储

发表于 2018-03-05 | 分类于 zookeeper

4. 数据与存储

ZkDatabase是ZK的内存数据库,其负责ZK的所有会话、DataTree存储和事务日志。其会定时向磁盘 dump 快照数据,同时在 ZK 启动的时候,会通过磁盘上的事务日志和快照数据恢复内存数据。
ZK 中的数据存储分为两部分:内存数据存储、磁盘数据存储。其中内存数据中最核心的及时DataTree,而磁盘数据包括 快照数据和事务日志。

内存数据

DataTree是一个“树”形结构,按路径形成一个树形结构。其内部结构如下:

1
2
3
4
5
6
7
8
ConcurrentHashMap<String, DataNode> nodes =
new ConcurrentHashMap<String, DataNode>();
// sessionId 2 path Set
Map<Long, HashSet<String>> ephemerals =
new ConcurrentHashMap<Long, HashSet<String>>();
WatchManager dataWatches = new WatchManager();
WatchManager childWatches = new WatchManager();
ReferenceCountedACLCache aclCache = new ReferenceCountedACLCache();

nodes用来保存所有path到节点的映射;
ephemerals来保存所有零时节点集合;
dataWatches和childWatches分别是监听器,当数据节点发生变化时触发相应的watch;
aclCache保存所有的访问控制信息。

DataNode的数据结构如下:

1
2
3
4
5
6
7
byte data[];
// 访问控制主键
Long acl;
// 磁盘存储对象
public StatPersisted stat;
// 子节点路径字符串
private Set<String> children;

其创建节点的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public void createNode(final String path, byte data[], List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat) {
int lastSlash = path.lastIndexOf('/');
String parentName = path.substring(0, lastSlash);
String childName = path.substring(lastSlash + 1);
//实例化 持久化对象
StatPersisted stat = new StatPersisted();
stat.setCtime(time);
stat.setMtime(time);
stat.setCzxid(zxid);
stat.setMzxid(zxid);
stat.setPzxid(zxid);
stat.setVersion(0);
stat.setAversion(0);
stat.setEphemeralOwner(ephemeralOwner);
//获取父节点
DataNode parent = nodes.get(parentName);
if (parent == null) {
throw new KeeperException.NoNodeException();
}
synchronized (parent) {
Set<String> children = parent.getChildren();
if (children.contains(childName)) {
throw new KeeperException.NodeExistsException();
}
// 更新 父节点的版本
if (parentCVersion == -1) {
parentCVersion = parent.stat.getCversion();
parentCVersion++;
}
parent.stat.setCversion(parentCVersion);
parent.stat.setPzxid(zxid);
//存储 ACL信息
Long longval = aclCache.convertAcls(acl);
//创建节点
DataNode child = new DataNode(data, longval, stat);
parent.addChild(childName);
//存储Node
nodes.put(path, child);
EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);
if (ephemeralType == EphemeralType.CONTAINER) {
containers.add(path);
} else if (ephemeralType == EphemeralType.TTL) {
ttls.add(path);
} else if (ephemeralOwner != 0) {
HashSet<String> list = ephemerals.get(ephemeralOwner);
if (list == null) {
list = new HashSet<String>();
ephemerals.put(ephemeralOwner, list);
}
synchronized (list) {
list.add(path);
}
}
if (outputStat != null) {
child.copyStat(outputStat);
}
}
// ...
// 触发 监听管理器
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
Event.EventType.NodeChildrenChanged);
}
阅读全文 »

Zookeeper 选举算法

发表于 2018-03-04 | 分类于 zookeeper

3. 选举算法与实现

在第2节中,介绍了在做选举之前做的各服务器之间的通讯初始化。本章将在此基础上,介绍选举的过程。

其大体过程如下:

  1. 投票节点:首先将节点状态转变成LOOKING状态;

  2. 投票节点:将自己作为(SID、ZXID)广播给其他节点;

  3. 其他节点:接收其他节点的选举投票;

  4. 其他节点:如果接收的选举纪元大于本地的选举轮数则更新本地选举轮数,并将接收的投票跟本地投票比较后更新本地投票,再广播出去。否则执行4;

  5. 其他节点:判断外部投票是否优于本地,如果是则更新本地提议再广播出去;

    1. 比较纪元大小(最新日志记录的纪元,并不是当前选举纪元);

    2. 纪元相等的情况,比较 Zxid;

    3. Zxid相等的情况下 则比较 Sid;

      return ((newEpoch > curEpoch) ||

      ((newEpoch == curEpoch) &&
      ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
  6. 投票节点:收集投票提议,并进行统计,若有节点收集的投票大于一半以上则认为该节点为Leader节点;

  7. 投票节点: 若投票选定节点是该节点则转变自己的服务器状态为LEADING,若不是则转换成FOLLOWING | OBSERVING。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public Vote lookForLeader() throws InterruptedException {
// sid 2 vote
// 用于收集LOOKING、FOLLOWING、LEADING状态下的server的投票
Map<Long, Vote> recvset = new HashMap<Long, Vote>();
// 用于收集FOLLOWING、LEADING状态下的server的投票(能够收集到这种状态下的投票,说明leader选举已经完成)
// sid 2 vote
Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this) {
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
// 向所有节点广播投票信息(投自己)
sendNotifications();
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)) {
// 接收消息
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
// 超时没有收到任何消息
if(n == null) {
if(manager.haveDelivered()){
// 再次发送广播
sendNotifications();
} else {
//尝试连接
manager.connectAll();
}
int tmpTimeOut = notTimeout * 2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
continue;
}
if (validVoter(n.sid) && validVoter(n.leader)) {
switch (n.state) {
case LOOKING:
// ...
//如果收到的投票纪元大于本地的选举轮数那么更新本机
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
//发送一次广播
sendNotifications();
// 小于本机的则不做处理
} else if (n.electionEpoch < logicalclock.get()) {
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
//如果 收到的投票信息大于本机的投票则更新本机且继续发送
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
//收集投票
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//判断接收的消息里对 proposedLeader 的认可度是否大于一般,是则结束选举
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {

// 继续收集投票信息,以防万一
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null) {
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
// 设置本地服务的角色
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
// ...
}
} //...
}
return null;
}

再来详细看下,ZK是按什么判断投票胜出的:

1
2
3
4
5
6
7
8
9
10
11
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
// 首先 比较 纪元
// 纪元相等的情况,比较 Zxid
// zxid相等的情况下 则比较sid
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}

以上介绍的是整个集群处于启动阶段的选举过程。当集群已经存在一个Leader,而本地机器刚刚进入Leader选举,则会收到其他节点发来的非LOOKING状态通知:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public Vote lookForLeader() {
// ...
case FOLLOWING:
case LEADING:
//当其他节点都已经完成 选举,则给本地返回的消息state是 FOLLOWING 和 LEADING
if(n.electionEpoch == logicalclock.get()) {
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());

Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
outofelection.put(n.sid, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
if (termPredicate(outofelection, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, IGNOREVALUE)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
}
阅读全文 »
prev1…8910…12next
Zamperini

Zamperini

GitHub E-Mail Weibo
© 2020 Zamperini
由 Hexo 强力驱动
|
主题 — NexT.Gemini v6.0.3