ETCD Watch机制

6. Watch机制

watch用于监听指定key或指定key前缀的键值对的变动。其 api 如下:

1
2
3
etcdctl watch --prefix[=false] // 带前缀
--prev-kv[=false]// 是否获取之前的key-val对
--rev=0 // 开始监听的主Revision

下面将详细解析其工作原理。首先将介绍其初始化过程,其次介绍创建watch过程以及相关键值对修改时的操作。

6.1 初始化过程

在初始化存储的时候,提到了newWatchableStore方法,其对boltDB进行了一次封装。

1
2
3
4
5
6
7
8
9
10
s := &watchableStore{
store: NewStore(lg, b, le, ig),
victimc: make(chan struct{}, 1),
unsynced: newWatcherGroup(), // 包含所有未同步的watchers,有事件发生需要进行同步
synced: newWatcherGroup(), // 包含与存储进程同步的所有同步了的watchers,map的key即为watcher监听的key。
stopc: make(chan struct{}),
}

go s.syncWatchersLoop()// 启动每100ms一次的同步未同步映射中的监听者
go s.syncVictimsLoop()// 同步预先发送未成功的watchers

首先来看下WatcherGroup的数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
type watcherGroup struct {
// key到监听此key的watcher集合
keyWatchers watcherSetByKey
// 监听范围的监听者 (红黑树)
ranges adt.IntervalTree
// watcher 集合
watchers watcherSet
}
watchableStore.syncWatchers
|- curRev := s.store.currentRev
|- compactionRev := s.store.compactMainRev
|- wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev) // 从未同步的监听组中获取watcher集合
|- wg.chooseAll // 选择所有watcher中最小`Revision`
|- for w := range wg.watchers {
if w.minRev < compactRev { // 当小于当前压缩版本的时候直接做一次响应
select {
case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
w.compacted = true
wg.delete(w)
default:
// retry next time
}
}
if minRev > w.minRev {
minRev = w.minRev
}
}
|- revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) // 查询大于revision的所有 key-val
|- evs = kvsToEvents(s.store.lg, wg, revs, vs) // 转变成 事件
|- for i, v := range vals {
|- kv.Unmarshal(v)
|- if !wg.contains(string(kv.Key)) { // 查询是否有监听此key的watcher
continue
}
|- ty := mvccpb.PUT
if isTombstone(revs[i]) {
ty = mvccpb.DELETE
kv.ModRevision = bytesToRev(revs[i]).main
}
|- evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty})
}
|- newWatcherBatch(wg, evs)// 将变更事件与关注该事件的watcher建立映射 map[*watcher]*eventBatch
|- 对`WatcherBatch`中的每一项作如下处理:
|- 如果一次性未发送完成,则设置下次发送的Rev `w.minRev = eb.moreRev`
|- if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) { // 发送变更响应。将消息传递到`watcher.ch`中
} else {
if victims == nil {
victims = make(watcherBatch)
}
w.victim = true
}
|- if w.victim { // 若发送未成功,则将要发送的`WatcherBatch`存储到`victims`中
victims[w] = eb
} else { //发送成功的话,则
if eb.moreRev != 0 {
continue
}
s.synced.add(w)
}
|- s.unsynced.delete(w)
|- s.addVictim(victims)

总结整体流程如下:

  1. 首先从未同步watcherGroup中获取最多maxWatchersPerSyncwatcher对应的关注最小的Revision,并删除已经被压缩的watcher
  2. 使用上面得到的最小Revision去获取所有key-val对;
  3. 对所有key-val对,去寻找关注其变化的watcher最终生成WactherBatch
  4. 发送给客户端,然后将watcherunsync WatcherGroup中移动到sync WatcherGroup中。对于未发送成功的消息添加到victims中,在syncVictimsLoop中重试。

接下来即系了解syncVictimsLoop的实现逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
for { // 循环执行moveVictims方法
for s.moveVictims() != 0 {
}
isEmpty := len(s.victims) == 0
var tickc <-chan time.Time
if !isEmpty {
tickc = time.After(10 * time.Millisecond)
}
select {
case <-tickc:
case <-s.victimc:
case <-s.stopc:
return
}
}
s.moveVictims
|- 对victims中每一项:
|- 首先尝试进行发送`w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev})`。不成功的话,则加到`newVictim`
|- 其后根据当前集群情况将其加入到`unsynced WatcherGroup`中(w.minRev <= curRev)或者`synced WatcherGroup`

6.2 创建watch

当通过客户端执行如下命令etcdctl watch /ty/dj --prefix命令时,客户端与服务端建立grpc双工通道进行交互,其最终会调用watchServer.Watch方法,在其中建立双工通道交互方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
sws := serverWatchStream{
clusterID: ws.clusterID,
memberID: ws.memberID,
maxRequestBytes: ws.maxRequestBytes,
sg: ws.sg,
watchable: ws.watchable,
ag: ws.ag,
gRPCStream: stream,
watchStream: ws.watchable.NewWatchStream(),
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
progress: make(map[mvcc.WatchID]bool), // 记录watcher是否需要发送精度
prevKV: make(map[mvcc.WatchID]bool), // 是否需要查询历史值
fragment: make(map[mvcc.WatchID]bool), // 是否需要分片
closec: make(chan struct{}),
}
go sws.sendLoop() // 异步建立起发送消息的过程
go sws.recvLoop() // 异步接收请求,并进行处理

首先来看sws.recvLoop处理过程,通过grpcstream通道获取请求,然后调用watchStream.Watch方法创建watch,并注册到相应的WatcherGroup上。

1
2
3
4
5
6
7
8
for ;;
req, err := sws.gRPCStream.Recv() // 接收请求
switch uv := req.RequestUnion.(type) {
case *pb.WatchRequest_CreateRequest:
id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...) // 注册一个Watch
|- watchableStore.watch
|- synced := startRev > s.store.currentRev || startRev == 0 //查看需要同步
|- 如果不需要,则放入`synced WatcherGroup`中,否则放入`unsynced WatcherGroup`

再来看发送流程:
通过watcher.send发送变更消息的时候,实际上是传递到watcherch通道上,而这个通道则是serverWatchStream的发送通道:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
select {
case wresp, ok := <-sws.watchStream.Chan():
evs := wresp.Events
events := make([]*mvccpb.Event, len(evs))
sws.mu.RLock()
needPrevKV := sws.prevKV[wresp.WatchID]
for i := range evs {
events[i] = &evs[i]
if needPrevKV { // 对于需要查询历史版本的数据
opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
r, err := sws.watchable.Range(evs[i].Kv.Key, nil, opt)
if err == nil && len(r.KVs) != 0 {
events[i].PrevKv = &(r.KVs[0])
}
}
}
//是否需要分割发送
fragmented, ok := sws.fragment[wresp.WatchID]
if !fragmented && !ok { //不需要是,则直接发送
serr = sws.gRPCStream.Send(wr)
} else {
serr = sendFragments(wr, sws.maxRequestBytes, sws.gRPCStream.Send)
}
sws.mu.Lock()
if len(evs) > 0 && sws.progress[wresp.WatchID] {
// elide next progress update if sent a key update
sws.progress[wresp.WatchID] = false
}
sws.mu.Unlock()
//...
}

6.3 键值对更改

建设有两个客户端A,B
A客户端先执行了操作etcdctl watch /ty/dj;B客户端随后执行了操作etcdctl put /ty/dj hello。那么在B执行该操作时watcher机制是如何work的?
对于put操作,会执行storeTxnWrite.put方法,会将更新的数据添加到 tw.changes中,最后会执行watchableStoreTxnWrite.End方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
changes := tw.Changes()
rev := tw.Rev() + 1
evs := make([]mvccpb.Event, len(changes))
for i, change := range changes { // 将变更转换为事件
evs[i].Kv = &changes[i]
if change.CreateRevision == 0 {
evs[i].Type = mvccpb.DELETE
evs[i].Kv.ModRevision = rev
} else {
evs[i].Type = mvccpb.PUT
}
}
tw.s.notify(rev, evs) // 调用 watchableStore.notify方法
|- for w, eb := range newWatcherBatch(&s.synced, evs) {
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) { // 发送更新
} else {
// move slow watcher to victims
w.minRev = rev + 1
if victim == nil {
victim = make(watcherBatch)
}
w.victim = true
victim[w] = eb
s.synced.delete(w)
slowWatcherGauge.Inc()
}
}
s.addVictim(victim)

总结过程如下:在进行put操作时会将变更添加到changes中。当put操作结束时,执行watchableStoreTxnWrite.End方法,转换成newWatcherBatch事件,然后调用watchableStore.notify方法进行发送更新。

您的支持是我创作源源不断的动力