
ETCD Client源码解析

ETCD 客户端分析

Watch 订阅

Watch 订阅指的是客户端(订阅者)针对 etcd 中某个 key 或某个范围内的 key 的 value 变化所进行的订阅行为。

话不多说 开启源码之旅

watcher 结构体包含 watchGrpcStream 结构体(Go 中没有"类"的概念),这是实现 watch 订阅的核心,本质上基于 gRPC 的 watch stream 机制。

type watchGrpcStream struct {
owner *watcher
remote pb.WatchClient
callOpts []grpc.CallOption

// ctx controls internal remote.Watch requests
ctx context.Context
// ctxKey is the key used when looking up this stream's context
ctxKey string
cancel context.CancelFunc

// substreams holds all active watchers on this grpc stream

watch 的实现原理是:多个订阅复用同一个 gRPC watch stream,并通过保存所有子 watcher 来维持客户端的全部订阅。
子 watcher 指的是通过同一个 client 调用 Watch 所生成的订阅,程序中我们往往会多次调用 Watch 去监听不同的 key。
substreams map[int64]*watcherStream
// resuming holds all resuming watchers on this grpc stream
这个 resuming 切片其实是用来给 create watch 或者 resume watch 排队用的。为什么需要排队,接下来会重点讲解,其中会涉及到 gRPC 的多路复用(注意不是 IO 的多路复用)。
resuming []*watcherStream

// reqc sends a watch request from Watch() to the main goroutine

reqc chan watchStreamRequest
// respc receives data from the watch client
respc chan *pb.WatchResponse
// donec closes to broadcast shutdown
done 信号:该 channel 被关闭时,广播此 watchGrpcStream 已经关闭
donec chan struct{}
// errc transmits errors from grpc Recv to the watch stream reconnect logic
error channel 当 grpc recv 出现 err 之后,对 err 进行判断,在可重连的情况下可以进行重连
errc chan error
// closingc gets the watcherStream of closing watchers
存放关闭中的 watcherStream
closingc chan *watcherStream
// wg is Done when all substream goroutines have exited
关闭所有 substream 时用到
wg sync.WaitGroup

// resumec closes to signal that all substreams should begin resuming

恢复所有 substreams 的信号
resumec chan struct{}
// closeErr is the error that closed the watch stream
关闭 stream 的 err
closeErr error

lg *zap.Logger


gRPC stream 本质上是基于同一个 gRPC 连接(grpc.ClientConn)进行的,stream 在某种程度上只是同一条 TCP 长连接上的订阅(subscribe)接口,即在同一个 stream 方法的请求上阻塞等待。

这个时候如果需要对不同的 key 进行阻塞等待,就要在保持同一个 gRPC 连接的基础上,为不同的 key 创建不同的子 watcherStream,并通过 ID 来唯一标识具体的 watcherStream。

// watcherStream represents a registered watcher
type watcherStream struct {
// initReq is the request that initiated this request
请求的 Req
initReq watchRequest

// outc publishes watch responses to subscriber

发布给订阅者(subscriber)的 WatchResponse channel(注意:在 run() 中它是以容量 1 创建的带缓冲 channel,并非无缓冲)
outc chan WatchResponse
// recvc buffers watch responses before publishing
接收来自 watchGrpcStream 的 WatchResponse
recvc chan *WatchResponse
// donec closes when the watcherStream goroutine stops.
watcherStream 自身的 goroutine 退出时的关闭信号
donec chan struct{}
// closing is set to true when stream should be scheduled to shutdown.
closing 标识位
closing bool
// id is the registered watch id on the grpc stream
ID:该 watch 在这条 gRPC stream 上注册的唯一标识符
id int64

// buf holds all events received from etcd but not yet consumed by the client

内部缓存 watchResponse 的切片,注意这个切片没有缓存大小的限制
buf []*WatchResponse

针对每一个 watcherStream,watchGrpcStream 都会启动一个 goroutine(即下面的 serveSubstream)去处理

// serveSubstream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
if ws.closing {
panic(“created substream goroutine but substream is closing”)

// nextRev is the minimum expected next revision
nextRev := ws.initReq.rev
resuming := false
defer func() {
    if !resuming {
        ws.closing = true
    if !resuming {
        w.closingc <- ws

emptyWr := &WatchResponse{}
for {
    curWr := emptyWr
    outc := ws.outc

    if len(ws.buf) > 0 {
        curWr = ws.buf[0]
    } else {
        outc = nil

    select {
    如果 当前缓存有值 则传递给 outChan
    case outc <- *curWr:
        if ws.buf[0].Err() != nil {
        ws.buf[0] = nil
        ws.buf = ws.buf[1:]
    如果 ws 收到上层 watchGrpcStream 传递下来的 wr,则进行如下处理
    case wr, ok := <-ws.recvc:
        if !ok {
            // shutdown from closeSubstream
        if wr.Created {
            if ws.initReq.retc != nil {
                ws.initReq.retc <- ws.outc
                // to prevent next write from taking the slot in buffered channel
                // and posting duplicate create events
                ws.initReq.retc = nil

                // send first creation event only if requested
                if ws.initReq.createdNotify {
                    ws.outc <- *wr
                // once the watch channel is returned, a current revision
                // watch must resume at the store revision. This is necessary
                // for the following case to work as expected:
                //    wch := m1.Watch("a")
                //    m2.Put("a", "b")
                //    <-wch
                // If the revision is only bound on the first observed event,
                // if wch is disconnected before the Put is issued, then reconnects
                // after it is committed, it'll miss the Put.
    这是针对注释所说的场景 设置下一个watch的Revision为当前返回的最新revision
                if ws.initReq.rev == 0 {
                    nextRev = wr.Header.Revision
        } else {
            // current progress of watch; <= store revision
            nextRev = wr.Header.Revision

        if len(wr.Events) > 0 {
            nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
        ws.initReq.rev = nextRev

        // created event is already sent above,
        // watcher should not post duplicate events
        if wr.Created {

        // TODO pause channel if buffer gets too large
        ws.buf = append(ws.buf, wr)
    case <-w.ctx.Done():
    case <-ws.initReq.ctx.Done():
    case <-resumec:
        resuming = true
// lazily send cancel message if events on missing id


接下来看下 watchGrpcStream 如何排队发送请求和处理回复的。

run 方法是核心处理方法。平心而论,它写得比较复杂,细节很多,很难一下子明白为什么这样设计、为什么有这个变量、为什么需要这段处理代码。废话不多说,开始阅读源码。

// run is the root of the goroutines for managing a watcher client
func (w *watchGrpcStream) run() {
var wc pb.Watch_WatchClient
var closeErr error

// substreams marked to close but goroutine still running; needed for
// avoiding double-closing recvc on grpc stream teardown
closing := make(map[*watcherStream]struct{})

defer func() {
    w.closeErr = closeErr
    // shutdown substreams and resuming substreams
    for _, ws := range w.substreams {
        if _, ok := closing[ws]; !ok {
            closing[ws] = struct{}{}
    for _, ws := range w.resuming {
        if _, ok := closing[ws]; ws != nil && !ok {
            closing[ws] = struct{}{}
    for range closing {

// start a stream with the etcd grpc server

此处与 etcd 的 gRPC 服务端建立 watch stream
if wc, closeErr = w.newWatchClient(); closeErr != nil {

cancelSet := make(map[int64]struct{})

var cur *pb.WatchResponse
for {
    select {
    // Watch() requested
    case req := <-w.reqc:
        switch wreq := req.(type) {
        case *watchRequest:
            outc := make(chan WatchResponse, 1)
            // TODO: pass custom watch ID?
            ws := &watcherStream{
                initReq: *wreq,
                id:      -1,
                outc:    outc,
                // unbuffered so resumes won't cause repeat events
                recvc: make(chan *WatchResponse),

            ws.donec = make(chan struct{})
            go w.serveSubstream(ws, w.resumec)

            // queue up for watcher creation/resume
            w.resuming = append(w.resuming, ws)
            if len(w.resuming) == 1 {
                // head of resume queue, can register a new watcher
                if err := wc.Send(ws.initReq.toPB()); err != nil {
                    w.lg.Debug("error when sending request", zap.Error(err))
        case *progressRequest:
            if err := wc.Send(wreq.toPB()); err != nil {
                w.lg.Debug("error when sending request", zap.Error(err))

    // new events from the watch client
    case pbresp := <-w.respc:
        if cur == nil || pbresp.Created || pbresp.Canceled {
            cur = pbresp
        } else if cur != nil && cur.WatchId == pbresp.WatchId {
            // merge new events
            合并分片(Fragment)的 WatchResponse:将新到达的 Events 追加到当前响应中
            cur.Events = append(cur.Events, pbresp.Events...)
            // update "Fragment" field; last response with "Fragment" == false
            cur.Fragment = pbresp.Fragment

        switch {
        case pbresp.Created:
            // response to head of queue creation
            if ws := w.resuming[0]; ws != nil {
            收到 Created Response,添加 子Stream
                w.addSubstream(pbresp, ws)
            转发 wr 到对应的子 stream(watcherStream)的 recvc channel,也就是上文提及的 watcherStream 的接收 channel
                w.resuming[0] = nil

            if ws := w.nextResume(); ws != nil {
                if err := wc.Send(ws.initReq.toPB()); err != nil {
                    w.lg.Debug("error when sending request", zap.Error(err))

            // reset for next iteration
            cur = nil
        服务端主动取消:Canceled 为 true 且 CompactRevision 为 0
        case pbresp.Canceled && pbresp.CompactRevision == 0:
            delete(cancelSet, pbresp.WatchId)
            if ws, ok := w.substreams[pbresp.WatchId]; ok {
                // signal to stream goroutine to update closingc
            关闭ws的接收 channel
            记录ws,防止 ws.recvc 被 close 两次
                closing[ws] = struct{}{}

            // reset for next iteration
            cur = nil

        case cur.Fragment:
            // watch response events are still fragmented
            // continue to fetch next fragmented event arrival

            // dispatch to appropriate watch stream
            ok := w.dispatchEvent(cur)

            // reset for next iteration
            cur = nil

            if ok {

            // watch response on unexpected watch id; cancel id
            if _, ok := cancelSet[pbresp.WatchId]; ok {
            cancelSet[pbresp.WatchId] = struct{}{}
            cr := &pb.WatchRequest_CancelRequest{
                CancelRequest: &pb.WatchCancelRequest{
                    WatchId: pbresp.WatchId,
            req := &pb.WatchRequest{RequestUnion: cr}
            w.lg.Debug("sending watch cancel request for failed dispatch", zap.Int64("watch-id", pbresp.WatchId))
            if err := wc.Send(req); err != nil {
                w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err))

    // watch client failed on Recv; spawn another if possible
    case err := <-w.errc:
        if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
            closeErr = err
        重新连接:重新创建 watch client
        if wc, closeErr = w.newWatchClient(); closeErr != nil {
        if ws := w.nextResume(); ws != nil {
            if err := wc.Send(ws.initReq.toPB()); err != nil {
                w.lg.Debug("error when sending request", zap.Error(err))
        cancelSet = make(map[int64]struct{})

    case <-w.ctx.Done():

    case ws := <-w.closingc:
        delete(closing, ws)
        // no more watchers on this stream, shutdown, skip cancellation
        if len(w.substreams)+len(w.resuming) == 0 {
        if ws.id != -1 {
            // client is closing an established watch; close it on the server proactively instead of waiting
            // to close when the next message arrives
            cancelSet[ws.id] = struct{}{}
            cr := &pb.WatchRequest_CancelRequest{
                CancelRequest: &pb.WatchCancelRequest{
                    WatchId: ws.id,
            req := &pb.WatchRequest{RequestUnion: cr}
            w.lg.Debug("sending watch cancel request for closed watcher", zap.Int64("watch-id", ws.id))
            if err := wc.Send(req); err != nil {
                w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err))
