Skip to content

Commit

Permalink
fix querysharding labels analysis
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 committed Nov 9, 2022
1 parent 863dd57 commit a510f1e
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 60 deletions.
10 changes: 2 additions & 8 deletions pkg/queryfrontend/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,7 @@ func newQueryRangeTripperware(
}

if numShards > 0 {
analyzer, err := querysharding.NewQueryAnalyzer()
if err != nil {
return nil, errors.Wrap(err, "create query analyzer")
}
analyzer := querysharding.NewQueryAnalyzer()
queryRangeMiddleware = append(
queryRangeMiddleware,
PromQLShardingMiddleware(analyzer, numShards, limits, codec, reg),
Expand Down Expand Up @@ -336,10 +333,7 @@ func newInstantQueryTripperware(
instantQueryMiddlewares := []queryrange.Middleware{}
m := queryrange.NewInstrumentMiddlewareMetrics(reg)
if numShards > 0 {
analyzer, err := querysharding.NewQueryAnalyzer()
if err != nil {
return nil, errors.Wrap(err, "create query analyzer")
}
analyzer := querysharding.NewQueryAnalyzer()
instantQueryMiddlewares = append(
instantQueryMiddlewares,
queryrange.InstrumentMiddleware("sharding", m),
Expand Down
28 changes: 16 additions & 12 deletions pkg/querysharding/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,6 @@ func nonShardableQuery() QueryAnalysis {
}
}

func newShardableByLabels(labels []string, by bool) QueryAnalysis {
labels = without(labels, excludedLabels)

return QueryAnalysis{
shardBy: by,
shardingLabels: labels,
}
}

func (q *QueryAnalysis) scopeToLabels(labels []string, by bool) QueryAnalysis {
labels = without(labels, excludedLabels)

Expand All @@ -39,16 +30,29 @@ func (q *QueryAnalysis) scopeToLabels(labels []string, by bool) QueryAnalysis {
}
}

if by {
if q.shardBy && by {
return QueryAnalysis{
shardBy: true,
shardingLabels: intersect(q.shardingLabels, labels),
}
}

if !q.shardBy && !by {
return QueryAnalysis{
shardBy: false,
shardingLabels: union(q.shardingLabels, labels),
}
}

// If we are sharding by and without the same time,
// keep the sharding by labels that are not in the without labels set.
labelsBy, labelsWithout := q.shardingLabels, labels
if !q.shardBy {
labelsBy, labelsWithout = labelsWithout, labelsBy
}
return QueryAnalysis{
shardBy: false,
shardingLabels: union(q.shardingLabels, labels),
shardBy: true,
shardingLabels: without(labelsBy, labelsWithout),
}
}

Expand Down
38 changes: 5 additions & 33 deletions pkg/querysharding/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,14 @@ var nonShardableFuncs = []string{
}

// NewQueryAnalyzer creates a new QueryAnalyzer.
func NewQueryAnalyzer() (*CachedQueryAnalyzer, error) {
cache, err := lru.New(256)
if err != nil {
return nil, err
}

func NewQueryAnalyzer() *CachedQueryAnalyzer {
// Ignore the error check since it throws error
// only if size is <= 0.
cache, _ := lru.New(256)
return &CachedQueryAnalyzer{
analyzer: &QueryAnalyzer{},
cache: cache,
}, nil
}
}

type cachedValue struct {
Expand Down Expand Up @@ -117,35 +115,9 @@ func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) {
return nonShardableQuery(), nil
}

rootAnalysis := analyzeRootExpression(expr)
if rootAnalysis.IsShardable() && rootAnalysis.shardBy {
return rootAnalysis, nil
}

return analysis, nil
}

func analyzeRootExpression(node parser.Node) QueryAnalysis {
switch n := node.(type) {
case *parser.BinaryExpr:
if n.VectorMatching != nil && n.VectorMatching.On {
shardingLabels := without(n.VectorMatching.MatchingLabels, []string{"le"})
return newShardableByLabels(shardingLabels, n.VectorMatching.On)
} else {
return nonShardableQuery()
}
case *parser.AggregateExpr:
if len(n.Grouping) == 0 {
return nonShardableQuery()
}

shardingLabels := without(n.Grouping, []string{"le"})
return newShardableByLabels(shardingLabels, !n.Without)
}

return nonShardableQuery()
}

func contains(needle string, haystack []string) bool {
for _, item := range haystack {
if needle == item {
Expand Down
11 changes: 4 additions & 7 deletions pkg/querysharding/analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ sum by (container) (
{
name: "multiple binary expressions with grouping",
expression: `(http_requests_total{code="400"} + on (pod) http_requests_total{code="500"}) / on (cluster, pod) http_requests_total`,
shardingLabels: []string{"cluster", "pod"},
shardingLabels: []string{"pod"},
},
{
name: "histogram quantile",
Expand Down Expand Up @@ -177,8 +177,7 @@ http_requests_total`,

for _, test := range nonShardable {
t.Run(test.name, func(t *testing.T) {
analyzer, err := NewQueryAnalyzer()
require.NoError(t, err)
analyzer := NewQueryAnalyzer()
analysis, err := analyzer.Analyze(test.expression)
require.NoError(t, err)
require.False(t, analysis.IsShardable())
Expand All @@ -187,8 +186,7 @@ http_requests_total`,

for _, test := range shardableByLabels {
t.Run(test.name, func(t *testing.T) {
analyzer, err := NewQueryAnalyzer()
require.NoError(t, err)
analyzer := NewQueryAnalyzer()
analysis, err := analyzer.Analyze(test.expression)
require.NoError(t, err)
require.True(t, analysis.IsShardable())
Expand All @@ -202,8 +200,7 @@ http_requests_total`,

for _, test := range shardableWithoutLabels {
t.Run(test.name, func(t *testing.T) {
analyzer, err := NewQueryAnalyzer()
require.NoError(t, err)
analyzer := NewQueryAnalyzer()
analysis, err := analyzer.Analyze(test.expression)
require.NoError(t, err)
require.True(t, analysis.IsShardable())
Expand Down

0 comments on commit a510f1e

Please sign in to comment.