Skip to content

Commit

Permalink
Added logs throughout the transport layer for connection and transpor…
Browse files Browse the repository at this point in the history
…t close
  • Loading branch information
zasweq committed Dec 3, 2022
1 parent 7361971 commit 9382de1
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 8 deletions.
9 changes: 9 additions & 0 deletions internal/transport/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,9 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
select {
case <-c.ch:
case <-c.done:
if logger.V(logLevel) {
logger.Infof("transport: trigger loopy to exit (with a subsequent conn closure) because transports context expired.")
}
return nil, ErrConnClosing
}
}
Expand Down Expand Up @@ -772,6 +775,9 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
}
}
if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
if logger.V(logLevel) {
logger.Infof("transport: trigger loopy to exit (with a subsequent conn closure) because no active streams left to finish processing while in draining mode.")
}
return ErrConnClosing
}
return nil
Expand Down Expand Up @@ -807,6 +813,9 @@ func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
if l.side == clientSide {
l.draining = true
if len(l.estdStreams) == 0 {
if logger.V(logLevel) {
logger.Infof("transport: trigger loopy to exit (with a subsequent conn closure) because no active streams to wait to finish processing when GOAWAY frame received.")
}
return ErrConnClosing
}
}
Expand Down
41 changes: 36 additions & 5 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
// Any further errors will close the underlying connection
defer func(conn net.Conn) {
if err != nil {
if logger.V(logLevel) {
logger.Errorf("transport: closing connection due to error %v.", err)
}
conn.Close()
}
}(conn)
Expand All @@ -244,6 +247,9 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
<-newClientCtx.Done() // Block until connectCtx expires or the defer above executes.
if connectCtx.Err() != nil {
// connectCtx expired before exiting the function. Hard close the connection.
if logger.V(logLevel) {
logger.Infof("transport: closing connection due to connect context expiring.")
}
conn.Close()
}
}(conn)
Expand Down Expand Up @@ -396,6 +402,9 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
err = <-readerErrCh
}
if err != nil {
if logger.V(logLevel) {
logger.Errorf("transport: closing transport due to error reading server preface: %v.", err)
}
t.Close(err)
}
}()
Expand Down Expand Up @@ -447,7 +456,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
err := t.loopy.run()
if err != nil {
if logger.V(logLevel) {
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
logger.Errorf("transport: loopyWriter.run returned. Err: %v. Closing connection.", err)
}
}
// Do not close the transport. Let reader goroutine handle it since
Expand Down Expand Up @@ -965,6 +974,9 @@ func (t *http2Client) Close(err error) {
t.mu.Unlock()
t.controlBuf.finish()
t.cancel()
if logger.V(logLevel) {
logger.Infof("transport: calling Close on the conn within transport.Close.")
}
t.conn.Close()
channelz.RemoveEntry(t.channelzID)
// Append info about previous goaways if there were any, since this may be important
Expand Down Expand Up @@ -1007,6 +1019,9 @@ func (t *http2Client) GracefulClose() {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
if logger.V(logLevel) {
logger.Infof("transport: closing transport because no active streams left to process in GracefulClose call.")
}
t.Close(ErrConnClosing)
return
}
Expand Down Expand Up @@ -1273,7 +1288,11 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// If there are multiple GoAways the first one should always have an ID greater than the following ones.
if id > t.prevGoAwayID {
t.mu.Unlock()
t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
err := connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID)
if logger.V(logLevel) {
logger.Errorf("transport: closing transport due to error in goaway handling: %v.", err)
}
t.Close(err)
return
}
default:
Expand All @@ -1296,7 +1315,11 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
t.prevGoAwayID = id
if len(t.activeStreams) == 0 {
t.mu.Unlock()
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
err := connectionErrorf(true, nil, "received goaway and there are no active streams")
if logger.V(logLevel) {
logger.Errorf("transport: closing transport due to error in goaway handling: %v.", err)
}
t.Close(err)
return
}

Expand Down Expand Up @@ -1602,7 +1625,11 @@ func (t *http2Client) reader(errCh chan<- error) {
continue
} else {
// Transport error.
t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
err := connectionErrorf(true, err, "error reading from server: %v", err)
if logger.V(logLevel) {
logger.Errorf("transport: closing transport due to connection level error %v.", err)
}
t.Close(err)
return
}
}
Expand Down Expand Up @@ -1661,7 +1688,11 @@ func (t *http2Client) keepalive() {
continue
}
if outstandingPing && timeoutLeft <= 0 {
t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
err := connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout")
if logger.V(logLevel) {
logger.Errorf("transport: closing transport due to error in keep alive handling: %v.", err)
}
t.Close(err)
return
}
t.mu.Lock()
Expand Down
18 changes: 15 additions & 3 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
if err := t.loopy.run(); err != nil {
if logger.V(logLevel) {
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
logger.Errorf("transport: loopyWriter.run returned. Err: %v. Closing Connection.", err)
}
}
t.conn.Close()
Expand Down Expand Up @@ -630,18 +630,24 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
continue
}
if err == io.EOF || err == io.ErrUnexpectedEOF {
if logger.V(logLevel) {
logger.Infof("transport: closing the server transport due to connection level EOF.")
}
t.Close()
return
}
if logger.V(logLevel) {
logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v, closing transport.", err)
}
t.Close()
return
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if t.operateHeaders(frame, handle, traceCtx) {
if logger.V(logLevel) {
logger.Errorf("transport: closing the transport due to fatal error processing headers frame.")
}
t.Close()
break
}
Expand Down Expand Up @@ -1170,7 +1176,7 @@ func (t *http2Server) keepalive() {
}
if outstandingPing && kpTimeoutLeft <= 0 {
if logger.V(logLevel) {
logger.Infof("transport: closing server transport due to idleness.")
logger.Infof("transport: closing server transport due to keep alive ping not being acked within timeout %v.", t.kp.Time.String())
}
t.Close()
return
Expand Down Expand Up @@ -1211,6 +1217,9 @@ func (t *http2Server) Close() {
t.mu.Unlock()
t.controlBuf.finish()
close(t.done)
if logger.V(logLevel) {
logger.Infof("transport: calling Close on the conn within transport.Close.")
}
if err := t.conn.Close(); err != nil && logger.V(logLevel) {
logger.Infof("transport: error closing conn during Close: %v", err)
}
Expand Down Expand Up @@ -1320,6 +1329,9 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
t.state = draining
sid := t.maxStreamID
if len(t.activeStreams) == 0 {
if logger.V(logLevel) {
logger.Infof("transport: second goaway written and no active streams left to process, closing the connection.")
}
g.closeConn = true
}
t.mu.Unlock()
Expand Down

0 comments on commit 9382de1

Please sign in to comment.