Skip to content

Commit

Permalink
Distributor: Don't drop time series with invalid exemplars (#8224)
Browse files Browse the repository at this point in the history
* Distributor: Don't drop time series with invalid exemplars

Don't discard time series with invalid exemplars, just drop affected
exemplars.

---------

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
Co-authored-by: Nick Pillitteri <56quarters@users.noreply.github.com>
Co-authored-by: Oleg Zaytsev <mail@olegzaytsev.com>
(cherry picked from commit d319b9f)
  • Loading branch information
aknuds1 authored and grafanabot committed May 31, 2024
1 parent 96ed2aa commit 4e07854
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 54 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
* [BUGFIX] Query scheduler: Fix a crash in result marshaling. #8140
* [BUGFIX] Store-gateway: Allow long-running index scans to be interrupted. #8154
* [BUGFIX] Query-frontend: fix splitting of queries using `@ start()` and `@end()` modifiers on a subquery. Previously the `start()` and `end()` would be evaluated using the start and end of the split query instead of the original query. #8162
* [BUGFIX] Distributor: Don't discard time series with invalid exemplars, just drop affected exemplars. #8224

### Mixin

Expand Down
15 changes: 7 additions & 8 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,22 +753,21 @@ func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeser
for i := 0; i < len(ts.Exemplars); {
e := ts.Exemplars[i]
if err := validateExemplar(d.exemplarValidationMetrics, userID, ts.Labels, e); err != nil {
// An exemplar validation error prevents ingesting samples
// in the same series object. However because the current Prometheus
// remote write implementation only populates one or the other,
// there never will be any.
return err
// OTel sends empty exemplars by default which aren't useful and are discarded by TSDB, so let's just skip invalid ones and ingest the data we can instead of returning an error.
ts.DeleteExemplarByMovingLast(i)
// Don't increase index i. After moving the last exemplar to this index, we want to check it again.
continue
}
if !validateExemplarTimestamp(d.exemplarValidationMetrics, userID, minExemplarTS, maxExemplarTS, e) {
ts.DeleteExemplarByMovingLast(i)
// Don't increase index i. After moving last exemplar to this index, we want to check it again.
// Don't increase index i. After moving the last exemplar to this index, we want to check it again.
continue
}
// We want to check if exemplars are in order. If they are not, we will sort them and invalidate the cache.
if isInOrder && previousExemplarTS > ts.Exemplars[i].TimestampMs {
if isInOrder && previousExemplarTS > e.TimestampMs {
isInOrder = false
}
previousExemplarTS = ts.Exemplars[i].TimestampMs
previousExemplarTS = e.TimestampMs
i++
}
if !isInOrder {
Expand Down
119 changes: 74 additions & 45 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1426,63 +1426,84 @@ func TestDistributor_Push_ExemplarValidation(t *testing.T) {
}

tests := map[string]struct {
req *mimirpb.WriteRequest
errMsg string
errID globalerror.ID
req *mimirpb.WriteRequest
expectedDrop bool
expectedErrMsg string
expectedErrID globalerror.ID
}{
"valid exemplar": {
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 1000, []string{"foo", "bar"}),
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 1000, 1, []string{"foo", "bar"}),
},
"rejects exemplar with no labels": {
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 1000, []string{}),
errMsg: `received an exemplar with no valid labels, timestamp: 1000 series: test labels: {}`,
errID: globalerror.ExemplarLabelsMissing,
"drops exemplar with no labels": {
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 1000, 1, []string{}),
expectedDrop: true,
},
"rejects exemplar with no timestamp": {
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 0, []string{"foo", "bar"}),
errMsg: `received an exemplar with no timestamp, timestamp: 0 series: test labels: {foo="bar"}`,
errID: globalerror.ExemplarTimestampInvalid,
"drops exemplar with no timestamp": {
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 0, 1, []string{"foo", "bar"}),
expectedDrop: true,
},
"rejects exemplar with too long labelset": {
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 1000, []string{"foo", strings.Repeat("0", 126)}),
errMsg: fmt.Sprintf(`received an exemplar where the size of its combined labels exceeds the limit of 128 characters, timestamp: 1000 series: test labels: {foo="%s"}`, strings.Repeat("0", 126)),
errID: globalerror.ExemplarLabelsTooLong,
"drops exemplar with too long labelset": {
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 1000, 1, []string{"foo", strings.Repeat("0", 126)}),
expectedDrop: true,
},
"rejects exemplar with too many series labels": {
req: makeWriteRequestExemplar(manyLabels, 0, nil),
errMsg: "received a series whose number of labels exceeds the limit",
errID: globalerror.MaxLabelNamesPerSeries,
req: makeWriteRequestExemplar(manyLabels, 0, 1, nil),
expectedErrMsg: "received a series whose number of labels exceeds the limit",
expectedErrID: globalerror.MaxLabelNamesPerSeries,
},
"rejects exemplar with duplicate series labels": {
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test", "foo", "bar", "foo", "bar"}, 0, nil),
errMsg: "received a series with duplicate label name",
errID: globalerror.SeriesWithDuplicateLabelNames,
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test", "foo", "bar", "foo", "bar"}, 0, 1, nil),
expectedErrMsg: "received a series with duplicate label name",
expectedErrID: globalerror.SeriesWithDuplicateLabelNames,
},
"rejects exemplar with empty series label name": {
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test", "", "bar"}, 0, nil),
errMsg: "received a series with an invalid label",
errID: globalerror.SeriesInvalidLabel,
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test", "", "bar"}, 0, 1, nil),
expectedErrMsg: "received a series with an invalid label",
expectedErrID: globalerror.SeriesInvalidLabel,
},
}

for testName, tc := range tests {
t.Run(testName, func(t *testing.T) {
// Pass a copy of the reference request, since Push may modify it during cleanup.
reqBytes, err := tc.req.Marshal()
require.NoError(t, err)
reqCopy := &mimirpb.WriteRequest{}
require.NoError(t, reqCopy.Unmarshal(reqBytes))

expectedSamples := tc.req.Timeseries[0].Samples
expectedExemplars := tc.req.Timeseries[0].Exemplars

limits := prepareDefaultLimits()
limits.MaxGlobalExemplarsPerUser = 10
ds, _, _, _ := prepare(t, prepConfig{
ds, ingesters, _, _ := prepare(t, prepConfig{
limits: limits,
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
shuffleShardSize: 0,
})
_, err := ds[0].Push(ctx, tc.req)
if tc.errMsg != "" {
_, err = ds[0].Push(ctx, reqCopy)
if tc.expectedErrMsg != "" {
require.Error(t, err)
fromError, _ := grpcutil.ErrorToStatus(err)
assert.Contains(t, fromError.Message(), tc.errMsg)
assert.Contains(t, fromError.Message(), tc.errID)
} else {
assert.Nil(t, err)
assert.Contains(t, fromError.Message(), tc.expectedErrMsg)
assert.Contains(t, fromError.Message(), tc.expectedErrID)
return
}

require.NoError(t, err)
for _, i := range ingesters {
ss := i.series()
require.Len(t, ss, 1)
for _, s := range ss {
require.Equal(t, expectedSamples, s.Samples)
if !tc.expectedDrop {
require.Equal(t, expectedExemplars, s.Exemplars)
} else {
require.Empty(t, s.Exemplars)
}
}
}
})
}
Expand Down Expand Up @@ -1575,7 +1596,7 @@ func TestDistributor_ExemplarValidation(t *testing.T) {
minExemplarTS: 0,
maxExemplarTS: 0,
req: makeWriteRequestWith(
makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, 0, []string{"foo", "bar"}),
),
expectedExemplars: []mimirpb.PreallocTimeseries{
{TimeSeries: &mimirpb.TimeSeries{
Expand All @@ -1591,12 +1612,12 @@ func TestDistributor_ExemplarValidation(t *testing.T) {
minExemplarTS: 0,
maxExemplarTS: math.MaxInt64,
req: makeWriteRequestWith(
makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test2"}, 1000, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, 0, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test2"}, 1000, 0, []string{"foo", "bar"}),
),
expectedExemplars: []mimirpb.PreallocTimeseries{
makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test2"}, 1000, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, 0, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test2"}, 1000, 0, []string{"foo", "bar"}),
},
},
"should drop exemplars with timestamp lower than the accepted minimum, when the exemplars are specified in different series": {
Expand All @@ -1606,15 +1627,15 @@ func TestDistributor_ExemplarValidation(t *testing.T) {
minExemplarTS: 300000,
maxExemplarTS: math.MaxInt64,
req: makeWriteRequestWith(
makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 1000, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 601000, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 1000, 0, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 601000, 0, []string{"foo", "bar"}),
),
expectedExemplars: []mimirpb.PreallocTimeseries{
{TimeSeries: &mimirpb.TimeSeries{
Labels: []mimirpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "test"}},
Exemplars: []mimirpb.Exemplar{},
}},
makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 601000, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 601000, 0, []string{"foo", "bar"}),
},
expectedMetrics: `
# HELP cortex_discarded_exemplars_total The total number of exemplars that were discarded.
Expand Down Expand Up @@ -5745,8 +5766,8 @@ func makeWriteRequestForGenerators(series int, lsg labelSetGen, elsg labelSetGen
return request
}

func makeWriteRequestExemplar(seriesLabels []string, timestamp int64, exemplarLabels []string) *mimirpb.WriteRequest {
return makeWriteRequestWith(makeExemplarTimeseries(seriesLabels, timestamp, exemplarLabels))
func makeWriteRequestExemplar(seriesLabels []string, timestamp int64, samples int, exemplarLabels []string) *mimirpb.WriteRequest {
return makeWriteRequestWith(makeExemplarTimeseries(seriesLabels, timestamp, samples, exemplarLabels))
}

func makeWriteRequestHistogram(seriesLabels []string, timestamp int64, histogram *histogram.Histogram) *mimirpb.WriteRequest {
Expand All @@ -5757,10 +5778,18 @@ func makeWriteRequestFloatHistogram(seriesLabels []string, timestamp int64, hist
return makeWriteRequestWith(makeFloatHistogramTimeseries(seriesLabels, timestamp, histogram))
}

func makeExemplarTimeseries(seriesLabels []string, timestamp int64, exemplarLabels []string) mimirpb.PreallocTimeseries {
func makeExemplarTimeseries(seriesLabels []string, timestamp int64, numSamples int, exemplarLabels []string) mimirpb.PreallocTimeseries {
var samples []mimirpb.Sample
for i := 0; i < numSamples; i++ {
samples = append(samples, mimirpb.Sample{
Value: 1,
TimestampMs: timestamp,
})
}
return mimirpb.PreallocTimeseries{
TimeSeries: &mimirpb.TimeSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(seriesLabels...)),
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(seriesLabels...)),
Samples: samples,
Exemplars: []mimirpb.Exemplar{
{
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(exemplarLabels...)),
Expand Down Expand Up @@ -6824,6 +6853,7 @@ func TestDistributorValidation(t *testing.T) {
}},
expectedErr: status.New(codes.FailedPrecondition, metadataMetricNameMissingMsgFormat),
},
// Validation passes for empty exemplar labels, since we just want to skip the exemplars and not fail the time series as a whole.
"empty exemplar labels": {
metadata: []*mimirpb.MetricMetadata{{MetricFamilyName: "testmetric", Help: "a test metric.", Unit: "", Type: mimirpb.COUNTER}},
labels: [][]mimirpb.LabelAdapter{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},
Expand All @@ -6836,7 +6866,6 @@ func TestDistributorValidation(t *testing.T) {
TimestampMs: int64(now),
Value: 1,
}},
expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(exemplarEmptyLabelsMsgFormat, now, `testmetric{foo="bar"}`, "{}")),
},
} {
t.Run(name, func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/mimirpb/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (p *PreallocTimeseries) HistogramsUpdated() {
p.clearUnmarshalData()
}

// DeleteExemplarByMovingLast deletes the exemplar by moving the last one on top and shortening the slice
// DeleteExemplarByMovingLast deletes the exemplar by moving the last one on top and shortening the slice.
func (p *PreallocTimeseries) DeleteExemplarByMovingLast(ix int) {
last := len(p.Exemplars) - 1
if ix < last {
Expand Down

0 comments on commit 4e07854

Please sign in to comment.