From 68db107309904fb4e6c92923c36c44c3a2c84349 Mon Sep 17 00:00:00 2001 From: Felix Beuke Date: Wed, 6 Mar 2024 15:48:08 +0100 Subject: [PATCH] [query-frontend] Disable write deadline for `/active_series` requests --- pkg/frontend/transport/handler.go | 15 ++++++++++ pkg/frontend/transport/handler_test.go | 41 ++++++++++++++++++++++++++ pkg/util/gziphandler/gzip.go | 4 +++ 3 files changed, 60 insertions(+) diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 0027c00af01..09448bb0c49 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -190,6 +190,17 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { activityIndex := f.at.Insert(func() string { return httpRequestActivity(r, r.Header.Get("User-Agent"), params) }) defer f.at.Delete(activityIndex) + if isStreamingEndpoint(r) { + // Disable the write deadline for streaming endpoints. + rc := http.NewResponseController(w) + err = rc.SetWriteDeadline(time.Time{}) + if err != nil { + err := fmt.Errorf("failed to set write deadline for response writer: %w", err) + writeError(w, apierror.New(apierror.TypeInternal, err.Error())) + return + } + } + startTime := time.Now() resp, err := f.roundTripper.RoundTrip(r) queryResponseTime := time.Since(startTime) @@ -441,3 +452,7 @@ func httpRequestActivity(request *http.Request, userAgent string, requestParams // This doesn't have to be pretty, just useful for debugging, so prioritize efficiency. return fmt.Sprintf("user:%s UA:%s req:%s %s %s", tenantID, userAgent, request.Method, request.URL.Path, params) } + +func isStreamingEndpoint(r *http.Request) bool { + return strings.HasSuffix(r.URL.Path, "api/v1/cardinality/active_series") +} diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go index b842e97d104..0be6440472a 100644 --- a/pkg/frontend/transport/handler_test.go +++ b/pkg/frontend/transport/handler_test.go @@ -533,6 +533,47 @@ func TestHandler_LogsFormattedQueryDetails(t *testing.T) { } } +func TestHandler_StreamingWriteTimeout(t *testing.T) { + for _, tt := range []struct { + name string + path string + wantError bool + }{ + {name: "deadline exceeded for non-streaming endpoint", path: "/api/v1/query", wantError: true}, + {name: "deadline not exceeded for streaming endpoint", path: "/api/v1/cardinality/active_series"}, + } { + t.Run(tt.name, func(t *testing.T) { + const serverWriteTimeout = 10 * time.Millisecond + + roundTripper := roundTripperFunc(func(req *http.Request) (*http.Response, error) { + // Simulate a request that takes longer than the server write timeout. + time.Sleep(2 * serverWriteTimeout) + return &http.Response{StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader("{}"))}, nil + }) + + handler := NewHandler(HandlerConfig{}, roundTripper, log.NewNopLogger(), nil, nil) + + server := httptest.NewUnstartedServer(handler) + server.Config.WriteTimeout = serverWriteTimeout + server.Start() + defer server.Close() + + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s%s", server.URL, tt.path), nil) + require.NoError(t, err) + resp, err := http.DefaultClient.Do(req) + if tt.wantError { + require.Error(t, err) + return + } + require.NoError(t, err) + defer func() { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + }() + }) + } +} + type testLogger struct { logMessages []map[string]interface{} } diff --git a/pkg/util/gziphandler/gzip.go b/pkg/util/gziphandler/gzip.go index ecd69631e76..7d0f469397b 100644 --- a/pkg/util/gziphandler/gzip.go +++ b/pkg/util/gziphandler/gzip.go @@ -158,6 +158,10 @@ func (w *GzipResponseWriter) Write(b []byte) (int, error) { return w.startPlainWrite(len(b)) } +func (w *GzipResponseWriter) Unwrap() http.ResponseWriter { + return w.ResponseWriter +} + func (w *GzipResponseWriter) startPlainWrite(blen int) (int, error) { if err := w.startPlain(); err != nil { return 0, err