From 4ebac9a19272108c670d7a9ebb3eda81ef96339f Mon Sep 17 00:00:00 2001 From: Rahul Kumar Jha Date: Sun, 2 Oct 2022 10:27:58 +0530 Subject: [PATCH 01/13] created lru cache in querysharding pkg Signed-off-by: Rahul Kumar Jha --- pkg/querysharding/analyzer.go | 93 +++++++- pkg/querysharding/analyzer_test.go | 358 ++++++++++++++++++++++++++++- 2 files changed, 435 insertions(+), 16 deletions(-) diff --git a/pkg/querysharding/analyzer.go b/pkg/querysharding/analyzer.go index 352893367d..1cb610a386 100644 --- a/pkg/querysharding/analyzer.go +++ b/pkg/querysharding/analyzer.go @@ -5,7 +5,10 @@ package querysharding import ( "fmt" + "os" + "github.com/go-kit/log" + lru "github.com/hashicorp/golang-lru" "github.com/prometheus/prometheus/promql/parser" ) @@ -13,6 +16,9 @@ import ( // whether a PromQL Query is shardable and using which labels. type QueryAnalyzer struct{} +var LRU *lru.Cache +var logger log.Logger + var nonShardableFuncs = []string{ "label_join", "label_replace", @@ -20,26 +26,71 @@ var nonShardableFuncs = []string{ // NewQueryAnalyzer creates a new QueryAnalyzer. func NewQueryAnalyzer() *QueryAnalyzer { + inIT() // Initializing cache return &QueryAnalyzer{} } +func inIT() { + lruCache, err := lru.New(256) + + if err != nil { + logger.Log("Error: ", err) + } + + LRU = lruCache + + logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + logger = log.With(logger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) +} + +type cachedValue struct { + QueryAnalysis QueryAnalysis + err error +} + // Analyze analyzes a query and returns a QueryAnalysis. // Analyze uses the following algorithm: -// * if a query has subqueries, such as label_join or label_replace, -// or has functions which cannot be sharded, then treat the query as non shardable. -// * if the query's root expression has grouping labels, -// then treat the query as shardable by those labels. -// * if the query's root expression has no grouping labels, -// then walk the query and find the least common labelset -// used in grouping expressions. If non-empty, treat the query -// as shardable by those labels. -// * otherwise, treat the query as non-shardable. +// - if a query has subqueries, such as label_join or label_replace, +// or has functions which cannot be sharded, then treat the query as non shardable. +// - if the query's root expression has grouping labels, +// then treat the query as shardable by those labels. +// - if the query's root expression has no grouping labels, +// then walk the query and find the least common labelset +// used in grouping expressions. If non-empty, treat the query +// as shardable by those labels. +// - otherwise, treat the query as non-shardable. +// // The le label is excluded from sharding. func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { + nonShardableQuery := nonShardableQuery() + + if LRU.Len() == 0 { + inIT() + } + + var queryResult cachedValue + + if LRU.Len() != 0 && LRU.Contains(query) { + value, ok := LRU.Get(query) + + if !ok { + logger.Log("FetchingError=", query) + } else { + queryResult = value.(cachedValue) + return queryResult.QueryAnalysis, queryResult.err + } + } + expr, err := parser.ParseExpr(query) if err != nil { - return nonShardableQuery(), err + if ok := LRU.Add(query, cachedValue{ + QueryAnalysis: nonShardableQuery, + err: err, + }); ok { + logger.Log("parsingError=", query) + } + return nonShardableQuery, err } isShardable := true @@ -71,14 +122,34 @@ func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { }) if !isShardable { - return nonShardableQuery(), nil + + if ok := LRU.Add(query, cachedValue{ + QueryAnalysis: nonShardableQuery, + err: nil, + }); ok { + logger.Log("cachingError=", query) + } + return nonShardableQuery, nil } rootAnalysis := analyzeRootExpression(expr) if rootAnalysis.IsShardable() && rootAnalysis.shardBy { + if ok := LRU.Add(query, cachedValue{ + QueryAnalysis: rootAnalysis, + err: nil, + }); ok { + logger.Log("cachingError=", query) + } return rootAnalysis, nil } + if ok := LRU.Add(query, cachedValue{ + QueryAnalysis: analysis, + err: nil, + }); ok { + logger.Log("cachingError=", query) + } + return analysis, nil } diff --git a/pkg/querysharding/analyzer_test.go b/pkg/querysharding/analyzer_test.go index d4860bcec4..1c4ae96e38 100644 --- a/pkg/querysharding/analyzer_test.go +++ b/pkg/querysharding/analyzer_test.go @@ -4,18 +4,20 @@ package querysharding import ( + "fmt" "sort" "testing" "github.com/stretchr/testify/require" ) +type testCase struct { + name string + expression string + shardingLabels []string +} + func TestAnalyzeQuery(t *testing.T) { - type testCase struct { - name string - expression string - shardingLabels []string - } nonShardable := []testCase{ { @@ -200,3 +202,349 @@ http_requests_total`, }) } } + +var value = []testCase{ + { + name: "aggregation", + expression: "sum(http_requests_total)", + }, + { + name: "aggregation", + expression: "sum(http_requests_total)", + }, + { + name: "aggregation", + expression: "sum(http_requests_total)", + }, + { + name: "aggregation", + expression: "sum(http_requests_total)", + }, + { + name: "aggregation", + expression: "sum(http_requests_total)", + }, + { + name: "aggregation", + expression: "sum(http_requests_total)", + }, + { + name: "outer aggregation with no grouping", + expression: "count(sum by (pod) (http_requests_total))", + }, + { + name: "outer aggregation with without grouping", + expression: "count(sum without (pod) (http_requests_total))", + }, + { + name: "aggregate expression with subquery", + expression: `sum by (pod) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`, + }, + { + name: "aggregation", + expression: "sum(http_requests_total)", + }, + { + name: "aggregation", + expression: "sum(http_requests_total)", + }, + { + name: "aggregation", + expression: "sum(http_requests_total)", + }, + { + name: "aggregation", + expression: "sum(http_requests_total)", + }, + { + name: "aggregate without expression with subquery", + expression: `sum without (pod) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`, + }, + { + name: "binary expression", + expression: `http_requests_total{code="400"} / http_requests_total`, + }, + { + name: "binary aggregation with different grouping labels", + expression: `sum by (pod) (http_requests_total{code="400"}) / sum by (cluster) (http_requests_total)`, + }, + { + name: "outer aggregation with without grouping", + expression: "count(sum without (pod) (http_requests_total))", + }, + { + name: "outer aggregation with without grouping", + expression: "count(sum without (pod) (http_requests_total))", + }, + { + name: "outer aggregation with without grouping", + expression: "count(sum without (pod) (http_requests_total))", + }, + { + name: "binary expression with constant", + expression: `http_requests_total{code="400"} / 4`, + }, + { + name: "binary expression with empty vector matching", + expression: `http_requests_total{code="400"} / on () http_requests_total`, + }, + { + name: "outer aggregation with without grouping", + expression: "count(sum without (pod) (http_requests_total))", + }, + { + name: "outer aggregation with without grouping", + expression: "count(sum without (pod) (http_requests_total))", + }, + { + name: "binary expression with vector matching and subquery", + expression: `http_requests_total{code="400"} / on (pod) label_replace(metric, "dst_label", "$1", "src_label", "re")`, + }, + { + name: "multiple binary expressions", + expression: `(http_requests_total{code="400"} + http_requests_total{code="500"}) / http_requests_total`, + }, + { + name: "aggregation without grouping", + expression: "sum without (pod) (http_requests_total)", + shardingLabels: []string{"pod"}, + }, + { + name: "multiple binary expressions with empty vector matchers", + expression: ` +(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) +/ on () +http_requests_total`, + }, + { + name: "outer aggregation with without grouping", + expression: "count(sum without (pod) (http_requests_total))", + }, + { + name: "binary expression with vector matching and subquery", + expression: `http_requests_total{code="400"} / on (pod) label_replace(metric, "dst_label", "$1", "src_label", "re")`, + }, + { + name: "multiple binary expressions with empty vector matchers", + expression: ` +(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) +/ on () +http_requests_total`, + }, + { + name: "multiple binary expressions with empty vector matchers", + expression: ` +(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) +/ on () +http_requests_total`, + }, + { + name: "multiple binary expressions with empty vector matchers", + expression: ` +(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) +/ on () +http_requests_total`, + }, + { + name: "multiple binary expressions with empty vector matchers", + expression: ` +(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) +/ on () +http_requests_total`, + }, + { + name: "multiple binary expressions with empty vector matchers", + expression: ` +(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) +/ on () +http_requests_total`, + }, + { + name: "multiple binary expressions with empty vector matchers", + expression: ` +(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) +/ on () +http_requests_total`, + }, + { + name: "multiple binary expressions with empty vector matchers", + expression: ` +(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) +/ on () +http_requests_total`, + }, + { + name: "outer aggregation with without grouping", + expression: "count(sum without (pod) (http_requests_total))", + }, + { + name: "binary expression with vector matching and subquery", + expression: `http_requests_total{code="400"} / on (pod) label_replace(metric, "dst_label", "$1", "src_label", "re")`, + }, + { + name: "multiple binary expressions with empty vector matchers", + expression: ` +(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) +/ on () +http_requests_total`, + }, + { + name: "multiple binary expressions with empty vector matchers", + expression: ` +(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) +/ on () +http_requests_total`, + }, + { + name: "aggregation without grouping", + expression: "sum without (pod) (http_requests_total)", + shardingLabels: []string{"pod"}, + }, + { + name: "multiple aggregations with without grouping", + expression: "max without (pod) (sum without (pod, cluster) (http_requests_total))", + shardingLabels: []string{"pod", "cluster"}, + }, + { + name: "binary expression with without vector matching and grouping", + expression: `sum without (cluster, pod) (http_requests_total{code="400"}) / ignoring (pod) sum without (cluster, pod) (http_requests_total)`, + shardingLabels: []string{"pod", "cluster"}, + }, + { + name: "multiple binary expressions with without grouping", + expression: `(http_requests_total{code="400"} + ignoring (pod) http_requests_total{code="500"}) / ignoring (cluster, pod) http_requests_total`, + shardingLabels: []string{"cluster", "pod"}, + }, + { + name: "multiple binary expressions with without vector matchers", + expression: ` +(http_requests_total{code="400"} + ignoring (cluster, pod) http_requests_total{code="500"}) +/ ignoring (pod) +http_requests_total`, + shardingLabels: []string{"cluster", "pod"}, + }, + { + name: "aggregation with grouping", + expression: "sum by (pod) (http_requests_total)", + shardingLabels: []string{"pod"}, + }, + { + name: "multiple aggregations with grouping", + expression: "max by (pod) (sum by (pod, cluster) (http_requests_total))", + shardingLabels: []string{"pod"}, + }, + { + name: "binary expression with vector matching", + expression: `http_requests_total{code="400"} / on (pod) http_requests_total`, + shardingLabels: []string{"pod"}, + }, + { + name: "binary aggregation with same grouping labels", + expression: `sum by (pod) (http_requests_total{code="400"}) / sum by (pod) (http_requests_total)`, + shardingLabels: []string{"pod"}, + }, + { + name: "binary expression with vector matching and grouping", + expression: `sum by (cluster, pod) (http_requests_total{code="400"}) / on (pod) sum by (cluster, pod) (http_requests_total)`, + shardingLabels: []string{"pod"}, + }, + { + name: "multiple binary expressions with vector matchers", + expression: ` +(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) +/ on (pod) +http_requests_total`, + shardingLabels: []string{"pod"}, + }, + { + name: "multiple binary expressions with grouping", + expression: ` +sum by (container) ( +(http_requests_total{code="400"} + on (cluster, pod, container) http_requests_total{code="500"}) +/ on (pod, container) +http_requests_total +)`, + shardingLabels: []string{"container"}, + }, + { + name: "multiple binary expressions with grouping", + expression: `(http_requests_total{code="400"} + on (pod) http_requests_total{code="500"}) / on (cluster, pod) http_requests_total`, + shardingLabels: []string{"cluster", "pod"}, + }, + { + name: "histogram quantile", + expression: "histogram_quantile(0.95, sum(rate(metric[1m])) by (le, cluster))", + shardingLabels: []string{"cluster"}, + }, + { + name: "multiple binary expressions with grouping", + expression: ` +sum by (container) ( +(http_requests_total{code="400"} + on (cluster, pod, container) http_requests_total{code="500"}) +/ on (pod, container) +http_requests_total +)`, + shardingLabels: []string{"container"}, + }, + { + name: "multiple binary expressions with grouping", + expression: `(http_requests_total{code="400"} + on (pod) http_requests_total{code="500"}) / on (cluster, pod) http_requests_total`, + shardingLabels: []string{"cluster", "pod"}, + }, + { + name: "histogram quantile", + expression: "histogram_quantile(0.95, sum(rate(metric[1m])) by (le, cluster))", + shardingLabels: []string{"cluster"}, + }, + { + name: "multiple binary expressions with grouping", + expression: ` +sum by (container) ( +(http_requests_total{code="400"} + on (cluster, pod, container) http_requests_total{code="500"}) +/ on (pod, container) +http_requests_total +)`, + shardingLabels: []string{"container"}, + }, + { + name: "multiple binary expressions with grouping", + expression: `(http_requests_total{code="400"} + on (pod) http_requests_total{code="500"}) / on (cluster, pod) http_requests_total`, + shardingLabels: []string{"cluster", "pod"}, + }, + { + name: "histogram quantile", + expression: "histogram_quantile(0.95, sum(rate(metric[1m])) by (le, cluster))", + shardingLabels: []string{"cluster"}, + }, + { + name: "multiple binary expressions with grouping", + expression: ` +sum by (container) ( +(http_requests_total{code="400"} + on (cluster, pod, container) http_requests_total{code="500"}) +/ on (pod, container) +http_requests_total +)`, + shardingLabels: []string{"container"}, + }, + { + name: "multiple binary expressions with grouping", + expression: `(http_requests_total{code="400"} + on (pod) http_requests_total{code="500"}) / on (cluster, pod) http_requests_total`, + shardingLabels: []string{"cluster", "pod"}, + }, + { + name: "histogram quantile", + expression: "histogram_quantile(0.95, sum(rate(metric[1m])) by (le, cluster))", + shardingLabels: []string{"cluster"}, + }, +} + +func BenchmarkTestAnalyzeQuery(b *testing.B) { + analyze := NewQueryAnalyzer() + for _, v := range value { + b.Run(fmt.Sprintf("query: %s", v.name), func(b *testing.B) { + for i := 0; i < b.N; i++ { + analyze.Analyze(v.expression) + } + }) + } +} From c52ed6de103e8f2ded39e0ab7d29f8f441b70143 Mon Sep 17 00:00:00 2001 From: Rahul Kumar Jha Date: Sun, 2 Oct 2022 12:30:32 +0530 Subject: [PATCH 02/13] triggering linting & vetting Signed-off-by: Rahul Kumar Jha --- pkg/querysharding/analyzer_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/querysharding/analyzer_test.go b/pkg/querysharding/analyzer_test.go index 1c4ae96e38..f68bf30701 100644 --- a/pkg/querysharding/analyzer_test.go +++ b/pkg/querysharding/analyzer_test.go @@ -543,7 +543,11 @@ func BenchmarkTestAnalyzeQuery(b *testing.B) { for _, v := range value { b.Run(fmt.Sprintf("query: %s", v.name), func(b *testing.B) { for i := 0; i < b.N; i++ { - analyze.Analyze(v.expression) + _, err := analyze.Analyze(v.expression) + + if err != nil { + fmt.Printf("Error while benchmarking =%s ", err) + } } }) } From 8a61aa806f08b2a4af03d44b58a4bc2ffd78cb0e Mon Sep 17 00:00:00 2001 From: Rahul Kumar Jha Date: Sun, 2 Oct 2022 22:34:56 +0530 Subject: [PATCH 03/13] added cache in querysharding pkg Signed-off-by: Rahul Kumar Jha --- pkg/querysharding/analyzer.go | 111 +++------ pkg/querysharding/analyzer_test.go | 363 +---------------------------- 2 files changed, 44 insertions(+), 430 deletions(-) diff --git a/pkg/querysharding/analyzer.go b/pkg/querysharding/analyzer.go index 1cb610a386..3f7a08d644 100644 --- a/pkg/querysharding/analyzer.go +++ b/pkg/querysharding/analyzer.go @@ -5,19 +5,18 @@ package querysharding import ( "fmt" - "os" - "github.com/go-kit/log" lru "github.com/hashicorp/golang-lru" "github.com/prometheus/prometheus/promql/parser" + logging "github.com/thanos-io/thanos/pkg/logging" ) // QueryAnalyzer is an analyzer which determines // whether a PromQL Query is shardable and using which labels. -type QueryAnalyzer struct{} -var LRU *lru.Cache -var logger log.Logger +type QueryAnalyzer struct { + cache *lru.Cache +} var nonShardableFuncs = []string{ "label_join", @@ -26,21 +25,16 @@ var nonShardableFuncs = []string{ // NewQueryAnalyzer creates a new QueryAnalyzer. func NewQueryAnalyzer() *QueryAnalyzer { - inIT() // Initializing cache - return &QueryAnalyzer{} -} - -func inIT() { - lruCache, err := lru.New(256) + cache, err := lru.New(256) + logger := logging.NewLogger("warn", "", "") if err != nil { - logger.Log("Error: ", err) - } - - LRU = lruCache + logger.Log("Error Creating LRU Cache: ", err) - logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) - logger = log.With(logger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) + } + return &QueryAnalyzer{ + cache: cache, + } } type cachedValue struct { @@ -48,49 +42,40 @@ type cachedValue struct { err error } -// Analyze analyzes a query and returns a QueryAnalysis. - -// Analyze uses the following algorithm: -// - if a query has subqueries, such as label_join or label_replace, -// or has functions which cannot be sharded, then treat the query as non shardable. -// - if the query's root expression has grouping labels, -// then treat the query as shardable by those labels. -// - if the query's root expression has no grouping labels, -// then walk the query and find the least common labelset -// used in grouping expressions. If non-empty, treat the query -// as shardable by those labels. -// - otherwise, treat the query as non-shardable. -// -// The le label is excluded from sharding. func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { - nonShardableQuery := nonShardableQuery() - - if LRU.Len() == 0 { - inIT() + if a.cache.Contains(query) { + value, _ := a.cache.Get(query) + return value.(cachedValue).QueryAnalysis, value.(cachedValue).err } - var queryResult cachedValue + // Analyze if needed - if LRU.Len() != 0 && LRU.Contains(query) { - value, ok := LRU.Get(query) + analysis, err := a.queryAnalyzer(query) - if !ok { - logger.Log("FetchingError=", query) - } else { - queryResult = value.(cachedValue) - return queryResult.QueryAnalysis, queryResult.err - } - } + _ = a.cache.Add(query, cachedValue{QueryAnalysis: analysis, err: err}) + + return analysis, err +} + +// queryAnalyzer analyzes a query and returns a QueryAnalysis. + +// queryAnalyzer uses the following algorithm: +// * if a query has subqueries, such as label_join or label_replace, +// or has functions which cannot be sharded, then treat the query as non shardable. +// * if the query's root expression has grouping labels, +// then treat the query as shardable by those labels. +// * if the query's root expression has no grouping labels, +// then walk the query and find the least common labelset +// used in grouping expressions. If non-empty, treat the query +// as shardable by those labels. +// * otherwise, treat the query as non-shardable. +// The le label is excluded from sharding. + +func (a *QueryAnalyzer) queryAnalyzer(query string) (QueryAnalysis, error) { expr, err := parser.ParseExpr(query) if err != nil { - if ok := LRU.Add(query, cachedValue{ - QueryAnalysis: nonShardableQuery, - err: err, - }); ok { - logger.Log("parsingError=", query) - } - return nonShardableQuery, err + return nonShardableQuery(), err } isShardable := true @@ -122,34 +107,14 @@ func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { }) if !isShardable { - - if ok := LRU.Add(query, cachedValue{ - QueryAnalysis: nonShardableQuery, - err: nil, - }); ok { - logger.Log("cachingError=", query) - } - return nonShardableQuery, nil + return nonShardableQuery(), nil } rootAnalysis := analyzeRootExpression(expr) if rootAnalysis.IsShardable() && rootAnalysis.shardBy { - if ok := LRU.Add(query, cachedValue{ - QueryAnalysis: rootAnalysis, - err: nil, - }); ok { - logger.Log("cachingError=", query) - } return rootAnalysis, nil } - if ok := LRU.Add(query, cachedValue{ - QueryAnalysis: analysis, - err: nil, - }); ok { - logger.Log("cachingError=", query) - } - return analysis, nil } diff --git a/pkg/querysharding/analyzer_test.go b/pkg/querysharding/analyzer_test.go index f68bf30701..acb365ae93 100644 --- a/pkg/querysharding/analyzer_test.go +++ b/pkg/querysharding/analyzer_test.go @@ -4,21 +4,20 @@ package querysharding import ( - "fmt" "sort" "testing" "github.com/stretchr/testify/require" ) -type testCase struct { - name string - expression string - shardingLabels []string -} - func TestAnalyzeQuery(t *testing.T) { + type testCase struct { + name string + expression string + shardingLabels []string + } + nonShardable := []testCase{ { name: "aggregation", @@ -202,353 +201,3 @@ http_requests_total`, }) } } - -var value = []testCase{ - { - name: "aggregation", - expression: "sum(http_requests_total)", - }, - { - name: "aggregation", - expression: "sum(http_requests_total)", - }, - { - name: "aggregation", - expression: "sum(http_requests_total)", - }, - { - name: "aggregation", - expression: "sum(http_requests_total)", - }, - { - name: "aggregation", - expression: "sum(http_requests_total)", - }, - { - name: "aggregation", - expression: "sum(http_requests_total)", - }, - { - name: "outer aggregation with no grouping", - expression: "count(sum by (pod) (http_requests_total))", - }, - { - name: "outer aggregation with without grouping", - expression: "count(sum without (pod) (http_requests_total))", - }, - { - name: "aggregate expression with subquery", - expression: `sum by (pod) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`, - }, - { - name: "aggregation", - expression: "sum(http_requests_total)", - }, - { - name: "aggregation", - expression: "sum(http_requests_total)", - }, - { - name: "aggregation", - expression: "sum(http_requests_total)", - }, - { - name: "aggregation", - expression: "sum(http_requests_total)", - }, - { - name: "aggregate without expression with subquery", - expression: `sum without (pod) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`, - }, - { - name: "binary expression", - expression: `http_requests_total{code="400"} / http_requests_total`, - }, - { - name: "binary aggregation with different grouping labels", - expression: `sum by (pod) (http_requests_total{code="400"}) / sum by (cluster) (http_requests_total)`, - }, - { - name: "outer aggregation with without grouping", - expression: "count(sum without (pod) (http_requests_total))", - }, - { - name: "outer aggregation with without grouping", - expression: "count(sum without (pod) (http_requests_total))", - }, - { - name: "outer aggregation with without grouping", - expression: "count(sum without (pod) (http_requests_total))", - }, - { - name: "binary expression with constant", - expression: `http_requests_total{code="400"} / 4`, - }, - { - name: "binary expression with empty vector matching", - expression: `http_requests_total{code="400"} / on () http_requests_total`, - }, - { - name: "outer aggregation with without grouping", - expression: "count(sum without (pod) (http_requests_total))", - }, - { - name: "outer aggregation with without grouping", - expression: "count(sum without (pod) (http_requests_total))", - }, - { - name: "binary expression with vector matching and subquery", - expression: `http_requests_total{code="400"} / on (pod) label_replace(metric, "dst_label", "$1", "src_label", "re")`, - }, - { - name: "multiple binary expressions", - expression: `(http_requests_total{code="400"} + http_requests_total{code="500"}) / http_requests_total`, - }, - { - name: "aggregation without grouping", - expression: "sum without (pod) (http_requests_total)", - shardingLabels: []string{"pod"}, - }, - { - name: "multiple binary expressions with empty vector matchers", - expression: ` -(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) -/ on () -http_requests_total`, - }, - { - name: "outer aggregation with without grouping", - expression: "count(sum without (pod) (http_requests_total))", - }, - { - name: "binary expression with vector matching and subquery", - expression: `http_requests_total{code="400"} / on (pod) label_replace(metric, "dst_label", "$1", "src_label", "re")`, - }, - { - name: "multiple binary expressions with empty vector matchers", - expression: ` -(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) -/ on () -http_requests_total`, - }, - { - name: "multiple binary expressions with empty vector matchers", - expression: ` -(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) -/ on () -http_requests_total`, - }, - { - name: "multiple binary expressions with empty vector matchers", - expression: ` -(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) -/ on () -http_requests_total`, - }, - { - name: "multiple binary expressions with empty vector matchers", - expression: ` -(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) -/ on () -http_requests_total`, - }, - { - name: "multiple binary expressions with empty vector matchers", - expression: ` -(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) -/ on () -http_requests_total`, - }, - { - name: "multiple binary expressions with empty vector matchers", - expression: ` -(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) -/ on () -http_requests_total`, - }, - { - name: "multiple binary expressions with empty vector matchers", - expression: ` -(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) -/ on () -http_requests_total`, - }, - { - name: "outer aggregation with without grouping", - expression: "count(sum without (pod) (http_requests_total))", - }, - { - name: "binary expression with vector matching and subquery", - expression: `http_requests_total{code="400"} / on (pod) label_replace(metric, "dst_label", "$1", "src_label", "re")`, - }, - { - name: "multiple binary expressions with empty vector matchers", - expression: ` -(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) -/ on () -http_requests_total`, - }, - { - name: "multiple binary expressions with empty vector matchers", - expression: ` -(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) -/ on () -http_requests_total`, - }, - { - name: "aggregation without grouping", - expression: "sum without (pod) (http_requests_total)", - shardingLabels: []string{"pod"}, - }, - { - name: "multiple aggregations with without grouping", - expression: "max without (pod) (sum without (pod, cluster) (http_requests_total))", - shardingLabels: []string{"pod", "cluster"}, - }, - { - name: "binary expression with without vector matching and grouping", - expression: `sum without (cluster, pod) (http_requests_total{code="400"}) / ignoring (pod) sum without (cluster, pod) (http_requests_total)`, - shardingLabels: []string{"pod", "cluster"}, - }, - { - name: "multiple binary expressions with without grouping", - expression: `(http_requests_total{code="400"} + ignoring (pod) http_requests_total{code="500"}) / ignoring (cluster, pod) http_requests_total`, - shardingLabels: []string{"cluster", "pod"}, - }, - { - name: "multiple binary expressions with without vector matchers", - expression: ` -(http_requests_total{code="400"} + ignoring (cluster, pod) http_requests_total{code="500"}) -/ ignoring (pod) -http_requests_total`, - shardingLabels: []string{"cluster", "pod"}, - }, - { - name: "aggregation with grouping", - expression: "sum by (pod) (http_requests_total)", - shardingLabels: []string{"pod"}, - }, - { - name: "multiple aggregations with grouping", - expression: "max by (pod) (sum by (pod, cluster) (http_requests_total))", - shardingLabels: []string{"pod"}, - }, - { - name: "binary expression with vector matching", - expression: `http_requests_total{code="400"} / on (pod) http_requests_total`, - shardingLabels: []string{"pod"}, - }, - { - name: "binary aggregation with same grouping labels", - expression: `sum by (pod) (http_requests_total{code="400"}) / sum by (pod) (http_requests_total)`, - shardingLabels: []string{"pod"}, - }, - { - name: "binary expression with vector matching and grouping", - expression: `sum by (cluster, pod) (http_requests_total{code="400"}) / on (pod) sum by (cluster, pod) (http_requests_total)`, - shardingLabels: []string{"pod"}, - }, - { - name: "multiple binary expressions with vector matchers", - expression: ` -(http_requests_total{code="400"} + on (cluster, pod) http_requests_total{code="500"}) -/ on (pod) -http_requests_total`, - shardingLabels: []string{"pod"}, - }, - { - name: "multiple binary expressions with grouping", - expression: ` -sum by (container) ( -(http_requests_total{code="400"} + on (cluster, pod, container) http_requests_total{code="500"}) -/ on (pod, container) -http_requests_total -)`, - shardingLabels: []string{"container"}, - }, - { - name: "multiple binary expressions with grouping", - expression: `(http_requests_total{code="400"} + on (pod) http_requests_total{code="500"}) / on (cluster, pod) http_requests_total`, - shardingLabels: []string{"cluster", "pod"}, - }, - { - name: "histogram quantile", - expression: "histogram_quantile(0.95, sum(rate(metric[1m])) by (le, cluster))", - shardingLabels: []string{"cluster"}, - }, - { - name: "multiple binary expressions with grouping", - expression: ` -sum by (container) ( -(http_requests_total{code="400"} + on (cluster, pod, container) http_requests_total{code="500"}) -/ on (pod, container) -http_requests_total -)`, - shardingLabels: []string{"container"}, - }, - { - name: "multiple binary expressions with grouping", - expression: `(http_requests_total{code="400"} + on (pod) http_requests_total{code="500"}) / on (cluster, pod) http_requests_total`, - shardingLabels: []string{"cluster", "pod"}, - }, - { - name: "histogram quantile", - expression: "histogram_quantile(0.95, sum(rate(metric[1m])) by (le, cluster))", - shardingLabels: []string{"cluster"}, - }, - { - name: "multiple binary expressions with grouping", - expression: ` -sum by (container) ( -(http_requests_total{code="400"} + on (cluster, pod, container) http_requests_total{code="500"}) -/ on (pod, container) -http_requests_total -)`, - shardingLabels: []string{"container"}, - }, - { - name: "multiple binary expressions with grouping", - expression: `(http_requests_total{code="400"} + on (pod) http_requests_total{code="500"}) / on (cluster, pod) http_requests_total`, - shardingLabels: []string{"cluster", "pod"}, - }, - { - name: "histogram quantile", - expression: "histogram_quantile(0.95, sum(rate(metric[1m])) by (le, cluster))", - shardingLabels: []string{"cluster"}, - }, - { - name: "multiple binary expressions with grouping", - expression: ` -sum by (container) ( -(http_requests_total{code="400"} + on (cluster, pod, container) http_requests_total{code="500"}) -/ on (pod, container) -http_requests_total -)`, - shardingLabels: []string{"container"}, - }, - { - name: "multiple binary expressions with grouping", - expression: `(http_requests_total{code="400"} + on (pod) http_requests_total{code="500"}) / on (cluster, pod) http_requests_total`, - shardingLabels: []string{"cluster", "pod"}, - }, - { - name: "histogram quantile", - expression: "histogram_quantile(0.95, sum(rate(metric[1m])) by (le, cluster))", - shardingLabels: []string{"cluster"}, - }, -} - -func BenchmarkTestAnalyzeQuery(b *testing.B) { - analyze := NewQueryAnalyzer() - for _, v := range value { - b.Run(fmt.Sprintf("query: %s", v.name), func(b *testing.B) { - for i := 0; i < b.N; i++ { - _, err := analyze.Analyze(v.expression) - - if err != nil { - fmt.Printf("Error while benchmarking =%s ", err) - } - } - }) - } -} From 1a1703cd07a44c9c6752f990161b11e3e18a7cbf Mon Sep 17 00:00:00 2001 From: Rahul Kumar Jha Date: Mon, 3 Oct 2022 00:14:43 +0530 Subject: [PATCH 04/13] created CacheQueryAnalyzer in querysharding pkg Signed-off-by: Rahul Kumar Jha --- pkg/querysharding/analyzer.go | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/pkg/querysharding/analyzer.go b/pkg/querysharding/analyzer.go index 3f7a08d644..df0e948926 100644 --- a/pkg/querysharding/analyzer.go +++ b/pkg/querysharding/analyzer.go @@ -8,14 +8,21 @@ import ( lru "github.com/hashicorp/golang-lru" "github.com/prometheus/prometheus/promql/parser" - logging "github.com/thanos-io/thanos/pkg/logging" ) // QueryAnalyzer is an analyzer which determines // whether a PromQL Query is shardable and using which labels. +type Analyze interface { + Analyze(string) (QueryAnalysis, error) +} + type QueryAnalyzer struct { - cache *lru.Cache +} + +type CachedQueryAnalyzer struct { + analyzer *QueryAnalyzer + cache *lru.Cache } var nonShardableFuncs = []string{ @@ -24,16 +31,12 @@ var nonShardableFuncs = []string{ } // NewQueryAnalyzer creates a new QueryAnalyzer. -func NewQueryAnalyzer() *QueryAnalyzer { - cache, err := lru.New(256) - logger := logging.NewLogger("warn", "", "") - - if err != nil { - logger.Log("Error Creating LRU Cache: ", err) +func NewQueryAnalyzer() *CachedQueryAnalyzer { + cache, _ := lru.New(256) - } - return &QueryAnalyzer{ - cache: cache, + return &CachedQueryAnalyzer{ + analyzer: &QueryAnalyzer{}, + cache: cache, } } @@ -42,16 +45,16 @@ type cachedValue struct { err error } -func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { +func (a *CachedQueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { if a.cache.Contains(query) { value, _ := a.cache.Get(query) return value.(cachedValue).QueryAnalysis, value.(cachedValue).err } // Analyze if needed + analysis, err := a.analyzer.Analyze(query) - analysis, err := a.queryAnalyzer(query) - + // Adding to cache _ = a.cache.Add(query, cachedValue{QueryAnalysis: analysis, err: err}) return analysis, err @@ -71,7 +74,7 @@ func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { // * otherwise, treat the query as non-shardable. // The le label is excluded from sharding. -func (a *QueryAnalyzer) queryAnalyzer(query string) (QueryAnalysis, error) { +func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { expr, err := parser.ParseExpr(query) if err != nil { From c2a02418947ad7ffeed669ccc000fa4983752dbc Mon Sep 17 00:00:00 2001 From: Rahul Kumar Jha Date: Mon, 3 Oct 2022 00:57:50 +0530 Subject: [PATCH 05/13] changed type of queryAnalyzer in queryfrontend pkg Signed-off-by: Rahul Kumar Jha --- pkg/queryfrontend/shard_query.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/queryfrontend/shard_query.go b/pkg/queryfrontend/shard_query.go index 2109182634..faffd73783 100644 --- a/pkg/queryfrontend/shard_query.go +++ b/pkg/queryfrontend/shard_query.go @@ -19,7 +19,7 @@ import ( ) // PromQLShardingMiddleware creates a new Middleware that shards PromQL aggregations using grouping labels. -func PromQLShardingMiddleware(queryAnalyzer *querysharding.QueryAnalyzer, numShards int, limits queryrange.Limits, merger queryrange.Merger, registerer prometheus.Registerer) queryrange.Middleware { +func PromQLShardingMiddleware(queryAnalyzer querysharding.Analyze, numShards int, limits queryrange.Limits, merger queryrange.Merger, registerer prometheus.Registerer) queryrange.Middleware { return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler { queriesTotal := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: "thanos", @@ -45,7 +45,7 @@ type querySharder struct { next queryrange.Handler limits queryrange.Limits - queryAnalyzer *querysharding.QueryAnalyzer + queryAnalyzer querysharding.Analyze numShards int merger queryrange.Merger From 428cf92f8a7ad9a1cf7b8cc0c31984333a389c1a Mon Sep 17 00:00:00 2001 From: Rahul Kumar Jha Date: Thu, 6 Oct 2022 13:08:48 +0530 Subject: [PATCH 06/13] cache in querysharding pkg with error Signed-off-by: Rahul Kumar Jha --- pkg/queryfrontend/roundtrip.go | 23 +++++++++++++++++------ pkg/querysharding/analyzer.go | 21 ++++++++++++++------- pkg/querysharding/analyzer_test.go | 16 +++++++++++++--- 3 files changed, 44 insertions(+), 16 deletions(-) diff --git a/pkg/queryfrontend/roundtrip.go b/pkg/queryfrontend/roundtrip.go index 5a417f584c..1fba1e98a9 100644 --- a/pkg/queryfrontend/roundtrip.go +++ b/pkg/queryfrontend/roundtrip.go @@ -70,8 +70,7 @@ func NewTripperware(config Config, reg prometheus.Registerer, logger log.Logger) if err != nil { return nil, err } - - queryInstantTripperware := newInstantQueryTripperware( + queryInstantTripperware, err := newInstantQueryTripperware( config.NumShards, queryRangeLimits, queryInstantCodec, @@ -79,6 +78,9 @@ func NewTripperware(config Config, reg prometheus.Registerer, logger log.Logger) config.ForwardHeaders, ) + if err != nil { + return nil, err + } return func(next http.RoundTripper) http.RoundTripper { return newRoundTripper(next, queryRangeTripperware(next), labelsTripperware(next), queryInstantTripperware(next), reg) }, nil @@ -190,9 +192,14 @@ func newQueryRangeTripperware( } if numShards > 0 { + analyzer, err := querysharding.NewQueryAnalyzer() + + if err != nil { + return nil, errors.Wrap(err, "create analyzer cache") + } queryRangeMiddleware = append( queryRangeMiddleware, - PromQLShardingMiddleware(querysharding.NewQueryAnalyzer(), numShards, limits, codec, reg), + PromQLShardingMiddleware(analyzer, numShards, limits, codec, reg), ) } @@ -327,14 +334,18 @@ func newInstantQueryTripperware( codec queryrange.Codec, reg prometheus.Registerer, forwardHeaders []string, -) queryrange.Tripperware { +) (queryrange.Tripperware, error) { instantQueryMiddlewares := []queryrange.Middleware{} m := queryrange.NewInstrumentMiddlewareMetrics(reg) if numShards > 0 { + analyzer, err := querysharding.NewQueryAnalyzer() + if err != nil { + return nil, errors.Wrap(err, "create analyzer cache") + } instantQueryMiddlewares = append( instantQueryMiddlewares, queryrange.InstrumentMiddleware("sharding", m), - PromQLShardingMiddleware(querysharding.NewQueryAnalyzer(), numShards, limits, codec, reg), + PromQLShardingMiddleware(analyzer, numShards, limits, codec, reg), ) } @@ -343,7 +354,7 @@ func newInstantQueryTripperware( return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) { return rt.RoundTrip(r) }) - } + }, nil } // shouldCache controls what kind of Thanos request should be cached. diff --git a/pkg/querysharding/analyzer.go b/pkg/querysharding/analyzer.go index df0e948926..a64a317ad0 100644 --- a/pkg/querysharding/analyzer.go +++ b/pkg/querysharding/analyzer.go @@ -17,8 +17,7 @@ type Analyze interface { Analyze(string) (QueryAnalysis, error) } -type QueryAnalyzer struct { -} +type QueryAnalyzer struct{} type CachedQueryAnalyzer struct { analyzer *QueryAnalyzer @@ -31,13 +30,17 @@ var nonShardableFuncs = []string{ } // NewQueryAnalyzer creates a new QueryAnalyzer. -func NewQueryAnalyzer() *CachedQueryAnalyzer { - cache, _ := lru.New(256) +func NewQueryAnalyzer() (Analyze, error) { + cache, err := lru.New(256) + + if err != nil { + return &QueryAnalyzer{}, err + } return &CachedQueryAnalyzer{ analyzer: &QueryAnalyzer{}, cache: cache, - } + }, nil } type cachedValue struct { @@ -47,8 +50,12 @@ type cachedValue struct { func (a *CachedQueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { if a.cache.Contains(query) { - value, _ := a.cache.Get(query) - return value.(cachedValue).QueryAnalysis, value.(cachedValue).err + value, ok := a.cache.Get(query) + if !ok { + return QueryAnalysis{}, fmt.Errorf("failed to fetch query:%s from cache", query) + } else { + return value.(cachedValue).QueryAnalysis, value.(cachedValue).err + } } // Analyze if needed diff --git a/pkg/querysharding/analyzer_test.go b/pkg/querysharding/analyzer_test.go index acb365ae93..d311f2d686 100644 --- a/pkg/querysharding/analyzer_test.go +++ b/pkg/querysharding/analyzer_test.go @@ -4,6 +4,7 @@ package querysharding import ( + "fmt" "sort" "testing" @@ -166,7 +167,10 @@ http_requests_total`, for _, test := range nonShardable { t.Run(test.name, func(t *testing.T) { - analyzer := NewQueryAnalyzer() + analyzer, err := NewQueryAnalyzer() + if err != nil { + fmt.Println("error intializing cache: ", err) + } analysis, err := analyzer.Analyze(test.expression) require.NoError(t, err) require.False(t, analysis.IsShardable()) @@ -175,7 +179,10 @@ http_requests_total`, for _, test := range shardableByLabels { t.Run(test.name, func(t *testing.T) { - analyzer := NewQueryAnalyzer() + analyzer, err := NewQueryAnalyzer() + if err != nil { + fmt.Println("error intializing cache: ", err) + } analysis, err := analyzer.Analyze(test.expression) require.NoError(t, err) require.True(t, analysis.IsShardable()) @@ -189,7 +196,10 @@ http_requests_total`, for _, test := range shardableWithoutLabels { t.Run(test.name, func(t *testing.T) { - analyzer := NewQueryAnalyzer() + analyzer, err := NewQueryAnalyzer() + if err != nil { + fmt.Println("error intializing cache: ", err) + } analysis, err := analyzer.Analyze(test.expression) require.NoError(t, err) require.True(t, analysis.IsShardable()) From 34bec8fc282677cc0c34bc1895ae1f3d27e8b38d Mon Sep 17 00:00:00 2001 From: Rahul Kumar Jha Date: Thu, 6 Oct 2022 13:33:33 +0530 Subject: [PATCH 07/13] cache in querysharding pkg with error Signed-off-by: Rahul Kumar Jha --- pkg/querysharding/analyzer.go | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/pkg/querysharding/analyzer.go b/pkg/querysharding/analyzer.go index a64a317ad0..b4987adca1 100644 --- a/pkg/querysharding/analyzer.go +++ b/pkg/querysharding/analyzer.go @@ -67,18 +67,19 @@ func (a *CachedQueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { return analysis, err } -// queryAnalyzer analyzes a query and returns a QueryAnalysis. - -// queryAnalyzer uses the following algorithm: -// * if a query has subqueries, such as label_join or label_replace, -// or has functions which cannot be sharded, then treat the query as non shardable. -// * if the query's root expression has grouping labels, -// then treat the query as shardable by those labels. -// * if the query's root expression has no grouping labels, -// then walk the query and find the least common labelset -// used in grouping expressions. If non-empty, treat the query -// as shardable by those labels. -// * otherwise, treat the query as non-shardable. +// Analyze analyzes a query and returns a QueryAnalysis. + +// Analyze uses the following algorithm: +// - if a query has subqueries, such as label_join or label_replace, +// or has functions which cannot be sharded, then treat the query as non shardable. +// - if the query's root expression has grouping labels, +// then treat the query as shardable by those labels. +// - if the query's root expression has no grouping labels, +// then walk the query and find the least common labelset +// used in grouping expressions. If non-empty, treat the query +// as shardable by those labels. +// - otherwise, treat the query as non-shardable. + // The le label is excluded from sharding. func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { From a61100c03ab0c922b9380f371af44af0c0378a70 Mon Sep 17 00:00:00 2001 From: Rahul Kumar Jha Date: Thu, 6 Oct 2022 20:03:24 +0530 Subject: [PATCH 08/13] removed typos in comments in analyzer.go Signed-off-by: Rahul Kumar Jha --- pkg/querysharding/analyzer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/querysharding/analyzer.go b/pkg/querysharding/analyzer.go index 22429530f0..4bdbc1b840 100644 --- a/pkg/querysharding/analyzer.go +++ b/pkg/querysharding/analyzer.go @@ -79,8 +79,8 @@ func (a *CachedQueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { // used in grouping expressions. If non-empty, treat the query // as shardable by those labels. // - otherwise, treat the query as non-shardable. -// The le label is excluded from sharding. - +// +// The le label is excluded from sharding func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { expr, err := parser.ParseExpr(query) From 49c64c9f9e14dfebc5f9d24f7b401dd46e71c688 Mon Sep 17 00:00:00 2001 From: Rahul Kumar Jha Date: Thu, 6 Oct 2022 20:13:08 +0530 Subject: [PATCH 09/13] removed typos in comments in analyzer.go Signed-off-by: Rahul Kumar Jha --- pkg/querysharding/analyzer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querysharding/analyzer.go b/pkg/querysharding/analyzer.go index 4bdbc1b840..54d765c1f4 100644 --- a/pkg/querysharding/analyzer.go +++ b/pkg/querysharding/analyzer.go @@ -80,7 +80,7 @@ func (a *CachedQueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { // as shardable by those labels. // - otherwise, treat the query as non-shardable. // -// The le label is excluded from sharding +// The le label is excluded from sharding. func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { expr, err := parser.ParseExpr(query) From a5171251d2b874d535e4ebc169a06363dc9fb33b Mon Sep 17 00:00:00 2001 From: Rahul Kumar Jha Date: Sun, 9 Oct 2022 07:15:14 +0530 Subject: [PATCH 10/13] cached queryanalyzer Signed-off-by: Rahul Kumar Jha --- pkg/queryfrontend/roundtrip.go | 6 ++---- pkg/querysharding/analyzer.go | 10 +++------- pkg/querysharding/analyzer_test.go | 13 +++---------- 3 files changed, 8 insertions(+), 21 deletions(-) diff --git a/pkg/queryfrontend/roundtrip.go b/pkg/queryfrontend/roundtrip.go index 1fba1e98a9..d2138385f0 100644 --- a/pkg/queryfrontend/roundtrip.go +++ b/pkg/queryfrontend/roundtrip.go @@ -77,7 +77,6 @@ func NewTripperware(config Config, reg prometheus.Registerer, logger log.Logger) prometheus.WrapRegistererWith(prometheus.Labels{"tripperware": "query_instant"}, reg), config.ForwardHeaders, ) - if err != nil { return nil, err } @@ -193,9 +192,8 @@ func newQueryRangeTripperware( if numShards > 0 { analyzer, err := querysharding.NewQueryAnalyzer() - if err != nil { - return nil, errors.Wrap(err, "create analyzer cache") + return nil, errors.Wrap(err, "create query analyzer") } queryRangeMiddleware = append( queryRangeMiddleware, @@ -340,7 +338,7 @@ func newInstantQueryTripperware( if numShards > 0 { analyzer, err := querysharding.NewQueryAnalyzer() if err != nil { - return nil, errors.Wrap(err, "create analyzer cache") + return nil, errors.Wrap(err, "create query analyzer") } instantQueryMiddlewares = append( instantQueryMiddlewares, diff --git a/pkg/querysharding/analyzer.go b/pkg/querysharding/analyzer.go index 54d765c1f4..cbd624795c 100644 --- a/pkg/querysharding/analyzer.go +++ b/pkg/querysharding/analyzer.go @@ -30,11 +30,10 @@ var nonShardableFuncs = []string{ } // NewQueryAnalyzer creates a new QueryAnalyzer. -func NewQueryAnalyzer() (Analyze, error) { +func NewQueryAnalyzer() (*CachedQueryAnalyzer, error) { cache, err := lru.New(256) - if err != nil { - return &QueryAnalyzer{}, err + return nil, err } return &CachedQueryAnalyzer{ @@ -51,9 +50,7 @@ type cachedValue struct { func (a *CachedQueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { if a.cache.Contains(query) { value, ok := a.cache.Get(query) - if !ok { - return QueryAnalysis{}, fmt.Errorf("failed to fetch query:%s from cache", query) - } else { + if ok { return value.(cachedValue).QueryAnalysis, value.(cachedValue).err } } @@ -82,7 +79,6 @@ func (a *CachedQueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { // // The le label is excluded from sharding. func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { - expr, err := parser.ParseExpr(query) if err != nil { return nonShardableQuery(), err diff --git a/pkg/querysharding/analyzer_test.go b/pkg/querysharding/analyzer_test.go index d311f2d686..efd9dd5d94 100644 --- a/pkg/querysharding/analyzer_test.go +++ b/pkg/querysharding/analyzer_test.go @@ -4,7 +4,6 @@ package querysharding import ( - "fmt" "sort" "testing" @@ -168,9 +167,7 @@ http_requests_total`, for _, test := range nonShardable { t.Run(test.name, func(t *testing.T) { analyzer, err := NewQueryAnalyzer() - if err != nil { - fmt.Println("error intializing cache: ", err) - } + require.NoError(t, err) analysis, err := analyzer.Analyze(test.expression) require.NoError(t, err) require.False(t, analysis.IsShardable()) @@ -180,9 +177,7 @@ http_requests_total`, for _, test := range shardableByLabels { t.Run(test.name, func(t *testing.T) { analyzer, err := NewQueryAnalyzer() - if err != nil { - fmt.Println("error intializing cache: ", err) - } + require.NoError(t, err) analysis, err := analyzer.Analyze(test.expression) require.NoError(t, err) require.True(t, analysis.IsShardable()) @@ -197,9 +192,7 @@ http_requests_total`, for _, test := range shardableWithoutLabels { t.Run(test.name, func(t *testing.T) { analyzer, err := NewQueryAnalyzer() - if err != nil { - fmt.Println("error intializing cache: ", err) - } + require.NoError(t, err) analysis, err := analyzer.Analyze(test.expression) require.NoError(t, err) require.True(t, analysis.IsShardable()) From 08674ccf6e7fd113e791c781f45818383e077c93 Mon Sep 17 00:00:00 2001 From: Rahul Kumar Jha Date: Wed, 12 Oct 2022 19:49:00 +0530 Subject: [PATCH 11/13] cached query analysis in querysharding pkg Signed-off-by: Rahul Kumar Jha --- pkg/querysharding/analyzer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/querysharding/analyzer.go b/pkg/querysharding/analyzer.go index cbd624795c..66d7307fed 100644 --- a/pkg/querysharding/analyzer.go +++ b/pkg/querysharding/analyzer.go @@ -55,10 +55,10 @@ func (a *CachedQueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { } } - // Analyze if needed + // Analyze if needed. analysis, err := a.analyzer.Analyze(query) - // Adding to cache + // Adding to cache. _ = a.cache.Add(query, cachedValue{QueryAnalysis: analysis, err: err}) return analysis, err From cbcff58419ba9d739c08086a39fe2320fe881630 Mon Sep 17 00:00:00 2001 From: Rahul Kumar Jha Date: Wed, 12 Oct 2022 21:58:05 +0530 Subject: [PATCH 12/13] changed interface name from Analyze to Analyzer in analyzer.go Signed-off-by: Rahul Kumar Jha --- pkg/queryfrontend/shard_query.go | 4 ++-- pkg/querysharding/analyzer.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/queryfrontend/shard_query.go b/pkg/queryfrontend/shard_query.go index faffd73783..36921f5cca 100644 --- a/pkg/queryfrontend/shard_query.go +++ b/pkg/queryfrontend/shard_query.go @@ -19,7 +19,7 @@ import ( ) // PromQLShardingMiddleware creates a new Middleware that shards PromQL aggregations using grouping labels. -func PromQLShardingMiddleware(queryAnalyzer querysharding.Analyze, numShards int, limits queryrange.Limits, merger queryrange.Merger, registerer prometheus.Registerer) queryrange.Middleware { +func PromQLShardingMiddleware(queryAnalyzer *querysharding.CachedQueryAnalyzer, numShards int, limits queryrange.Limits, merger queryrange.Merger, registerer prometheus.Registerer) queryrange.Middleware { return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler { queriesTotal := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: "thanos", @@ -45,7 +45,7 @@ type querySharder struct { next queryrange.Handler limits queryrange.Limits - queryAnalyzer querysharding.Analyze + queryAnalyzer *querysharding.CachedQueryAnalyzer numShards int merger queryrange.Merger diff --git a/pkg/querysharding/analyzer.go b/pkg/querysharding/analyzer.go index 66d7307fed..f93a103fcf 100644 --- a/pkg/querysharding/analyzer.go +++ b/pkg/querysharding/analyzer.go @@ -13,7 +13,7 @@ import ( // QueryAnalyzer is an analyzer which determines // whether a PromQL Query is shardable and using which labels. -type Analyze interface { +type Analyzer interface { Analyze(string) (QueryAnalysis, error) } From eb627272e9419bbccb86aaff8e075bfcef3f7af7 Mon Sep 17 00:00:00 2001 From: Rahul Kumar Jha Date: Wed, 12 Oct 2022 23:21:23 +0530 Subject: [PATCH 13/13] changed struct to interface in shard_query.go Signed-off-by: Rahul Kumar Jha --- pkg/queryfrontend/shard_query.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/queryfrontend/shard_query.go b/pkg/queryfrontend/shard_query.go index 36921f5cca..67f7cd56f5 100644 --- a/pkg/queryfrontend/shard_query.go +++ b/pkg/queryfrontend/shard_query.go @@ -19,7 +19,7 @@ import ( ) // PromQLShardingMiddleware creates a new Middleware that shards PromQL aggregations using grouping labels. -func PromQLShardingMiddleware(queryAnalyzer *querysharding.CachedQueryAnalyzer, numShards int, limits queryrange.Limits, merger queryrange.Merger, registerer prometheus.Registerer) queryrange.Middleware { +func PromQLShardingMiddleware(queryAnalyzer querysharding.Analyzer, numShards int, limits queryrange.Limits, merger queryrange.Merger, registerer prometheus.Registerer) queryrange.Middleware { return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler { queriesTotal := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: "thanos", @@ -45,7 +45,7 @@ type querySharder struct { next queryrange.Handler limits queryrange.Limits - queryAnalyzer *querysharding.CachedQueryAnalyzer + queryAnalyzer querysharding.Analyzer numShards int merger queryrange.Merger