Skip to content

Commit

Permalink
created lru cache in querysharding pkg (#5749)
Browse files Browse the repository at this point in the history
* created lru cache in querysharding pkg

Signed-off-by: Rahul Kumar Jha <rahuljhakumar1600@gmail.com>

* triggering linting & vetting

Signed-off-by: Rahul Kumar Jha <rahuljhakumar1600@gmail.com>

* added cache in querysharding pkg

Signed-off-by: Rahul Kumar Jha <rahuljhakumar1600@gmail.com>

* created CacheQueryAnalyzer in querysharding pkg

Signed-off-by: Rahul Kumar Jha <rahuljhakumar1600@gmail.com>

* changed type of queryAnalyzer in queryfrontend pkg

Signed-off-by: Rahul Kumar Jha <rahuljhakumar1600@gmail.com>

* cache in querysharding pkg with error

Signed-off-by: Rahul Kumar Jha <rahuljhakumar1600@gmail.com>

* cache in querysharding pkg with error

Signed-off-by: Rahul Kumar Jha <rahuljhakumar1600@gmail.com>

* removed typos in comments in analyzer.go

Signed-off-by: Rahul Kumar Jha <rahuljhakumar1600@gmail.com>

* removed typos in comments in analyzer.go

Signed-off-by: Rahul Kumar Jha <rahuljhakumar1600@gmail.com>

* cached queryanalyzer

Signed-off-by: Rahul Kumar Jha <rahuljhakumar1600@gmail.com>

* cached query analysis in querysharding pkg

Signed-off-by: Rahul Kumar Jha <rahuljhakumar1600@gmail.com>

* changed interface name from Analyze to Analyzer in analyzer.go

Signed-off-by: Rahul Kumar Jha <rahuljhakumar1600@gmail.com>

* changed struct to interface in shard_query.go

Signed-off-by: Rahul Kumar Jha <rahuljhakumar1600@gmail.com>

Signed-off-by: Rahul Kumar Jha <rahuljhakumar1600@gmail.com>
  • Loading branch information
Rahulkumar2002 committed Oct 12, 2022
1 parent ecb26ec commit 4730ae9
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 14 deletions.
23 changes: 16 additions & 7 deletions pkg/queryfrontend/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,16 @@ func NewTripperware(config Config, reg prometheus.Registerer, logger log.Logger)
if err != nil {
return nil, err
}

queryInstantTripperware := newInstantQueryTripperware(
queryInstantTripperware, err := newInstantQueryTripperware(
config.NumShards,
queryRangeLimits,
queryInstantCodec,
prometheus.WrapRegistererWith(prometheus.Labels{"tripperware": "query_instant"}, reg),
config.ForwardHeaders,
)

if err != nil {
return nil, err
}
return func(next http.RoundTripper) http.RoundTripper {
return newRoundTripper(next, queryRangeTripperware(next), labelsTripperware(next), queryInstantTripperware(next), reg)
}, nil
Expand Down Expand Up @@ -190,9 +191,13 @@ func newQueryRangeTripperware(
}

if numShards > 0 {
analyzer, err := querysharding.NewQueryAnalyzer()
if err != nil {
return nil, errors.Wrap(err, "create query analyzer")
}
queryRangeMiddleware = append(
queryRangeMiddleware,
PromQLShardingMiddleware(querysharding.NewQueryAnalyzer(), numShards, limits, codec, reg),
PromQLShardingMiddleware(analyzer, numShards, limits, codec, reg),
)
}

Expand Down Expand Up @@ -327,14 +332,18 @@ func newInstantQueryTripperware(
codec queryrange.Codec,
reg prometheus.Registerer,
forwardHeaders []string,
) queryrange.Tripperware {
) (queryrange.Tripperware, error) {
instantQueryMiddlewares := []queryrange.Middleware{}
m := queryrange.NewInstrumentMiddlewareMetrics(reg)
if numShards > 0 {
analyzer, err := querysharding.NewQueryAnalyzer()
if err != nil {
return nil, errors.Wrap(err, "create query analyzer")
}
instantQueryMiddlewares = append(
instantQueryMiddlewares,
queryrange.InstrumentMiddleware("sharding", m),
PromQLShardingMiddleware(querysharding.NewQueryAnalyzer(), numShards, limits, codec, reg),
PromQLShardingMiddleware(analyzer, numShards, limits, codec, reg),
)
}

Expand All @@ -343,7 +352,7 @@ func newInstantQueryTripperware(
return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
return rt.RoundTrip(r)
})
}
}, nil
}

// shouldCache controls what kind of Thanos request should be cached.
Expand Down
4 changes: 2 additions & 2 deletions pkg/queryfrontend/shard_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

// PromQLShardingMiddleware creates a new Middleware that shards PromQL aggregations using grouping labels.
func PromQLShardingMiddleware(queryAnalyzer *querysharding.QueryAnalyzer, numShards int, limits queryrange.Limits, merger queryrange.Merger, registerer prometheus.Registerer) queryrange.Middleware {
func PromQLShardingMiddleware(queryAnalyzer querysharding.Analyzer, numShards int, limits queryrange.Limits, merger queryrange.Merger, registerer prometheus.Registerer) queryrange.Middleware {
return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
queriesTotal := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "thanos",
Expand All @@ -45,7 +45,7 @@ type querySharder struct {
next queryrange.Handler
limits queryrange.Limits

queryAnalyzer *querysharding.QueryAnalyzer
queryAnalyzer querysharding.Analyzer
numShards int
merger queryrange.Merger

Expand Down
45 changes: 43 additions & 2 deletions pkg/querysharding/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,62 @@ package querysharding
import (
"fmt"

lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/prometheus/promql/parser"
)

// QueryAnalyzer is an analyzer which determines
// whether a PromQL Query is shardable and using which labels.

type Analyzer interface {
Analyze(string) (QueryAnalysis, error)
}

type QueryAnalyzer struct{}

type CachedQueryAnalyzer struct {
analyzer *QueryAnalyzer
cache *lru.Cache
}

var nonShardableFuncs = []string{
"label_join",
"label_replace",
}

// NewQueryAnalyzer creates a new QueryAnalyzer.
func NewQueryAnalyzer() *QueryAnalyzer {
return &QueryAnalyzer{}
func NewQueryAnalyzer() (*CachedQueryAnalyzer, error) {
cache, err := lru.New(256)
if err != nil {
return nil, err
}

return &CachedQueryAnalyzer{
analyzer: &QueryAnalyzer{},
cache: cache,
}, nil
}

type cachedValue struct {
QueryAnalysis QueryAnalysis
err error
}

func (a *CachedQueryAnalyzer) Analyze(query string) (QueryAnalysis, error) {
if a.cache.Contains(query) {
value, ok := a.cache.Get(query)
if ok {
return value.(cachedValue).QueryAnalysis, value.(cachedValue).err
}
}

// Analyze if needed.
analysis, err := a.analyzer.Analyze(query)

// Adding to cache.
_ = a.cache.Add(query, cachedValue{QueryAnalysis: analysis, err: err})

return analysis, err
}

// Analyze analyzes a query and returns a QueryAnalysis.
Expand Down
10 changes: 7 additions & 3 deletions pkg/querysharding/analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
)

func TestAnalyzeQuery(t *testing.T) {

type testCase struct {
name string
expression string
Expand Down Expand Up @@ -165,7 +166,8 @@ http_requests_total`,

for _, test := range nonShardable {
t.Run(test.name, func(t *testing.T) {
analyzer := NewQueryAnalyzer()
analyzer, err := NewQueryAnalyzer()
require.NoError(t, err)
analysis, err := analyzer.Analyze(test.expression)
require.NoError(t, err)
require.False(t, analysis.IsShardable())
Expand All @@ -174,7 +176,8 @@ http_requests_total`,

for _, test := range shardableByLabels {
t.Run(test.name, func(t *testing.T) {
analyzer := NewQueryAnalyzer()
analyzer, err := NewQueryAnalyzer()
require.NoError(t, err)
analysis, err := analyzer.Analyze(test.expression)
require.NoError(t, err)
require.True(t, analysis.IsShardable())
Expand All @@ -188,7 +191,8 @@ http_requests_total`,

for _, test := range shardableWithoutLabels {
t.Run(test.name, func(t *testing.T) {
analyzer := NewQueryAnalyzer()
analyzer, err := NewQueryAnalyzer()
require.NoError(t, err)
analysis, err := analyzer.Analyze(test.expression)
require.NoError(t, err)
require.True(t, analysis.IsShardable())
Expand Down

0 comments on commit 4730ae9

Please sign in to comment.