diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go
index 9233a3b792c..33ae5330892 100644
--- a/pkg/scheduler/queue/queue_test.go
+++ b/pkg/scheduler/queue/queue_test.go
@@ -11,6 +11,7 @@ import (
 	"fmt"
 	"math/rand"
 	"strconv"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -20,6 +21,7 @@ import (
 	"github.com/grafana/dskit/services"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
+	promtest "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/errgroup"
@@ -33,22 +35,35 @@ func TestMain(m *testing.M) {
 
 // cannot import constants from frontend/v2 due to import cycle,
 // but content of the strings should not matter as much as the number of options
+const ingesterQueueDimension = "ingester"
+const storeGatewayQueueDimension = "store-gateway"
+const ingesterAndStoreGatewayQueueDimension = "ingester-and-store-gateway"
+
 var secondQueueDimensionOptions = []string{
-	"ingester",
-	"store-gateway",
-	"ingester-and-store-gateway",
+	ingesterQueueDimension,
+	storeGatewayQueueDimension,
+	ingesterAndStoreGatewayQueueDimension,
 }
 
-func randAdditionalQueueDimension() []string {
-	idx := rand.Intn(len(secondQueueDimensionOptions) + 1)
+func randAdditionalQueueDimension(allowEmpty bool) []string {
+	maxIdx := len(secondQueueDimensionOptions)
+	if allowEmpty {
+		maxIdx++
+	}
+
+	idx := rand.Intn(maxIdx)
 	if idx == len(secondQueueDimensionOptions) {
-		// randomly don't add a second queue dimension at all to ensure the items still get dequeued
 		return nil
 	}
 	return secondQueueDimensionOptions[idx : idx+1]
 }
 
-func makeSchedulerRequest(tenantID string) *SchedulerRequest {
+// makeSchedulerRequest is intended to create a query request with a nontrivial size.
+//
+// When running benchmarks for memory usage, we want a relatively representative request size.
+// The size of the requests in a queue of nontrivial depth should significantly outweigh the memory
+// used by the queue mechanics, in order to get a more meaningful % delta between competing queue implementations.
+func makeSchedulerRequest(tenantID string, additionalQueueDimensions []string) *SchedulerRequest {
 	return &SchedulerRequest{
 		Ctx:             context.Background(),
 		FrontendAddress: "http://query-frontend:8007",
@@ -63,10 +78,147 @@ func makeSchedulerRequest(tenantID string) *SchedulerRequest {
 			},
 			Url: "/prometheus/api/v1/query_range?end=1701720000&query=rate%28go_goroutines%7Bcluster%3D%22docker-compose-local%22%2Cjob%3D%22mimir-microservices-mode%2Fquery-scheduler%22%2Cnamespace%3D%22mimir-microservices-mode%22%7D%5B10m15s%5D%29&start=1701648000&step=60",
 		},
-		AdditionalQueueDimensions: randAdditionalQueueDimension(),
+		AdditionalQueueDimensions: additionalQueueDimensions,
+		EnqueueTime:               time.Now(),
 	}
 }
 
+// TestMultiDimensionalQueueFairnessSlowConsumerEffects emulates a simplified queue slowdown scenario
+// which the scheduler's additional queue dimensions feature is intended to solve for.
+//
+// In this scenario, one category of queue item causes the queue consumer to slow down, introducing a
+// significant delay while the queue consumer processes it and before the consumer can dequeue the next item.
+// This emulates a situation where one of the query components - the ingesters or store-gateways - is under load.
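+// For example, store-gateways under heavy load slow down every query which must read data from them,
+// while queries which only need to read from the ingesters could still be served quickly.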
+//
+// If queue items belonging to the slow category are in the same queue in front of the normal queue items,
+// the normal queue items must wait for all slow queue items to be cleared before they can be serviced.
+// In this way, the degraded performance of the slow query component equally degrades the performance of the
+// queries which *could* be serviced quickly, but are waiting behind the slow queries in the queue.
+//
+// With the additional queue dimensions enabled, the queues are split by which query component the query will utilize.
+// The queue broker then round-robins between the split queues, which has the effect of alternating between
+// dequeuing the slow queries and normal queries rather than blocking normal queries behind slow queries.
+func TestMultiDimensionalQueueFairnessSlowConsumerEffects(t *testing.T) {
+	promRegistry := prometheus.NewPedanticRegistry()
+
+	maxQueriersPerTenant := 0 // disable shuffle sharding
+	forgetQuerierDelay := time.Duration(0)
+	maxOutstandingRequestsPerTenant := 1000
+
+	totalRequests := 100
+	numTenants := 1
+	numProducers := 10
+	numConsumers := 1
+
+	normalQueueDimension := "normal-request"
+	slowConsumerLatency := 20 * time.Millisecond
+	slowConsumerQueueDimension := "slow-request"
+	normalQueueDimensionFunc := func() []string { return []string{normalQueueDimension} }
+	slowQueueDimensionFunc := func() []string { return []string{slowConsumerQueueDimension} }
+
+	additionalQueueDimensionsEnabledCases := []bool{false, true}
+	queueDurationTotals := map[bool]map[string]float64{
+		false: {normalQueueDimension: 0.0, slowConsumerQueueDimension: 0.0},
+		true:  {normalQueueDimension: 0.0, slowConsumerQueueDimension: 0.0},
+	}
+
+	for _, additionalQueueDimensionsEnabled := range additionalQueueDimensionsEnabledCases {
+
+		// Scheduler code uses a histogram for queue duration, but a counter is a more direct metric
+		// for this test, as we are concerned with the total or average wait time for all queue items.
+		// Prometheus histograms also lack support for test assertions via prometheus/testutil.
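+		// The counter is labeled by the additional queue dimension, so the totals for the normal
+		// and slow request categories can be read out separately via promtest.ToFloat64 below.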
+		queueDuration := promauto.With(promRegistry).NewCounterVec(prometheus.CounterOpts{
+			Name: "test_query_scheduler_queue_duration_total_seconds",
+			Help: "[test] total time spent by items in queue before getting picked up by a consumer",
+		}, []string{"additional_queue_dimensions"})
+
+		queue := NewRequestQueue(
+			log.NewNopLogger(),
+			maxOutstandingRequestsPerTenant,
+			additionalQueueDimensionsEnabled,
+			forgetQuerierDelay,
+			promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"tenant"}),
+			promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"tenant"}),
+			promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}),
+		)
+
+		ctx := context.Background()
+		require.NoError(t, queue.starting(ctx))
+		t.Cleanup(func() {
+			require.NoError(t, queue.stop(nil))
+		})
+
+		// fill queue first with the slow queries, then the normal queries
+		for _, queueDimensionFunc := range []func() []string{slowQueueDimensionFunc, normalQueueDimensionFunc} {
+			startProducersChan := make(chan struct{})
+			producersErrGroup, _ := errgroup.WithContext(ctx)
+
+			runProducer := runQueueProducerIters(
+				queue, maxQueriersPerTenant, totalRequests/2, numProducers, numTenants, startProducersChan, queueDimensionFunc,
+			)
+			for producerIdx := 0; producerIdx < numProducers; producerIdx++ {
+				producerIdx := producerIdx
+				producersErrGroup.Go(func() error {
+					return runProducer(producerIdx)
+				})
+			}
+			close(startProducersChan)
+			err := producersErrGroup.Wait()
+			require.NoError(t, err)
+		}
+
+		// emulate delay when consuming the slow queries
+		consumeFunc := func(request Request) error {
+			schedulerRequest := request.(*SchedulerRequest)
+			if schedulerRequest.AdditionalQueueDimensions[0] == slowConsumerQueueDimension {
+				time.Sleep(slowConsumerLatency)
+			}
+
+			queueTime := time.Since(schedulerRequest.EnqueueTime)
+			additionalQueueDimensionLabels := strings.Join(schedulerRequest.AdditionalQueueDimensions, ":")
+			queueDuration.With(prometheus.Labels{"additional_queue_dimensions": additionalQueueDimensionLabels}).Add(queueTime.Seconds())
+			return nil
+		}
+
+		// consume queries
+		queueConsumerErrGroup, ctx := errgroup.WithContext(ctx)
+		startConsumersChan := make(chan struct{})
+		runConsumer := runQueueConsumerIters(ctx, queue, totalRequests, numConsumers, startConsumersChan, consumeFunc)
+
+		for consumerIdx := 0; consumerIdx < numConsumers; consumerIdx++ {
+			consumerIdx := consumerIdx
+			queueConsumerErrGroup.Go(func() error {
+				return runConsumer(consumerIdx)
+			})
+		}
+
+		close(startConsumersChan)
+		err := queueConsumerErrGroup.Wait()
+		require.NoError(t, err)
+
+		// record total queue duration by queue dimensions and whether the queue splitting was enabled
+		for _, queueDimension := range []string{normalQueueDimension, slowConsumerQueueDimension} {
+			queueDurationTotals[additionalQueueDimensionsEnabled][queueDimension] = promtest.ToFloat64(
+				queueDuration.With(prometheus.Labels{"additional_queue_dimensions": queueDimension}),
+			)
+		}
+
+		promRegistry.Unregister(queueDuration)
+	}
+
+	// total or average time in queue for a normal queue item should be roughly cut in half
+	// when queue splitting is enabled, as the average normal queue item waits behind
+	// half of the slow queue items, instead of waiting behind all the slow queue items.
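+	//
+	// Rough arithmetic from the constants above: the single consumer takes ~20ms per slow item, and the
+	// 50 slow items are enqueued ahead of the 50 normal items. Without splitting, every normal item waits
+	// for all 50 slow items (~1s); with round-robin splitting, the i-th normal item waits behind only
+	// ~i slow items, which roughly halves the average normal-item queue duration.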
+	expected := queueDurationTotals[false][normalQueueDimension] / 2
+	actual := queueDurationTotals[true][normalQueueDimension]
+	// Some variance is allowed due to the actual processing time needed beyond the slow consumer delay;
+	// the variance is also a function of the number of consumers and the consumer delay chosen.
+	// The variance could be tightened by running the test longer, but that trades off against test and CI speed.
+	delta := expected * 0.10
+	require.InDelta(t, expected, actual, delta)
+}
+
 func BenchmarkConcurrentQueueOperations(b *testing.B) {
 	maxQueriersPerTenant := 0 // disable shuffle sharding
 	forgetQuerierDelay := time.Duration(0)
@@ -101,7 +253,9 @@ func BenchmarkConcurrentQueueOperations(b *testing.B) {
 		require.NoError(b, queue.stop(nil))
 	})
 
-	runProducer := runQueueProducerForBenchmark(b, queue, maxQueriersPerTenant, numProducers, numTenants, startSignalChan)
+	runProducer := runQueueProducerIters(
+		queue, maxQueriersPerTenant, b.N, numProducers, numTenants, startSignalChan, nil,
+	)
 
 	for producerIdx := 0; producerIdx < numProducers; producerIdx++ {
 		producerIdx := producerIdx
@@ -110,7 +264,7 @@ func BenchmarkConcurrentQueueOperations(b *testing.B) {
 		})
 	}
 
-	runConsumer := runQueueConsumerForBenchmark(ctx, b, queue, numConsumers, startSignalChan)
+	runConsumer := runQueueConsumerIters(ctx, queue, b.N, numConsumers, startSignalChan, nil)
 
 	for consumerIdx := 0; consumerIdx < numConsumers; consumerIdx++ {
 		consumerIdx := consumerIdx
@@ -133,9 +287,9 @@ func BenchmarkConcurrentQueueOperations(b *testing.B) {
 	}
 }
 
-func queueActorIterationCount(benchmarkIters int, numActors int, actorIdx int) int {
-	actorIters := benchmarkIters / numActors
-	remainderIters := benchmarkIters % numActors
+func queueActorIterationCount(totalIters int, numActors int, actorIdx int) int {
+	actorIters := totalIters / numActors
+	remainderIters := totalIters % numActors
 
 	if remainderIters == 0 {
 		// iterations are spread equally across actors without a remainder
@@ -152,31 +306,25 @@ func queueActorIterationCount(benchmarkIters int, numActors int, actorIdx int) i
 	return actorIters
 }
 
-func runQueueProducerForBenchmark(b *testing.B, queue *RequestQueue, maxQueriersPerTenant int, numProducers int, numTenants int, start chan struct{}) func(producerIdx int) error {
+func runQueueProducerIters(
+	queue *RequestQueue,
+	maxQueriersPerTenant int,
+	totalIters int,
+	numProducers int,
+	numTenants int,
+	start chan struct{},
+	additionalQueueDimensionFunc func() []string,
+) func(producerIdx int) error {
 	return func(producerIdx int) error {
-		producerIters := queueActorIterationCount(b.N, numProducers, producerIdx)
+		producerIters := queueActorIterationCount(totalIters, numProducers, producerIdx)
 		tenantID := producerIdx % numTenants
 		tenantIDStr := strconv.Itoa(tenantID)
 		<-start
 
 		for i := 0; i < producerIters; i++ {
-			for {
-				// when running this benchmark for memory usage comparison,
-				// we want to have a relatively representative size of request
-				// in order not to skew the % delta between queue implementations.
-				// Unless the request starts to get copied, the size of the requests in the queue
-				// should significantly outweigh the memory used to implement the queue mechanics.
-				req := makeSchedulerRequest(tenantIDStr)
-				//req.AdditionalQueueDimensions = randAdditionalQueueDimension()
-				err := queue.EnqueueRequestToDispatcher(tenantIDStr, req, maxQueriersPerTenant, func() {})
-				if err == nil {
-					break
-				}
-
-				// Keep retrying if we've hit the max queue length, otherwise give up immediately.
-				if !errors.Is(err, ErrTooManyRequests) {
-					return err
-				}
+			err := queueProduce(queue, maxQueriersPerTenant, tenantIDStr, additionalQueueDimensionFunc)
+			if err != nil {
+				return err
 			}
 
 			tenantID = (tenantID + 1) % numTenants
@@ -186,9 +334,37 @@
 	}
 }
 
-func runQueueConsumerForBenchmark(ctx context.Context, b *testing.B, queue *RequestQueue, numConsumers int, start chan struct{}) func(consumerIdx int) error {
+func queueProduce(
+	queue *RequestQueue, maxQueriersPerTenant int, tenantID string, additionalQueueDimensionFunc func() []string,
+) error {
+	var additionalQueueDimensions []string
+	if additionalQueueDimensionFunc != nil {
+		additionalQueueDimensions = additionalQueueDimensionFunc()
+	}
+	req := makeSchedulerRequest(tenantID, additionalQueueDimensions)
+	for {
+		err := queue.EnqueueRequestToDispatcher(tenantID, req, maxQueriersPerTenant, func() {})
+		if err == nil {
+			break
+		}
+		// Keep retrying if we've hit the max queue length, otherwise give up immediately.
+		if !errors.Is(err, ErrTooManyRequests) {
+			return err
+		}
+	}
+	return nil
+}
+
+func runQueueConsumerIters(
+	ctx context.Context,
+	queue *RequestQueue,
+	totalIters int,
+	numConsumers int,
+	start chan struct{},
+	consumeFunc consumeRequest,
+) func(consumerIdx int) error {
 	return func(consumerIdx int) error {
-		consumerIters := queueActorIterationCount(b.N, numConsumers, consumerIdx)
+		consumerIters := queueActorIterationCount(totalIters, numConsumers, consumerIdx)
 		lastTenantIndex := FirstUser()
 		querierID := fmt.Sprintf("consumer-%v", consumerIdx)
 		queue.RegisterQuerierConnection(querierID)
@@ -197,7 +373,7 @@ func runQueueConsumerForBenchmark(ctx context.Context, b *testing.B, queue *Requ
 		<-start
 
 		for i := 0; i < consumerIters; i++ {
-			_, idx, err := queue.GetNextRequestForQuerier(ctx, lastTenantIndex, querierID)
+			idx, err := queueConsume(ctx, queue, querierID, lastTenantIndex, consumeFunc)
 			if err != nil {
 				return err
 			}
@@ -209,6 +385,23 @@
 	}
 }
 
+type consumeRequest func(request Request) error
+
+func queueConsume(
+	ctx context.Context, queue *RequestQueue, querierID string, lastTenantIndex UserIndex, consumeFunc consumeRequest,
+) (UserIndex, error) {
+	request, idx, err := queue.GetNextRequestForQuerier(ctx, lastTenantIndex, querierID)
+	if err != nil {
+		return lastTenantIndex, err
+	}
+	lastTenantIndex = idx
+
+	if consumeFunc != nil {
+		err = consumeFunc(request)
+	}
+	return lastTenantIndex, err
+}
+
 func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBecauseQuerierHasBeenForgotten(t *testing.T) {
 	const forgetDelay = 3 * time.Second
 
@@ -249,7 +442,7 @@
 	req := &SchedulerRequest{
 		Ctx:                       context.Background(),
 		Request:                   &httpgrpc.HTTPRequest{Method: "GET", Url: "/hello"},
-		AdditionalQueueDimensions: randAdditionalQueueDimension(),
+		AdditionalQueueDimensions: randAdditionalQueueDimension(true),
 	}
 	require.NoError(t, queue.EnqueueRequestToDispatcher("user-1", req, 1, nil))
 
@@ -350,7 +543,7 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend
 	req := &SchedulerRequest{
 		Ctx:                       context.Background(),
 		Request:                   &httpgrpc.HTTPRequest{Method: "GET", Url: "/hello"},
-		AdditionalQueueDimensions: randAdditionalQueueDimension(),
+		AdditionalQueueDimensions: randAdditionalQueueDimension(true),
 	}
 	tr := tenantRequest{
TenantID("tenant-1"), diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index a61f9b1311d..ba1159fdd57 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -135,14 +135,6 @@ func TestSchedulerEnqueueWithCancel(t *testing.T) { verifyNoPendingRequestsLeft(t, scheduler) } -func initQuerierLoop(t *testing.T, querierClient schedulerpb.SchedulerForQuerierClient, querier string) schedulerpb.SchedulerForQuerier_QuerierLoopClient { - querierLoop, err := querierClient.QuerierLoop(context.Background()) - require.NoError(t, err) - require.NoError(t, querierLoop.Send(&schedulerpb.QuerierToScheduler{QuerierID: querier})) - - return querierLoop -} - func TestSchedulerEnqueueByMultipleFrontendsWithCancel(t *testing.T) { scheduler, frontendClient, querierClient := setupScheduler(t, nil) @@ -552,6 +544,13 @@ func initFrontendLoop(t *testing.T, client schedulerpb.SchedulerForFrontendClien return loop } +func initQuerierLoop(t *testing.T, querierClient schedulerpb.SchedulerForQuerierClient, querier string) schedulerpb.SchedulerForQuerier_QuerierLoopClient { + querierLoop, err := querierClient.QuerierLoop(context.Background()) + require.NoError(t, err) + require.NoError(t, querierLoop.Send(&schedulerpb.QuerierToScheduler{QuerierID: querier})) + + return querierLoop +} func frontendToScheduler(t *testing.T, frontendLoop schedulerpb.SchedulerForFrontend_FrontendLoopClient, req *schedulerpb.FrontendToScheduler) { require.NoError(t, frontendLoop.Send(req)) msg, err := frontendLoop.Recv()