ETCD数据存储

3. 数据存储

Etcd的存储部分,可以分两部分来讲解。一部分是其应用层的数据存储方式,另一部分是raft相关数据的存储。Etcd应用层的数据存储从v3版本开始就延用boltDB,其也是CoreOS的产品boltDB。PS:本文主要聚焦于v3版本,对于v2版本不作解读。

下面将分别介绍这两部分内容:

3.1 raft数据存储

首先我们来介绍下raft相关的数据存储:
raft中有两个比较重要的组件:

  • raftLog:用来保存状态机相关信息的,包括当前任期、索引号、不稳定记录项等;
  • WAL:预写日志器,用于以顺序形式写入操作记录,以便故障时数据恢复;
  • Snapshot:数据快照,一般用于启动时快速恢复数据。

首先来看raftLog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type raftLog struct {
// 包含所有稳定的记录 MemeoryStorge
storage Storage
// 包含所有不稳定的记录
unstable unstable
// 提交记录索引
committed uint64
// 应用记录索引
applied uint64
}
type MemoryStorage struct {
hardState pb.HardState
// 快照信息(保存 v2版存储中的保存的数据快照、任期、索引、ConfState[集群节点信息])。快照生成一般有个条件:距上次提交的次数大于`SnapshotCount`(默认10000)
snapshot pb.Snapshot
ents []pb.Entry
}
// 从名字也可以看出其用途
type unstable struct {
// 快照信息,这里只会在节点加入集群,主节点向其发送`MsgSnap`消息的时候才会有
snapshot *pb.Snapshot
entries []pb.Entry
offset uint64
}

下图描述了数据从客户端请求到落地各个阶段与以前存储结构的关系:

其中,8’、9、11 是涉及 I/O 的操作,其他均为内存操作。
WAL的操作在每次写事务操作中都会存在,因此其是制约etcd写性能的一个重要因素。接下来,将重点介绍WAL的工作原理。

1
2
3
4
5
6
7
8
9
10
11
12
type WAL struct {
dir string // WAL文件所在目录
dirFile *os.File // 目录文件句柄
metadata []byte // 元数据,记录在每个wal文件头
state raftpb.HardState // 硬状态(任期、索引号),记录在每个文件头
start walpb.Snapshot // snapshot to start reading
decoder *decoder // 解码器
enti uint64 // 保存到WAL的最大索引号
encoder *encoder // 编码器
locks []*fileutil.LockedFile // 文件锁
fp *filePipeline // 文件创建工具,预先创建文件
}

首先,来看其创建过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
wal.Create
|- tmpdirpath := filepath.Clean(dirpath) + ".tmp"
|- os.RemoveAll(tmpdirpath)
|- fileutil.CreateDirAll(tmpdirpath)
|- p := filepath.Join(tmpdirpath, walName(0, 0))
|- f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
|- f.Seek(0, io.SeekEnd)
|- fileutil.Preallocate(f.File, SegmentSizeBytes, true) // 预分配空间
|- w := &WAL{
lg: lg,
dir: dirpath,
metadata: metadata,
}
|- w.encoder, err = newFileEncoder(f.File, 0) // 编码器,编码器编码的同时会将结果写到文件中
|- w.locks = append(w.locks, f)
|- w.saveCrc(0) w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata})// 写文件头
return nil, err
}
|- w.SaveSnapshot(walpb.Snapshot{})
|- w.renameWAL(tmpdirpath) // 更改零时文件名称为waldir,创建FilePipeline(用于预先创建文件)
|-pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
|- fileutil.Fsync(pdir)

创建WAL时,会初始化编码器以及FilePipeline。下面再以其Save方法来介绍保存记录的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
WAL.Save
|- mustSync := raft.MustSync(st, w.state, len(ents)) // 判断是否需要文件同步,写入磁盘
|- for each ents: saveEntry
|- b := pbutil.MustMarshal(e) rec := &walpb.Record{Type: entryType, Data: b}
|- w.encoder.encode(rec) // 编码后写入文件
|- w.saveState(&st) // 保存状态
|- curOff, err := w.tail().Seek(0, io.SeekCurrent)
|- if curOff < SegmentSizeBytes & mustSync w.sync() 如果文件还没大于SegmentSizeBytes,且需要同步,则进行文件同步
|- 如果大于,则进行切割文件 `w.cut`
|- fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))
|- newTail, err := w.fp.Open() // 通过filePipeline获取一个新的零时文件
|- 进行初始化 w.encoder, err = newFileEncoder(w.tail().File, prevCrc) ...
|- os.Rename(newTail.Name(), fpath) // 重命名
|- w.sync()

随着记录的增加,wal文件会越来越多,入股不做处理的话会导致磁盘被占满。那么etcd是怎么做的呢?
其实是由两步构成的:

  1. etcd每次进行执行快照的实时,会进行wal.ReleaseLockTo(snap.Metadata.Index)释放文件锁的操作。(释放快照对应索引号之前的所有WAL文件句柄)
  2. 之前在EtcdServer启动章节介绍过,其启动后会启动一个定时任务purgeFile。其会针对snap.dbsnapwal文件做30秒一次的fileutil.PurgeFile任务:
    1. 任务带有参数 MaxWalFiles,获取指定wal.dir下所有文件,然后按文件名排序,从小到大进行遍历:尝试锁文件。如果成功,则进行删除,否则的话说明依然被etcd锁占用。

3.2 应用数据存储

在解析etcd应用层数据存储结构前,先来介绍下etcd的数据存储形式。etcd对数据的存储并不是直接存储key-value对,而是引入了一种带版本号revision的存储方式:以数据的revisonkey,键值对为值。
revision由两部分组成:main-revision.sub-revisionmain-revision为事务ID,sub-revision为事务中一次操作ID。

举例来说:
系统刚启动后,在一个事务中执行put ty dj \n put dj ty两个操作,实际存储的是

  1. {1,0} key=ty val=dj
  2. {1,1} key=dj val=ty

紧接着执行第二次操作:put ty dj90 \n put dj ty92,那么存储中会追加如下信息:

  1. {2,0} key=ty val=dj90
  2. {2,1} key=dj val=ty92

而为了支持这种存储形式快速查询,etcd建立了treeIndex结构,用于建立keyrevision间的关系。随之,通过key查询val的过程如下:

treeIndex是一个b-ree,其存储这keyIndex信息。KeyIndex的结构如下:

1
2
3
4
5
6
key         []byte
modified revision // 最后一次更改版本信息
generations []generation // 代:每一代记录着键值对从创建到删除的过程
|- ver int64 // 存放了多少次修改
created revision // 创建此generation的第一个版本
revs []revision

keyIndex中,需要特别说明的是generation数据内部,保存的revs,如果最后一项为tombstone,则表示在这个代中被删除了。被tombstonegeneration是可以被删除的。针对此,keyIndex有个专门的函数compactcompact(n)可以将主版本小于n的数据。

将完了其存储结构和存储格式,下面将从启动和执行一次操作两个流程来讲解其的工作原理。对boltDB不了解的读者建议先去了解下 boltDBboltDB学习

3.2.1 启动过程

etcd应用层存储创建过程如下:
首先创建backend,其是对boltDB的封装,加入一些批量提交逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
bepath := cfg.backendPath()
beExist := fileutil.Exist(bepath)
be := openBackend(cfg) // 创建
|- newBackend
|- bolt.Open(bcfg.Path, 0600, bopts)
|- b := &backend{
db: db,
batchInterval: bcfg.BatchInterval,
batchLimit: bcfg.BatchLimit ...
}
|- b.run() // 定时任务,批量周期进行提交
|- t := time.NewTimer(b.batchInterval)
|- select {
case <-t.C:
b.batchTx.CommitAndStop()
return
}
if b.batchTx.safePending() != 0 {
b.batchTx.Commit()
}

有了backend后,会再基于此作一层封装:mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex),其内部包含watcher处理机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
mvcc.New
|- s := &watchableStore{
store: NewStore(lg, b, le, ig),
victimc: make(chan struct{}, 1),
unsynced: newWatcherGroup(),
synced: newWatcherGroup(),
}
|- s.store.ReadView = &readView{s}
s.store.WriteView = &writeView{s}
if s.le != nil {
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
}
|- go s.syncWatchersLoop() // 用于watch机制的异步任务
|- go s.syncVictimsLoop()

这里我们比较关注的是NewStore逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
mvcc.NewStore
|- s := &store{
b: b,
kvindex: newTreeIndex(lg), // 创建 treeIndex
le: le,
currentRev: 1, // 最近一次事务的版次
compactMainRev: -1,//最近一次事务的主版次
bytesBuf8: make([]byte, 8),
fifoSched: schedule.NewFIFOScheduler(),
stopc: make(chan struct{}),
}
|- s.ReadView = &readView{s}
s.WriteView = &writeView{s}
if s.le != nil {
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
}
|- tx.UnsafeCreateBucket(keyBucketName) // 创建bucket-"key"用来存储kv数据,
|- tx.UnsafeCreateBucket(metaBucketName) //创建Bucket-”meta“用来存储元数据
|- s.restore() // 恢复存储
// TreeIndex 是b树
func newTreeIndex(lg *zap.Logger) index {
return &treeIndex{
tree: btree.New(32),
lg: lg,
}
}
s.restore
|- rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)// 构建tree-index方法,返回两个通道,用于向内传入数据
|- for {
// 分页提取key-val
keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
if len(keys) == 0 {
break
}
// 将key-val进行解码,然后传递给`rkvc`通道,在restoreIntoIndex中监听`rkvc`通道,进行构建`treeIndex`
restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
if len(keys) < restoreChunkKeys {
break
}
newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
newMin.sub++
revToBytes(newMin, min)
}
|- ...

由此就完成了treeIndexboltDB的初始化。
最后,etcd又对mvcc.watchableStore进行了一次封装srv.newApplierV3Backend(),其用于衔接存储和raft消息请求。

3.2.2 请求应用到存储

put ty dj请求通过raft协议提交决策后,最终会调用到applierV3backend.put方法进行应用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
applierV3backend.put
|- txn = a.s.KV().Write()
|- txn.Put(p.Key, val, leaseID)
|- storeTxnWrite.put
|- rev := tw.beginRev + 1
c := rev
|- _, created, ver, err := tw.s.kvindex.Get(key, rev) // 如果key存在则获取
if err == nil {
c = created.main
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
}
|- ibytes := newRevBytes()
|- idxRev := revision{main: rev, sub: int64(len(tw.changes))}
|- revToBytes(idxRev, ibytes)
|- ver = ver + 1
|- kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(leaseID),
}
|- d, err := kv.Marshal()
|- tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) // 存储到boltDB中,key为`revision`
|- tw.s.kvindex.Put(key, idxRev) // 将key到 revision的映射存储到treeIndex中

到此就完成了存储模块的讲解。

您的支持是我创作源源不断的动力