Skip to content

Commit

Permalink
backports form etcd-io#9860
Browse files Browse the repository at this point in the history
Signed-off-by: Sam Batschelet <sbatsche@redhat.com>
  • Loading branch information
hexfusion committed Aug 5, 2019
1 parent 0840ffa commit d8c6b30
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 12 deletions.
13 changes: 7 additions & 6 deletions clientv3/integration/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ func TestKVGetRetry(t *testing.T) {

time.Sleep(100 * time.Millisecond)
clus.Members[fIdx].Restart(t)
clus.Members[fIdx].WaitOK(t)

select {
case <-time.After(5 * time.Second):
Expand Down Expand Up @@ -792,7 +793,7 @@ func TestKVGetStoppedServerAndClose(t *testing.T) {
// this Get fails and triggers an asynchronous connection retry
_, err := cli.Get(ctx, "abc")
cancel()
if err != nil && err != context.DeadlineExceeded {
if err != nil && !isServerUnavailable(err) {
t.Fatal(err)
}
}
Expand All @@ -814,14 +815,14 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
// grpc finds out the original connection is down due to the member shutdown.
_, err := cli.Get(ctx, "abc")
cancel()
if err != nil && err != context.DeadlineExceeded {
if err != nil && !isServerUnavailable(err) {
t.Fatal(err)
}

// this Put fails and triggers an asynchronous connection retry
_, err = cli.Put(ctx, "abc", "123")
cancel()
if err != nil && err != context.DeadlineExceeded {
if err != nil && !isServerUnavailable(err) {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -906,7 +907,7 @@ func TestKVLargeRequests(t *testing.T) {
maxCallSendBytesClient: 10 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 10 * 1024 * 1024,
expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max "),
expectError: grpc.Errorf(codes.ResourceExhausted, "trying to send message larger than max "),
},
{
maxRequestBytesServer: 10 * 1024 * 1024,
Expand All @@ -920,7 +921,7 @@ func TestKVLargeRequests(t *testing.T) {
maxCallSendBytesClient: 10 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 10*1024*1024 + 5,
expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max "),
expectError: grpc.Errorf(codes.ResourceExhausted, "trying to send message larger than max "),
},
}
for i, test := range tests {
Expand All @@ -940,7 +941,7 @@ func TestKVLargeRequests(t *testing.T) {
t.Errorf("#%d: expected %v, got %v", i, test.expectError, err)
}
} else if err != nil && !strings.HasPrefix(err.Error(), test.expectError.Error()) {
t.Errorf("#%d: expected %v, got %v", i, test.expectError, err)
t.Errorf("#%d: expected error starting with '%s', got '%s'", i, test.expectError.Error(), err.Error())
}

// put request went through, now expects large response back
Expand Down
46 changes: 45 additions & 1 deletion clientv3/integration/server_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestBalancerUnderServerShutdownWatch(t *testing.T) {
if err == nil {
break
}
if err == context.DeadlineExceeded || isServerCtxTimeout(err) || err == rpctypes.ErrTimeout || err == rpctypes.ErrTimeoutDueToLeaderFail {
if isClientTimeout(err) || isServerCtxTimeout(err) || err == rpctypes.ErrTimeout || err == rpctypes.ErrTimeoutDueToLeaderFail {
continue
}
t.Fatal(err)
Expand Down Expand Up @@ -365,3 +365,47 @@ func isServerCtxTimeout(err error) bool {
code := ev.Code()
return code == codes.DeadlineExceeded && strings.Contains(err.Error(), "context deadline exceeded")
}

// In grpc v1.11.3+ dial timeouts can error out with transport.ErrConnClosing. Previously dial timeouts
// would always error out with context.DeadlineExceeded.
func isClientTimeout(err error) bool {
if err == nil {
return false
}
if err == context.DeadlineExceeded {
return true
}
ev, ok := status.FromError(err)
if !ok {
return false
}
code := ev.Code()
return code == codes.DeadlineExceeded
}

func isServerUnavailable(err error) bool {
if err == nil {
return false
}
ev, ok := status.FromError(err)
if !ok {
return false
}
code := ev.Code()
return code == codes.Unavailable
}

func isCanceled(err error) bool {
if err == nil {
return false
}
if err == context.Canceled {
return true
}
ev, ok := status.FromError(err)
if !ok {
return false
}
code := ev.Code()
return code == codes.Canceled
}
9 changes: 4 additions & 5 deletions clientv3/integration/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
mvccpb "github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/testutil"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

Expand Down Expand Up @@ -667,8 +666,8 @@ func TestWatchErrConnClosed(t *testing.T) {
go func() {
defer close(donec)
ch := cli.Watch(context.TODO(), "foo")
if wr := <-ch; grpc.ErrorDesc(wr.Err()) != grpc.ErrClientConnClosing.Error() {
t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, grpc.ErrorDesc(wr.Err()))
if wr := <-ch; !isCanceled(wr.Err()) {
t.Fatalf("expected context canceled, got %v", wr.Err())
}
}()

Expand Down Expand Up @@ -699,8 +698,8 @@ func TestWatchAfterClose(t *testing.T) {
donec := make(chan struct{})
go func() {
cli.Watch(context.TODO(), "foo")
if err := cli.Close(); err != nil && err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
if err := cli.Close(); err != nil && err != context.Canceled {
t.Fatalf("expected %v, got %v", context.Canceled, err)
}
close(donec)
}()
Expand Down
1 change: 1 addition & 0 deletions integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,7 @@ func NewClientV3(m *member) (*clientv3.Client, error) {
cfg := clientv3.Config{
Endpoints: []string{m.grpcAddr},
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
MaxCallSendMsgSize: m.clientMaxCallSendMsgSize,
MaxCallRecvMsgSize: m.clientMaxCallRecvMsgSize,
}
Expand Down

0 comments on commit d8c6b30

Please sign in to comment.