Cache vertical shards in query frontend (#5648)
* Cache vertical shards in query frontend

The vertical sharding middleware is currently executed after the
caching middleware. Because of this, individual vertical shards are
not cached when they succeed. Caching is only done when the
entire request, including all shards, completes successfully.

This commit moves the vertical sharding middleware before the caching
middleware. It also modifies cache keys to include the total shard count
and the shard index so that each vertical shard gets an independent
cache key.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Adjust cache key tests

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Remove source of flakiness by using sync.Cond

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
fpetkovski committed Sep 4, 2022
1 parent 891e6f2 commit a947f33
Showing 4 changed files with 166 additions and 16 deletions.
10 changes: 9 additions & 1 deletion pkg/queryfrontend/cache.go
@@ -33,11 +33,19 @@ func (t thanosCacheKeyGenerator) GenerateCacheKey(userID string, r queryrange.Re
i := 0
for ; i < len(t.resolutions) && t.resolutions[i] > tr.MaxSourceResolution; i++ {
}
return fmt.Sprintf("fe:%s:%s:%d:%d:%d", userID, tr.Query, tr.Step, currentInterval, i)
shardInfoKey := generateShardInfoKey(tr)
return fmt.Sprintf("fe:%s:%s:%d:%d:%d:%s", userID, tr.Query, tr.Step, currentInterval, i, shardInfoKey)
case *ThanosLabelsRequest:
return fmt.Sprintf("fe:%s:%s:%s:%d", userID, tr.Label, tr.Matchers, currentInterval)
case *ThanosSeriesRequest:
return fmt.Sprintf("fe:%s:%s:%d", userID, tr.Matchers, currentInterval)
}
return fmt.Sprintf("fe:%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval)
}

func generateShardInfoKey(r *ThanosQueryRangeRequest) string {
if r.ShardInfo == nil {
return "-"
}
return fmt.Sprintf("%d:%d", r.ShardInfo.TotalShards, r.ShardInfo.ShardIndex)
}
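
For illustration, here is a minimal, self-contained sketch (not part of the diff; shardInfo and shardSuffix are hypothetical stand-ins for the request's ShardInfo field and generateShardInfoKey) of how the new suffix separates cache entries: an unsharded request keeps the "-" placeholder, while each vertical shard of the same query now maps to its own key.

package main

import "fmt"

// shardInfo mimics the total-shards / shard-index pair carried on a sharded request.
type shardInfo struct {
	TotalShards int64
	ShardIndex  int64
}

// shardSuffix mirrors generateShardInfoKey: "-" when unsharded, "total:index" otherwise.
func shardSuffix(si *shardInfo) string {
	if si == nil {
		return "-"
	}
	return fmt.Sprintf("%d:%d", si.TotalShards, si.ShardIndex)
}

func main() {
	// Key layout: "fe:<user>:<query>:<step>:<interval>:<resolution>:<shard>".
	for _, si := range []*shardInfo{nil, {TotalShards: 2, ShardIndex: 0}, {TotalShards: 2, ShardIndex: 1}} {
		fmt.Printf("fe::up:60000:0:2:%s\n", shardSuffix(si))
	}
	// Prints:
	//   fe::up:60000:0:2:-
	//   fe::up:60000:0:2:2:0
	//   fe::up:60000:0:2:2:1
}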
16 changes: 9 additions & 7 deletions pkg/queryfrontend/cache_test.go
@@ -37,7 +37,7 @@ func TestGenerateCacheKey(t *testing.T) {
Start: 0,
Step: 60 * seconds,
},
expected: "fe::up:60000:0:2",
expected: "fe::up:60000:0:2:-",
},
{
name: "10s step",
@@ -46,7 +46,7 @@ func TestGenerateCacheKey(t *testing.T) {
Start: 0,
Step: 10 * seconds,
},
expected: "fe::up:10000:0:2",
expected: "fe::up:10000:0:2:-",
},
{
name: "1m downsampling resolution",
@@ -56,7 +56,7 @@ func TestGenerateCacheKey(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: 60 * seconds,
},
expected: "fe::up:10000:0:2",
expected: "fe::up:10000:0:2:-",
},
{
name: "5m downsampling resolution, different cache key",
@@ -66,7 +66,7 @@ func TestGenerateCacheKey(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: 300 * seconds,
},
expected: "fe::up:10000:0:1",
expected: "fe::up:10000:0:1:-",
},
{
name: "1h downsampling resolution, different cache key",
@@ -76,7 +76,7 @@ func TestGenerateCacheKey(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: hour,
},
expected: "fe::up:10000:0:0",
expected: "fe::up:10000:0:0:-",
},
{
name: "label names, no matcher",
@@ -134,7 +134,9 @@ func TestGenerateCacheKey(t *testing.T) {
expected: `fe::up:[[foo="bar"] [baz="qux"]]:0`,
},
} {
key := splitter.GenerateCacheKey("", tc.req)
testutil.Equals(t, tc.expected, key)
t.Run(tc.name, func(t *testing.T) {
key := splitter.GenerateCacheKey("", tc.req)
testutil.Equals(t, tc.expected, key)
})
}
}
14 changes: 7 additions & 7 deletions pkg/queryfrontend/roundtrip.go
@@ -191,6 +191,13 @@ func newQueryRangeTripperware(
)
}

if numShards > 0 {
queryRangeMiddleware = append(
queryRangeMiddleware,
PromQLShardingMiddleware(querysharding.NewQueryAnalyzer(), numShards, limits, codec, reg),
)
}

if config.ResultsCacheConfig != nil {
queryCacheMiddleware, _, err := queryrange.NewResultsCacheMiddleware(
logger,
@@ -222,13 +229,6 @@
)
}

if numShards > 0 {
queryRangeMiddleware = append(
queryRangeMiddleware,
PromQLShardingMiddleware(querysharding.NewQueryAnalyzer(), numShards, limits, codec, reg),
)
}

return func(next http.RoundTripper) http.RoundTripper {
rt := queryrange.NewRoundTripper(next, codec, forwardHeaders, queryRangeMiddleware...)
return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
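
Why the ordering change matters, as a rough standalone sketch (my own toy types, not the Cortex queryrange API, and assuming the usual convention that middlewares earlier in the slice wrap the later ones): with the sharding middleware appended before the caching middleware, every vertical shard it produces flows through the cache middleware independently, so a shard that succeeds can be cached even if a sibling shard fails.

package main

import "fmt"

type handler func(query string)

type middleware func(handler) handler

// chain wires the middlewares so that mws[0] ends up outermost.
func chain(next handler, mws ...middleware) handler {
	for i := len(mws) - 1; i >= 0; i-- {
		next = mws[i](next)
	}
	return next
}

func main() {
	sharding := func(next handler) handler {
		return func(query string) {
			for i := 0; i < 2; i++ { // split the query into 2 vertical shards
				next(fmt.Sprintf("%s [shard %d/2]", query, i))
			}
		}
	}
	caching := func(next handler) handler {
		return func(query string) {
			fmt.Println("cache lookup:", query) // now happens once per shard
			next(query)
		}
	}

	// Sharding before caching, matching the new middleware order.
	h := chain(func(q string) { fmt.Println("execute:", q) }, sharding, caching)
	h("sum by (pod) (memory_usage)")
}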
142 changes: 141 additions & 1 deletion pkg/queryfrontend/roundtrip_test.go
@@ -464,7 +464,6 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
},
} {
if !t.Run(tc.name, func(t *testing.T) {

ctx := user.InjectOrgID(context.Background(), "1")
httpReq, err := NewThanosQueryRangeCodec(true).EncodeRequest(ctx, tc.req)
testutil.Ok(t, err)
@@ -479,6 +478,96 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
}
}

func TestRoundTripQueryCacheWithShardingMiddleware(t *testing.T) {
testRequest := &ThanosQueryRangeRequest{
Path: "/api/v1/query_range",
Start: 0,
End: 2 * hour,
Step: 10 * seconds,
Dedup: true,
Query: "sum by (pod) (memory_usage)",
Timeout: hour,
}

cacheConf := &queryrange.ResultsCacheConfig{
CacheConfig: cortexcache.Config{
EnableFifoCache: true,
Fifocache: cortexcache.FifoCacheConfig{
MaxSizeBytes: "1MiB",
MaxSizeItems: 1000,
Validity: time.Hour,
},
},
}

tpw, err := NewTripperware(
Config{
NumShards: 2,
QueryRangeConfig: QueryRangeConfig{
Limits: defaultLimits,
ResultsCacheConfig: cacheConf,
SplitQueriesByInterval: day,
},
}, nil, log.NewNopLogger(),
)
testutil.Ok(t, err)

rt, err := newFakeRoundTripper()
testutil.Ok(t, err)
defer rt.Close()
res, handler := promqlResultsWithFailures(3)
rt.setHandler(handler)

for _, tc := range []struct {
name string
req queryrange.Request
err bool
expected int
}{
{
name: "query with vertical sharding",
req: testRequest,
err: true,
expected: 2,
},
{
name: "same query as before, both requests are executed",
req: testRequest,
err: true,
expected: 4,
},
{
name: "same query as before, one request is executed",
req: testRequest,
err: false,
expected: 5,
},
{
name: "same query as before again, no requests are executed",
req: testRequest,
err: false,
expected: 5,
},
} {
if !t.Run(tc.name, func(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "1")
httpReq, err := NewThanosQueryRangeCodec(true).EncodeRequest(ctx, tc.req)
testutil.Ok(t, err)

_, err = tpw(rt).RoundTrip(httpReq)
if tc.err {
testutil.NotOk(t, err)
} else {
testutil.Ok(t, err)
}

testutil.Equals(t, tc.expected, *res)
}) {
break
}
}
}

// TestRoundTripLabelsCacheMiddleware tests the cache middleware for labels requests.
func TestRoundTripLabelsCacheMiddleware(t *testing.T) {
testRequest := &ThanosLabelsRequest{
@@ -730,6 +819,57 @@ func promqlResults(fail bool) (*int, http.Handler) {
})
}

// promqlResultsWithFailures is a mock handler used to test split and cache middleware.
// It will return a failed response numFailures times.
func promqlResultsWithFailures(numFailures int) (*int, http.Handler) {
count := 0
var lock sync.Mutex
q := queryrange.PrometheusResponse{
Status: "success",
Data: queryrange.PrometheusData{
ResultType: string(parser.ValueTypeMatrix),
Result: []queryrange.SampleStream{
{
Labels: []cortexpb.LabelAdapter{},
Samples: []cortexpb.Sample{
{Value: 0, TimestampMs: 0},
{Value: 1, TimestampMs: 1},
},
},
},
},
}

cond := sync.NewCond(&sync.Mutex{})
cond.L.Lock()
return &count, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
lock.Lock()
defer lock.Unlock()

// Set fail in the response code to test retry.
if numFailures > 0 {
numFailures--

// Wait for a successful request.
// Release the lock to allow other requests to execute.
if numFailures == 0 {
lock.Unlock()
cond.Wait()
<-time.After(500 * time.Millisecond)
lock.Lock()
}
w.WriteHeader(500)
}
if err := json.NewEncoder(w).Encode(q); err != nil {
panic(err)
}
if numFailures == 0 {
cond.Broadcast()
}
count++
})
}
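
A standalone sketch of the sync.Cond wait/broadcast pattern used above (simplified with an explicit ready flag; not the test code itself): Wait releases the lock while blocked and only returns after another goroutine calls Broadcast, which is how the handler holds back the last failing response until a successful response has been written.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var (
		mu    sync.Mutex
		ready bool
	)
	cond := sync.NewCond(&mu)

	go func() {
		// Simulate the successful request finishing first.
		time.Sleep(100 * time.Millisecond)
		mu.Lock()
		ready = true
		mu.Unlock()
		fmt.Println("successful response written")
		cond.Broadcast() // wake any goroutine holding back a failing response
	}()

	// Simulate the last failing request waiting for the success.
	mu.Lock()
	for !ready {
		cond.Wait() // releases mu while blocked, reacquires before returning
	}
	mu.Unlock()
	fmt.Println("failing response released after the success")
}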

// labelsResults is a mock handler used to test split and cache middleware for label names and label values requests.
func labelsResults(fail bool) (*int, http.Handler) {
count := 0
