From 9c962cb461742f7eb4da9d072f6a79b343b4c36e Mon Sep 17 00:00:00 2001 From: Joe Betz Date: Fri, 11 May 2018 12:53:20 -0700 Subject: [PATCH] clientv3: Add retry for round robin load balancer using grpc-middleware's retry interceptor --- clientv3/client.go | 46 +++++++++++++--- clientv3/integration/dial_test.go | 7 ++- clientv3/integration/kv_test.go | 17 ++---- clientv3/integration/lease_test.go | 10 ++-- clientv3/integration/leasing_test.go | 14 +---- clientv3/integration/maintenance_test.go | 1 - .../integration/network_partition_test.go | 3 -- clientv3/integration/server_shutdown_test.go | 19 +------ clientv3/integration/txn_test.go | 3 -- clientv3/lease.go | 5 +- clientv3/options.go | 25 ++++++++- clientv3/retry.go | 53 ++++++------------- clientv3/watch.go | 3 +- integration/v3_alarm_test.go | 7 ++- 14 files changed, 106 insertions(+), 107 deletions(-) diff --git a/clientv3/client.go b/clientv3/client.go index 2a4a6d04f953..18481006ea51 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -30,6 +30,8 @@ import ( "github.com/coreos/etcd/clientv3/balancer/picker" "github.com/coreos/etcd/clientv3/balancer/resolver/endpoint" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" + "github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils" "go.uber.org/zap" "google.golang.org/grpc" @@ -45,13 +47,15 @@ var ( ErrOldCluster = errors.New("etcdclient: old cluster version") roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String()) + logger *zap.Logger ) func init() { + logger = zap.NewNop() // zap.NewExample() balancer.RegisterBuilder(balancer.Config{ Policy: picker.RoundrobinBalanced, Name: roundRobinBalancerName, - Logger: zap.NewNop(), // zap.NewExample(), + Logger: logger, }) } @@ -263,6 +267,16 @@ func (c *Client) dialSetupOpts(target string, dopts ...grpc.DialOption) (opts [] opts = append(opts, grpc.WithInsecure()) } + // Interceptor retry and backoff. + // TODO: Replace all of clientv3/retry.go with interceptor based retry, or with + // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy + // once it is available. + rrBackoff := grpc_retry.WithBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction)) + opts = append(opts, + grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(grpc_retry.WithMax(defaultStreamMaxRetries), rrBackoff)), + grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(defaultUnaryMaxRetries), rrBackoff)), + ) + return opts, nil } @@ -386,14 +400,14 @@ func newClient(cfg *Config) (*Client, error) { ctx, cancel := context.WithCancel(baseCtx) client := &Client{ - conn: nil, - cfg: *cfg, - creds: creds, - ctx: ctx, - cancel: cancel, - mu: new(sync.Mutex), - callOpts: defaultCallOpts, + conn: nil, + cfg: *cfg, + creds: creds, + ctx: ctx, + cancel: cancel, + mu: new(sync.Mutex), } + if cfg.Username != "" && cfg.Password != "" { client.Username = cfg.Username client.Password = cfg.Password @@ -461,6 +475,22 @@ func newClient(cfg *Config) (*Client, error) { return client, nil } +// roundRobinQuorumBackoff retries against quorum between each backoff. +// This is intended for use with a round robin load balancer. 
+func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, jitterFraction float64) grpc_retry.BackoffFunc { + return func(attempt uint) time.Duration { + // after each round robin across quorum, backoff for our wait between duration + n := uint(len(c.Endpoints())) + quorum := (n/2 + 1) + if attempt%quorum == 0 { + logger.Info("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction)) + return backoffutils.JitterUp(waitBetween, jitterFraction) + } + logger.Info("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum)) + return 0 + } +} + func (c *Client) checkVersion() (err error) { var wg sync.WaitGroup errc := make(chan error, len(c.cfg.Endpoints)) diff --git a/clientv3/integration/dial_test.go b/clientv3/integration/dial_test.go index 587bcd565770..574f224492f5 100644 --- a/clientv3/integration/dial_test.go +++ b/clientv3/integration/dial_test.go @@ -83,7 +83,9 @@ func TestDialTLSNoConfig(t *testing.T) { // TODO: this should not be required when we set grpc.WithBlock() if c != nil { - _, err = c.KV.Get(context.Background(), "/") + ctx, cancel := context.WithTimeout(context.Background(), integration.RequestWaitTimeout) + _, err = c.KV.Get(ctx, "/") + cancel() } if !isClientTimeout(err) { t.Fatalf("expected dial timeout error, got %v", err) @@ -157,9 +159,6 @@ func TestSwitchSetEndpoints(t *testing.T) { cli.SetEndpoints(eps...) - // TODO: Remove wait once the new grpc load balancer provides retry. - integration.WaitClientV3(t, cli) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if _, err := cli.Get(ctx, "foo"); err != nil { diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index 994effa6fd60..7fe7fec8a07c 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -438,15 +438,12 @@ func TestKVGetErrConnClosed(t *testing.T) { cli := clus.Client(0) - // TODO: Remove wait once the new grpc load balancer provides retry. - integration.WaitClientV3(t, cli) - donec := make(chan struct{}) go func() { defer close(donec) _, err := cli.Get(context.TODO(), "foo") - if err != nil && err != context.Canceled && err != grpc.ErrClientConnClosing && !isServerUnavailable(err) { - t.Fatalf("expected %v, %v or server unavailable, got %v", context.Canceled, grpc.ErrClientConnClosing, err) + if err != nil && err != context.Canceled && err != grpc.ErrClientConnClosing { + t.Fatalf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err) } }() @@ -689,8 +686,6 @@ func TestKVGetRetry(t *testing.T) { donec := make(chan struct{}) go func() { - // TODO: Remove wait once the new grpc load balancer provides retry. - integration.WaitClientV3(t, kv) // Get will fail, but reconnect will trigger gresp, gerr := kv.Get(ctx, "foo") if gerr != nil { @@ -741,8 +736,6 @@ func TestKVPutFailGetRetry(t *testing.T) { donec := make(chan struct{}) go func() { - // TODO: Remove wait once the new grpc load balancer provides retry. 
- integration.WaitClientV3(t, kv) // Get will fail, but reconnect will trigger gresp, gerr := kv.Get(context.TODO(), "foo") if gerr != nil { @@ -800,7 +793,7 @@ func TestKVGetStoppedServerAndClose(t *testing.T) { // this Get fails and triggers an asynchronous connection retry _, err := cli.Get(ctx, "abc") cancel() - if err != nil && !(isServerUnavailable(err) || isCanceled(err) || isClientTimeout(err)) { + if err != nil && !(isCanceled(err) || isClientTimeout(err)) { t.Fatal(err) } } @@ -822,7 +815,7 @@ func TestKVPutStoppedServerAndClose(t *testing.T) { // grpc finds out the original connection is down due to the member shutdown. _, err := cli.Get(ctx, "abc") cancel() - if err != nil && !(isServerUnavailable(err) || isCanceled(err) || isClientTimeout(err)) { + if err != nil && !(isCanceled(err) || isClientTimeout(err)) { t.Fatal(err) } @@ -830,7 +823,7 @@ func TestKVPutStoppedServerAndClose(t *testing.T) { // this Put fails and triggers an asynchronous connection retry _, err = cli.Put(ctx, "abc", "123") cancel() - if err != nil && !(isServerUnavailable(err) || isCanceled(err) || isClientTimeout(err)) { + if err != nil && !(isCanceled(err) || isClientTimeout(err)) { t.Fatal(err) } } diff --git a/clientv3/integration/lease_test.go b/clientv3/integration/lease_test.go index 4329f7ac91c0..75a0987c52e5 100644 --- a/clientv3/integration/lease_test.go +++ b/clientv3/integration/lease_test.go @@ -145,6 +145,10 @@ func TestLeaseKeepAlive(t *testing.T) { t.Errorf("chan is closed, want not closed") } + if kresp == nil { + t.Fatalf("unexpected null response") + } + if kresp.ID != resp.ID { t.Errorf("ID = %x, want %x", kresp.ID, resp.ID) } @@ -292,7 +296,7 @@ func TestLeaseGrantErrConnClosed(t *testing.T) { go func() { defer close(donec) _, err := cli.Grant(context.TODO(), 5) - if err != nil && err != grpc.ErrClientConnClosing && err != context.Canceled && !isServerUnavailable(err) { + if err != nil && err != grpc.ErrClientConnClosing && err != context.Canceled { // grpc.ErrClientConnClosing if grpc-go balancer calls 'Get' after client.Close. // context.Canceled if grpc-go balancer calls 'Get' with an inflight client.Close. 
t.Fatalf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err) @@ -324,7 +328,7 @@ func TestLeaseGrantNewAfterClose(t *testing.T) { donec := make(chan struct{}) go func() { - if _, err := cli.Grant(context.TODO(), 5); err != context.Canceled && err != grpc.ErrClientConnClosing && !isServerUnavailable(err) { + if _, err := cli.Grant(context.TODO(), 5); err != context.Canceled && err != grpc.ErrClientConnClosing { t.Fatalf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err) } close(donec) @@ -356,7 +360,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) { donec := make(chan struct{}) go func() { - if _, err := cli.Revoke(context.TODO(), leaseID); err != context.Canceled && err != grpc.ErrClientConnClosing && !isServerUnavailable(err) { + if _, err := cli.Revoke(context.TODO(), leaseID); err != context.Canceled && err != grpc.ErrClientConnClosing { t.Fatalf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err) } close(donec) diff --git a/clientv3/integration/leasing_test.go b/clientv3/integration/leasing_test.go index ff82ae4b3e94..795cbb45ba62 100644 --- a/clientv3/integration/leasing_test.go +++ b/clientv3/integration/leasing_test.go @@ -869,9 +869,6 @@ func TestLeasingTxnCancel(t *testing.T) { } clus.Members[0].Stop(t) - // TODO: Remove wait once the new grpc load balancer provides retry. - integration.WaitClientV3(t, clus.Client(1)) - // wait for leader election, if any if _, err = clus.Client(1).Get(context.TODO(), "abc"); err != nil { t.Fatal(err) @@ -1536,9 +1533,6 @@ func TestLeasingReconnectOwnerConsistency(t *testing.T) { } } - // TODO: Remove wait once the new grpc load balancer provides retry. - integration.WaitClientV3(t, lkv) - lresp, lerr := lkv.Get(context.TODO(), "k") if lerr != nil { t.Fatal(lerr) @@ -1820,9 +1814,6 @@ func TestLeasingTxnOwnerPutBranch(t *testing.T) { // lkv shouldn't need to call out to server for updated leased keys clus.Members[0].Stop(t) - // TODO: Remove wait once the new grpc load balancer provides retry. 
- integration.WaitClientV3(t, clus.Client(1)) - for i := 0; i < n; i++ { k := fmt.Sprintf("tree/%d", i) lkvResp, err := lkv.Get(context.TODO(), k) @@ -1914,7 +1905,6 @@ func TestLeasingSessionExpire(t *testing.T) { } waitForExpireAck(t, lkv) clus.Members[0].Restart(t) - integration.WaitClientV3(t, lkv2) if _, err = lkv2.Put(context.TODO(), "abc", "def"); err != nil { t.Fatal(err) } @@ -1994,7 +1984,7 @@ func TestLeasingSessionExpireCancel(t *testing.T) { select { case err := <-errc: - if !(err == ctx.Err() || isServerUnavailable(err)) { + if err != ctx.Err() { t.Errorf("#%d: expected %v of server unavailable, got %v", i, ctx.Err(), err) } case <-time.After(5 * time.Second): @@ -2025,7 +2015,7 @@ func waitForExpireAck(t *testing.T, kv clientv3.KV) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second) _, err := kv.Get(ctx, "abc") cancel() - if err == ctx.Err() || isServerUnavailable(err) { + if err == ctx.Err() { return } else if err != nil { t.Logf("current error: %v", err) diff --git a/clientv3/integration/maintenance_test.go b/clientv3/integration/maintenance_test.go index 0609c1c812bd..b4b73614a36c 100644 --- a/clientv3/integration/maintenance_test.go +++ b/clientv3/integration/maintenance_test.go @@ -157,7 +157,6 @@ func TestMaintenanceSnapshotErrorInflight(t *testing.T) { clus.Members[0].Restart(t) cli := clus.RandClient() - integration.WaitClientV3(t, cli) // reading snapshot with canceled context should error out ctx, cancel := context.WithCancel(context.Background()) rc1, err := cli.Snapshot(ctx) diff --git a/clientv3/integration/network_partition_test.go b/clientv3/integration/network_partition_test.go index 41310f75400d..f65807430eff 100644 --- a/clientv3/integration/network_partition_test.go +++ b/clientv3/integration/network_partition_test.go @@ -186,9 +186,6 @@ func TestBalancerUnderNetworkPartitionLinearizableGetLeaderElection(t *testing.T // isolate leader clus.Members[lead].InjectPartition(t, clus.Members[(lead+1)%3], clus.Members[(lead+2)%3]) - // TODO: Remove wait once the new grpc load balancer provides retry. - integration.WaitClientV3(t, cli) - // expects balancer to round robin to leader within two attempts for i := 0; i < 2; i++ { ctx, cancel := context.WithTimeout(context.TODO(), timeout) diff --git a/clientv3/integration/server_shutdown_test.go b/clientv3/integration/server_shutdown_test.go index d1e5507cc5c8..ce64f14e3290 100644 --- a/clientv3/integration/server_shutdown_test.go +++ b/clientv3/integration/server_shutdown_test.go @@ -17,7 +17,6 @@ package integration import ( "bytes" "context" - "reflect" "strings" "testing" "time" @@ -352,11 +351,7 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl } cancel() if err != nil { - if linearizable && isServerUnavailable(err) { - t.Logf("TODO: FIX THIS after balancer rewrite! 
%v %v", reflect.TypeOf(err), err) - } else { - t.Fatalf("expected linearizable=true and a server unavailable error, but got linearizable=%t and '%v'", linearizable, err) - } + t.Fatalf("unexpected error: %v", err) } }() @@ -402,18 +397,6 @@ func isClientTimeout(err error) bool { return code == codes.DeadlineExceeded || ev.Message() == transport.ErrConnClosing.Desc } -func isServerUnavailable(err error) bool { - if err == nil { - return false - } - ev, ok := status.FromError(err) - if !ok { - return false - } - code := ev.Code() - return code == codes.Unavailable -} - func isCanceled(err error) bool { if err == nil { return false diff --git a/clientv3/integration/txn_test.go b/clientv3/integration/txn_test.go index 672bf79032a3..ca49ec075d84 100644 --- a/clientv3/integration/txn_test.go +++ b/clientv3/integration/txn_test.go @@ -79,9 +79,6 @@ func TestTxnWriteFail(t *testing.T) { t.Fatalf("timed out waiting for txn fail") case <-txnc: } - // TODO: Remove wait once the new grpc load balancer provides retry. - integration.WaitClientV3(t, kv) - // and ensure the put didn't take gresp, gerr := clus.Client(1).Get(context.TODO(), "foo") if gerr != nil { diff --git a/clientv3/lease.go b/clientv3/lease.go index 4097b3afa2ad..3467d7522f7d 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -21,6 +21,7 @@ import ( "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "google.golang.org/grpc" "google.golang.org/grpc/metadata" @@ -395,7 +396,7 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive cctx, cancel := context.WithCancel(ctx) defer cancel() - stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...) + stream, err := l.remote.LeaseKeepAlive(cctx, append(l.callOpts, grpc_retry.Disable())...) if err != nil { return nil, toErr(ctx, err) } @@ -466,7 +467,7 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) { // resetRecv opens a new lease stream and starts sending keep alive requests. func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) { sctx, cancel := context.WithCancel(l.stopCtx) - stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...) + stream, err := l.remote.LeaseKeepAlive(sctx, append(l.callOpts, grpc_retry.Disable())...) if err != nil { cancel() return nil, err diff --git a/clientv3/options.go b/clientv3/options.go index a98b19f9b4b1..d1b86b786b58 100644 --- a/clientv3/options.go +++ b/clientv3/options.go @@ -16,8 +16,11 @@ package clientv3 import ( "math" + "time" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "google.golang.org/grpc" + "google.golang.org/grpc/codes" ) var ( @@ -37,12 +40,32 @@ var ( // because range response can easily exceed request send limits // Default to math.MaxInt32; writes exceeding server-side send limit fails anyway defaultMaxCallRecvMsgSize = grpc.MaxCallRecvMsgSize(math.MaxInt32) + + // client-side non-streaming retry limit, only applied to requests where server responds with + // a error code clearly indicating it was unable to process the request such as codes.Unavailable. + // If set to 0, retry is disabled. + defaultUnaryMaxRetries uint = 100 + + // client-side streaming retry limit, only applied to requests where server responds with + // a error code clearly indicating it was unable to process the request such as codes.Unavailable. + // If set to 0, retry is disabled. 
+ defaultStreamMaxRetries uint = ^uint(0) // max uint + + // client-side retry backoff wait between requests. + defaultBackoffWaitBetween = 25 * time.Millisecond + + // client-side retry backoff default jitter fraction. + defaultBackoffJitterFraction = 0.10 + + // client-side retry codes. + // WARNING: It is unsafe to add response codes for non-idempotent operations to this list. + defaultRetryCodes = grpc_retry.WithCodes(codes.Unavailable) ) // defaultCallOpts defines a list of default "gRPC.CallOption". // Some options are exposed to "clientv3.Config". // Defaults will be overridden by the settings in "clientv3.Config". -var defaultCallOpts = []grpc.CallOption{defaultFailFast, defaultMaxCallSendMsgSize, defaultMaxCallRecvMsgSize} +var defaultCallOpts = []grpc.CallOption{defaultFailFast, defaultMaxCallSendMsgSize, defaultMaxCallRecvMsgSize, defaultRetryCodes} // MaxLeaseTTL is the maximum lease TTL value const MaxLeaseTTL = 9000000000 diff --git a/clientv3/retry.go b/clientv3/retry.go index 6c7fcfcf6b64..06752ea85d4b 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -78,8 +78,6 @@ func isNonRepeatableStopError(err error) bool { return desc != "there is no address available" && desc != "there is no connection available" } -// TODO: Remove retry logic entirely now that we're using the new grpc load balancer interface? -/* func (c *Client) newRetryWrapper() retryRPCFunc { return func(rpcCtx context.Context, f rpcFunc, rp retryPolicy) error { var isStop retryStopErrFunc @@ -90,21 +88,14 @@ func (c *Client) newRetryWrapper() retryRPCFunc { isStop = isNonRepeatableStopError } for { - if err := readyWait(rpcCtx, c.ctx, c.balancer.ConnectNotify()); err != nil { - return err - } - pinned := c.balancer.Pinned() err := f(rpcCtx) if err == nil { return nil } - lg.Lvl(4).Infof("clientv3/retry: error %q on pinned endpoint %q", err.Error(), pinned) + lg.Lvl(4).Infof("clientv3/retry: error %q", err.Error()) if s, ok := status.FromError(err); ok && (s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded || s.Code() == codes.Internal) { - // mark this before endpoint switch is triggered - c.balancer.HostPortError(pinned, err) - c.balancer.Next() - lg.Lvl(4).Infof("clientv3/retry: switching from %q due to error %q", pinned, err.Error()) + lg.Lvl(4).Infof("clientv3/retry: retrying due to error %q", err.Error()) } if isStop(err) { @@ -112,23 +103,21 @@ func (c *Client) newRetryWrapper() retryRPCFunc { } } } -}*/ +} -/* func (c *Client) newAuthRetryWrapper(retryf retryRPCFunc) retryRPCFunc { return func(rpcCtx context.Context, f rpcFunc, rp retryPolicy) error { for { - pinned := c.balancer.Pinned() err := retryf(rpcCtx, f, rp) if err == nil { return nil } - lg.Lvl(4).Infof("clientv3/auth-retry: error %q on pinned endpoint %q", err.Error(), pinned) + lg.Lvl(4).Infof("clientv3/auth-retry: error %q", err.Error()) // always stop retry on etcd errors other than invalid auth token if rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken { gterr := c.getToken(rpcCtx) if gterr != nil { - lg.Lvl(4).Infof("clientv3/auth-retry: cannot retry due to error %q(%q) on pinned endpoint %q", err.Error(), gterr.Error(), pinned) + lg.Lvl(4).Infof("clientv3/auth-retry: cannot retry due to error %q(%q)", err.Error(), gterr.Error()) return err // return the original error for simplicity } continue @@ -136,7 +125,7 @@ func (c *Client) newAuthRetryWrapper(retryf retryRPCFunc) retryRPCFunc { return err } } -}*/ +} type retryKVClient struct { kc pb.KVClient @@ -145,12 +134,10 @@ type retryKVClient struct { // 
RetryKVClient implements a KVClient. func RetryKVClient(c *Client) pb.KVClient { - return pb.NewKVClient(c.conn) - // TODO: Remove retry logic entirely now that we're using the new grpc load balancer interface? - /*return &retryKVClient{ + return &retryKVClient{ kc: pb.NewKVClient(c.conn), retryf: c.newAuthRetryWrapper(c.newRetryWrapper()), - }*/ + } } func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) { err = rkv.retryf(ctx, func(rctx context.Context) error { @@ -200,12 +187,10 @@ type retryLeaseClient struct { // RetryLeaseClient implements a LeaseClient. func RetryLeaseClient(c *Client) pb.LeaseClient { - return pb.NewLeaseClient(c.conn) - // TODO: Remove retry logic entirely now that we're using the new grpc load balancer interface? - /*return &retryLeaseClient{ + return &retryLeaseClient{ lc: pb.NewLeaseClient(c.conn), retryf: c.newAuthRetryWrapper(c.newRetryWrapper()), - }*/ + } } func (rlc *retryLeaseClient) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveRequest, opts ...grpc.CallOption) (resp *pb.LeaseTimeToLiveResponse, err error) { @@ -256,12 +241,10 @@ type retryClusterClient struct { // RetryClusterClient implements a ClusterClient. func RetryClusterClient(c *Client) pb.ClusterClient { - return pb.NewClusterClient(c.conn) - // TODO: Remove retry logic entirely now that we're using the new grpc load balancer interface? - /*return &retryClusterClient{ + return &retryClusterClient{ cc: pb.NewClusterClient(c.conn), retryf: c.newRetryWrapper(), - }*/ + } } func (rcc *retryClusterClient) MemberList(ctx context.Context, in *pb.MemberListRequest, opts ...grpc.CallOption) (resp *pb.MemberListResponse, err error) { @@ -303,12 +286,10 @@ type retryMaintenanceClient struct { // RetryMaintenanceClient implements a Maintenance. func RetryMaintenanceClient(c *Client, conn *grpc.ClientConn) pb.MaintenanceClient { - return pb.NewMaintenanceClient(conn) - // TODO: Remove retry logic entirely now that we're using the new grpc load balancer interface? - /*return &retryMaintenanceClient{ + return &retryMaintenanceClient{ mc: pb.NewMaintenanceClient(conn), retryf: c.newRetryWrapper(), - }*/ + } } func (rmc *retryMaintenanceClient) Alarm(ctx context.Context, in *pb.AlarmRequest, opts ...grpc.CallOption) (resp *pb.AlarmResponse, err error) { @@ -374,12 +355,10 @@ type retryAuthClient struct { // RetryAuthClient implements a AuthClient. func RetryAuthClient(c *Client) pb.AuthClient { - return pb.NewAuthClient(c.conn) - // TODO: Remove retry logic entirely now that we're using the new grpc load balancer interface? 
- /*return &retryAuthClient{ + return &retryAuthClient{ ac: pb.NewAuthClient(c.conn), retryf: c.newRetryWrapper(), - }*/ + } } func (rac *retryAuthClient) UserList(ctx context.Context, in *pb.AuthUserListRequest, opts ...grpc.CallOption) (resp *pb.AuthUserListResponse, err error) { diff --git a/clientv3/watch.go b/clientv3/watch.go index 312845cbedb8..19f60f478edd 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -23,6 +23,7 @@ import ( v3rpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" mvccpb "github.com/coreos/etcd/mvcc/mvccpb" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -801,7 +802,7 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) return nil, err default: } - if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil { + if ws, err = w.remote.Watch(w.ctx, append(w.callOpts, grpc_retry.Disable())...); ws != nil && err == nil { break } if isHaltErr(w.ctx, err) { diff --git a/integration/v3_alarm_test.go b/integration/v3_alarm_test.go index 0486ead80752..e66cf0bab163 100644 --- a/integration/v3_alarm_test.go +++ b/integration/v3_alarm_test.go @@ -88,13 +88,16 @@ func TestV3StorageQuotaApply(t *testing.T) { } } + ctx, close := context.WithTimeout(context.TODO(), RequestWaitTimeout) + defer close() + // small quota machine should reject put - if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { + if _, err := kvc0.Put(ctx, &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { t.Fatalf("past-quota instance should reject put") } // large quota machine should reject put - if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { + if _, err := kvc1.Put(ctx, &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { t.Fatalf("past-quota instance should reject put") }
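
---

Reviewer note: the core behavioral change above is `roundRobinQuorumBackoff`, which the patch installs as the `grpc_retry.BackoffFunc` for both the unary and stream interceptors. Because the round robin balancer moves to a different endpoint on every retry, the closure returns a zero wait until a full quorum of members has been tried, and only then sleeps for the (jittered) `waitBetween` duration. The sketch below is a minimal, standalone illustration of that pattern, not code from the patch: `quorumBackoff`, the hard-coded endpoint count, and the `math/rand` jitter (standing in for `backoffutils.JitterUp`) are illustrative assumptions.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// quorumBackoff mirrors the shape of the patch's roundRobinQuorumBackoff:
// with a round robin balancer each retry lands on a different endpoint, so
// the client sleeps only after a full pass across a quorum of members has
// failed, and otherwise retries the next member immediately.
func quorumBackoff(endpoints int, waitBetween time.Duration, jitterFraction float64) func(attempt uint) time.Duration {
	return func(attempt uint) time.Duration {
		quorum := uint(endpoints)/2 + 1
		if attempt%quorum == 0 {
			// Jitter the wait by +/- jitterFraction; this stands in for
			// backoffutils.JitterUp used in the actual patch.
			jitter := 1 + jitterFraction*(rand.Float64()*2-1)
			return time.Duration(float64(waitBetween) * jitter)
		}
		return 0 // another endpoint is still untried in this pass
	}
}

func main() {
	// 5 endpoints -> quorum of 3: attempts 3 and 6 back off, the rest do not.
	backoff := quorumBackoff(5, 25*time.Millisecond, 0.10)
	for attempt := uint(1); attempt <= 6; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, backoff(attempt))
	}
}
```

The effect is that a single stopped or partitioned member costs the caller essentially nothing (the retry immediately reaches a healthy member), while a cluster-wide outage is still rate-limited to roughly one quorum sweep per `defaultBackoffWaitBetween`, which is why the patch can delete the various `integration.WaitClientV3` waits and the `isServerUnavailable` special cases in the tests above.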