Skip to content

Commit

Permalink
Cache vertical shards in query frontend
Browse files Browse the repository at this point in the history
The vertical sharding middleware is currently executed after the
caching middleware. Because of this, individual vertical shards are
not getting cached when they succeed. Caching is only done when the
entire request, including all shards, completes successfully.

This commit moves the vertical sharding middleware before the caching
middleware. It also modifies caching keys to contain the total shards
and the shard number so that each vertical shard gets an independent
caching key.
  • Loading branch information
fpetkovski committed Aug 26, 2022
1 parent 319ef15 commit f25e569
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 9 deletions.
10 changes: 9 additions & 1 deletion pkg/queryfrontend/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,19 @@ func (t thanosCacheKeyGenerator) GenerateCacheKey(userID string, r queryrange.Re
i := 0
for ; i < len(t.resolutions) && t.resolutions[i] > tr.MaxSourceResolution; i++ {
}
return fmt.Sprintf("fe:%s:%s:%d:%d:%d", userID, tr.Query, tr.Step, currentInterval, i)
shardInfoKey := generateShardInfoKey(tr)
return fmt.Sprintf("fe:%s:%s:%d:%d:%d:%s", userID, tr.Query, tr.Step, currentInterval, i, shardInfoKey)
case *ThanosLabelsRequest:
return fmt.Sprintf("fe:%s:%s:%s:%d", userID, tr.Label, tr.Matchers, currentInterval)
case *ThanosSeriesRequest:
return fmt.Sprintf("fe:%s:%s:%d", userID, tr.Matchers, currentInterval)
}
return fmt.Sprintf("fe:%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval)
}

// generateShardInfoKey renders the shard-specific portion of a cache key.
// It returns "-" when the request carries no shard info; otherwise it returns
// the total shard count and the shard index joined by a colon.
func generateShardInfoKey(r *ThanosQueryRangeRequest) string {
	info := r.ShardInfo
	if info != nil {
		return fmt.Sprintf("%d:%d", info.TotalShards, info.ShardIndex)
	}
	return "-"
}
14 changes: 7 additions & 7 deletions pkg/queryfrontend/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ func newQueryRangeTripperware(
)
}

if numShards > 0 {
queryRangeMiddleware = append(
queryRangeMiddleware,
PromQLShardingMiddleware(querysharding.NewQueryAnalyzer(), numShards, limits, codec),
)
}

if config.ResultsCacheConfig != nil {
queryCacheMiddleware, _, err := queryrange.NewResultsCacheMiddleware(
logger,
Expand Down Expand Up @@ -222,13 +229,6 @@ func newQueryRangeTripperware(
)
}

if numShards > 0 {
queryRangeMiddleware = append(
queryRangeMiddleware,
PromQLShardingMiddleware(querysharding.NewQueryAnalyzer(), numShards, limits, codec),
)
}

return func(next http.RoundTripper) http.RoundTripper {
rt := queryrange.NewRoundTripper(next, codec, forwardHeaders, queryRangeMiddleware...)
return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
Expand Down
138 changes: 137 additions & 1 deletion pkg/queryfrontend/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,6 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
},
} {
if !t.Run(tc.name, func(t *testing.T) {

ctx := user.InjectOrgID(context.Background(), "1")
httpReq, err := NewThanosQueryRangeCodec(true).EncodeRequest(ctx, tc.req)
testutil.Ok(t, err)
Expand All @@ -479,6 +478,100 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
}
}

// TestRoundTripQueryCacheWithShardingMiddleware sends the same range query
// several times through a tripperware configured with two shards and a FIFO
// results cache. The fake backend is set up to fail three times, and after
// every round trip the test checks the cumulative number of requests the
// backend has handled.
func TestRoundTripQueryCacheWithShardingMiddleware(t *testing.T) {
	shardedQuery := &ThanosQueryRangeRequest{
		Path:    "/api/v1/query_range",
		Start:   0,
		End:     2 * hour,
		Step:    10 * seconds,
		Dedup:   true,
		Query:   "sum by (pod) (memory_usage)",
		Timeout: hour,
	}

	resultsCache := &queryrange.ResultsCacheConfig{
		CacheConfig: cortexcache.Config{
			EnableFifoCache: true,
			Fifocache: cortexcache.FifoCacheConfig{
				MaxSizeBytes: "1MiB",
				MaxSizeItems: 1000,
				Validity:     time.Hour,
			},
		},
	}

	frontendConfig := Config{
		NumShards: 2,
		QueryRangeConfig: QueryRangeConfig{
			Limits:                 defaultLimits,
			ResultsCacheConfig:     resultsCache,
			SplitQueriesByInterval: day,
		},
	}
	tripperware, err := NewTripperware(frontendConfig, nil, log.NewNopLogger())
	testutil.Ok(t, err)

	backend, err := newFakeRoundTripper()
	testutil.Ok(t, err)
	defer backend.Close()

	// handledRequests points at the backend's running request counter.
	handledRequests, handler := promqlResultsWithFailures(3)
	backend.setHandler(handler)

	type roundTripCase struct {
		name         string
		req          queryrange.Request
		wantErr      bool
		wantRequests int
	}

	// The cases share one backend and one cache, so wantRequests is cumulative
	// and the order of the cases matters.
	cases := []roundTripCase{
		{
			name:         "query with vertical sharding",
			req:          shardedQuery,
			wantErr:      true,
			wantRequests: 2,
		},
		{
			name:         "same query as before, both requests are executed",
			req:          shardedQuery,
			wantErr:      true,
			wantRequests: 4,
		},
		{
			name:         "same query as before, one request is executed",
			req:          shardedQuery,
			wantErr:      false,
			wantRequests: 5,
		},
		{
			name:         "same query as before, no requests are executed",
			req:          shardedQuery,
			wantErr:      false,
			wantRequests: 5,
		},
	}

	for _, tc := range cases {
		passed := t.Run(tc.name, func(t *testing.T) {
			ctx := user.InjectOrgID(context.Background(), "1")
			httpReq, err := NewThanosQueryRangeCodec(true).EncodeRequest(ctx, tc.req)
			testutil.Ok(t, err)

			_, roundTripErr := tripperware(backend).RoundTrip(httpReq)
			if !tc.wantErr {
				testutil.Ok(t, roundTripErr)
			} else {
				testutil.NotOk(t, roundTripErr)
			}

			testutil.Equals(t, tc.wantRequests, *handledRequests)
		})
		// Later cases build on the state left by earlier ones; stop at the first failure.
		if !passed {
			break
		}
	}
}

// TestRoundTripLabelsCacheMiddleware tests the cache middleware for labels requests.
func TestRoundTripLabelsCacheMiddleware(t *testing.T) {
testRequest := &ThanosLabelsRequest{
Expand Down Expand Up @@ -730,6 +823,49 @@ func promqlResults(fail bool) (*int, http.Handler) {
})
}

// promqlResultsWithFailures is a mock handler used to test split and cache middleware.
// While numFailures is positive, a request decrements it, waits 200ms and is
// answered with status 500. Every request, failed or not, gets the canned
// matrix response written to it and is counted. The returned pointer exposes
// the running count of handled requests.
func promqlResultsWithFailures(numFailures int) (*int, http.Handler) {
	var (
		mu      sync.Mutex
		handled int
	)

	response := queryrange.PrometheusResponse{
		Status: "success",
		Data: queryrange.PrometheusData{
			ResultType: string(parser.ValueTypeMatrix),
			Result: []queryrange.SampleStream{
				{
					Labels: []cortexpb.LabelAdapter{},
					Samples: []cortexpb.Sample{
						{Value: 0, TimestampMs: 0},
						{Value: 1, TimestampMs: 1},
					},
				},
			},
		},
	}

	serve := func(w http.ResponseWriter, _ *http.Request) {
		mu.Lock()
		defer mu.Unlock()

		// Fail the request while there are failures left to hand out.
		if numFailures > 0 {
			numFailures--

			// Release the lock during the delay so other requests can execute.
			mu.Unlock()
			time.Sleep(200 * time.Millisecond)
			mu.Lock()

			w.WriteHeader(http.StatusInternalServerError)
		}

		if err := json.NewEncoder(w).Encode(response); err != nil {
			panic(err)
		}
		handled++
	}

	return &handled, http.HandlerFunc(serve)
}

// labelsResults is a mock handler used to test split and cache middleware for label names and label values requests.
func labelsResults(fail bool) (*int, http.Handler) {
count := 0
Expand Down

0 comments on commit f25e569

Please sign in to comment.