6. Watch机制
watch用于监听指定key或指定key前缀的键值对的变动。其 api 如下:
1 | etcdctl watch --prefix[=false] // 带前缀 |
下面将详细解析其工作原理。首先将介绍其初始化过程,其次介绍创建watch过程以及相关键值对修改时的操作。
6.1 初始化过程
在初始化存储的时候,提到了newWatchableStore方法,其对boltDB进行了一次封装。
1 | s := &watchableStore{ |
首先来看下WatcherGroup的数据结构:
1 | type watcherGroup struct { |
总结整体流程如下:
- 首先从未同步watcherGroup中获取最多
maxWatchersPerSync个watcher对应的关注最小的Revision,并删除已经被压缩的watcher; - 使用上面得到的最小
Revision去获取所有key-val对; - 对所有
key-val对,去寻找关注其变化的watcher最终生成WactherBatch - 发送给客户端,然后将
watcher从unsync WatcherGroup中移动到sync WatcherGroup中。对于未发送成功的消息添加到victims中,在syncVictimsLoop中重试。
接下来即系了解syncVictimsLoop的实现逻辑:
1 | for { // 循环执行moveVictims方法 |
6.2 创建watch
当通过客户端执行如下命令etcdctl watch /ty/dj --prefix命令时,客户端与服务端建立grpc双工通道进行交互,其最终会调用watchServer.Watch方法,在其中建立双工通道交互方式。
1 | sws := serverWatchStream{ |
首先来看sws.recvLoop处理过程,通过grpcstream通道获取请求,然后调用watchStream.Watch方法创建watch,并注册到相应的WatcherGroup上。
1 | for ;; |
再来看发送流程:
通过watcher.send发送变更消息的时候,实际上是传递到watcher的ch通道上,而这个通道则是serverWatchStream的发送通道:
1 | select { |
6.3 键值对更改
建设有两个客户端A,B
A客户端先执行了操作etcdctl watch /ty/dj;B客户端随后执行了操作etcdctl put /ty/dj hello。那么在B执行该操作时watcher机制是如何work的?
对于put操作,会执行storeTxnWrite.put方法,会将更新的数据添加到 tw.changes中,最后会执行watchableStoreTxnWrite.End方法:
1 | changes := tw.Changes() |
总结过程如下:在进行put操作时会将变更添加到changes中。当put操作结束时,执行watchableStoreTxnWrite.End方法,转换成newWatcherBatch事件,然后调用watchableStore.notify方法进行发送更新。