Add additional querier metrics #7099

Merged · 6 commits · Jan 12, 2024
Changes from 5 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -18,6 +18,7 @@
* [ENHANCEMENT] Distributor: support disabling metric relabel rules per-tenant via the flag `-distributor.metric-relabeling-enabled` or associated YAML. #6970
* [ENHANCEMENT] Distributor: `-distributor.remote-timeout` is now accounted from the first ingester push request being sent. #6972
* [ENHANCEMENT] Storage Provider: allow aws sts support for s3 storage provider #6172
* [ENHANCEMENT] Querier: add `cortex_querier_queries_storage_type_total` metric that indicates how many queries were executed against each storage source (ingesters or store-gateways). Add `cortex_query_storegateway_chunks_total` metric to count the number of chunks fetched from a store gateway.
* [FEATURE] Introduce `-tenant-federation.max-tenants` option to limit the max number of tenants allowed for requests when federation is enabled. #6959
* [ENHANCEMENT] Query-frontend: add experimental support for sharding active series queries via `-query-frontend.shard-active-series-queries`. #6784
* [BUGFIX] Ingester: don't ignore errors encountered while iterating through chunks or samples in response to a query request. #6451
7 changes: 6 additions & 1 deletion pkg/querier/block_streaming.go
@@ -132,6 +132,7 @@ type storeGatewayStreamReader struct {
expectedSeriesCount int
queryLimiter *limiter.QueryLimiter
stats *stats.Stats
metrics *blocksStoreQueryableMetrics
log log.Logger

chunkCountEstimateChan chan int
@@ -141,13 +142,14 @@ type storeGatewayStreamReader struct {
err error
}

func newStoreGatewayStreamReader(ctx context.Context, client storegatewaypb.StoreGateway_SeriesClient, expectedSeriesCount int, queryLimiter *limiter.QueryLimiter, stats *stats.Stats, log log.Logger) *storeGatewayStreamReader {
func newStoreGatewayStreamReader(ctx context.Context, client storegatewaypb.StoreGateway_SeriesClient, expectedSeriesCount int, queryLimiter *limiter.QueryLimiter, stats *stats.Stats, metrics *blocksStoreQueryableMetrics, log log.Logger) *storeGatewayStreamReader {
return &storeGatewayStreamReader{
ctx: ctx,
client: client,
expectedSeriesCount: expectedSeriesCount,
queryLimiter: queryLimiter,
stats: stats,
metrics: metrics,
log: log,
}
}
@@ -193,6 +195,9 @@ func (s *storeGatewayStreamReader) StartBuffering() {
func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error {
totalSeries := 0
totalChunks := 0
defer func() {
s.metrics.chunksTotal.Add(float64(totalChunks))
}()

translateReceivedError := func(err error) error {
if errors.Is(err, context.Canceled) {
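For clarity, here is a minimal, self-contained sketch of the accounting pattern the readStream hunk above uses: chunk counts are accumulated locally and published to the Prometheus counter once, in a defer, so they are recorded even if the stream terminates early. The chunkBatch type and consumeStream function are illustrative stand-ins, not code from this PR.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// chunkBatch is a stand-in for one streamed series batch; the real reader
// receives these over a store-gateway gRPC stream.
type chunkBatch struct {
	chunks []string
}

// consumeStream mirrors the pattern in readStream: count locally, then add
// the total to the counter exactly once via defer, even on early return.
func consumeStream(chunksTotal prometheus.Counter, batches []chunkBatch) (err error) {
	totalChunks := 0
	defer func() {
		chunksTotal.Add(float64(totalChunks))
	}()

	for _, b := range batches {
		totalChunks += len(b.chunks)
		// ... per-batch limit checks and buffering would happen here ...
	}
	return nil
}

func main() {
	counter := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_chunks_total",
		Help: "Chunks consumed in this sketch.",
	})
	_ = consumeStream(counter, []chunkBatch{{chunks: []string{"a", "b"}}, {chunks: []string{"c"}}})
	fmt.Println("done") // the counter now holds 3
}
```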
27 changes: 18 additions & 9 deletions pkg/querier/block_streaming_test.go
@@ -318,7 +318,8 @@ func TestStoreGatewayStreamReader_HappyPaths(t *testing.T) {
t.Run(name, func(t *testing.T) {
ctx := context.Background()
mockClient := &mockStoreGatewayQueryStreamClient{ctx: ctx, messages: testCase.messages}
reader := newStoreGatewayStreamReader(ctx, mockClient, 5, limiter.NewQueryLimiter(0, 0, 0, 0, nil), &stats.Stats{}, log.NewNopLogger())
metrics := newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry())
reader := newStoreGatewayStreamReader(ctx, mockClient, 5, limiter.NewQueryLimiter(0, 0, 0, 0, nil), &stats.Stats{}, metrics, log.NewNopLogger())
reader.StartBuffering()

actualChunksEstimate := reader.EstimateChunkCount()
Expand Down Expand Up @@ -387,7 +388,8 @@ func TestStoreGatewayStreamReader_AbortsWhenParentContextCancelled(t *testing.T)
mockClient := &mockStoreGatewayQueryStreamClient{ctx: streamCtx, messages: batchesToMessages(3, batches...)}

parentCtx, cancel := context.WithCancel(context.Background())
reader := newStoreGatewayStreamReader(parentCtx, mockClient, 3, limiter.NewQueryLimiter(0, 0, 0, 0, nil), &stats.Stats{}, log.NewNopLogger())
metrics := newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry())
reader := newStoreGatewayStreamReader(parentCtx, mockClient, 3, limiter.NewQueryLimiter(0, 0, 0, 0, nil), &stats.Stats{}, metrics, log.NewNopLogger())
cancel()
reader.StartBuffering()

@@ -413,9 +415,10 @@ func TestStoreGatewayStreamReader_DoesNotAbortWhenStreamContextCancelled(t *test
cancel()
const expectedChunksEstimate uint64 = 5
mockClient := &mockStoreGatewayQueryStreamClient{ctx: streamCtx, messages: batchesToMessages(expectedChunksEstimate, batches...)}
metrics := newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry())

parentCtx := context.Background()
reader := newStoreGatewayStreamReader(parentCtx, mockClient, 3, limiter.NewQueryLimiter(0, 0, 0, 0, nil), &stats.Stats{}, log.NewNopLogger())
reader := newStoreGatewayStreamReader(parentCtx, mockClient, 3, limiter.NewQueryLimiter(0, 0, 0, 0, nil), &stats.Stats{}, metrics, log.NewNopLogger())
reader.StartBuffering()

actualChunksEstimate := reader.EstimateChunkCount()
@@ -436,7 +439,8 @@ func TestStoreGatewayStreamReader_ReadingSeriesOutOfOrder(t *testing.T) {

ctx := context.Background()
mockClient := &mockStoreGatewayQueryStreamClient{ctx: ctx, messages: batchesToMessages(3, batches...)}
reader := newStoreGatewayStreamReader(ctx, mockClient, 1, limiter.NewQueryLimiter(0, 0, 0, 0, nil), &stats.Stats{}, log.NewNopLogger())
metrics := newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry())
reader := newStoreGatewayStreamReader(ctx, mockClient, 1, limiter.NewQueryLimiter(0, 0, 0, 0, nil), &stats.Stats{}, metrics, log.NewNopLogger())
reader.StartBuffering()

s, err := reader.GetChunks(1)
@@ -452,7 +456,8 @@ func TestStoreGatewayStreamReader_ReadingMoreSeriesThanAvailable(t *testing.T) {

ctx := context.Background()
mockClient := &mockStoreGatewayQueryStreamClient{ctx: ctx, messages: batchesToMessages(3, batches...)}
reader := newStoreGatewayStreamReader(ctx, mockClient, 1, limiter.NewQueryLimiter(0, 0, 0, 0, nil), &stats.Stats{}, log.NewNopLogger())
metrics := newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry())
reader := newStoreGatewayStreamReader(ctx, mockClient, 1, limiter.NewQueryLimiter(0, 0, 0, 0, nil), &stats.Stats{}, metrics, log.NewNopLogger())
reader.StartBuffering()

s, err := reader.GetChunks(0)
@@ -479,7 +484,8 @@ func TestStoreGatewayStreamReader_ReceivedFewerSeriesThanExpected(t *testing.T)

ctx := context.Background()
mockClient := &mockStoreGatewayQueryStreamClient{ctx: ctx, messages: batchesToMessages(3, batches...)}
reader := newStoreGatewayStreamReader(ctx, mockClient, 3, limiter.NewQueryLimiter(0, 0, 0, 0, nil), &stats.Stats{}, log.NewNopLogger())
metrics := newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry())
reader := newStoreGatewayStreamReader(ctx, mockClient, 3, limiter.NewQueryLimiter(0, 0, 0, 0, nil), &stats.Stats{}, metrics, log.NewNopLogger())
reader.StartBuffering()

s, err := reader.GetChunks(0)
@@ -530,7 +536,8 @@ func TestStoreGatewayStreamReader_ReceivedMoreSeriesThanExpected(t *testing.T) {
t.Run(name, func(t *testing.T) {
ctx := context.Background()
mockClient := &mockStoreGatewayQueryStreamClient{ctx: ctx, messages: batchesToMessages(3, batches...)}
reader := newStoreGatewayStreamReader(ctx, mockClient, 1, limiter.NewQueryLimiter(0, 0, 0, 0, nil), &stats.Stats{}, log.NewNopLogger())
metrics := newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry())
reader := newStoreGatewayStreamReader(ctx, mockClient, 1, limiter.NewQueryLimiter(0, 0, 0, 0, nil), &stats.Stats{}, metrics, log.NewNopLogger())
reader.StartBuffering()

s, err := reader.GetChunks(0)
@@ -585,9 +592,11 @@ func TestStoreGatewayStreamReader_ChunksLimits(t *testing.T) {

ctx := context.Background()
mockClient := &mockStoreGatewayQueryStreamClient{ctx: ctx, messages: batchesToMessages(3, batches...)}
queryMetrics := stats.NewQueryMetrics(prometheus.NewPedanticRegistry())
registry := prometheus.NewPedanticRegistry()
metrics := newBlocksStoreQueryableMetrics(registry)
queryMetrics := stats.NewQueryMetrics(registry)

reader := newStoreGatewayStreamReader(ctx, mockClient, 1, limiter.NewQueryLimiter(0, testCase.maxChunkBytes, testCase.maxChunks, 0, queryMetrics), &stats.Stats{}, log.NewNopLogger())
reader := newStoreGatewayStreamReader(ctx, mockClient, 1, limiter.NewQueryLimiter(0, testCase.maxChunkBytes, testCase.maxChunks, 0, queryMetrics), &stats.Stats{}, metrics, log.NewNopLogger())
reader.StartBuffering()

_, err := reader.GetChunks(0)
10 changes: 9 additions & 1 deletion pkg/querier/blocks_store_queryable.go
@@ -106,6 +106,8 @@ type blocksStoreQueryableMetrics struct {
blocksFound prometheus.Counter
blocksQueried prometheus.Counter
blocksWithCompactorShardButIncompatibleQueryShard prometheus.Counter
// The total number of chunks received from store-gateways that were used to evaluate queries
chunksTotal prometheus.Counter
}

func newBlocksStoreQueryableMetrics(reg prometheus.Registerer) *blocksStoreQueryableMetrics {
@@ -135,6 +137,11 @@ func newBlocksStoreQueryableMetrics(reg prometheus.Registerer) *blocksStoreQuery
Name: "cortex_querier_blocks_with_compactor_shard_but_incompatible_query_shard_total",
Help: "Blocks that couldn't be checked for query and compactor sharding optimization due to incompatible shard counts.",
}),
// Named to be consistent with distributor_query_ingester_chunks_total
jhalterman (Member, Author) commented on Jan 10, 2024:

This metric is very similar to the metrics in query_metrics.go, but those are mostly specific to ingesters/distributors and not as easy to share with a block store queryable, so I followed the precedent of keeping the ingester and store gateway metrics in queriers separate.

Collaborator:

Nit: the metric is named cortex_distributor_query_ingester_chunks_total (with the cortex_ prefix).

Collaborator:

This is really a nit, not worth a PR, but if you address the other comment then you could fix this too.

chunksTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_query_storegateway_chunks_total",
Help: "Number of chunks received from store gateways at query time.",
}),
}
}

pracucci (Collaborator) commented on Jan 15, 2024:

The querier metrics are prefixed by cortex_querier_. WDYT if we change the metric name to cortex_querier_query_storegateway_chunks_total?

jhalterman (Member, Author):

That name sounds good. Pushed PR #7145

@@ -795,6 +802,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
}

chunksCount, chunksSize := countChunksAndBytes(s)
q.metrics.chunksTotal.Add(float64(chunksCount))
if err := queryLimiter.AddChunkBytes(chunksSize); err != nil {
return err
}
@@ -863,7 +871,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
} else if len(myStreamingSeries) > 0 {
// FetchedChunks and FetchedChunkBytes are added by the SeriesChunksStreamReader.
reqStats.AddFetchedSeries(uint64(len(myStreamingSeries)))
streamReader = newStoreGatewayStreamReader(reqCtx, stream, len(myStreamingSeries), queryLimiter, reqStats, q.logger)
streamReader = newStoreGatewayStreamReader(reqCtx, stream, len(myStreamingSeries), queryLimiter, reqStats, q.metrics, q.logger)
level.Debug(log).Log("msg", "received streaming series from store-gateway",
"instance", c.RemoteAddress(),
"fetched series", len(myStreamingSeries),
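As a side note, here is a rough sketch (not part of this PR or its tests) of how the new counter's name and help text could be verified with prometheus/testutil; the registry and counter below are standalone stand-ins rather than the blocksStoreQueryableMetrics struct.

```go
package main

import (
	"log"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewPedanticRegistry()

	// Register a counter with the same name and help text as the PR's chunksTotal.
	chunksTotal := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "cortex_query_storegateway_chunks_total",
		Help: "Number of chunks received from store gateways at query time.",
	})
	chunksTotal.Add(42)

	expected := `
# HELP cortex_query_storegateway_chunks_total Number of chunks received from store gateways at query time.
# TYPE cortex_query_storegateway_chunks_total counter
cortex_query_storegateway_chunks_total 42
`
	// GatherAndCompare diffs the registry's exposition output against the expected text.
	if err := testutil.GatherAndCompare(reg, strings.NewReader(expected), "cortex_query_storegateway_chunks_total"); err != nil {
		log.Fatal(err)
	}
}
```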
2 changes: 2 additions & 0 deletions pkg/querier/querier.go
@@ -239,6 +239,7 @@ func (mq multiQuerier) getQueriers(ctx context.Context) (context.Context, []stor
return nil, nil, err
}
queriers = append(queriers, q)
mq.queryMetrics.QueriesExecutedTotal.WithLabelValues("ingester").Inc()
}

if mq.blockStore != nil && ShouldQueryBlockStore(mq.cfg.QueryStoreAfter, now, mq.minT) {
@@ -247,6 +248,7 @@
return nil, nil, err
}
queriers = append(queriers, q)
mq.queryMetrics.QueriesExecutedTotal.WithLabelValues("store-gateway").Inc()
}

return ctx, queriers, nil
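To make the effect of the two increments above concrete, here is a minimal sketch that registers a stand-alone counter vector mirroring QueriesExecutedTotal (same namespace, name, and "storage" label) and reads the per-label values back with testutil.ToFloat64. It is an illustration, not the actual multiQuerier wiring.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewPedanticRegistry()

	// Mirrors the QueriesExecutedTotal counter vector added in query_metrics.go.
	queriesExecuted := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "querier_queries_storage_type_total",
		Help:      "Number of PromQL queries that were executed against a particular storage type.",
	}, []string{"storage"})

	// Simulate getQueriers selecting an ingester querier twice and a block-store querier once.
	queriesExecuted.WithLabelValues("ingester").Inc()
	queriesExecuted.WithLabelValues("store-gateway").Inc()
	queriesExecuted.WithLabelValues("ingester").Inc()

	// testutil.ToFloat64 reads a single child series back for assertions.
	fmt.Println(testutil.ToFloat64(queriesExecuted.WithLabelValues("ingester")))      // 2
	fmt.Println(testutil.ToFloat64(queriesExecuted.WithLabelValues("store-gateway"))) // 1
}
```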
8 changes: 8 additions & 0 deletions pkg/querier/stats/query_metrics.go
@@ -33,6 +33,9 @@ type QueryMetrics struct {

// The total number of queries that were rejected for some reason.
QueriesRejectedTotal *prometheus.CounterVec

// The total number of queries executed against a particular source
QueriesExecutedTotal *prometheus.CounterVec
}

func NewQueryMetrics(reg prometheus.Registerer) *QueryMetrics {
@@ -52,6 +55,11 @@ func NewQueryMetrics(reg prometheus.Registerer) *QueryMetrics {
Name: "querier_queries_rejected_total",
Help: "Number of queries that were rejected, for example because they exceeded a limit.",
}, []string{"reason"}),
QueriesExecutedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "querier_queries_storage_type_total",
Help: "Number of PromQL queries that were executed against a particular storage type.",
}, []string{"storage"}),
}

// Ensure the reject metric is initialised (so that we export the value "0" before a limit is reached for the first time).