diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index d83bac05ff8..0e1e92d3760 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -1487,6 +1487,8 @@ func (d *Distributor) sendToStorage(ctx context.Context, userID string, partitio } // forReplicationSet runs f, in parallel, for all ingesters in the input replication set. +// +// Deprecated: use forReplicationSets() instead. func forReplicationSet[T any](ctx context.Context, d *Distributor, replicationSet ring.ReplicationSet, f func(context.Context, ingester_client.IngesterClient) (T, error)) ([]T, error) { wrappedF := func(ctx context.Context, ing *ring.InstanceDesc) (T, error) { client, err := d.ingesterPool.GetClientForInstance(*ing) @@ -1505,6 +1507,29 @@ func forReplicationSet[T any](ctx context.Context, d *Distributor, replicationSe return ring.DoUntilQuorum(ctx, replicationSet, d.queryQuorumConfig(ctx, replicationSet), wrappedF, cleanup) } +// forReplicationSets runs f, in parallel, for all ingesters in the input replication sets. +func forReplicationSets[R any](ctx context.Context, d *Distributor, replicationSets []ring.ReplicationSet, f func(context.Context, ingester_client.IngesterClient) (R, error)) ([]R, error) { + wrappedF := func(ctx context.Context, ingester *ring.InstanceDesc) (R, error) { + client, err := d.ingesterPool.GetClientForInstance(*ingester) + if err != nil { + var empty R + return empty, err + } + + return f(ctx, client.(ingester_client.IngesterClient)) + } + + cleanup := func(_ R) { + // Nothing to do. + } + + quorumConfig := d.queryQuorumConfigForReplicationSets(ctx, replicationSets) + + return concurrency.ForEachJobMergeResults[ring.ReplicationSet, R](ctx, replicationSets, 0, func(ctx context.Context, set ring.ReplicationSet) ([]R, error) { + return ring.DoUntilQuorum(ctx, set, quorumConfig, wrappedF, cleanup) + }) +} + // queryQuorumConfig returns the config to use with "do until quorum" functions when running queries. // // Deprecated: use queryQuorumConfigForReplicationSets() instead. @@ -1581,10 +1606,10 @@ func queryIngesterPartitionsRingZoneSorter(preferredZone string) ring.ZoneSorter } } -// LabelValuesForLabelName returns all of the label values that are associated with a given label name. +// LabelValuesForLabelName returns the label values associated with the given labelName, among all series with samples +// timestamp between from and to, and series labels matching the optional matchers. 
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) { - //nolint:staticcheck - replicationSet, err := d.getIngesterReplicationSetForQuery(ctx) + replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx) if err != nil { return nil, err } @@ -1594,7 +1619,7 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode return nil, err } - resps, err := forReplicationSet(ctx, d, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.LabelValuesResponse, error) { + resps, err := forReplicationSets(ctx, d, replicationSets, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.LabelValuesResponse, error) { return client.LabelValues(ctx, req) }) if err != nil { @@ -1619,10 +1644,13 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode return values, nil } -// LabelNamesAndValues query ingesters for label names and values and returns labels with distinct list of values. +// LabelNamesAndValues returns the label name and value pairs for series matching the input label matchers. +// +// The actual series considered eligible depend on countMethod: +// - inmemory: in-memory series in ingesters. +// - active: in-memory series in ingesters which are also tracked as active ones. func (d *Distributor) LabelNamesAndValues(ctx context.Context, matchers []*labels.Matcher, countMethod cardinality.CountMethod) (*ingester_client.LabelNamesAndValuesResponse, error) { - //nolint:staticcheck - replicationSet, err := d.getIngesterReplicationSetForQuery(ctx) + replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx) if err != nil { return nil, err } @@ -1631,13 +1659,16 @@ func (d *Distributor) LabelNamesAndValues(ctx context.Context, matchers []*label if err != nil { return nil, err } + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err } + sizeLimitBytes := d.limits.LabelNamesAndValuesResultsMaxSizeBytes(userID) merger := &labelNamesAndValuesResponseMerger{result: map[string]map[string]struct{}{}, sizeLimitBytes: sizeLimitBytes} - _, err = forReplicationSet(ctx, d, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (ingester_client.Ingester_LabelNamesAndValuesClient, error) { + + _, err = forReplicationSets(ctx, d, replicationSets, func(ctx context.Context, client ingester_client.IngesterClient) (ingester_client.Ingester_LabelNamesAndValuesClient, error) { stream, err := client.LabelNamesAndValues(ctx, req) if err != nil { return nil, err @@ -2000,6 +2031,7 @@ func (d *Distributor) ActiveSeries(ctx context.Context, matchers []*labels.Match return ignored{}, nil } + //nolint:staticcheck _, err = forReplicationSet(ctx, d, replicationSet, ingesterQuery) if err != nil { return nil, err @@ -2133,10 +2165,10 @@ func approximateFromZones[T ~float64 | ~uint64](zoneCount int, replicationFactor return sum / T(replicationFactor) } -// LabelNames returns all of the label names. +// LabelNames returns the names of all labels from series with samples timestamp between from and to, and matching +// the input optional series label matchers. The returned label names are sorted. 
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) ([]string, error) { - //nolint:staticcheck - replicationSet, err := d.getIngesterReplicationSetForQuery(ctx) + replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx) if err != nil { return nil, err } @@ -2146,7 +2178,7 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, match return nil, err } - resps, err := forReplicationSet(ctx, d, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.LabelNamesResponse, error) { + resps, err := forReplicationSets(ctx, d, replicationSets, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.LabelNamesResponse, error) { return client.LabelNames(ctx, req) }) if err != nil { @@ -2170,10 +2202,10 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, match return values, nil } -// MetricsForLabelMatchers gets the metrics that match said matchers +// MetricsForLabelMatchers returns a list of series with samples timestamps between from and through, and series labels +// matching the optional label matchers. The returned series are not sorted. func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) { - //nolint:staticcheck - replicationSet, err := d.getIngesterReplicationSetForQuery(ctx) + replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx) if err != nil { return nil, err } @@ -2183,7 +2215,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through return nil, err } - resps, err := forReplicationSet(ctx, d, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.MetricsForLabelMatchersResponse, error) { + resps, err := forReplicationSets(ctx, d, replicationSets, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.MetricsForLabelMatchersResponse, error) { return client.MetricsForLabelMatchers(ctx, req) }) if err != nil { @@ -2210,15 +2242,14 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through return result, nil } -// MetricsMetadata returns all metric metadata of a user. +// MetricsMetadata returns the metrics metadata based on the provided req. 
func (d *Distributor) MetricsMetadata(ctx context.Context, req *ingester_client.MetricsMetadataRequest) ([]scrape.MetricMetadata, error) { - //nolint:staticcheck - replicationSet, err := d.getIngesterReplicationSetForQuery(ctx) + replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx) if err != nil { return nil, err } - resps, err := forReplicationSet(ctx, d, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.MetricsMetadataResponse, error) { + resps, err := forReplicationSets(ctx, d, replicationSets, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.MetricsMetadataResponse, error) { return client.MetricsMetadata(ctx, req) }) if err != nil { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 7ae72ebe46f..c6bb58c7b57 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -57,6 +57,7 @@ import ( "github.com/grafana/mimir/pkg/ingester" "github.com/grafana/mimir/pkg/ingester/client" "github.com/grafana/mimir/pkg/mimirpb" + "github.com/grafana/mimir/pkg/querier/api" "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/storage/chunk" "github.com/grafana/mimir/pkg/storage/ingest" @@ -1553,10 +1554,6 @@ func TestDistributor_Push_HistogramValidation(t *testing.T) { assert.Nil(t, resp) checkGRPCError(t, tc.expectedErr, expectedDetails, err) } - - t.Cleanup(func() { - require.NoError(t, services.StopAndAwaitTerminated(ctx, ds[0])) - }) }) } } @@ -2228,43 +2225,74 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) { } for testName, testData := range tests { + testData := testData + t.Run(testName, func(t *testing.T) { - now := model.Now() + t.Parallel() - // Create distributor - ds, ingesters, _, _ := prepare(t, prepConfig{ - numIngesters: numIngesters, - happyIngesters: numIngesters, - numDistributors: 1, - shuffleShardSize: testData.shuffleShardSize, - }) + for _, ingestStorageEnabled := range []bool{false, true} { + ingestStorageEnabled := ingestStorageEnabled - // Push fixtures - ctx := user.InjectOrgID(context.Background(), "test") + t.Run(fmt.Sprintf("ingest storage enabled: %t", ingestStorageEnabled), func(t *testing.T) { + t.Parallel() - for _, series := range fixtures { - req := mockWriteRequest(series.lbls, series.value, series.timestamp) - _, err := ds[0].Push(ctx, req) - require.NoError(t, err) - } + now := model.Now() - // Set up limiter - ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(testData.maxSeriesPerQuery, 0, 0, 0, stats.NewQueryMetrics(prometheus.NewPedanticRegistry()))) + testConfig := prepConfig{ + numIngesters: numIngesters, + happyIngesters: numIngesters, + numDistributors: 1, + } - metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...) - if testData.expectedError != nil { - require.ErrorIs(t, err, testData.expectedError) - return - } + if ingestStorageEnabled { + testConfig.ingestStorageEnabled = true + testConfig.ingestStoragePartitions = numIngesters + testConfig.limits = prepareDefaultLimits() + testConfig.limits.IngestionPartitionsTenantShardSize = testData.shuffleShardSize + } else { + testConfig.shuffleShardSize = testData.shuffleShardSize + } - require.NoError(t, err) - assert.ElementsMatch(t, testData.expectedResult, metrics) + // Create distributor + ds, ingesters, _, _ := prepare(t, testConfig) - // Check how many ingesters have been queried. 
- // Due to the quorum the distributor could cancel the last request towards ingesters - // if all other ones are successful, so we're good either has been queried X or X-1 - // ingesters. - assert.Contains(t, []int{testData.expectedIngesters, testData.expectedIngesters - 1}, countMockIngestersCalled(ingesters, "MetricsForLabelMatchers")) + // Ensure strong read consistency, required to have no flaky tests when ingest storage is enabled. + ctx := user.InjectOrgID(context.Background(), "test") + ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong) + + // Push fixtures + for _, series := range fixtures { + req := mockWriteRequest(series.lbls, series.value, series.timestamp) + _, err := ds[0].Push(ctx, req) + require.NoError(t, err) + } + + // Set up limiter + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(testData.maxSeriesPerQuery, 0, 0, 0, stats.NewQueryMetrics(prometheus.NewPedanticRegistry()))) + + metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...) + if testData.expectedError != nil { + require.ErrorIs(t, err, testData.expectedError) + return + } + + require.NoError(t, err) + assert.ElementsMatch(t, testData.expectedResult, metrics) + + // Check how many ingesters have been queried. + if ingestStorageEnabled { + // When ingest storage is enabled, we request quorum 1 for each partition. + // In this test each ingester owns a different partition, so we expect all + // ingesters to be queried. + assert.Equal(t, testData.expectedIngesters, countMockIngestersCalled(ingesters, "MetricsForLabelMatchers")) + } else { + // Due to the quorum the distributor could cancel the last request towards ingesters + // if all other ones are successful, so we're good either has been queried X or X-1 + // ingesters. + assert.Contains(t, []int{testData.expectedIngesters, testData.expectedIngesters - 1}, countMockIngestersCalled(ingesters, "MetricsForLabelMatchers")) + } + }) + } }) } } @@ -2461,35 +2489,67 @@ func TestDistributor_LabelNames(t *testing.T) { } for testName, testData := range tests { + testData := testData + t.Run(testName, func(t *testing.T) { - now := model.Now() + t.Parallel() - // Create distributor - ds, ingesters, _, _ := prepare(t, prepConfig{ - numIngesters: numIngesters, - happyIngesters: numIngesters, - numDistributors: 1, - shuffleShardSize: testData.shuffleShardSize, - }) + for _, ingestStorageEnabled := range []bool{false, true} { + ingestStorageEnabled := ingestStorageEnabled - // Push fixtures - ctx := user.InjectOrgID(context.Background(), "test") + t.Run(fmt.Sprintf("ingest storage enabled: %t", ingestStorageEnabled), func(t *testing.T) { + t.Parallel() - for _, series := range fixtures { - req := mockWriteRequest(series.lbls, series.value, series.timestamp) - _, err := ds[0].Push(ctx, req) - require.NoError(t, err) - } + now := model.Now() - names, err := ds[0].LabelNames(ctx, now, now, testData.matchers...) - require.NoError(t, err) - assert.ElementsMatch(t, testData.expectedResult, names) + testConfig := prepConfig{ + numIngesters: numIngesters, + happyIngesters: numIngesters, + numDistributors: 1, + } - // Check how many ingesters have been queried. - // Due to the quorum the distributor could cancel the last request towards ingesters - // if all other ones are successful, so we're good either has been queried X or X-1 - // ingesters. 
- assert.Contains(t, []int{testData.expectedIngesters, testData.expectedIngesters - 1}, countMockIngestersCalled(ingesters, "LabelNames")) + if ingestStorageEnabled { + testConfig.ingestStorageEnabled = true + testConfig.ingestStoragePartitions = numIngesters + + testConfig.limits = prepareDefaultLimits() + testConfig.limits.IngestionPartitionsTenantShardSize = testData.shuffleShardSize + } else { + testConfig.shuffleShardSize = testData.shuffleShardSize + } + + // Create distributor + ds, ingesters, _, _ := prepare(t, testConfig) + + // Ensure strong read consistency, required to have no flaky tests when ingest storage is enabled. + ctx := user.InjectOrgID(context.Background(), "test") + ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong) + + // Push fixtures + for _, series := range fixtures { + req := mockWriteRequest(series.lbls, series.value, series.timestamp) + _, err := ds[0].Push(ctx, req) + require.NoError(t, err) + } + + names, err := ds[0].LabelNames(ctx, now, now, testData.matchers...) + require.NoError(t, err) + assert.ElementsMatch(t, testData.expectedResult, names) + + // Check how many ingesters have been queried. + if ingestStorageEnabled { + // When ingest storage is enabled, we request quorum 1 for each partition. + // In this test each ingester owns a different partition, so we expect all + // ingesters to be queried. + assert.Equal(t, testData.expectedIngesters, countMockIngestersCalled(ingesters, "LabelNames")) + } else { + // Due to the quorum the distributor could cancel the last request towards ingesters + // if all other ones are successful, so we're good either has been queried X or X-1 + // ingesters. + assert.Contains(t, []int{testData.expectedIngesters, testData.expectedIngesters - 1}, countMockIngestersCalled(ingesters, "LabelNames")) + } + }) + } }) } } @@ -2512,44 +2572,74 @@ func TestDistributor_MetricsMetadata(t *testing.T) { } for testName, testData := range tests { + testData := testData + t.Run(testName, func(t *testing.T) { - // Create distributor - ds, _, _, _ := prepare(t, prepConfig{ - numIngesters: numIngesters, - happyIngesters: numIngesters, - numDistributors: 1, - shuffleShardSize: testData.shuffleShardSize, - limits: nil, - }) + t.Parallel() - // Push metadata - ctx := user.InjectOrgID(context.Background(), "test") + for _, ingestStorageEnabled := range []bool{false, true} { + ingestStorageEnabled := ingestStorageEnabled - req := makeWriteRequest(0, 0, 10, false, true, "foo") - _, err := ds[0].Push(ctx, req) - require.NoError(t, err) + t.Run(fmt.Sprintf("ingest storage enabled: %t", ingestStorageEnabled), func(t *testing.T) { + t.Parallel() - // Check how many ingesters are queried as part of the shuffle sharding subring. 
- //nolint:staticcheck - replicationSet, err := ds[0].getIngesterReplicationSetForQuery(ctx) - require.NoError(t, err) - assert.Equal(t, testData.expectedIngesters, len(replicationSet.Instances)) + testConfig := prepConfig{ + numIngesters: numIngesters, + happyIngesters: numIngesters, + numDistributors: 1, + } - // Assert on metric metadata - metadata, err := ds[0].MetricsMetadata(ctx, client.DefaultMetricsMetadataRequest()) - require.NoError(t, err) + if ingestStorageEnabled { + testConfig.ingestStorageEnabled = true + testConfig.ingestStoragePartitions = numIngesters + testConfig.limits = prepareDefaultLimits() + testConfig.limits.IngestionPartitionsTenantShardSize = testData.shuffleShardSize + } else { + testConfig.shuffleShardSize = testData.shuffleShardSize + } + + // Create distributor + ds, ingesters, _, _ := prepare(t, testConfig) - expectedMetadata := make([]scrape.MetricMetadata, 0, len(req.Metadata)) - for _, m := range req.Metadata { - expectedMetadata = append(expectedMetadata, scrape.MetricMetadata{ - Metric: m.MetricFamilyName, - Type: mimirpb.MetricMetadataMetricTypeToMetricType(m.Type), - Help: m.Help, - Unit: m.Unit, + // Ensure strong read consistency, required to have no flaky tests when ingest storage is enabled. + ctx := user.InjectOrgID(context.Background(), "test") + ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong) + + // Push metadata + req := makeWriteRequest(0, 0, 10, false, true, "foo") + _, err := ds[0].Push(ctx, req) + require.NoError(t, err) + + // Assert on metric metadata + metadata, err := ds[0].MetricsMetadata(ctx, client.DefaultMetricsMetadataRequest()) + require.NoError(t, err) + + expectedMetadata := make([]scrape.MetricMetadata, 0, len(req.Metadata)) + for _, m := range req.Metadata { + expectedMetadata = append(expectedMetadata, scrape.MetricMetadata{ + Metric: m.MetricFamilyName, + Type: mimirpb.MetricMetadataMetricTypeToMetricType(m.Type), + Help: m.Help, + Unit: m.Unit, + }) + } + + assert.ElementsMatch(t, metadata, expectedMetadata) + + // Check how many ingesters have been queried. + if ingestStorageEnabled { + // When ingest storage is enabled, we request quorum 1 for each partition. + // In this test each ingester owns a different partition, so we expect all + // ingesters to be queried. + assert.Equal(t, testData.expectedIngesters, countMockIngestersCalled(ingesters, "MetricsMetadata")) + } else { + // Due to the quorum the distributor could cancel the last request towards ingesters + // if all other ones are successful, so we're good either has been queried X or X-1 + // ingesters. 
+ assert.Contains(t, []int{testData.expectedIngesters, testData.expectedIngesters - 1}, countMockIngestersCalled(ingesters, "MetricsMetadata")) + } }) } - - assert.ElementsMatch(t, metadata, expectedMetadata) }) } } @@ -2578,35 +2668,50 @@ func TestDistributor_LabelNamesAndValuesLimitTest(t *testing.T) { }, } for testName, testData := range tests { + testData := testData + t.Run(testName, func(t *testing.T) { - ctx := user.InjectOrgID(context.Background(), "label-names-values") + t.Parallel() - // Create distributor - limits := validation.Limits{} - flagext.DefaultValues(&limits) - limits.LabelNamesAndValuesResultsMaxSizeBytes = testData.sizeLimitBytes - ds, _, _, _ := prepare(t, prepConfig{ - numIngesters: 3, - happyIngesters: 3, - numDistributors: 1, - limits: &limits, - }) - t.Cleanup(func() { - require.NoError(t, services.StopAndAwaitTerminated(ctx, ds[0])) - }) + for _, ingestStorageEnabled := range []bool{false, true} { + ingestStorageEnabled := ingestStorageEnabled + + t.Run(fmt.Sprintf("ingest storage enabled: %t", ingestStorageEnabled), func(t *testing.T) { + t.Parallel() + + // Ensure strong read consistency, required to have no flaky tests when ingest storage is enabled. + ctx := user.InjectOrgID(context.Background(), "label-names-values") + ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong) + + // Create distributor + limits := validation.Limits{} + flagext.DefaultValues(&limits) + limits.LabelNamesAndValuesResultsMaxSizeBytes = testData.sizeLimitBytes + ds, _, _, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + limits: &limits, + + // Ingest storage config is ignored when disabled. + ingestStorageEnabled: ingestStorageEnabled, + ingestStoragePartitions: 3, + }) - // Push fixtures - for _, series := range fixtures { - req := mockWriteRequest(series.lbls, series.value, series.timestamp) - _, err := ds[0].Push(ctx, req) - require.NoError(t, err) - } + // Push fixtures + for _, series := range fixtures { + req := mockWriteRequest(series.lbls, series.value, series.timestamp) + _, err := ds[0].Push(ctx, req) + require.NoError(t, err) + } - _, err := ds[0].LabelNamesAndValues(ctx, []*labels.Matcher{}, cardinality.InMemoryMethod) - if len(testData.expectedError) == 0 { - require.NoError(t, err) - } else { - require.EqualError(t, err, testData.expectedError) + _, err := ds[0].LabelNamesAndValues(ctx, []*labels.Matcher{}, cardinality.InMemoryMethod) + if len(testData.expectedError) == 0 { + require.NoError(t, err) + } else { + require.EqualError(t, err, testData.expectedError) + } + }) } }) } @@ -2646,30 +2751,45 @@ func TestDistributor_LabelValuesForLabelName(t *testing.T) { } for testName, testCase := range tests { + testCase := testCase + t.Run(testName, func(t *testing.T) { - ctx := user.InjectOrgID(context.Background(), "label-names-values") + t.Parallel() - // Create distributor - ds, _, _, _ := prepare(t, prepConfig{ - numIngesters: 12, - happyIngesters: 12, - numDistributors: 1, - replicationFactor: 3, - }) - t.Cleanup(func() { - require.NoError(t, services.StopAndAwaitTerminated(ctx, ds[0])) - }) + for _, ingestStorageEnabled := range []bool{false, true} { + ingestStorageEnabled := ingestStorageEnabled - // Push fixtures - for _, series := range fixtures { - req := mockWriteRequest(series.lbls, series.value, series.timestamp) - _, err := ds[0].Push(ctx, req) - require.NoError(t, err) - } + t.Run(fmt.Sprintf("ingest storage enabled: %t", ingestStorageEnabled), func(t *testing.T) { + t.Parallel() - response, err := 
ds[0].LabelValuesForLabelName(ctx, testCase.from, testCase.to, labels.MetricName, testCase.matchers...) - require.NoError(t, err) - assert.ElementsMatch(t, response, testCase.expectedLabelValues) + // Ensure strong read consistency, required to have no flaky tests when ingest storage is enabled. + ctx := user.InjectOrgID(context.Background(), "label-names-values") + ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong) + + // Create distributor + ds, _, _, _ := prepare(t, prepConfig{ + numIngesters: 12, + happyIngesters: 12, + numDistributors: 1, + replicationFactor: 3, + + // Ingest storage config is ignored when disabled. + ingestStorageEnabled: ingestStorageEnabled, + ingestStoragePartitions: 12, + }) + + // Push fixtures + for _, series := range fixtures { + req := mockWriteRequest(series.lbls, series.value, series.timestamp) + _, err := ds[0].Push(ctx, req) + require.NoError(t, err) + } + + response, err := ds[0].LabelValuesForLabelName(ctx, testCase.from, testCase.to, labels.MetricName, testCase.matchers...) + require.NoError(t, err) + assert.ElementsMatch(t, response, testCase.expectedLabelValues) + }) + } }) } } @@ -2698,58 +2818,82 @@ func TestDistributor_LabelNamesAndValues(t *testing.T) { Values: []string{"200", "500"}, }, } - tests := map[string]struct { - zones []string - zonesResponseDelay map[string]time.Duration - }{ - "should group values of labels by label name and return only distinct label values": {}, - "should return the results if zone awareness is enabled and only 2 zones return the results": { - zones: []string{"A", "B", "C"}, - zonesResponseDelay: map[string]time.Duration{"C": 10 * time.Second}, - }, - } - for testName, testData := range tests { - t.Run(testName, func(t *testing.T) { - ctx := user.InjectOrgID(context.Background(), "label-names-values") + t.Run("should group values of labels by label name and return only distinct label values", func(t *testing.T) { + for _, ingestStorageEnabled := range []bool{false, true} { + ingestStorageEnabled := ingestStorageEnabled - // Create distributor - ds, _, _, _ := prepare(t, prepConfig{ - numIngesters: 12, - happyIngesters: 12, - numDistributors: 1, - replicationFactor: 3, - ingesterZones: testData.zones, - labelNamesStreamZonesResponseDelay: testData.zonesResponseDelay, - }) - t.Cleanup(func() { - require.NoError(t, services.StopAndAwaitTerminated(ctx, ds[0])) - }) + t.Run(fmt.Sprintf("ingest storage enabled: %t", ingestStorageEnabled), func(t *testing.T) { + t.Parallel() - // Push fixtures - for _, series := range fixtures { - req := mockWriteRequest(series.lbls, series.value, series.timestamp) - _, err := ds[0].Push(ctx, req) + // Ensure strong read consistency, required to have no flaky tests when ingest storage is enabled. 
+ ctx := user.InjectOrgID(context.Background(), "label-names-values") + ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong) + + // Create distributor + ds, _, _, _ := prepare(t, prepConfig{ + numIngesters: 12, + happyIngesters: 12, + numDistributors: 1, + replicationFactor: 3, + ingestStorageEnabled: ingestStorageEnabled, + ingestStoragePartitions: 12, + }) + + // Push fixtures + for _, series := range fixtures { + req := mockWriteRequest(series.lbls, series.value, series.timestamp) + _, err := ds[0].Push(ctx, req) + require.NoError(t, err) + } + + response, err := ds[0].LabelNamesAndValues(ctx, []*labels.Matcher{}, cardinality.InMemoryMethod) require.NoError(t, err) - } + require.Len(t, response.Items, len(expectedLabelValues)) - // Assert on metric metadata - timeBeforeExecution := time.Now() - response, err := ds[0].LabelNamesAndValues(ctx, []*labels.Matcher{}, cardinality.InMemoryMethod) - require.NoError(t, err) - if len(testData.zonesResponseDelay) > 0 { - executionDuration := time.Since(timeBeforeExecution) - require.Less(t, executionDuration, 5*time.Second, "Execution must be completed earlier than in 5 seconds") - } - require.Len(t, response.Items, len(expectedLabelValues)) + // sort label values to make stable assertion + for _, item := range response.Items { + slices.Sort(item.Values) + } + assert.ElementsMatch(t, response.Items, expectedLabelValues) + }) + } + }) - // sort label values to make stable assertion - for _, item := range response.Items { - slices.Sort(item.Values) - } - assert.ElementsMatch(t, response.Items, expectedLabelValues) + t.Run("should return the results if zone awareness is enabled and only 2 zones return the results", func(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "label-names-values") + slowZoneDelay := 10 * time.Second + + // Create distributor + ds, _, _, _ := prepare(t, prepConfig{ + numIngesters: 12, + happyIngesters: 12, + numDistributors: 1, + replicationFactor: 3, + ingesterZones: []string{"A", "B", "C"}, + labelNamesStreamZonesResponseDelay: map[string]time.Duration{"C": slowZoneDelay}, }) - } + + // Push fixtures + for _, series := range fixtures { + req := mockWriteRequest(series.lbls, series.value, series.timestamp) + _, err := ds[0].Push(ctx, req) + require.NoError(t, err) + } + + // Assert on metric metadata + timeBeforeExecution := time.Now() + response, err := ds[0].LabelNamesAndValues(ctx, []*labels.Matcher{}, cardinality.InMemoryMethod) + require.NoError(t, err) + require.Less(t, time.Since(timeBeforeExecution), slowZoneDelay/2, "Execution must be completed before the slow zone ingesters respond") + require.Len(t, response.Items, len(expectedLabelValues)) + + // sort label values to make stable assertion + for _, item := range response.Items { + slices.Sort(item.Values) + } + assert.ElementsMatch(t, response.Items, expectedLabelValues) + }) } // This test asserts that distributor waits for all ingester responses to be completed even if ZoneAwareness is enabled. @@ -2811,9 +2955,6 @@ func prepareWithZoneAwarenessAndZoneDelay(t *testing.T, fixtures []series) (cont "ZONE-C": 2 * time.Second, }, }) - t.Cleanup(func() { - require.NoError(t, services.StopAndAwaitTerminated(ctx, ds[0])) - }) // Push fixtures for _, series := range fixtures { @@ -3597,6 +3738,7 @@ type prepConfig struct { // Ingest storage specific configuration. ingestStorageEnabled bool ingestStoragePartitions int32 // Number of partitions. 
+ ingestStorageKafka *kfake.Cluster } // totalIngesters takes into account ingesterStateByZone and numIngesters. @@ -3656,18 +3798,18 @@ func (c prepConfig) validate(t testing.TB) { } } -func prepareIngesters(cfg prepConfig) []*mockIngester { +func prepareIngesters(t testing.TB, cfg prepConfig) []*mockIngester { if len(cfg.ingesterStateByZone) != 0 { ingesters := []*mockIngester(nil) for zone, state := range cfg.ingesterStateByZone { - ingesters = append(ingesters, prepareIngesterZone(zone, state, cfg)...) + ingesters = append(ingesters, prepareIngesterZone(t, zone, state, cfg)...) } return ingesters } ingesters := []*mockIngester(nil) numZones := len(cfg.ingesterZones) if numZones == 0 { - return prepareIngesterZone("", ingesterZoneState{numIngesters: cfg.numIngesters, happyIngesters: cfg.happyIngesters}, cfg) + return prepareIngesterZone(t, "", ingesterZoneState{numIngesters: cfg.numIngesters, happyIngesters: cfg.happyIngesters}, cfg) } for zoneIdx, zone := range cfg.ingesterZones { state := ingesterZoneState{ @@ -3688,13 +3830,13 @@ func prepareIngesters(cfg prepConfig) []*mockIngester { state.numIngesters++ } - ingesters = append(ingesters, prepareIngesterZone(zone, state, cfg)...) + ingesters = append(ingesters, prepareIngesterZone(t, zone, state, cfg)...) } return ingesters } -func prepareIngesterZone(zone string, state ingesterZoneState, cfg prepConfig) []*mockIngester { +func prepareIngesterZone(t testing.TB, zone string, state ingesterZoneState, cfg prepConfig) []*mockIngester { ingesters := []*mockIngester(nil) if state.states == nil { @@ -3712,7 +3854,8 @@ func prepareIngesterZone(zone string, state ingesterZoneState, cfg prepConfig) [ if len(cfg.labelNamesStreamZonesResponseDelay) > 0 { labelNamesStreamResponseDelay = cfg.labelNamesStreamZonesResponseDelay[zone] } - ingesters = append(ingesters, &mockIngester{ + + ingester := &mockIngester{ id: i, happy: s == ingesterStateHappy, queryDelay: cfg.queryDelay, @@ -3722,8 +3865,42 @@ func prepareIngesterZone(zone string, state ingesterZoneState, cfg prepConfig) [ labelNamesStreamResponseDelay: labelNamesStreamResponseDelay, timeOut: cfg.timeOut, circuitBreakerOpen: cfg.circuitBreakerOpen, - }) + } + + // Init the partition reader if the ingest storage is enabled. + // This is required to let each mocked ingester to consume their own partition. + if cfg.ingestStorageEnabled { + var err error + + kafkaCfg := ingest.KafkaConfig{} + flagext.DefaultValues(&kafkaCfg) + kafkaCfg.Address = cfg.ingestStorageKafka.ListenAddrs()[0] + kafkaCfg.Topic = kafkaTopic + kafkaCfg.LastProducedOffsetPollInterval = 100 * time.Millisecond + kafkaCfg.LastProducedOffsetRetryTimeout = 100 * time.Millisecond + + ingester.partitionReader, err = ingest.NewPartitionReaderForPusher(kafkaCfg, ingester.partitionID(), ingester.instanceID(), newMockIngesterPusherAdapter(ingester), log.NewNopLogger(), nil) + require.NoError(t, err) + + // We start it async, and then we wait until running in a defer so that multiple partition + // readers will be started concurrently. + require.NoError(t, ingester.partitionReader.StartAsync(context.Background())) + + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ingester.partitionReader)) + }) + } + + ingesters = append(ingesters, ingester) + } + + // Wait until all partition readers have started. 
+ if cfg.ingestStorageEnabled { + for _, ingester := range ingesters { + require.NoError(t, ingester.partitionReader.AwaitRunning(context.Background())) + } } + return ingesters } @@ -3779,10 +3956,19 @@ func prepare(t testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []* cfg.validate(t) logger := log.NewNopLogger() - ingesters := prepareIngesters(cfg) + + // Init a fake Kafka cluster if ingest storage is enabled. + if cfg.ingestStorageEnabled && cfg.ingestStorageKafka == nil { + cfg.ingestStorageKafka, _ = testkafka.CreateCluster(t, cfg.ingestStoragePartitions, kafkaTopic) + } + + // Create the mocked ingesters. + ingesters := prepareIngesters(t, cfg) + kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), logger, nil) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + // Add ingesters to the ring. err := kvStore.CAS(ctx, ingester.IngesterRingKey, func(_ interface{}) (interface{}, bool, error) { return prepareRingInstances(cfg, ingesters), true, nil @@ -3815,6 +4001,8 @@ func prepare(t testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []* return ingestersRing.InstancesCount() }) + // Mock the ingester clients pool in order to return our mocked ingester instance instead of a + // real gRPC client. factory := ring_client.PoolInstFunc(func(inst ring.InstanceDesc) (ring_client.PoolClient, error) { for _, ing := range ingesters { if ing.address() == inst.Addr { @@ -3824,14 +4012,9 @@ func prepare(t testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []* return nil, fmt.Errorf("ingester with address %s not found", inst.Addr) }) - // Initialize the ingest storage backend. + // Initialize the ingest storage's partitions ring. var partitionsRing *ring.PartitionInstanceRing - var kafkaCluster *kfake.Cluster - if cfg.ingestStorageEnabled { - // Init a fake Kafka cluster. - kafkaCluster, _ = testkafka.CreateCluster(t, cfg.ingestStoragePartitions, kafkaTopic) - // Init the partitions ring. partitionsStore := kvStore.WithCodec(ring.GetPartitionRingCodec()) require.NoError(t, partitionsStore.CAS(ctx, ingester.PartitionRingKey, func(_ interface{}) (interface{}, bool, error) { @@ -3867,7 +4050,7 @@ func prepare(t testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []* ingestCfg.Enabled = cfg.ingestStorageEnabled if cfg.ingestStorageEnabled { ingestCfg.KafkaConfig.Topic = kafkaTopic - ingestCfg.KafkaConfig.Address = kafkaCluster.ListenAddrs()[0] + ingestCfg.KafkaConfig.Address = cfg.ingestStorageKafka.ListenAddrs()[0] ingestCfg.KafkaConfig.LastProducedOffsetPollInterval = 100 * time.Millisecond } @@ -3927,7 +4110,7 @@ func prepare(t testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []* populateIngestersData(t, ingesters, cfg.ingesterDataByZone, cfg.ingesterDataTenantID) } - return distributors, ingesters, distributorRegistries, kafkaCluster + return distributors, ingesters, distributorRegistries, cfg.ingestStorageKafka } func populateIngestersData(t testing.TB, ingesters []*mockIngester, dataPerZone map[string][]*mimirpb.WriteRequest, tenantID string) { @@ -4279,6 +4462,10 @@ type mockIngester struct { tokens []uint32 id int circuitBreakerOpen bool + + // partitionReader is responsible to consume a partition from Kafka when the + // ingest storage is enabled. This field is nil if the ingest storage is disabled. + partitionReader *ingest.PartitionReader } func (i *mockIngester) instanceID() string { @@ -4356,25 +4543,23 @@ func (i *mockIngester) Push(ctx context.Context, req *mimirpb.WriteRequest, _ .. 
return nil, err } - for _, series := range req.Timeseries { + for _, unsafeSeries := range req.Timeseries { + // Make a copy because the request Timeseries are reused. It's important to make a copy + // even if the timeseries already exists in our local map, because we may still retain + // some request's memory when merging exemplars. + series, err := clonePreallocTimeseries(unsafeSeries) + if err != nil { + return nil, err + } + hash := mimirpb.ShardByAllLabelAdapters(orgid, series.Labels) existing, ok := i.timeseries[hash] if !ok { - // Make a copy because the request Timeseries are reused - item := mimirpb.TimeSeries{ - Labels: make([]mimirpb.LabelAdapter, len(series.TimeSeries.Labels)), - Samples: make([]mimirpb.Sample, len(series.TimeSeries.Samples)), - Histograms: make([]mimirpb.Histogram, len(series.TimeSeries.Histograms)), - } - - copy(item.Labels, series.TimeSeries.Labels) - copy(item.Samples, series.TimeSeries.Samples) - copy(item.Histograms, series.TimeSeries.Histograms) - - i.timeseries[hash] = &mimirpb.PreallocTimeseries{TimeSeries: &item} + i.timeseries[hash] = &series } else { existing.Samples = append(existing.Samples, series.Samples...) existing.Histograms = append(existing.Histograms, series.Histograms...) + existing.Exemplars = append(existing.Exemplars, series.Exemplars...) } } @@ -4403,7 +4588,11 @@ func makeWireChunk(c chunk.EncodedChunk) client.Chunk { return chunk } -func (i *mockIngester) QueryStream(_ context.Context, req *client.QueryRequest, _ ...grpc.CallOption) (client.Ingester_QueryStreamClient, error) { +func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest, _ ...grpc.CallOption) (client.Ingester_QueryStreamClient, error) { + if err := i.enforceReadConsistency(ctx); err != nil { + return nil, err + } + time.Sleep(i.queryDelay) i.Lock() @@ -4559,7 +4748,83 @@ func (i *mockIngester) QueryStream(_ context.Context, req *client.QueryRequest, }, nil } -func (i *mockIngester) MetricsForLabelMatchers(_ context.Context, req *client.MetricsForLabelMatchersRequest, _ ...grpc.CallOption) (*client.MetricsForLabelMatchersResponse, error) { +func (i *mockIngester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest, _ ...grpc.CallOption) (*client.ExemplarQueryResponse, error) { + if err := i.enforceReadConsistency(ctx); err != nil { + return nil, err + } + + i.Lock() + defer i.Unlock() + + i.trackCall("QueryExemplars") + + if !i.happy { + return nil, errFail + } + + from, through, multiMatchers, err := client.FromExemplarQueryRequest(req) + if err != nil { + return nil, err + } + + res := &client.ExemplarQueryResponse{} + + for _, series := range i.timeseries { + seriesMatches := false + seriesLabels := mimirpb.FromLabelAdaptersToLabels(series.Labels) + + // Check if the series matches any of the matchers in the request. + for _, matchers := range multiMatchers { + matcherMatches := true + + for _, matcher := range matchers { + if !matcher.Matches(seriesLabels.Get(matcher.Name)) { + matcherMatches = false + break + } + } + + if matcherMatches { + seriesMatches = true + break + } + } + + if !seriesMatches { + continue + } + + // Filter exemplars by time range. 
+ var exemplars []mimirpb.Exemplar + for _, exemplar := range series.Exemplars { + if exemplar.TimestampMs >= from && exemplar.TimestampMs <= through { + exemplars = append(exemplars, exemplar) + } + } + + if len(exemplars) > 0 { + res.Timeseries = append(res.Timeseries, mimirpb.TimeSeries{ + Labels: series.Labels, + Exemplars: exemplars, + }) + } + } + + // Sort series by labels because the real ingester returns sorted ones. + slices.SortFunc(res.Timeseries, func(a, b mimirpb.TimeSeries) int { + aKey := client.LabelsToKeyString(mimirpb.FromLabelAdaptersToLabels(a.Labels)) + bKey := client.LabelsToKeyString(mimirpb.FromLabelAdaptersToLabels(b.Labels)) + return strings.Compare(aKey, bKey) + }) + + return res, nil +} + +func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest, _ ...grpc.CallOption) (*client.MetricsForLabelMatchersResponse, error) { + if err := i.enforceReadConsistency(ctx); err != nil { + return nil, err + } + i.Lock() defer i.Unlock() @@ -4585,7 +4850,11 @@ func (i *mockIngester) MetricsForLabelMatchers(_ context.Context, req *client.Me return &response, nil } -func (i *mockIngester) LabelValues(_ context.Context, req *client.LabelValuesRequest, _ ...grpc.CallOption) (*client.LabelValuesResponse, error) { +func (i *mockIngester) LabelValues(ctx context.Context, req *client.LabelValuesRequest, _ ...grpc.CallOption) (*client.LabelValuesResponse, error) { + if err := i.enforceReadConsistency(ctx); err != nil { + return nil, err + } + i.Lock() defer i.Unlock() @@ -4632,7 +4901,11 @@ func (i *mockIngester) LabelValues(_ context.Context, req *client.LabelValuesReq return &client.LabelValuesResponse{LabelValues: response}, nil } -func (i *mockIngester) LabelNames(_ context.Context, req *client.LabelNamesRequest, _ ...grpc.CallOption) (*client.LabelNamesResponse, error) { +func (i *mockIngester) LabelNames(ctx context.Context, req *client.LabelNamesRequest, _ ...grpc.CallOption) (*client.LabelNamesResponse, error) { + if err := i.enforceReadConsistency(ctx); err != nil { + return nil, err + } + i.Lock() defer i.Unlock() @@ -4660,7 +4933,11 @@ func (i *mockIngester) LabelNames(_ context.Context, req *client.LabelNamesReque return &response, nil } -func (i *mockIngester) MetricsMetadata(context.Context, *client.MetricsMetadataRequest, ...grpc.CallOption) (*client.MetricsMetadataResponse, error) { +func (i *mockIngester) MetricsMetadata(ctx context.Context, _ *client.MetricsMetadataRequest, _ ...grpc.CallOption) (*client.MetricsMetadataResponse, error) { + if err := i.enforceReadConsistency(ctx); err != nil { + return nil, err + } + i.Lock() defer i.Unlock() @@ -4680,7 +4957,11 @@ func (i *mockIngester) MetricsMetadata(context.Context, *client.MetricsMetadataR return resp, nil } -func (i *mockIngester) LabelNamesAndValues(_ context.Context, _ *client.LabelNamesAndValuesRequest, _ ...grpc.CallOption) (client.Ingester_LabelNamesAndValuesClient, error) { +func (i *mockIngester) LabelNamesAndValues(ctx context.Context, _ *client.LabelNamesAndValuesRequest, _ ...grpc.CallOption) (client.Ingester_LabelNamesAndValuesClient, error) { + if err := i.enforceReadConsistency(ctx); err != nil { + return nil, err + } + i.Lock() defer i.Unlock() results := map[string]map[string]struct{}{} @@ -4727,7 +5008,11 @@ func (s *labelNamesAndValuesMockStream) Recv() (*client.LabelNamesAndValuesRespo return result, nil } -func (i *mockIngester) LabelValuesCardinality(_ context.Context, req *client.LabelValuesCardinalityRequest, _ ...grpc.CallOption) 
(client.Ingester_LabelValuesCardinalityClient, error) { +func (i *mockIngester) LabelValuesCardinality(ctx context.Context, req *client.LabelValuesCardinalityRequest, _ ...grpc.CallOption) (client.Ingester_LabelValuesCardinalityClient, error) { + if err := i.enforceReadConsistency(ctx); err != nil { + return nil, err + } + i.Lock() defer i.Unlock() @@ -4797,7 +5082,11 @@ func (s *labelValuesCardinalityStream) Recv() (*client.LabelValuesCardinalityRes return result, nil } -func (i *mockIngester) ActiveSeries(_ context.Context, req *client.ActiveSeriesRequest, _ ...grpc.CallOption) (client.Ingester_ActiveSeriesClient, error) { +func (i *mockIngester) ActiveSeries(ctx context.Context, req *client.ActiveSeriesRequest, _ ...grpc.CallOption) (client.Ingester_ActiveSeriesClient, error) { + if err := i.enforceReadConsistency(ctx); err != nil { + return nil, err + } + i.Lock() defer i.Unlock() @@ -4866,6 +5155,35 @@ func (i *mockIngester) countCalls(name string) int { return i.calls[name] } +func (i *mockIngester) enforceReadConsistency(ctx context.Context) error { + // Strong read consistency is required to be enforced only if ingest storage is enabled. + if i.partitionReader == nil { + return nil + } + + level, ok := api.ReadConsistencyFromContext(ctx) + if !ok || level != api.ReadConsistencyStrong { + return nil + } + + return i.partitionReader.WaitReadConsistency(ctx) +} + +type mockIngesterPusherAdapter struct { + ingester *mockIngester +} + +func newMockIngesterPusherAdapter(ingester *mockIngester) *mockIngesterPusherAdapter { + return &mockIngesterPusherAdapter{ + ingester: ingester, + } +} + +// Push implements ingest.Pusher. +func (c *mockIngesterPusherAdapter) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) { + return c.ingester.Push(ctx, req) +} + // noopIngester is a mocked ingester which does nothing. type noopIngester struct { client.IngesterClient @@ -6211,3 +6529,23 @@ func (m *mockInstanceClient) Push(ctx context.Context, _ *mimirpb.WriteRequest, m.md, _ = metadata.FromOutgoingContext(ctx) return nil, nil } + +func cloneTimeseries(orig *mimirpb.TimeSeries) (*mimirpb.TimeSeries, error) { + data, err := orig.Marshal() + if err != nil { + return nil, err + } + + cloned := &mimirpb.TimeSeries{} + err = cloned.Unmarshal(data) + return cloned, err +} + +func clonePreallocTimeseries(orig mimirpb.PreallocTimeseries) (mimirpb.PreallocTimeseries, error) { + clonedSeries, err := cloneTimeseries(orig.TimeSeries) + if err != nil { + return mimirpb.PreallocTimeseries{}, err + } + + return mimirpb.PreallocTimeseries{TimeSeries: clonedSeries}, nil +} diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 415385f0050..092dca0104c 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -45,6 +45,8 @@ var ( errStreamClosed = cancellation.NewErrorf("stream closed") ) +// QueryExemplars returns exemplars with timestamp between from and to, for the series matching the input series +// label matchers. The exemplars in the response are sorted by series labels. 
func (d *Distributor) QueryExemplars(ctx context.Context, from, to model.Time, matchers ...[]*labels.Matcher) (*ingester_client.ExemplarQueryResponse, error) { var result *ingester_client.ExemplarQueryResponse err := instrument.CollectedRequest(ctx, "Distributor.QueryExemplars", d.queryDuration, instrument.ErrorCode, func(ctx context.Context) error { @@ -53,18 +55,20 @@ func (d *Distributor) QueryExemplars(ctx context.Context, from, to model.Time, m return err } - // We ask for all ingesters without passing matchers because exemplar queries take in an array of label matchers. - //nolint:staticcheck - replicationSet, err := d.getIngesterReplicationSetForQuery(ctx) + replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx) if err != nil { return err } - result, err = d.queryIngestersExemplars(ctx, replicationSet, req) + results, err := forReplicationSets(ctx, d, replicationSets, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.ExemplarQueryResponse, error) { + return client.QueryExemplars(ctx, req) + }) if err != nil { return err } + result = mergeExemplarQueryResponses(results) + if s := opentracing.SpanFromContext(ctx); s != nil { s.LogKV("series", len(result.Timeseries)) } @@ -135,7 +139,6 @@ func (d *Distributor) getIngesterReplicationSetForQuery(ctx context.Context) (ri // that must be queried for a read operation. // // If multiple ring.ReplicationSets are returned, each must be queried separately, and results merged. -// getIngesterReplicationSetForQuery returns exactly one replication set if ingest storage is disabled. func (d *Distributor) getIngesterReplicationSetsForQuery(ctx context.Context) ([]ring.ReplicationSet, error) { userID, err := tenant.TenantID(ctx) if err != nil { @@ -199,21 +202,6 @@ func mergeExemplarSets(a, b []mimirpb.Exemplar) []mimirpb.Exemplar { return result } -// queryIngestersExemplars queries the ingesters for exemplars. -func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.ExemplarQueryRequest) (*ingester_client.ExemplarQueryResponse, error) { - // Fetch exemplars from multiple ingesters in parallel, using the replicationSet - // to deal with consistency. - - results, err := forReplicationSet(ctx, d, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.ExemplarQueryResponse, error) { - return client.QueryExemplars(ctx, req) - }) - if err != nil { - return nil, err - } - - return mergeExemplarQueryResponses(results), nil -} - func mergeExemplarQueryResponses(results []*ingester_client.ExemplarQueryResponse) *ingester_client.ExemplarQueryResponse { var keys []string exemplarResults := make(map[string]mimirpb.TimeSeries) diff --git a/pkg/distributor/query_test.go b/pkg/distributor/query_test.go index bc8e4187599..041d63976a7 100644 --- a/pkg/distributor/query_test.go +++ b/pkg/distributor/query_test.go @@ -25,10 +25,146 @@ import ( ingester_client "github.com/grafana/mimir/pkg/ingester/client" "github.com/grafana/mimir/pkg/mimirpb" + "github.com/grafana/mimir/pkg/querier/api" "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/util/limiter" ) +func TestDistributor_QueryExemplars(t *testing.T) { + const numIngesters = 5 + + now := model.Now() + + fixtures := []mimirpb.PreallocTimeseries{ + // Note: it's important to write at least a sample, otherwise the exemplar timestamp validation doesn't pass. 
+ makeTimeseries([]string{labels.MetricName, "series_1", "namespace", "a"}, makeSamples(int64(now), 1), makeExemplars([]string{"trace_id", "A"}, int64(now), 0)), + makeTimeseries([]string{labels.MetricName, "series_1", "namespace", "b"}, makeSamples(int64(now), 2), makeExemplars([]string{"trace_id", "B"}, int64(now), 0)), + makeTimeseries([]string{labels.MetricName, "series_2", "namespace", "a"}, makeSamples(int64(now), 3), makeExemplars([]string{"trace_id", "C"}, int64(now), 0)), + makeTimeseries([]string{labels.MetricName, "series_2", "namespace", "b"}, makeSamples(int64(now), 4), makeExemplars([]string{"trace_id", "D"}, int64(now), 0)), + } + + tests := map[string]struct { + shuffleShardSize int + multiMatchers [][]*labels.Matcher + maxSeriesPerQuery int + expectedResult []mimirpb.TimeSeries + expectedIngesters int + expectedErr error + }{ + "should return an empty response if no series match": { + multiMatchers: [][]*labels.Matcher{ + {mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "unknown")}, + }, + expectedResult: []mimirpb.TimeSeries{}, + expectedIngesters: numIngesters, + }, + "should filter series by single matcher": { + multiMatchers: [][]*labels.Matcher{ + {mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "series_1")}, + }, + expectedResult: []mimirpb.TimeSeries{ + {Labels: fixtures[0].Labels, Exemplars: fixtures[0].Exemplars}, + {Labels: fixtures[1].Labels, Exemplars: fixtures[1].Exemplars}, + }, + expectedIngesters: numIngesters, + }, + "should filter metrics by multiple matchers": { + multiMatchers: [][]*labels.Matcher{ + {mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "series_1"), mustNewMatcher(labels.MatchEqual, "namespace", "a")}, + }, + expectedResult: []mimirpb.TimeSeries{ + {Labels: fixtures[0].Labels, Exemplars: fixtures[0].Exemplars}, + }, + expectedIngesters: numIngesters, + }, + "should query only ingesters belonging to tenant's shard if shuffle shard size is set": { + multiMatchers: [][]*labels.Matcher{ + {mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "series_1")}, + }, + expectedResult: []mimirpb.TimeSeries{ + {Labels: fixtures[0].Labels, Exemplars: fixtures[0].Exemplars}, + {Labels: fixtures[1].Labels, Exemplars: fixtures[1].Exemplars}, + }, + shuffleShardSize: 3, + expectedIngesters: 3, + }, + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + t.Parallel() + + for _, ingestStorageEnabled := range []bool{false, true} { + ingestStorageEnabled := ingestStorageEnabled + + t.Run(fmt.Sprintf("ingest storage enabled: %t", ingestStorageEnabled), func(t *testing.T) { + t.Parallel() + + testConfig := prepConfig{ + numIngesters: numIngesters, + happyIngesters: numIngesters, + numDistributors: 1, + limits: prepareDefaultLimits(), + } + + // Enable exemplars ingestion. + testConfig.limits.MaxGlobalExemplarsPerUser = 1000 + + if ingestStorageEnabled { + testConfig.ingestStorageEnabled = true + testConfig.ingestStoragePartitions = numIngesters + testConfig.limits.IngestionPartitionsTenantShardSize = testData.shuffleShardSize + } else { + testConfig.shuffleShardSize = testData.shuffleShardSize + } + + // Create distributor + ds, ingesters, _, _ := prepare(t, testConfig) + + // Ensure strong read consistency, required to have no flaky tests when ingest storage is enabled. + ctx := user.InjectOrgID(context.Background(), "test") + ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong) + + // Push fixtures. 
+ for _, series := range fixtures { + // Clone the series so that it's safe to be reused. + clonedSeries, err := clonePreallocTimeseries(series) + require.NoError(t, err) + + _, err = ds[0].Push(ctx, &mimirpb.WriteRequest{Timeseries: []mimirpb.PreallocTimeseries{clonedSeries}}) + require.NoError(t, err) + } + + // Query exemplars. + res, err := ds[0].QueryExemplars(ctx, now, now, testData.multiMatchers...) + if testData.expectedErr != nil { + require.ErrorIs(t, err, testData.expectedErr) + return + } + + require.NoError(t, err) + assert.Equal(t, testData.expectedResult, res.Timeseries) + + // Check how many ingesters have been queried. + if ingestStorageEnabled { + // When ingest storage is enabled, we request quorum 1 for each partition. + // In this test each ingester owns a different partition, so we expect all + // ingesters to be queried. + assert.Equal(t, testData.expectedIngesters, countMockIngestersCalled(ingesters, "QueryExemplars")) + } else { + // Due to the quorum the distributor could cancel the last request towards ingesters + // if all other ones are successful, so we're good either has been queried X or X-1 + // ingesters. + assert.Contains(t, []int{testData.expectedIngesters, testData.expectedIngesters - 1}, countMockIngestersCalled(ingesters, "QueryExemplars")) + } + }) + } + }) + } +} + func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReached(t *testing.T) { const limit = 30 // Chunks are duplicated due to replication factor.
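
The new forReplicationSets() helper in distributor.go fans out one ring.DoUntilQuorum() call per replication set and flattens the per-set results via concurrency.ForEachJobMergeResults. A minimal sketch of that dskit primitive outside of Mimir (the job type and values here are illustrative only; a concurrency limit of 0 means unbounded, matching the call in the patch):

package main

import (
	"context"
	"fmt"

	"github.com/grafana/dskit/concurrency"
)

func main() {
	// One job per replication set in the distributor; here jobs are plain int slices.
	jobs := [][]int{{1, 2}, {3}, {4, 5, 6}}

	// Each job returns a slice of results; ForEachJobMergeResults runs the jobs in
	// parallel (0 = no concurrency limit) and merges the per-job slices into one slice.
	merged, err := concurrency.ForEachJobMergeResults[[]int, int](context.Background(), jobs, 0, func(_ context.Context, job []int) ([]int, error) {
		return job, nil
	})
	if err != nil {
		panic(err)
	}

	// 6 results in total; ordering across jobs is not guaranteed.
	fmt.Println(len(merged))
}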
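
The tests above pin strong read consistency on the request context so that, with ingest storage enabled, the mock ingesters wait for their partition reader to catch up before serving reads. A small sketch of that context round trip, using the accessors imported from github.com/grafana/mimir/pkg/querier/api in the patch (the printed messages are illustrative):

package main

import (
	"context"
	"fmt"

	"github.com/grafana/mimir/pkg/querier/api"
)

func main() {
	// No consistency level set: the lookup reports ok=false and callers fall back
	// to the default behaviour.
	if _, ok := api.ReadConsistencyFromContext(context.Background()); !ok {
		fmt.Println("no explicit read consistency requested")
	}

	// The tests inject strong consistency, which the mock ingesters detect in
	// enforceReadConsistency() and turn into a partition reader WaitReadConsistency() call.
	ctx := api.ContextWithReadConsistency(context.Background(), api.ReadConsistencyStrong)

	if level, ok := api.ReadConsistencyFromContext(ctx); ok {
		fmt.Println("requested read consistency:", level)
	}
}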