6. Watch机制
watch
用于监听指定key
或指定key
前缀的键值对的变动。其 api
如下:
1 | etcdctl watch --prefix[=false] // 带前缀 |
下面将详细解析其工作原理。首先将介绍其初始化过程,其次介绍创建watch
过程以及相关键值对修改时的操作。
6.1 初始化过程
在初始化存储的时候,提到了newWatchableStore
方法,其对boltDB
进行了一次封装。
1 | s := &watchableStore{ |
首先来看下WatcherGroup
的数据结构:
1 | type watcherGroup struct { |
总结整体流程如下:
- 首先从未同步watcherGroup中获取最多
maxWatchersPerSync
个watcher
对应的关注最小的Revision
,并删除已经被压缩的watcher
; - 使用上面得到的最小
Revision
去获取所有key-val
对; - 对所有
key-val
对,去寻找关注其变化的watcher
最终生成WactherBatch - 发送给客户端,然后将
watcher
从unsync WatcherGroup
中移动到sync WatcherGroup
中。对于未发送成功的消息添加到victims
中,在syncVictimsLoop
中重试。
接下来即系了解syncVictimsLoop
的实现逻辑:
1 | for { // 循环执行moveVictims方法 |
6.2 创建watch
当通过客户端执行如下命令etcdctl watch /ty/dj --prefix
命令时,客户端与服务端建立grpc
双工通道进行交互,其最终会调用watchServer.Watch
方法,在其中建立双工通道交互方式。
1 | sws := serverWatchStream{ |
首先来看sws.recvLoop
处理过程,通过grpc
stream通道获取请求,然后调用watchStream.Watch
方法创建watch
,并注册到相应的WatcherGroup
上。
1 | for ;; |
再来看发送流程:
通过watcher.send
发送变更消息的时候,实际上是传递到watcher
的ch
通道上,而这个通道则是serverWatchStream
的发送通道:
1 | select { |
6.3 键值对更改
建设有两个客户端A,B
A客户端先执行了操作etcdctl watch /ty/dj
;B客户端随后执行了操作etcdctl put /ty/dj hello
。那么在B执行该操作时watcher
机制是如何work的?
对于put
操作,会执行storeTxnWrite.put
方法,会将更新的数据添加到 tw.changes
中,最后会执行watchableStoreTxnWrite.End
方法:
1 | changes := tw.Changes() |
总结过程如下:在进行put
操作时会将变更添加到changes
中。当put
操作结束时,执行watchableStoreTxnWrite.End
方法,转换成newWatcherBatch
事件,然后调用watchableStore.notify
方法进行发送更新。