Skip to content

Commit

Permalink
clientv3/naming/endpoints: fix endpoints prefix bug
Browse files Browse the repository at this point in the history
fixes bug with multiple endpoints with same prefix

Signed-off-by: Ramil Mirhasanov <ramil600@yahoo.com>
  • Loading branch information
ramil600 authored and HubertZhang committed May 23, 2023
1 parent 721d9fe commit 2158f21
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 3 deletions.
9 changes: 6 additions & 3 deletions client/v3/naming/endpoints/endpoints_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts .
}

func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable())
key := m.target + "/"
resp, err := m.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -112,7 +113,8 @@ func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Upd

lg := m.client.GetLogger()
opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()}
wch := m.client.Watch(ctx, m.target, opts...)
key := m.target + "/"
wch := m.client.Watch(ctx, key, opts...)
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -157,7 +159,8 @@ func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Upd
}

func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) {
resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable())
key := m.target + "/"
resp, err := m.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil {
return nil, err
}
Expand Down
39 changes: 39 additions & 0 deletions tests/integration/clientv3/naming/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/client/v3/naming/endpoints"
"go.etcd.io/etcd/client/v3/naming/resolver"
"go.etcd.io/etcd/pkg/v3/grpc_testing"
Expand Down Expand Up @@ -112,3 +113,41 @@ func TestEtcdGrpcResolver(t *testing.T) {
break
}
}

func TestEtcdEndpointManager(t *testing.T) {
integration2.BeforeTest(t)

s1PayloadBody := []byte{'1'}
s1 := grpc_testing.NewDummyStubServer(s1PayloadBody)
err := s1.Start(nil)
assert.NoError(t, err)
defer s1.Stop()

s2PayloadBody := []byte{'2'}
s2 := grpc_testing.NewDummyStubServer(s2PayloadBody)
err = s2.Start(nil)
assert.NoError(t, err)
defer s2.Stop()

clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
defer clus.Terminate(t)

// Check if any endpoint with the same prefix "foo" will not break the logic with multiple endpoints
em, err := endpoints.NewManager(clus.Client(0), "foo")
assert.NoError(t, err)
emOther, err := endpoints.NewManager(clus.Client(1), "foo_other")
assert.NoError(t, err)

e1 := endpoints.Endpoint{Addr: s1.Addr()}
e2 := endpoints.Endpoint{Addr: s2.Addr()}

em.AddEndpoint(context.Background(), "foo/e1", e1)
emOther.AddEndpoint(context.Background(), "foo_other/e2", e2)

epts, err := em.List(context.Background())
assert.NoError(t, err)
eptsOther, err := emOther.List(context.Background())
assert.NoError(t, err)
assert.Equal(t, len(epts), 1)
assert.Equal(t, len(eptsOther), 1)
}

0 comments on commit 2158f21

Please sign in to comment.