Skip to content

Commit

Permalink
Store Gateway index stats response (#3206)
Browse files Browse the repository at this point in the history
* WIP gateway stats response

* Update comment and fix linter issue

* update license header for new rpc.proto

* Refactor for moved proto files

* rename stats entry to reflect other usages

* add series fetched to the index fetch bytes total

* add a comment and move to uint64

* Update pkg/storegateway/storepb/rpc.proto

Co-authored-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
Tyler Reid and pracucci committed Oct 21, 2022
1 parent 35d35d2 commit a3fac18
Show file tree
Hide file tree
Showing 10 changed files with 498 additions and 78 deletions.
2 changes: 2 additions & 0 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
numSeries := stats.LoadFetchedSeries()
numBytes := stats.LoadFetchedChunkBytes()
numChunks := stats.LoadFetchedChunks()
numIndexBytes := stats.LoadFetchedIndexBytes()
sharded := strconv.FormatBool(stats.GetShardedQueries() > 0)

if stats != nil {
Expand All @@ -222,6 +223,7 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
"fetched_series_count", numSeries,
"fetched_chunk_bytes", numBytes,
"fetched_chunks_count", numChunks,
"fetched_index_bytes", numIndexBytes,
"sharded_queries", stats.LoadShardedQueries(),
"split_queries", stats.LoadSplitQueries(),
}, formatQueryString(queryString)...)
Expand Down
7 changes: 7 additions & 0 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
mySeries := []*storepb.Series(nil)
myWarnings := storage.Warnings(nil)
myQueriedBlocks := []ulid.ULID(nil)
indexBytesFetched := uint64(0)

for {
// Ensure the context hasn't been canceled in the meanwhile (eg. an error occurred
Expand Down Expand Up @@ -789,6 +790,10 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(

myQueriedBlocks = append(myQueriedBlocks, ids...)
}

if s := resp.GetStats(); s != nil {
indexBytesFetched += s.FetchedIndexBytes
}
}

numSeries := len(mySeries)
Expand All @@ -797,12 +802,14 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
reqStats.AddFetchedSeries(uint64(numSeries))
reqStats.AddFetchedChunkBytes(uint64(chunkBytes))
reqStats.AddFetchedChunks(uint64(chunksFetched))
reqStats.AddFetchedIndexBytes(indexBytesFetched)

level.Debug(spanLog).Log("msg", "received series from store-gateway",
"instance", c.RemoteAddress(),
"fetched series", numSeries,
"fetched chunk bytes", chunkBytes,
"fetched chunks", chunksFetched,
"fetched index bytes", indexBytesFetched,
"requested blocks", strings.Join(convertULIDsToString(blockIDs), " "),
"queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "))

Expand Down
9 changes: 9 additions & 0 deletions pkg/querier/blocks_store_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
mockSeriesResponse(labels.Labels{metricNameLabel}, minT, 1),
mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2),
mockHintsResponse(block1, block2),
mockStatsResponse(50),
}}: {block1, block2},
},
},
Expand Down Expand Up @@ -1928,6 +1929,14 @@ func mockSeriesResponseWithChunks(lbls labels.Labels, chunks ...storepb.AggrChun
}
}

func mockStatsResponse(fetchedIndexBytes int) *storepb.SeriesResponse {
return &storepb.SeriesResponse{
Result: &storepb.SeriesResponse_Stats{
Stats: &storepb.Stats{FetchedIndexBytes: uint64(fetchedIndexBytes)},
},
}
}

func mockHintsResponse(ids ...ulid.ULID) *storepb.SeriesResponse {
hints := &hintspb.SeriesResponseHints{}
for _, id := range ids {
Expand Down
17 changes: 17 additions & 0 deletions pkg/querier/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,22 @@ func (s *Stats) LoadFetchedChunks() uint64 {
return atomic.LoadUint64(&s.FetchedChunksCount)
}

func (s *Stats) AddFetchedIndexBytes(indexBytes uint64) {
if s == nil {
return
}

atomic.AddUint64(&s.FetchedIndexBytes, indexBytes)
}

func (s *Stats) LoadFetchedIndexBytes() uint64 {
if s == nil {
return 0
}

return atomic.LoadUint64(&s.FetchedIndexBytes)
}

func (s *Stats) AddShardedQueries(num uint32) {
if s == nil {
return
Expand Down Expand Up @@ -151,6 +167,7 @@ func (s *Stats) Merge(other *Stats) {
s.AddFetchedChunks(other.LoadFetchedChunks())
s.AddShardedQueries(other.LoadShardedQueries())
s.AddSplitQueries(other.LoadSplitQueries())
s.AddFetchedIndexBytes(other.LoadFetchedIndexBytes())
}

func ShouldTrackHTTPGRPCResponse(r *httpgrpc.HTTPResponse) bool {
Expand Down
90 changes: 66 additions & 24 deletions pkg/querier/stats/stats.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/querier/stats/stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ message Stats {
uint32 sharded_queries = 5;
// The number of split partial queries executed. 0 if splitting is disabled or the query can't be split.
uint32 split_queries = 6;
// The number of index bytes fetched on the store-gateway for the query
uint64 fetched_index_bytes = 7;
}
5 changes: 5 additions & 0 deletions pkg/storegateway/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
}
}

if err = srv.Send(storepb.NewStatsResponse(stats.postingsFetchedSizeSum + stats.seriesFetchedSizeSum)); err != nil {
err = status.Error(codes.Unknown, errors.Wrap(err, "sends series response stats").Error())
return
}

return err
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/storegateway/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse {
}
}

func NewStatsResponse(indexBytesFetched int) *SeriesResponse {
return &SeriesResponse{
Result: &SeriesResponse_Stats{
Stats: &Stats{FetchedIndexBytes: uint64(indexBytesFetched)},
},
}
}

type emptySeriesSet struct{}

func (emptySeriesSet) Next() bool { return false }
Expand Down
Loading

0 comments on commit a3fac18

Please sign in to comment.