Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributor: Don't drop time series with invalid exemplars #8224

Merged
merged 9 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
* [BUGFIX] Query scheduler: Fix a crash in result marshaling. #8140
* [BUGFIX] Store-gateway: Allow long-running index scans to be interrupted. #8154
* [BUGFIX] Query-frontend: fix splitting of queries using `@ start()` and `@end()` modifiers on a subquery. Previously the `start()` and `end()` would be evaluated using the start end end of the split query instead of the original query. #8162
* [BUGFIX] Distributor: Don't discard time series with invalid exemplars, just drop affected exemplars. #8224

### Mixin

Expand Down
15 changes: 7 additions & 8 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,22 +757,21 @@ func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeser
for i := 0; i < len(ts.Exemplars); {
e := ts.Exemplars[i]
if err := validateExemplar(d.exemplarValidationMetrics, userID, ts.Labels, e); err != nil {
// An exemplar validation error prevents ingesting samples
aknuds1 marked this conversation as resolved.
Show resolved Hide resolved
// in the same series object. However because the current Prometheus
// remote write implementation only populates one or the other,
// there never will be any.
return err
// OTel sends empty exemplars by default which aren't useful and are discarded by TSDB, so let's just skip invalid ones and ingest the data we can instead of returning an error.
ts.DeleteExemplarByMovingLast(i)
// Don't increase index i. After moving the last exemplar to this index, we want to check it again.
continue
}
if !validateExemplarTimestamp(d.exemplarValidationMetrics, userID, minExemplarTS, maxExemplarTS, e) {
ts.DeleteExemplarByMovingLast(i)
// Don't increase index i. After moving last exemplar to this index, we want to check it again.
// Don't increase index i. After moving the last exemplar to this index, we want to check it again.
continue
}
// We want to check if exemplars are in order. If they are not, we will sort them and invalidate the cache.
if isInOrder && previousExemplarTS > ts.Exemplars[i].TimestampMs {
if isInOrder && previousExemplarTS > e.TimestampMs {
isInOrder = false
}
previousExemplarTS = ts.Exemplars[i].TimestampMs
previousExemplarTS = e.TimestampMs
i++
}
if !isInOrder {
Expand Down
118 changes: 74 additions & 44 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1426,63 +1426,85 @@ func TestDistributor_Push_ExemplarValidation(t *testing.T) {
}

tests := map[string]struct {
req *mimirpb.WriteRequest
errMsg string
errID globalerror.ID
req *mimirpb.WriteRequest
expectedDrop bool
expectedErrMsg string
expectedErrID globalerror.ID
}{
"valid exemplar": {
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 1000, []string{"foo", "bar"}),
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 1000, 1, []string{"foo", "bar"}),
},
"rejects exemplar with no labels": {
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 1000, []string{}),
errMsg: `received an exemplar with no valid labels, timestamp: 1000 series: test labels: {}`,
errID: globalerror.ExemplarLabelsMissing,
"drops exemplar with no labels": {
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 1000, 1, []string{}),
expectedDrop: true,
},
"rejects exemplar with no timestamp": {
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 0, []string{"foo", "bar"}),
errMsg: `received an exemplar with no timestamp, timestamp: 0 series: test labels: {foo="bar"}`,
errID: globalerror.ExemplarTimestampInvalid,
"drops exemplar with no timestamp": {
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 0, 1, []string{"foo", "bar"}),
expectedDrop: true,
},
"rejects exemplar with too long labelset": {
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 1000, []string{"foo", strings.Repeat("0", 126)}),
errMsg: fmt.Sprintf(`received an exemplar where the size of its combined labels exceeds the limit of 128 characters, timestamp: 1000 series: test labels: {foo="%s"}`, strings.Repeat("0", 126)),
errID: globalerror.ExemplarLabelsTooLong,
"drops exemplar with too long labelset": {
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test"}, 1000, 1, []string{"foo", strings.Repeat("0", 126)}),
expectedDrop: true,
},
"rejects exemplar with too many series labels": {
req: makeWriteRequestExemplar(manyLabels, 0, nil),
errMsg: "received a series whose number of labels exceeds the limit",
errID: globalerror.MaxLabelNamesPerSeries,
req: makeWriteRequestExemplar(manyLabels, 0, 1, nil),
expectedErrMsg: "received a series whose number of labels exceeds the limit",
expectedErrID: globalerror.MaxLabelNamesPerSeries,
},
"rejects exemplar with duplicate series labels": {
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test", "foo", "bar", "foo", "bar"}, 0, nil),
errMsg: "received a series with duplicate label name",
errID: globalerror.SeriesWithDuplicateLabelNames,
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test", "foo", "bar", "foo", "bar"}, 0, 1, nil),
expectedErrMsg: "received a series with duplicate label name",
expectedErrID: globalerror.SeriesWithDuplicateLabelNames,
},
"rejects exemplar with empty series label name": {
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test", "", "bar"}, 0, nil),
errMsg: "received a series with an invalid label",
errID: globalerror.SeriesInvalidLabel,
req: makeWriteRequestExemplar([]string{model.MetricNameLabel, "test", "", "bar"}, 0, 1, nil),
expectedErrMsg: "received a series with an invalid label",
expectedErrID: globalerror.SeriesInvalidLabel,
Comment on lines 1449 to +1462

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, why inconsistency with the above? (we are looking on this with @cstyan regarding Prometheus PRW 2.0 handler.

},
}

for testName, tc := range tests {
t.Run(testName, func(t *testing.T) {
expectedSamples := tc.req.Timeseries[0].Samples
// During the call to Push, the labels of tc.req's exemplars are cleared, probably during request cleanup
// (in order to avoid retaining references to unmarshaled input bytes).
// Therefore, make a deep copy.
expectedExemplars := make([]mimirpb.Exemplar, len(tc.req.Timeseries[0].Exemplars))
copy(expectedExemplars, tc.req.Timeseries[0].Exemplars)
for i, e := range expectedExemplars {
expectedExemplars[i].Labels = make([]mimirpb.LabelAdapter, len(e.Labels))
copy(expectedExemplars[i].Labels, e.Labels)
}
aknuds1 marked this conversation as resolved.
Show resolved Hide resolved
limits := prepareDefaultLimits()
limits.MaxGlobalExemplarsPerUser = 10
ds, _, _, _ := prepare(t, prepConfig{
ds, ingesters, _, _ := prepare(t, prepConfig{
limits: limits,
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
shuffleShardSize: 0,
})
_, err := ds[0].Push(ctx, tc.req)
if tc.errMsg != "" {
if tc.expectedErrMsg != "" {
require.Error(t, err)
fromError, _ := grpcutil.ErrorToStatus(err)
assert.Contains(t, fromError.Message(), tc.errMsg)
assert.Contains(t, fromError.Message(), tc.errID)
} else {
assert.Nil(t, err)
assert.Contains(t, fromError.Message(), tc.expectedErrMsg)
assert.Contains(t, fromError.Message(), tc.expectedErrID)
return
}

require.NoError(t, err)
for _, i := range ingesters {
ss := i.series()
require.Len(t, ss, 1)
for _, s := range ss {
require.Equal(t, expectedSamples, s.Samples)
if !tc.expectedDrop {
require.Equal(t, expectedExemplars, s.Exemplars)
} else {
require.Empty(t, s.Exemplars)
}
}
}
})
}
Expand Down Expand Up @@ -1575,7 +1597,7 @@ func TestDistributor_ExemplarValidation(t *testing.T) {
minExemplarTS: 0,
maxExemplarTS: 0,
req: makeWriteRequestWith(
makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, 0, []string{"foo", "bar"}),
),
expectedExemplars: []mimirpb.PreallocTimeseries{
{TimeSeries: &mimirpb.TimeSeries{
Expand All @@ -1591,12 +1613,12 @@ func TestDistributor_ExemplarValidation(t *testing.T) {
minExemplarTS: 0,
maxExemplarTS: math.MaxInt64,
req: makeWriteRequestWith(
makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test2"}, 1000, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, 0, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test2"}, 1000, 0, []string{"foo", "bar"}),
),
expectedExemplars: []mimirpb.PreallocTimeseries{
makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test2"}, 1000, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, 0, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test2"}, 1000, 0, []string{"foo", "bar"}),
},
},
"should drop exemplars with timestamp lower than the accepted minimum, when the exemplars are specified in different series": {
Expand All @@ -1606,15 +1628,15 @@ func TestDistributor_ExemplarValidation(t *testing.T) {
minExemplarTS: 300000,
maxExemplarTS: math.MaxInt64,
req: makeWriteRequestWith(
makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 1000, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 601000, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 1000, 0, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 601000, 0, []string{"foo", "bar"}),
),
expectedExemplars: []mimirpb.PreallocTimeseries{
{TimeSeries: &mimirpb.TimeSeries{
Labels: []mimirpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "test"}},
Exemplars: []mimirpb.Exemplar{},
}},
makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 601000, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 601000, 0, []string{"foo", "bar"}),
},
expectedMetrics: `
# HELP cortex_discarded_exemplars_total The total number of exemplars that were discarded.
Expand Down Expand Up @@ -5745,8 +5767,8 @@ func makeWriteRequestForGenerators(series int, lsg labelSetGen, elsg labelSetGen
return request
}

func makeWriteRequestExemplar(seriesLabels []string, timestamp int64, exemplarLabels []string) *mimirpb.WriteRequest {
return makeWriteRequestWith(makeExemplarTimeseries(seriesLabels, timestamp, exemplarLabels))
func makeWriteRequestExemplar(seriesLabels []string, timestamp int64, samples int, exemplarLabels []string) *mimirpb.WriteRequest {
return makeWriteRequestWith(makeExemplarTimeseries(seriesLabels, timestamp, samples, exemplarLabels))
}

func makeWriteRequestHistogram(seriesLabels []string, timestamp int64, histogram *histogram.Histogram) *mimirpb.WriteRequest {
Expand All @@ -5757,10 +5779,18 @@ func makeWriteRequestFloatHistogram(seriesLabels []string, timestamp int64, hist
return makeWriteRequestWith(makeFloatHistogramTimeseries(seriesLabels, timestamp, histogram))
}

func makeExemplarTimeseries(seriesLabels []string, timestamp int64, exemplarLabels []string) mimirpb.PreallocTimeseries {
func makeExemplarTimeseries(seriesLabels []string, timestamp int64, numSamples int, exemplarLabels []string) mimirpb.PreallocTimeseries {
var samples []mimirpb.Sample
for i := 0; i < numSamples; i++ {
samples = append(samples, mimirpb.Sample{
Value: 1,
TimestampMs: timestamp,
})
}
return mimirpb.PreallocTimeseries{
TimeSeries: &mimirpb.TimeSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(seriesLabels...)),
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(seriesLabels...)),
Samples: samples,
Exemplars: []mimirpb.Exemplar{
{
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(exemplarLabels...)),
Expand Down Expand Up @@ -6824,6 +6854,7 @@ func TestDistributorValidation(t *testing.T) {
}},
expectedErr: status.New(codes.FailedPrecondition, metadataMetricNameMissingMsgFormat),
},
// Validation passes for empty exemplar labels, since we just want to skip the exemplars and not fail the time series as a whole.
"empty exemplar labels": {
metadata: []*mimirpb.MetricMetadata{{MetricFamilyName: "testmetric", Help: "a test metric.", Unit: "", Type: mimirpb.COUNTER}},
labels: [][]mimirpb.LabelAdapter{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},
Expand All @@ -6836,7 +6867,6 @@ func TestDistributorValidation(t *testing.T) {
TimestampMs: int64(now),
Value: 1,
}},
expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(exemplarEmptyLabelsMsgFormat, now, `testmetric{foo="bar"}`, "{}")),
},
} {
t.Run(name, func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/mimirpb/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (p *PreallocTimeseries) HistogramsUpdated() {
p.clearUnmarshalData()
}

// DeleteExemplarByMovingLast deletes the exemplar by moving the last one on top and shortening the slice
// DeleteExemplarByMovingLast deletes the exemplar by moving the last one on top and shortening the slice.
func (p *PreallocTimeseries) DeleteExemplarByMovingLast(ix int) {
last := len(p.Exemplars) - 1
if ix < last {
Expand Down
Loading