Track cortex_query_frontend_enqueue_duration_seconds by query-scheduler
Signed-off-by: Marco Pracucci <marco@pracucci.com>
pracucci committed Sep 26, 2023
1 parent e04a90f commit f9767a9
Showing 3 changed files with 12 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@
* [CHANGE] Store-gateway: lazy-loading concurrency limit default value is now 4. #6004
* [CHANGE] General: enabled `-log.buffered` by default. The `-log.buffered` option has been deprecated and will be removed in Mimir 2.13. #6131
* [CHANGE] Ingester: changed default `-blocks-storage.tsdb.series-hash-cache-max-size-bytes` setting from `1GB` to `350MB`. The new default cache size is enough to store the hashes for all series in an ingester, assuming up to 2M in-memory series per ingester and using the default 13h retention period for local TSDB blocks in the ingesters. #6129
* [CHANGE] Query-frontend: removed `cortex_query_frontend_workers_enqueued_requests_total`. Use `cortex_query_frontend_enqueue_duration_seconds_count` instead. #6121
* [FEATURE] Query-frontend: add experimental support for query blocking. Queries are blocked on a per-tenant basis, and blocking is configured via the limit `blocked_queries`. #5609
* [ENHANCEMENT] Ingester: exported summary `cortex_ingester_inflight_push_requests_summary` tracking total number of inflight requests in percentile buckets. #5845
* [ENHANCEMENT] Query-scheduler: add `cortex_query_scheduler_enqueue_duration_seconds` metric that records the time taken to enqueue or reject a query request. #5879
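As a migration note (not part of the commit itself): dashboards or alerts that computed, for example, `rate(cortex_query_frontend_workers_enqueued_requests_total[5m])` can switch to `rate(cortex_query_frontend_enqueue_duration_seconds_count[5m])`, since a histogram's `_count` series increments once per observed enqueue attempt; the 5m range is only illustrative.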
34 changes: 11 additions & 23 deletions pkg/frontend/v2/frontend_scheduler_worker.go
@@ -52,8 +52,7 @@ type frontendSchedulerWorkers struct {
// Set to nil when stop is called... no more workers are created afterwards.
workers map[string]*frontendSchedulerWorker

enqueuedRequests *prometheus.CounterVec
enqueueDuration *prometheus.HistogramVec
enqueueDuration *prometheus.HistogramVec
}

func newFrontendSchedulerWorkers(cfg Config, frontendAddress string, requestsCh <-chan *frontendRequest, log log.Logger, reg prometheus.Registerer) (*frontendSchedulerWorkers, error) {
@@ -64,10 +63,6 @@ func newFrontendSchedulerWorkers(cfg Config, frontendAddress string, requestsCh
requestsCh: requestsCh,
workers: map[string]*frontendSchedulerWorker{},
schedulerDiscoveryWatcher: services.NewFailureWatcher(),
enqueuedRequests: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_frontend_workers_enqueued_requests_total",
Help: "Total number of requests enqueued by each query frontend worker (regardless of the result), labeled by scheduler address.",
}, []string{schedulerAddressLabel}),
enqueueDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_query_frontend_enqueue_duration_seconds",
Help: "Time spent by requests waiting to join the queue or be rejected.",
@@ -143,7 +138,7 @@ func (f *frontendSchedulerWorkers) addScheduler(address string) {
}

// No worker for this address yet, start a new one.
w = newFrontendSchedulerWorker(conn, address, f.frontendAddress, f.requestsCh, f.cfg.WorkerConcurrency, f.enqueuedRequests.WithLabelValues(address), f.enqueueDuration.WithLabelValues(address), f.log)
w = newFrontendSchedulerWorker(conn, address, f.frontendAddress, f.requestsCh, f.cfg.WorkerConcurrency, f.enqueueDuration.WithLabelValues(address), f.log)

f.mu.Lock()
defer f.mu.Unlock()
@@ -177,7 +172,6 @@ func (f *frontendSchedulerWorkers) removeScheduler(address string) {
level.Info(f.log).Log("msg", "removing connection to query-scheduler", "addr", address)
w.stop()
}
f.enqueuedRequests.Delete(prometheus.Labels{schedulerAddressLabel: address})
f.enqueueDuration.Delete(prometheus.Labels{schedulerAddressLabel: address})
}

@@ -236,24 +230,20 @@ type frontendSchedulerWorker struct {
// query has been enqueued to scheduler.
cancelCh chan uint64

// Number of queries sent to this scheduler.
enqueuedRequests prometheus.Counter

// How long it takes to enqueue a query.
enqueueDuration prometheus.Observer
}

func newFrontendSchedulerWorker(conn *grpc.ClientConn, schedulerAddr string, frontendAddr string, requestsCh <-chan *frontendRequest, concurrency int, enqueuedRequests prometheus.Counter, enqueueDuration prometheus.Observer, log log.Logger) *frontendSchedulerWorker {
func newFrontendSchedulerWorker(conn *grpc.ClientConn, schedulerAddr string, frontendAddr string, requestsCh <-chan *frontendRequest, concurrency int, enqueueDuration prometheus.Observer, log log.Logger) *frontendSchedulerWorker {
w := &frontendSchedulerWorker{
log: log,
conn: conn,
concurrency: concurrency,
schedulerAddr: schedulerAddr,
frontendAddr: frontendAddr,
requestsCh: requestsCh,
cancelCh: make(chan uint64, schedulerWorkerCancelChanCapacity),
enqueuedRequests: enqueuedRequests,
enqueueDuration: enqueueDuration,
log: log,
conn: conn,
concurrency: concurrency,
schedulerAddr: schedulerAddr,
frontendAddr: frontendAddr,
requestsCh: requestsCh,
cancelCh: make(chan uint64, schedulerWorkerCancelChanCapacity),
enqueueDuration: enqueueDuration,
}
w.ctx, w.cancel = context.WithCancel(context.Background())

@@ -391,8 +381,6 @@ func (w *frontendSchedulerWorker) enqueueRequest(loop schedulerpb.SchedulerForFr
FrontendAddress: w.frontendAddr,
StatsEnabled: req.statsEnabled,
})
w.enqueuedRequests.Inc()

if err != nil {
level.Warn(spanLogger).Log("msg", "received error while sending request to scheduler", "err", err)
req.enqueue <- enqueueResult{status: failed}
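The hunks above drop the per-scheduler counter and keep only the enqueue-duration histogram. Below is a minimal sketch of that pattern — a `HistogramVec` labelled by scheduler address, a pre-bound `Observer` handed to each worker, and `Delete` called once the scheduler goes away. The `trackedWorker` type, the timing placement, and the example address are illustrative, not the actual Mimir code:

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// trackedWorker is an illustrative stand-in for frontendSchedulerWorker: it only
// receives the Observer pre-bound to its scheduler's address, not the whole vector.
type trackedWorker struct {
	enqueueDuration prometheus.Observer
}

func (w *trackedWorker) enqueue(send func() error) error {
	start := time.Now()
	err := send()
	// Observe the attempt regardless of the result, matching the metric's help text.
	w.enqueueDuration.Observe(time.Since(start).Seconds())
	return err
}

func main() {
	reg := prometheus.NewRegistry()
	enqueueDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
		Name: "cortex_query_frontend_enqueue_duration_seconds",
		Help: "Time spent by requests waiting to join the queue or be rejected.",
	}, []string{"scheduler_address"})

	addr := "query-scheduler-0:9095"
	w := &trackedWorker{enqueueDuration: enqueueDuration.WithLabelValues(addr)}
	_ = w.enqueue(func() error { return nil })

	// When service discovery reports the scheduler as gone, drop its series so the
	// frontend stops exporting per-scheduler data for an address that no longer exists.
	enqueueDuration.Delete(prometheus.Labels{"scheduler_address": addr})
}
```

Pre-binding the Observer keeps the worker unaware of the label scheme, which is why the signature change above only has to remove the counter argument from `newFrontendSchedulerWorker`.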
21 changes: 0 additions & 21 deletions pkg/frontend/v2/frontend_test.go
@@ -7,7 +7,6 @@ package v2

import (
"context"
"fmt"
"net"
"net/http"
"os"
@@ -26,7 +25,6 @@ import (
"github.com/grafana/dskit/test"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -156,14 +154,6 @@ func TestFrontend_ShouldTrackPerRequestMetrics(t *testing.T) {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
})

// Assert on cortex_query_frontend_workers_enqueued_requests_total.
expectedMetrics := fmt.Sprintf(`
# HELP cortex_query_frontend_workers_enqueued_requests_total Total number of requests enqueued by each query frontend worker (regardless of the result), labeled by scheduler address.
# TYPE cortex_query_frontend_workers_enqueued_requests_total counter
cortex_query_frontend_workers_enqueued_requests_total{scheduler_address="%s"} 0
`, f.cfg.SchedulerAddress)
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_query_frontend_workers_enqueued_requests_total"))

// Assert on cortex_query_frontend_enqueue_duration_seconds.
metricsMap, err := metrics.NewMetricFamilyMapFromGatherer(reg)
require.NoError(t, err)
@@ -177,14 +167,6 @@ func TestFrontend_ShouldTrackPerRequestMetrics(t *testing.T) {
require.Equal(t, int32(200), resp.Code)
require.Equal(t, []byte(body), resp.Body)

// Assert on cortex_query_frontend_workers_enqueued_requests_total.
expectedMetrics = fmt.Sprintf(`
# HELP cortex_query_frontend_workers_enqueued_requests_total Total number of requests enqueued by each query frontend worker (regardless of the result), labeled by scheduler address.
# TYPE cortex_query_frontend_workers_enqueued_requests_total counter
cortex_query_frontend_workers_enqueued_requests_total{scheduler_address="%s"} 1
`, f.cfg.SchedulerAddress)
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_query_frontend_workers_enqueued_requests_total"))

// Assert on cortex_query_frontend_enqueue_duration_seconds.
metricsMap, err = metrics.NewMetricFamilyMapFromGatherer(reg)
require.NoError(t, err)
@@ -195,10 +177,7 @@ func TestFrontend_ShouldTrackPerRequestMetrics(t *testing.T) {

// Manually remove the address, check that label is removed.
f.schedulerWorkers.InstanceRemoved(servicediscovery.Instance{Address: f.cfg.SchedulerAddress, InUse: true})
expectedMetrics = ``
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_query_frontend_workers_enqueued_requests_total"))

// Assert on cortex_query_frontend_enqueue_duration_seconds (ensure the series is removed).
metricsMap, err = metrics.NewMetricFamilyMapFromGatherer(reg)
require.NoError(t, err)
assert.Empty(t, metricsMap["cortex_query_frontend_enqueue_duration_seconds"])
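The assertions kept in this test read the histogram back through a metric-family map. Below is a small, hypothetical helper (not part of the Mimir test suite) showing the same idea with only the client_golang gatherer API: it sums the histogram's sample count — the value exported as `cortex_query_frontend_enqueue_duration_seconds_count` — and reports whether the family is still present after the per-scheduler series has been deleted.

```go
package frontendtest

import "github.com/prometheus/client_golang/prometheus"

// histogramSampleCount sums the sample count of the named histogram across all
// label values. The boolean is false when the metric family is not exported at
// all, e.g. after the last scheduler address was removed and its series deleted.
func histogramSampleCount(g prometheus.Gatherer, name string) (uint64, bool, error) {
	families, err := g.Gather()
	if err != nil {
		return 0, false, err
	}
	for _, mf := range families {
		if mf.GetName() != name {
			continue
		}
		var total uint64
		for _, m := range mf.GetMetric() {
			total += m.GetHistogram().GetSampleCount()
		}
		return total, true, nil
	}
	return 0, false, nil
}
```

A test using such a helper would expect the count to grow after a successful round trip and the boolean to turn false once `InstanceRemoved` drops the scheduler address.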
