From e6d5c1d6dd874e0d82d21e2e8c54f5b16e393923 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Tue, 16 Jan 2024 15:49:45 +0100 Subject: [PATCH] Update vendored mimir-prometheus Signed-off-by: Marco Pracucci --- go.mod | 2 +- go.sum | 4 +- pkg/compactor/bucket_compactor_e2e_test.go | 4 +- pkg/compactor/split_merge_compactor.go | 2 +- pkg/ingester/ingester.go | 2 +- pkg/storage/tsdb/block/block_generator.go | 2 +- pkg/storegateway/bucket_test.go | 2 +- tools/tsdb-compact/main.go | 2 +- .../prometheus/prometheus/config/config.go | 3 + .../model/textparse/openmetricsparse.go | 1 - .../model/textparse/protobufparse.go | 17 +- .../prometheus/prometheus/promql/engine.go | 13 ++ .../prometheus/promql/parser/parse.go | 14 ++ .../storage/remote/queue_manager.go | 190 +++++++++++++++--- .../prometheus/prometheus/tsdb/blockwriter.go | 2 +- .../prometheus/prometheus/tsdb/compact.go | 49 ++++- .../prometheus/prometheus/tsdb/db.go | 27 +-- .../prometheus/prometheus/tsdb/index/index.go | 43 +++- .../prometheus/prometheus/web/api/v1/api.go | 10 +- vendor/modules.txt | 4 +- 20 files changed, 310 insertions(+), 83 deletions(-) diff --git a/go.mod b/go.mod index 8919691d29e..c9653455238 100644 --- a/go.mod +++ b/go.mod @@ -259,7 +259,7 @@ require ( ) // Using a fork of Prometheus with Mimir-specific changes. -replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240105142307-b0c6c0a2a31b +replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240116133529-e3a486e7f801 // Replace memberlist with our fork which includes some fixes that haven't been // merged upstream yet: diff --git a/go.sum b/go.sum index 6b9bb3e802c..11ae1b7fe7d 100644 --- a/go.sum +++ b/go.sum @@ -552,8 +552,8 @@ github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 h1:/of8Z8taCPft github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= -github.com/grafana/mimir-prometheus v0.0.0-20240105142307-b0c6c0a2a31b h1:Q68mIRbBG4tmb3ABonylCc+orLsYtHgXZ6lDw7B/S3w= -github.com/grafana/mimir-prometheus v0.0.0-20240105142307-b0c6c0a2a31b/go.mod h1:W4s/zaz2ypTeyg7h7HDJ4/g0+p5tXBWJ6ToK3g0a5zs= +github.com/grafana/mimir-prometheus v0.0.0-20240116133529-e3a486e7f801 h1:65eoE+Cwgi8PS+TBmdBn3xtS/JFeuTImzQI4GNDrhTQ= +github.com/grafana/mimir-prometheus v0.0.0-20240116133529-e3a486e7f801/go.mod h1:W4s/zaz2ypTeyg7h7HDJ4/g0+p5tXBWJ6ToK3g0a5zs= github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0= github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU= github.com/grafana/pyroscope-go/godeltaprof v0.1.6 h1:nEdZ8louGAplSvIJi1HVp7kWvFvdiiYg3COLlTwJiFo= diff --git a/pkg/compactor/bucket_compactor_e2e_test.go b/pkg/compactor/bucket_compactor_e2e_test.go index b4ecf151a94..ff815a84c95 100644 --- a/pkg/compactor/bucket_compactor_e2e_test.go +++ b/pkg/compactor/bucket_compactor_e2e_test.go @@ -236,7 +236,7 @@ func TestGroupCompactE2E(t *testing.T) { sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, blocksMarkedForDeletion) require.NoError(t, err) - comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, 
[]int64{1000, 3000}, nil, nil, true) + comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, nil) require.NoError(t, err) planner := NewSplitAndMergePlanner([]int64{1000, 3000}) @@ -714,7 +714,7 @@ func createBlockWithOptions( if err := g.Wait(); err != nil { return id, err } - c, err := tsdb.NewLeveledCompactor(ctx, nil, log.NewNopLogger(), []int64{maxt - mint}, nil, nil, true) + c, err := tsdb.NewLeveledCompactor(ctx, nil, log.NewNopLogger(), []int64{maxt - mint}, nil, nil) if err != nil { return id, errors.Wrap(err, "create compactor") } diff --git a/pkg/compactor/split_merge_compactor.go b/pkg/compactor/split_merge_compactor.go index b75dbc766c1..154c8454dc4 100644 --- a/pkg/compactor/split_merge_compactor.go +++ b/pkg/compactor/split_merge_compactor.go @@ -21,7 +21,7 @@ func splitAndMergeGrouperFactory(_ context.Context, cfg Config, cfgProvider Conf func splitAndMergeCompactorFactory(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (Compactor, Planner, error) { // We don't need to customise the TSDB compactor so we're just using the Prometheus one. - compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), nil, nil, true) + compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), nil, nil) if err != nil { return nil, nil, err } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 7f7364d0ca4..4c765685a07 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -2380,7 +2380,7 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD EnableMemorySnapshotOnShutdown: i.cfg.BlocksStorageConfig.TSDB.MemorySnapshotOnShutdown, IsolationDisabled: true, HeadChunksWriteQueueSize: i.cfg.BlocksStorageConfig.TSDB.HeadChunksWriteQueueSize, - AllowOverlappingCompaction: false, // always false since Mimir only uploads lvl 1 compacted blocks + EnableOverlappingCompaction: false, // always false since Mimir only uploads lvl 1 compacted blocks OutOfOrderTimeWindow: oooTW.Milliseconds(), // The unit must be same as our timestamps. OutOfOrderCapMax: int64(i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapacityMax), HeadPostingsForMatchersCacheTTL: i.cfg.BlocksStorageConfig.TSDB.HeadPostingsForMatchersCacheTTL, diff --git a/pkg/storage/tsdb/block/block_generator.go b/pkg/storage/tsdb/block/block_generator.go index 15ae8facf3b..27859823e58 100644 --- a/pkg/storage/tsdb/block/block_generator.go +++ b/pkg/storage/tsdb/block/block_generator.go @@ -266,7 +266,7 @@ func CreateBlock( if err := g.Wait(); err != nil { return id, err } - c, err := tsdb.NewLeveledCompactor(ctx, nil, log.NewNopLogger(), []int64{maxt - mint}, nil, nil, true) + c, err := tsdb.NewLeveledCompactor(ctx, nil, log.NewNopLogger(), []int64{maxt - mint}, nil, nil) if err != nil { return id, errors.Wrap(err, "create compactor") } diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index f0941142199..7878e3f2b6a 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -1093,7 +1093,7 @@ func appendTestSeries(series int) func(testing.TB, func() storage.Appender) { func createBlockFromHead(t testing.TB, dir string, head *tsdb.Head) ulid.ULID { // Put a 3 MiB limit on segment files so we can test with many segment files without creating too big blocks. 
- compactor, err := tsdb.NewLeveledCompactorWithChunkSize(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, 3*1024*1024, nil, true) + compactor, err := tsdb.NewLeveledCompactorWithChunkSize(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, 3*1024*1024, nil) assert.NoError(t, err) assert.NoError(t, os.MkdirAll(dir, 0777)) diff --git a/tools/tsdb-compact/main.go b/tools/tsdb-compact/main.go index da9975004d5..e04bd6d45cb 100644 --- a/tools/tsdb-compact/main.go +++ b/tools/tsdb-compact/main.go @@ -86,7 +86,7 @@ func main() { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() - c, err := tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, []int64{0}, nil, segmentSizeMB*1024*1024, nil, true) + c, err := tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, []int64{0}, nil, segmentSizeMB*1024*1024, nil) if err != nil { log.Fatalln("creating compator", err) } diff --git a/vendor/github.com/prometheus/prometheus/config/config.go b/vendor/github.com/prometheus/prometheus/config/config.go index ddcca84dc78..c544d7e7489 100644 --- a/vendor/github.com/prometheus/prometheus/config/config.go +++ b/vendor/github.com/prometheus/prometheus/config/config.go @@ -1124,6 +1124,9 @@ type QueueConfig struct { MinBackoff model.Duration `yaml:"min_backoff,omitempty"` MaxBackoff model.Duration `yaml:"max_backoff,omitempty"` RetryOnRateLimit bool `yaml:"retry_on_http_429,omitempty"` + + // Samples older than the limit will be dropped. + SampleAgeLimit model.Duration `yaml:"sample_age_limit,omitempty"` } // MetadataConfig is the configuration for sending metadata to remote diff --git a/vendor/github.com/prometheus/prometheus/model/textparse/openmetricsparse.go b/vendor/github.com/prometheus/prometheus/model/textparse/openmetricsparse.go index ddfbe4fc5c2..4c15ff5fc04 100644 --- a/vendor/github.com/prometheus/prometheus/model/textparse/openmetricsparse.go +++ b/vendor/github.com/prometheus/prometheus/model/textparse/openmetricsparse.go @@ -136,7 +136,6 @@ func (p *OpenMetricsParser) Type() ([]byte, model.MetricType) { // Must only be called after Next returned a unit entry. // The returned byte slices become invalid after the next call to Next. func (p *OpenMetricsParser) Unit() ([]byte, []byte) { - // The Prometheus format does not have units. return p.l.b[p.offsets[0]:p.offsets[1]], p.text } diff --git a/vendor/github.com/prometheus/prometheus/model/textparse/protobufparse.go b/vendor/github.com/prometheus/prometheus/model/textparse/protobufparse.go index 534bbebb203..2c9c384b6b5 100644 --- a/vendor/github.com/prometheus/prometheus/model/textparse/protobufparse.go +++ b/vendor/github.com/prometheus/prometheus/model/textparse/protobufparse.go @@ -269,10 +269,11 @@ func (p *ProtobufParser) Type() ([]byte, model.MetricType) { return n, model.MetricTypeUnknown } -// Unit always returns (nil, nil) because units aren't supported by the protobuf -// format. +// Unit returns the metric unit in the current entry. +// Must only be called after Next returned a unit entry. +// The returned byte slices become invalid after the next call to Next. 
func (p *ProtobufParser) Unit() ([]byte, []byte) { - return nil, nil + return p.metricBytes.Bytes(), []byte(p.mf.GetUnit()) } // Comment always returns nil because comments aren't supported by the protobuf @@ -422,6 +423,16 @@ func (p *ProtobufParser) Next() (Entry, error) { default: return EntryInvalid, fmt.Errorf("unknown metric type for metric %q: %s", name, p.mf.GetType()) } + unit := p.mf.GetUnit() + if len(unit) > 0 { + if p.mf.GetType() == dto.MetricType_COUNTER && strings.HasSuffix(name, "_total") { + if !strings.HasSuffix(name[:len(name)-6], unit) || len(name)-6 < len(unit)+1 || name[len(name)-6-len(unit)-1] != '_' { + return EntryInvalid, fmt.Errorf("unit %q not a suffix of counter %q", unit, name) + } + } else if !strings.HasSuffix(name, unit) || len(name) < len(unit)+1 || name[len(name)-len(unit)-1] != '_' { + return EntryInvalid, fmt.Errorf("unit %q not a suffix of metric %q", unit, name) + } + } p.metricBytes.Reset() p.metricBytes.WriteString(name) diff --git a/vendor/github.com/prometheus/prometheus/promql/engine.go b/vendor/github.com/prometheus/prometheus/promql/engine.go index cca43a51e89..d1cc719be21 100644 --- a/vendor/github.com/prometheus/prometheus/promql/engine.go +++ b/vendor/github.com/prometheus/prometheus/promql/engine.go @@ -24,6 +24,7 @@ import ( "runtime" "sort" "strconv" + "strings" "sync" "time" @@ -1544,6 +1545,18 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio } } ev.samplesStats.UpdatePeak(ev.currentSamples) + + if e.Func.Name == "rate" || e.Func.Name == "increase" { + samples := inMatrix[0] + metricName := samples.Metric.Get(labels.MetricName) + if metricName != "" && len(samples.Floats) > 0 && + !strings.HasSuffix(metricName, "_total") && + !strings.HasSuffix(metricName, "_sum") && + !strings.HasSuffix(metricName, "_count") && + !strings.HasSuffix(metricName, "_bucket") { + warnings.Add(annotations.NewPossibleNonCounterInfo(metricName, e.Args[0].PositionRange())) + } + } } ev.samplesStats.UpdatePeak(ev.currentSamples) diff --git a/vendor/github.com/prometheus/prometheus/promql/parser/parse.go b/vendor/github.com/prometheus/prometheus/promql/parser/parse.go index 122286c5517..c2a42ed153a 100644 --- a/vendor/github.com/prometheus/prometheus/promql/parser/parse.go +++ b/vendor/github.com/prometheus/prometheus/promql/parser/parse.go @@ -208,6 +208,20 @@ func ParseMetricSelector(input string) (m []*labels.Matcher, err error) { return m, err } +// ParseMetricSelectors parses a list of provided textual metric selectors into lists of +// label matchers. +func ParseMetricSelectors(matchers []string) (m [][]*labels.Matcher, err error) { + var matcherSets [][]*labels.Matcher + for _, s := range matchers { + matchers, err := ParseMetricSelector(s) + if err != nil { + return nil, err + } + matcherSets = append(matcherSets, matchers) + } + return matcherSets, nil +} + // SequenceValue is an omittable value in a sequence of time series values. 
type SequenceValue struct { Value float64 diff --git a/vendor/github.com/prometheus/prometheus/storage/remote/queue_manager.go b/vendor/github.com/prometheus/prometheus/storage/remote/queue_manager.go index a25c7d90c07..e37ec8c7052 100644 --- a/vendor/github.com/prometheus/prometheus/storage/remote/queue_manager.go +++ b/vendor/github.com/prometheus/prometheus/storage/remote/queue_manager.go @@ -36,6 +36,7 @@ import ( "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" + "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/tsdb/chunks" @@ -51,6 +52,10 @@ const ( // Allow 30% too many shards before scaling down. shardToleranceFraction = 0.3 + + reasonTooOld = "too_old" + reasonDroppedSeries = "dropped_series" + reasonUnintentionalDroppedSeries = "unintentionally_dropped_series" ) type queueManagerMetrics struct { @@ -68,9 +73,9 @@ type queueManagerMetrics struct { retriedExemplarsTotal prometheus.Counter retriedHistogramsTotal prometheus.Counter retriedMetadataTotal prometheus.Counter - droppedSamplesTotal prometheus.Counter - droppedExemplarsTotal prometheus.Counter - droppedHistogramsTotal prometheus.Counter + droppedSamplesTotal *prometheus.CounterVec + droppedExemplarsTotal *prometheus.CounterVec + droppedHistogramsTotal *prometheus.CounterVec enqueueRetriesTotal prometheus.Counter sentBatchDuration prometheus.Histogram highestSentTimestamp *maxTimestamp @@ -180,27 +185,27 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager Help: "Total number of metadata entries which failed on send to remote storage but were retried because the send error was recoverable.", ConstLabels: constLabels, }) - m.droppedSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{ + m.droppedSamplesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, Name: "samples_dropped_total", - Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.", + Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write, either via relabelling, due to being too old or unintentionally because of an unknown reference ID.", ConstLabels: constLabels, - }) - m.droppedExemplarsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + }, []string{"reason"}) + m.droppedExemplarsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, Name: "exemplars_dropped_total", - Help: "Total number of exemplars which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.", + Help: "Total number of exemplars which were dropped after being read from the WAL before being sent via remote write, either via relabelling, due to being too old or unintentionally because of an unknown reference ID.", ConstLabels: constLabels, - }) - m.droppedHistogramsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + }, []string{"reason"}) + m.droppedHistogramsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, Name: "histograms_dropped_total", - Help: "Total number of histograms which were dropped 
after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.", + Help: "Total number of histograms which were dropped after being read from the WAL before being sent via remote write, either via relabelling, due to being too old or unintentionally because of an unknown reference ID.", ConstLabels: constLabels, - }) + }, []string{"reason"}) m.enqueueRetriesTotal = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, @@ -391,7 +396,8 @@ type WriteClient interface { // indicated by the provided WriteClient. Implements writeTo interface // used by WAL Watcher. type QueueManager struct { - lastSendTimestamp atomic.Int64 + lastSendTimestamp atomic.Int64 + buildRequestLimitTimestamp atomic.Int64 logger log.Logger flushDeadline time.Duration @@ -529,7 +535,7 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error { // Build the WriteRequest with no samples. - req, _, err := buildWriteRequest(nil, metadata, pBuf, nil) + req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil) if err != nil { return err } @@ -575,18 +581,65 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p return nil } +func isSampleOld(baseTime time.Time, sampleAgeLimit time.Duration, ts int64) bool { + if sampleAgeLimit == 0 { + // If sampleAgeLimit is unset, then we never skip samples due to their age. + return false + } + limitTs := baseTime.Add(-sampleAgeLimit) + sampleTs := timestamp.Time(ts) + return sampleTs.Before(limitTs) +} + +func isTimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) func(ts prompb.TimeSeries) bool { + return func(ts prompb.TimeSeries) bool { + if sampleAgeLimit == 0 { + // If sampleAgeLimit is unset, then we never skip samples due to their age. + return false + } + switch { + // Only the first element should be set in the series, therefore we only check the first element. + case len(ts.Samples) > 0: + if isSampleOld(baseTime, sampleAgeLimit, ts.Samples[0].Timestamp) { + metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc() + return true + } + case len(ts.Histograms) > 0: + if isSampleOld(baseTime, sampleAgeLimit, ts.Histograms[0].Timestamp) { + metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc() + return true + } + case len(ts.Exemplars) > 0: + if isSampleOld(baseTime, sampleAgeLimit, ts.Exemplars[0].Timestamp) { + metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc() + return true + } + default: + return false + } + return false + } +} + // Append queues a sample to be sent to the remote storage. Blocks until all samples are // enqueued on their shards or a shutdown signal is received. 
func (t *QueueManager) Append(samples []record.RefSample) bool { + currentTime := time.Now() outer: for _, s := range samples { + if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), s.T) { + t.metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc() + continue + } t.seriesMtx.Lock() lbls, ok := t.seriesLabels[s.Ref] if !ok { - t.metrics.droppedSamplesTotal.Inc() t.dataDropped.incr(1) if _, ok := t.droppedSeries[s.Ref]; !ok { level.Info(t.logger).Log("msg", "Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref) + t.metrics.droppedSamplesTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc() + } else { + t.metrics.droppedSamplesTotal.WithLabelValues(reasonDroppedSeries).Inc() } t.seriesMtx.Unlock() continue @@ -629,17 +682,23 @@ func (t *QueueManager) AppendExemplars(exemplars []record.RefExemplar) bool { if !t.sendExemplars { return true } - + currentTime := time.Now() outer: for _, e := range exemplars { + if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), e.T) { + t.metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc() + continue + } t.seriesMtx.Lock() lbls, ok := t.seriesLabels[e.Ref] if !ok { - t.metrics.droppedExemplarsTotal.Inc() // Track dropped exemplars in the same EWMA for sharding calc. t.dataDropped.incr(1) if _, ok := t.droppedSeries[e.Ref]; !ok { level.Info(t.logger).Log("msg", "Dropped exemplar for series that was not explicitly dropped via relabelling", "ref", e.Ref) + t.metrics.droppedExemplarsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc() + } else { + t.metrics.droppedExemplarsTotal.WithLabelValues(reasonDroppedSeries).Inc() } t.seriesMtx.Unlock() continue @@ -678,16 +737,22 @@ func (t *QueueManager) AppendHistograms(histograms []record.RefHistogramSample) if !t.sendNativeHistograms { return true } - + currentTime := time.Now() outer: for _, h := range histograms { + if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), h.T) { + t.metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc() + continue + } t.seriesMtx.Lock() lbls, ok := t.seriesLabels[h.Ref] if !ok { - t.metrics.droppedHistogramsTotal.Inc() t.dataDropped.incr(1) if _, ok := t.droppedSeries[h.Ref]; !ok { level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref) + t.metrics.droppedHistogramsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc() + } else { + t.metrics.droppedHistogramsTotal.WithLabelValues(reasonDroppedSeries).Inc() } t.seriesMtx.Unlock() continue @@ -725,16 +790,22 @@ func (t *QueueManager) AppendFloatHistograms(floatHistograms []record.RefFloatHi if !t.sendNativeHistograms { return true } - + currentTime := time.Now() outer: for _, h := range floatHistograms { + if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), h.T) { + t.metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc() + continue + } t.seriesMtx.Lock() lbls, ok := t.seriesLabels[h.Ref] if !ok { - t.metrics.droppedHistogramsTotal.Inc() t.dataDropped.incr(1) if _, ok := t.droppedSeries[h.Ref]; !ok { level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref) + t.metrics.droppedHistogramsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc() + } else { + t.metrics.droppedHistogramsTotal.WithLabelValues(reasonDroppedSeries).Inc() } t.seriesMtx.Unlock() continue @@ -1490,7 +1561,8 @@ func (s *shards) sendSamples(ctx context.Context, samples 
[]prompb.TimeSeries, s // sendSamples to the remote storage with backoff for recoverable errors. func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error { // Build the WriteRequest with no metadata. - req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf) + req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, *buf, nil) + s.qm.buildRequestLimitTimestamp.Store(lowest) if err != nil { // Failing to build the write request is non-recoverable, since it will // only error if marshaling the proto to bytes fails. @@ -1504,6 +1576,25 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti // without causing a memory leak, and it has the nice effect of not propagating any // parameters for sendSamplesWithBackoff/3. attemptStore := func(try int) error { + currentTime := time.Now() + lowest := s.qm.buildRequestLimitTimestamp.Load() + if isSampleOld(currentTime, time.Duration(s.qm.cfg.SampleAgeLimit), lowest) { + // This will filter out old samples during retries. + req, _, lowest, err := buildWriteRequest( + s.qm.logger, + samples, + nil, + pBuf, + *buf, + isTimeSeriesOldFilter(s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit)), + ) + s.qm.buildRequestLimitTimestamp.Store(lowest) + if err != nil { + return err + } + *buf = req + } + ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch") defer span.End() @@ -1608,9 +1699,27 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l } } -func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) { +func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) (int64, int64, []prompb.TimeSeries, int, int, int) { var highest int64 - for _, ts := range samples { + var lowest int64 + var droppedSamples, droppedExemplars, droppedHistograms int + + keepIdx := 0 + lowest = math.MaxInt64 + for i, ts := range timeSeries { + if filter != nil && filter(ts) { + if len(ts.Samples) > 0 { + droppedSamples++ + } + if len(ts.Exemplars) > 0 { + droppedExemplars++ + } + if len(ts.Histograms) > 0 { + droppedHistograms++ + } + continue + } + // At the moment we only ever append a TimeSeries with a single sample or exemplar in it. 
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest { highest = ts.Samples[0].Timestamp @@ -1621,10 +1730,37 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest { highest = ts.Histograms[0].Timestamp } + + // Get lowest timestamp + if len(ts.Samples) > 0 && ts.Samples[0].Timestamp < lowest { + lowest = ts.Samples[0].Timestamp + } + if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp < lowest { + lowest = ts.Exemplars[0].Timestamp + } + if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest { + lowest = ts.Histograms[0].Timestamp + } + + // Move the current element to the write position and increment the write pointer + timeSeries[keepIdx] = timeSeries[i] + keepIdx++ + } + + timeSeries = timeSeries[:keepIdx] + return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms +} + +func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte, filter func(prompb.TimeSeries) bool) ([]byte, int64, int64, error) { + highest, lowest, timeSeries, + droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter) + + if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 { + level.Debug(logger).Log("msg", "dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms) } req := &prompb.WriteRequest{ - Timeseries: samples, + Timeseries: timeSeries, Metadata: metadata, } @@ -1635,7 +1771,7 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta } err := pBuf.Marshal(req) if err != nil { - return nil, highest, err + return nil, highest, lowest, err } // snappy uses len() to see if it needs to allocate a new slice. Make the @@ -1644,5 +1780,5 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta buf = buf[0:cap(buf)] } compressed := snappy.Encode(buf, pBuf.Bytes()) - return compressed, highest, nil + return compressed, highest, lowest, nil } diff --git a/vendor/github.com/prometheus/prometheus/tsdb/blockwriter.go b/vendor/github.com/prometheus/prometheus/tsdb/blockwriter.go index 872c7653023..73bc5f1e35b 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/blockwriter.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/blockwriter.go @@ -101,7 +101,7 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) { nil, w.logger, []int64{w.blockSize}, - chunkenc.NewPool(), nil, true) + chunkenc.NewPool(), nil) if err != nil { return ulid.ULID{}, fmt.Errorf("create leveled compactor: %w", err) } diff --git a/vendor/github.com/prometheus/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/prometheus/tsdb/compact.go index d2fd5967a74..9fa7b9f82a8 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/compact.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/compact.go @@ -90,9 +90,9 @@ type LeveledCompactor struct { ctx context.Context maxBlockChunkSegmentSize int64 mergeFunc storage.VerticalChunkSeriesMergeFunc + postingsEncoder index.PostingsEncoder enableOverlappingCompaction bool - - concurrencyOpts LeveledCompactorConcurrencyOptions + concurrencyOpts LeveledCompactorConcurrencyOptions } type CompactorMetrics struct { @@ -155,12 +155,35 @@ func newCompactorMetrics(r prometheus.Registerer) *CompactorMetrics { return m } -// NewLeveledCompactor returns a LeveledCompactor. 
-func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool) (*LeveledCompactor, error) { - return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc, enableOverlappingCompaction) +type LeveledCompactorOptions struct { + // PE specifies the postings encoder. It is called when compactor is writing out the postings for a label name/value pair during compaction. + // If it is nil then the default encoder is used. At the moment that is the "raw" encoder. See index.EncodePostingsRaw for more. + PE index.PostingsEncoder + // MaxBlockChunkSegmentSize is the max block chunk segment size. If it is 0 then the default chunks.DefaultChunkSegmentSize is used. + MaxBlockChunkSegmentSize int64 + // MergeFunc is used for merging series together in vertical compaction. By default storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used. + MergeFunc storage.VerticalChunkSeriesMergeFunc + // EnableOverlappingCompaction enables compaction of overlapping blocks. In Prometheus it is always enabled. + // It is useful for downstream projects like Mimir, Cortex, Thanos where they have a separate component that does compaction. + EnableOverlappingCompaction bool } -func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool) (*LeveledCompactor, error) { +func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { + return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{ + MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize, + MergeFunc: mergeFunc, + EnableOverlappingCompaction: true, + }) +} + +func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { + return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{ + MergeFunc: mergeFunc, + EnableOverlappingCompaction: true, + }) +} + +func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts LeveledCompactorOptions) (*LeveledCompactor, error) { if len(ranges) == 0 { return nil, fmt.Errorf("at least one range must be provided") } @@ -170,9 +193,18 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register if l == nil { l = log.NewNopLogger() } + mergeFunc := opts.MergeFunc if mergeFunc == nil { mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) } + maxBlockChunkSegmentSize := opts.MaxBlockChunkSegmentSize + if maxBlockChunkSegmentSize == 0 { + maxBlockChunkSegmentSize = chunks.DefaultChunkSegmentSize + } + pe := opts.PE + if pe == nil { + pe = index.EncodePostingsRaw + } return &LeveledCompactor{ ranges: ranges, chunkPool: pool, @@ -181,8 +213,9 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register ctx: ctx, maxBlockChunkSegmentSize: maxBlockChunkSegmentSize, mergeFunc: mergeFunc, + postingsEncoder: pe, + enableOverlappingCompaction: 
opts.EnableOverlappingCompaction, concurrencyOpts: DefaultLeveledCompactorConcurrencyOptions(), - enableOverlappingCompaction: enableOverlappingCompaction, }, nil } @@ -810,7 +843,7 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blockPop outBlocks[ix].chunkw = chunkw var indexw IndexWriter - indexw, err = index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename)) + indexw, err = index.NewWriterWithEncoder(c.ctx, filepath.Join(tmp, indexFilename), c.postingsEncoder) if err != nil { return fmt.Errorf("open index writer: %w", err) } diff --git a/vendor/github.com/prometheus/prometheus/tsdb/db.go b/vendor/github.com/prometheus/prometheus/tsdb/db.go index da8f42fe2b6..9546bf97c46 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/db.go @@ -78,7 +78,6 @@ func DefaultOptions() *Options { MinBlockDuration: DefaultBlockDuration, MaxBlockDuration: DefaultBlockDuration, NoLockfile: false, - AllowOverlappingCompaction: true, SamplesPerChunk: DefaultSamplesPerChunk, WALCompression: wlog.CompressionNone, StripeSize: DefaultStripeSize, @@ -87,6 +86,7 @@ func DefaultOptions() *Options { HeadChunksEndTimeVariance: 0, HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize, OutOfOrderCapMax: DefaultOutOfOrderCapMax, + EnableOverlappingCompaction: true, HeadPostingsForMatchersCacheTTL: DefaultPostingsForMatchersCacheTTL, HeadPostingsForMatchersCacheMaxItems: DefaultPostingsForMatchersCacheMaxItems, HeadPostingsForMatchersCacheMaxBytes: DefaultPostingsForMatchersCacheMaxBytes, @@ -126,14 +126,6 @@ type Options struct { // NoLockfile disables creation and consideration of a lock file. NoLockfile bool - // Compaction of overlapping blocks are allowed if AllowOverlappingCompaction is true. - // This is an optional flag for overlapping blocks. - // The reason why this flag exists is because there are various users of the TSDB - // that do not want vertical compaction happening on ingest time. Instead, - // they'd rather keep overlapping blocks and let another component do the overlapping compaction later. - // For Prometheus, this will always be true. - AllowOverlappingCompaction bool - // WALCompression configures the compression type to use on records in the WAL. WALCompression wlog.CompressionType @@ -206,6 +198,14 @@ type Options struct { // If it is <=0, the default value is assumed. OutOfOrderCapMax int64 + // Compaction of overlapping blocks are allowed if EnableOverlappingCompaction is true. + // This is an optional flag for overlapping blocks. + // The reason why this flag exists is because there are various users of the TSDB + // that do not want vertical compaction happening on ingest time. Instead, + // they'd rather keep overlapping blocks and let another component do the overlapping compaction later. + // For Prometheus, this will always be true. + EnableOverlappingCompaction bool + // HeadPostingsForMatchersCacheTTL is the TTL of the postings for matchers cache in the Head. // If it's 0, the cache will only deduplicate in-flight requests, deleting the results once the first request has finished. 
HeadPostingsForMatchersCacheTTL time.Duration @@ -505,9 +505,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) { nil, db.logger, ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5), - chunkenc.NewPool(), - nil, - false, + chunkenc.NewPool(), nil, ) if err != nil { return fmt.Errorf("create leveled compactor: %w", err) @@ -884,7 +882,10 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs } ctx, cancel := context.WithCancel(context.Background()) - db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil, opts.AllowOverlappingCompaction) + db.compactor, err = NewLeveledCompactorWithOptions(ctx, r, l, rngs, db.chunkPool, LeveledCompactorOptions{ + MaxBlockChunkSegmentSize: opts.MaxBlockChunkSegmentSize, + EnableOverlappingCompaction: opts.EnableOverlappingCompaction, + }) if err != nil { cancel() return nil, fmt.Errorf("create leveled compactor: %w", err) diff --git a/vendor/github.com/prometheus/prometheus/tsdb/index/index.go b/vendor/github.com/prometheus/prometheus/tsdb/index/index.go index 09812ddf48f..181e80570a5 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/index/index.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/index/index.go @@ -111,6 +111,8 @@ type symbolCacheEntry struct { lastValue string } +type PostingsEncoder func(*encoding.Encbuf, []uint32) error + // Writer implements the IndexWriter interface for the standard // serialization format. type Writer struct { @@ -149,6 +151,8 @@ type Writer struct { crc32 hash.Hash Version int + + postingsEncoder PostingsEncoder } // TOC represents index Table Of Content that states where each section of index starts. @@ -187,7 +191,8 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) { } // NewWriter returns a new Writer to the given filename. It serializes data in format version 2. -func NewWriter(ctx context.Context, fn string) (*Writer, error) { +// It uses the given encoder to encode each postings list. +func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncoder) (*Writer, error) { dir := filepath.Dir(fn) df, err := fileutil.OpenDir(dir) @@ -230,9 +235,10 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, - symbolCache: make(map[string]symbolCacheEntry, 1<<8), - labelNames: make(map[string]uint64, 1<<8), - crc32: newCRC32(), + symbolCache: make(map[string]symbolCacheEntry, 1<<8), + labelNames: make(map[string]uint64, 1<<8), + crc32: newCRC32(), + postingsEncoder: encoder, } if err := iw.writeMeta(); err != nil { return nil, err @@ -240,6 +246,12 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { return iw, nil } +// NewWriter creates a new index writer using the default encoder. See +// NewWriterWithEncoder. +func NewWriter(ctx context.Context, fn string) (*Writer, error) { + return NewWriterWithEncoder(ctx, fn, EncodePostingsRaw) +} + func (w *Writer) write(bufs ...[]byte) error { return w.f.Write(bufs...) } @@ -942,6 +954,20 @@ func (w *Writer) writePostingsToTmpFiles() error { return nil } +// EncodePostingsRaw uses the "basic" postings list encoding format with no compression: +// .... 
+func EncodePostingsRaw(e *encoding.Encbuf, offs []uint32) error { + e.PutBE32int(len(offs)) + + for _, off := range offs { + if off > (1<<32)-1 { + return fmt.Errorf("series offset %d exceeds 4 bytes", off) + } + e.PutBE32(off) + } + return nil +} + func (w *Writer) writePosting(name, value string, offs []uint32) error { // Align beginning to 4 bytes for more efficient postings list scans. if err := w.fP.AddPadding(4); err != nil { @@ -960,13 +986,8 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error { w.cntPO++ w.buf1.Reset() - w.buf1.PutBE32int(len(offs)) - - for _, off := range offs { - if off > (1<<32)-1 { - return fmt.Errorf("series offset %d exceeds 4 bytes", off) - } - w.buf1.PutBE32(off) + if err := w.postingsEncoder(&w.buf1, offs); err != nil { + return err } w.buf2.Reset() diff --git a/vendor/github.com/prometheus/prometheus/web/api/v1/api.go b/vendor/github.com/prometheus/prometheus/web/api/v1/api.go index dd35d1fe995..d9d4cfd1df8 100644 --- a/vendor/github.com/prometheus/prometheus/web/api/v1/api.go +++ b/vendor/github.com/prometheus/prometheus/web/api/v1/api.go @@ -1848,13 +1848,9 @@ func parseDuration(s string) (time.Duration, error) { } func parseMatchersParam(matchers []string) ([][]*labels.Matcher, error) { - var matcherSets [][]*labels.Matcher - for _, s := range matchers { - matchers, err := parser.ParseMetricSelector(s) - if err != nil { - return nil, err - } - matcherSets = append(matcherSets, matchers) + matcherSets, err := parser.ParseMetricSelectors(matchers) + if err != nil { + return nil, err } OUTER: diff --git a/vendor/modules.txt b/vendor/modules.txt index 871b503f927..f27718539d5 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -906,7 +906,7 @@ github.com/prometheus/exporter-toolkit/web github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20240105142307-b0c6c0a2a31b +# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20240116133529-e3a486e7f801 ## explicit; go 1.20 github.com/prometheus/prometheus/config github.com/prometheus/prometheus/discovery @@ -1523,7 +1523,7 @@ sigs.k8s.io/kustomize/kyaml/yaml/walk sigs.k8s.io/yaml sigs.k8s.io/yaml/goyaml.v2 sigs.k8s.io/yaml/goyaml.v3 -# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240105142307-b0c6c0a2a31b +# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240116133529-e3a486e7f801 # github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe # gopkg.in/yaml.v3 => github.com/colega/go-yaml-yaml v0.0.0-20220720105220-255a8d16d094 # github.com/grafana/regexp => github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6
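
Note (illustration only, not part of this patch): the Mimir-side hunks above all track the same upstream API change — the trailing enableOverlappingCompaction bool was dropped from tsdb.NewLeveledCompactor and tsdb.NewLeveledCompactorWithChunkSize, and non-default behaviour now travels in a LeveledCompactorOptions struct via tsdb.NewLeveledCompactorWithOptions (see the compact.go and db.go hunks). A minimal sketch of a caller using the options form is below; the block range, nil registerer, and the choice to disable overlapping compaction are placeholders, and any field left out falls back to the defaults documented in compact.go.

// Hypothetical caller of the new options-based constructor introduced by
// this vendored change. Values are illustrative, not taken from the patch.
package main

import (
	"context"

	gokitlog "github.com/go-kit/log"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	ctx := context.Background()

	// Previously: tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, ranges, pool, segSize, nil, true)
	// Now the overlapping-compaction flag, the max chunk segment size and the
	// postings encoder are fields of LeveledCompactorOptions.
	compactor, err := tsdb.NewLeveledCompactorWithOptions(
		ctx,
		nil, // prometheus.Registerer; with nil, compactor metrics are not registered
		gokitlog.NewNopLogger(),
		[]int64{2 * 60 * 60 * 1000}, // placeholder: a single 2h block range, in milliseconds
		chunkenc.NewPool(),
		tsdb.LeveledCompactorOptions{
			MaxBlockChunkSegmentSize:    chunks.DefaultChunkSegmentSize, // same as the default used when 0
			PE:                          index.EncodePostingsRaw,        // the default postings encoder added in index.go above
			EnableOverlappingCompaction: false,                          // e.g. defer vertical compaction to a separate compactor component, as Mimir's ingester does
		},
	)
	if err != nil {
		panic(err)
	}
	_ = compactor // use the compactor to Plan/Compact blocks as before
}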