Skip to content

Commit

Permalink
Revert to "0 means disabled"
Browse files Browse the repository at this point in the history
Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
  • Loading branch information
colega committed Jun 4, 2024
1 parent 086e63c commit 2981c05
Show file tree
Hide file tree
Showing 11 changed files with 305 additions and 364 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
* Query results caching should be more stable as all equivalent queries receive the same cache key, but there may be cache churn on first deploy with the updated format
* Query blocking can no longer be circumvented with an equivalent query in a different format; see [Configure queries to block](https://grafana.com/docs/mimir/latest/configure/configure-blocked-queries/)
* [CHANGE] Query-frontend: stop using `-validation.create-grace-period` to clamp how far into the future a query can span.
* [CHANGE] Distributor, ingester: add new setting `-validation.past-grace-period` to limit how old (based on the wall clock minus OOO window) the ingested samples can be. #8262
* [CHANGE] Clamp [`GOMAXPROCS`](https://pkg.go.dev/runtime#GOMAXPROCS) to [`runtime.NumCPU`](https://pkg.go.dev/runtime#NumCPU). #8201
* [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
Expand All @@ -23,6 +22,7 @@
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123
* [FEATURE] Query-frontend, querier: new experimental `/cardinality/active_native_histogram_metrics` API to get active native histogram metric names with statistics about active native histogram buckets. #7982 #7986 #8008
* [FEATURE] Distributor, ingester: add new setting `-validation.past-grace-period` to limit how old (based on the wall clock minus OOO window) the ingested samples can be. The default 0 value disables this limit. #8262
* [ENHANCEMENT] Reduced memory allocations in functions used to propagate contextual information between gRPC calls. #7529
* [ENHANCEMENT] Distributor: add experimental limit for exemplars per series per request, enabled with `-distributor.max-exemplars-per-series-per-request`, the number of discarded exemplars are tracked with `cortex_discarded_exemplars_total{reason="too_many_exemplars_per_series_per_request"}` #7989 #8010
* [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456
Expand Down
88 changes: 41 additions & 47 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,6 @@ var (
generateTestFloatHistogram = util_test.GenerateTestFloatHistogram
)

const (
testPastGracePeriod = 100 * 365 * 24 * time.Hour // 100 years, as many tests push samples around unix time 0 and we don't want to keep fixing this tests again and again.
)

func TestConfig_Validate(t *testing.T) {
tests := map[string]struct {
initLimits func(*validation.Limits)
Expand Down Expand Up @@ -1014,15 +1010,16 @@ func TestDistributor_PushHAInstances(t *testing.T) {
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
limits := prepareDefaultLimits()
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.AcceptHASamples = true
limits.MaxLabelValueLength = 15

ds, _, _, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: 1,
limits: limits,
limits: &limits,
enableTracker: tc.enableTracker,
})

Expand Down Expand Up @@ -1271,15 +1268,16 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) {

for _, tc := range cases {
var err error
limits := prepareDefaultLimits()
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.DropLabels = tc.removeLabels
limits.AcceptHASamples = tc.removeReplica

ds, ingesters, _, _ := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
limits: limits,
limits: &limits,
})

// Push the series to the distributor
Expand Down Expand Up @@ -1335,7 +1333,8 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t *
},
}

limits := prepareDefaultLimits()
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.DropLabels = []string{"dropped"}
limits.AcceptHASamples = true

Expand All @@ -1345,7 +1344,7 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t *
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
limits: limits,
limits: &limits,
})

// Push the series to the distributor
Expand Down Expand Up @@ -2089,26 +2088,6 @@ func BenchmarkDistributor_Push(b *testing.B) {
},
expectedErr: "received a sample whose timestamp is too far in the future",
},
"timestamp too far in the past": {
prepareConfig: func(limits *validation.Limits) {
limits.PastGracePeriod = model.Duration(7 * 24 * time.Hour)
},
prepareSeries: func() ([][]mimirpb.LabelAdapter, []mimirpb.Sample) {
metrics := make([][]mimirpb.LabelAdapter, numSeriesPerRequest)
samples := make([]mimirpb.Sample, numSeriesPerRequest)

for i := 0; i < numSeriesPerRequest; i++ {
metrics[i] = mkLabels(10)
samples[i] = mimirpb.Sample{
Value: float64(i),
TimestampMs: time.Now().Add(-200*time.Hour).UnixNano() / int64(time.Millisecond),
}
}

return metrics, samples
},
expectedErr: "received a sample whose timestamp is too far in the past",
},
"all samples go to metric_relabel_configs": {
prepareConfig: func(limits *validation.Limits) {
limits.MetricRelabelConfigs = []*relabel.Config{
Expand Down Expand Up @@ -3367,13 +3346,14 @@ func TestDistributor_LabelNamesAndValuesLimitTest(t *testing.T) {
ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)

// Create distributor
limits := prepareDefaultLimits()
limits := validation.Limits{}
flagext.DefaultValues(&limits)
limits.LabelNamesAndValuesResultsMaxSizeBytes = testData.sizeLimitBytes
ds, _, _, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: 1,
limits: limits,
limits: &limits,
ingestStorageEnabled: ingestStorageEnabled,
})

Expand Down Expand Up @@ -4525,13 +4505,14 @@ func TestDistributor_LabelValuesCardinality_Limit(t *testing.T) {
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
// Create distributor
limits := prepareDefaultLimits()
limits := validation.Limits{}
flagext.DefaultValues(&limits)
limits.LabelValuesMaxCardinalityLabelNamesPerRequest = testData.maxLabelNamesPerRequest
ds, _, _, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: 1,
limits: limits,
limits: &limits,
})

// Push fixtures
Expand Down Expand Up @@ -4759,14 +4740,15 @@ func TestInstanceLimitsBeforeHaDedupe(t *testing.T) {
}

// Setup limits with HA enabled and forwarding rules for the metric "foo".
limits := prepareDefaultLimits()
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.AcceptHASamples = true
limits.MaxLabelValueLength = 15

// Prepare distributor and wrap the mock push function with its middlewares.
ds, _, _, _ := prepare(t, prepConfig{
numDistributors: 1,
limits: limits,
limits: &limits,
enableTracker: true,
configure: func(config *Config) {
config.DefaultLimits.MaxInflightPushRequests = 1
Expand Down Expand Up @@ -5376,7 +5358,6 @@ func preparePartitionsRing(cfg prepConfig, ingesters []*mockIngester) *ring.Part
func prepareDefaultLimits() *validation.Limits {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.PastGracePeriod = model.Duration(testPastGracePeriod)
return limits
}

Expand Down Expand Up @@ -6781,6 +6762,7 @@ func TestDistributorValidation(t *testing.T) {
labels [][]mimirpb.LabelAdapter
samples []mimirpb.Sample
exemplars []*mimirpb.Exemplar
limits func(limits *validation.Limits)
expectedErr *status.Status
}{
"validation passes": {
Expand Down Expand Up @@ -6818,13 +6800,16 @@ func TestDistributorValidation(t *testing.T) {
expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(sampleTimestampTooNewMsgFormat, future, "testmetric")),
},

"validation does not fail for samples from the past without past_grace_period setting": {
labels: [][]mimirpb.LabelAdapter{{{Name: "foo", Value: "bar"}, {Name: labels.MetricName, Value: "testmetric"}}},
samples: []mimirpb.Sample{{TimestampMs: int64(past), Value: 1}},
},

"validation fails for samples from the past": {
labels: [][]mimirpb.LabelAdapter{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},
samples: []mimirpb.Sample{{
TimestampMs: int64(past.Add(-testPastGracePeriod)),
Value: 4,
}},
expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(sampleTimestampTooOldMsgFormat, past.Add(-testPastGracePeriod), "testmetric")),
labels: [][]mimirpb.LabelAdapter{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},
samples: []mimirpb.Sample{{TimestampMs: int64(past), Value: 4}},
limits: func(limits *validation.Limits) { limits.PastGracePeriod = model.Duration(now.Sub(past) / 2) },
expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(sampleTimestampTooOldMsgFormat, past, "testmetric")),
},

"exceeds maximum labels per series": {
Expand Down Expand Up @@ -6903,6 +6888,9 @@ func TestDistributorValidation(t *testing.T) {
limits.CreationGracePeriod = model.Duration(2 * time.Hour)
limits.MaxLabelNamesPerSeries = 2
limits.MaxGlobalExemplarsPerUser = 10
if tc.limits != nil {
tc.limits(&limits)
}

ds, _, _, _ := prepare(t, prepConfig{
numIngesters: 3,
Expand Down Expand Up @@ -6956,14 +6944,15 @@ func TestDistributor_Push_Relabel(t *testing.T) {

for _, tc := range cases {
var err error
limits := prepareDefaultLimits()
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.MetricRelabelConfigs = tc.metricRelabelConfigs

ds, ingesters, _, _ := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
limits: limits,
limits: &limits,
})

// Push the series to the distributor
Expand Down Expand Up @@ -7001,11 +6990,12 @@ func TestDistributor_MetricsWithRequestModifications(t *testing.T) {
return user.InjectOrgID(context.Background(), tenant)
}
getDefaultConfig := func() prepConfig {
limits := prepareDefaultLimits()
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.MaxGlobalExemplarsPerUser = 1000

return prepConfig{
limits: limits,
limits: &limits,
numIngesters: 1,
happyIngesters: 1,
replicationFactor: 1,
Expand Down Expand Up @@ -7794,9 +7784,13 @@ func TestStartFinishRequest(t *testing.T) {
t.Run(name, func(t *testing.T) {
pushReq := makeWriteRequestForGenerators(1, uniqueMetricsGen, nil, nil)

var limits validation.Limits
flagext.DefaultValues(&limits)

// Prepare distributor and wrap the mock push function with its middlewares.
ds, _, _, _ := prepare(t, prepConfig{
numDistributors: 1,
limits: &limits,
enableTracker: true,
configure: func(config *Config) {
config.DefaultLimits.MaxIngestionRate = ingestionRateLimit
Expand Down
4 changes: 2 additions & 2 deletions pkg/distributor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func validateSample(m *sampleValidationMetrics, now model.Time, cfg sampleValida
return fmt.Errorf(sampleTimestampTooNewMsgFormat, s.TimestampMs, unsafeMetricName)
}

if model.Time(s.TimestampMs) < now.Add(-cfg.PastGracePeriod(userID)).Add(-cfg.OutOfOrderTimeWindow(userID)) {
if cfg.PastGracePeriod(userID) > 0 && model.Time(s.TimestampMs) < now.Add(-cfg.PastGracePeriod(userID)).Add(-cfg.OutOfOrderTimeWindow(userID)) {
m.tooFarInPast.WithLabelValues(userID, group).Inc()
unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ls)
return fmt.Errorf(sampleTimestampTooOldMsgFormat, s.TimestampMs, unsafeMetricName)
Expand All @@ -241,7 +241,7 @@ func validateSampleHistogram(m *sampleValidationMetrics, now model.Time, cfg sam
return false, fmt.Errorf(sampleTimestampTooNewMsgFormat, s.Timestamp, unsafeMetricName)
}

if model.Time(s.Timestamp) < now.Add(-cfg.PastGracePeriod(userID)).Add(-cfg.OutOfOrderTimeWindow(userID)) {
if cfg.PastGracePeriod(userID) > 0 && model.Time(s.Timestamp) < now.Add(-cfg.PastGracePeriod(userID)).Add(-cfg.OutOfOrderTimeWindow(userID)) {
m.tooFarInPast.WithLabelValues(userID, group).Inc()
unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ls)
return false, fmt.Errorf(sampleTimestampTooOldMsgFormat, s.Timestamp, unsafeMetricName)
Expand Down
14 changes: 10 additions & 4 deletions pkg/distributor/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,17 @@ type sampleValidationCfg struct {
reduceNativeHistogramOverMaxBuckets bool
}

func (c sampleValidationCfg) CreationGracePeriod(_ string) time.Duration { return 0 }
func (c sampleValidationCfg) CreationGracePeriod(_ string) time.Duration {
return 0
}

func (c sampleValidationCfg) PastGracePeriod(_ string) time.Duration { return testPastGracePeriod }
func (c sampleValidationCfg) PastGracePeriod(_ string) time.Duration {
return 0
}

func (c sampleValidationCfg) OutOfOrderTimeWindow(_ string) time.Duration {
return 0
}

func (c sampleValidationCfg) MaxNativeHistogramBuckets(_ string) int {
return c.maxNativeHistogramBuckets
Expand All @@ -406,8 +414,6 @@ func (c sampleValidationCfg) ReduceNativeHistogramOverMaxBuckets(_ string) bool
return c.reduceNativeHistogramOverMaxBuckets
}

func (c sampleValidationCfg) OutOfOrderTimeWindow(_ string) time.Duration { return 0 }

func TestMaxNativeHistorgramBuckets(t *testing.T) {
// All will have 2 buckets, one negative and one positive
testCases := map[string]mimirpb.Histogram{
Expand Down
5 changes: 4 additions & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1324,8 +1324,11 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
var (
nativeHistogramsIngestionEnabled = i.limits.NativeHistogramsIngestionEnabled(userID)
maxTimestampMs = startAppend.Add(i.limits.CreationGracePeriod(userID)).UnixMilli()
minTimestampMs = startAppend.Add(-i.limits.PastGracePeriod(userID)).Add(-i.limits.OutOfOrderTimeWindow(userID)).UnixMilli()
minTimestampMs = int64(math.MinInt64)
)
if i.limits.PastGracePeriod(userID) > 0 {
minTimestampMs = startAppend.Add(-i.limits.PastGracePeriod(userID)).Add(-i.limits.OutOfOrderTimeWindow(userID)).UnixMilli()
}

var builder labels.ScratchBuilder
var nonCopiedLabels labels.Labels
Expand Down
Loading

0 comments on commit 2981c05

Please sign in to comment.