Skip to content

Commit

Permalink
Move error samplers from Limiter to Ingester (#6014)
Browse files Browse the repository at this point in the history
* Move error samplers from Limiter to Ingester

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Improve TestIngester_SampledUserLimitExceeded and TestIngester_SampledMetricLimitExceeded

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Improve CHANGELOG.md

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Fixing review findings

Co-authored-by: Oleg Zaytsev <mail@olegzaytsev.com>

---------

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
Co-authored-by: Oleg Zaytsev <mail@olegzaytsev.com>
  • Loading branch information
duricanikolic and colega committed Sep 13, 2023
1 parent f30e9e0 commit e1d79ee
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 33 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* [ENHANCEMENT] Expose `/sync/mutex/wait/total:seconds` Go runtime metric as `go_sync_mutex_wait_total_seconds_total` from all components. #5879
* [ENHANCEMENT] Query-scheduler: improve latency with many concurrent queriers. #5880
* [ENHANCEMENT] Go: updated to 1.21.1. #5955
* [ENHANCEMENT] Ingester: added support for sampling errors, which can be enabled by setting `-ingester.error-sample-rate`. This way each error will be logged once in the configured number of times. #5584
* [ENHANCEMENT] Ingester: added support for sampling errors, which can be enabled by setting `-ingester.error-sample-rate`. This way each error will be logged once in the configured number of times. All the discarded samples will still be tracked by the `cortex_discarded_samples_total` metric. #5584 #6014
* [BUGFIX] Ingester: fix spurious `not found` errors on label values API during head compaction. #5957

### Mixin
Expand Down
26 changes: 15 additions & 11 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ type Ingester struct {
maxOutOfOrderTimeWindowSecondsStat *expvar.Int

utilizationBasedLimiter utilizationBasedLimiter

errorSamplers ingesterErrSamplers
}

func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) {
Expand Down Expand Up @@ -322,6 +324,8 @@ func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus
tenantsWithOutOfOrderEnabledStat: usagestats.GetAndResetInt(tenantsWithOutOfOrderEnabledStatName),
minOutOfOrderTimeWindowSecondsStat: usagestats.GetAndResetInt(minOutOfOrderTimeWindowSecondsStatName),
maxOutOfOrderTimeWindowSecondsStat: usagestats.GetAndResetInt(maxOutOfOrderTimeWindowSecondsStatName),

errorSamplers: newIngesterErrSamplers(cfg.ErrorSampleRate),
}, nil
}

Expand Down Expand Up @@ -365,7 +369,7 @@ func New(cfg Config, limits *validation.Overrides, activeGroupsCleanupService *u
i.lifecycler,
cfg.IngesterRing.ReplicationFactor,
cfg.IngesterRing.ZoneAwarenessEnabled,
cfg.ErrorSampleRate)
)

if cfg.ReadPathCPUUtilizationLimit > 0 || cfg.ReadPathMemoryUtilizationLimit > 0 {
i.utilizationBasedLimiter = limiter.NewUtilizationBasedLimiter(cfg.ReadPathCPUUtilizationLimit,
Expand Down Expand Up @@ -966,49 +970,49 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
case storage.ErrOutOfBounds:
stats.sampleOutOfBoundsCount++
updateFirstPartial(func() error {
return i.limiter.samplers.sampleTimestampTooOld.WrapError(newIngestErrSampleTimestampTooOld(model.Time(timestamp), labels))
return i.errorSamplers.sampleTimestampTooOld.WrapError(newIngestErrSampleTimestampTooOld(model.Time(timestamp), labels))
})
return true

case storage.ErrOutOfOrderSample:
stats.sampleOutOfOrderCount++
updateFirstPartial(func() error {
return i.limiter.samplers.sampleOutOfOrder.WrapError(newIngestErrSampleOutOfOrder(model.Time(timestamp), labels))
return i.errorSamplers.sampleOutOfOrder.WrapError(newIngestErrSampleOutOfOrder(model.Time(timestamp), labels))
})
return true

case storage.ErrTooOldSample:
stats.sampleTooOldCount++
updateFirstPartial(func() error {
return i.limiter.samplers.sampleTimestampTooOldOOOEnabled.WrapError(newIngestErrSampleTimestampTooOldOOOEnabled(model.Time(timestamp), labels, outOfOrderWindow))
return i.errorSamplers.sampleTimestampTooOldOOOEnabled.WrapError(newIngestErrSampleTimestampTooOldOOOEnabled(model.Time(timestamp), labels, outOfOrderWindow))
})
return true

case globalerror.SampleTooFarInFuture:
stats.sampleTooFarInFutureCount++
updateFirstPartial(func() error {
return i.limiter.samplers.sampleTimestampTooFarInFuture.WrapError(newIngestErrSampleTimestampTooFarInFuture(model.Time(timestamp), labels))
return i.errorSamplers.sampleTimestampTooFarInFuture.WrapError(newIngestErrSampleTimestampTooFarInFuture(model.Time(timestamp), labels))
})
return true

case storage.ErrDuplicateSampleForTimestamp:
stats.newValueForTimestampCount++
updateFirstPartial(func() error {
return i.limiter.samplers.sampleDuplicateTimestamp.WrapError(newIngestErrSampleDuplicateTimestamp(model.Time(timestamp), labels))
return i.errorSamplers.sampleDuplicateTimestamp.WrapError(newIngestErrSampleDuplicateTimestamp(model.Time(timestamp), labels))
})
return true

case errMaxSeriesPerUserLimitExceeded:
stats.perUserSeriesLimitCount++
updateFirstPartial(func() error {
return i.limiter.samplers.maxSeriesPerUserLimitExceeded.WrapError(formatMaxSeriesPerUserError(i.limiter.limits, userID))
return i.errorSamplers.maxSeriesPerUserLimitExceeded.WrapError(formatMaxSeriesPerUserError(i.limiter.limits, userID))
})
return true

case errMaxSeriesPerMetricLimitExceeded:
stats.perMetricSeriesLimitCount++
updateFirstPartial(func() error {
return i.limiter.samplers.maxSeriesPerMetricLimitExceeded.WrapError(formatMaxSeriesPerMetricError(i.limiter.limits, mimirpb.FromLabelAdaptersToLabelsWithCopy(labels), userID))
return i.errorSamplers.maxSeriesPerMetricLimitExceeded.WrapError(formatMaxSeriesPerMetricError(i.limiter.limits, mimirpb.FromLabelAdaptersToLabelsWithCopy(labels), userID))
})
return true
}
Expand Down Expand Up @@ -1049,7 +1053,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
}

updateFirstPartial(func() error {
return i.limiter.samplers.sampleTimestampTooOld.WrapError(newIngestErrSampleTimestampTooOld(model.Time(firstTimestamp), ts.Labels))
return i.errorSamplers.sampleTimestampTooOld.WrapError(newIngestErrSampleTimestampTooOld(model.Time(firstTimestamp), ts.Labels))
})
continue
}
Expand All @@ -1064,7 +1068,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
firstTimestamp := ts.Samples[0].TimestampMs

updateFirstPartial(func() error {
return i.limiter.samplers.sampleTimestampTooOld.WrapError(newIngestErrSampleTimestampTooOld(model.Time(firstTimestamp), ts.Labels))
return i.errorSamplers.sampleTimestampTooOld.WrapError(newIngestErrSampleTimestampTooOld(model.Time(firstTimestamp), ts.Labels))
})
continue
}
Expand Down Expand Up @@ -3162,7 +3166,7 @@ func (i *Ingester) getOrCreateUserMetadata(userID string) *userMetricsMetadata {
// Ensure it was not created between switching locks.
userMetadata, ok := i.usersMetadata[userID]
if !ok {
userMetadata = newMetadataMap(i.limiter, i.metrics, userID)
userMetadata = newMetadataMap(i.limiter, i.metrics, i.errorSamplers, userID)
i.usersMetadata[userID] = userMetadata
}
return userMetadata
Expand Down
48 changes: 40 additions & 8 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8453,12 +8453,13 @@ func TestIngester_SampledUserLimitExceeded(t *testing.T) {

// create a data dir that survives an ingester restart
dataDir := t.TempDir()
registry := prometheus.NewRegistry()

errorSampleRate := 5
ingesterCfg := defaultIngesterTestConfig(t)
ingesterCfg.IngesterRing.ReplicationFactor = 1
ingesterCfg.ErrorSampleRate = int64(errorSampleRate)
ing, err := prepareIngesterWithBlocksStorageAndLimits(t, ingesterCfg, limits, dataDir, nil)
ing, err := prepareIngesterWithBlocksStorageAndLimits(t, ingesterCfg, limits, dataDir, registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
Expand Down Expand Up @@ -8516,10 +8517,13 @@ func TestIngester_SampledUserLimitExceeded(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, res)

expectedError := wrapWithUser(ing.limiter.samplers.maxSeriesPerUserLimitExceeded.WrapError(formatMaxSeriesPerUserError(ing.limiter.limits, userID)), userID)
expectedError := wrapWithUser(ing.errorSamplers.maxSeriesPerUserLimitExceeded.WrapError(formatMaxSeriesPerUserError(ing.limiter.limits, userID)), userID)
require.Error(t, expectedError)

// We push series hitting the max series limit too old samples than 2 times more than errorSampleRate.
// We push 2 times more than errorSampleRate series hitting the max-series-per-user limit, i.e., 10 series in total.
// As a result we expect to see:
// - only 2 related log entries,
// - all 10 samples as discarded.
for i := 0; i < 2*errorSampleRate; i++ {
// Append 2 series first, expect max-series-per-user error.
_, err = client.Push(ctx, mimirpb.ToWriteRequest([][]mimirpb.LabelAdapter{metricLabelAdapters1, metricLabelAdapters2}, []mimirpb.Sample{sample2, sample3}, nil, nil, mimirpb.API))
Expand All @@ -8529,8 +8533,20 @@ func TestIngester_SampledUserLimitExceeded(t *testing.T) {
require.Errorf(t, expectedError, status.Message())
}

// We expect to see 2 log entries realted to err-mimir-sample-timestamp-too-old
// We expect to see 2 log entries related to max-series-per-user error.
require.Equal(t, 2, logger.count(expectedError.Error()))

// We expect to see all 10 samples causing max-series-per-user error as discarded.
metricNames := []string{
"cortex_discarded_samples_total",
}
expectedMetrics := `
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
cortex_discarded_samples_total{group="",reason="per_user_series_limit",user="1"} 10
`
err = testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...)
assert.NoError(t, err)
}

func TestIngester_SampledMetricLimitExceeded(t *testing.T) {
Expand All @@ -8540,12 +8556,13 @@ func TestIngester_SampledMetricLimitExceeded(t *testing.T) {

// create a data dir that survives an ingester restart
dataDir := t.TempDir()
registry := prometheus.NewRegistry()

errorSampleRate := 5
ingesterCfg := defaultIngesterTestConfig(t)
ingesterCfg.IngesterRing.ReplicationFactor = 1
ingesterCfg.ErrorSampleRate = int64(errorSampleRate)
ing, err := prepareIngesterWithBlocksStorageAndLimits(t, ingesterCfg, limits, dataDir, nil)
ing, err := prepareIngesterWithBlocksStorageAndLimits(t, ingesterCfg, limits, dataDir, registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
Expand Down Expand Up @@ -8603,10 +8620,13 @@ func TestIngester_SampledMetricLimitExceeded(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, res)

expectedError := wrapWithUser(ing.limiter.samplers.maxSeriesPerUserLimitExceeded.WrapError(formatMaxSeriesPerMetricError(ing.limiter.limits, mimirpb.FromLabelAdaptersToLabels(metricLabelAdapters2), userID)), userID)
expectedError := wrapWithUser(ing.errorSamplers.maxSeriesPerUserLimitExceeded.WrapError(formatMaxSeriesPerMetricError(ing.limiter.limits, mimirpb.FromLabelAdaptersToLabels(metricLabelAdapters2), userID)), userID)
require.Error(t, expectedError)

// We push series hitting the max series limit too old samples than 2 times more than errorSampleRate.
	// We push 2 times more than errorSampleRate series hitting the max-series-per-metric limit, i.e., 10 series in total.
// As a result we expect to see:
// - only 2 related log entries,
// - all 10 samples as discarded.
for i := 0; i < 2*errorSampleRate; i++ {
// Append 2 series first, expect max-series-per-user error.
_, err = client.Push(ctx, mimirpb.ToWriteRequest([][]mimirpb.LabelAdapter{metricLabelAdapters1, metricLabelAdapters2}, []mimirpb.Sample{sample2, sample3}, nil, nil, mimirpb.API))
Expand All @@ -8616,8 +8636,20 @@ func TestIngester_SampledMetricLimitExceeded(t *testing.T) {
require.Errorf(t, expectedError, status.Message())
}

// We expect to see 2 log entries realted to err-mimir-sample-timestamp-too-old
// We expect to see 2 log entries related to max-series-per-metric error.
require.Equal(t, 2, logger.count(expectedError.Error()))

// We expect to see all 10 samples causing max-series-per-metric error as discarded.
metricNames := []string{
"cortex_discarded_samples_total",
}
expectedMetrics := `
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
cortex_discarded_samples_total{group="",reason="per_metric_series_limit",user="1"} 10
`
err = testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...)
assert.NoError(t, err)
}

type loggerWithBuffer struct {
Expand Down
3 changes: 0 additions & 3 deletions pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type Limiter struct {
ring RingCount
replicationFactor int
zoneAwarenessEnabled bool
samplers ingesterErrSamplers
}

// NewLimiter makes a new in-memory series limiter
Expand All @@ -45,14 +44,12 @@ func NewLimiter(
ring RingCount,
replicationFactor int,
zoneAwarenessEnabled bool,
errorSampleRate int64,
) *Limiter {
return &Limiter{
limits: limits,
ring: ring,
replicationFactor: replicationFactor,
zoneAwarenessEnabled: zoneAwarenessEnabled,
samplers: newIngesterErrSamplers(errorSampleRate),
}
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/ingester/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func runLimiterMaxFunctionTest(
overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)

limiter := NewLimiter(overrides, ring, testData.ringReplicationFactor, testData.ringZoneAwarenessEnabled, 0)
limiter := NewLimiter(overrides, ring, testData.ringReplicationFactor, testData.ringZoneAwarenessEnabled)
actual := runMaxFn(limiter)
assert.Equal(t, testData.expectedValue, actual)
})
Expand Down Expand Up @@ -352,7 +352,7 @@ func TestLimiter_AssertMaxSeriesPerMetric(t *testing.T) {
}, nil)
require.NoError(t, err)

limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false, 0)
limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false)
actual := limiter.IsWithinMaxSeriesPerMetric("test", testData.series)

assert.Equal(t, testData.expected, actual)
Expand Down Expand Up @@ -405,7 +405,7 @@ func TestLimiter_AssertMaxMetadataPerMetric(t *testing.T) {
}, nil)
require.NoError(t, err)

limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false, 0)
limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false)
actual := limiter.IsWithinMaxMetadataPerMetric("test", testData.metadata)

assert.Equal(t, testData.expected, actual)
Expand Down Expand Up @@ -459,7 +459,7 @@ func TestLimiter_AssertMaxSeriesPerUser(t *testing.T) {
}, nil)
require.NoError(t, err)

limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false, 0)
limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false)
actual := limiter.IsWithinMaxSeriesPerUser("test", testData.series)

assert.Equal(t, testData.expected, actual)
Expand Down Expand Up @@ -513,7 +513,7 @@ func TestLimiter_AssertMaxMetricsWithMetadataPerUser(t *testing.T) {
}, nil)
require.NoError(t, err)

limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false, 0)
limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false)
actual := limiter.IsWithinMaxMetricsWithMetadataPerUser("test", testData.metadata)

assert.Equal(t, testData.expected, actual)
Expand Down
9 changes: 6 additions & 3 deletions pkg/ingester/user_metrics_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ type userMetricsMetadata struct {

mtx sync.RWMutex
metricToMetadata map[string]metricMetadataSet

errorSamplers ingesterErrSamplers
}

func newMetadataMap(l *Limiter, m *ingesterMetrics, userID string) *userMetricsMetadata {
func newMetadataMap(l *Limiter, m *ingesterMetrics, errorSamplers ingesterErrSamplers, userID string) *userMetricsMetadata {
return &userMetricsMetadata{
metricToMetadata: map[string]metricMetadataSet{},
limiter: l,
metrics: m,
errorSamplers: errorSamplers,
userID: userID,
}
}
Expand All @@ -47,15 +50,15 @@ func (mm *userMetricsMetadata) add(metric string, metadata *mimirpb.MetricMetada
// Verify that the user can create more metric metadata given we don't have a set for that metric name.
if !mm.limiter.IsWithinMaxMetricsWithMetadataPerUser(mm.userID, len(mm.metricToMetadata)) {
mm.metrics.discardedMetadataPerUserMetadataLimit.WithLabelValues(mm.userID).Inc()
return mm.limiter.samplers.maxMetadataPerUserLimitExceeded.WrapError(formatMaxMetadataPerUserError(mm.limiter.limits, mm.userID))
return mm.errorSamplers.maxMetadataPerUserLimitExceeded.WrapError(formatMaxMetadataPerUserError(mm.limiter.limits, mm.userID))
}
set = metricMetadataSet{}
mm.metricToMetadata[metric] = set
}

if !mm.limiter.IsWithinMaxMetadataPerMetric(mm.userID, len(set)) {
mm.metrics.discardedMetadataPerMetricMetadataLimit.WithLabelValues(mm.userID).Inc()
return mm.limiter.samplers.maxMetadataPerMetricLimitExceeded.WrapError(formatMaxMetadataPerMetricError(mm.limiter.limits, labels.FromStrings(labels.MetricName, metric), mm.userID))
return mm.errorSamplers.maxMetadataPerMetricLimitExceeded.WrapError(formatMaxMetadataPerMetricError(mm.limiter.limits, labels.FromStrings(labels.MetricName, metric), mm.userID))
}

// if we have seen this metadata before, it is a no-op and we don't need to change our metrics.
Expand Down
6 changes: 4 additions & 2 deletions pkg/ingester/user_metrics_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func TestUserMetricsMetadata(t *testing.T) {
errContains string
}

errorSamplers := newIngesterErrSamplers(0)

tests := map[string]struct {
maxMetadataPerUser int
maxMetadataPerMetric int
Expand Down Expand Up @@ -81,7 +83,7 @@ func TestUserMetricsMetadata(t *testing.T) {
MaxGlobalMetadataPerMetric: testData.maxMetadataPerMetric,
}, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, ring, 1, false, 0)
limiter := NewLimiter(limits, ring, 1, false)

// Mock metrics
metrics := newIngesterMetrics(
Expand All @@ -92,7 +94,7 @@ func TestUserMetricsMetadata(t *testing.T) {
nil,
)

mm := newMetadataMap(limiter, metrics, "test")
mm := newMetadataMap(limiter, metrics, errorSamplers, "test")

// Attempt to add all metadata
for _, i := range testData.inputMetadata {
Expand Down

0 comments on commit e1d79ee

Please sign in to comment.