TTL&Lease实现

5. TTL & Lease 实现

etcd中用来实现TTL的机制叫LeaseLease可以用来绑定多个key。Lease的主要用法如下:

1
2
3
4
5
6
7
8
9
10
1. etcdctl lease grant 1900 // 申请一个租约,返回租约ID
- lease 326969472b0c5d05 granted with TTL(1900s)
2. etcdctl lease revoke 326969472b0c5d05 // 取消租约
- lease 326969472b0c5d05 revoked
3. etcdctl lease timetolive 326969472b0c5d08 // 查询剩余多长时间
- lease 326969472b0c5d08 granted with TTL(190s), remaining(182s)
4. etcdctl lease keep-alive 326969472b0c5d05 // 到期续约
- lease 326969472b0c5d05 keepalived with TTL(1900)
5. etcdctl lease keep-alive --once=true 326969472b0c5d0c // 单次续约
- lease 326969472b0c5d0c keepalived with TTL(200)

下面将解析其工作原理,首先将介绍其初始化过程,然后介绍创建、以及绑定key以及过期的操作。

5.1 初始化

Lessor是在创建EtcdServer是进行初始化的:lease.NewLessor(...)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat // 首先计算最小TTL单元,为选举跳数的1.5倍*心跳间隔时间
srv.lessor = lease.NewLessor(log, srv.be, lease.LessorConfig{MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), CheckpointInterval: cfg.LeaseCheckpointInterval}) // 创建`Lessor`,传入最小租约TTL时间,和租约周期性检测时间间隔。租约的TTL时间最小不能小于最小租约时间(秒级别)
|- if checkpointInterval == 0 { // 设置默认时间为5分钟
checkpointInterval = 5 * time.Minute
}
|- l := &lessor{
leaseMap: make(map[LeaseID]*Lease),
itemMap: make(map[LeaseItem]LeaseID),
leaseHeap: make(LeaseQueue, 0), // 根据租约到期时间排序的小根堆
leaseCheckpointHeap: make(LeaseQueue, 0),
b: b,
minLeaseTTL: cfg.MinLeaseTTL,
checkpointInterval: checkpointInterval,
expiredC: make(chan []*Lease, 16),
stopC: make(chan struct{}),
doneC: make(chan struct{}),
}
|- l.initAndRecover() // 初始化并恢复
|- vs := tx.UnsafeRange(leaseBucketName...) // 从`boltDB`中,查询`lease` bucket中所有数组
|- 对于每个记录,解码`lpb.Unmarshal(vs[i])`,然后插进`le.leaseMap`
|- le.leaseMap[ID] = &Lease{
ID: ID,
ttl: lpb.TTL,
itemSet: make(map[LeaseItem]struct{}),
expiry: forever,
revokec: make(chan struct{}),
}
|- heap.Init(&le.leaseHeap) //初始化堆
|- heap.Init(&le.leaseCheckpointHeap)
|- go l.runLoop() // 运行500ms一次的任务,撤销过期的租约,周期性调度`Lease`检测。后续将详细分析其执行过程

完成了Lessor的初始化后,随着进入kvstore.restore的初始化,会伴随着恢复keylease的映射关系。

1
2
3
4
5
6
当从 bucket`key`中获取所有key-val后,执行`restoreChunk`方法
restoreChunk
|- 其会检测kv是否带`Lease`,若有则加入到`keyToLease`map
|- 对`keyToLease`map中的每个entry
|- s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
|- 将`key`绑定到`lease`中,并建立`key``lease`的映射

5.2 Lease创建

创建Lease,最终会调用EtcdServer.LeaseGrant方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
EtcdServer.LeaseGrant
|- r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1)) // 首先创建 leaseId
|- s.raftRequestOnce 发起提议
|- 当集群确认提交此提议后,会进行应用。调用`lessor.Grant`
|- l := &Lease{ // 创建 Lease
ID: id,
ttl: ttl,
itemSet: make(map[LeaseItem]struct{}),
revokec: make(chan struct{}),
}
|- if l.ttl < le.minLeaseTTL { // 保证租约ttl不小于`minLeaseTTL`
l.ttl = le.minLeaseTTL
}
|- l.refresh(0) // 设置 `Lease`的 `expiry`时间
|- le.leaseMap[id] = l // 放入 `leaseId`与`lease`的映射
|- item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()} // 过期时间戳、leaseId item
|- heap.Push(&le.leaseHeap, item) // 存入堆中
|- l.persistTo(le.b) // 将lease信息存储 boltDB中
|- le.scheduleCheckpointIfNeeded(l)
|- if lease.RemainingTTL() > int64(le.checkpointInterval.Seconds()) { // 若`lease`的剩余时间比`检测点间隔`大时,则存进`le.leaseCheckpointHeap`中
|- heap.Push(&le.leaseCheckpointHeap, &LeaseWithTime{
id: lease.ID,
time: time.Now().Add(le.checkpointInterval).UnixNano(),
})

5.3 Lease绑定key

put操作时通过添加参数--lease=326969472b0c5d08即将key绑定到lease上。直接定位storeTxnWrite.put方法:

1
2
3
4
5
6
7
8
9
10
11
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)}) // 获取key之前绑定的`lease`
if oldLease != lease.NoLease {
err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}}) // 剔除`key`与老`lease`的映射关系
}
if leaseID != lease.NoLease {
err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
|- for _, it := range items {
l.itemSet[it] = struct{}{}
le.itemMap[it] = id
}
}

5.4 Lease过期

上文中介绍过在初始化Lease后,会启动lessor.runLoop任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
for {
// 注销过期的租约
le.revokeExpiredLeases()
// 检测调度的租约
le.checkpointScheduledLeases()
select {
// 500ms 一次
case <-time.After(500 * time.Millisecond):
case <-le.stopC:
return
}
}
le.revokeExpiredLeases
|- revokeLimit := leaseRevokeRate / 2 // 限流
|- ls = le.findExpiredLeases(revokeLimit) // 查询出过期的租约
|- select {
case le.expiredC <- ls:
}
|- `EtcdServer` 会监听`lessor.expiredC`通道:
|- 异步串行执行:`s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})`
|- 提交Revoke提议,最后进行应用:
|- lessor.Revoke
|- keys := l.Keys()
sort.StringSlice(keys).Sort()
for _, key := range keys { // 删除key
txn.DeleteRange([]byte(key), nil)
}
|- delete(le.leaseMap, l.ID)
|- le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID))) // 从`lease`bucket中删除租约
您的支持是我创作源源不断的动力