ETCD 整体架构
整体架构与交互
客户端层:包括 clientv3 和 etcdctl 等客户端。用户通过命令行或者客户端调用提供了 RESTful 风格的API,降低了 etcd的使用复杂度。除此之外,客户端层的负载均衡(etcd V3.4 版本的客户端默认使用的是 Round-robin,即轮询调度)和节点间故障转移等特性,提升了etcd 服务端的高可用性。需要注意的是,etcd V3.4之前版本的客户端存在负载均衡的 Bug,如果第一个节点出现异常,访问服务端时也可能会出现异常,建议进行升级。
API 接口层:API 接口层提供了客户端访问服务端的通信协议和接口定义,以及服务端节点之间相互通信的协议,我将会在下一讲重点讲解 etcd的通信接口。etcd 有 V3和V2 两个版本。etcd V3 使用gRPC 作为消息传输协议;对于之前的V2 版本,etcd 默认使用HTTP/1.x 协议。对于不支持 gRPC的客户端语言,etcd 提供 JSON的grpc-gateway。通过 grpc-gateway 提供 RESTful 代理,转换 HTTP/JSON 请求为 gRPC 的 Protocol Buffer 格式的消息。这部分内容我们在 05 讲“gRPC 代理模式:实现可伸缩的 etcd API”具体做了讲解,这里我们就不再展开了。
etcd Raft 层:负责 Leader 选举和日志复制等功能,除了与本节点的 etcd Server 通信之外,还与集群中的其他 etcd 节点进行交互,实现分布式一致性数据同步的关键工作。
逻辑层:etcd 的业务逻辑层,包括鉴权、租约、KVServer、MVCC 和 Compactor 压缩等核心功能特性。
etcd 存储:实现了快照、预写式日志 WAL(Write Ahead Log)。etcd V3 版本中,使用 BoltDB 来持久化存储集群元数据和用户写入的数据。
一次请求的处理流程和细节
从上至下依次为客户端 → API 接口层 → etcd Server → etcd raft 算法库。我们根据请求处理的过程,将 etcd Server 和 etcd raft 算法库单独说明。
etcd Server:接收客户端的请求,在上述的etcd 项目代码中对应etcdserver 包。请求到达 etcd Server 之后,经过 KVServer 拦截,实现诸如日志、Metrics 监控、请求校验等功能。etcd Server 中的raft模块,用于与 etcd-raft 库进行通信。applierV3 模块封装了 etcd V3 版本的数据存储;WAL 用于写数据日志,WAL中保存了任期号、投票信息、已提交索引、提案内容等,etcd 根据 WAL 中的内容在启动时恢复,以此实现集群的数据一致性。
etcdraft:etcd 的raft 库。raftLog 用于管理 raft 协议中单个节点的日志,都处于内存中。raftLog 中还有两种结构体 unstable和storage,unsable 中存储不稳定的数据,表示还没有 commit,而 storage中都是已经被 commit 了的数据。这两种结构体分别用于不同步骤的存储,我们将在下面的交互流程中介绍。除此之外,raft 库更重要的是负责与集群中的其他 etcd Server进行交互,实现分布式一致性。
在上图中,客户端请求与 etcd 集群交互包括如下两个步骤:
首先是写数据到 etcd 节点中;
其次是当前的 etcd 节点与集群中的其他 etcd 节点之间进行通信,确认存储数据成功之后回复客户端。
请求流程可划分为以下的子步骤:
客户端通过负载均衡算法选择一个 etcd 节点,发起 gRPC 调用;
etcd Server 收到客户端请求;
经过 gRPC 拦截、Quota 校验,Quota 模块用于校验 etcd db 文件大小是否超过了配额;
接着 KVServer 模块将请求发送给本模块中的raft,这里负责与 etcd raft模块进行通信;
发起一个提案,命令为put foo bar,即使用put 方法将 foo 更新为 bar;
在raft 中会将数据封装成 raft 日志的形式提交给 raft模块;
raft模块会首先保存到 raftLog 的 unstable 存储部分;
raft模块通过raft 协议与集群中其他 etcd 节点进行交互。
需要注意的是,在 raft 协议中写入数据的 etcd 必定是 leader 节点,如果客户端提交数据到非 leader 节点时,该节点需要将请求转发到 etcd leader 节点处理。
提案通过 RaftHTTP 网络模块转发,集群中的其他节点接收到该提案;
在收到提案之后,集群中其他节点向 leader 节点应答“我已经接收这条日志数据”;
Leader收到应答之后,统计应答的数量,当满足超过集群半数以上节点,应答接收成功;
etcd raft算法模块构造 Ready 结构体,用来通知 etcd Server 模块,该日志数据已经被 commit;
etcd Server 中的 raft 模块(交互图中有标识),收到 Ready 消息后,会将这条日志数据写入到 WAL 模块中;
正式通知 etcd Server 该提案已经被 commit;
etcd Server 调用 applierV3 模块,将日志写入持久化存储中;
etcd Server 应答客户端该数据写入成功;
etcd Server 调用 etcd raft 库,将这条日志写入到 raftLog 模块中的 storage。
上述过程中,提案经过网络转发,当多数etcd 节点持久化日志数据成功并进行应答,提案的状态会变成已提交。
在应答某条日志数据是否已经 commit 时,为什么 etcd raft 模块首先写入到 WAL 模块中?这是因为该过程仅仅添加一条日志,一方面开销小,速度会很快;另一方面,如果在后面 applierV3 写入失败,etcd 服务端在重启的时候也可以根据 WAL 模块中的日志数据进行恢复。etcd Server 从 raft 模块获取已提交的日志条目,由 applierV3 模块通过 MVCC 模块执行提案内容,更新状态机。
整个过程中,etcd raft 模块中的 raftLog 数据在内存中存储,在服务重启后失效;客户端请求的数据则被持久化保存到 WAL 和 applierV3 中,不会在重启之后丢失。
客户端API
etcd client 与 etcd 集群之间通过 grpc 进行通信。发送到 etcd 服务器的每个 API 请求都是一个 gRPC 远程过程调用。etcd 中的 RPC 接口定义根据功能分类到服务中。
处理 etcd 键值的重要服务包括:
KV Service,创建、更新、获取和删除键值对。
Watch Service,监视键的更改。
Lease Service,实现键值对过期,客户端用来续租、保持心跳。
Lock Service,etcd 提供分布式共享锁的支持。
Election Service,暴露客户端选举机制。
响应头
message ResponseHeader {
uint64 cluster_id = 1;
uint64 member_id = 2;
int64 revision = 3;
uint64 raft_term = 4;
}
Cluster_ID:产生响应的集群的 ID。
Member_ID:产生响应的成员的 ID。
应用服务可以通过 Cluster_ID 和 Member_ID 字段来确保,当前与之通信的正是预期的那个集群或者成员。
Revision:产生响应时键值存储的修订版本号。
应用服务可以使用修订号字段来获得当前键值存储库最新的修订号。应用程序指定历史修订版以进行查询,如果希望在请求时知道最新修订版,此功能特别有用。
Raft_Term:产生响应时,成员的 Raft 称谓。
应用服务可以使用 Raft_Term 来检测集群何时完成一个新的 leader 选举。
客户端连接示例代码
// client_init_test.go
package client
import (
“context”
“testing”
“time”
“go.etcd.io/etcd/clientv3”
)
// 测试客户端连接
func TestEtcdClientInit(t *testing.T) {
var (
config clientv3.Config
client *clientv3.Client
err error
)
// 客户端配置
config = clientv3.Config{
// 节点配置
Endpoints: []string{“localhost:2379”},// etcd 的多个节点服务地址,因为我是单点本机测试,所以只传 1 个。
DialTimeout: 5 * time.Second,// 创建 client 的首次连接超时,这里传了 5 秒,如果 5 秒都没有连接成功就会返回 err。需要注意的是,一旦 client 创建成功,我们就不用再关心后续底层连接的状态了,client 内部会重连。
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
t.Error(err)
} else {
// 输出集群信息
t.Log(client.Cluster.MemberList(context.TODO()))
}
client.Close()
}
KV 存储
kv := clientev3.NewKV(client)
// kv 接口定义
type KV interface {
Put(ctx context.Context, key, val string, opts …OpOption) (*PutResponse, error)
// 检索 keys
Get(ctx context.Context, key string, opts …OpOption) (*GetResponse, error)
// 删除 key,可以使用 WithRange(end), [key, end) 的方式
Delete(ctx context.Context, key string, opts …OpOption) (*DeleteResponse, error)
// 压缩给定版本之前的 KV 历史
Compact(ctx context.Context, rev int64, opts …CompactOption) (*CompactResponse, error)
// 指定某种没有事务的操作
Do(ctx context.Context, op Op) (OpResponse, error)
// Txn 创建一个事务
Txn(ctx context.Context) Txn
}
KV 存储 Put
Put 的定义如下:
Put(ctx context.Context, key, val string, opts …OpOption) (*PutResponse, error)
其中的参数
ctx:Context 包对象,用来跟踪上下文,比如超时控制。
key:存储对象的 key。
val:存储对象的 value。
opts:可变参数,额外选项。
Put 将一个键值对放入 etcd 中。请注意,键值可以是纯字节数组,字符串是该字节数组的不可变表示形式。要获取字节字符串,请执行string([] byte {0x10,0x20})。
Put 的使用方法如下所示:
putResp, err := kv.Put(context.TODO(),”shopee_etcd_key”, “hello-world!”)
KV 查询 Get
现在可以对存储的数据进行取值了。默认情况下,Get 将返回“key”对应的值。
复制代码
Get(ctx context.Context, key string, opts …OpOption) (*GetResponse, error)
OpOption 为可选的函数传参,传参为WithRange(end)时,Get 将返回 [key,end) 范围内的键;传参为 WithFromKey() 时,Get 返回大于或等于 key 的键;当通过 rev> 0 传递 WithRev(rev) 时,Get 查询给定修订版本的键;如果压缩了所查找的修订版本,则返回请求失败,并显示 ErrCompacted。 传递 WithLimit(limit) 时,返回的 key 数量受 limit 限制;传参为 WithSort 时,将对键进行排序。
对应的使用方法如下:
getResp, err := kv.Get(context.TODO(), “shopee_etcd_key”)
从以上数据的存储和取值,我们知道:Put 返回 PutResponse,Get 返回 GetResponse。注意:不同的 KV 操作对应不同的 Response 结构,定义如下:
type (
CompactResponse pb.CompactionResponse
PutResponse pb.PutResponse
GetResponse pb.RangeResponse
DeleteResponse pb.DeleteRangeResponse
TxnResponse pb.TxnResponse
)
下面我们分别来看一看 PutResponse 和 GetResponse 映射的 RangeResponse 结构的定义:
复制代码
type PutResponse struct {
Header *ResponseHeader protobuf:"bytes,1,opt,name=header" json:"header,omitempty"
// 请求中如有 prev_kv,响应时也会携带 prev_kv
PrevKv *mvccpb.KeyValue protobuf:"bytes,2,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"
}
//Header 里保存的主要是本次更新的 revision 信息
type RangeResponse struct {
Header *ResponseHeader protobuf:"bytes,1,opt,name=header" json:"header,omitempty"
// kvs 是一个匹配 range 请求的键值对列表
Kvs []*mvccpb.KeyValue protobuf:"bytes,2,rep,name=kvs" json:"kvs,omitempty"
// more 用以分页
More bool protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"
// count 表示 range 的键值对数量
Count int64 protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"
}
KVS 字段,保存了本次 Get 查询到的所有 KV 对,我们继续看一下 mvccpb.KeyValue 对象的定义:
type KeyValue struct {
Key []byte protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"
// create_revision 是当前 key 的最后创建版本
CreateRevision int64 protobuf:"varint,2,opt,name=create_revision,json=createRevision,proto3" json:"create_revision,omitempty"
// mod_revision 是指当前 key 的最新修订版本
ModRevision int64 protobuf:"varint,3,opt,name=mod_revision,json=modRevision,proto3" json:"mod_revision,omitempty"
// key 的版本,每次更新都会增加版本号
Version int64 protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"
Value []byte protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"
// 绑定了 key 的租期 Id,当 lease 为 0 ,则表明没有绑定 key;租期过期,则会删除 key
Lease int64 protobuf:"varint,6,opt,name=lease,proto3" json:"lease,omitempty"
}
至于 RangeResponse.More 和 Count,当我们使用 withLimit() 选项进行 Get 时会发挥作用,相当于分页查询。
接下来,我们通过一个特别的 Get 选项,获取 shopee 目录下的所有子目录:
rangeResp, err := kv.Get(context.TODO(), “/shopee”, clientv3.WithPrefix())
WithPrefix()用于查找以/shopee为前缀的所有 key,因此可以模拟出查找子目录的效果。我们知道 etcd 是一个有序的 KV 存储,因此/shopee为前缀的 key 总是顺序排列在一起。
WithPrefix 实际上会转化为范围查询,它根据前缀/shopee生成了一个 key range,[“/shopee/”, “/shopee0”),这是因为比 / 大的字符是 0,所以以 /shopee0 作为范围的末尾,就可以扫描到所有的 /aa/ 打头的 key 了。
KV 操作实践
键值对的操作是 etcd 中最基本、最常用的功能,主要包括读、写、删除三种基本的操作。在 etcd 中定义了 KV 接口,用来对外提供这些操作,下面我们进行具体测试:
package client
import (
“context”
“testing”
“time”
“github.com/google/uuid”
“go.etcd.io/etcd/clientv3”
)
func TestKV(t *testing.T) {
rootContext := context.Background()
// 客户端初始化
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{“localhost:2379”},
DialTimeout: 2 * time.Second,
})
// etcd clientv3 >= v3.2.10, grpc/grpc-go >= v1.7.3
if cli == nil || err == context.DeadlineExceeded {
// handle errors
t.Error(err)
panic(“invalid connection!”)
}
// 客户端断开连接
defer cli.Close()
// 初始化 kv
kvc := clientv3.NewKV(cli)
//获取值
ctx, cancelFunc := context.WithTimeout(rootContext, time.Duration(2)*time.Second)
response, err := kvc.Get(ctx, “cc”)
cancelFunc()
if err != nil {
t.Error(err)
}
kvs := response.Kvs
// 输出获取的 key
if len(kvs) > 0 {
t.Logf(“last value is :%s\r\n”, string(kvs[0].Value))
} else {
t.Logf(“empty key for %s\n”, “cc”)
}
//设置值
uuid := uuid.New().String()
fmt.Printf(“new value is :%s\r\n”, uuid)
ctx2, cancelFunc2 := context.WithTimeout(rootContext, time.Duration(2)*time.Second)
_, err = kvc.Put(ctx2, “cc”, uuid)
// 设置成功后,将该 key 对应的键值删除
if delRes, err := kvc.Delete(ctx2, “cc”); err != nil {
t.Error(err)
} else {
t.Logf(“delete %s for %t\n”, “cc”, delRes.Deleted > 0)
}
cancelFunc2()
if err != nil {
t.Error(err)
}
}
如上的测试用例,主要是针对 KV 的操作,依次获取 key,即 Get(),对应 etcd 底层实现的 range 接口;其次是写入键值对,即 put 操作;最后删除刚刚写入的键值对。
其他通信接口
其他常用的接口还有 Txn、Compact、Watch、Lease、Lock 等。我们依次看看这些接口的定义。
事务 Txn
Txn 方法在单个事务中处理多个请求。Txn 请求增加键值存储的修订版本,并为每个完成的请求生成带有相同修订版本的事件,etcd 不容许在一个 Txn 中多次修改同一个 key。Txn 接口定义如下:
rpc Txn(TxnRequest) returns (TxnResponse) {}
Compact
Compact 方法压缩 etcd 键值对存储中的事件历史。键值对存储应该定期压缩,否则事件历史会无限制地持续增长。Compact 接口定义如下:
rpc Compact(CompactionRequest) returns (CompactionResponse) {}
请求的消息体是 CompactionRequest, CompactionRequest 压缩键值对存储到给定修订版本,所有修订版本比压缩修订版本小的键都将被删除。
Watch
Watch API 提供了一个基于事件的接口,用于异步监视键的更改。etcd 监视程序通过给定的修订版本(当前版本或历史版本)持续监视 key 更改,并将 key 更新流回客户端。
在 rpc.proto 中 Watch Service 定义如下:
service Watch {
rpc Watch(stream WatchRequest) returns (stream WatchResponse) {}
}
Watch 观察将要发生或者已经发生的事件。输入和输出都是流,输入流用于创建和取消观察,而输出流发送事件。一个观察 RPC 可以一次性在多个 key 范围上观察,并为多个观察流化事件。整个事件历史可以从最后压缩修订版本开始观察。Watch Service 只有一个 Watch 方法。
Lease Service
Lease Service 提供租约的支持。Lease 是一种检测客户端存活状况的机制。集群授予客户端具有生存时间的租约。如果 etcd 集群在给定的 TTL 时间内未收到 keepAlive,则租约到期。
为了将租约绑定到键值存储中,每个 key 最多可以附加一个租约。当租约到期或被撤销时,该租约依附的所有 key 都将被删除,每个过期的密钥都会在事件历史记录中生成一个删除事件。
在 rpc.proto 中 Lease Service 定义的接口如下:
service Lease {
rpc LeaseGrant(LeaseGrantRequest) returns (LeaseGrantResponse) {}
rpc LeaseRevoke(LeaseRevokeRequest) returns (LeaseRevokeResponse) {}
rpc LeaseKeepAlive(stream LeaseKeepAliveRequest) returns (stream LeaseKeepAliveResponse) {}
rpc LeaseTimeToLive(LeaseTimeToLiveRequest) returns (LeaseTimeToLiveResponse) {}
}
其中:
LeaseGrant,创建一个租约;
LeaseRevoke,撤销一个租约;
LeaseKeepAlive,用于维持租约;
LeaseTimeToLive,获取租约信息。
Lock Service
Lock Service 提供分布式共享锁的支持。Lock Service 以 gRPC 接口的方式暴露客户端锁机制。在 v3lock.proto 中 Lock Service 定义如下:
service Lock {
rpc Lock(LockRequest) returns (LockResponse) {}
rpc Unlock(UnlockRequest) returns (UnlockResponse) {}
}
其中:
Lock 方法,在给定命令锁上获得分布式共享锁;
Unlock 使用 Lock 返回的 key 并释放对锁的持有。