Skip to content

Commit

Permalink
Implement transparent retries for gRFC A6 (#1597)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Nov 6, 2017
1 parent 551f295 commit 8ff8683
Show file tree
Hide file tree
Showing 11 changed files with 256 additions and 267 deletions.
122 changes: 68 additions & 54 deletions call.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)

Expand Down Expand Up @@ -208,94 +207,100 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
Last: true,
Delay: false,
}
callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,
}
if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type()
}
if c.creds != nil {
callHdr.Creds = c.creds
}
if c.compressorType != "" {
callHdr.SendCompress = c.compressorType
} else if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type()
}
firstAttempt := true

for {
var (
err error
t transport.ClientTransport
stream *transport.Stream
// Record the done handler from Balancer.Get(...). It is called once the
// RPC has completed or failed.
done func(balancer.DoneInfo)
)
// TODO(zhaoq): Need a formal spec of fail-fast.
callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,
}
if c.compressorType != "" {
callHdr.SendCompress = c.compressorType
} else if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type()
}
if c.creds != nil {
callHdr.Creds = c.creds
// Check to make sure the context has expired. This will prevent us from
// looping forever if an error occurs for wait-for-ready RPCs where no data
// is sent on the wire.
select {
case <-ctx.Done():
return toRPCErr(ctx.Err())
default:
}

t, done, err = cc.getTransport(ctx, c.failFast)
// Record the done handler from Balancer.Get(...). It is called once the
// RPC has completed or failed.
t, done, err := cc.getTransport(ctx, c.failFast)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
if _, ok := status.FromError(err); ok {
return err
}
if err == errConnClosing || err == errConnUnavailable {
if c.failFast {
return Errorf(codes.Unavailable, "%v", err)
}
continue
}
// All the other errors are treated as Internal errors.
return Errorf(codes.Internal, "%v", err)
return err
}
if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
}
stream, err = t.NewStream(ctx, callHdr)
stream, err := t.NewStream(ctx, callHdr)
if err != nil {
if done != nil {
if _, ok := err.(transport.ConnectionError); ok {
// If error is connection error, transport was sending data on wire,
// and we are not sure if anything has been sent on wire.
// If error is not connection error, we are sure nothing has been sent.
updateRPCInfoInContext(ctx, rpcInfo{bytesSent: true, bytesReceived: false})
}
done(balancer.DoneInfo{Err: err})
}
if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast {
// In the event of any error from NewStream, we never attempted to write
// anything to the wire, so we can retry indefinitely for non-fail-fast
// RPCs.
if !c.failFast {
continue
}
return toRPCErr(err)
}
if peer, ok := peer.FromContext(stream.Context()); ok {
c.peer = peer
}
if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
}
err = sendRequest(ctx, cc.dopts, cc.dopts.cp, c, callHdr, stream, t, args, topts)
if err != nil {
if done != nil {
updateRPCInfoInContext(ctx, rpcInfo{
bytesSent: stream.BytesSent(),
bytesSent: true,
bytesReceived: stream.BytesReceived(),
})
done(balancer.DoneInfo{Err: err})
}
// Retry a non-failfast RPC when
// i) there is a connection error; or
// ii) the server started to drain before this RPC was initiated.
if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast {
continue
// i) the server started to drain before this RPC was initiated.
// ii) the server refused the stream.
if !c.failFast && stream.Unprocessed() {
// In this case, the server did not receive the data, but we still
// created wire traffic, so we should not retry indefinitely.
if firstAttempt {
// TODO: Add a field to header for grpc-transparent-retry-attempts
firstAttempt = false
continue
}
// Otherwise, give up and return an error anyway.
}
return toRPCErr(err)
}
err = recvResponse(ctx, cc.dopts, t, c, stream, reply)
if err != nil {
if done != nil {
updateRPCInfoInContext(ctx, rpcInfo{
bytesSent: stream.BytesSent(),
bytesSent: true,
bytesReceived: stream.BytesReceived(),
})
done(balancer.DoneInfo{Err: err})
}
if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast {
continue
if !c.failFast && stream.Unprocessed() {
// In these cases, the server did not receive the data, but we still
// created wire traffic, so we should not retry indefinitely.
if firstAttempt {
// TODO: Add a field to header for grpc-transparent-retry-attempts
firstAttempt = false
continue
}
// Otherwise, give up and return an error anyway.
}
return toRPCErr(err)
}
Expand All @@ -305,11 +310,20 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
t.CloseStream(stream, nil)
if done != nil {
updateRPCInfoInContext(ctx, rpcInfo{
bytesSent: stream.BytesSent(),
bytesSent: true,
bytesReceived: stream.BytesReceived(),
})
done(balancer.DoneInfo{Err: err})
}
if !c.failFast && stream.Unprocessed() {
// In these cases, the server did not receive the data, but we still
// created wire traffic, so we should not retry indefinitely.
if firstAttempt {
// TODO: Add a field to header for grpc-transparent-retry-attempts
firstAttempt = false
continue
}
}
return stream.Status().Err()
}
}
25 changes: 14 additions & 11 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,20 @@ var (
// underlying connections within the specified timeout.
// DEPRECATED: Please use context.DeadlineExceeded instead.
ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
errConnDrain = errors.New("grpc: the connection is drained")
// errConnClosing indicates that the connection is closing.
errConnClosing = errors.New("grpc: the connection is closing")
// errConnUnavailable indicates that the connection is unavailable.
errConnUnavailable = errors.New("grpc: the connection is unavailable")
// errBalancerClosed indicates that the balancer is closed.
errBalancerClosed = errors.New("grpc: balancer is closed")
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
)

// The following errors are returned from Dial and DialContext
var (
// errNoTransportSecurity indicates that there is no transport security
// being set for ClientConn. Users should either set one or explicitly
// call WithInsecure DialOption to disable security.
Expand All @@ -65,16 +78,6 @@ var (
errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
// errNetworkIO indicates that the connection is down due to some network I/O error.
errNetworkIO = errors.New("grpc: failed with network I/O error")
// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
errConnDrain = errors.New("grpc: the connection is drained")
// errConnClosing indicates that the connection is closing.
errConnClosing = errors.New("grpc: the connection is closing")
// errConnUnavailable indicates that the connection is unavailable.
errConnUnavailable = errors.New("grpc: the connection is unavailable")
// errBalancerClosed indicates that the balancer is closed.
errBalancerClosed = errors.New("grpc: balancer is closed")
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
)

// dialOptions configure a Dial call. dialOptions are set by the DialOption
Expand Down Expand Up @@ -879,7 +882,7 @@ type addrConn struct {
// receiving a GoAway.
func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
switch r {
case transport.TooManyPings:
case transport.GoAwayTooManyPings:
v := 2 * ac.dopts.copts.KeepaliveParams.Time
ac.cc.mu.Lock()
if v > ac.cc.mkp.Time {
Expand Down
7 changes: 0 additions & 7 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,6 @@
// the godoc of the top-level grpc package.
package internal

// TestingCloseConns closes all existing transports but keeps
// grpcServer.lis accepting new connections.
//
// The provided grpcServer must be of type *grpc.Server. It is untyped
// for circular dependency reasons.
var TestingCloseConns func(grpcServer interface{})

// TestingUseHandlerImpl enables the http.Handler-based server implementation.
// It must be called before Serve and requires TLS credentials.
//
Expand Down
12 changes: 8 additions & 4 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,15 @@ func Peer(peer *peer.Peer) CallOption {
}

// FailFast configures the action to take when an RPC is attempted on broken
// connections or unreachable servers. If failFast is true, the RPC will fail
// connections or unreachable servers. If failFast is true, the RPC will fail
// immediately. Otherwise, the RPC client will block the call until a
// connection is available (or the call is canceled or times out). Please refer
// to https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md.
// The default behavior of RPCs is to fail fast.
// connection is available (or the call is canceled or times out) and will
// retry the call if it fails due to a transient error. gRPC will not retry if
// data was written to the wire unless the server indicates it did not process
// the data. Please refer to
// https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md.
//
// By default, RPCs are "Fail Fast".
func FailFast(failFast bool) CallOption {
return beforeCall(func(c *callInfo) error {
c.failFast = failFast
Expand Down
14 changes: 0 additions & 14 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1184,25 +1184,11 @@ func (s *Server) GracefulStop() {
}

func init() {
internal.TestingCloseConns = func(arg interface{}) {
arg.(*Server).testingCloseConns()
}
internal.TestingUseHandlerImpl = func(arg interface{}) {
arg.(*Server).opts.useHandlerImpl = true
}
}

// testingCloseConns closes all existing transports but keeps s.lis
// accepting new connections.
func (s *Server) testingCloseConns() {
s.mu.Lock()
for c := range s.conns {
c.Close()
delete(s.conns, c)
}
s.mu.Unlock()
}

// SetHeader sets the header metadata.
// When called multiple times, all the provided metadata will be merged.
// All the metadata will be sent out when one of the following happens:
Expand Down
41 changes: 19 additions & 22 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,42 +199,39 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
}()
}

for {
// Check to make sure the context has expired. This will prevent us from
// looping forever if an error occurs for wait-for-ready RPCs where no data
// is sent on the wire.
select {
case <-ctx.Done():
return nil, toRPCErr(ctx.Err())
default:
}

t, done, err = cc.getTransport(ctx, c.failFast)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
if _, ok := status.FromError(err); ok {
return nil, err
}
if err == errConnClosing || err == errConnUnavailable {
if c.failFast {
return nil, Errorf(codes.Unavailable, "%v", err)
}
continue
}
// All the other errors are treated as Internal errors.
return nil, Errorf(codes.Internal, "%v", err)
return nil, err
}

s, err = t.NewStream(ctx, callHdr)
if err != nil {
if _, ok := err.(transport.ConnectionError); ok && done != nil {
// If error is connection error, transport was sending data on wire,
// and we are not sure if anything has been sent on wire.
// If error is not connection error, we are sure nothing has been sent.
updateRPCInfoInContext(ctx, rpcInfo{bytesSent: true, bytesReceived: false})
}
if done != nil {
done(balancer.DoneInfo{Err: err})
done = nil
}
if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast {
// In the event of any error from NewStream, we never attempted to write
// anything to the wire, so we can retry indefinitely for non-fail-fast
// RPCs.
if !c.failFast {
continue
}
return nil, toRPCErr(err)
}
break
}

// Set callInfo.peer object from stream's context.
if peer, ok := peer.FromContext(s.Context()); ok {
c.peer = peer
Expand All @@ -260,8 +257,8 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
statsCtx: ctx,
statsHandler: cc.dopts.copts.StatsHandler,
}
// Listen on ctx.Done() to detect cancellation and s.Done() to detect normal termination
// when there is no pending I/O operations on this stream.
// Listen on s.Context().Done() to detect cancellation and s.Done() to detect
// normal termination when there is no pending I/O operations on this stream.
go func() {
select {
case <-t.Error():
Expand Down Expand Up @@ -502,7 +499,7 @@ func (cs *clientStream) finish(err error) {
}
if cs.done != nil {
updateRPCInfoInContext(cs.s.Context(), rpcInfo{
bytesSent: cs.s.BytesSent(),
bytesSent: true,
bytesReceived: cs.s.BytesReceived(),
})
cs.done(balancer.DoneInfo{Err: err})
Expand Down
Loading

0 comments on commit 8ff8683

Please sign in to comment.