Optionally wait for the query-frontend to start up before rejecting requests #6621

Merged: 15 commits merged on Nov 20, 2023. Diff shown from 5 of those commits.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -64,6 +64,7 @@
* [ENHANCEMENT] Server: Add `-server.report-grpc-codes-in-instrumentation-label-enabled` CLI flag to specify whether gRPC status codes should be used in `status_code` label of `cortex_request_duration_seconds` metric. It defaults to false, meaning that successful and erroneous gRPC status codes are represented with `success` and `error` respectively. #6562
* [ENHANCEMENT] Server: Add `-ingester.client.report-grpc-codes-in-instrumentation-label-enabled` CLI flag to specify whether gRPC status codes should be used in `status_code` label of `cortex_ingester_client_request_duration_seconds` metric. It defaults to false, meaning that successful and erroneous gRPC status codes are represented with `2xx` and `error` respectively. #6562
* [ENHANCEMENT] Server: Add `-server.http-log-closed-connections-without-response-enabled` option to log details about connections to HTTP server that were closed before any data was sent back. This can happen if client doesn't manage to send complete HTTP headers before timeout. #6612
* [ENHANCEMENT] Query-frontend: backoff and retry requests received while the frontend is starting up. Disabled by default, set `-query-frontend.not-running-backoff-duration` to a non-zero value to enable. #6621
* [BUGFIX] Ring: Ensure network addresses used for component hash rings are formatted correctly when using IPv6. #6068
* [BUGFIX] Query-scheduler: don't retain connections from queriers that have shut down, leading to gradually increasing enqueue latency over time. #6100 #6145
* [BUGFIX] Ingester: prevent query logic from continuing to execute after queries are canceled. #6085
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -5477,6 +5477,17 @@
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "not_running_backoff_duration",
"required": false,
"desc": "Time to wait between retries for a request that fails because the query-frontend is still starting up. 0 to disable backoff (ie. retry immediately)",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "query-frontend.not-running-backoff-duration",
"fieldType": "duration",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "parallelize_shardable_queries",
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1843,6 +1843,8 @@ Usage of ./cmd/mimir/mimir:
Maximum number of retries for a single request; beyond this, the downstream error is returned. (default 5)
-query-frontend.max-total-query-length duration
Limit the total query time range (end - start time). This limit is enforced in the query-frontend on the received query.
-query-frontend.not-running-backoff-duration duration
[experimental] Time to wait between retries for a request that fails because the query-frontend is still starting up. 0 to disable backoff and retries (i.e. fail immediately)
-query-frontend.parallelize-shardable-queries
True to enable query sharding.
-query-frontend.querier-forget-delay duration
1 change: 1 addition & 0 deletions docs/sources/mimir/configure/about-versioning.md
@@ -123,6 +123,7 @@ The following features are currently experimental:
- Lower TTL for cache entries overlapping the out-of-order samples ingestion window (re-using `-ingester.out-of-order-allowance` from ingesters)
- Use of Redis cache backend (`-query-frontend.results-cache.backend=redis`)
- Query blocking on a per-tenant basis (configured with the limit `blocked_queries`)
- Backoff and retry requests received while the query-frontend is starting up (`-query-frontend.not-running-backoff-duration`)
- Query-scheduler
- `-query-scheduler.querier-forget-delay`
- Store-gateway
@@ -1431,6 +1431,12 @@ results_cache:
# CLI flag: -query-frontend.max-retries-per-request
[max_retries: <int> | default = 5]

# (experimental) Time to wait between retries for a request that fails because
# the query-frontend is still starting up. 0 to disable backoff and retries
# (i.e. fail immediately)
# CLI flag: -query-frontend.not-running-backoff-duration
[not_running_backoff_duration: <duration> | default = 0s]

# True to enable query sharding.
# CLI flag: -query-frontend.parallelize-shardable-queries
[parallelize_shardable_queries: <boolean> | default = false]
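For illustration, a minimal configuration sketch that enables the new behaviour could look like the following. This is a hedged example only: the `frontend:` block placement is inferred from the reference section above, and the 100ms value is arbitrary.

```yaml
frontend:
  # Retry requests that arrive while the query-frontend is still starting,
  # waiting 100ms between attempts. The default of 0s keeps the previous
  # fail-fast behaviour.
  not_running_backoff_duration: 100ms
```

The CLI equivalent is `-query-frontend.not-running-backoff-duration=100ms`; the existing `-query-frontend.max-retries-per-request` setting (default 5) bounds how many attempts are made before giving up.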
32 changes: 20 additions & 12 deletions pkg/frontend/querymiddleware/roundtrip.go
@@ -16,6 +16,7 @@ import (

"github.com/go-kit/log"
"github.com/grafana/dskit/cache"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
"github.com/grafana/regexp"
"github.com/pkg/errors"
@@ -50,11 +51,12 @@ type Config struct {
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval" category:"advanced"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries" category:"advanced"`
ShardedQueries bool `yaml:"parallelize_shardable_queries"`
DeprecatedCacheUnalignedRequests bool `yaml:"cache_unaligned_requests" category:"advanced" doc:"hidden"` // Deprecated: Deprecated in Mimir 2.10.0, remove in Mimir 2.12.0 (https://github.com/grafana/mimir/issues/5253)
TargetSeriesPerShard uint64 `yaml:"query_sharding_target_series_per_shard" category:"advanced"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries" category:"advanced"`
NotRunningBackoffDuration time.Duration `yaml:"not_running_backoff_duration" category:"experimental"`
ShardedQueries bool `yaml:"parallelize_shardable_queries"`
DeprecatedCacheUnalignedRequests bool `yaml:"cache_unaligned_requests" category:"advanced" doc:"hidden"` // Deprecated: Deprecated in Mimir 2.10.0, remove in Mimir 2.12.0 (https://github.com/grafana/mimir/issues/5253)
TargetSeriesPerShard uint64 `yaml:"query_sharding_target_series_per_shard" category:"advanced"`

// CacheSplitter allows to inject a CacheSplitter to use for generating cache keys.
// If nil, the querymiddleware package uses a ConstSplitter with SplitQueriesByInterval.
@@ -66,6 +68,7 @@ type Config struct {
// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, "query-frontend.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
f.DurationVar(&cfg.NotRunningBackoffDuration, "query-frontend.not-running-backoff-duration", 0, "Time to wait between retries for a request that fails because the query-frontend is still starting up. 0 to disable backoff (ie. retry immediately)")
f.DurationVar(&cfg.SplitQueriesByInterval, "query-frontend.split-queries-by-interval", 24*time.Hour, "Split range queries by an interval and execute in parallel. You should use a multiple of 24 hours to optimize querying blocks. 0 to disable it.")
f.BoolVar(&cfg.AlignQueriesWithStep, "query-frontend.align-queries-with-step", false, "Mutate incoming queries to align their start and end with their step.")
f.BoolVar(&cfg.CacheResults, "query-frontend.cache-results", false, "Cache query results.")
@@ -174,8 +177,9 @@ func NewTripperware(
cacheExtractor Extractor,
engineOpts promql.EngineOpts,
registerer prometheus.Registerer,
frontendState func() services.State,
) (Tripperware, error) {
queryRangeTripperware, err := newQueryTripperware(cfg, log, limits, codec, cacheExtractor, engineOpts, registerer)
queryRangeTripperware, err := newQueryTripperware(cfg, log, limits, codec, cacheExtractor, engineOpts, registerer, frontendState)
if err != nil {
return nil, err
}
@@ -193,16 +197,20 @@ func newQueryTripperware(
cacheExtractor Extractor,
engineOpts promql.EngineOpts,
registerer prometheus.Registerer,
frontendState func() services.State,
) (Tripperware, error) {
// Disable concurrency limits for sharded queries.
engineOpts.ActiveQueryTracker = nil
engine := promql.NewEngine(engineOpts)

runningMiddleware := newFrontendRunningMiddleware(frontendState, cfg.NotRunningBackoffDuration, cfg.MaxRetries, log)

// Metric used to keep track of each middleware execution duration.
metrics := newInstrumentMiddlewareMetrics(registerer)
queryBlockerMiddleware := newQueryBlockerMiddleware(limits, log, registerer)

queryRangeMiddleware := []Middleware{
runningMiddleware,
// Track query range statistics. Added first before any subsequent middleware modifies the request.
newQueryStatsMiddleware(registerer),
newLimitsMiddleware(limits, log),
@@ -249,13 +257,12 @@
))
}

queryInstantMiddleware := []Middleware{newLimitsMiddleware(limits, log)}

queryInstantMiddleware = append(
queryInstantMiddleware,
queryInstantMiddleware := []Middleware{
runningMiddleware,
newLimitsMiddleware(limits, log),
newSplitInstantQueryByIntervalMiddleware(limits, log, engine, registerer),
queryBlockerMiddleware,
)
}

if cfg.ShardedQueries {
// Inject the cardinality estimation middleware after time-based splitting and
@@ -283,7 +290,8 @@
registerer,
)

queryRangeMiddleware = append(queryRangeMiddleware,
queryRangeMiddleware = append(
queryRangeMiddleware,
newInstrumentMiddleware("querysharding", metrics),
queryshardingMiddleware,
)
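For context on the ordering above: middlewares listed first wrap the ones after them, so placing `runningMiddleware` at the head of both `queryRangeMiddleware` and `queryInstantMiddleware` guarantees the startup check runs before any other processing. The sketch below illustrates that wrapping pattern with simplified, hypothetical re-declarations of the package's `Handler`/`Middleware` types (not the actual Mimir definitions):

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for the querymiddleware types used in the diff.
type Handler interface {
	Do(ctx context.Context, req string) (string, error)
}

type HandlerFunc func(ctx context.Context, req string) (string, error)

func (f HandlerFunc) Do(ctx context.Context, req string) (string, error) { return f(ctx, req) }

type Middleware interface {
	Wrap(next Handler) Handler
}

type MiddlewareFunc func(next Handler) Handler

func (f MiddlewareFunc) Wrap(next Handler) Handler { return f(next) }

// merge composes middlewares so the first element of the slice becomes the
// outermost wrapper, which is why listing runningMiddleware first makes its
// check run before everything else.
func merge(mws ...Middleware) Middleware {
	return MiddlewareFunc(func(next Handler) Handler {
		for i := len(mws) - 1; i >= 0; i-- {
			next = mws[i].Wrap(next)
		}
		return next
	})
}

func main() {
	gate := MiddlewareFunc(func(next Handler) Handler {
		return HandlerFunc(func(ctx context.Context, req string) (string, error) {
			fmt.Println("gate: runs before anything else")
			return next.Do(ctx, req)
		})
	})
	stats := MiddlewareFunc(func(next Handler) Handler {
		return HandlerFunc(func(ctx context.Context, req string) (string, error) {
			fmt.Println("stats: runs after the gate")
			return next.Do(ctx, req)
		})
	})
	final := HandlerFunc(func(ctx context.Context, req string) (string, error) {
		return "response for " + req, nil
	})

	resp, err := merge(gate, stats).Wrap(final).Do(context.Background(), "up")
	fmt.Println(resp, err)
}
```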
8 changes: 8 additions & 0 deletions pkg/frontend/querymiddleware/roundtrip_test.go
@@ -20,6 +20,7 @@ import (

"github.com/go-kit/log"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
@@ -77,6 +78,7 @@ func TestRangeTripperware(t *testing.T) {
Timeout: time.Minute,
},
nil,
alwaysRunning,
)
if err != nil {
t.Fatal(err)
@@ -128,6 +130,7 @@ func TestInstantTripperware(t *testing.T) {
Timeout: time.Minute,
},
nil,
alwaysRunning,
)
require.NoError(t, err)

@@ -301,6 +304,7 @@ func TestTripperware_Metrics(t *testing.T) {
Timeout: time.Minute,
},
reg,
alwaysRunning,
)
require.NoError(t, err)

@@ -408,3 +412,7 @@ func (s singleHostRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
r.URL.Host = s.host
return s.next.RoundTrip(r)
}

func alwaysRunning() services.State {
return services.Running
}
78 changes: 78 additions & 0 deletions pkg/frontend/querymiddleware/running.go
@@ -0,0 +1,78 @@
// SPDX-License-Identifier: AGPL-3.0-only

package querymiddleware

import (
"context"
"fmt"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"

apierror "github.com/grafana/mimir/pkg/api/error"
"github.com/grafana/mimir/pkg/util/spanlogger"
)

func newFrontendRunningMiddleware(frontendState func() services.State, notRunningBackoffDuration time.Duration, maxRetries int, log log.Logger) Middleware {
return MiddlewareFunc(func(next Handler) Handler {
return frontendRunningMiddleware{
frontendState: frontendState,
notRunningBackoffDuration: notRunningBackoffDuration,
maxRetries: maxRetries,
log: log,

next: next,
}
})
}

type frontendRunningMiddleware struct {
frontendState func() services.State
notRunningBackoffDuration time.Duration
maxRetries int
log log.Logger

next Handler
}

func (f frontendRunningMiddleware) Do(ctx context.Context, r Request) (Response, error) {
if err := f.waitForRunning(ctx); err != nil {
return nil, err
}

return f.next.Do(ctx, r)
}

func (f frontendRunningMiddleware) waitForRunning(ctx context.Context) (err error) {
spanLog, _ := spanlogger.NewWithLogger(ctx, f.log, "waitForRunning")
defer spanLog.Finish()

attempt := 1

for {
state := f.frontendState()

switch state {
case services.Running:
return nil
case services.Stopping, services.Terminated, services.Failed:
level.Error(spanLog).Log("msg", "returning error: frontend is shutting down", "state", state)
return apierror.New(apierror.TypeUnavailable, fmt.Sprintf("frontend shutting down: %v", state))
}

if f.notRunningBackoffDuration == 0 {
// Retries disabled, stop now.
level.Error(spanLog).Log("msg", "returning error: frontend is not running and backoff / retry is disabled", "state", state)
return apierror.New(apierror.TypeUnavailable, fmt.Sprintf("frontend not running: %v", state))
} else if attempt >= f.maxRetries {
level.Error(spanLog).Log("msg", "returning error: frontend is still not running after backoff / retry", "state", state)
return apierror.New(apierror.TypeUnavailable, fmt.Sprintf("frontend not running after %v: %v", time.Duration(f.maxRetries-1)*f.notRunningBackoffDuration, state))
}

level.Warn(spanLog).Log("msg", "frontend is not running, will back off and check again", "state", state, "attempt", attempt)
attempt++
time.Sleep(f.notRunningBackoffDuration)
}
}
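The `frontendState` callback keeps this middleware decoupled from any concrete frontend type; any `func() services.State` works. Below is a hedged sketch, not the actual Mimir wiring, showing how a caller could obtain such a callback from a dskit service, whose `State` method has exactly this signature:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grafana/dskit/services"
)

func main() {
	// A stand-in service whose startup takes a while, loosely simulating a
	// query-frontend that is still connecting to its query-schedulers. This
	// is an illustrative example, not the real frontend service.
	svc := services.NewIdleService(func(ctx context.Context) error {
		time.Sleep(500 * time.Millisecond) // pretend startup work
		return nil
	}, nil)

	// The method value svc.State is a func() services.State, matching the
	// frontendState parameter accepted by newFrontendRunningMiddleware and
	// NewTripperware above.
	var frontendState func() services.State = svc.State

	fmt.Println(frontendState()) // New

	if err := svc.StartAsync(context.Background()); err != nil {
		panic(err)
	}
	fmt.Println(frontendState()) // typically Starting at this point

	if err := svc.AwaitRunning(context.Background()); err != nil {
		panic(err)
	}
	fmt.Println(frontendState()) // Running
}
```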
124 changes: 124 additions & 0 deletions pkg/frontend/querymiddleware/running_test.go
@@ -0,0 +1,124 @@
// SPDX-License-Identifier: AGPL-3.0-only

package querymiddleware

import (
"context"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/services"
"github.com/stretchr/testify/require"

apierror "github.com/grafana/mimir/pkg/api/error"
)

func TestFrontendRunningMiddleware_Enabled(t *testing.T) {
maxRetries := 5
backoffDuration := 100 * time.Millisecond

testCases := map[string]struct {
stateFunc func(callCount int) services.State
expectedStateFuncCalls int
expectedError string
}{
"frontend always in 'new' state": {
stateFunc: func(int) services.State { return services.New },
expectedStateFuncCalls: maxRetries,
expectedError: "frontend not running after 400ms: New",
},
"frontend always in 'starting' state": {
stateFunc: func(int) services.State { return services.Starting },
expectedStateFuncCalls: maxRetries,
expectedError: "frontend not running after 400ms: Starting",
},
"frontend transitions from 'new' to 'running' after retry": {
stateFunc: func(callCount int) services.State {
if callCount == 3 {
return services.Running
}

return services.New
},
expectedStateFuncCalls: 3,
},
"frontend in 'running' state": {
stateFunc: func(int) services.State { return services.Running },
expectedStateFuncCalls: 1,
},
"frontend in 'stopping' state": {
stateFunc: func(int) services.State { return services.Stopping },
expectedStateFuncCalls: 1,
expectedError: "frontend shutting down: Stopping",
},
"frontend in 'terminated' state": {
stateFunc: func(int) services.State { return services.Terminated },
expectedStateFuncCalls: 1,
expectedError: "frontend shutting down: Terminated",
},
"frontend in 'failed' state": {
stateFunc: func(int) services.State { return services.Failed },
expectedStateFuncCalls: 1,
expectedError: "frontend shutting down: Failed",
},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
downstreamCalled := false
successResponse := &PrometheusResponse{Status: statusSuccess}

downstream := HandlerFunc(func(ctx context.Context, r Request) (Response, error) {
downstreamCalled = true
return successResponse, nil
})

stateFuncCalls := 0
lastStateFuncCall := time.Time{}
stateFunc := func() services.State {
stateFuncCalls++
require.Greater(t, time.Since(lastStateFuncCall), backoffDuration)
lastStateFuncCall = time.Now()
return testCase.stateFunc(stateFuncCalls)
}

handler := newFrontendRunningMiddleware(stateFunc, backoffDuration, maxRetries, log.NewNopLogger()).Wrap(downstream)
startTime := time.Now()
resp, err := handler.Do(context.Background(), nil)
duration := time.Since(startTime)

if testCase.expectedError == "" {
require.NoError(t, err)
require.True(t, downstreamCalled, "expected downstream handler to be invoked")
require.Equal(t, successResponse, resp)
} else {
require.False(t, downstreamCalled, "expected downstream handler to not be invoked")
require.Equal(t, apierror.New(apierror.TypeUnavailable, testCase.expectedError), err)
require.Nil(t, resp)
}

require.Equal(t, testCase.expectedStateFuncCalls, stateFuncCalls)
require.Less(t, duration, time.Duration(testCase.expectedStateFuncCalls)*backoffDuration, "should only wait between calls to the state function, not before or after calls")
})
}
}

func TestFrontendRunningMiddleware_Disabled(t *testing.T) {
downstream := HandlerFunc(func(ctx context.Context, r Request) (Response, error) {
panic("should not be called")
})

stateFuncCalls := 0
stateFunc := func() services.State {
stateFuncCalls++
return services.New
}

handler := newFrontendRunningMiddleware(stateFunc, 0, 10, log.NewNopLogger()).Wrap(downstream)
resp, err := handler.Do(context.Background(), nil)

require.Equal(t, apierror.New(apierror.TypeUnavailable, "frontend not running: New"), err)
require.Nil(t, resp)
require.Equal(t, 1, stateFuncCalls)
}
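For reference, the 400ms in the expected error strings above comes from the give-up message in `waitForRunning`, which reports `(maxRetries - 1) * notRunningBackoffDuration`: with this test's `maxRetries = 5` and `backoffDuration = 100ms`, that is 4 × 100ms = 400ms.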