Get rid of -querier.prefer-streaming-chunks-from-ingesters #7639

Merged · 6 commits · Mar 19, 2024
11 changes: 0 additions & 11 deletions cmd/mimir/config-descriptor.json
@@ -1810,17 +1810,6 @@
"fieldType": "boolean",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "prefer_streaming_chunks_from_ingesters",
"required": false,
"desc": "Request ingesters stream chunks. Ingesters will only respond with a stream of chunks if the target ingester supports this, and this preference will be ignored by ingesters that do not support this.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "querier.prefer-streaming-chunks-from-ingesters",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "prefer_streaming_chunks_from_store_gateways",
2 changes: 0 additions & 2 deletions cmd/mimir/help-all.txt.tmpl
@@ -1717,8 +1717,6 @@ Usage of ./cmd/mimir/mimir:
If true, when querying ingesters, only the minimum required ingesters required to reach quorum will be queried initially, with other ingesters queried only if needed due to failures from the initial set of ingesters. Enabling this option reduces resource consumption for the happy path at the cost of increased latency for the unhappy path. (default true)
-querier.minimize-ingester-requests-hedging-delay duration
Delay before initiating requests to further ingesters when request minimization is enabled and the initially selected set of ingesters have not all responded. Ignored if -querier.minimize-ingester-requests is not enabled. (default 3s)
-querier.prefer-streaming-chunks-from-ingesters
[experimental] Request ingesters stream chunks. Ingesters will only respond with a stream of chunks if the target ingester supports this, and this preference will be ignored by ingesters that do not support this. (default true)
-querier.prefer-streaming-chunks-from-store-gateways
[experimental] Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this.
-querier.promql-experimental-functions-enabled
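As context for the flags above: each `-querier.*` flag also has a YAML form under the `querier` block. A minimal sketch of the request-minimization settings, assuming the YAML keys follow the usual flag-to-key naming; the key names below are inferred from the flag names, not taken from this diff.

```yaml
# Hypothetical querier config; key names are inferred from the CLI flags above
# and should be checked against the configuration reference for your version.
querier:
  # Equivalent of -querier.minimize-ingester-requests (defaults to true).
  minimize_ingester_requests: true
  # Equivalent of -querier.minimize-ingester-requests-hedging-delay (defaults to 3s).
  minimize_ingester_requests_hedging_delay: 3s
```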
1 change: 0 additions & 1 deletion docs/sources/mimir/configure/about-versioning.md
@@ -120,7 +120,6 @@ The following features are currently experimental:
- `-ingester.client.circuit-breaker.cooldown-period`
- Querier
- Use of Redis cache backend (`-blocks-storage.bucket-store.metadata-cache.backend=redis`)
- Streaming chunks from ingester to querier (`-querier.prefer-streaming-chunks-from-ingesters`)
- Streaming chunks from store-gateway to querier (`-querier.prefer-streaming-chunks-from-store-gateways`, `-querier.streaming-chunks-per-store-gateway-buffer-size`)
- Limiting queries based on the estimated number of chunks that will be used (`-querier.max-estimated-fetched-chunks-per-query-multiplier`)
- Max concurrency for tenant federated queries (`-tenant-federation.max-concurrent`)
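Note that the store-gateway streaming options listed above stay experimental after this change; only the ingester-side flag goes away. A sketch of opting in, assuming the buffer-size key mirrors its flag name (`prefer_streaming_chunks_from_store_gateways` appears in the descriptor earlier in this diff; the second key is inferred):

```yaml
# Illustrative only: enables the still-experimental store-gateway streaming path.
querier:
  prefer_streaming_chunks_from_store_gateways: true
  # Key name inferred from -querier.streaming-chunks-per-store-gateway-buffer-size.
  streaming_chunks_per_store_gateway_buffer_size: 256
```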
@@ -1291,12 +1291,6 @@ store_gateway_client:
# CLI flag: -querier.shuffle-sharding-ingesters-enabled
[shuffle_sharding_ingesters_enabled: <boolean> | default = true]

# (experimental) Request ingesters stream chunks. Ingesters will only respond
# with a stream of chunks if the target ingester supports this, and this
# preference will be ignored by ingesters that do not support this.
# CLI flag: -querier.prefer-streaming-chunks-from-ingesters
[prefer_streaming_chunks_from_ingesters: <boolean> | default = true]

# (experimental) Request store-gateways stream chunks. Store-gateways will only
# respond with a stream of chunks if the target store-gateway supports this, and
# this preference will be ignored by store-gateways that do not support this.
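For operators upgrading: a querier configuration that still sets the removed option will be rejected as an unknown field once this change ships, so the line simply needs to be deleted. A before/after sketch (the `querier` block shown is illustrative, not a complete configuration):

```yaml
# Before (Mimir 2.11): the experimental toggle could be set explicitly.
querier:
  prefer_streaming_chunks_from_ingesters: true  # removed in 2.12 -- delete this line

# After (Mimir 2.12): nothing to configure; chunk streaming from ingesters is
# always used (the option defaulted to true before it was removed).
querier: {}
```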
1 change: 1 addition & 0 deletions docs/sources/mimir/release-notes/v2.12.md
@@ -102,6 +102,7 @@ The default value of the following CLI flags have been changed:
The following deprecated configuration options are removed in Grafana Mimir 2.12:

- The YAML setting `frontend.cache_unaligned_requests`.
- Experimental CLI flag `-querier.prefer-streaming-chunks-from-ingesters`.

The following configuration options are deprecated and will be removed in Grafana Mimir 2.14:

277 changes: 133 additions & 144 deletions integration/ingester_test.go
@@ -4,7 +4,6 @@
package integration

import (
"fmt"
"net/http"
"strconv"
"testing"
@@ -494,114 +493,13 @@ func TestIngesterQuerying(t *testing.T) {

for testName, tc := range testCases {
t.Run(testName, func(t *testing.T) {
for _, streamingEnabled := range []bool{true, false} {
t.Run(fmt.Sprintf("streaming enabled: %v", streamingEnabled), func(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

baseFlags := map[string]string{
"-distributor.ingestion-tenant-shard-size": "0",
"-ingester.ring.heartbeat-period": "1s",
"-querier.prefer-streaming-chunks-from-ingesters": strconv.FormatBool(streamingEnabled),
}

flags := mergeFlags(
BlocksStorageFlags(),
BlocksStorageS3Flags(),
baseFlags,
)

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Start Mimir components.
distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags)
querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))

// Wait until distributor has updated the ring.
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

// Wait until querier has updated the ring.
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)

res, err := client.Push(tc.inSeries)
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode)

result, err := client.QueryRange(query, queryStart, queryEnd, queryStep)
require.NoError(t, err)
require.Equal(t, tc.expectedQueryResult, result)

// The PromQL engine does some special handling for the timestamp() function which previously
// caused queries to fail when streaming chunks was enabled, so check that this regression
// has not been reintroduced.
result, err = client.QueryRange(timestampQuery, queryStart, queryEnd, queryStep)
require.NoError(t, err)
require.Equal(t, tc.expectedTimestampQueryResult, result)

queryRequestCount := func(status string) (float64, error) {
counts, err := querier.SumMetrics([]string{"cortex_ingester_client_request_duration_seconds"},
e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "operation", "/cortex.Ingester/QueryStream"),
labels.MustNewMatcher(labels.MatchRegexp, "status_code", status),
),
e2e.WithMetricCount,
e2e.SkipMissingMetrics,
)

if err != nil {
return 0, err
}

require.Len(t, counts, 1)
return counts[0], nil
}

successfulQueryRequests, err := queryRequestCount("OK")
require.NoError(t, err)

cancelledQueryRequests, err := queryRequestCount("cancel")
require.NoError(t, err)

totalQueryRequests, err := queryRequestCount(".*")
require.NoError(t, err)

// We expect two query requests: the first query request and the timestamp query request
require.Equalf(t, 2.0, totalQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
require.Equalf(t, 2.0, successfulQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
require.Equalf(t, 0.0, cancelledQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
})
}
})
}
}

func TestIngesterQueryingWithRequestMinimization(t *testing.T) {
for _, streamingEnabled := range []bool{true, false} {
t.Run(fmt.Sprintf("streaming enabled: %v", streamingEnabled), func(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

baseFlags := map[string]string{
"-distributor.ingestion-tenant-shard-size": "0",
"-ingester.ring.heartbeat-period": "1s",
"-ingester.ring.zone-awareness-enabled": "true",
"-ingester.ring.replication-factor": "3",
"-querier.minimize-ingester-requests": "true",
"-querier.prefer-streaming-chunks-from-ingesters": strconv.FormatBool(streamingEnabled),
"-distributor.ingestion-tenant-shard-size": "0",
"-ingester.ring.heartbeat-period": "1s",
}

flags := mergeFlags(
@@ -615,68 +513,159 @@ func TestIngesterQueryingWithRequestMinimization(t *testing.T) {
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

ingesterFlags := func(zone string) map[string]string {
return mergeFlags(flags, map[string]string{
"-ingester.ring.instance-availability-zone": zone,
})
}

// Start Mimir components.
distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
ingester1 := e2emimir.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), ingesterFlags("zone-a"))
ingester2 := e2emimir.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), ingesterFlags("zone-b"))
ingester3 := e2emimir.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), ingesterFlags("zone-c"))
ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags)
querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3, querier))
require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))

// Wait until distributor and querier have updated the ring.
for _, component := range []*e2emimir.MimirService{distributor, querier} {
require.NoError(t, component.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
}
// Wait until distributor has updated the ring.
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

// Wait until querier has updated the ring.
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)

// Push some data to the cluster.
seriesName := "test_series"
now := time.Now()
series, expectedVector, _ := generateFloatSeries(seriesName, now, prompb.Label{Name: "foo", Value: "bar"})

res, err := client.Push(series)
res, err := client.Push(tc.inSeries)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
require.Equal(t, http.StatusOK, res.StatusCode)

// Verify we can query the data we just pushed.
queryResult, err := client.Query(seriesName, now)
result, err := client.QueryRange(query, queryStart, queryEnd, queryStep)
require.NoError(t, err)
require.Equal(t, model.ValVector, queryResult.Type())
require.Equal(t, expectedVector, queryResult.(model.Vector))
require.Equal(t, tc.expectedQueryResult, result)

// Check that we only queried two of the three ingesters.
totalQueryRequests := 0.0
// The PromQL engine does some special handling for the timestamp() function which previously
// caused queries to fail when streaming chunks was enabled, so check that this regression
// has not been reintroduced.
result, err = client.QueryRange(timestampQuery, queryStart, queryEnd, queryStep)
require.NoError(t, err)
require.Equal(t, tc.expectedTimestampQueryResult, result)

for _, ingester := range []*e2emimir.MimirService{ingester1, ingester2, ingester3} {
sums, err := ingester.SumMetrics(
[]string{"cortex_request_duration_seconds"},
queryRequestCount := func(status string) (float64, error) {
counts, err := querier.SumMetrics([]string{"cortex_ingester_client_request_duration_seconds"},
e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "route", "/cortex.Ingester/QueryStream"),
labels.MustNewMatcher(labels.MatchEqual, "status_code", "OK"),
labels.MustNewMatcher(labels.MatchEqual, "operation", "/cortex.Ingester/QueryStream"),
labels.MustNewMatcher(labels.MatchRegexp, "status_code", status),
),
e2e.SkipMissingMetrics,
e2e.WithMetricCount,
e2e.SkipMissingMetrics,
)

require.NoError(t, err)
queryRequests := sums[0]
require.LessOrEqual(t, queryRequests, 1.0)
totalQueryRequests += queryRequests
if err != nil {
return 0, err
}

require.Len(t, counts, 1)
return counts[0], nil
}

require.Equal(t, 2.0, totalQueryRequests)
successfulQueryRequests, err := queryRequestCount("OK")
require.NoError(t, err)

cancelledQueryRequests, err := queryRequestCount("cancel")
require.NoError(t, err)

totalQueryRequests, err := queryRequestCount(".*")
require.NoError(t, err)

// We expect two query requests: the first query request and the timestamp query request
require.Equalf(t, 2.0, totalQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
require.Equalf(t, 2.0, successfulQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
require.Equalf(t, 0.0, cancelledQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
})
}
}

func TestIngesterQueryingWithRequestMinimization(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

baseFlags := map[string]string{
"-distributor.ingestion-tenant-shard-size": "0",
"-ingester.ring.heartbeat-period": "1s",
"-ingester.ring.zone-awareness-enabled": "true",
"-ingester.ring.replication-factor": "3",
"-querier.minimize-ingester-requests": "true",
}

flags := mergeFlags(
BlocksStorageFlags(),
BlocksStorageS3Flags(),
baseFlags,
)

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

ingesterFlags := func(zone string) map[string]string {
return mergeFlags(flags, map[string]string{
"-ingester.ring.instance-availability-zone": zone,
})
}

// Start Mimir components.
distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
ingester1 := e2emimir.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), ingesterFlags("zone-a"))
ingester2 := e2emimir.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), ingesterFlags("zone-b"))
ingester3 := e2emimir.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), ingesterFlags("zone-c"))
querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3, querier))

// Wait until distributor and querier have updated the ring.
for _, component := range []*e2emimir.MimirService{distributor, querier} {
require.NoError(t, component.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
}

client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)

// Push some data to the cluster.
seriesName := "test_series"
now := time.Now()
series, expectedVector, _ := generateFloatSeries(seriesName, now, prompb.Label{Name: "foo", Value: "bar"})

res, err := client.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// Verify we can query the data we just pushed.
queryResult, err := client.Query(seriesName, now)
require.NoError(t, err)
require.Equal(t, model.ValVector, queryResult.Type())
require.Equal(t, expectedVector, queryResult.(model.Vector))

// Check that we only queried two of the three ingesters.
totalQueryRequests := 0.0

for _, ingester := range []*e2emimir.MimirService{ingester1, ingester2, ingester3} {
sums, err := ingester.SumMetrics(
[]string{"cortex_request_duration_seconds"},
e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "route", "/cortex.Ingester/QueryStream"),
labels.MustNewMatcher(labels.MatchEqual, "status_code", "OK"),
),
e2e.SkipMissingMetrics,
e2e.WithMetricCount,
)

require.NoError(t, err)
queryRequests := sums[0]
require.LessOrEqual(t, queryRequests, 1.0)
totalQueryRequests += queryRequests
}

require.Equal(t, 2.0, totalQueryRequests)
}

func TestIngesterReportGRPCStatusCodes(t *testing.T) {