0%

ETCD源码解析-第一篇

ETCD 整体架构

整体架构与交互

客户端层:包括 clientv3 和 etcdctl 等客户端。用户通过命令行或者客户端调用提供了 RESTful 风格的API,降低了 etcd的使用复杂度。除此之外,客户端层的负载均衡(etcd V3.4 版本的客户端默认使用的是 Round-robin,即轮询调度)和节点间故障转移等特性,提升了etcd 服务端的高可用性。需要注意的是,etcd V3.4之前版本的客户端存在负载均衡的 Bug,如果第一个节点出现异常,访问服务端时也可能会出现异常,建议进行升级。

API 接口层:API 接口层提供了客户端访问服务端的通信协议和接口定义,以及服务端节点之间相互通信的协议,我将会在下一讲重点讲解 etcd的通信接口。etcd 有 V3和V2 两个版本。etcd V3 使用gRPC 作为消息传输协议;对于之前的V2 版本,etcd 默认使用HTTP/1.x 协议。对于不支持 gRPC的客户端语言,etcd 提供 JSON的grpc-gateway。通过 grpc-gateway 提供 RESTful 代理,转换 HTTP/JSON 请求为 gRPC 的 Protocol Buffer 格式的消息。这部分内容我们在 05 讲“gRPC 代理模式:实现可伸缩的 etcd API”具体做了讲解,这里我们就不再展开了。

etcd Raft 层:负责 Leader 选举和日志复制等功能,除了与本节点的 etcd Server 通信之外,还与集群中的其他 etcd 节点进行交互,实现分布式一致性数据同步的关键工作。

逻辑层:etcd 的业务逻辑层,包括鉴权、租约、KVServer、MVCC 和 Compactor 压缩等核心功能特性。

etcd 存储:实现了快照、预写式日志 WAL(Write Ahead Log)。etcd V3 版本中,使用 BoltDB 来持久化存储集群元数据和用户写入的数据。

一次请求的处理流程和细节

从上至下依次为客户端 → API 接口层 → etcd Server → etcd raft 算法库。我们根据请求处理的过程,将 etcd Server 和 etcd raft 算法库单独说明。

etcd Server:接收客户端的请求,在上述的etcd 项目代码中对应etcdserver 包。请求到达 etcd Server 之后,经过 KVServer 拦截,实现诸如日志、Metrics 监控、请求校验等功能。etcd Server 中的raft模块,用于与 etcd-raft 库进行通信。applierV3 模块封装了 etcd V3 版本的数据存储;WAL 用于写数据日志,WAL中保存了任期号、投票信息、已提交索引、提案内容等,etcd 根据 WAL 中的内容在启动时恢复,以此实现集群的数据一致性。

etcdraft:etcd 的raft 库。raftLog 用于管理 raft 协议中单个节点的日志,都处于内存中。raftLog 中还有两种结构体 unstable和storage,unsable 中存储不稳定的数据,表示还没有 commit,而 storage中都是已经被 commit 了的数据。这两种结构体分别用于不同步骤的存储,我们将在下面的交互流程中介绍。除此之外,raft 库更重要的是负责与集群中的其他 etcd Server进行交互,实现分布式一致性。

在上图中,客户端请求与 etcd 集群交互包括如下两个步骤:

首先是写数据到 etcd 节点中;

其次是当前的 etcd 节点与集群中的其他 etcd 节点之间进行通信,确认存储数据成功之后回复客户端。

请求流程可划分为以下的子步骤:

客户端通过负载均衡算法选择一个 etcd 节点,发起 gRPC 调用;

etcd Server 收到客户端请求;

经过 gRPC 拦截、Quota 校验,Quota 模块用于校验 etcd db 文件大小是否超过了配额;

接着 KVServer 模块将请求发送给本模块中的raft,这里负责与 etcd raft模块进行通信;

发起一个提案,命令为put foo bar,即使用put 方法将 foo 更新为 bar;

在raft 中会将数据封装成 raft 日志的形式提交给 raft模块;

raft模块会首先保存到 raftLog 的 unstable 存储部分;

raft模块通过raft 协议与集群中其他 etcd 节点进行交互。

需要注意的是,在 raft 协议中写入数据的 etcd 必定是 leader 节点,如果客户端提交数据到非 leader 节点时,该节点需要将请求转发到 etcd leader 节点处理。

提案通过 RaftHTTP 网络模块转发,集群中的其他节点接收到该提案;

在收到提案之后,集群中其他节点向 leader 节点应答“我已经接收这条日志数据”;

Leader收到应答之后,统计应答的数量,当满足超过集群半数以上节点,应答接收成功;

etcd raft算法模块构造 Ready 结构体,用来通知 etcd Server 模块,该日志数据已经被 commit;

etcd Server 中的 raft 模块(交互图中有标识),收到 Ready 消息后,会将这条日志数据写入到 WAL 模块中;

正式通知 etcd Server 该提案已经被 commit;

etcd Server 调用 applierV3 模块,将日志写入持久化存储中;

etcd Server 应答客户端该数据写入成功;

etcd Server 调用 etcd raft 库,将这条日志写入到 raftLog 模块中的 storage。

上述过程中,提案经过网络转发,当多数etcd 节点持久化日志数据成功并进行应答,提案的状态会变成已提交。

在应答某条日志数据是否已经 commit 时,为什么 etcd raft 模块首先写入到 WAL 模块中?这是因为该过程仅仅添加一条日志,一方面开销小,速度会很快;另一方面,如果在后面 applierV3 写入失败,etcd 服务端在重启的时候也可以根据 WAL 模块中的日志数据进行恢复。etcd Server 从 raft 模块获取已提交的日志条目,由 applierV3 模块通过 MVCC 模块执行提案内容,更新状态机。

整个过程中,etcd raft 模块中的 raftLog 数据在内存中存储,在服务重启后失效;客户端请求的数据则被持久化保存到 WAL 和 applierV3 中,不会在重启之后丢失。

客户端API

etcd client 与 etcd 集群之间通过 grpc 进行通信。发送到 etcd 服务器的每个 API 请求都是一个 gRPC 远程过程调用。etcd 中的 RPC 接口定义根据功能分类到服务中。

处理 etcd 键值的重要服务包括:

KV Service,创建、更新、获取和删除键值对。
Watch Service,监视键的更改。
Lease Service,实现键值对过期,客户端用来续租、保持心跳。
Lock Service,etcd 提供分布式共享锁的支持。
Election Service,暴露客户端选举机制。
响应头

message ResponseHeader {

uint64 cluster_id = 1;

uint64 member_id = 2;

int64 revision = 3;

uint64 raft_term = 4;

}

Cluster_ID:产生响应的集群的 ID。

Member_ID:产生响应的成员的 ID。

应用服务可以通过 Cluster_ID 和 Member_ID 字段来确保,当前与之通信的正是预期的那个集群或者成员。

Revision:产生响应时键值存储的修订版本号。

应用服务可以使用修订号字段来获得当前键值存储库最新的修订号。应用程序指定历史修订版以进行查询,如果希望在请求时知道最新修订版,此功能特别有用。

Raft_Term:产生响应时,成员的 Raft 称谓。

应用服务可以使用 Raft_Term 来检测集群何时完成一个新的 leader 选举。

客户端连接示例代码

// client_init_test.go

package client

import (

“context”

“testing”

“time”

“go.etcd.io/etcd/clientv3”

)

// 测试客户端连接

func TestEtcdClientInit(t *testing.T) {

var (

config clientv3.Config

client *clientv3.Client

err error

)

// 客户端配置

config = clientv3.Config{

// 节点配置

Endpoints: []string{“localhost:2379”},// etcd 的多个节点服务地址,因为我是单点本机测试,所以只传 1 个。

DialTimeout: 5 * time.Second,// 创建 client 的首次连接超时,这里传了 5 秒,如果 5 秒都没有连接成功就会返回 err。需要注意的是,一旦 client 创建成功,我们就不用再关心后续底层连接的状态了,client 内部会重连。

}

// 建立连接

if client, err = clientv3.New(config); err != nil {

t.Error(err)

} else {

// 输出集群信息

t.Log(client.Cluster.MemberList(context.TODO()))

}

client.Close()

}

KV 存储

kv := clientev3.NewKV(client)

// kv 接口定义
type KV interface {

Put(ctx context.Context, key, val string, opts …OpOption) (*PutResponse, error)

// 检索 keys

Get(ctx context.Context, key string, opts …OpOption) (*GetResponse, error)

// 删除 key,可以使用 WithRange(end), [key, end) 的方式

Delete(ctx context.Context, key string, opts …OpOption) (*DeleteResponse, error)

// 压缩给定版本之前的 KV 历史

Compact(ctx context.Context, rev int64, opts …CompactOption) (*CompactResponse, error)

// 指定某种没有事务的操作

Do(ctx context.Context, op Op) (OpResponse, error)

// Txn 创建一个事务

Txn(ctx context.Context) Txn

}

KV 存储 Put

Put 的定义如下:

Put(ctx context.Context, key, val string, opts …OpOption) (*PutResponse, error)

其中的参数

ctx:Context 包对象,用来跟踪上下文,比如超时控制。

key:存储对象的 key。

val:存储对象的 value。

opts:可变参数,额外选项。

Put 将一个键值对放入 etcd 中。请注意,键值可以是纯字节数组,字符串是该字节数组的不可变表示形式。要获取字节字符串,请执行string([] byte {0x10,0x20})。

Put 的使用方法如下所示:

putResp, err := kv.Put(context.TODO(),”shopee_etcd_key”, “hello-world!”)

KV 查询 Get

现在可以对存储的数据进行取值了。默认情况下,Get 将返回“key”对应的值。

复制代码

Get(ctx context.Context, key string, opts …OpOption) (*GetResponse, error)

OpOption 为可选的函数传参,传参为WithRange(end)时,Get 将返回 [key,end) 范围内的键;传参为 WithFromKey() 时,Get 返回大于或等于 key 的键;当通过 rev> 0 传递 WithRev(rev) 时,Get 查询给定修订版本的键;如果压缩了所查找的修订版本,则返回请求失败,并显示 ErrCompacted。 传递 WithLimit(limit) 时,返回的 key 数量受 limit 限制;传参为 WithSort 时,将对键进行排序。

对应的使用方法如下:

getResp, err := kv.Get(context.TODO(), “shopee_etcd_key”)

从以上数据的存储和取值,我们知道:Put 返回 PutResponse,Get 返回 GetResponse。注意:不同的 KV 操作对应不同的 Response 结构,定义如下:

type (

CompactResponse pb.CompactionResponse

PutResponse pb.PutResponse

GetResponse pb.RangeResponse

DeleteResponse pb.DeleteRangeResponse

TxnResponse pb.TxnResponse

)

下面我们分别来看一看 PutResponse 和 GetResponse 映射的 RangeResponse 结构的定义:

复制代码

type PutResponse struct {

Header *ResponseHeader protobuf:"bytes,1,opt,name=header" json:"header,omitempty"

// 请求中如有 prev_kv,响应时也会携带 prev_kv

PrevKv *mvccpb.KeyValue protobuf:"bytes,2,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"

}

//Header 里保存的主要是本次更新的 revision 信息

type RangeResponse struct {

Header *ResponseHeader protobuf:"bytes,1,opt,name=header" json:"header,omitempty"

// kvs 是一个匹配 range 请求的键值对列表

Kvs []*mvccpb.KeyValue protobuf:"bytes,2,rep,name=kvs" json:"kvs,omitempty"

// more 用以分页

More bool protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"

// count 表示 range 的键值对数量

Count int64 protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"

}

KVS 字段,保存了本次 Get 查询到的所有 KV 对,我们继续看一下 mvccpb.KeyValue 对象的定义:

type KeyValue struct {

Key []byte protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"

// create_revision 是当前 key 的最后创建版本

CreateRevision int64 protobuf:"varint,2,opt,name=create_revision,json=createRevision,proto3" json:"create_revision,omitempty"

// mod_revision 是指当前 key 的最新修订版本

ModRevision int64 protobuf:"varint,3,opt,name=mod_revision,json=modRevision,proto3" json:"mod_revision,omitempty"

// key 的版本,每次更新都会增加版本号

Version int64 protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"

Value []byte protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"

// 绑定了 key 的租期 Id,当 lease 为 0 ,则表明没有绑定 key;租期过期,则会删除 key

Lease int64 protobuf:"varint,6,opt,name=lease,proto3" json:"lease,omitempty"

}

至于 RangeResponse.More 和 Count,当我们使用 withLimit() 选项进行 Get 时会发挥作用,相当于分页查询。

接下来,我们通过一个特别的 Get 选项,获取 shopee 目录下的所有子目录:

rangeResp, err := kv.Get(context.TODO(), “/shopee”, clientv3.WithPrefix())

WithPrefix()用于查找以/shopee为前缀的所有 key,因此可以模拟出查找子目录的效果。我们知道 etcd 是一个有序的 KV 存储,因此/shopee为前缀的 key 总是顺序排列在一起。

WithPrefix 实际上会转化为范围查询,它根据前缀/shopee生成了一个 key range,[“/shopee/”, “/shopee0”),这是因为比 / 大的字符是 0,所以以 /shopee0 作为范围的末尾,就可以扫描到所有的 /aa/ 打头的 key 了。

KV 操作实践

键值对的操作是 etcd 中最基本、最常用的功能,主要包括读、写、删除三种基本的操作。在 etcd 中定义了 KV 接口,用来对外提供这些操作,下面我们进行具体测试:

package client

import (

“context”

“testing”

“time”

“github.com/google/uuid”

“go.etcd.io/etcd/clientv3”

)

func TestKV(t *testing.T) {

rootContext := context.Background()

// 客户端初始化

cli, err := clientv3.New(clientv3.Config{

Endpoints: []string{“localhost:2379”},

DialTimeout: 2 * time.Second,

})

// etcd clientv3 >= v3.2.10, grpc/grpc-go >= v1.7.3

if cli == nil || err == context.DeadlineExceeded {

// handle errors

t.Error(err)

panic(“invalid connection!”)

}

// 客户端断开连接

defer cli.Close()

// 初始化 kv

kvc := clientv3.NewKV(cli)

//获取值

ctx, cancelFunc := context.WithTimeout(rootContext, time.Duration(2)*time.Second)

response, err := kvc.Get(ctx, “cc”)

cancelFunc()

if err != nil {

t.Error(err)

}

kvs := response.Kvs

// 输出获取的 key

if len(kvs) > 0 {

t.Logf(“last value is :%s\r\n”, string(kvs[0].Value))

} else {

t.Logf(“empty key for %s\n”, “cc”)

}

//设置值

uuid := uuid.New().String()

fmt.Printf(“new value is :%s\r\n”, uuid)

ctx2, cancelFunc2 := context.WithTimeout(rootContext, time.Duration(2)*time.Second)

_, err = kvc.Put(ctx2, “cc”, uuid)

// 设置成功后,将该 key 对应的键值删除

if delRes, err := kvc.Delete(ctx2, “cc”); err != nil {

t.Error(err)

} else {

t.Logf(“delete %s for %t\n”, “cc”, delRes.Deleted > 0)

}

cancelFunc2()

if err != nil {

t.Error(err)

}

}

如上的测试用例,主要是针对 KV 的操作,依次获取 key,即 Get(),对应 etcd 底层实现的 range 接口;其次是写入键值对,即 put 操作;最后删除刚刚写入的键值对。

其他通信接口

其他常用的接口还有 Txn、Compact、Watch、Lease、Lock 等。我们依次看看这些接口的定义。

事务 Txn

Txn 方法在单个事务中处理多个请求。Txn 请求增加键值存储的修订版本,并为每个完成的请求生成带有相同修订版本的事件,etcd 不容许在一个 Txn 中多次修改同一个 key。Txn 接口定义如下:

rpc Txn(TxnRequest) returns (TxnResponse) {}

Compact

Compact 方法压缩 etcd 键值对存储中的事件历史。键值对存储应该定期压缩,否则事件历史会无限制地持续增长。Compact 接口定义如下:

rpc Compact(CompactionRequest) returns (CompactionResponse) {}

请求的消息体是 CompactionRequest, CompactionRequest 压缩键值对存储到给定修订版本,所有修订版本比压缩修订版本小的键都将被删除。

Watch

Watch API 提供了一个基于事件的接口,用于异步监视键的更改。etcd 监视程序通过给定的修订版本(当前版本或历史版本)持续监视 key 更改,并将 key 更新流回客户端。

在 rpc.proto 中 Watch Service 定义如下:

service Watch {

rpc Watch(stream WatchRequest) returns (stream WatchResponse) {}

}

Watch 观察将要发生或者已经发生的事件。输入和输出都是流,输入流用于创建和取消观察,而输出流发送事件。一个观察 RPC 可以一次性在多个 key 范围上观察,并为多个观察流化事件。整个事件历史可以从最后压缩修订版本开始观察。Watch Service 只有一个 Watch 方法。

Lease Service

Lease Service 提供租约的支持。Lease 是一种检测客户端存活状况的机制。集群授予客户端具有生存时间的租约。如果 etcd 集群在给定的 TTL 时间内未收到 keepAlive,则租约到期。

为了将租约绑定到键值存储中,每个 key 最多可以附加一个租约。当租约到期或被撤销时,该租约依附的所有 key 都将被删除,每个过期的密钥都会在事件历史记录中生成一个删除事件。

在 rpc.proto 中 Lease Service 定义的接口如下:

service Lease {

rpc LeaseGrant(LeaseGrantRequest) returns (LeaseGrantResponse) {}

rpc LeaseRevoke(LeaseRevokeRequest) returns (LeaseRevokeResponse) {}

rpc LeaseKeepAlive(stream LeaseKeepAliveRequest) returns (stream LeaseKeepAliveResponse) {}

rpc LeaseTimeToLive(LeaseTimeToLiveRequest) returns (LeaseTimeToLiveResponse) {}

}

其中:

LeaseGrant,创建一个租约;

LeaseRevoke,撤销一个租约;

LeaseKeepAlive,用于维持租约;

LeaseTimeToLive,获取租约信息。

Lock Service

Lock Service 提供分布式共享锁的支持。Lock Service 以 gRPC 接口的方式暴露客户端锁机制。在 v3lock.proto 中 Lock Service 定义如下:

service Lock {

rpc Lock(LockRequest) returns (LockResponse) {}

rpc Unlock(UnlockRequest) returns (UnlockResponse) {}

}

其中:

Lock 方法,在给定命令锁上获得分布式共享锁;

Unlock 使用 Lock 返回的 key 并释放对锁的持有。