From 4e078546e42faa6d9968292cffbae0bb563a775d Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 31 May 2024 12:33:11 +0200 Subject: [PATCH] Distributor: Don't drop time series with invalid exemplars (#8224) * Distributor: Don't drop time series with invalid exemplars Don't discard time series with invalid exemplars, just drop affected exemplars. --------- Signed-off-by: Arve Knudsen Co-authored-by: Nick Pillitteri <56quarters@users.noreply.github.com> Co-authored-by: Oleg Zaytsev (cherry picked from commit d319b9ff9d0dbdfed71e98526e2bf235b66e04dd) --- CHANGELOG.md | 1 + pkg/distributor/distributor.go | 15 ++-- pkg/distributor/distributor_test.go | 119 +++++++++++++++++----------- pkg/mimirpb/timeseries.go | 2 +- 4 files changed, 83 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 901b19705f3..c29e350689c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,6 +71,7 @@ * [BUGFIX] Query scheduler: Fix a crash in result marshaling. #8140 * [BUGFIX] Store-gateway: Allow long-running index scans to be interrupted. #8154 * [BUGFIX] Query-frontend: fix splitting of queries using `@ start()` and `@end()` modifiers on a subquery. Previously the `start()` and `end()` would be evaluated using the start end end of the split query instead of the original query. #8162 +* [BUGFIX] Distributor: Don't discard time series with invalid exemplars, just drop affected exemplars. #8224 ### Mixin diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 025862ef222..6913f42b770 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -753,22 +753,21 @@ func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeser for i := 0; i < len(ts.Exemplars); { e := ts.Exemplars[i] if err := validateExemplar(d.exemplarValidationMetrics, userID, ts.Labels, e); err != nil { - // An exemplar validation error prevents ingesting samples - // in the same series object. However because the current Prometheus - // remote write implementation only populates one or the other, - // there never will be any. - return err + // OTel sends empty exemplars by default which aren't useful and are discarded by TSDB, so let's just skip invalid ones and ingest the data we can instead of returning an error. + ts.DeleteExemplarByMovingLast(i) + // Don't increase index i. After moving the last exemplar to this index, we want to check it again. + continue } if !validateExemplarTimestamp(d.exemplarValidationMetrics, userID, minExemplarTS, maxExemplarTS, e) { ts.DeleteExemplarByMovingLast(i) - // Don't increase index i. After moving last exemplar to this index, we want to check it again. + // Don't increase index i. After moving the last exemplar to this index, we want to check it again. continue } // We want to check if exemplars are in order. If they are not, we will sort them and invalidate the cache. - if isInOrder && previousExemplarTS > ts.Exemplars[i].TimestampMs { + if isInOrder && previousExemplarTS > e.TimestampMs { isInOrder = false } - previousExemplarTS = ts.Exemplars[i].TimestampMs + previousExemplarTS = e.TimestampMs i++ } if !isInOrder { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 1fdf0718227..129ec0e56a9 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1426,63 +1426,84 @@ func TestDistributor_Push_ExemplarValidation(t *testing.T) { } tests := map[string]struct { - req *mimirpb.WriteRequest - errMsg string - errID globalerror.ID + req *mimirpb.WriteRequest + expectedDrop bool + expectedErrMsg string + expectedErrID globalerror.ID }{ "valid exemplar": { - req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 1000, []string{"foo", "bar"}), + req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 1000, 1, []string{"foo", "bar"}), }, - "rejects exemplar with no labels": { - req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 1000, []string{}), - errMsg: `received an exemplar with no valid labels, timestamp: 1000 series: test labels: {}`, - errID: globalerror.ExemplarLabelsMissing, + "drops exemplar with no labels": { + req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 1000, 1, []string{}), + expectedDrop: true, }, - "rejects exemplar with no timestamp": { - req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 0, []string{"foo", "bar"}), - errMsg: `received an exemplar with no timestamp, timestamp: 0 series: test labels: {foo="bar"}`, - errID: globalerror.ExemplarTimestampInvalid, + "drops exemplar with no timestamp": { + req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 0, 1, []string{"foo", "bar"}), + expectedDrop: true, }, - "rejects exemplar with too long labelset": { - req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 1000, []string{"foo", strings.Repeat("0", 126)}), - errMsg: fmt.Sprintf(`received an exemplar where the size of its combined labels exceeds the limit of 128 characters, timestamp: 1000 series: test labels: {foo="%s"}`, strings.Repeat("0", 126)), - errID: globalerror.ExemplarLabelsTooLong, + "drops exemplar with too long labelset": { + req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 1000, 1, []string{"foo", strings.Repeat("0", 126)}), + expectedDrop: true, }, "rejects exemplar with too many series labels": { - req: makeWriteRequestExemplar(manyLabels, 0, nil), - errMsg: "received a series whose number of labels exceeds the limit", - errID: globalerror.MaxLabelNamesPerSeries, + req: makeWriteRequestExemplar(manyLabels, 0, 1, nil), + expectedErrMsg: "received a series whose number of labels exceeds the limit", + expectedErrID: globalerror.MaxLabelNamesPerSeries, }, "rejects exemplar with duplicate series labels": { - req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test", "foo", "bar", "foo", "bar"}, 0, nil), - errMsg: "received a series with duplicate label name", - errID: globalerror.SeriesWithDuplicateLabelNames, + req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test", "foo", "bar", "foo", "bar"}, 0, 1, nil), + expectedErrMsg: "received a series with duplicate label name", + expectedErrID: globalerror.SeriesWithDuplicateLabelNames, }, "rejects exemplar with empty series label name": { - req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test", "", "bar"}, 0, nil), - errMsg: "received a series with an invalid label", - errID: globalerror.SeriesInvalidLabel, + req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test", "", "bar"}, 0, 1, nil), + expectedErrMsg: "received a series with an invalid label", + expectedErrID: globalerror.SeriesInvalidLabel, }, } for testName, tc := range tests { t.Run(testName, func(t *testing.T) { + // Pass a copy of the reference request, since Push may modify it during cleanup. + reqBytes, err := tc.req.Marshal() + require.NoError(t, err) + reqCopy := &mimirpb.WriteRequest{} + require.NoError(t, reqCopy.Unmarshal(reqBytes)) + + expectedSamples := tc.req.Timeseries[0].Samples + expectedExemplars := tc.req.Timeseries[0].Exemplars + limits := prepareDefaultLimits() limits.MaxGlobalExemplarsPerUser = 10 - ds, _, _, _ := prepare(t, prepConfig{ + ds, ingesters, _, _ := prepare(t, prepConfig{ limits: limits, numIngesters: 2, happyIngesters: 2, numDistributors: 1, shuffleShardSize: 0, }) - _, err := ds[0].Push(ctx, tc.req) - if tc.errMsg != "" { + _, err = ds[0].Push(ctx, reqCopy) + if tc.expectedErrMsg != "" { + require.Error(t, err) fromError, _ := grpcutil.ErrorToStatus(err) - assert.Contains(t, fromError.Message(), tc.errMsg) - assert.Contains(t, fromError.Message(), tc.errID) - } else { - assert.Nil(t, err) + assert.Contains(t, fromError.Message(), tc.expectedErrMsg) + assert.Contains(t, fromError.Message(), tc.expectedErrID) + return + } + + require.NoError(t, err) + for _, i := range ingesters { + ss := i.series() + require.Len(t, ss, 1) + for _, s := range ss { + require.Equal(t, expectedSamples, s.Samples) + if !tc.expectedDrop { + require.Equal(t, expectedExemplars, s.Exemplars) + } else { + require.Empty(t, s.Exemplars) + } + } } }) } @@ -1575,7 +1596,7 @@ func TestDistributor_ExemplarValidation(t *testing.T) { minExemplarTS: 0, maxExemplarTS: 0, req: makeWriteRequestWith( - makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, []string{"foo", "bar"}), + makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, 0, []string{"foo", "bar"}), ), expectedExemplars: []mimirpb.PreallocTimeseries{ {TimeSeries: &mimirpb.TimeSeries{ @@ -1591,12 +1612,12 @@ func TestDistributor_ExemplarValidation(t *testing.T) { minExemplarTS: 0, maxExemplarTS: math.MaxInt64, req: makeWriteRequestWith( - makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, []string{"foo", "bar"}), - makeExemplarTimeseries([]string{model.MetricNameLabel, "test2"}, 1000, []string{"foo", "bar"}), + makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, 0, []string{"foo", "bar"}), + makeExemplarTimeseries([]string{model.MetricNameLabel, "test2"}, 1000, 0, []string{"foo", "bar"}), ), expectedExemplars: []mimirpb.PreallocTimeseries{ - makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, []string{"foo", "bar"}), - makeExemplarTimeseries([]string{model.MetricNameLabel, "test2"}, 1000, []string{"foo", "bar"}), + makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, 0, []string{"foo", "bar"}), + makeExemplarTimeseries([]string{model.MetricNameLabel, "test2"}, 1000, 0, []string{"foo", "bar"}), }, }, "should drop exemplars with timestamp lower than the accepted minimum, when the exemplars are specified in different series": { @@ -1606,15 +1627,15 @@ func TestDistributor_ExemplarValidation(t *testing.T) { minExemplarTS: 300000, maxExemplarTS: math.MaxInt64, req: makeWriteRequestWith( - makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 1000, []string{"foo", "bar"}), - makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 601000, []string{"foo", "bar"}), + makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 1000, 0, []string{"foo", "bar"}), + makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 601000, 0, []string{"foo", "bar"}), ), expectedExemplars: []mimirpb.PreallocTimeseries{ {TimeSeries: &mimirpb.TimeSeries{ Labels: []mimirpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "test"}}, Exemplars: []mimirpb.Exemplar{}, }}, - makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 601000, []string{"foo", "bar"}), + makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 601000, 0, []string{"foo", "bar"}), }, expectedMetrics: ` # HELP cortex_discarded_exemplars_total The total number of exemplars that were discarded. @@ -5745,8 +5766,8 @@ func makeWriteRequestForGenerators(series int, lsg labelSetGen, elsg labelSetGen return request } -func makeWriteRequestExemplar(seriesLabels []string, timestamp int64, exemplarLabels []string) *mimirpb.WriteRequest { - return makeWriteRequestWith(makeExemplarTimeseries(seriesLabels, timestamp, exemplarLabels)) +func makeWriteRequestExemplar(seriesLabels []string, timestamp int64, samples int, exemplarLabels []string) *mimirpb.WriteRequest { + return makeWriteRequestWith(makeExemplarTimeseries(seriesLabels, timestamp, samples, exemplarLabels)) } func makeWriteRequestHistogram(seriesLabels []string, timestamp int64, histogram *histogram.Histogram) *mimirpb.WriteRequest { @@ -5757,10 +5778,18 @@ func makeWriteRequestFloatHistogram(seriesLabels []string, timestamp int64, hist return makeWriteRequestWith(makeFloatHistogramTimeseries(seriesLabels, timestamp, histogram)) } -func makeExemplarTimeseries(seriesLabels []string, timestamp int64, exemplarLabels []string) mimirpb.PreallocTimeseries { +func makeExemplarTimeseries(seriesLabels []string, timestamp int64, numSamples int, exemplarLabels []string) mimirpb.PreallocTimeseries { + var samples []mimirpb.Sample + for i := 0; i < numSamples; i++ { + samples = append(samples, mimirpb.Sample{ + Value: 1, + TimestampMs: timestamp, + }) + } return mimirpb.PreallocTimeseries{ TimeSeries: &mimirpb.TimeSeries{ - Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(seriesLabels...)), + Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(seriesLabels...)), + Samples: samples, Exemplars: []mimirpb.Exemplar{ { Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(exemplarLabels...)), @@ -6824,6 +6853,7 @@ func TestDistributorValidation(t *testing.T) { }}, expectedErr: status.New(codes.FailedPrecondition, metadataMetricNameMissingMsgFormat), }, + // Validation passes for empty exemplar labels, since we just want to skip the exemplars and not fail the time series as a whole. "empty exemplar labels": { metadata: []*mimirpb.MetricMetadata{{MetricFamilyName: "testmetric", Help: "a test metric.", Unit: "", Type: mimirpb.COUNTER}}, labels: [][]mimirpb.LabelAdapter{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}}, @@ -6836,7 +6866,6 @@ func TestDistributorValidation(t *testing.T) { TimestampMs: int64(now), Value: 1, }}, - expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(exemplarEmptyLabelsMsgFormat, now, `testmetric{foo="bar"}`, "{}")), }, } { t.Run(name, func(t *testing.T) { diff --git a/pkg/mimirpb/timeseries.go b/pkg/mimirpb/timeseries.go index 95ca5c4961e..157b1ede8f2 100644 --- a/pkg/mimirpb/timeseries.go +++ b/pkg/mimirpb/timeseries.go @@ -175,7 +175,7 @@ func (p *PreallocTimeseries) HistogramsUpdated() { p.clearUnmarshalData() } -// DeleteExemplarByMovingLast deletes the exemplar by moving the last one on top and shortening the slice +// DeleteExemplarByMovingLast deletes the exemplar by moving the last one on top and shortening the slice. func (p *PreallocTimeseries) DeleteExemplarByMovingLast(ix int) { last := len(p.Exemplars) - 1 if ix < last {