ETCD 客户端分析
Watch 订阅
watch 订阅指的是 客户端(订阅者)针对 etcd 的 key 或者 范围 key 的 value 的变化而进行的订阅行为。
话不多说 开启源码之旅
watch 类包含 watchGrpcStream 类,这是实现 watch 订阅的核心。本质是基于 grpc 的 watch 机制。
type watchGrpcStream struct {
owner *watcher
remote pb.WatchClient
callOpts []grpc.CallOption
// ctx controls internal remote.Watch requests
ctx context.Context
// ctxKey is the key used when looking up this stream's context
ctxKey string
cancel context.CancelFunc
// substreams holds all active watchers on this grpc stream
watch 采用基于一个 grpc watch的原理,通过保存所有子watcher 来维持客户端所有子watch的订阅。
子watcher 指的是 采用同一个 client 调用 watch 所生成的订阅,程序中我们往往会多次调用 watch 去 watch 不同的key。
substreams map[int64]*watcherStream
// resuming holds all resuming watchers on this grpc stream
这个 resuming 数组其实是用来给 create watch 或者 resume watch 排队用的。为什么排队接下来重点讲解下,会涉及到grpc的多路复用(注意不是IO的多路复用)
resuming []*watcherStream
// reqc sends a watch request from Watch() to the main goroutine
请求队列channel
reqc chan watchStreamRequest
// respc receives data from the watch client
回复队列channel
respc chan *pb.WatchResponse
// donec closes to broadcast shutdown
done信号 关闭此 watchgrpcStream
donec chan struct{}
// errc transmits errors from grpc Recv to the watch stream reconnect logic
error channel 当 grpc recv 出现 err 之后,对 err 进行判断,在可重连的情况下可以进行重连
errc chan error
// closingc gets the watcherStream of closing watchers
存放关闭中的 watcherStream
closingc chan *watcherStream
// wg is Done when all substream goroutines have exited
关闭所有 substream 时用到
wg sync.WaitGroup
// resumec closes to signal that all substreams should begin resuming
恢复所有 substreams 的信号
resumec chan struct{}
// closeErr is the error that closed the watch stream
关闭 stream 的 err
closeErr error
lg *zap.Logger
}
resuming []*watcherStream
resuming 数组其实是用来给 create watch 或者 resume watch 排队用的。为什么排队接下来重点讲解下,会涉及到grpc的多路复用(注意不是IO的多路复用)
grpc stream 本质基于一个 grpc.conn 进行的,stream 某种程度上只是同一个tcp长连接的subscribe接口,是针对同一个stream方法的请求上的阻塞等待。
这个时候如果需要对不同key进行阻塞等待,在保持同一个 grpc.conn 的基础上,需要针对不同的key做不同的子 watchStream:通过 ID 来唯一标识具体 watchStream
// watcherStream represents a registered watcher
type watcherStream struct {
// initReq is the request that initiated this request
请求的 Req
initReq watchRequest
// outc publishes watch responses to subscriber
给 subscriber 的 Response 阻塞 channel
outc chan WatchResponse
// recvc buffers watch responses before publishing
接受来自 watchGrpcStream 的 watchResponse
recvc chan *WatchResponse
// donec closes when the watcherStream goroutine stops.
本身自身的 close 信号
donec chan struct{}
// closing is set to true when stream should be scheduled to shutdown.
closing 标识位
closing bool
// id is the registered watch id on the grpc stream
ID 这是 watch 的唯一标识符号
id int64
// buf holds all events received from etcd but not yet consumed by the client
内部缓存 watchResponse 的切片,注意这个切片没有缓存大小的限制
buf []*WatchResponse
}
针对每一个 watchStream ,watchGrpcStream 都会起一个协程 goroutine 去处理
// serveSubstream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
if ws.closing {
panic(“created substream goroutine but substream is closing”)
}
// nextRev is the minimum expected next revision
nextRev := ws.initReq.rev
resuming := false
defer func() {
if !resuming {
ws.closing = true
}
close(ws.donec)
if !resuming {
w.closingc <- ws
}
w.wg.Done()
}()
emptyWr := &WatchResponse{}
for {
curWr := emptyWr
outc := ws.outc
先从缓存获取
if len(ws.buf) > 0 {
curWr = ws.buf[0]
} else {
outc = nil
}
select {
如果 当前缓存有值 则传递给 outChan
case outc <- *curWr:
if ws.buf[0].Err() != nil {
return
}
ws.buf[0] = nil
ws.buf = ws.buf[1:]
如果 ws 有 上层 Stream 传递下来的 wr 则进行
case wr, ok := <-ws.recvc:
if !ok {
// shutdown from closeSubstream
return
}
如果是新建请求的response
if wr.Created {
if ws.initReq.retc != nil {
ws.initReq.retc <- ws.outc
// to prevent next write from taking the slot in buffered channel
// and posting duplicate create events
ws.initReq.retc = nil
// send first creation event only if requested
如果请求的req设置了创建提醒功能
if ws.initReq.createdNotify {
ws.outc <- *wr
}
// once the watch channel is returned, a current revision
// watch must resume at the store revision. This is necessary
// for the following case to work as expected:
// wch := m1.Watch("a")
// m2.Put("a", "b")
// <-wch
// If the revision is only bound on the first observed event,
// if wch is disconnected before the Put is issued, then reconnects
// after it is committed, it'll miss the Put.
这是针对注释所说的场景 设置下一个watch的Revision为当前返回的最新revision
if ws.initReq.rev == 0 {
nextRev = wr.Header.Revision
}
}
} else {
// current progress of watch; <= store revision
设置下一个watch的Revision为当前返回的最新revision
nextRev = wr.Header.Revision
}
if len(wr.Events) > 0 {
nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
}
设置下一个watch的Revision为当前返回的最新revision
ws.initReq.rev = nextRev
// created event is already sent above,
// watcher should not post duplicate events
if wr.Created {
continue
}
// TODO pause channel if buffer gets too large
ws.buf = append(ws.buf, wr)
case <-w.ctx.Done():
return
case <-ws.initReq.ctx.Done():
return
case <-resumec:
resuming = true
return
}
}
// lazily send cancel message if events on missing id
}
接下来看下 watchGrpcStream 如何排队发送请求和处理回复的。
run方法是核心处理方法,平心而论 写的复杂,细节很多且复杂,很难一下子明白为什么这样设计,为什么有这个变量 为什么需要这段处理代码。废话不多说,开始阅读源码
// run is the root of the goroutines for managing a watcher client
func (w *watchGrpcStream) run() {
var wc pb.Watch_WatchClient
var closeErr error
// substreams marked to close but goroutine still running; needed for
// avoiding double-closing recvc on grpc stream teardown
closing := make(map[*watcherStream]struct{})
defer func() {
w.closeErr = closeErr
// shutdown substreams and resuming substreams
for _, ws := range w.substreams {
if _, ok := closing[ws]; !ok {
close(ws.recvc)
closing[ws] = struct{}{}
}
}
for _, ws := range w.resuming {
if _, ok := closing[ws]; ws != nil && !ok {
close(ws.recvc)
closing[ws] = struct{}{}
}
}
w.joinSubstreams()
for range closing {
w.closeSubstream(<-w.closingc)
}
w.wg.Wait()
w.owner.closeStream(w)
}()
// start a stream with the etcd grpc server
此处建立 grpc 连接
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
}
这个是用来记录是否主动发送过cancel
cancelSet := make(map[int64]struct{})
var cur *pb.WatchResponse
for {
select {
// Watch() requested
case req := <-w.reqc:
switch wreq := req.(type) {
case *watchRequest:
outc := make(chan WatchResponse, 1)
// TODO: pass custom watch ID?
ws := &watcherStream{
initReq: *wreq,
id: -1,
outc: outc,
// unbuffered so resumes won't cause repeat events
recvc: make(chan *WatchResponse),
}
ws.donec = make(chan struct{})
w.wg.Add(1)
go w.serveSubstream(ws, w.resumec)
// queue up for watcher creation/resume
在此处排队处理请求
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 {
// head of resume queue, can register a new watcher
发送stream请求
if err := wc.Send(ws.initReq.toPB()); err != nil {
w.lg.Debug("error when sending request", zap.Error(err))
}
}
case *progressRequest:
if err := wc.Send(wreq.toPB()); err != nil {
w.lg.Debug("error when sending request", zap.Error(err))
}
}
// new events from the watch client
case pbresp := <-w.respc:
if cur == nil || pbresp.Created || pbresp.Canceled {
cur = pbresp
} else if cur != nil && cur.WatchId == pbresp.WatchId {
// merge new events
分段 watchResponse
cur.Events = append(cur.Events, pbresp.Events...)
// update "Fragment" field; last response with "Fragment" == false
cur.Fragment = pbresp.Fragment
}
switch {
case pbresp.Created:
// response to head of queue creation
if ws := w.resuming[0]; ws != nil {
收到 Created Response,添加 子Stream
w.addSubstream(pbresp, ws)
转发 wr 到对应的 subStream:watchStream 的Recvc channel,也就是上文提及的watchStream的接受channel
w.dispatchEvent(pbresp)
w.resuming[0] = nil
}
if ws := w.nextResume(); ws != nil {
if err := wc.Send(ws.initReq.toPB()); err != nil {
w.lg.Debug("error when sending request", zap.Error(err))
}
}
// reset for next iteration
cur = nil
服务端主动canceled:cancled信息 CompactRevision 为 0
case pbresp.Canceled && pbresp.CompactRevision == 0:
delete(cancelSet, pbresp.WatchId)
if ws, ok := w.substreams[pbresp.WatchId]; ok {
// signal to stream goroutine to update closingc
关闭ws的接收 channel
close(ws.recvc)
记录ws,防止 ws.recvc 被 close 两次
closing[ws] = struct{}{}
}
// reset for next iteration
cur = nil
case cur.Fragment:
// watch response events are still fragmented
// continue to fetch next fragmented event arrival
continue
default:
// dispatch to appropriate watch stream
ok := w.dispatchEvent(cur)
// reset for next iteration
cur = nil
if ok {
break
}
// watch response on unexpected watch id; cancel id
记录取消的ws,防止重复发送
if _, ok := cancelSet[pbresp.WatchId]; ok {
break
}
设置取消标记
cancelSet[pbresp.WatchId] = struct{}{}
cr := &pb.WatchRequest_CancelRequest{
CancelRequest: &pb.WatchCancelRequest{
WatchId: pbresp.WatchId,
},
}
发送取消请求
req := &pb.WatchRequest{RequestUnion: cr}
w.lg.Debug("sending watch cancel request for failed dispatch", zap.Int64("watch-id", pbresp.WatchId))
if err := wc.Send(req); err != nil {
w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err))
}
}
// watch client failed on Recv; spawn another if possible
case err := <-w.errc:
判断错误类型
if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
closeErr = err
return
}
重新链接 client 端
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
}
获取下一个请求
if ws := w.nextResume(); ws != nil {
if err := wc.Send(ws.initReq.toPB()); err != nil {
w.lg.Debug("error when sending request", zap.Error(err))
}
}
重置取消集合
cancelSet = make(map[int64]struct{})
case <-w.ctx.Done():
return
case ws := <-w.closingc:
w.closeSubstream(ws)
delete(closing, ws)
// no more watchers on this stream, shutdown, skip cancellation
if len(w.substreams)+len(w.resuming) == 0 {
return
}
if ws.id != -1 {
// client is closing an established watch; close it on the server proactively instead of waiting
// to close when the next message arrives
设置取消标记
cancelSet[ws.id] = struct{}{}
发送取消请求
cr := &pb.WatchRequest_CancelRequest{
CancelRequest: &pb.WatchCancelRequest{
WatchId: ws.id,
},
}
req := &pb.WatchRequest{RequestUnion: cr}
w.lg.Debug("sending watch cancel request for closed watcher", zap.Int64("watch-id", ws.id))
if err := wc.Send(req); err != nil {
w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err))
}
}
}
}
}