Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply @ modifier start and end in QF split interval middleware #5844

Merged
merged 3 commits into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Fixed

- [#5844](https://github.com/thanos-io/thanos/pull/5844) Query Frontend: Fixes @ modifier time range when splitting queries by interval.

### Added

- [#5814](https://github.com/thanos-io/thanos/pull/5814) Store: Add metric `thanos_bucket_store_postings_size_bytes` that shows the distribution of how many postings (in bytes) were needed for each Series() call in Thanos Store. Useful for determining limits.
Expand Down
6 changes: 3 additions & 3 deletions internal/cortex/querier/queryrange/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func splitQuery(r Request, interval time.Duration) ([]Request, error) {

// Replace @ modifier function to their respective constant values in the query.
// This way subqueries will be evaluated at the same time as the parent query.
query, err := evaluateAtModifierFunction(r.GetQuery(), r.GetStart(), r.GetEnd())
query, err := EvaluateAtModifierFunction(r.GetQuery(), r.GetStart(), r.GetEnd())
if err != nil {
return nil, err
}
Expand All @@ -93,10 +93,10 @@ func splitQuery(r Request, interval time.Duration) ([]Request, error) {
return reqs, nil
}

// evaluateAtModifierFunction parse the query and evaluates the `start()` and `end()` at modifier functions into actual constant timestamps.
// EvaluateAtModifierFunction parse the query and evaluates the `start()` and `end()` at modifier functions into actual constant timestamps.
// For example given the start of the query is 10.00, `http_requests_total[1h] @ start()` query will be replaced with `http_requests_total[1h] @ 10.00`
// If the modifier is already a constant, it will be returned as is.
func evaluateAtModifierFunction(query string, start, end int64) (string, error) {
func EvaluateAtModifierFunction(query string, start, end int64) (string, error) {
expr, err := parser.ParseExpr(query)
if err != nil {
return "", httpgrpc.Errorf(http.StatusBadRequest, "%s", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func Test_evaluateAtModifier(t *testing.T) {
tt := tt
t.Run(tt.in, func(t *testing.T) {
t.Parallel()
out, err := evaluateAtModifierFunction(tt.in, start, end)
out, err := EvaluateAtModifierFunction(tt.in, start, end)
if tt.expectedErrorCode != 0 {
require.Error(t, err)
httpResp, ok := httpgrpc.HTTPResponseFromError(err)
Expand Down
9 changes: 9 additions & 0 deletions pkg/queryfrontend/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func TestRoundTripRetryMiddleware(t *testing.T) {
Start: 0,
End: 2 * hour,
Step: 10 * seconds,
Query: "foo",
}

testLabelsRequest := &ThanosLabelsRequest{Path: "/api/v1/labels", Start: 0, End: 2 * hour}
Expand Down Expand Up @@ -222,6 +223,7 @@ func TestRoundTripSplitIntervalMiddleware(t *testing.T) {
Start: 0,
End: 2 * hour,
Step: 10 * seconds,
Query: "foo",
}

testLabelsRequest := &ThanosLabelsRequest{
Expand Down Expand Up @@ -395,6 +397,7 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: 1 * seconds,
Dedup: true, // Deduplication is enabled by default.
Query: "foo",
}

testRequestWithoutDedup := &ThanosQueryRangeRequest{
Expand All @@ -404,6 +407,7 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: 1 * seconds,
Dedup: false,
Query: "foo",
}

// Same query params as testRequest, different maxSourceResolution
Expand All @@ -415,6 +419,7 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: 10 * seconds,
Dedup: true,
Query: "foo",
}

// Same query params as testRequest, different maxSourceResolution
Expand All @@ -426,6 +431,7 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: 1 * hour,
Dedup: true,
Query: "foo",
}

// Same query params as testRequest, but with storeMatchers
Expand All @@ -437,6 +443,7 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
MaxSourceResolution: 1 * seconds,
StoreMatchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}},
Dedup: true,
Query: "foo",
}

cacheConf := &queryrange.ResultsCacheConfig{
Expand Down Expand Up @@ -487,6 +494,7 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
End: 25 * hour,
Step: 10 * seconds,
Dedup: true,
Query: "foo",
},
expected: 6,
},
Expand All @@ -498,6 +506,7 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
End: 25 * hour,
Step: 10 * seconds,
Dedup: true,
Query: "foo",
},
expected: 6,
},
Expand Down
17 changes: 13 additions & 4 deletions pkg/queryfrontend/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ type splitByInterval struct {
func (s splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) {
// First we're going to build new requests, one for each day, taking care
// to line up the boundaries with step.
reqs := splitQuery(r, s.interval(r))
reqs, err := splitQuery(r, s.interval(r))
if err != nil {
return nil, err
}
s.splitByCounter.Add(float64(len(reqs)))

reqResps, err := queryrange.DoRequests(ctx, s.next, reqs, s.limits)
Expand All @@ -66,9 +69,15 @@ func (s splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryran
return response, nil
}

func splitQuery(r queryrange.Request, interval time.Duration) []queryrange.Request {
func splitQuery(r queryrange.Request, interval time.Duration) ([]queryrange.Request, error) {
var reqs []queryrange.Request
if _, ok := r.(*ThanosQueryRangeRequest); ok {
// Replace @ modifier function to their respective constant values in the query.
// This way subqueries will be evaluated at the same time as the parent query.
query, err := queryrange.EvaluateAtModifierFunction(r.GetQuery(), r.GetStart(), r.GetEnd())
if err != nil {
return nil, err
}
if start := r.GetStart(); start == r.GetEnd() {
reqs = append(reqs, r.WithStartEnd(start, start))
} else {
Expand All @@ -78,7 +87,7 @@ func splitQuery(r queryrange.Request, interval time.Duration) []queryrange.Reque
end = r.GetEnd()
}

reqs = append(reqs, r.WithStartEnd(start, end))
reqs = append(reqs, r.WithQuery(query).WithStartEnd(start, end))
}
}
} else {
Expand All @@ -93,7 +102,7 @@ func splitQuery(r queryrange.Request, interval time.Duration) []queryrange.Reque
}
}

return reqs
return reqs, nil
}

// Round up to the step before the next interval boundary.
Expand Down
240 changes: 240 additions & 0 deletions pkg/queryfrontend/split_by_interval_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package queryfrontend

import (
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/internal/cortex/querier/queryrange"
)

func TestSplitQuery(t *testing.T) {
for i, tc := range []struct {
input queryrange.Request
expected []queryrange.Request
interval time.Duration
}{
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 60 * 60 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: 60 * 60 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: day,
},
{
input: &ThanosQueryRangeRequest{
Start: 60 * 60 * seconds,
End: 60 * 60 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 60 * 60 * seconds,
End: 60 * 60 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: day,
},
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 60 * 60 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: 60 * 60 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: 3 * time.Hour,
},
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: day,
},
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 3 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: 3 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: 3 * time.Hour,
},
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 2 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo @ start()",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: (24 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo @ 0.000",
},
&ThanosQueryRangeRequest{
Start: 24 * 3600 * seconds,
End: 2 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo @ 0.000",
},
},
interval: day,
},
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 2 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo @ end()",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: (24 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo @ 172800.000",
},
&ThanosQueryRangeRequest{
Start: 24 * 3600 * seconds,
End: 2 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo @ 172800.000",
},
},
interval: day,
},
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 2 * 3 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: (3 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo",
},
&ThanosQueryRangeRequest{
Start: 3 * 3600 * seconds,
End: 2 * 3 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: 3 * time.Hour,
},
{
input: &ThanosQueryRangeRequest{
Start: 3 * 3600 * seconds,
End: 3 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 3 * 3600 * seconds,
End: (24 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo",
},
&ThanosQueryRangeRequest{
Start: 24 * 3600 * seconds,
End: (2 * 24 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo",
},
&ThanosQueryRangeRequest{
Start: 2 * 24 * 3600 * seconds,
End: 3 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: day,
},
{
input: &ThanosQueryRangeRequest{
Start: 2 * 3600 * seconds,
End: 3 * 3 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 2 * 3600 * seconds,
End: (3 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo",
},
&ThanosQueryRangeRequest{
Start: 3 * 3600 * seconds,
End: (2 * 3 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo",
},
&ThanosQueryRangeRequest{
Start: 2 * 3 * 3600 * seconds,
End: 3 * 3 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: 3 * time.Hour,
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
queries, err := splitQuery(tc.input, tc.interval)
require.NoError(t, err)
require.Equal(t, tc.expected, queries)
})
}
}