0%

ETCD源码解析-第三篇

ETCD 多版本控制

多版本控制
MVCC(Multi-Version Concurrency Control)机制

和mysql一样,etcd 同样采用mvcc机制处理读写冲突和事务隔离级别问题。

MVCC 模块主要由 BoltDB 和 treeIndex 两部分组成

BoltDB 是一个基于 B+ 树的 KV 存储数据库。TreeIndex 模块基于内存版 BTree 实现键的索引管理,它是基于 Google 开源项目 Btree 实现的一个索引模块,保存了每一个 key 与对应的版本号(Revision)的映射关系等信息。

etcd 存储以数据的 Revision 作为 key。键值、创建时的版本号、最后修改的版本号等作为 value 保存到数据库。etcd 对于每一个键值对都维护了一个全局的 Revision 版本号,键值对的每一次变化都会被记录。获取某一个 key 对应的值时,需要先获取该 key 对应的 Revision,再通过它找到对应的值。

MVCC 写过程解析

首先我们结合之前写请求实现流程图的内容分析 MVCC 写请求的过程:

写请求实现流程图

上图为写请求的过程,写请求在底层统一调用 put 方法。treeIndex 中根据查询的 key 从 B-tree 查找得到的是一个 keyIndex 对象,里面包含了 Revision 等全局版本号信息。

keyIndex 结构体定义如下所示:

// 位于 mvcc/key_index.go:70

type keyIndex struct {

key []byte // key 名称

modified revision // 最后一次修改的 etcd 版本号

generations []generation // 保存了 key 多次修改的版本号信息

}

keyIndex 中保存了 key、modified 和 generations。

其中 generations 的结构体定义如下:

// 位于 mvcc/key_index.go:335

type generation struct {

ver int64

created revision // generation 创建时的版本

revs []revision

}

generation 中的 ver 表示当前 generation 包含的修改次数,created 记录创建 generation 时的 Revision 版本,最后的 revs 用于存储所有的版本信息。

Revision 结构体的定义如下:

// 位于 mvcc/revision.go:26

type revision struct {

// 事务发生时自动生成的主版本号

main int64

// 事务内的子版本号

sub int64

}

Revision 中定义了一个全局递增的主版本号main,发生 put、txn、del 操作会递增,一个事务内的 main 版本号是唯一的;事务内的子版本号定义为sub,事务发生 put 和 del 操作时,从 0 开始递增。

keyIndex、generation 和 revision 之间的关系

由于是第一次写,treeIndex 查询为空。etcd 会根据当前的全局版本号加 1(集群初始化从 1 开始),根据执行的结果,我们这里全局版本号在写之前为 2,自增之后变成 3。因此操作对应的版本号 revision {3,0},对应写入 BoltDB 的 key。写入的 value 对应 mvccpb.KeyValue 结构体,其由 key、value、create_revision、mod_revision、version、lease 等字段组成,定义如下所示:

type KeyValue struct {

// 键

Key []byte protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"

// 创建时的版本号

CreateRevision int64 protobuf:"varint,2,opt,name=create_revision,json=createRevision,proto3" json:"create_revision,omitempty"

// 最后一次修改的版本号

ModRevision int64 protobuf:"varint,3,opt,name=mod_revision,json=modRevision,proto3" json:"mod_revision,omitempty"

// 表示 key 的修改次数,删除 key 会重置为 0,key 的更新会导致 version 增加

Version int64 protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"

// 值

Value []byte protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"

// 键值对绑定的租约 LeaseId,0 表示未绑定

Lease int64 protobuf:"varint,6,opt,name=lease,proto3" json:"lease,omitempty"

}

构造好 key 和 value 之后,就可以写入 BoltDB 了。并同步更新 buffer。

此外还需将本次修改的版本号与用户 key 的映射关系保存到 treeIndex 模块中,key hello 的 keyIndex。对照着上面介绍的 keyIndex、generation 和 Revision 结构体的定义,写入的 keyIndex 记录如下所示:

key: “hello”

modified: <3,0>

generations:

[{ver:1,created:<3,0>,revs: [<3,0>]} ]

modified 为最后一次修改的 etcd 版本号,这里是 <3,0>。generations 数组有一个元素,首次创建 ver 为 1,created 创建时的版本为 <3,0>,revs 数组中也只有一个元素,存储了所有的版本信息。

至此,put 事务基本结束,之所以说是基本完场,是因为还差最后一步——写入的数据持久化到磁盘。数据持久化的操作由 Backend 的协程来完成,以此提高写的性能和吞吐量。协程通过事务批量提交,将 BoltDB 内存中的数据持久化存储磁盘中。

这里我们要提一下键值对的删除。与更新一样,键值对的删除也是异步完成,每当一个 key 被删除时都会调用 tombstone 方法向当前的 generation 中追加一个空的 generation 对象,其实现如下所示:

// 位于 mvcc/key_index.go:119

func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {

if ki.isEmpty() {

lg.Panic(

“‘tombstone’ got an unexpected empty keyIndex”,

zap.String(“key”, string(ki.key)),

)

}

if ki.generations[len(ki.generations)-1].isEmpty() {

return ErrRevisionNotFound

}

ki.put(lg, main, sub)

ki.generations = append(ki.generations, generation{})

keysGauge.Dec()

return nil

}

这个空的 generation 标识说明当前的 key 已经被删除了。除此之外,生成的 BoltDB key 版本号中追加了 t(tombstone),如 <3,0,t>,用于标识删除,而对应的 value 变成了只含 key 属性。

当查询键值对时,treeIndex 模块查找到 key 对应的 keyIndex,若查询的版本号大于等于被删除时的版本号,则会返回空。而真正删除 treeIndex 中的索引对象以及 BoltDB 中的键值对,则由compactor 组件完成。

MVCC 读过程解析

我们继续来看读过程中的 MVCC 实现细节。还是使用讲解键值对查询时的流程图:

读请求实现流程图

读请求在底层统一调用的是 Range 方法,首先 treeIndex 根据查询的 key 从 BTree 查找对应 keyIndex 对象。从 keyIndex 结构体的定义可知,每一个 keyIndex 结构体中都包含当前键的值以及最后一次修改对应的 Revision 信息,其中还保存了一个 key 的多个 generation,每一个 generation 都会存储当前 key 的所有历史版本。

treeIndex 模块中提供了 Get 接口获取一个 key 对应 Revision 值:

// 位于 mvcc/index.go:68

func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {

keyi := &keyIndex{key: key}

if keyi = ti.keyIndex(keyi); keyi == nil {

return revision{}, revision{}, 0, ErrRevisionNotFound

}

return keyi.get(ti.lg, atRev)

}

Get 接口的实现通过 keyIndex 函数查找 key 对应的 keyIndex 结构体:

// 位于 mvcc/index.go:78

func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {

if item := ti.tree.Get(keyi); item != nil {

return item.(*keyIndex)

}

return nil

}

可以看到这里的实现非常简单,从 treeIndex 成员 BTree 中查找 keyIndex,将结果转换成 keyIndex 类型后返回;获取 key 对应 Revision 的实现如下:

// 位于 mvcc/key_index.go:137

func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {

if ki.isEmpty() {

lg.Panic(

“‘get’ got an unexpected empty keyIndex”,

zap.String(“key”, string(ki.key)),

)

}

g := ki.findGeneration(atRev)

if g.isEmpty() {

return revision{}, revision{}, 0, ErrRevisionNotFound

}

n := g.walk(func(rev revision) bool { return rev.main > atRev })

if n != -1 {

return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil

}

return revision{}, revision{}, 0, ErrRevisionNotFound

}

上述实现中,通过遍历 generations 数组来获取 generation,匹配到有效的 generation 后,返回 generation 的 revisions 数组中最后一个版本号,即 <3,0> 给读事务。

获取到 Revision 信息之后,读事务接口优先从 buffer 中查询,如果命中则直接返回,否则根据 revision <3,0> 作为 key 在 BoltDB 中查询。

在查询时如果没有指定版本号,默认读取最新的数据。如果指定了版本号,比如我们在上面发起了一个指定历史版本号为 3 的读请求:

$ etcdctl get hello –rev=3

在 treeIndex 模块获取 key 对应的 keyIndex 时,指定了读版本号为 3 的快照数据。keyIndex 会遍历 generation 内的历史版本号,返回小于等于 3 的最大历史版本号作为 BoltDB 的 key,从中查询对应的 value。

需要注意的是,并发读写事务不会阻塞在一个 buffer 资源锁上。并发读创建事务时,会全量拷贝当前未提交的 buffer 数据,以此实现并发读。

Watch机制

etcd 的 MVCC 模块对外提供了两种访问键值对的实现方式,一种是键值存储 kvstore,另一种是 watchableStore,它们都实现了 KV 接口。

clientv3 中很简洁地封装了 watch 客户端与服务端交互的细节,客户端使用的代码如下:

func testWatch() {

ch := clientv3.Watch(context.TODO(), key)
if ch == nil {
    t.Errorf(“expected watcher channel, got nil”)
}

for i := 0; i < numKeyUpdates; i++ {
      resp, ok := <-ch
      if !ok {
          t.Errorf(“watcher unexpectedly closed”)
      }
     v := fmt.Sprintf(“%s-%d”, key, i)
     gotv := string(resp.Events[0].Kv.Value)
     if gotv != v {
        t.Errorf(“#%d: got %s, wanted %s”, i, gotv, v)
     }
}

}

在上述实现中,我们调用了 watchableStore。为了实现 watch 监测,我们创建了一个 watchStream,watchStream 监听的 key 为 hello,之后我们就可以消费w.Chan()返回的 channel。key 为 hello 的任何变化,都会通过这个 channel 发送给客户端。

实现原理

server 基于 watchableStore 实现 watch 功能

在上述实现中,我们调用了 watchableStore。为了实现 watch 监测,我们创建了一个 watchStream,watchStream 监听的 key 为 hello,之后我们就可以消费w.Chan()返回的 channel。key 为 hello 的任何变化,都会通过这个 channel 发送给客户端。

结合这张图,我们可以看到:watchStream 实现了在大量 KV 的变化事件中,过滤出当前所指定监听的 key,并将键值对的变更事件输出。

watchableStore 存储

watchableStore 负责了注册、管理以及触发 Watcher 的功能。我们先来看一下这个结构体的各个字段:

// 位于 mvcc/watchable_store.go:47

type watchableStore struct {

*store

// 同步读写锁

mu sync.RWMutex

// 被阻塞在 watch channel 中的 watcherBatch

victims []watcherBatch

victimc chan struct{}

// 未同步的 watchers

unsynced watcherGroup

// 已同步的 watchers

synced watcherGroup

stopc chan struct{}

wg sync.WaitGroup

}

watchableStore 组合了 store 结构体的字段和方法,除此之外,还有两个 watcherGroup 类型的字段,watcherGroup 管理多个 watcher,并能够根据 key 快速找到监听该 key 的一个或多个 watcher。

unsynced 表示 watcher 监听的数据还未同步完成。当创建的 watcher 指定的版本号小于 etcd server 最新的版本号时,会将 watcher 保存到 unsynced watcherGroup。

synced 表示 watcher 监听的数据都已经同步完毕,在等待新的变更。如果创建的 watcher 未指定版本号或指定的版本号大于当前最新的版本号,它将会保存到 synced watcherGroup 中。

根据 watchableStore 的定义,我们可以结合下图描述前文示例 watch 监听的过程。

watch 监听流程

watchableStore 收到了所有 key 的变更后,将这些 key 交给 synced(watchGroup),synced 使用了 map 和 ADT(红黑树),能够快速地从所有 key 中找到监听的 key,将这些 key 发送给对应的 watcher,这些 watcher 再通过 chan 将变更信息发送出去。

在查找监听 key 对应的事件时,如果只监听一个 key:

watch(start_key: foo, end_key: nil)

则对应的存储为map[key]*watcher。这样可以根据 key 快速找到对应的 watcher。

watch 可以监听一组范围的 key:

watch(start_key: hello1, end_key: hello3)

上面的代码监听了从 hello1→hello3 之间的所有 key,这些 key 的数量不固定,比如:key=hello11 也处于监听范围。这种情况就无法再使用 map 了,因此 etcd 用 ADT 结构来存储一个范围内的 key。

watcherGroup 是由一系列范围 watcher 组织起来的 watchers。在找到对应的 watcher 后,调用 watcher 的 send() 方法,将变更的事件发送出去。

syncWatchers 同步监听

在初始化一个新的 watchableStore 时,etcd 会创建一个用于同步 watcherGroup 的 goroutine,会在 syncWatchersLoop 函数中每隔 100ms 调用一次 syncWatchers 方法,将所有未通知的事件通知给所有的监听者:

// 位于 mvcc/watchable_store.go:334

func (s *watchableStore) syncWatchers() int {

//…

// 为了从 unsynced watchers 中找到未同步的键值对,我们需要查询最小的版本号,利用最小的版本号查询 backend 存储中的键值对

curRev := s.store.currentRev

compactionRev := s.store.compactMainRev

wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)

minBytes, maxBytes := newRevBytes(), newRevBytes()

// UnsafeRange 方法返回了键值对。在 boltdb 中存储的 key 都是版本号,而 value 为在 backend 中存储的键值对

tx := s.store.b.ReadTx()

tx.RLock()

revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)

var evs []mvccpb.Event

// 转换成事件

evs = kvsToEvents(s.store.lg, wg, revs, vs)

var victims watcherBatch

wb := newWatcherBatch(wg, evs)

for w := range wg.watchers {

w.minRev = curRev + 1

//…

if eb.moreRev != 0 {

w.minRev = eb.moreRev

}

// 通过 send 将事件和 watcherGroup 发送到每一个 watcher 对应的 channel 中

if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {

pendingEventsGauge.Add(float64(len(eb.evs)))

} else {

// 异常情况处理

if victims == nil {

victims = make(watcherBatch)

}

w.victim = true

}

//…

s.unsynced.delete(w)

}

//…

}

简化后的 syncWatchers 方法中有三个核心步骤,首先是根据当前的版本从未同步的 watcherGroup 中选出一些待处理的任务,然后从 BoltDB 中获取当前版本范围内的数据变更,并将它们转换成事件,事件和 watcherGroup 在打包之后会通过 send 方法发送到每一个 watcher 对应的 channel 中。

syncWatchers 方法调用流程图

客户端监听事件

客户端监听键值对时,调用的正是Watch方法,Watch在 stream 中创建一个新的 watcher,并返回对应的 WatchID。

// 位于 mvcc/watcher.go:108

func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs …FilterFunc) (WatchID, error) {

// 防止出现 key >= end 的错误 range

if len(end) != 0 && bytes.Compare(key, end) != -1 {

return -1, ErrEmptyWatcherRange

}

ws.mu.Lock()

defer ws.mu.Unlock()

if ws.closed {

return -1, ErrEmptyWatcherRange

}

if id == AutoWatchID {

for ws.watchers[ws.nextID] != nil {

ws.nextID++

}

id = ws.nextID

ws.nextID++

} else if _, ok := ws.watchers[id]; ok {

return -1, ErrWatcherDuplicateID

}

w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs…)

ws.cancels[id] = c

ws.watchers[id] = w

return id, nil

}

AutoWatchID 是 WatchStream 中传递的观察者 ID。当用户没有提供可用的 ID 时,如果又传递该值,etcd 将自动分配一个 ID。如果传递的 ID 已经存在,则会返回 ErrWatcherDuplicateID 错误。watchable_store.go 中的 watch 实现是监听的具体实现,实现代码如下:

// 位于 mvcc/watchable_store.go:120

func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs …FilterFunc) (*watcher, cancelFunc) {

// 构建 watcher

wa := &watcher{

key: key,

end: end,

minRev: startRev,

id: id,

ch: ch,

fcs: fcs,

}

s.mu.Lock()

s.revMu.RLock()

synced := startRev > s.store.currentRev || startRev == 0

if synced {

wa.minRev = s.store.currentRev + 1

if startRev > wa.minRev {

wa.minRev = startRev

}

}

if synced {

s.synced.add(wa)

} else {

slowWatcherGauge.Inc()

s.unsynced.add(wa)

}

s.revMu.RUnlock()

s.mu.Unlock()

// prometheus 的指标增加

watcherGauge.Inc()

return wa, func() { s.cancelWatcher(wa) }

}

对 watchableStore 进行操作之前,需要加锁。如果 etcd 收到客户端的 watch 请求中携带了 revision 参数,则比较请求的 revision 和 store 当前的 revision,如果大于当前 revision,则放入 synced 组中,否则放入 unsynced 组。

服务端处理监听

当 etcd 服务启动时,会在服务端运行一个用于处理监听事件的 watchServer gRPC 服务,客户端的 watch 请求最终都会被转发到 Watch 函数处理:

// 位于 etcdserver/api/v3rpc/watch.go:140

func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {

sws := serverWatchStream{

lg: ws.lg,

clusterID: ws.clusterID,
memberID: ws.memberID,

maxRequestBytes: ws.maxRequestBytes,

sg: ws.sg,
watchable: ws.watchable,// 共用一个 watchable
ag: ws.ag,

gRPCStream: stream, // 每个单独的grpc stream
watchStream: ws.watchable.NewWatchStream(), // 共用一个 watchStream,由watchable 产生(结合上文理解)
// chan for sending control response like watcher created and canceled.
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),

progress: make(map[mvcc.WatchID]bool),
prevKV: make(map[mvcc.WatchID]bool),
fragment: make(map[mvcc.WatchID]bool),

closec: make(chan struct{}),

}

sws.wg.Add(1)

go func() {

sws.sendLoop()

sws.wg.Done()

}()

errc := make(chan error, 1)

// 理想情况下,recvLoop 将会使用 sws.wg 通知操作的完成,但是当 stream.Context().Done() 关闭时,由于使用了不同的 ctx,stream 的接口有可能一直阻塞,调用 sws.close() 会发生死锁

go func() {

if rerr := sws.recvLoop(); rerr != nil {

if isClientCtxErr(stream.Context().Err(), rerr) {

// 错误处理

}

errc <- rerr

}

}()

select {

case err = <-errc:

close(sws.ctrlStream)

case <-stream.Context().Done():

err = stream.Context().Err()

if err == context.Canceled {

err = rpctypes.ErrGRPCNoLeader

}

}

sws.close()

return err

}

如果出现了更新或者删除操作,相应的事件就会被发送到 watchStream 的通道中。客户端可以通过 Watch 功能监听某一个 Key 或者一个范围的变动,在每一次客户端调用服务端时都会创建两个 goroutine,其中一个协程 sendLoop 负责向监听者发送数据变动的事件,另一个协程 recvLoop 负责处理客户端发来的事件。

sendLoop 会通过select 关键字来监听多个 channel 中的数据,将接收到的数据封装成 pb.WatchResponse 结构,并通过 gRPC 流发送给客户端;recvLoop 方法调用了 MVCC 模块暴露出的watchStream.Watch 方法,该方法会返回一个可以用于取消监听事件的 watchID;当 gRPC 流已经结束或者出现错误时,当前的循环就会返回,两个 goroutine 也都会结束。

异常流程处理

我们来考虑一下异常流程的处理。消息都是通过 channel 发送出去,但如果消费者消费速度慢,channel 中的消息形成堆积,但是空间有限,满了之后应该怎么办呢?带着这个问题,首先我们来看 channel 的默认容量:

var (

// chanBufLen 是发送 watch 事件的 buffered channel 长度

chanBufLen = 1024

// maxWatchersPerSync 是每次 sync 时 watchers 的数量

maxWatchersPerSync = 512

)

在实现中设置的 channel 的长度是 1024。channel 一旦满了,etcd 并不会丢弃 watch 事件,而是会进行如下的操作:

// 位于 mvcc/watchable_store.go:438

func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {

var victim watcherBatch

for w, eb := range newWatcherBatch(&s.synced, evs) {

if eb.revs != 1 {

// 异常

}

if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {

pendingEventsGauge.Add(float64(len(eb.evs)))

} else {

// 将 slow watchers 移动到 victims

w.minRev = rev + 1

if victim == nil {

victim = make(watcherBatch)

}

w.victim = true

victim[w] = eb

s.synced.delete(w)

slowWatcherGauge.Inc()

}

}

s.addVictim(victim)

}

从 notify 的实现中可以知道,此 watcher 将会从 synced watcherGroup 中删除,和事件列表保存到一个名为 victim 的 watcherBatch 结构中。watcher 会记录当前的 Revision,并将自身标记为受损,变更操作也会被保存到 watchableStore 的 victims 中。我使用如下的示例来描述上述过程:

channel 已满的情况下,有一个写操作写入 foo = bar。监听 foo 的 watcher 将从 synced 中移除,同时 foo=bar 也被保存到 victims 中。

channel 已满时的处理流程

接下来该 watcher 不会记录对 foo 的任何变更。那么这些变更消息怎么处理呢?

我们知道在 channel 队列满时,变更的 Event 就会放入 victims 中。在 etcd 启动的时候,WatchableKV 模块启动了 syncWatchersLoop 和 syncVictimsLoop 两个异步协程,这两个协程用于处理不同场景下发送事件。

// 位于 mvcc/watchable_store.go:246

// syncVictimsLoop 清除堆积的 Event

func (s *watchableStore) syncVictimsLoop() {

defer s.wg.Done()

for {

for s.moveVictims() != 0 {

//更新所有的 victim watchers

}

s.mu.RLock()

isEmpty := len(s.victims) == 0

s.mu.RUnlock()

var tickc <-chan time.Time

if !isEmpty {

tickc = time.After(10 * time.Millisecond)

}

select {

case <-tickc:

case <-s.victimc:

case <-s.stopc:

return

}

}

}

syncVictimsLoop 则负责堆积的事件推送,尝试清除堆积的 Event。它会不断尝试让 watcher 发送这个 Event,一旦队列不满,watcher 将这个 Event 发出后,该 watcher 就被划入了 unsycned 中,同时不再是 victim 状态。

至此,syncWatchersLoop 协程就开始起作用。由于该 watcher 在 victim 状态已经落后了很多消息。为了保持同步,协程会根据 watcher 保存的 Revision,查出 victim 状态之后所有的消息,将关于 foo 的消息全部给到 watcher,当 watcher 将这些消息都发送出去后,watcher 就由 unsynced 变成 synced。