Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove cortex_query_frontend_workers_enqueued_requests_total metric #6121

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* [CHANGE] Store-gateway: lazy-loading concurrency limit default value is now 4. #6004
* [CHANGE] General: enabled `-log.buffered` by default. The `-log.buffered` has been deprecated and will be removed in Mimir 2.13. #6131
* [CHANGE] Ingester: changed default `-blocks-storage.tsdb.series-hash-cache-max-size-bytes` setting from `1GB` to `350MB`. The new default cache size is enough to store the hashes for all series in an ingester, assuming up to 2M in-memory series per ingester and using the default 13h retention period for local TSDB blocks in the ingesters. #6129
* [CHANGE] Query-frontend: removed `cortex_query_frontend_workers_enqueued_requests_total`. Use `cortex_query_frontend_enqueue_duration_seconds_count` instead. #6121
* [FEATURE] Query-frontend: add experimental support for query blocking. Queries are blocked on a per-tenant basis, configured via the limit `blocked_queries`. #5609
* [ENHANCEMENT] Ingester: exported summary `cortex_ingester_inflight_push_requests_summary` tracking total number of inflight requests in percentile buckets. #5845
* [ENHANCEMENT] Query-scheduler: add `cortex_query_scheduler_enqueue_duration_seconds` metric that records the time taken to enqueue or reject a query request. #5879
Expand Down
34 changes: 11 additions & 23 deletions pkg/frontend/v2/frontend_scheduler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ type frontendSchedulerWorkers struct {
// Set to nil when stop is called... no more workers are created afterwards.
workers map[string]*frontendSchedulerWorker

enqueuedRequests *prometheus.CounterVec
enqueueDuration *prometheus.HistogramVec
enqueueDuration *prometheus.HistogramVec
}

func newFrontendSchedulerWorkers(cfg Config, frontendAddress string, requestsCh <-chan *frontendRequest, log log.Logger, reg prometheus.Registerer) (*frontendSchedulerWorkers, error) {
Expand All @@ -64,10 +63,6 @@ func newFrontendSchedulerWorkers(cfg Config, frontendAddress string, requestsCh
requestsCh: requestsCh,
workers: map[string]*frontendSchedulerWorker{},
schedulerDiscoveryWatcher: services.NewFailureWatcher(),
enqueuedRequests: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_frontend_workers_enqueued_requests_total",
Help: "Total number of requests enqueued by each query frontend worker (regardless of the result), labeled by scheduler address.",
}, []string{schedulerAddressLabel}),
enqueueDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_query_frontend_enqueue_duration_seconds",
Help: "Time spent by requests waiting to join the queue or be rejected.",
Expand Down Expand Up @@ -143,7 +138,7 @@ func (f *frontendSchedulerWorkers) addScheduler(address string) {
}

// No worker for this address yet, start a new one.
w = newFrontendSchedulerWorker(conn, address, f.frontendAddress, f.requestsCh, f.cfg.WorkerConcurrency, f.enqueuedRequests.WithLabelValues(address), f.enqueueDuration.WithLabelValues(address), f.log)
w = newFrontendSchedulerWorker(conn, address, f.frontendAddress, f.requestsCh, f.cfg.WorkerConcurrency, f.enqueueDuration.WithLabelValues(address), f.log)

f.mu.Lock()
defer f.mu.Unlock()
Expand Down Expand Up @@ -177,7 +172,6 @@ func (f *frontendSchedulerWorkers) removeScheduler(address string) {
level.Info(f.log).Log("msg", "removing connection to query-scheduler", "addr", address)
w.stop()
}
f.enqueuedRequests.Delete(prometheus.Labels{schedulerAddressLabel: address})
f.enqueueDuration.Delete(prometheus.Labels{schedulerAddressLabel: address})
}

Expand Down Expand Up @@ -236,24 +230,20 @@ type frontendSchedulerWorker struct {
// query has been enqueued to scheduler.
cancelCh chan uint64

// Number of queries sent to this scheduler.
enqueuedRequests prometheus.Counter

// How long it takes to enqueue a query.
enqueueDuration prometheus.Observer
}

func newFrontendSchedulerWorker(conn *grpc.ClientConn, schedulerAddr string, frontendAddr string, requestsCh <-chan *frontendRequest, concurrency int, enqueuedRequests prometheus.Counter, enqueueDuration prometheus.Observer, log log.Logger) *frontendSchedulerWorker {
func newFrontendSchedulerWorker(conn *grpc.ClientConn, schedulerAddr string, frontendAddr string, requestsCh <-chan *frontendRequest, concurrency int, enqueueDuration prometheus.Observer, log log.Logger) *frontendSchedulerWorker {
w := &frontendSchedulerWorker{
log: log,
conn: conn,
concurrency: concurrency,
schedulerAddr: schedulerAddr,
frontendAddr: frontendAddr,
requestsCh: requestsCh,
cancelCh: make(chan uint64, schedulerWorkerCancelChanCapacity),
enqueuedRequests: enqueuedRequests,
enqueueDuration: enqueueDuration,
log: log,
conn: conn,
concurrency: concurrency,
schedulerAddr: schedulerAddr,
frontendAddr: frontendAddr,
requestsCh: requestsCh,
cancelCh: make(chan uint64, schedulerWorkerCancelChanCapacity),
enqueueDuration: enqueueDuration,
}
w.ctx, w.cancel = context.WithCancel(context.Background())

Expand Down Expand Up @@ -391,8 +381,6 @@ func (w *frontendSchedulerWorker) enqueueRequest(loop schedulerpb.SchedulerForFr
FrontendAddress: w.frontendAddr,
StatsEnabled: req.statsEnabled,
})
w.enqueuedRequests.Inc()

if err != nil {
level.Warn(spanLogger).Log("msg", "received error while sending request to scheduler", "err", err)
req.enqueue <- enqueueResult{status: failed}
Expand Down
21 changes: 0 additions & 21 deletions pkg/frontend/v2/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package v2

import (
"context"
"fmt"
"net"
"net/http"
"os"
Expand All @@ -26,7 +25,6 @@ import (
"github.com/grafana/dskit/test"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -156,14 +154,6 @@ func TestFrontend_ShouldTrackPerRequestMetrics(t *testing.T) {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
})

// Assert on cortex_query_frontend_workers_enqueued_requests_total.
expectedMetrics := fmt.Sprintf(`
# HELP cortex_query_frontend_workers_enqueued_requests_total Total number of requests enqueued by each query frontend worker (regardless of the result), labeled by scheduler address.
# TYPE cortex_query_frontend_workers_enqueued_requests_total counter
cortex_query_frontend_workers_enqueued_requests_total{scheduler_address="%s"} 0
`, f.cfg.SchedulerAddress)
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_query_frontend_workers_enqueued_requests_total"))

// Assert on cortex_query_frontend_enqueue_duration_seconds.
metricsMap, err := metrics.NewMetricFamilyMapFromGatherer(reg)
require.NoError(t, err)
Expand All @@ -177,14 +167,6 @@ func TestFrontend_ShouldTrackPerRequestMetrics(t *testing.T) {
require.Equal(t, int32(200), resp.Code)
require.Equal(t, []byte(body), resp.Body)

// Assert on cortex_query_frontend_workers_enqueued_requests_total.
expectedMetrics = fmt.Sprintf(`
# HELP cortex_query_frontend_workers_enqueued_requests_total Total number of requests enqueued by each query frontend worker (regardless of the result), labeled by scheduler address.
# TYPE cortex_query_frontend_workers_enqueued_requests_total counter
cortex_query_frontend_workers_enqueued_requests_total{scheduler_address="%s"} 1
`, f.cfg.SchedulerAddress)
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_query_frontend_workers_enqueued_requests_total"))

// Assert on cortex_query_frontend_enqueue_duration_seconds.
metricsMap, err = metrics.NewMetricFamilyMapFromGatherer(reg)
require.NoError(t, err)
Expand All @@ -195,10 +177,7 @@ func TestFrontend_ShouldTrackPerRequestMetrics(t *testing.T) {

// Manually remove the address, check that label is removed.
f.schedulerWorkers.InstanceRemoved(servicediscovery.Instance{Address: f.cfg.SchedulerAddress, InUse: true})
expectedMetrics = ``
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_query_frontend_workers_enqueued_requests_total"))

// Assert on cortex_query_frontend_enqueue_duration_seconds (ensure the series is removed).
metricsMap, err = metrics.NewMetricFamilyMapFromGatherer(reg)
require.NoError(t, err)
assert.Empty(t, metricsMap["cortex_query_frontend_enqueue_duration_seconds"])
Expand Down
Loading