Skip to content

Commit

Permalink
frontend/transport: log non-2xx replies from downstream as non-successful
Browse files Browse the repository at this point in the history

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>
  • Loading branch information
narqo committed Feb 8, 2024
1 parent 5c736df commit da77ead
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
* `other`: any other request
* [BUGFIX] Fix performance regression introduced in Mimir 2.11.0 when uploading blocks to AWS S3. #7240
* [BUGFIX] Query-frontend: fix race condition when sharding active series is enabled (see above) and response is compressed with snappy. #7290
* [BUGFIX] Query-frontend: the "query stats" log line now reports unsuccessful (non-2xx) replies from downstream as "failed". #7296

### Mixin

Expand Down
31 changes: 23 additions & 8 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (
errRequestEntityTooLarge = httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "http: request body too large")
)

// Config for a Handler.
// HandlerConfig is a config for the handler.
type HandlerConfig struct {
LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
LogQueryRequestHeaders flagext.StringSliceCSV `yaml:"log_query_request_headers" category:"advanced"`
Expand Down Expand Up @@ -196,7 +196,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

if err != nil {
writeError(w, err)
f.reportQueryStats(r, params, startTime, queryResponseTime, 0, queryDetails, err)
f.reportQueryStats(r, params, startTime, queryResponseTime, 0, queryDetails, 0, err)
return
}

Expand All @@ -217,13 +217,13 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
f.reportSlowQuery(r, params, queryResponseTime, queryDetails)
}
if f.cfg.QueryStatsEnabled {
f.reportQueryStats(r, params, startTime, queryResponseTime, queryResponseSize, queryDetails, nil)
f.reportQueryStats(r, params, startTime, queryResponseTime, queryResponseSize, queryDetails, resp.StatusCode, nil)
}
}

// reportSlowQuery reports slow queries.
func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, queryResponseTime time.Duration, details *querymiddleware.QueryDetails) {
logMessage := append([]interface{}{
logMessage := append([]any{
"msg", "slow query detected",
"method", r.Method,
"host", r.Host,
Expand All @@ -238,7 +238,16 @@ func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, query
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}

func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryStartTime time.Time, queryResponseTime time.Duration, queryResponseSizeBytes int64, details *querymiddleware.QueryDetails, queryErr error) {
func (f *Handler) reportQueryStats(
r *http.Request,
queryString url.Values,
queryStartTime time.Time,
queryResponseTime time.Duration,
queryResponseSizeBytes int64,
details *querymiddleware.QueryDetails,
queryResponseStatusCode int,
queryErr error,
) {
tenantIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
return
Expand Down Expand Up @@ -266,12 +275,13 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
}

// Log stats.
logMessage := append([]interface{}{
logMessage := append([]any{
"msg", "query stats",
"component", "query-frontend",
"method", r.Method,
"path", r.URL.Path,
"user_agent", r.UserAgent(),
"status_code", queryResponseStatusCode,
"response_time", queryResponseTime,
"response_size_bytes", queryResponseSizeBytes,
"query_wall_time_seconds", wallTime.Seconds(),
Expand Down Expand Up @@ -312,6 +322,11 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
logMessage = append(logMessage, formatRequestHeaders(&r.Header, f.cfg.LogQueryRequestHeaders)...)
}

if queryErr == nil && queryResponseStatusCode/100 != 2 {
// If downstream replied with non-2xx, log this as a failure.
queryErr = fmt.Errorf("downstream replied with %s", http.StatusText(queryResponseStatusCode))
}

if queryErr != nil {
logStatus := "failed"
if errors.Is(queryErr, context.Canceled) {
Expand All @@ -332,7 +347,7 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
}

// formatQueryString prefers printing start, end, and step from details if they are not nil.
func formatQueryString(details *querymiddleware.QueryDetails, queryString url.Values) (fields []interface{}) {
func formatQueryString(details *querymiddleware.QueryDetails, queryString url.Values) (fields []any) {
for k, v := range queryString {
var formattedValue string
if details != nil {
Expand Down Expand Up @@ -368,7 +383,7 @@ func paramValueFromDetails(details *querymiddleware.QueryDetails, paramName stri
return ""
}

func formatRequestHeaders(h *http.Header, headersToLog []string) (fields []interface{}) {
func formatRequestHeaders(h *http.Header, headersToLog []string) (fields []any) {
for _, s := range headersToLog {
if v := h.Get(s); v != "" {
fields = append(fields, fmt.Sprintf("header_%s", strings.ReplaceAll(strings.ToLower(s), "-", "_")), v)
Expand Down
56 changes: 38 additions & 18 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,45 +238,66 @@ func TestHandler_FailedRoundTrip(t *testing.T) {
for _, test := range []struct {
name string
cfg HandlerConfig
expectedMetrics int
path string
queryResponseFunc roundTripperFunc
expectedStatusCode int
expectedMetrics int
expectedStatusLog string
expectQueryParamLog bool
queryErr error
}{
{
name: "Failed round trip with context cancelled",
cfg: HandlerConfig{QueryStatsEnabled: false},
name: "Failed round trip with context cancelled",
cfg: HandlerConfig{QueryStatsEnabled: false},
path: "/api/v1/query?query=up&time=2015-07-01T20:10:51.781Z",
queryResponseFunc: func(*http.Request) (*http.Response, error) {
return nil, context.Canceled
},
expectedStatusCode: StatusClientClosedRequest,
expectedMetrics: 0,
path: "/api/v1/query?query=up&time=2015-07-01T20:10:51.781Z",
expectedStatusLog: "canceled",
expectQueryParamLog: true,
queryErr: context.Canceled,
},
{
name: "Failed round trip with no query params",
cfg: HandlerConfig{QueryStatsEnabled: true},
name: "Failed round trip with no query params",
cfg: HandlerConfig{QueryStatsEnabled: true},
path: "/api/v1/query",
queryResponseFunc: func(*http.Request) (*http.Response, error) {
return nil, context.Canceled
},
expectedStatusCode: StatusClientClosedRequest,
expectedMetrics: 5,
path: "/api/v1/query",
expectedStatusLog: "canceled",
expectQueryParamLog: false,
},
{
name: "Failed round trip with HTTP response",
cfg: HandlerConfig{QueryStatsEnabled: true},
path: "/api/v1/query",
queryResponseFunc: func(*http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusInternalServerError,
Body: io.NopCloser(strings.NewReader("{}")),
}, nil
},
expectedStatusCode: http.StatusInternalServerError,
expectedMetrics: 5,
expectedStatusLog: "failed",
expectQueryParamLog: false,
queryErr: context.Canceled,
},
} {
t.Run(test.name, func(t *testing.T) {
roundTripper := roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return nil, test.queryErr
})

reg := prometheus.NewPedanticRegistry()
logs := &concurrency.SyncBuffer{}
logger := log.NewLogfmtLogger(logs)
handler := NewHandler(test.cfg, roundTripper, logger, reg, nil)
handler := NewHandler(test.cfg, test.queryResponseFunc, logger, reg, nil)

ctx := user.InjectOrgID(context.Background(), "12345")
req := httptest.NewRequest("GET", test.path, nil)
req = req.WithContext(ctx)
resp := httptest.NewRecorder()

handler.ServeHTTP(resp, req)
require.Equal(t, StatusClientClosedRequest, resp.Code)
require.Equal(t, test.expectedStatusCode, resp.Code)

count, err := promtest.GatherAndCount(
reg,
Expand All @@ -286,11 +307,10 @@ func TestHandler_FailedRoundTrip(t *testing.T) {
"cortex_query_fetched_chunks_total",
"cortex_query_fetched_index_bytes_total",
)

require.NoError(t, err)

assert.Contains(t, strings.TrimSpace(logs.String()), "sharded_queries")
assert.Contains(t, strings.TrimSpace(logs.String()), "status=canceled")
assert.Contains(t, strings.TrimSpace(logs.String()), fmt.Sprintf("status=%s", test.expectedStatusLog))
if test.expectQueryParamLog {
assert.Contains(t, strings.TrimSpace(logs.String()), "param_query")
}
Expand Down

0 comments on commit da77ead

Please sign in to comment.