clientv3: Add retry for round robin load balancer using grpc-middleware's retry interceptor
jpbetz committed May 18, 2018
1 parent 6593d59 commit 9c962cb
Showing 14 changed files with 106 additions and 107 deletions.
46 changes: 38 additions & 8 deletions clientv3/client.go
@@ -30,6 +30,8 @@ import (
"github.com/coreos/etcd/clientv3/balancer/picker"
"github.com/coreos/etcd/clientv3/balancer/resolver/endpoint"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils"
"go.uber.org/zap"

"google.golang.org/grpc"
@@ -45,13 +45,15 @@ var (
ErrOldCluster = errors.New("etcdclient: old cluster version")

roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String())
logger *zap.Logger
)

func init() {
logger = zap.NewNop() // zap.NewExample()
balancer.RegisterBuilder(balancer.Config{
Policy: picker.RoundrobinBalanced,
Name: roundRobinBalancerName,
Logger: zap.NewNop(), // zap.NewExample(),
Logger: logger,
})
}

@@ -263,6 +267,16 @@ func (c *Client) dialSetupOpts(target string, dopts ...grpc.DialOption) (opts []
opts = append(opts, grpc.WithInsecure())
}

// Interceptor retry and backoff.
// TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy
// once it is available.
rrBackoff := grpc_retry.WithBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))
opts = append(opts,
grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(grpc_retry.WithMax(defaultStreamMaxRetries), rrBackoff)),
grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(defaultUnaryMaxRetries), rrBackoff)),
)

return opts, nil
}
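For context on the hunk above: the retry interceptors from go-grpc-middleware are attached as dial options, so every unary and stream RPC on the resulting connection is retried up to a configured maximum with a pluggable backoff. The following standalone sketch shows the same wiring pattern; it is not the etcd code. The target, the retry limit of 5, and the linear backoff are illustrative placeholders (defaultStreamMaxRetries, defaultUnaryMaxRetries, and the backoff defaults live in files not expanded on this page), and the commit substitutes its quorum-aware backoff, shown further down, for BackoffLinear.

package main

import (
	"time"

	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
	"google.golang.org/grpc"
)

// dialWithRetry builds a *grpc.ClientConn whose unary and stream calls are
// retried by the interceptors, mirroring the dial options added above.
func dialWithRetry(target string) (*grpc.ClientConn, error) {
	// Fixed wait between retries; the commit plugs in a quorum-aware BackoffFunc instead.
	backoff := grpc_retry.WithBackoff(grpc_retry.BackoffLinear(100 * time.Millisecond))
	return grpc.Dial(target,
		grpc.WithInsecure(),
		grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(grpc_retry.WithMax(5), backoff)),
		grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(5), backoff)),
	)
}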

@@ -386,14 +400,14 @@ func newClient(cfg *Config) (*Client, error) {

ctx, cancel := context.WithCancel(baseCtx)
client := &Client{
conn: nil,
cfg: *cfg,
creds: creds,
ctx: ctx,
cancel: cancel,
mu: new(sync.Mutex),
callOpts: defaultCallOpts,
conn: nil,
cfg: *cfg,
creds: creds,
ctx: ctx,
cancel: cancel,
mu: new(sync.Mutex),
}

if cfg.Username != "" && cfg.Password != "" {
client.Username = cfg.Username
client.Password = cfg.Password
@@ -461,6 +475,22 @@ func newClient(cfg *Config) (*Client, error) {
return client, nil
}

// roundRobinQuorumBackoff retries against quorum between each backoff.
// This is intended for use with a round robin load balancer.
func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, jitterFraction float64) grpc_retry.BackoffFunc {
return func(attempt uint) time.Duration {
// after each round robin across quorum, backoff for our wait between duration
n := uint(len(c.Endpoints()))
quorum := (n/2 + 1)
if attempt%quorum == 0 {
logger.Info("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
return backoffutils.JitterUp(waitBetween, jitterFraction)
}
logger.Info("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
return 0
}
}
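To spell out what the function above does: with a round robin balancer each retry attempt lands on a different endpoint, and roundRobinQuorumBackoff only sleeps (with jitter) once a quorum's worth of endpoints has been tried; every other attempt returns a zero wait so the client moves straight to the next endpoint. The standalone sketch below illustrates just that modulo logic, assuming 3 endpoints and therefore a quorum of 2; it is not the etcd implementation, it omits the JitterUp call, and the exact attempt numbers passed in by the grpc_retry interceptor are an implementation detail.

package main

import (
	"fmt"
	"time"
)

// quorumBackoff mimics the shape of roundRobinQuorumBackoff: wait only once
// per quorum-sized round of retry attempts, otherwise retry immediately.
func quorumBackoff(endpoints uint, waitBetween time.Duration) func(attempt uint) time.Duration {
	quorum := endpoints/2 + 1
	return func(attempt uint) time.Duration {
		if attempt%quorum == 0 {
			return waitBetween // the real code applies backoffutils.JitterUp here
		}
		return 0
	}
}

func main() {
	backoff := quorumBackoff(3, 25*time.Millisecond) // 3 endpoints -> quorum of 2
	for attempt := uint(1); attempt <= 6; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, backoff(attempt))
	}
	// attempts 2, 4, 6 wait 25ms; attempts 1, 3, 5 move straight to the next endpoint
}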

func (c *Client) checkVersion() (err error) {
var wg sync.WaitGroup
errc := make(chan error, len(c.cfg.Endpoints))
7 changes: 3 additions & 4 deletions clientv3/integration/dial_test.go
@@ -83,7 +83,9 @@ func TestDialTLSNoConfig(t *testing.T) {

// TODO: this should not be required when we set grpc.WithBlock()
if c != nil {
_, err = c.KV.Get(context.Background(), "/")
ctx, cancel := context.WithTimeout(context.Background(), integration.RequestWaitTimeout)
_, err = c.KV.Get(ctx, "/")
cancel()
}
if !isClientTimeout(err) {
t.Fatalf("expected dial timeout error, got %v", err)
@@ -157,9 +159,6 @@ func TestSwitchSetEndpoints(t *testing.T) {

cli.SetEndpoints(eps...)

// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, cli)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if _, err := cli.Get(ctx, "foo"); err != nil {
17 changes: 5 additions & 12 deletions clientv3/integration/kv_test.go
@@ -438,15 +438,12 @@ func TestKVGetErrConnClosed(t *testing.T) {

cli := clus.Client(0)

// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, cli)

donec := make(chan struct{})
go func() {
defer close(donec)
_, err := cli.Get(context.TODO(), "foo")
if err != nil && err != context.Canceled && err != grpc.ErrClientConnClosing && !isServerUnavailable(err) {
t.Fatalf("expected %v, %v or server unavailable, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
if err != nil && err != context.Canceled && err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
}
}()

@@ -689,8 +686,6 @@ func TestKVGetRetry(t *testing.T) {

donec := make(chan struct{})
go func() {
// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, kv)
// Get will fail, but reconnect will trigger
gresp, gerr := kv.Get(ctx, "foo")
if gerr != nil {
@@ -741,8 +736,6 @@ func TestKVPutFailGetRetry(t *testing.T) {

donec := make(chan struct{})
go func() {
// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, kv)
// Get will fail, but reconnect will trigger
gresp, gerr := kv.Get(context.TODO(), "foo")
if gerr != nil {
@@ -800,7 +793,7 @@ func TestKVGetStoppedServerAndClose(t *testing.T) {
// this Get fails and triggers an asynchronous connection retry
_, err := cli.Get(ctx, "abc")
cancel()
if err != nil && !(isServerUnavailable(err) || isCanceled(err) || isClientTimeout(err)) {
if err != nil && !(isCanceled(err) || isClientTimeout(err)) {
t.Fatal(err)
}
}
@@ -822,15 +815,15 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
// grpc finds out the original connection is down due to the member shutdown.
_, err := cli.Get(ctx, "abc")
cancel()
if err != nil && !(isServerUnavailable(err) || isCanceled(err) || isClientTimeout(err)) {
if err != nil && !(isCanceled(err) || isClientTimeout(err)) {
t.Fatal(err)
}

ctx, cancel = context.WithTimeout(context.TODO(), time.Second)
// this Put fails and triggers an asynchronous connection retry
_, err = cli.Put(ctx, "abc", "123")
cancel()
if err != nil && !(isServerUnavailable(err) || isCanceled(err) || isClientTimeout(err)) {
if err != nil && !(isCanceled(err) || isClientTimeout(err)) {
t.Fatal(err)
}
}
10 changes: 7 additions & 3 deletions clientv3/integration/lease_test.go
@@ -145,6 +145,10 @@ func TestLeaseKeepAlive(t *testing.T) {
t.Errorf("chan is closed, want not closed")
}

if kresp == nil {
t.Fatalf("unexpected null response")
}

if kresp.ID != resp.ID {
t.Errorf("ID = %x, want %x", kresp.ID, resp.ID)
}
@@ -292,7 +296,7 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
go func() {
defer close(donec)
_, err := cli.Grant(context.TODO(), 5)
if err != nil && err != grpc.ErrClientConnClosing && err != context.Canceled && !isServerUnavailable(err) {
if err != nil && err != grpc.ErrClientConnClosing && err != context.Canceled {
// grpc.ErrClientConnClosing if grpc-go balancer calls 'Get' after client.Close.
// context.Canceled if grpc-go balancer calls 'Get' with an inflight client.Close.
t.Fatalf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
@@ -324,7 +328,7 @@ func TestLeaseGrantNewAfterClose(t *testing.T) {

donec := make(chan struct{})
go func() {
if _, err := cli.Grant(context.TODO(), 5); err != context.Canceled && err != grpc.ErrClientConnClosing && !isServerUnavailable(err) {
if _, err := cli.Grant(context.TODO(), 5); err != context.Canceled && err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
}
close(donec)
@@ -356,7 +360,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {

donec := make(chan struct{})
go func() {
if _, err := cli.Revoke(context.TODO(), leaseID); err != context.Canceled && err != grpc.ErrClientConnClosing && !isServerUnavailable(err) {
if _, err := cli.Revoke(context.TODO(), leaseID); err != context.Canceled && err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
}
close(donec)
14 changes: 2 additions & 12 deletions clientv3/integration/leasing_test.go
@@ -869,9 +869,6 @@ func TestLeasingTxnCancel(t *testing.T) {
}
clus.Members[0].Stop(t)

// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, clus.Client(1))

// wait for leader election, if any
if _, err = clus.Client(1).Get(context.TODO(), "abc"); err != nil {
t.Fatal(err)
@@ -1536,9 +1533,6 @@ func TestLeasingReconnectOwnerConsistency(t *testing.T) {
}
}

// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, lkv)

lresp, lerr := lkv.Get(context.TODO(), "k")
if lerr != nil {
t.Fatal(lerr)
@@ -1820,9 +1814,6 @@ func TestLeasingTxnOwnerPutBranch(t *testing.T) {
// lkv shouldn't need to call out to server for updated leased keys
clus.Members[0].Stop(t)

// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, clus.Client(1))

for i := 0; i < n; i++ {
k := fmt.Sprintf("tree/%d", i)
lkvResp, err := lkv.Get(context.TODO(), k)
@@ -1914,7 +1905,6 @@ func TestLeasingSessionExpire(t *testing.T) {
}
waitForExpireAck(t, lkv)
clus.Members[0].Restart(t)
integration.WaitClientV3(t, lkv2)
if _, err = lkv2.Put(context.TODO(), "abc", "def"); err != nil {
t.Fatal(err)
}
@@ -1994,7 +1984,7 @@ func TestLeasingSessionExpireCancel(t *testing.T) {

select {
case err := <-errc:
if !(err == ctx.Err() || isServerUnavailable(err)) {
if err != ctx.Err() {
t.Errorf("#%d: expected %v of server unavailable, got %v", i, ctx.Err(), err)
}
case <-time.After(5 * time.Second):
@@ -2025,7 +2015,7 @@ func waitForExpireAck(t *testing.T, kv clientv3.KV) {
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
_, err := kv.Get(ctx, "abc")
cancel()
if err == ctx.Err() || isServerUnavailable(err) {
if err == ctx.Err() {
return
} else if err != nil {
t.Logf("current error: %v", err)
1 change: 0 additions & 1 deletion clientv3/integration/maintenance_test.go
@@ -157,7 +157,6 @@ func TestMaintenanceSnapshotErrorInflight(t *testing.T) {
clus.Members[0].Restart(t)

cli := clus.RandClient()
integration.WaitClientV3(t, cli)
// reading snapshot with canceled context should error out
ctx, cancel := context.WithCancel(context.Background())
rc1, err := cli.Snapshot(ctx)
3 changes: 0 additions & 3 deletions clientv3/integration/network_partition_test.go
@@ -186,9 +186,6 @@ func TestBalancerUnderNetworkPartitionLinearizableGetLeaderElection(t *testing.T
// isolate leader
clus.Members[lead].InjectPartition(t, clus.Members[(lead+1)%3], clus.Members[(lead+2)%3])

// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, cli)

// expects balancer to round robin to leader within two attempts
for i := 0; i < 2; i++ {
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
19 changes: 1 addition & 18 deletions clientv3/integration/server_shutdown_test.go
@@ -17,7 +17,6 @@ package integration
import (
"bytes"
"context"
"reflect"
"strings"
"testing"
"time"
@@ -352,11 +351,7 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl
}
cancel()
if err != nil {
if linearizable && isServerUnavailable(err) {
t.Logf("TODO: FIX THIS after balancer rewrite! %v %v", reflect.TypeOf(err), err)
} else {
t.Fatalf("expected linearizable=true and a server unavailable error, but got linearizable=%t and '%v'", linearizable, err)
}
t.Fatalf("unexpected error: %v", err)
}
}()

@@ -402,18 +397,6 @@ func isClientTimeout(err error) bool {
return code == codes.DeadlineExceeded || ev.Message() == transport.ErrConnClosing.Desc
}

func isServerUnavailable(err error) bool {
if err == nil {
return false
}
ev, ok := status.FromError(err)
if !ok {
return false
}
code := ev.Code()
return code == codes.Unavailable
}

func isCanceled(err error) bool {
if err == nil {
return false
3 changes: 0 additions & 3 deletions clientv3/integration/txn_test.go
@@ -79,9 +79,6 @@ func TestTxnWriteFail(t *testing.T) {
t.Fatalf("timed out waiting for txn fail")
case <-txnc:
}
// TODO: Remove wait once the new grpc load balancer provides retry.
integration.WaitClientV3(t, kv)

// and ensure the put didn't take
gresp, gerr := clus.Client(1).Get(context.TODO(), "foo")
if gerr != nil {
5 changes: 3 additions & 2 deletions clientv3/lease.go
@@ -21,6 +21,7 @@ import (

"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
@@ -395,7 +396,7 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
cctx, cancel := context.WithCancel(ctx)
defer cancel()

stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...)
stream, err := l.remote.LeaseKeepAlive(cctx, append(l.callOpts, grpc_retry.Disable())...)
if err != nil {
return nil, toErr(ctx, err)
}
@@ -466,7 +467,7 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
// resetRecv opens a new lease stream and starts sending keep alive requests.
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
sctx, cancel := context.WithCancel(l.stopCtx)
stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
stream, err := l.remote.LeaseKeepAlive(sctx, append(l.callOpts, grpc_retry.Disable())...)
if err != nil {
cancel()
return nil, err
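One detail in the lease.go hunks above: the new dial-level retry is switched off for LeaseKeepAlive streams by appending grpc_retry.Disable() to the call options, presumably because the lessor already re-establishes the keepalive stream itself in resetRecv. Since the grpc_retry options are ordinary grpc.CallOptions, the same per-call override works on any generated gRPC client. The sketch below uses the standard health-check client purely as a stand-in, with an illustrative target address; it is not part of this commit.

package main

import (
	"context"
	"time"

	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
	"google.golang.org/grpc"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	// Dial with a default retry policy for all unary calls.
	conn, err := grpc.Dial("localhost:2379",
		grpc.WithInsecure(),
		grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(
			grpc_retry.WithMax(5),
			grpc_retry.WithBackoff(grpc_retry.BackoffLinear(50*time.Millisecond)),
		)),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	hc := healthpb.NewHealthClient(conn)
	_, _ = hc.Check(ctx, &healthpb.HealthCheckRequest{})                       // retried per the dial options
	_, _ = hc.Check(ctx, &healthpb.HealthCheckRequest{}, grpc_retry.Disable()) // retry disabled for this call only
}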