Skip to content

Commit

Permalink
query scheduler test for multidimensional queueing effectiveness scenarios (#7162)
Browse files Browse the repository at this point in the history
  • Loading branch information
francoposa committed Jan 24, 2024
1 parent a7ce333 commit 295b208
Show file tree
Hide file tree
Showing 2 changed files with 237 additions and 45 deletions.
267 changes: 230 additions & 37 deletions pkg/scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"testing"
"time"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
Expand All @@ -33,22 +35,35 @@ func TestMain(m *testing.M) {

// cannot import constants from frontend/v2 due to import cycle,
// but content of the strings should not matter as much as the number of options
const ingesterQueueDimension = "ingester"
const storeGatewayQueueDimension = "store-gateway"
const ingesterAndStoreGatewayQueueDimension = "ingester-and-store-gateway"

// secondQueueDimensionOptions enumerates the known additional (second) queue
// dimensions a scheduler request can be assigned to.
var secondQueueDimensionOptions = []string{
	ingesterQueueDimension,
	storeGatewayQueueDimension,
	ingesterAndStoreGatewayQueueDimension,
}

// randAdditionalQueueDimension returns a single-element slice drawn uniformly at
// random from secondQueueDimensionOptions.
//
// When allowEmpty is true, the empty case is added as one extra equally-likely
// outcome and nil is returned for it, so callers also exercise requests with no
// second queue dimension at all.
func randAdditionalQueueDimension(allowEmpty bool) []string {
	maxIdx := len(secondQueueDimensionOptions)
	if allowEmpty {
		maxIdx++
	}

	idx := rand.Intn(maxIdx)
	if idx == len(secondQueueDimensionOptions) {
		// randomly don't add a second queue dimension at all to ensure the items still get dequeued
		return nil
	}
	return secondQueueDimensionOptions[idx : idx+1]
}

func makeSchedulerRequest(tenantID string) *SchedulerRequest {
// makeSchedulerRequest is intended to create a query request with a nontrivial size.
//
// When running benchmarks for memory usage, we want a relatively representative request size.
// The size of the requests in a queue of nontrivial depth should significantly outweigh the memory
// used by the queue mechanics, in order get a more meaningful % delta between competing queue implementations.
func makeSchedulerRequest(tenantID string, additionalQueueDimensions []string) *SchedulerRequest {
return &SchedulerRequest{
Ctx: context.Background(),
FrontendAddress: "http://query-frontend:8007",
Expand All @@ -63,10 +78,147 @@ func makeSchedulerRequest(tenantID string) *SchedulerRequest {
},
Url: "/prometheus/api/v1/query_range?end=1701720000&query=rate%28go_goroutines%7Bcluster%3D%22docker-compose-local%22%2Cjob%3D%22mimir-microservices-mode%2Fquery-scheduler%22%2Cnamespace%3D%22mimir-microservices-mode%22%7D%5B10m15s%5D%29&start=1701648000&step=60",
},
AdditionalQueueDimensions: randAdditionalQueueDimension(),
AdditionalQueueDimensions: additionalQueueDimensions,
EnqueueTime: time.Now(),
}
}

// TestMultiDimensionalQueueFairnessSlowConsumerEffects emulates a simplified queue slowdown scenario
// which the scheduler's additional queue dimensions features are intended to solve for.
//
// In this scenario, one category of queue item causes the queue consumer to slow down, introducing a
// significant delay while the queue consumer processes it and before the consumer can dequeue the next item.
// This emulates a situation where one of the query components - the ingesters or store-gateways - is under load.
//
// If queue items belonging to the slow category are in the same queue in front of the normal queue items,
// the normal queue items must wait for all slow queue items to be cleared before they can be serviced.
// In this way, the degraded performance of the slow query component equally degrades the performance of the
// queries which *could* be serviced quickly, but are waiting behind the slow queries in the queue.
//
// With the additional queue dimensions enabled, the queues are split by which query component the query will utilize.
// The queue broker then round-robins between the split queues, which has the effect of alternating between
// dequeuing the slow queries and normal queries rather than blocking normal queries behind slow queries.
func TestMultiDimensionalQueueFairnessSlowConsumerEffects(t *testing.T) {
	promRegistry := prometheus.NewPedanticRegistry()

	maxQueriersPerTenant := 0 // disable shuffle sharding
	forgetQuerierDelay := time.Duration(0)
	maxOutstandingRequestsPerTenant := 1000

	totalRequests := 100
	numTenants := 1
	numProducers := 10
	numConsumers := 1

	normalQueueDimension := "normal-request"
	slowConsumerLatency := 20 * time.Millisecond
	slowConsumerQueueDimension := "slow-request"
	normalQueueDimensionFunc := func() []string { return []string{normalQueueDimension} }
	slowQueueDimensionFunc := func() []string { return []string{slowConsumerQueueDimension} }

	// run the same scenario once without queue splitting (false) and once with it (true),
	// recording the total queue wait time per queue dimension for each run
	additionalQueueDimensionsEnabledCases := []bool{false, true}
	queueDurationTotals := map[bool]map[string]float64{
		false: {normalQueueDimension: 0.0, slowConsumerQueueDimension: 0.0},
		true:  {normalQueueDimension: 0.0, slowConsumerQueueDimension: 0.0},
	}

	for _, additionalQueueDimensionsEnabled := range additionalQueueDimensionsEnabledCases {

		// Scheduler code uses a histogram for queue duration, but a counter is a more direct metric
		// for this test, as we are concerned with the total or average wait time for all queue items.
		// Prometheus histograms also lack support for test assertions via prometheus/testutil.
		queueDuration := promauto.With(promRegistry).NewCounterVec(prometheus.CounterOpts{
			Name: "test_query_scheduler_queue_duration_total_seconds",
			Help: "[test] total time spent by items in queue before getting picked up by a consumer",
		}, []string{"additional_queue_dimensions"})

		queue := NewRequestQueue(
			log.NewNopLogger(),
			maxOutstandingRequestsPerTenant,
			additionalQueueDimensionsEnabled,
			forgetQuerierDelay,
			promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"tenant"}),
			promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"tenant"}),
			promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}),
		)

		ctx := context.Background()
		require.NoError(t, queue.starting(ctx))
		t.Cleanup(func() {
			require.NoError(t, queue.stop(nil))
		})

		// fill queue first with the slow queries, then the normal queries
		for _, queueDimensionFunc := range []func() []string{slowQueueDimensionFunc, normalQueueDimensionFunc} {
			startProducersChan := make(chan struct{})
			producersErrGroup, _ := errgroup.WithContext(ctx)

			runProducer := runQueueProducerIters(
				queue, maxQueriersPerTenant, totalRequests/2, numProducers, numTenants, startProducersChan, queueDimensionFunc,
			)
			for producerIdx := 0; producerIdx < numProducers; producerIdx++ {
				producerIdx := producerIdx // capture loop variable for the goroutine closure
				producersErrGroup.Go(func() error {
					return runProducer(producerIdx)
				})
			}
			// release all producers at once so enqueues happen concurrently
			close(startProducersChan)
			err := producersErrGroup.Wait()
			require.NoError(t, err)
		}

		// emulate delay when consuming the slow queries
		consumeFunc := func(request Request) error {
			schedulerRequest := request.(*SchedulerRequest)
			if schedulerRequest.AdditionalQueueDimensions[0] == slowConsumerQueueDimension {
				time.Sleep(slowConsumerLatency)
			}

			queueTime := time.Since(schedulerRequest.EnqueueTime)
			additionalQueueDimensionLabels := strings.Join(schedulerRequest.AdditionalQueueDimensions, ":")
			queueDuration.With(prometheus.Labels{"additional_queue_dimensions": additionalQueueDimensionLabels}).Add(queueTime.Seconds())
			return nil
		}

		// consume queries
		queueConsumerErrGroup, ctx := errgroup.WithContext(ctx)
		startConsumersChan := make(chan struct{})
		runConsumer := runQueueConsumerIters(ctx, queue, totalRequests, numConsumers, startConsumersChan, consumeFunc)

		for consumerIdx := 0; consumerIdx < numConsumers; consumerIdx++ {
			consumerIdx := consumerIdx // capture loop variable for the goroutine closure
			queueConsumerErrGroup.Go(func() error {
				return runConsumer(consumerIdx)
			})
		}

		close(startConsumersChan)
		err := queueConsumerErrGroup.Wait()
		require.NoError(t, err)

		// record total queue duration by queue dimensions and whether the queue splitting was enabled
		for _, queueDimension := range []string{normalQueueDimension, slowConsumerQueueDimension} {
			queueDurationTotals[additionalQueueDimensionsEnabled][queueDimension] = promtest.ToFloat64(
				queueDuration.With(prometheus.Labels{"additional_queue_dimensions": queueDimension}),
			)
		}

		// deregister the counter so the next loop iteration can register a fresh one
		// with the same name against the pedantic registry
		promRegistry.Unregister(queueDuration)
	}

	// total or average time in queue for a normal queue item should be roughly cut in half
	// when queue splitting is enabled, as the average normal queue item waits behind
	// half of the slow queue items, instead of waiting behind all the slow queue items.
	expected := queueDurationTotals[false][normalQueueDimension] / 2
	actual := queueDurationTotals[true][normalQueueDimension]
	// some variance allowed due to actual time processing needed beyond the slow consumer delay;
	// variance is also a function of the number of consumers and the consumer delay chosen.
	// variance can be tighter if the test runs longer but there is a tradeoff for testing and CI speed
	delta := expected * 0.10
	require.InDelta(t, expected, actual, delta)

}

func BenchmarkConcurrentQueueOperations(b *testing.B) {
maxQueriersPerTenant := 0 // disable shuffle sharding
forgetQuerierDelay := time.Duration(0)
Expand Down Expand Up @@ -101,7 +253,9 @@ func BenchmarkConcurrentQueueOperations(b *testing.B) {
require.NoError(b, queue.stop(nil))
})

runProducer := runQueueProducerForBenchmark(b, queue, maxQueriersPerTenant, numProducers, numTenants, startSignalChan)
runProducer := runQueueProducerIters(
queue, maxQueriersPerTenant, b.N, numProducers, numTenants, startSignalChan, nil,
)

for producerIdx := 0; producerIdx < numProducers; producerIdx++ {
producerIdx := producerIdx
Expand All @@ -110,7 +264,7 @@ func BenchmarkConcurrentQueueOperations(b *testing.B) {
})
}

runConsumer := runQueueConsumerForBenchmark(ctx, b, queue, numConsumers, startSignalChan)
runConsumer := runQueueConsumerIters(ctx, queue, b.N, numConsumers, startSignalChan, nil)

for consumerIdx := 0; consumerIdx < numConsumers; consumerIdx++ {
consumerIdx := consumerIdx
Expand All @@ -133,9 +287,9 @@ func BenchmarkConcurrentQueueOperations(b *testing.B) {
}
}

func queueActorIterationCount(benchmarkIters int, numActors int, actorIdx int) int {
actorIters := benchmarkIters / numActors
remainderIters := benchmarkIters % numActors
func queueActorIterationCount(totalIters int, numActors int, actorIdx int) int {
actorIters := totalIters / numActors
remainderIters := totalIters % numActors

if remainderIters == 0 {
// iterations are spread equally across actors without a remainder
Expand All @@ -152,31 +306,25 @@ func queueActorIterationCount(benchmarkIters int, numActors int, actorIdx int) i
return actorIters
}

func runQueueProducerForBenchmark(b *testing.B, queue *RequestQueue, maxQueriersPerTenant int, numProducers int, numTenants int, start chan struct{}) func(producerIdx int) error {
func runQueueProducerIters(
queue *RequestQueue,
maxQueriersPerTenant int,
totalIters int,
numProducers int,
numTenants int,
start chan struct{},
additionalQueueDimensionFunc func() []string,
) func(producerIdx int) error {
return func(producerIdx int) error {
producerIters := queueActorIterationCount(b.N, numProducers, producerIdx)
producerIters := queueActorIterationCount(totalIters, numProducers, producerIdx)
tenantID := producerIdx % numTenants
tenantIDStr := strconv.Itoa(tenantID)
<-start

for i := 0; i < producerIters; i++ {
for {
// when running this benchmark for memory usage comparison,
// we want to have a relatively representative size of request
// in order not to skew the % delta between queue implementations.
// Unless the request starts to get copied, the size of the requests in the queue
// should significantly outweigh the memory used to implement the queue mechanics.
req := makeSchedulerRequest(tenantIDStr)
//req.AdditionalQueueDimensions = randAdditionalQueueDimension()
err := queue.EnqueueRequestToDispatcher(tenantIDStr, req, maxQueriersPerTenant, func() {})
if err == nil {
break
}

// Keep retrying if we've hit the max queue length, otherwise give up immediately.
if !errors.Is(err, ErrTooManyRequests) {
return err
}
err := queueProduce(queue, maxQueriersPerTenant, tenantIDStr, additionalQueueDimensionFunc)
if err != nil {
return err
}

tenantID = (tenantID + 1) % numTenants
Expand All @@ -186,9 +334,37 @@ func runQueueProducerForBenchmark(b *testing.B, queue *RequestQueue, maxQueriers
}
}

func runQueueConsumerForBenchmark(ctx context.Context, b *testing.B, queue *RequestQueue, numConsumers int, start chan struct{}) func(consumerIdx int) error {
// queueProduce builds one scheduler request for the given tenant and enqueues it,
// blocking on retries until the queue accepts it or fails with a non-retryable error.
func queueProduce(
	queue *RequestQueue, maxQueriersPerTenant int, tenantID string, additionalQueueDimensionFunc func() []string,
) error {
	// resolve the additional queue dimensions up front; a nil func means none are used
	var queueDimensions []string
	if additionalQueueDimensionFunc != nil {
		queueDimensions = additionalQueueDimensionFunc()
	}

	request := makeSchedulerRequest(tenantID, queueDimensions)
	for {
		err := queue.EnqueueRequestToDispatcher(tenantID, request, maxQueriersPerTenant, func() {})
		if err == nil {
			return nil
		}
		// Keep retrying if we've hit the max queue length, otherwise give up immediately.
		if !errors.Is(err, ErrTooManyRequests) {
			return err
		}
	}
}

func runQueueConsumerIters(
ctx context.Context,
queue *RequestQueue,
totalIters int,
numConsumers int,
start chan struct{},
consumeFunc consumeRequest,
) func(consumerIdx int) error {
return func(consumerIdx int) error {
consumerIters := queueActorIterationCount(b.N, numConsumers, consumerIdx)
consumerIters := queueActorIterationCount(totalIters, numConsumers, consumerIdx)
lastTenantIndex := FirstUser()
querierID := fmt.Sprintf("consumer-%v", consumerIdx)
queue.RegisterQuerierConnection(querierID)
Expand All @@ -197,7 +373,7 @@ func runQueueConsumerForBenchmark(ctx context.Context, b *testing.B, queue *Requ
<-start

for i := 0; i < consumerIters; i++ {
_, idx, err := queue.GetNextRequestForQuerier(ctx, lastTenantIndex, querierID)
idx, err := queueConsume(ctx, queue, querierID, lastTenantIndex, consumeFunc)
if err != nil {
return err
}
Expand All @@ -209,6 +385,23 @@ func runQueueConsumerForBenchmark(ctx context.Context, b *testing.B, queue *Requ
}
}

// consumeRequest processes a dequeued query request; it lets tests inject
// per-request behavior (such as artificial latency) into queue consumers.
type consumeRequest func(request Request) error

// queueConsume dequeues a single request for the given querier and, when provided,
// hands it off to consumeFunc. It returns the tenant index to resume dequeuing from:
// the new index on a successful dequeue, or the caller's original index on failure.
func queueConsume(
	ctx context.Context, queue *RequestQueue, querierID string, lastTenantIndex UserIndex, consumeFunc consumeRequest,
) (UserIndex, error) {
	request, nextTenantIndex, err := queue.GetNextRequestForQuerier(ctx, lastTenantIndex, querierID)
	if err != nil {
		return lastTenantIndex, err
	}
	if consumeFunc == nil {
		return nextTenantIndex, nil
	}
	return nextTenantIndex, consumeFunc(request)
}

func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBecauseQuerierHasBeenForgotten(t *testing.T) {
const forgetDelay = 3 * time.Second

Expand Down Expand Up @@ -249,7 +442,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe
req := &SchedulerRequest{
Ctx: context.Background(),
Request: &httpgrpc.HTTPRequest{Method: "GET", Url: "/hello"},
AdditionalQueueDimensions: randAdditionalQueueDimension(),
AdditionalQueueDimensions: randAdditionalQueueDimension(true),
}
require.NoError(t, queue.EnqueueRequestToDispatcher("user-1", req, 1, nil))

Expand Down Expand Up @@ -350,7 +543,7 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend
req := &SchedulerRequest{
Ctx: context.Background(),
Request: &httpgrpc.HTTPRequest{Method: "GET", Url: "/hello"},
AdditionalQueueDimensions: randAdditionalQueueDimension(),
AdditionalQueueDimensions: randAdditionalQueueDimension(true),
}
tr := tenantRequest{
tenantID: TenantID("tenant-1"),
Expand Down
Loading

0 comments on commit 295b208

Please sign in to comment.