Skip to content

Commit

Permalink
[query-frontend] Disable write deadline for /active_series requests
Browse files Browse the repository at this point in the history
  • Loading branch information
flxbk committed Mar 6, 2024
1 parent 3374004 commit 68db107
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 0 deletions.
15 changes: 15 additions & 0 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,17 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
activityIndex := f.at.Insert(func() string { return httpRequestActivity(r, r.Header.Get("User-Agent"), params) })
defer f.at.Delete(activityIndex)

if isStreamingEndpoint(r) {
// Disable the write deadline for streaming endpoints.
rc := http.NewResponseController(w)
err = rc.SetWriteDeadline(time.Time{})
if err != nil {
err := fmt.Errorf("failed to set write deadline for response writer: %w", err)
writeError(w, apierror.New(apierror.TypeInternal, err.Error()))
return
}
}

startTime := time.Now()
resp, err := f.roundTripper.RoundTrip(r)
queryResponseTime := time.Since(startTime)
Expand Down Expand Up @@ -441,3 +452,7 @@ func httpRequestActivity(request *http.Request, userAgent string, requestParams
// This doesn't have to be pretty, just useful for debugging, so prioritize efficiency.
return fmt.Sprintf("user:%s UA:%s req:%s %s %s", tenantID, userAgent, request.Method, request.URL.Path, params)
}

func isStreamingEndpoint(r *http.Request) bool {
return strings.HasSuffix(r.URL.Path, "api/v1/cardinality/active_series")
}
41 changes: 41 additions & 0 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,47 @@ func TestHandler_LogsFormattedQueryDetails(t *testing.T) {
}
}

func TestHandler_StreamingWriteTimeout(t *testing.T) {
for _, tt := range []struct {
name string
path string
wantError bool
}{
{name: "deadline exceeded for non-streaming endpoint", path: "/api/v1/query", wantError: true},
{name: "deadline not exceeded for streaming endpoint", path: "/api/v1/cardinality/active_series"},
} {
t.Run(tt.name, func(t *testing.T) {
const serverWriteTimeout = 10 * time.Millisecond

roundTripper := roundTripperFunc(func(req *http.Request) (*http.Response, error) {
// Simulate a request that takes longer than the server write timeout.
time.Sleep(2 * serverWriteTimeout)
return &http.Response{StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader("{}"))}, nil
})

handler := NewHandler(HandlerConfig{}, roundTripper, log.NewNopLogger(), nil, nil)

server := httptest.NewUnstartedServer(handler)
server.Config.WriteTimeout = serverWriteTimeout
server.Start()
defer server.Close()

req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s%s", server.URL, tt.path), nil)
require.NoError(t, err)
resp, err := http.DefaultClient.Do(req)
if tt.wantError {
require.Error(t, err)
return
}
require.NoError(t, err)
defer func() {
_, _ = io.Copy(io.Discard, resp.Body)
_ = resp.Body.Close()
}()
})
}
}

type testLogger struct {
logMessages []map[string]interface{}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/util/gziphandler/gzip.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ func (w *GzipResponseWriter) Write(b []byte) (int, error) {
return w.startPlainWrite(len(b))
}

func (w *GzipResponseWriter) Unwrap() http.ResponseWriter {
return w.ResponseWriter
}

func (w *GzipResponseWriter) startPlainWrite(blen int) (int, error) {
if err := w.startPlain(); err != nil {
return 0, err
Expand Down

0 comments on commit 68db107

Please sign in to comment.