Zamperini


  • 首页

  • 关于

  • 标签23

  • 分类11

  • 归档56

ETCD Watch机制

发表于 2019-11-25 | 分类于 etcd

6. Watch机制

watch用于监听指定key或指定key前缀的键值对的变动。其 api 如下:

1
2
3
etcdctl watch --prefix[=false] // 带前缀
--prev-kv[=false]// 是否获取之前的key-val对
--rev=0 // 开始监听的主Revision

下面将详细解析其工作原理。首先将介绍其初始化过程,其次介绍创建watch过程以及相关键值对修改时的操作。

6.1 初始化过程

在初始化存储的时候,提到了newWatchableStore方法,其对boltDB进行了一次封装。

1
2
3
4
5
6
7
8
9
10
s := &watchableStore{
store: NewStore(lg, b, le, ig),
victimc: make(chan struct{}, 1),
unsynced: newWatcherGroup(), // 包含所有未同步的watchers,有事件发生需要进行同步
synced: newWatcherGroup(), // 包含与存储进程同步的所有同步了的watchers,map的key即为watcher监听的key。
stopc: make(chan struct{}),
}

go s.syncWatchersLoop()// 启动每100ms一次的同步未同步映射中的监听者
go s.syncVictimsLoop()// 同步预先发送未成功的watchers

首先来看下WatcherGroup的数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
type watcherGroup struct {
// key到监听此key的watcher集合
keyWatchers watcherSetByKey
// 监听范围的监听者 (红黑树)
ranges adt.IntervalTree
// watcher 集合
watchers watcherSet
}
watchableStore.syncWatchers
|- curRev := s.store.currentRev
|- compactionRev := s.store.compactMainRev
|- wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev) // 从未同步的监听组中获取watcher集合
|- wg.chooseAll // 选择所有watcher中最小`Revision`
|- for w := range wg.watchers {
if w.minRev < compactRev { // 当小于当前压缩版本的时候直接做一次响应
select {
case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
w.compacted = true
wg.delete(w)
default:
// retry next time
}
}
if minRev > w.minRev {
minRev = w.minRev
}
}
|- revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) // 查询大于revision的所有 key-val
|- evs = kvsToEvents(s.store.lg, wg, revs, vs) // 转变成 事件
|- for i, v := range vals {
|- kv.Unmarshal(v)
|- if !wg.contains(string(kv.Key)) { // 查询是否有监听此key的watcher
continue
}
|- ty := mvccpb.PUT
if isTombstone(revs[i]) {
ty = mvccpb.DELETE
kv.ModRevision = bytesToRev(revs[i]).main
}
|- evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty})
}
|- newWatcherBatch(wg, evs)// 将变更事件与关注该事件的watcher建立映射 map[*watcher]*eventBatch
|- 对`WatcherBatch`中的每一项作如下处理:
|- 如果一次性未发送完成,则设置下次发送的Rev `w.minRev = eb.moreRev`
|- if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) { // 发送变更响应。将消息传递到`watcher.ch`中
} else {
if victims == nil {
victims = make(watcherBatch)
}
w.victim = true
}
|- if w.victim { // 若发送未成功,则将要发送的`WatcherBatch`存储到`victims`中
victims[w] = eb
} else { //发送成功的话,则
if eb.moreRev != 0 {
continue
}
s.synced.add(w)
}
|- s.unsynced.delete(w)
|- s.addVictim(victims)

总结整体流程如下:

阅读全文 »

TTL&Lease实现

发表于 2019-11-24 | 分类于 etcd

5. TTL & Lease 实现

etcd中用来实现TTL的机制叫Lease,Lease可以用来绑定多个key。Lease的主要用法如下:

1
2
3
4
5
6
7
8
9
10
1. etcdctl lease grant 1900 // 申请一个租约,返回租约ID
- lease 326969472b0c5d05 granted with TTL(1900s)
2. etcdctl lease revoke 326969472b0c5d05 // 取消租约
- lease 326969472b0c5d05 revoked
3. etcdctl lease timetolive 326969472b0c5d08 // 查询剩余多长时间
- lease 326969472b0c5d08 granted with TTL(190s), remaining(182s)
4. etcdctl lease keep-alive 326969472b0c5d05 // 到期续约
- lease 326969472b0c5d05 keepalived with TTL(1900)
5. etcdctl lease keep-alive --once=true 326969472b0c5d0c // 单次续约
- lease 326969472b0c5d0c keepalived with TTL(200)

下面将解析其工作原理,首先将介绍其初始化过程,然后介绍创建、以及绑定key以及过期的操作。

5.1 初始化

Lessor是在创建EtcdServer是进行初始化的:lease.NewLessor(...)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat // 首先计算最小TTL单元,为选举跳数的1.5倍*心跳间隔时间
srv.lessor = lease.NewLessor(log, srv.be, lease.LessorConfig{MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), CheckpointInterval: cfg.LeaseCheckpointInterval}) // 创建`Lessor`,传入最小租约TTL时间,和租约周期性检测时间间隔。租约的TTL时间最小不能小于最小租约时间(秒级别)
|- if checkpointInterval == 0 { // 设置默认时间为5分钟
checkpointInterval = 5 * time.Minute
}
|- l := &lessor{
leaseMap: make(map[LeaseID]*Lease),
itemMap: make(map[LeaseItem]LeaseID),
leaseHeap: make(LeaseQueue, 0), // 根据租约到期时间排序的小根堆
leaseCheckpointHeap: make(LeaseQueue, 0),
b: b,
minLeaseTTL: cfg.MinLeaseTTL,
checkpointInterval: checkpointInterval,
expiredC: make(chan []*Lease, 16),
stopC: make(chan struct{}),
doneC: make(chan struct{}),
}
|- l.initAndRecover() // 初始化并恢复
|- vs := tx.UnsafeRange(leaseBucketName...) // 从`boltDB`中,查询`lease` bucket中所有数组
|- 对于每个记录,解码`lpb.Unmarshal(vs[i])`,然后插进`le.leaseMap`中
|- le.leaseMap[ID] = &Lease{
ID: ID,
ttl: lpb.TTL,
itemSet: make(map[LeaseItem]struct{}),
expiry: forever,
revokec: make(chan struct{}),
}
|- heap.Init(&le.leaseHeap) //初始化堆
|- heap.Init(&le.leaseCheckpointHeap)
|- go l.runLoop() // 运行500ms一次的任务,撤销过期的租约,周期性调度`Lease`检测。后续将详细分析其执行过程

完成了Lessor的初始化后,随着进入kvstore.restore的初始化,会伴随着恢复key与lease的映射关系。

1
2
3
4
5
6
当从 bucket`key`中获取所有key-val后,执行`restoreChunk`方法
restoreChunk
|- 其会检测kv是否带`Lease`,若有则加入到`keyToLease`map中
|- 对`keyToLease`map中的每个entry
|- s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
|- 将`key`绑定到`lease`中,并建立`key`到`lease`的映射

5.2 Lease创建

阅读全文 »

ETCD事务请求

发表于 2019-11-23 | 分类于 etcd

4. 事务请求

[TOC]

本节我们将讲述 etcd集群的事务请求处理过程。首先我们将以put操作为例来讲解一般性事务请求过程。其次,我们将介绍 etcd特有的线性一致性读请求过程。

put 请求

通过 etcdctl put ty dj 命令,向本地监听2380端口的节点发送 写请求。
该请求为GRPC请求,会中转到quotaKVServer进行处理。

Leader接收请求,并广播消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
quotaKVServer.Put
|- s.qa.check(ctx,r) //检查是否满足quota,即是否有足够的内存,如果没有则忽略请求,并发报警
|- EtcdServer.Put // 调用EtcdServer进行写
|- s.raftRequest(ctx, pb.InternalRaftRequest{Put: r}) // EtcdServer将请求进行包装成 `InternalRaftRequest`
并调用`raftRequest`进行请求,并等待获取结果
|- s.processInternalRaftRequestOnce// 最终会调用此方法
|- if ci > ai+maxGapBetweenApplyAndCommitIndex //首先检查apply索引和 commit索引是否相差太大,若太大则忽略请求(自我保护机制)
|- 生成请求头,并生成请求ID `s.reqIDGen.Next()`
|- r.Marshal() // 对请求进行编码
|- ch := s.w.Register(id) // 注册请求响应回调
|- err = s.r.Propose(cctx, data) // 向 raftNode 发起提议,申请写入数据
|- select { // 等待结果,或超时
case x := <-ch:
return x.(*applyResult), nil
case timeout
}

接下来就进入了主要的处理流程:

1
2
3
4
5
node.Propose
|- n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) // 将请求包装成 `MsgProp`消息
|- n.stepWithWaitOption
|- pm := msgWithResult{m: m} // 将消息再一次封装
|- 将消息传递给 通道`n.propc`

可以看出 将请求两层封装之后传递给通道n.propc。
在node的循环中,其会监听proc通道,当有消息到来时:

阅读全文 »

ETCD网络通信

发表于 2019-11-22 | 分类于 etcd

2. 网络通信

在上一节,我们介绍了etcd集群的启动过程。在其中也简单介绍了会初始化与其他成员节点通信的连接,以及启动为其他成员节点提供服务的GRPC服务和Http服务以及启动服务客户端的服务。本章我们将详细介绍通信相关的实现。

2.1 成员节点间通信(peer)

前一章节介绍了peer服务端启动过程。下面,我们来讲解下调用端是如何初始化以及如何调用的。
与成员节点间通信传输的初始化之前介绍过,是调用Transport.AddPeer方法。
前面我们介绍了Transport的作用:向成员节点发送raft消息,并从成员节点获取raft消息。下面将详细描述其作用。
首先来看其初始化和启动过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
tr := &rafthttp.Transport{
DialTimeout: cfg.peerDialTimeout(),
ID: id,
URLs: cfg.PeerURLs,
ClusterID: cl.ID(),
Raft: srv, // EtcdServer
Snapshotter: ss,
ServerStats: sstats,
LeaderStats: lstats,
ErrorC: srv.errorc,
}
tr.Start()
|- t.streamRt = newStreamRoundTripper(t.TLSInfo, t.DialTimeout) // 其作用前面介绍过:可以执行事务性HTTP请求,给定请求获取结果
|- t.pipelineRt = NewRoundTripper(t.TLSInfo, t.DialTimeout) //
|- t.remotes = make(map[types.ID]*remote) // 初始化复制节点映射
|- t.peers = make(map[types.ID]Peer) // 初始化成员节点映射
|- t.pipelineProber = probing.NewProber(t.pipelineRt) // Prober的作用是探测,探测链路监控状态
|- t.streamProber = probing.NewProber(t.streamRt)

接下来,继续了解AddPeer实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Transport/AddPeer
|- startPeer
|- picker := newURLPicker(urls) // 其用于选取通信通道(负载均衡作用)
|- pipeline := &pipeline{...} // 创建pipeline,多通道发送消息。
|- pipeline.start() // 开启多个异步任务,监听 msgC通道,进行数据发送
|- p := &peer{...} // 创建peer结构体,初始化其属性,如recev、propc通道、消息发送器 msgAppV2Writer、writer(StreamWriter)
|- go select {
case mm := <-p.recvc: //监听 recev通道发来的消息,并将其丢给 EtcdServer.`Process`方法处理
if err := r.Process(ctx, mm)
}
|- go select {
case mm := <-p.propc: // 监听 propc 通道发来的消息,将其丢给...
if err := r.Process(ctx, mm);
}
|- 初始化 `p.msgAppV2Reader`、`p.msgAppReader` 用于读取消息
|- p.msgAppV2Reader.start() // 开启监听消息读取
|- p.msgAppReader.start()
|- addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec) // 将该节点添加到探测器中,进行定期探测状态
|- addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec)

从上面的流程可以看出 peer使用stream模式通信方式。读写分别用不同的协程去监听处理。其中streamWriter负责消息发送,streamReader负责消息接收。见下面详情:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
w := &streamWriter{
peerID: id,
status: status,
fs: fs,
r: r,
msgc: make(chan raftpb.Message, streamBufSize),
connc: make(chan *outgoingConn),
stopc: make(chan struct{}),
done: make(chan struct{}),
}
w.run:
|- select {
case <-heartbeatc:
//发送心跳消息
case m := <-msgc:
// 接收到需要发送的消息,进行编码并使用conn进行发送
case conn := <-cw.connc:
// 监听附加上来的连接通道,进行初始化 编码器、Flusher等
}

streamWriter里比较关键的一点是,这里的conn从哪来?往cw.connc传递conn的只有peer.attachOutgoingConn方法。
而在上面初始化并启动peer的时候是没有发现调用这个方法的 ???哪是哪里进行执行的呢?通过 追踪peer.attachOutgoingConn的调用方,最终发现,其在rafthhtp/http.go/ServeHTTP() L483处调用。而这个方法会在客户端建立连接进行http服务调用的时候执行。所以,在这里调用 peer.attachOutgoingConn 的作用是复用连接。当我们分析完读处理过程后,再来整体看 etcd节点是怎么建立通信链路的。

阅读全文 »

ETCD数据存储

发表于 2019-11-21 | 分类于 etcd

3. 数据存储

Etcd的存储部分,可以分两部分来讲解。一部分是其应用层的数据存储方式,另一部分是raft相关数据的存储。Etcd应用层的数据存储从v3版本开始就延用boltDB,其也是CoreOS的产品boltDB。PS:本文主要聚焦于v3版本,对于v2版本不作解读。

下面将分别介绍这两部分内容:

3.1 raft数据存储

首先我们来介绍下raft相关的数据存储:
raft中有两个比较重要的组件:

  • raftLog:用来保存状态机相关信息的,包括当前任期、索引号、不稳定记录项等;
  • WAL:预写日志器,用于以顺序形式写入操作记录,以便故障时数据恢复;
  • Snapshot:数据快照,一般用于启动时快速恢复数据。

首先来看raftLog:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type raftLog struct {
// 包含所有稳定的记录 MemeoryStorge
storage Storage
// 包含所有不稳定的记录
unstable unstable
// 提交记录索引
committed uint64
// 应用记录索引
applied uint64
}
type MemoryStorage struct {
hardState pb.HardState
// 快照信息(保存 v2版存储中的保存的数据快照、任期、索引、ConfState[集群节点信息])。快照生成一般有个条件:距上次提交的次数大于`SnapshotCount`(默认10000)
snapshot pb.Snapshot
ents []pb.Entry
}
// 从名字也可以看出其用途
type unstable struct {
// 快照信息,这里只会在节点加入集群,主节点向其发送`MsgSnap`消息的时候才会有
snapshot *pb.Snapshot
entries []pb.Entry
offset uint64
}

下图描述了数据从客户端请求到落地各个阶段与以前存储结构的关系:

其中,8’、9、11 是涉及 I/O 的操作,其他均为内存操作。
对WAL的操作在每次写事务操作中都会存在,因此其是制约etcd写性能的一个重要因素。接下来,将重点介绍WAL的工作原理。

阅读全文 »
prev1234…12next
Zamperini

Zamperini

GitHub E-Mail Weibo
© 2020 Zamperini
由 Hexo 强力驱动
|
主题 — NexT.Gemini v6.0.3