diff --git a/pkg/queryfrontend/roundtrip.go b/pkg/queryfrontend/roundtrip.go index 5a417f584c..d2138385f0 100644 --- a/pkg/queryfrontend/roundtrip.go +++ b/pkg/queryfrontend/roundtrip.go @@ -70,15 +70,16 @@ func NewTripperware(config Config, reg prometheus.Registerer, logger log.Logger) if err != nil { return nil, err } - - queryInstantTripperware := newInstantQueryTripperware( + queryInstantTripperware, err := newInstantQueryTripperware( config.NumShards, queryRangeLimits, queryInstantCodec, prometheus.WrapRegistererWith(prometheus.Labels{"tripperware": "query_instant"}, reg), config.ForwardHeaders, ) - + if err != nil { + return nil, err + } return func(next http.RoundTripper) http.RoundTripper { return newRoundTripper(next, queryRangeTripperware(next), labelsTripperware(next), queryInstantTripperware(next), reg) }, nil @@ -190,9 +191,13 @@ func newQueryRangeTripperware( } if numShards > 0 { + analyzer, err := querysharding.NewQueryAnalyzer() + if err != nil { + return nil, errors.Wrap(err, "create query analyzer") + } queryRangeMiddleware = append( queryRangeMiddleware, - PromQLShardingMiddleware(querysharding.NewQueryAnalyzer(), numShards, limits, codec, reg), + PromQLShardingMiddleware(analyzer, numShards, limits, codec, reg), ) } @@ -327,14 +332,18 @@ func newInstantQueryTripperware( codec queryrange.Codec, reg prometheus.Registerer, forwardHeaders []string, -) queryrange.Tripperware { +) (queryrange.Tripperware, error) { instantQueryMiddlewares := []queryrange.Middleware{} m := queryrange.NewInstrumentMiddlewareMetrics(reg) if numShards > 0 { + analyzer, err := querysharding.NewQueryAnalyzer() + if err != nil { + return nil, errors.Wrap(err, "create query analyzer") + } instantQueryMiddlewares = append( instantQueryMiddlewares, queryrange.InstrumentMiddleware("sharding", m), - PromQLShardingMiddleware(querysharding.NewQueryAnalyzer(), numShards, limits, codec, reg), + PromQLShardingMiddleware(analyzer, numShards, limits, codec, reg), ) } @@ -343,7 +352,7 @@ func newInstantQueryTripperware( return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) { return rt.RoundTrip(r) }) - } + }, nil } // shouldCache controls what kind of Thanos request should be cached. diff --git a/pkg/queryfrontend/shard_query.go b/pkg/queryfrontend/shard_query.go index 2109182634..67f7cd56f5 100644 --- a/pkg/queryfrontend/shard_query.go +++ b/pkg/queryfrontend/shard_query.go @@ -19,7 +19,7 @@ import ( ) // PromQLShardingMiddleware creates a new Middleware that shards PromQL aggregations using grouping labels. -func PromQLShardingMiddleware(queryAnalyzer *querysharding.QueryAnalyzer, numShards int, limits queryrange.Limits, merger queryrange.Merger, registerer prometheus.Registerer) queryrange.Middleware { +func PromQLShardingMiddleware(queryAnalyzer querysharding.Analyzer, numShards int, limits queryrange.Limits, merger queryrange.Merger, registerer prometheus.Registerer) queryrange.Middleware { return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler { queriesTotal := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: "thanos", @@ -45,7 +45,7 @@ type querySharder struct { next queryrange.Handler limits queryrange.Limits - queryAnalyzer *querysharding.QueryAnalyzer + queryAnalyzer querysharding.Analyzer numShards int merger queryrange.Merger diff --git a/pkg/querysharding/analyzer.go b/pkg/querysharding/analyzer.go index c6cdead58b..f93a103fcf 100644 --- a/pkg/querysharding/analyzer.go +++ b/pkg/querysharding/analyzer.go @@ -6,21 +6,62 @@ package querysharding import ( "fmt" + lru "github.com/hashicorp/golang-lru" "github.com/prometheus/prometheus/promql/parser" ) // QueryAnalyzer is an analyzer which determines // whether a PromQL Query is shardable and using which labels. + +type Analyzer interface { + Analyze(string) (QueryAnalysis, error) +} + type QueryAnalyzer struct{} +type CachedQueryAnalyzer struct { + analyzer *QueryAnalyzer + cache *lru.Cache +} + var nonShardableFuncs = []string{ "label_join", "label_replace", } // NewQueryAnalyzer creates a new QueryAnalyzer. -func NewQueryAnalyzer() *QueryAnalyzer { - return &QueryAnalyzer{} +func NewQueryAnalyzer() (*CachedQueryAnalyzer, error) { + cache, err := lru.New(256) + if err != nil { + return nil, err + } + + return &CachedQueryAnalyzer{ + analyzer: &QueryAnalyzer{}, + cache: cache, + }, nil +} + +type cachedValue struct { + QueryAnalysis QueryAnalysis + err error +} + +func (a *CachedQueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { + if a.cache.Contains(query) { + value, ok := a.cache.Get(query) + if ok { + return value.(cachedValue).QueryAnalysis, value.(cachedValue).err + } + } + + // Analyze if needed. + analysis, err := a.analyzer.Analyze(query) + + // Adding to cache. + _ = a.cache.Add(query, cachedValue{QueryAnalysis: analysis, err: err}) + + return analysis, err } // Analyze analyzes a query and returns a QueryAnalysis. diff --git a/pkg/querysharding/analyzer_test.go b/pkg/querysharding/analyzer_test.go index d4860bcec4..efd9dd5d94 100644 --- a/pkg/querysharding/analyzer_test.go +++ b/pkg/querysharding/analyzer_test.go @@ -11,6 +11,7 @@ import ( ) func TestAnalyzeQuery(t *testing.T) { + type testCase struct { name string expression string @@ -165,7 +166,8 @@ http_requests_total`, for _, test := range nonShardable { t.Run(test.name, func(t *testing.T) { - analyzer := NewQueryAnalyzer() + analyzer, err := NewQueryAnalyzer() + require.NoError(t, err) analysis, err := analyzer.Analyze(test.expression) require.NoError(t, err) require.False(t, analysis.IsShardable()) @@ -174,7 +176,8 @@ http_requests_total`, for _, test := range shardableByLabels { t.Run(test.name, func(t *testing.T) { - analyzer := NewQueryAnalyzer() + analyzer, err := NewQueryAnalyzer() + require.NoError(t, err) analysis, err := analyzer.Analyze(test.expression) require.NoError(t, err) require.True(t, analysis.IsShardable()) @@ -188,7 +191,8 @@ http_requests_total`, for _, test := range shardableWithoutLabels { t.Run(test.name, func(t *testing.T) { - analyzer := NewQueryAnalyzer() + analyzer, err := NewQueryAnalyzer() + require.NoError(t, err) analysis, err := analyzer.Analyze(test.expression) require.NoError(t, err) require.True(t, analysis.IsShardable())