diff --git a/core/discov/internal/registry.go b/core/discov/internal/registry.go index ad37fda3..2e55f21a 100644 --- a/core/discov/internal/registry.go +++ b/core/discov/internal/registry.go @@ -208,7 +208,7 @@ func (c *cluster) handleWatchEvents(key string, events []*clientv3.Event) { } } -func (c *cluster) load(cli EtcdClient, key string) { +func (c *cluster) load(cli EtcdClient, key string) int64 { var resp *clientv3.GetResponse for { var err error @@ -232,6 +232,8 @@ func (c *cluster) load(cli EtcdClient, key string) { } c.handleChanges(key, kvs) + + return resp.Header.Revision } func (c *cluster) monitor(key string, l UpdateListener) error { @@ -244,9 +246,9 @@ func (c *cluster) monitor(key string, l UpdateListener) error { return err } - c.load(cli, key) + rev := c.load(cli, key) c.watchGroup.Run(func() { - c.watch(cli, key) + c.watch(cli, key, rev) }) return nil @@ -278,22 +280,28 @@ func (c *cluster) reload(cli EtcdClient) { for _, key := range keys { k := key c.watchGroup.Run(func() { - c.load(cli, k) - c.watch(cli, k) + rev := c.load(cli, k) + c.watch(cli, k, rev) }) } } -func (c *cluster) watch(cli EtcdClient, key string) { +func (c *cluster) watch(cli EtcdClient, key string, rev int64) { for { - if c.watchStream(cli, key) { + if c.watchStream(cli, key, rev) { return } } } -func (c *cluster) watchStream(cli EtcdClient, key string) bool { - rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix()) +func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) bool { + var rch clientv3.WatchChan + if rev != 0 { + rch = cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix(), clientv3.WithRev(rev+1)) + } else { + rch = cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix()) + } + for { select { case wresp, ok := <-rch: diff --git a/core/discov/internal/registry_test.go b/core/discov/internal/registry_test.go index 8d35c703..0839cb1e 100644 --- a/core/discov/internal/registry_test.go +++ b/core/discov/internal/registry_test.go @@ -2,6 +2,7 @@ package internal import ( "context" + "go.etcd.io/etcd/api/v3/etcdserverpb" "sync" "testing" @@ -112,6 +113,7 @@ func TestCluster_Load(t *testing.T) { restore := setMockClient(cli) defer restore() cli.EXPECT().Get(gomock.Any(), "any/", gomock.Any()).Return(&clientv3.GetResponse{ + Header: &etcdserverpb.ResponseHeader{}, Kvs: []*mvccpb.KeyValue{ { Key: []byte("hello"), @@ -168,7 +170,7 @@ func TestCluster_Watch(t *testing.T) { listener.EXPECT().OnDelete(gomock.Any()).Do(func(_ interface{}) { wg.Done() }).MaxTimes(1) - go c.watch(cli, "any") + go c.watch(cli, "any", 0) ch <- clientv3.WatchResponse{ Events: []*clientv3.Event{ { @@ -212,7 +214,7 @@ func TestClusterWatch_RespFailures(t *testing.T) { ch <- resp close(c.done) }() - c.watch(cli, "any") + c.watch(cli, "any", 0) }) } } @@ -232,7 +234,7 @@ func TestClusterWatch_CloseChan(t *testing.T) { close(ch) close(c.done) }() - c.watch(cli, "any") + c.watch(cli, "any", 0) } func TestValueOnlyContext(t *testing.T) {