0%

ETCD源码解析-第二篇

ETCD 读写分析

键值对读写分析

上一篇文章,我们阐述了etcd一次请求流程,总体上从上至下依次为客户端 → API 接口层 → etcd Server → etcd raft 算法库。

对于读请求来说,客户端通过负载均衡选择一个 etcd 节点发出读请求,API 接口层提供了 Range RPC 方法,etcd 服务端拦截到 gRPC 读请求后,调用相应的处理器处理请求。

写请求相对复杂一些,客户端通过负载均衡选择一个 etcd 节点发起写请求,etcd 服务端拦截到 gRPC 写请求,涉及一些校验和监控,之后KVServer 向 raft 模块发起提案,内容即为写入数据的命令。经过网络转发,当集群中的多数节点达成一致并持久化数据后,状态变更且 MVCC 模块执行提案内容。

下面我们就分别看一下读写请求的底层存储实现。

读操作

在 etcd 中读请求占了大部分,是高频的操作。我们使用 etcdctl 命令行工具进行读操作:

$ etcdctl –endpoints http://localhost:2379 get foo

foo

bar

将整个读操作划分成如下几个步骤:

etcdctl 会创建一个 clientv3 库对象,选取一个合适的 etcd 节点;

调用 KVServer 模块的 Range RPC 方法(上一课时有讲解),发送请求;

拦截器拦截,主要做一些校验和监控;

调用 KVServer 模块的 Range 接口获取数据;

接着就进入了读请求的核心步骤,会经过线性读 ReadIndex 模块、MVCC(包含 treeIndex 和 BlotDB)模块。

这里要提一下线性读,线性读是相对串行读来讲的概念。集群模式下会有多个 etcd 节点,不同节点之间可能存在一致性问题,串行读直接返回状态数据,不需要与集群中其他节点交互。这种方式速度快,开销小,但是会存在数据不一致的情况。

线性读则需要集群成员之间达成共识,存在开销,响应速度相对慢。但是能够保证数据的一致性,etcd 默认读模式是线性读。我们将在后面的课时重点介绍如何实现分布式一致性。

继续往下,看看如何读取 etcd 中的数据。etcd 中查询请求,查询单个键或者一组键,以及查询数量,到了底层实际都会调用 Range keys 方法。下面我们具体分析一下这个方式的实现。

Range 请求的结构图如下所示:

从上至下,查询键值对的流程包括:

在 treeIndex 中根据键利用 BTree 快速查询该键对应的索引项 keyIndex,索引项中包含 Revision;

根据查询到的版本号信息 Revision,在 Backend 的缓存 Buffer 中利用二分法查找,如果命中则直接返回;

若缓存中不符合条件,在 BlotDB 中查找(基于 BlotDB 的索引),查询之后返回键值对信息。

图中 ReadTx 和 BatchTx 是两个接口,用于读写请求。在创建 Backend 结构体时,默认也会创建 readTx 和 batchTx,readTx 实现了 ReadTx ,负责处理只读请求;batchTx 实现了 BatchTx 接口,负责处理读写请求。

rangeKeys方法的实现如下所示:

// 位于 mvcc/kvstore_txn.go:117

func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {

rev := ro.Rev

if rev > curRev {

return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev

}

if rev <= 0 {

rev = curRev

}

if rev < tr.s.compactMainRev {

return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted

}

// 获取索引项 keyIndex,索引项中包含 Revision

revpairs := tr.s.kvindex.Revisions(key, end, rev)

tr.trace.Step(“range keys from in-memory index tree”)

// 结果为空,直接返回

if len(revpairs) == 0 {

return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil

}

if ro.Count {

return &RangeResult{KVs: nil, Count: len(revpairs), Rev: curRev}, nil

}

limit := int(ro.Limit)

if limit <= 0 || limit > len(revpairs) {

limit = len(revpairs)

}

kvs := make([]mvccpb.KeyValue, limit)

revBytes := newRevBytes()

for i, revpair := range revpairs[:len(kvs)] {

revToBytes(revpair, revBytes)

// UnsafeRange 实现了 ReadTx,查询对应的键值对

_, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)

if len(vs) != 1 {

tr.s.lg.Fatal(

“range failed to find revision pair”,

zap.Int64(“revision-main”, revpair.main),

zap.Int64(“revision-sub”, revpair.sub),

)

}

if err := kvs[i].Unmarshal(vs[0]); err != nil {

tr.s.lg.Fatal(

“failed to unmarshal mvccpb.KeyValue”,

zap.Error(err),

)

}

}

tr.trace.Step(“range keys from bolt db”)

return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil

}

在上述代码的实现中,我们需要通过Revisions方法从 Btree 中获取范围内所有的 keyIndex,以此才能获取一个范围内的所有键值对。Revisions方法实现如下:

// 位于 mvcc/index.go:106

func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {

if end == nil {

rev, _, _, err := ti.Get(key, atRev)

if err != nil {

return nil

}

return []revision{rev}

}

ti.visit(key, end, func(ki *keyIndex) {

// 使用 keyIndex.get 来遍历整棵树

if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {

revs = append(revs, rev)

}

})

return revs

}

如果只获取一个键对应的版本,使用 treeIndex 方法即可,但是一般会从 Btree 索引中获取多个 Revision 值,此时需要调用 keyIndex.get 方法来遍历整棵树并选取合适的版本。这是因为BoltDB 保存一个 key 的多个历史版本,每一个 key 的 keyIndex 中其实都存储着多个历史版本,我们需要根据传入的参数返回正确的版本。

对于上层的键值存储来说,它会利用这里返回的 Revision,从真正存储数据的 BoltDB 中查询当前 key 对应 Revision 的数据。BoltDB 内部使用的也是类似 bucket(桶)的方式存储,其实就是对应 MySQL 中的表结构,用户的 key 数据存放的 bucket 名字的是 key,etcd MVCC 元数据存放的 bucket 是 meta。

写操作

介绍完读请求,我们回忆一下写操作的实现。使用 etcdctl 命令行工具进行写操作:

$ etcdctl –endpoints http://localhost:2379 put foo bar

将整个写操作划分成如下几个步骤:

客户端通过负载均衡算法选择一个 etcd 节点,发起 gRPC 调用;

etcd Server 收到客户端请求;

经过 gRPC 拦截、Quota 校验,Quota 模块用于校验 etcd db 文件大小是否超过了配额;

接着 KVServer 模块将请求发送给本模块中的 raft,这里负责与 etcd raft 模块进行通信,发起一个提案,命令为put foo bar,即使用 put 方法将 foo 更新为 bar;

提案经过转发之后,半数节点成功持久化;

MVCC 模块更新状态机。

我们重点关注最后一步,学习如何更新和插入键值对。与上面一张图相对应,我们来看下 put 接口的执行过程:

调用 put 向 etcd 写入数据时,首先会使用传入的键构建 keyIndex 结构体,基于 currentRevision 自增生成新的 Revision 如{1,0},并从 treeIndex 中获取相关版本 Revision 等信息;写事务提交之后,将本次写操作的缓存 buffer 合并(merge)到读缓存上(图中 ReadTx 中的缓存)。代码实现如下所示:

//位于 mvcc/index.go:53

func (ti *treeIndex) Put(key []byte, rev revision) {

keyi := &keyIndex{key: key}

// 加锁,互斥

ti.Lock()

defer ti.Unlock()

// 获取版本信息

item := ti.tree.Get(keyi)

if item == nil {

keyi.put(ti.lg, rev.main, rev.sub)

ti.tree.ReplaceOrInsert(keyi)

return

}

okeyi := item.(*keyIndex)

okeyi.put(ti.lg, rev.main, rev.sub)

}

treeIndex.Put 在获取 Btree 中的 keyIndex 结构之后,会通过 keyIndex.put 在其中加入新的 revision,方法实现如下:

// 位于 mvcc/key_index.go:77

func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {

rev := revision{main: main, sub: sub}

// 校验版本号

if !rev.GreaterThan(ki.modified) {

lg.Panic(

“‘put’ with an unexpected smaller revision”,

zap.Int64(“given-revision-main”, rev.main),

zap.Int64(“given-revision-sub”, rev.sub),

zap.Int64(“modified-revision-main”, ki.modified.main),

zap.Int64(“modified-revision-sub”, ki.modified.sub),

)

}

if len(ki.generations) == 0 {

ki.generations = append(ki.generations, generation{})

}

g := &ki.generations[len(ki.generations)-1]

if len(g.revs) == 0 { // 创建一个新的键

keysGauge.Inc()

g.created = rev

}

g.revs = append(g.revs, rev)

g.ver++

ki.modified = rev

}

从上述代码我们可以知道,构造的 Revision 结构体写入 keyIndex 键索引时,会改变 generation 结构体中的属性,generation 中包括一个键的多个不同的版本信息,包括创建版本、修改次数等参数。因此我们可以通过该方法了解 generation 结构体中的各个成员如何定义和赋值。

revision{1,0} 是生成的全局版本号,作为 BoltDB 的 key,经过序列化包括 key 名称、key 创建时的版本号(create_revision)、value 值和租约等信息为二进制数据之后,将填充到 BoltDB 的 value 中,同时将该键和 Revision 等信息存储到 Btree。

本章小结

这一讲我们主要介绍了 etcd 的底层如何实现读写操作。我们首先简单介绍了客户端与服务端读写操作的流程,之后重点分析了在 etcd 中如何读写数据。

通过上面的分析不难发现,etcd 最底层的读写其实并不是很复杂。根据 etcd 读写流程图,可以知道读写操作依赖 MVCC 模块的 treeIndex 和 BoltDB,treeIndex 用来保存键的历史版本号信息,而 BoltDB 用来保存 etcd 的键值对数据。通过这两个模块之间的协作,实现了 etcd 数据的读取和存储。因此后续的课程将会进一步介绍 etcd 分布式一致性实现以及 MVCC 多版本控制实现的原理。

etcd-raft 分布式一致性

在分布式环境中,常用数据复制来避免单点故障,实现多副本,提高服务的高可用性以及系统的吞吐量。

etcd 集群中的多个节点不可避免地会出现相互之间数据不一致的情况。但不管是同步复制、异步复制还是半同步复制,都会存在可用性或者一致性的问题。我们一般会使用共识算法来解决多个节点数据一致性的问题,常见的共识算法有 Paxos和Raft。ZooKeeper 使用的是 ZAB 协议,etcd 使用的共识算法就是Raft。

etcd-raft 模块是 etcd中解决分布式一致性的模块,这一讲我们就结合源码具体分析这部分内容。

etcd-raft 对外提供的接口

raft 库对外提供了一个 Node 接口,由 raft/node.go 中的 node结构体实现,Node 接口需要实现的函数包括:Tick、Propose、Ready、Step 等。

我们重点需要了解 Ready 接口,该接口将返回类型为 Ready 的 channel,该通道表示当前时间点的channel。应用层需要关注该 channel,当发生变更时,其中的数据也将会进行相应的操作。其他的函数对应的功能如下:

Tick:时钟,触发选举或者发送心跳;

Propose:通过 channel 向 raft StateMachine 提交一个 Op,提交的是本地 MsgProp 类型的消息;

Step:节点收到 Peer 节点发送的 Msg 时会通过该接口提交给 raft 状态机,Step 接口通过 recvc channel向raft StateMachine 传递 Msg;

然后是 raft 算法的实现,node 结构体实现了 Node 接口,其定义如下:

type node struct {

propc chan msgWithResult

recvc chan pb.Message

confc chan pb.ConfChangeV2

confstatec chan pb.ConfState

readyc chan Ready

advancec chan struct{}

tickc chan struct{}

done chan struct{}

stop chan struct{}

status chan chan Status

rn *RawNode

}

这个结构体会在后面经常用到。

在 raft/raft.go 中还有两个核心数据结构:

Config,封装了与 raft 算法相关的配置参数,公开用于外部调用。

raft,具体实现 raft 算法的结构体。

节点状态

下面我们来看看 raft StateMachine 的状态机转换,实际上就是 raft 算法中各种角色的转换。每个 raft 节点,可能具有以下三种状态中的一种。

Candidate:候选人状态,该状态意味着将进行一次新的选举。

Follower:跟随者状态,该状态意味着选举结束。

Leader:领导者状态,选举出来的节点,所有数据提交都必须先提交到 Leader 上。

每一个状态都有其对应的状态机,每次收到一条提交的数据时,都会根据其不同的状态将消息输入到不同状态的状态机中。同时,在进行 tick 操作时,每种状态对应的处理函数也是不一样的。

因此 raft 结构体中将不同的状态及其不同的处理函数,独立出来几个成员变量:

state,保存当前节点状态;

tick 函数,每个状态对应的 tick 函数不同;

step,状态机函数,同样每个状态对应的状态机也不相同。

状态转换

我们接着看 etcd raft 状态转换。etcd-raft StateMachine 封装在 raft机构体中,其状态转换如下图:

etcd raft 状态转换示意图

raft 状态转换的接口都在 raft.go 中,其定义如下:

func (r *raft) becomeFollower(term uint64, lead uint64)

func (r *raft) becomePreCandidate()

func (r *raft) becomeCandidate()

func (r *raft) becomeLeader()

raft 在不同的状态下,是如何驱动 raft StateMachine 状态机运转的呢?答案是etcd 将 raft 相关的所有处理都抽象为了 Msg,通过 Step 接口处理:

func (r *raft) Step(m pb.Message) error {

r.step(r, m)

}

这里的step是一个回调函数,根据不同的状态会设置不同的回调函数来驱动 raft,这个回调函数 stepFunc 就是在becomeXX()函数完成的设置:

type raft struct {

step stepFunc

}

step 回调函数有如下几个值,注意其中 stepCandidate 会处理 PreCandidate 和 Candidate 两种状态:

func stepFollower(r *raft, m pb.Message) error

func stepCandidate(r *raft, m pb.Message) error

func stepLeader(r *raft, m pb.Message) error

这几个函数的实现其实就是对各种 Msg 进行处理,这里就不详细展开了。我们来看一下 raft 消息的类型及其定义。

raft 消息

raft 算法本质上是一个大的状态机,任何的操作例如选举、提交数据等,最后都被封装成一个消息结构体,输入到 raft 算法库的状态机中。

在 raft/raftpb/raft.proto 文件中,定义了 raft 算法中传输消息的结构体。raft 算法其实由好几个协议组成,etcd-raft 将其统一定义在了 Message 结构体之中,以下总结了该结构体的成员用途:

// 位于 raft/raftpb/raft.pb.go:295

type Message struct {

Type MessageType protobuf:"varint,1,opt,name=type,enum=raftpb.MessageType" json:"type" // 消息类型

To uint64 protobuf:"varint,2,opt,name=to" json:"to" // 消息接收者的节点ID

From uint64 protobuf:"varint,3,opt,name=from" json:"from" // 消息发送者的节点 ID

Term uint64 protobuf:"varint,4,opt,name=term" json:"term" // 任期 ID

LogTerm uint64 protobuf:"varint,5,opt,name=logTerm" json:"logTerm" // 日志所处的任期 ID

Index uint64 protobuf:"varint,6,opt,name=index" json:"index" // 日志索引 ID,用于节点向 Leader 汇报自己已经commit的日志数据 ID

Entries []Entry protobuf:"bytes,7,rep,name=entries" json:"entries" // 日志条目数组

Commit uint64 protobuf:"varint,8,opt,name=commit" json:"commit" // 提交日志索引

Snapshot Snapshot protobuf:"bytes,9,opt,name=snapshot" json:"snapshot" // 快照数据

Reject bool protobuf:"varint,10,opt,name=reject" json:"reject" // 是否拒绝

RejectHint uint64 protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint" // 拒绝同步日志请求时返回的当前节点日志 ID,用于被拒绝方快速定位到下一次合适的同步日志位置

Context []byte protobuf:"bytes,12,opt,name=context" json:"context,omitempty" // 上下文数据

XXX_unrecognized []byte json:"-"

}

Message结构体相关的数据类型为 MessageType,MessageType 有 19 种。当然,并不是所有的消息类型都会用到上面定义的Message结构体中的所有字段,因此其中有些字段是Optinal的。

我将其中常用的协议(即不同的消息类型)的用途总结成如下的表格:

上表列出了消息的类型对应的功能、消息接收者的节点 ID 和消息发送者的节点 ID。在收到消息之后,根据消息类型检索此表,可以帮助我们理解 raft 算法的操作。

选举流程

raft 一致性算法实现的关键有 Leader 选举、日志复制和安全性限制。Leader 故障后集群能快速选出新 Leader,集群只有Leader 能写入日志, Leader 负责复制日志到 Follower 节点,并强制 Follower 节点与自己保持相同。

raft 算法的第一步是选举出 Leader,即使在 Leader 出现故障后也需要快速选出新 Leader,下面我们来梳理一下选举的流程。

发起选举

发起选举对节点的状态有限制,很显然只有在Candidate 或者 Follower 状态下的节点才有可能发起一个选举流程,而这两种状态的节点,其对应的tick 函数为 raft.tickElection 函数,用来发起选举和选举超时控制。发起选举的流程如下。

节点启动时都以Follower 启动,同时随机生成自己的选举超时时间。

在Follower的tickElection 函数中,当选举超时,节点向自己发送 MsgHup 消息。

在状态机函数 raft.Step函数中,收到 MsgHup 消息之后,节点首先判断当前有没有 apply 的配置变更消息,如果有就忽略该消息。

否则进入 campaign 函数中进行选举:首先将任期号增加 1,然后广播给其他节点选举消息,带上的其他字段,包括:节点当前的最后一条日志索引(Index 字段)、最后一条日志对应的任期号(LogTerm 字段)、选举任期号(Term 字段,即前面已经进行 +1 之后的任期号)、Context 字段(目的是告知这一次是否是 Leader 转让类需要强制进行选举的消息)。

如果在一个选举超时期间内,发起新的选举流程的节点,得到了超过半数的节点投票,那么状态就切换到 Leader 状态。成为 Leader的同时,Leader 将发送一条 dummy 的 append 消息,目的是提交该节点上在此任期之前的值。

在上述流程中,之所以每个节点随机选择自己的超时时间,是为了避免有两个节点同时进行选举,此时没有任何一个节点会赢得半数以上的投票,从而导致这一轮选举失败,继续进行下一轮选举。在第三步,判断是否有 apply 配置变更消息,其原因在于,当有配置更新的情况下不能进行选举操作,即要保证每一次集群成员变化时只能变化一个,不能多个集群成员的状态同时发生变化。

参与选举

当收到任期号大于当前节点任期号的消息,且该消息类型如果是选举类的消息(类型为 prevote 或者 vote)时,节点会做出以下判断。

首先判断该消息是否为强制要求进行选举的类型(context 为 campaignTransfer,表示进行 Leader转让)。

判断当前是否在租约期内,满足的条件包括:checkQuorum 为 true、当前节点保存的 Leader 不为空、没有到选举超时。

如果不是强制要求选举,且在租约期内,就忽略该选举消息,这样做是为了避免出现那些分裂集群的节点,频繁发起新的选举请求。

如果不是忽略选举消息的情况,除非是 prevote 类的选举消息,否则在收到其他消息的情况下,该节点都切换为 Follower 状态。

此时需要针对投票类型中带来的其他字段进行处理,同时满足日志新旧的判断和参与选举的条件。

只有在同时满足以上两个条件的情况下,才能同意该节点的选举,否则都会被拒绝。这种做法可以保证最后选出来的新 Leader 节点,其日志都是最新的。

日志复制

选举好 Leader 后,Leader在收到 put 提案时,如何将提案复制给其他 Follower 呢?

我们回顾一下前面讲的 etcd 读写请求的处理流程。并结合下图说明日志复制的流程:

日志复制的流程图 ①

收到客户端请求之后,etcd Server 的 KVServer 模块会向 raft 模块提交一个类型为 MsgProp 的提案消息。

Leader节点在本地添加一条日志,其对应的命令为put foo bar。此步骤只是添加一条日志,并没有提交,两个索引值还指向上一条日志。

Leader 节点向集群中其他节点广播 AppendEntries 消息,带上 put 命令。

第二步中,两个索引值分别为 committedIndex和appliedIndex,图中有标识。committedIndex 存储最后一条提交日志的索引,而 appliedIndex 存储的是最后一条应用到状态机中的日志索引值。两个数值满足committedIndex 大于等于 appliedIndex,这是因为一条日志只有被提交了才能应用到状态机中。

接下来我们看看 Leader 如何将日志数据复制到 Follower 节点。

日志复制的流程图 ②

Follower 节点收到 AppendEntries 请求后,与 Leader 节点一样,在本地添加一条新的日志,此时日志也没有提交。

添加成功日志后,Follower 节点向 Leader 节点应答 AppendEntries 消息。

Leader 节点汇总 Follower 节点的应答。当Leader 节点收到半数以上节点的 AppendEntries 请求的应答消息时,表明 put foo bar 命令成功复制,可以进行日志提交。

Leader 修改本地 committed 日志的索引,指向最新的存储put foo bar的日志,因为还没有应用该命令到状态机中,所以 appliedIndex 还是保持着上一次的值。

当这个命令提交完成之后,命令就可以提交给应用层了。

此时修改 appliedIndex的值,与 committedIndex 的值相等。

Leader 节点在后续发送给 Follower 的 AppendEntries 请求中,总会带上最新的 committedIndex 索引值。

Follower 收到AppendEntries 后会修改本地日志的 committedIndex 索引。

至此,日志复制的过程在集群的多个节点之间就完成了。

本章小结

这一讲我们主要介绍了 etcd-raft 模块实现分布式一致性的原理,并且通过 raftexample 了解了 raft 模块的使用方式和过程。接着重点介绍了选举流程和日志复制的过程。