Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

apply multidimensional query request queuing: supply queue dimensions from frontend & utilize in scheduler #6772

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
62f9994
add query component hints to prometheus range and instant request pro…
francoposa Nov 28, 2023
31eff75
linting
francoposa Nov 29, 2023
bebed29
working middleware to determine query component hints in query frontend
francoposa Nov 29, 2023
85c50aa
fixing config descriptor
francoposa Nov 29, 2023
79b0ed2
receiver naming for linting
francoposa Nov 29, 2023
ee94da1
error handling fix
francoposa Nov 29, 2023
ad0fe16
try to avoid copying and data race
francoposa Nov 29, 2023
e815371
attempting integration test fix
francoposa Nov 29, 2023
5be511e
re-introducing request updating to check integration tests
francoposa Nov 29, 2023
692584a
trying again
francoposa Nov 29, 2023
14b593f
trying more stuff
francoposa Nov 29, 2023
534b424
fix data race by moving infra hints out from under query hints
francoposa Nov 29, 2023
ee5e35b
restore updated versions of previous work without data race
francoposa Nov 30, 2023
cb8db34
working version of encoding and passing queue dimensions from prometh…
francoposa Nov 30, 2023
6e956a4
license for linting
francoposa Nov 30, 2023
48a7fc1
remove middleware approach
francoposa Dec 1, 2023
1111a31
remove autocreated import alias
francoposa Dec 1, 2023
ffc3f58
move SchedulerRequest to queue package
francoposa Dec 1, 2023
8aba84b
enqueuing with the second queue dimension; minimal updates to tests t…
francoposa Dec 2, 2023
6da584b
minor allocation reduction; udpate benchmarking test to create larger…
francoposa Dec 4, 2023
74ae973
move max queue item count up into tenant queues
francoposa Dec 4, 2023
0b996c0
linting
francoposa Dec 4, 2023
e2be9aa
remove unused config
francoposa Dec 4, 2023
62c11ef
WIP PR feedback: splitting v1 and v2 limits, updating adapter to stri…
francoposa Dec 5, 2023
54c1841
improve adapter logic to handle labels and cardinality queries; bette…
francoposa Dec 6, 2023
bb26a03
remove additional dimensions length check
francoposa Dec 6, 2023
decb05d
remove unused prometheus codec
francoposa Dec 6, 2023
402737a
test for queue broker respecting tenant request limit with subqueues
francoposa Dec 6, 2023
9aca35b
update max tenant queue size test, test for failure to enqueue in ten…
francoposa Dec 11, 2023
52c810d
add feature flag to frontend and scheduler for additional queue dimen…
francoposa Dec 12, 2023
223620c
gate on feature flag
francoposa Dec 12, 2023
51a7863
gate on feature flag
francoposa Dec 12, 2023
d662a33
add log warn for unparseable request type; add config option to front…
francoposa Dec 12, 2023
aad1448
add and illustrate test cases for all query time ranges with respect …
francoposa Dec 12, 2023
6a15ef0
CHANGELOG
francoposa Dec 12, 2023
b65b5c8
PR feedback
francoposa Dec 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions development/mimir-microservices-mode/config/mimir.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ frontend:
parallelize_shardable_queries: true
align_queries_with_step: true
cache_results: true
additional_query_queue_dimensions_enabled: true

# Uncomment when using "dns" service discovery mode for query-scheduler.
# scheduler_address: "query-scheduler:9011"
Expand All @@ -156,6 +157,7 @@ frontend_worker:
query_scheduler:
# Change to "dns" to switch to query-scheduler DNS-based service discovery.
service_discovery_mode: "ring"
additional_query_queue_dimensions_enabled: true

limits:
# Limit max query time range to 31d
Expand Down
118 changes: 104 additions & 14 deletions pkg/frontend/v2/frontend_scheduler_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,17 @@ func TestExtractAdditionalQueueDimensions(t *testing.T) {
now := time.Now()
charleskorn marked this conversation as resolved.
Show resolved Hide resolved

t.Run("query with start after query store after is ingesters only", func(t *testing.T) {
// query ingesters: |------------------
// query store-gateways: ------------------|
// query time range: |----|

ctx := user.InjectOrgID(context.Background(), "tenant-0")
startAfterQueryStoreAfter := now.Add(-adapter.cfg.QueryStoreAfter).Add(1 * time.Minute)
start := now.Add(-adapter.cfg.QueryStoreAfter).Add(1 * time.Minute)
end := now

rangeHTTPReq := makeRangeHTTPRequest(ctx, startAfterQueryStoreAfter, end, 60)
instantHTTPReq := makeInstantHTTPRequest(ctx, startAfterQueryStoreAfter)
labelValuesHTTPReq := makeLabelValuesHTTPRequest(ctx, startAfterQueryStoreAfter, end)
rangeHTTPReq := makeRangeHTTPRequest(ctx, start, end, 60)
instantHTTPReq := makeInstantHTTPRequest(ctx, start)
labelValuesHTTPReq := makeLabelValuesHTTPRequest(ctx, start, end)

reqs := []*http.Request{rangeHTTPReq, instantHTTPReq, labelValuesHTTPReq}

Expand All @@ -74,13 +78,17 @@ func TestExtractAdditionalQueueDimensions(t *testing.T) {
})

t.Run("query with end before query ingesters within is store-gateways only", func(t *testing.T) {
// query ingesters: |------------------
// query store-gateways: ------------------|
// query time range: |----|

ctx := user.InjectOrgID(context.Background(), "tenant-0")
startBeforeQueryIngestersWithin := now.Add(-adapter.limits.QueryIngestersWithin("")).Add(-1 * time.Hour)
endBeforeQueryIngestersWithin := now.Add(-adapter.limits.QueryIngestersWithin("")).Add(-1 * time.Minute)
start := now.Add(-adapter.limits.QueryIngestersWithin("")).Add(-1 * time.Hour)
end := now.Add(-adapter.limits.QueryIngestersWithin("")).Add(-1 * time.Minute)

rangeHTTPReq := makeRangeHTTPRequest(ctx, startBeforeQueryIngestersWithin, endBeforeQueryIngestersWithin, 60)
instantHTTPReq := makeInstantHTTPRequest(ctx, startBeforeQueryIngestersWithin)
labelValuesHTTPReq := makeLabelValuesHTTPRequest(ctx, startBeforeQueryIngestersWithin, endBeforeQueryIngestersWithin)
rangeHTTPReq := makeRangeHTTPRequest(ctx, start, end, 60)
instantHTTPReq := makeInstantHTTPRequest(ctx, start)
labelValuesHTTPReq := makeLabelValuesHTTPRequest(ctx, start, end)

reqs := []*http.Request{rangeHTTPReq, instantHTTPReq, labelValuesHTTPReq}

Expand All @@ -96,14 +104,96 @@ func TestExtractAdditionalQueueDimensions(t *testing.T) {
}
})

t.Run("query with time between query ingesters within and query store after is ingesters and store-gateways", func(t *testing.T) {
t.Run("query with start before query ingesters and end after query store is ingesters-and-store-gateways", func(t *testing.T) {
// query ingesters: |------------------
// query store-gateways: ------------------|
// query time range: |--------------|

ctx := user.InjectOrgID(context.Background(), "tenant-0")
start := now.Add(-adapter.limits.QueryIngestersWithin("")).Add(-1 * time.Minute)
end := now.Add(-adapter.cfg.QueryStoreAfter).Add(1 * time.Minute)

rangeHTTPReq := makeRangeHTTPRequest(ctx, start, end, 60)
labelValuesHTTPReq := makeLabelValuesHTTPRequest(ctx, start, end)

reqs := []*http.Request{rangeHTTPReq, labelValuesHTTPReq}

for _, req := range reqs {
httpgrpcReq, err := httpgrpc.FromHTTPRequest(req)
require.NoError(t, err)

additionalQueueDimensions, err := adapter.extractAdditionalQueueDimensions(
ctx, httpgrpcReq, now,
)
require.NoError(t, err)
require.Equal(t, []string{ShouldQueryIngestersAndStoreGatewayQueueDimension}, additionalQueueDimensions)
}
})

t.Run("query with start before query ingesters and end before query store is ingesters-and-store-gateways", func(t *testing.T) {
// query ingesters: |------------------
// query store-gateways: ------------------|
// query time range: |---------|

ctx := user.InjectOrgID(context.Background(), "tenant-0")
start := now.Add(-adapter.limits.QueryIngestersWithin("")).Add(-1 * time.Minute)
end := now.Add(-adapter.cfg.QueryStoreAfter).Add(-1 * time.Minute)

rangeHTTPReq := makeRangeHTTPRequest(ctx, start, end, 60)
labelValuesHTTPReq := makeLabelValuesHTTPRequest(ctx, start, end)

reqs := []*http.Request{rangeHTTPReq, labelValuesHTTPReq}

for _, req := range reqs {
httpgrpcReq, err := httpgrpc.FromHTTPRequest(req)
require.NoError(t, err)

additionalQueueDimensions, err := adapter.extractAdditionalQueueDimensions(
ctx, httpgrpcReq, now,
)
require.NoError(t, err)
require.Equal(t, []string{ShouldQueryIngestersAndStoreGatewayQueueDimension}, additionalQueueDimensions)
}
})

t.Run("query with start after query ingesters and end after query store is ingesters-and-store-gateways", func(t *testing.T) {
// query ingesters: |------------------
// query store-gateways: ------------------|
// query time range: |---------|

ctx := user.InjectOrgID(context.Background(), "tenant-0")
start := now.Add(-adapter.limits.QueryIngestersWithin("")).Add(-1 * time.Minute)
end := now.Add(-adapter.cfg.QueryStoreAfter).Add(-1 * time.Minute)

rangeHTTPReq := makeRangeHTTPRequest(ctx, start, end, 60)
labelValuesHTTPReq := makeLabelValuesHTTPRequest(ctx, start, end)

reqs := []*http.Request{rangeHTTPReq, labelValuesHTTPReq}

for _, req := range reqs {
httpgrpcReq, err := httpgrpc.FromHTTPRequest(req)
require.NoError(t, err)

additionalQueueDimensions, err := adapter.extractAdditionalQueueDimensions(
ctx, httpgrpcReq, now,
)
require.NoError(t, err)
require.Equal(t, []string{ShouldQueryIngestersAndStoreGatewayQueueDimension}, additionalQueueDimensions)
}
})

t.Run("query with start and end between query ingesters and query store is ingesters-and-store-gateways", func(t *testing.T) {
// query ingesters: |------------------
// query store-gateways: ------------------|
// query time range: |-----|

ctx := user.InjectOrgID(context.Background(), "tenant-0")
startAfterQueryIngestersWithinAndBeforeQueryStoreAfter := now.Add(-adapter.limits.QueryIngestersWithin("")).Add(30 * time.Minute)
start := now.Add(-adapter.limits.QueryIngestersWithin("")).Add(30 * time.Minute)
end := now

rangeHTTPReq := makeRangeHTTPRequest(ctx, startAfterQueryIngestersWithinAndBeforeQueryStoreAfter, end, 60)
instantHTTPReq := makeInstantHTTPRequest(ctx, startAfterQueryIngestersWithinAndBeforeQueryStoreAfter)
labelValuesHTTPReq := makeLabelValuesHTTPRequest(ctx, startAfterQueryIngestersWithinAndBeforeQueryStoreAfter, end)
rangeHTTPReq := makeRangeHTTPRequest(ctx, start, end, 60)
instantHTTPReq := makeInstantHTTPRequest(ctx, start)
labelValuesHTTPReq := makeLabelValuesHTTPRequest(ctx, start, end)

reqs := []*http.Request{rangeHTTPReq, instantHTTPReq, labelValuesHTTPReq}

Expand Down