From 36b35d16bec2fd895d74c3f3209e533827432d6b Mon Sep 17 00:00:00 2001 From: George Krajcsovits Date: Fri, 26 Apr 2024 13:45:28 +0200 Subject: [PATCH] ingester: active_native_histograms_postings (#7982) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ingester: active_native_histograms_postings Be able to return active native histograms postings so that we can list them on an API later. Mostly copy paste from active_postings. Signed-off-by: György Krajcsovits --- .../active_native_histogram_postings.go | 76 +++++++ .../active_native_histogram_postings_test.go | 201 ++++++++++++++++++ .../activeseries/active_postings_test.go | 4 +- pkg/ingester/activeseries/active_series.go | 16 ++ 4 files changed, 295 insertions(+), 2 deletions(-) create mode 100644 pkg/ingester/activeseries/active_native_histogram_postings.go create mode 100644 pkg/ingester/activeseries/active_native_histogram_postings_test.go diff --git a/pkg/ingester/activeseries/active_native_histogram_postings.go b/pkg/ingester/activeseries/active_native_histogram_postings.go new file mode 100644 index 00000000000..a77994ddc85 --- /dev/null +++ b/pkg/ingester/activeseries/active_native_histogram_postings.go @@ -0,0 +1,76 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package activeseries + +import ( + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/index" +) + +// NativeHistogramPostings is a wrapper around ActiveSeries and index.Postings. +// Similar to Postings, but filters its output to native histogram series. +// Implements index.Postings interface and returns only series references that +// are active in ActiveSeries and are native histograms. +// It is only valid to use NativeHistogramPostings if the postings are from the +// open TSDB head. It is not valid to use NativeHistogramPostings if the +// postings are from a block. +type NativeHistogramPostings struct { + activeSeries *ActiveSeries + postings index.Postings + currentBucketCount int +} + +func NewNativeHistogramPostings(activeSeries *ActiveSeries, postings index.Postings) *NativeHistogramPostings { + return &NativeHistogramPostings{ + activeSeries: activeSeries, + postings: postings, + } +} + +// Type check. +var _ index.Postings = &NativeHistogramPostings{} + +// At implements index.Postings. +func (a *NativeHistogramPostings) At() storage.SeriesRef { + return a.postings.At() +} + +// AtBucketCount returns the current bucket count for the series reference at the current position. +func (a *NativeHistogramPostings) AtBucketCount() (storage.SeriesRef, int) { + return a.postings.At(), a.currentBucketCount +} + +// Err implements index.Postings. +func (a *NativeHistogramPostings) Err() error { + return a.postings.Err() +} + +// Next implements index.Postings. +func (a *NativeHistogramPostings) Next() bool { + for a.postings.Next() { + if count, ok := a.activeSeries.NativeHistogramBuckets(a.postings.At()); ok { + a.currentBucketCount = count + return true + } + } + return false +} + +// Seek implements index.Postings. +func (a *NativeHistogramPostings) Seek(v storage.SeriesRef) bool { + // Seek in the underlying postings. + // If the underlying postings don't contain a value, return false. + if !a.postings.Seek(v) { + return false + } + + // If the underlying postings contain a value, check if it's active. + if count, ok := a.activeSeries.NativeHistogramBuckets(a.postings.At()); ok { + a.currentBucketCount = count + return true + } + + // If the underlying postings contain a value, but it's not active, + // seek to the next active value. + return a.Next() +} diff --git a/pkg/ingester/activeseries/active_native_histogram_postings_test.go b/pkg/ingester/activeseries/active_native_histogram_postings_test.go new file mode 100644 index 00000000000..294c0f66eb3 --- /dev/null +++ b/pkg/ingester/activeseries/active_native_histogram_postings_test.go @@ -0,0 +1,201 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package activeseries + +import ( + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/index" + "github.com/stretchr/testify/require" +) + +func TestNativeHistogramPostings_Expand(t *testing.T) { + ttl := 3 + mockedTime := time.Unix(int64(ttl), 0) + series := []labels.Labels{ + labels.FromStrings("a", "1"), + labels.FromStrings("a", "2"), + labels.FromStrings("a", "3"), // Will make this series a native histogram. + labels.FromStrings("a", "4"), // Will make this series a native histogram. + labels.FromStrings("a", "5"), + } + allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} + storagePostings := index.NewListPostings(allStorageRefs) + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl)) + + // Update each series at a different time according to its index. + for i := range allStorageRefs { + buckets := -1 // No native histogram buckets. + if i+1 == 3 || i+1 == 4 { + buckets = 10 // Native histogram with 10 buckets. + } + activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), buckets) + } + + valid := activeSeries.Purge(mockedTime) + allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers() + require.True(t, valid) + require.Equal(t, 2, allActive) + + activeSeriesPostings := NewNativeHistogramPostings(activeSeries, storagePostings) + + activeRefs, err := index.ExpandPostings(activeSeriesPostings) + require.NoError(t, err) + + require.Equal(t, allStorageRefs[3:4], activeRefs) +} + +func TestNativeHistogramPostings_ExpandWithBucketCount(t *testing.T) { + ttl := 0 + mockedTime := time.Unix(int64(ttl), 0) + series := []labels.Labels{ + labels.FromStrings("a", "1"), + labels.FromStrings("a", "2"), + labels.FromStrings("a", "3"), // Will make this series a native histogram. + labels.FromStrings("a", "4"), // Will make this series a native histogram. + labels.FromStrings("a", "5"), + } + allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} + storagePostings := index.NewListPostings(allStorageRefs) + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl)) + + // Update each series at a different time according to its index. + for i := range allStorageRefs { + buckets := -1 // No native histogram buckets. + if i == 2 || i == 3 { + buckets = i * 10 // Native histogram with i*10 buckets. + } + activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), buckets) + } + + valid := activeSeries.Purge(mockedTime) + allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers() + require.True(t, valid) + require.Equal(t, 5, allActive) + + activeSeriesPostings := NewNativeHistogramPostings(activeSeries, storagePostings) + + seriesRef := []storage.SeriesRef{} + bucketCounts := []int{} + for activeSeriesPostings.Next() { + ref, count := activeSeriesPostings.AtBucketCount() + seriesRef = append(seriesRef, ref) + bucketCounts = append(bucketCounts, count) + } + //activeRefs, err := index.ExpandPostings(activeSeriesPostings) + require.NoError(t, activeSeriesPostings.Err()) + + require.Equal(t, allStorageRefs[2:4], seriesRef) + require.Equal(t, []int{20, 30}, bucketCounts) +} + +func TestNativeHistogramPostings_SeekSkipsNonNative(t *testing.T) { + ttl := 3 + mockedTime := time.Unix(int64(ttl), 0) + series := []labels.Labels{ + labels.FromStrings("a", "1"), + labels.FromStrings("a", "2"), + labels.FromStrings("a", "3"), + labels.FromStrings("a", "4"), + labels.FromStrings("a", "5"), + } + allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} + storagePostings := index.NewListPostings(allStorageRefs) + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl)) + + // Update each series at a different time according to its index. + for i := range allStorageRefs { + buckets := i * 10 + if i+1 == 4 { + buckets = -1 // Make ref==4 not a native histogram to check that Seek skips it. + } + activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), buckets) + } + + valid := activeSeries.Purge(mockedTime) + allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers() + require.True(t, valid) + require.Equal(t, 2, allActive) + + activeSeriesPostings := NewNativeHistogramPostings(activeSeries, storagePostings) + + // Seek to a series that is not active. + require.True(t, activeSeriesPostings.Seek(3)) + // The next active series is 4, but it's not a native histogram. + require.Equal(t, storage.SeriesRef(5), activeSeriesPostings.At()) + // Check the bucket count as well. + ref, count := activeSeriesPostings.AtBucketCount() + require.Equal(t, storage.SeriesRef(5), ref) + require.Equal(t, 40, count) +} + +func TestNativeHistogramPostings_Seek(t *testing.T) { + ttl := 3 + mockedTime := time.Unix(int64(ttl), 0) + series := []labels.Labels{ + labels.FromStrings("a", "1"), + labels.FromStrings("a", "2"), + labels.FromStrings("a", "3"), + labels.FromStrings("a", "4"), + labels.FromStrings("a", "5"), + } + allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} + storagePostings := index.NewListPostings(allStorageRefs) + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl)) + + // Update each series at a different time according to its index. + for i := range allStorageRefs { + buckets := i * 10 + activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), buckets) + } + + valid := activeSeries.Purge(mockedTime) + allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers() + require.True(t, valid) + require.Equal(t, 2, allActive) + + activeSeriesPostings := NewNativeHistogramPostings(activeSeries, storagePostings) + + // Seek to a series that is active. + require.True(t, activeSeriesPostings.Seek(4)) + // The next active series is 4. + require.Equal(t, storage.SeriesRef(4), activeSeriesPostings.At()) + // Check the bucket count as well. + ref, count := activeSeriesPostings.AtBucketCount() + require.Equal(t, storage.SeriesRef(4), ref) + require.Equal(t, 30, count) +} + +func TestNativeHistogramPostings_SeekToEnd(t *testing.T) { + ttl := 5 + mockedTime := time.Unix(int64(ttl), 0) + series := []labels.Labels{ + labels.FromStrings("a", "1"), + labels.FromStrings("a", "2"), + labels.FromStrings("a", "3"), + labels.FromStrings("a", "4"), + labels.FromStrings("a", "5"), + } + allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} + storagePostings := index.NewListPostings(allStorageRefs) + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl)) + + // Update each series at a different time according to its index. + for i := range allStorageRefs { + activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), 10) + } + + valid := activeSeries.Purge(mockedTime) + allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers() + require.True(t, valid) + require.Equal(t, 0, allActive) + + activeSeriesPostings := NewNativeHistogramPostings(activeSeries, storagePostings) + + // Seek to a series that is not active. + // There are no active series after 3, so Seek should return false. + require.False(t, activeSeriesPostings.Seek(3)) +} diff --git a/pkg/ingester/activeseries/active_postings_test.go b/pkg/ingester/activeseries/active_postings_test.go index 5a057592942..13a006c5110 100644 --- a/pkg/ingester/activeseries/active_postings_test.go +++ b/pkg/ingester/activeseries/active_postings_test.go @@ -70,7 +70,7 @@ func TestPostings_Seek(t *testing.T) { activeSeriesPostings := NewPostings(activeSeries, storagePostings) - // See to a series that is not active. + // Seek to a series that is not active. require.True(t, activeSeriesPostings.Seek(3)) // The next active series is 4. require.Equal(t, storage.SeriesRef(4), activeSeriesPostings.At()) @@ -102,7 +102,7 @@ func TestPostings_SeekToEnd(t *testing.T) { activeSeriesPostings := NewPostings(activeSeries, storagePostings) - // See to a series that is not active. + // Seek to a series that is not active. // There are no active series after 3, so Seek should return false. require.False(t, activeSeriesPostings.Seek(3)) } diff --git a/pkg/ingester/activeseries/active_series.go b/pkg/ingester/activeseries/active_series.go index 244c4a63b29..befb0896c11 100644 --- a/pkg/ingester/activeseries/active_series.go +++ b/pkg/ingester/activeseries/active_series.go @@ -168,6 +168,11 @@ func (c *ActiveSeries) ContainsRef(ref storage.SeriesRef) bool { return c.stripes[stripeID].containsRef(ref) } +func (c *ActiveSeries) NativeHistogramBuckets(ref storage.SeriesRef) (int, bool) { + stripeID := ref % numStripes + return c.stripes[stripeID].nativeHistogramBuckets(ref) +} + // Active returns the total numbers of active series, active native // histogram series, and buckets of those native histogram series. // This method does not purge expired entries, so Purge should be @@ -213,6 +218,17 @@ func (s *seriesStripe) containsRef(ref storage.SeriesRef) bool { return ok } +// nativeHistogramBuckets returns the active buckets for a series if it is active and is a native histogram series. +func (s *seriesStripe) nativeHistogramBuckets(ref storage.SeriesRef) (int, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + if entry, ok := s.refs[ref]; ok && entry.numNativeHistogramBuckets >= 0 { + return entry.numNativeHistogramBuckets, true + } + + return 0, false +} + func (s *seriesStripe) markDeleted(ref storage.SeriesRef, lbls labels.Labels) { s.mu.Lock() defer s.mu.Unlock()