[query-frontend] Set write deadline for /active_series requests #7553

Merged
5 commits merged on Mar 7, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -87,6 +87,7 @@
* [ENHANCEMENT] API: Use github.com/klauspost/compress for faster gzip and deflate compression of API responses. #7475
* [ENHANCEMENT] Ingester: Limiting on owned series (`-ingester.use-ingester-owned-series-for-limits`) now prevents discards in cases where a tenant is sharded across all ingesters (or shuffle sharding is disabled) and the ingester count increases. #7411
* [ENHANCEMENT] Block upload: include converted timestamps in the error message if block is from the future. #7538
* [ENHANCEMENT] Query-frontend: Introduce `-query-frontend.active-series-write-timeout` to allow configuring the server-side write timeout for active series requests. #7553
* [BUGFIX] Ingester: don't ignore errors encountered while iterating through chunks or samples in response to a query request. #6451
* [BUGFIX] Fix issue where queries can fail or omit OOO samples if OOO head compaction occurs between creating a querier and reading chunks #6766
* [BUGFIX] Fix issue where concatenatingChunkIterator can obscure errors #6766
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -4687,6 +4687,17 @@
"fieldType": "boolean",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "active_series_write_timeout",
"required": false,
"desc": "Timeout for writing active series responses. 0 means the value from `-server.http-write-timeout` is used.",
"fieldValue": null,
"fieldDefaultValue": 300000000000,
"fieldFlag": "query-frontend.active-series-write-timeout",
"fieldType": "duration",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_outstanding_per_tenant",
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1799,6 +1799,8 @@ Usage of ./cmd/mimir/mimir:
[experimental] Number of series to buffer per store-gateway when streaming chunks from store-gateways. (default 256)
-querier.timeout duration
The timeout for a query. This config option should be set on query-frontend too when query sharding is enabled. This also applies to queries evaluated by the ruler (internally or remotely). (default 2m0s)
-query-frontend.active-series-write-timeout duration
[experimental] Timeout for writing active series responses. 0 means the value from `-server.http-write-timeout` is used. (default 5m0s)
-query-frontend.additional-query-queue-dimensions-enabled
[experimental] Enqueue query requests with additional queue dimensions to split tenant request queues into subqueues. This enables separate requests to proceed from a tenant's subqueues even when other subqueues are blocked on slow query requests. Must be set on both query-frontend and scheduler to take effect. (default false)
-query-frontend.align-queries-with-step
1 change: 1 addition & 0 deletions docs/sources/mimir/configure/about-versioning.md
@@ -136,6 +136,7 @@ The following features are currently experimental:
- Query blocking on a per-tenant basis (configured with the limit `blocked_queries`)
- Max number of tenants that may be queried at once (`-tenant-federation.max-tenants`)
- Sharding of active series queries (`-query-frontend.shard-active-series-queries`)
- Server-side write timeout for responses to active series requests (`-query-frontend.active-series-write-timeout`)
- Query-scheduler
- `-query-scheduler.querier-forget-delay`
- Store-gateway
@@ -1385,6 +1385,11 @@ The `frontend` block configures the query-frontend.
# CLI flag: -query-frontend.query-stats-enabled
[query_stats_enabled: <boolean> | default = true]

# (experimental) Timeout for writing active series responses. 0 means the value
# from `-server.http-write-timeout` is used.
# CLI flag: -query-frontend.active-series-write-timeout
[active_series_write_timeout: <duration> | default = 5m]

# (advanced) Maximum number of outstanding requests per tenant per frontend;
# requests beyond this error with HTTP 429.
# CLI flag: -querier.max-outstanding-requests-per-tenant
24 changes: 20 additions & 4 deletions pkg/frontend/transport/handler.go
@@ -49,17 +49,19 @@ var (

// HandlerConfig is a config for the handler.
type HandlerConfig struct {
LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
LogQueryRequestHeaders flagext.StringSliceCSV `yaml:"log_query_request_headers" category:"advanced"`
MaxBodySize int64 `yaml:"max_body_size" category:"advanced"`
QueryStatsEnabled bool `yaml:"query_stats_enabled" category:"advanced"`
LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
LogQueryRequestHeaders flagext.StringSliceCSV `yaml:"log_query_request_headers" category:"advanced"`
MaxBodySize int64 `yaml:"max_body_size" category:"advanced"`
QueryStatsEnabled bool `yaml:"query_stats_enabled" category:"advanced"`
ActiveSeriesWriteTimeout time.Duration `yaml:"active_series_write_timeout" category:"experimental"`
}

func (cfg *HandlerConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.LogQueriesLongerThan, "query-frontend.log-queries-longer-than", 0, "Log queries that are slower than the specified duration. Set to 0 to disable. Set to < 0 to enable on all queries.")
f.Var(&cfg.LogQueryRequestHeaders, "query-frontend.log-query-request-headers", "Comma-separated list of request header names to include in query logs. Applies to both query stats and slow queries logs.")
f.Int64Var(&cfg.MaxBodySize, "query-frontend.max-body-size", 10*1024*1024, "Max body size for downstream prometheus.")
f.BoolVar(&cfg.QueryStatsEnabled, "query-frontend.query-stats-enabled", true, "False to disable query statistics tracking. When enabled, a message with some statistics is logged for every query.")
f.DurationVar(&cfg.ActiveSeriesWriteTimeout, "query-frontend.active-series-write-timeout", 5*time.Minute, "Timeout for writing active series responses. 0 means the value from `-server.http-write-timeout` is used.")
}

// Handler accepts queries and forwards them to RoundTripper. It can wait on in-flight requests and log slow queries,
@@ -190,6 +192,16 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
activityIndex := f.at.Insert(func() string { return httpRequestActivity(r, r.Header.Get("User-Agent"), params) })
defer f.at.Delete(activityIndex)

if isActiveSeriesEndpoint(r) && f.cfg.ActiveSeriesWriteTimeout > 0 {
deadline := time.Now().Add(f.cfg.ActiveSeriesWriteTimeout)
err = http.NewResponseController(w).SetWriteDeadline(deadline)
Collaborator: Can you confirm we don't need to also set the read deadline?

Contributor Author: Correct, the requests are typically small and the default read deadline should be applied.

if err != nil {
err := fmt.Errorf("failed to set write deadline for response writer: %w", err)
writeError(w, apierror.New(apierror.TypeInternal, err.Error()))
return
}
}

startTime := time.Now()
resp, err := f.roundTripper.RoundTrip(r)
queryResponseTime := time.Since(startTime)
@@ -441,3 +453,7 @@ func httpRequestActivity(request *http.Request, userAgent string, requestParams
// This doesn't have to be pretty, just useful for debugging, so prioritize efficiency.
return fmt.Sprintf("user:%s UA:%s req:%s %s %s", tenantID, userAgent, request.Method, request.URL.Path, params)
}

func isActiveSeriesEndpoint(r *http.Request) bool {
return strings.HasSuffix(r.URL.Path, "api/v1/cardinality/active_series")
}
41 changes: 41 additions & 0 deletions pkg/frontend/transport/handler_test.go
@@ -533,6 +533,47 @@ func TestHandler_LogsFormattedQueryDetails(t *testing.T) {
}
}

func TestHandler_ActiveSeriesWriteTimeout(t *testing.T) {
for _, tt := range []struct {
name string
path string
wantError bool
}{
{name: "deadline exceeded for non-streaming endpoint", path: "/api/v1/query", wantError: true},
{name: "deadline not exceeded for streaming endpoint", path: "/api/v1/cardinality/active_series"},
} {
t.Run(tt.name, func(t *testing.T) {
const serverWriteTimeout = 50 * time.Millisecond

roundTripper := roundTripperFunc(func(req *http.Request) (*http.Response, error) {
// Simulate a request that takes longer than the server write timeout.
time.Sleep(2 * serverWriteTimeout)
return &http.Response{StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader("{}"))}, nil
})

handler := NewHandler(HandlerConfig{ActiveSeriesWriteTimeout: time.Minute}, roundTripper, log.NewNopLogger(), nil, nil)

server := httptest.NewUnstartedServer(handler)
server.Config.WriteTimeout = serverWriteTimeout
server.Start()
defer server.Close()

req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s%s", server.URL, tt.path), nil)
require.NoError(t, err)
resp, err := http.DefaultClient.Do(req)
if tt.wantError {
require.Error(t, err)
return
}
require.NoError(t, err)
defer func() {
_, _ = io.Copy(io.Discard, resp.Body)
_ = resp.Body.Close()
}()
})
}
}

type testLogger struct {
logMessages []map[string]interface{}
}
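
The test relies on a `roundTripperFunc` helper that is defined elsewhere in the test package rather than in this diff. Its assumed shape is the usual adapter that lets a plain function satisfy `http.RoundTripper`; the actual definition may differ slightly:

```go
// Assumed shape of the helper used in the test above (not part of this diff):
// a function type that implements http.RoundTripper by calling itself.
type roundTripperFunc func(*http.Request) (*http.Response, error)

func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
	return f(r)
}
```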
6 changes: 6 additions & 0 deletions pkg/util/gziphandler/gzip.go
@@ -158,6 +158,12 @@ func (w *GzipResponseWriter) Write(b []byte) (int, error) {
return w.startPlainWrite(len(b))
}

// Unwrap returns the underlying ResponseWriter. This interface is used by
// http.ResponseController to operate on the underlying ResponseWriter.
func (w *GzipResponseWriter) Unwrap() http.ResponseWriter {
Collaborator: Can you explain why this change is required?

Contributor Author: I've added a comment that explains why this is required.

return w.ResponseWriter
}

func (w *GzipResponseWriter) startPlainWrite(blen int) (int, error) {
if err := w.startPlain(); err != nil {
return 0, err
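
The `Unwrap` method matters because the write-deadline call in the handler goes through response-writer middleware: `http.ResponseController` looks for a writer that supports the requested operation and, if the current one has an `Unwrap() http.ResponseWriter` method, retries on the unwrapped writer. A small self-contained illustration of that mechanism, using a hypothetical wrapper rather than Mimir's `GzipResponseWriter`:

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// loggingResponseWriter is a toy wrapper that does not implement
// SetWriteDeadline itself, but exposes the wrapped writer via Unwrap.
type loggingResponseWriter struct {
	http.ResponseWriter
}

// Unwrap lets http.ResponseController reach the underlying writer, the same
// role the new GzipResponseWriter.Unwrap method plays.
func (w *loggingResponseWriter) Unwrap() http.ResponseWriter {
	return w.ResponseWriter
}

func main() {
	h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		wrapped := &loggingResponseWriter{ResponseWriter: w}
		// Without an Unwrap method, SetWriteDeadline on a wrapped writer fails
		// with an error matching http.ErrNotSupported; with it, the call is
		// forwarded down the chain to the real response writer.
		err := http.NewResponseController(wrapped).SetWriteDeadline(time.Now().Add(time.Minute))
		fmt.Println("SetWriteDeadline error:", err)
	})

	srv := httptest.NewServer(h)
	defer srv.Close()
	_, _ = http.Get(srv.URL)
}
```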