From 00dde7193fe6994e265ba5ec18d73f250a9050b6 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 13 Jan 2021 19:39:46 -0500 Subject: [PATCH 1/4] [tests] Add logging for TestGRPCResolverRoundRobin Signed-off-by: Yuri Shkuro --- pkg/discovery/grpcresolver/grpc_resolver_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/discovery/grpcresolver/grpc_resolver_test.go b/pkg/discovery/grpcresolver/grpc_resolver_test.go index f6bcdd7cf67..32fc94b1179 100644 --- a/pkg/discovery/grpcresolver/grpc_resolver_test.go +++ b/pkg/discovery/grpcresolver/grpc_resolver_test.go @@ -92,11 +92,12 @@ func makeSureConnectionsUp(t *testing.T, count int, testc grpctest.TestServiceCl if _, ok := addrs[p.Addr.String()]; !ok { addrs[p.Addr.String()] = struct{}{} connected = true + t.Logf("connected to peer #%d (%v) on iteration %d", si, p.Addr, i) break } time.Sleep(time.Millisecond * 10) } - assert.True(t, connected, "Connection was still not up") + assert.True(t, connected, "Connection #%d was still not up. Connections so far: %+v", si, addrs) } } From 5ddf59597264d16b2678d0d1984ca9f7f395eac2 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 13 Jan 2021 19:48:45 -0500 Subject: [PATCH 2/4] rename var for clarity Signed-off-by: Yuri Shkuro --- pkg/discovery/grpcresolver/grpc_resolver.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/discovery/grpcresolver/grpc_resolver.go b/pkg/discovery/grpcresolver/grpc_resolver.go index ba345c90280..eee40732df4 100644 --- a/pkg/discovery/grpcresolver/grpc_resolver.go +++ b/pkg/discovery/grpcresolver/grpc_resolver.go @@ -134,11 +134,11 @@ func (r *Resolver) rendezvousHash(addresses []string) []string { } sort.Sort(hosts) size := min(r.discoveryMinPeers, len(hosts)) - addressesPerHost := make([]string, size) + topAddrs := make([]string, size) for i := 0; i < size; i++ { - addressesPerHost[i] = hosts[i].address + topAddrs[i] = hosts[i].address } - return addressesPerHost + return topAddrs } func min(a, b int) int { From b424d3644064f8de9c0616c15d70fcbffe2e56bc Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 13 Jan 2021 20:01:52 -0500 Subject: [PATCH 3/4] cleanup Signed-off-by: Yuri Shkuro --- pkg/discovery/grpcresolver/grpc_resolver.go | 27 ++++++++++--------- .../grpcresolver/grpc_resolver_test.go | 7 +++-- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/pkg/discovery/grpcresolver/grpc_resolver.go b/pkg/discovery/grpcresolver/grpc_resolver.go index eee40732df4..2329870f7ca 100644 --- a/pkg/discovery/grpcresolver/grpc_resolver.go +++ b/pkg/discovery/grpcresolver/grpc_resolver.go @@ -123,22 +123,25 @@ func (r *Resolver) Close() { r.closing.Wait() } +// rendezvousHash is the core of the algorithm. It takes input addresses, +// assigns each of them a hash, sorts them by those hash values, and +// returns top N of entries from the sorted list, up to minPeers parameter. func (r *Resolver) rendezvousHash(addresses []string) []string { hasher := fnv.New32() - hosts := hostScores{} - for _, address := range addresses { - hosts = append(hosts, hostScore{ + hosts := make(hostScores, len(addresses)) + for i, address := range addresses { + hosts[i] = hostScore{ address: address, score: hashAddr(hasher, []byte(address), r.salt), - }) + } } sort.Sort(hosts) - size := min(r.discoveryMinPeers, len(hosts)) - topAddrs := make([]string, size) - for i := 0; i < size; i++ { - topAddrs[i] = hosts[i].address + n := min(r.discoveryMinPeers, len(hosts)) + topN := make([]string, n) + for i := 0; i < n; i++ { + topN[i] = hosts[i].address } - return topAddrs + return topN } func min(a, b int) int { @@ -162,9 +165,9 @@ func (r *Resolver) updateAddresses(hostPorts []string) { } func generateAddresses(instances []string) []resolver.Address { - var addrs []resolver.Address - for _, instance := range instances { - addrs = append(addrs, resolver.Address{Addr: instance}) + addrs := make([]resolver.Address, len(instances)) + for i, instance := range instances { + addrs[i] = resolver.Address{Addr: instance} } return addrs } diff --git a/pkg/discovery/grpcresolver/grpc_resolver_test.go b/pkg/discovery/grpcresolver/grpc_resolver_test.go index 32fc94b1179..93c32e7c823 100644 --- a/pkg/discovery/grpcresolver/grpc_resolver_test.go +++ b/pkg/discovery/grpcresolver/grpc_resolver_test.go @@ -136,10 +136,13 @@ func TestGRPCResolverRoundRobin(t *testing.T) { minPeers int connections int // expected number of unique connections to servers }{ - {3, 3}, {5, 5}, {7, 5}, + {minPeers: 3, connections: 3}, + {minPeers: 5, connections: 3}, + // note: test cannot succeed with minPeers < connections because resolver + // will never return more than minPeers addresses. } for _, test := range tests { - t.Run(fmt.Sprintf("minPeers=%d", test.minPeers), func(t *testing.T) { + t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) { res := New(notifier, discoverer, zap.NewNop(), test.minPeers) defer resolver.UnregisterForTesting(res.Scheme()) From dface2fa2c42bc60494910446357e87fb00e5975 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 13 Jan 2021 20:09:46 -0500 Subject: [PATCH 4/4] increase timeout Signed-off-by: Yuri Shkuro --- pkg/discovery/grpcresolver/grpc_resolver_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/discovery/grpcresolver/grpc_resolver_test.go b/pkg/discovery/grpcresolver/grpc_resolver_test.go index 93c32e7c823..7840efbee52 100644 --- a/pkg/discovery/grpcresolver/grpc_resolver_test.go +++ b/pkg/discovery/grpcresolver/grpc_resolver_test.go @@ -84,7 +84,7 @@ func makeSureConnectionsUp(t *testing.T, count int, testc grpctest.TestServiceCl // Make sure connections to all servers are up. for si := 0; si < count; si++ { connected := false - for i := 0; i < 1000; i++ { + for i := 0; i < 3000; i++ { // 3000 * 10ms = 30s _, err := testc.EmptyCall(context.Background(), &grpctest.Empty{}, grpc.Peer(&p)) if err != nil { continue