Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move error samplers from Limiter to Ingester #6014

Merged
merged 4 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* [ENHANCEMENT] Expose `/sync/mutex/wait/total:seconds` Go runtime metric as `go_sync_mutex_wait_total_seconds_total` from all components. #5879
* [ENHANCEMENT] Query-scheduler: improve latency with many concurrent queriers. #5880
* [ENHANCEMENT] Go: updated to 1.21.1. #5955
* [ENHANCEMENT] Ingester: added support for sampling errors, which can be enabled by setting `-ingester.error-sample-rate`. This way each error will be logged once in the configured number of times. #5584
* [ENHANCEMENT] Ingester: added support for sampling errors, which can be enabled by setting `-ingester.error-sample-rate`. This way each error will be logged once in the configured number of times, but all the samples causing an error will be tracked by the `cortex_discarded_samples_total` counter. #5584
duricanikolic marked this conversation as resolved.
Show resolved Hide resolved
* [BUGFIX] Ingester: fix spurious `not found` errors on label values API during head compaction. #5957

### Mixin
Expand Down
26 changes: 15 additions & 11 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ type Ingester struct {
maxOutOfOrderTimeWindowSecondsStat *expvar.Int

utilizationBasedLimiter utilizationBasedLimiter

errorSamplers ingesterErrSamplers
}

func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) {
Expand Down Expand Up @@ -322,6 +324,8 @@ func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus
tenantsWithOutOfOrderEnabledStat: usagestats.GetAndResetInt(tenantsWithOutOfOrderEnabledStatName),
minOutOfOrderTimeWindowSecondsStat: usagestats.GetAndResetInt(minOutOfOrderTimeWindowSecondsStatName),
maxOutOfOrderTimeWindowSecondsStat: usagestats.GetAndResetInt(maxOutOfOrderTimeWindowSecondsStatName),

errorSamplers: newIngesterErrSamplers(cfg.ErrorSampleRate),
}, nil
}

Expand Down Expand Up @@ -365,7 +369,7 @@ func New(cfg Config, limits *validation.Overrides, activeGroupsCleanupService *u
i.lifecycler,
cfg.IngesterRing.ReplicationFactor,
cfg.IngesterRing.ZoneAwarenessEnabled,
cfg.ErrorSampleRate)
)
duricanikolic marked this conversation as resolved.
Show resolved Hide resolved

if cfg.ReadPathCPUUtilizationLimit > 0 || cfg.ReadPathMemoryUtilizationLimit > 0 {
i.utilizationBasedLimiter = limiter.NewUtilizationBasedLimiter(cfg.ReadPathCPUUtilizationLimit,
Expand Down Expand Up @@ -966,49 +970,49 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
case storage.ErrOutOfBounds:
stats.sampleOutOfBoundsCount++
updateFirstPartial(func() error {
return i.limiter.samplers.sampleTimestampTooOld.WrapError(newIngestErrSampleTimestampTooOld(model.Time(timestamp), labels))
return i.errorSamplers.sampleTimestampTooOld.WrapError(newIngestErrSampleTimestampTooOld(model.Time(timestamp), labels))
})
return true

case storage.ErrOutOfOrderSample:
stats.sampleOutOfOrderCount++
updateFirstPartial(func() error {
return i.limiter.samplers.sampleOutOfOrder.WrapError(newIngestErrSampleOutOfOrder(model.Time(timestamp), labels))
return i.errorSamplers.sampleOutOfOrder.WrapError(newIngestErrSampleOutOfOrder(model.Time(timestamp), labels))
})
return true

case storage.ErrTooOldSample:
stats.sampleTooOldCount++
updateFirstPartial(func() error {
return i.limiter.samplers.sampleTimestampTooOldOOOEnabled.WrapError(newIngestErrSampleTimestampTooOldOOOEnabled(model.Time(timestamp), labels, outOfOrderWindow))
return i.errorSamplers.sampleTimestampTooOldOOOEnabled.WrapError(newIngestErrSampleTimestampTooOldOOOEnabled(model.Time(timestamp), labels, outOfOrderWindow))
})
return true

case globalerror.SampleTooFarInFuture:
stats.sampleTooFarInFutureCount++
updateFirstPartial(func() error {
return i.limiter.samplers.sampleTimestampTooFarInFuture.WrapError(newIngestErrSampleTimestampTooFarInFuture(model.Time(timestamp), labels))
return i.errorSamplers.sampleTimestampTooFarInFuture.WrapError(newIngestErrSampleTimestampTooFarInFuture(model.Time(timestamp), labels))
})
return true

case storage.ErrDuplicateSampleForTimestamp:
stats.newValueForTimestampCount++
updateFirstPartial(func() error {
return i.limiter.samplers.sampleDuplicateTimestamp.WrapError(newIngestErrSampleDuplicateTimestamp(model.Time(timestamp), labels))
return i.errorSamplers.sampleDuplicateTimestamp.WrapError(newIngestErrSampleDuplicateTimestamp(model.Time(timestamp), labels))
})
return true

case errMaxSeriesPerUserLimitExceeded:
stats.perUserSeriesLimitCount++
updateFirstPartial(func() error {
return i.limiter.samplers.maxSeriesPerUserLimitExceeded.WrapError(formatMaxSeriesPerUserError(i.limiter.limits, userID))
return i.errorSamplers.maxSeriesPerUserLimitExceeded.WrapError(formatMaxSeriesPerUserError(i.limiter.limits, userID))
})
return true

case errMaxSeriesPerMetricLimitExceeded:
stats.perMetricSeriesLimitCount++
updateFirstPartial(func() error {
return i.limiter.samplers.maxSeriesPerMetricLimitExceeded.WrapError(formatMaxSeriesPerMetricError(i.limiter.limits, mimirpb.FromLabelAdaptersToLabelsWithCopy(labels), userID))
return i.errorSamplers.maxSeriesPerMetricLimitExceeded.WrapError(formatMaxSeriesPerMetricError(i.limiter.limits, mimirpb.FromLabelAdaptersToLabelsWithCopy(labels), userID))
})
return true
}
Expand Down Expand Up @@ -1049,7 +1053,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
}

updateFirstPartial(func() error {
return i.limiter.samplers.sampleTimestampTooOld.WrapError(newIngestErrSampleTimestampTooOld(model.Time(firstTimestamp), ts.Labels))
return i.errorSamplers.sampleTimestampTooOld.WrapError(newIngestErrSampleTimestampTooOld(model.Time(firstTimestamp), ts.Labels))
})
continue
}
Expand All @@ -1064,7 +1068,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
firstTimestamp := ts.Samples[0].TimestampMs

updateFirstPartial(func() error {
return i.limiter.samplers.sampleTimestampTooOld.WrapError(newIngestErrSampleTimestampTooOld(model.Time(firstTimestamp), ts.Labels))
return i.errorSamplers.sampleTimestampTooOld.WrapError(newIngestErrSampleTimestampTooOld(model.Time(firstTimestamp), ts.Labels))
})
continue
}
Expand Down Expand Up @@ -3162,7 +3166,7 @@ func (i *Ingester) getOrCreateUserMetadata(userID string) *userMetricsMetadata {
// Ensure it was not created between switching locks.
userMetadata, ok := i.usersMetadata[userID]
if !ok {
userMetadata = newMetadataMap(i.limiter, i.metrics, userID)
userMetadata = newMetadataMap(i.limiter, i.metrics, i.errorSamplers, userID)
i.usersMetadata[userID] = userMetadata
}
return userMetadata
Expand Down
48 changes: 40 additions & 8 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8453,12 +8453,13 @@ func TestIngester_SampledUserLimitExceeded(t *testing.T) {

// create a data dir that survives an ingester restart
dataDir := t.TempDir()
registry := prometheus.NewRegistry()

errorSampleRate := 5
ingesterCfg := defaultIngesterTestConfig(t)
ingesterCfg.IngesterRing.ReplicationFactor = 1
ingesterCfg.ErrorSampleRate = int64(errorSampleRate)
ing, err := prepareIngesterWithBlocksStorageAndLimits(t, ingesterCfg, limits, dataDir, nil)
ing, err := prepareIngesterWithBlocksStorageAndLimits(t, ingesterCfg, limits, dataDir, registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
Expand Down Expand Up @@ -8516,10 +8517,13 @@ func TestIngester_SampledUserLimitExceeded(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, res)

expectedError := wrapWithUser(ing.limiter.samplers.maxSeriesPerUserLimitExceeded.WrapError(formatMaxSeriesPerUserError(ing.limiter.limits, userID)), userID)
expectedError := wrapWithUser(ing.errorSamplers.maxSeriesPerUserLimitExceeded.WrapError(formatMaxSeriesPerUserError(ing.limiter.limits, userID)), userID)
require.Error(t, expectedError)

// We push series hitting the max series limit too old samples than 2 times more than errorSampleRate.
// We push 2 times more than errorSampleRate series hitting the max-series-per-user limit, i.e., 10 series in total.
// As a result we expect to see:
// - only 2 related log entries,
// - all 10 samples as discarded.
for i := 0; i < 2*errorSampleRate; i++ {
// Append 2 series first, expect max-series-per-user error.
_, err = client.Push(ctx, mimirpb.ToWriteRequest([][]mimirpb.LabelAdapter{metricLabelAdapters1, metricLabelAdapters2}, []mimirpb.Sample{sample2, sample3}, nil, nil, mimirpb.API))
Expand All @@ -8529,8 +8533,20 @@ func TestIngester_SampledUserLimitExceeded(t *testing.T) {
require.Errorf(t, expectedError, status.Message())
}

// We expect to see 2 log entries realted to err-mimir-sample-timestamp-too-old
// We expect to see 2 log entries related to max-series-per-user error.
require.Equal(t, 2, logger.count(expectedError.Error()))

// We expect to see all 10 samples causing max-series-per-user error as discarded.
metricNames := []string{
"cortex_discarded_samples_total",
}
expectedMetrics := `
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
cortex_discarded_samples_total{group="",reason="per_user_series_limit",user="1"} 10
`
err = testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...)
assert.NoError(t, err)
}

func TestIngester_SampledMetricLimitExceeded(t *testing.T) {
Expand All @@ -8540,12 +8556,13 @@ func TestIngester_SampledMetricLimitExceeded(t *testing.T) {

// create a data dir that survives an ingester restart
dataDir := t.TempDir()
registry := prometheus.NewRegistry()

errorSampleRate := 5
ingesterCfg := defaultIngesterTestConfig(t)
ingesterCfg.IngesterRing.ReplicationFactor = 1
ingesterCfg.ErrorSampleRate = int64(errorSampleRate)
ing, err := prepareIngesterWithBlocksStorageAndLimits(t, ingesterCfg, limits, dataDir, nil)
ing, err := prepareIngesterWithBlocksStorageAndLimits(t, ingesterCfg, limits, dataDir, registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
Expand Down Expand Up @@ -8603,10 +8620,13 @@ func TestIngester_SampledMetricLimitExceeded(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, res)

expectedError := wrapWithUser(ing.limiter.samplers.maxSeriesPerUserLimitExceeded.WrapError(formatMaxSeriesPerMetricError(ing.limiter.limits, mimirpb.FromLabelAdaptersToLabels(metricLabelAdapters2), userID)), userID)
expectedError := wrapWithUser(ing.errorSamplers.maxSeriesPerUserLimitExceeded.WrapError(formatMaxSeriesPerMetricError(ing.limiter.limits, mimirpb.FromLabelAdaptersToLabels(metricLabelAdapters2), userID)), userID)
require.Error(t, expectedError)

// We push series hitting the max series limit too old samples than 2 times more than errorSampleRate.
// We push 2 times more than errorSampleRate series hitting the max-series-per-metric, i.e., 10 series in total.
// As a result we expect to see:
// - only 2 related log entries,
// - all 10 samples as discarded.
for i := 0; i < 2*errorSampleRate; i++ {
// Append 2 series first, expect max-series-per-user error.
_, err = client.Push(ctx, mimirpb.ToWriteRequest([][]mimirpb.LabelAdapter{metricLabelAdapters1, metricLabelAdapters2}, []mimirpb.Sample{sample2, sample3}, nil, nil, mimirpb.API))
Expand All @@ -8616,8 +8636,20 @@ func TestIngester_SampledMetricLimitExceeded(t *testing.T) {
require.Errorf(t, expectedError, status.Message())
}

// We expect to see 2 log entries realted to err-mimir-sample-timestamp-too-old
// We expect to see 2 log entries related to max-series-per-metric error.
require.Equal(t, 2, logger.count(expectedError.Error()))

// We expect to see all 10 samples causing max-series-per-metric error as discarded.
metricNames := []string{
"cortex_discarded_samples_total",
}
expectedMetrics := `
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
cortex_discarded_samples_total{group="",reason="per_metric_series_limit",user="1"} 10
`
err = testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...)
assert.NoError(t, err)
}

type loggerWithBuffer struct {
Expand Down
3 changes: 0 additions & 3 deletions pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type Limiter struct {
ring RingCount
replicationFactor int
zoneAwarenessEnabled bool
samplers ingesterErrSamplers
}

// NewLimiter makes a new in-memory series limiter
Expand All @@ -45,14 +44,12 @@ func NewLimiter(
ring RingCount,
replicationFactor int,
zoneAwarenessEnabled bool,
errorSampleRate int64,
) *Limiter {
return &Limiter{
limits: limits,
ring: ring,
replicationFactor: replicationFactor,
zoneAwarenessEnabled: zoneAwarenessEnabled,
samplers: newIngesterErrSamplers(errorSampleRate),
}
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/ingester/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func runLimiterMaxFunctionTest(
overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)

limiter := NewLimiter(overrides, ring, testData.ringReplicationFactor, testData.ringZoneAwarenessEnabled, 0)
limiter := NewLimiter(overrides, ring, testData.ringReplicationFactor, testData.ringZoneAwarenessEnabled)
actual := runMaxFn(limiter)
assert.Equal(t, testData.expectedValue, actual)
})
Expand Down Expand Up @@ -352,7 +352,7 @@ func TestLimiter_AssertMaxSeriesPerMetric(t *testing.T) {
}, nil)
require.NoError(t, err)

limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false, 0)
limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false)
actual := limiter.IsWithinMaxSeriesPerMetric("test", testData.series)

assert.Equal(t, testData.expected, actual)
Expand Down Expand Up @@ -405,7 +405,7 @@ func TestLimiter_AssertMaxMetadataPerMetric(t *testing.T) {
}, nil)
require.NoError(t, err)

limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false, 0)
limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false)
actual := limiter.IsWithinMaxMetadataPerMetric("test", testData.metadata)

assert.Equal(t, testData.expected, actual)
Expand Down Expand Up @@ -459,7 +459,7 @@ func TestLimiter_AssertMaxSeriesPerUser(t *testing.T) {
}, nil)
require.NoError(t, err)

limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false, 0)
limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false)
actual := limiter.IsWithinMaxSeriesPerUser("test", testData.series)

assert.Equal(t, testData.expected, actual)
Expand Down Expand Up @@ -513,7 +513,7 @@ func TestLimiter_AssertMaxMetricsWithMetadataPerUser(t *testing.T) {
}, nil)
require.NoError(t, err)

limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false, 0)
limiter := NewLimiter(limits, ring, testData.ringReplicationFactor, false)
actual := limiter.IsWithinMaxMetricsWithMetadataPerUser("test", testData.metadata)

assert.Equal(t, testData.expected, actual)
Expand Down
9 changes: 6 additions & 3 deletions pkg/ingester/user_metrics_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ type userMetricsMetadata struct {

mtx sync.RWMutex
metricToMetadata map[string]metricMetadataSet

errorSamplers ingesterErrSamplers
}

func newMetadataMap(l *Limiter, m *ingesterMetrics, userID string) *userMetricsMetadata {
func newMetadataMap(l *Limiter, m *ingesterMetrics, errorSamplers ingesterErrSamplers, userID string) *userMetricsMetadata {
return &userMetricsMetadata{
metricToMetadata: map[string]metricMetadataSet{},
limiter: l,
metrics: m,
errorSamplers: errorSamplers,
userID: userID,
}
}
Expand All @@ -47,15 +50,15 @@ func (mm *userMetricsMetadata) add(metric string, metadata *mimirpb.MetricMetada
// Verify that the user can create more metric metadata given we don't have a set for that metric name.
if !mm.limiter.IsWithinMaxMetricsWithMetadataPerUser(mm.userID, len(mm.metricToMetadata)) {
mm.metrics.discardedMetadataPerUserMetadataLimit.WithLabelValues(mm.userID).Inc()
return mm.limiter.samplers.maxMetadataPerUserLimitExceeded.WrapError(formatMaxMetadataPerUserError(mm.limiter.limits, mm.userID))
return mm.errorSamplers.maxMetadataPerUserLimitExceeded.WrapError(formatMaxMetadataPerUserError(mm.limiter.limits, mm.userID))
}
set = metricMetadataSet{}
mm.metricToMetadata[metric] = set
}

if !mm.limiter.IsWithinMaxMetadataPerMetric(mm.userID, len(set)) {
mm.metrics.discardedMetadataPerMetricMetadataLimit.WithLabelValues(mm.userID).Inc()
return mm.limiter.samplers.maxMetadataPerMetricLimitExceeded.WrapError(formatMaxMetadataPerMetricError(mm.limiter.limits, labels.FromStrings(labels.MetricName, metric), mm.userID))
return mm.errorSamplers.maxMetadataPerMetricLimitExceeded.WrapError(formatMaxMetadataPerMetricError(mm.limiter.limits, labels.FromStrings(labels.MetricName, metric), mm.userID))
}

// if we have seen this metadata before, it is a no-op and we don't need to change our metrics.
Expand Down
6 changes: 4 additions & 2 deletions pkg/ingester/user_metrics_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func TestUserMetricsMetadata(t *testing.T) {
errContains string
}

errorSamplers := newIngesterErrSamplers(0)

tests := map[string]struct {
maxMetadataPerUser int
maxMetadataPerMetric int
Expand Down Expand Up @@ -81,7 +83,7 @@ func TestUserMetricsMetadata(t *testing.T) {
MaxGlobalMetadataPerMetric: testData.maxMetadataPerMetric,
}, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, ring, 1, false, 0)
limiter := NewLimiter(limits, ring, 1, false)

// Mock metrics
metrics := newIngesterMetrics(
Expand All @@ -92,7 +94,7 @@ func TestUserMetricsMetadata(t *testing.T) {
nil,
)

mm := newMetadataMap(limiter, metrics, "test")
mm := newMetadataMap(limiter, metrics, errorSamplers, "test")

// Attempt to add all metadata
for _, i := range testData.inputMetadata {
Expand Down
Loading