Skip to content

Commit

Permalink
ingester: active_native_histograms_postings (#7982)
Browse files Browse the repository at this point in the history
* ingester: active_native_histograms_postings

Be able to return active native histograms postings so that
we can list them on an API later.

Mostly copy paste from active_postings.

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
  • Loading branch information
krajorama committed Apr 26, 2024
1 parent cf2c9db commit 36b35d1
Show file tree
Hide file tree
Showing 4 changed files with 295 additions and 2 deletions.
76 changes: 76 additions & 0 deletions pkg/ingester/activeseries/active_native_histogram_postings.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// SPDX-License-Identifier: AGPL-3.0-only

package activeseries

import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/index"
)

// NativeHistogramPostings is a wrapper around ActiveSeries and index.Postings.
// Similar to Postings, but filters its output to native histogram series.
// Implements index.Postings interface and returns only series references that
// are active in ActiveSeries and are native histograms.
// It is only valid to use NativeHistogramPostings if the postings are from the
// open TSDB head. It is not valid to use NativeHistogramPostings if the
// postings are from a block.
type NativeHistogramPostings struct {
activeSeries *ActiveSeries
postings index.Postings
currentBucketCount int
}

func NewNativeHistogramPostings(activeSeries *ActiveSeries, postings index.Postings) *NativeHistogramPostings {
return &NativeHistogramPostings{
activeSeries: activeSeries,
postings: postings,
}
}

// Type check.
var _ index.Postings = &NativeHistogramPostings{}

// At implements index.Postings.
func (a *NativeHistogramPostings) At() storage.SeriesRef {
return a.postings.At()
}

// AtBucketCount returns the current bucket count for the series reference at the current position.
func (a *NativeHistogramPostings) AtBucketCount() (storage.SeriesRef, int) {
return a.postings.At(), a.currentBucketCount
}

// Err implements index.Postings.
func (a *NativeHistogramPostings) Err() error {
return a.postings.Err()
}

// Next implements index.Postings.
func (a *NativeHistogramPostings) Next() bool {
for a.postings.Next() {
if count, ok := a.activeSeries.NativeHistogramBuckets(a.postings.At()); ok {
a.currentBucketCount = count
return true
}
}
return false
}

// Seek implements index.Postings.
func (a *NativeHistogramPostings) Seek(v storage.SeriesRef) bool {
// Seek in the underlying postings.
// If the underlying postings don't contain a value, return false.
if !a.postings.Seek(v) {
return false
}

// If the underlying postings contain a value, check if it's active.
if count, ok := a.activeSeries.NativeHistogramBuckets(a.postings.At()); ok {
a.currentBucketCount = count
return true
}

// If the underlying postings contain a value, but it's not active,
// seek to the next active value.
return a.Next()
}
201 changes: 201 additions & 0 deletions pkg/ingester/activeseries/active_native_histogram_postings_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// SPDX-License-Identifier: AGPL-3.0-only

package activeseries

import (
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/stretchr/testify/require"
)

func TestNativeHistogramPostings_Expand(t *testing.T) {
ttl := 3
mockedTime := time.Unix(int64(ttl), 0)
series := []labels.Labels{
labels.FromStrings("a", "1"),
labels.FromStrings("a", "2"),
labels.FromStrings("a", "3"), // Will make this series a native histogram.
labels.FromStrings("a", "4"), // Will make this series a native histogram.
labels.FromStrings("a", "5"),
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))

// Update each series at a different time according to its index.
for i := range allStorageRefs {
buckets := -1 // No native histogram buckets.
if i+1 == 3 || i+1 == 4 {
buckets = 10 // Native histogram with 10 buckets.
}
activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), buckets)
}

valid := activeSeries.Purge(mockedTime)
allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers()
require.True(t, valid)
require.Equal(t, 2, allActive)

activeSeriesPostings := NewNativeHistogramPostings(activeSeries, storagePostings)

activeRefs, err := index.ExpandPostings(activeSeriesPostings)
require.NoError(t, err)

require.Equal(t, allStorageRefs[3:4], activeRefs)
}

func TestNativeHistogramPostings_ExpandWithBucketCount(t *testing.T) {
ttl := 0
mockedTime := time.Unix(int64(ttl), 0)
series := []labels.Labels{
labels.FromStrings("a", "1"),
labels.FromStrings("a", "2"),
labels.FromStrings("a", "3"), // Will make this series a native histogram.
labels.FromStrings("a", "4"), // Will make this series a native histogram.
labels.FromStrings("a", "5"),
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))

// Update each series at a different time according to its index.
for i := range allStorageRefs {
buckets := -1 // No native histogram buckets.
if i == 2 || i == 3 {
buckets = i * 10 // Native histogram with i*10 buckets.
}
activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), buckets)
}

valid := activeSeries.Purge(mockedTime)
allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers()
require.True(t, valid)
require.Equal(t, 5, allActive)

activeSeriesPostings := NewNativeHistogramPostings(activeSeries, storagePostings)

seriesRef := []storage.SeriesRef{}
bucketCounts := []int{}
for activeSeriesPostings.Next() {
ref, count := activeSeriesPostings.AtBucketCount()
seriesRef = append(seriesRef, ref)
bucketCounts = append(bucketCounts, count)
}
//activeRefs, err := index.ExpandPostings(activeSeriesPostings)
require.NoError(t, activeSeriesPostings.Err())

require.Equal(t, allStorageRefs[2:4], seriesRef)
require.Equal(t, []int{20, 30}, bucketCounts)
}

func TestNativeHistogramPostings_SeekSkipsNonNative(t *testing.T) {
ttl := 3
mockedTime := time.Unix(int64(ttl), 0)
series := []labels.Labels{
labels.FromStrings("a", "1"),
labels.FromStrings("a", "2"),
labels.FromStrings("a", "3"),
labels.FromStrings("a", "4"),
labels.FromStrings("a", "5"),
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))

// Update each series at a different time according to its index.
for i := range allStorageRefs {
buckets := i * 10
if i+1 == 4 {
buckets = -1 // Make ref==4 not a native histogram to check that Seek skips it.
}
activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), buckets)
}

valid := activeSeries.Purge(mockedTime)
allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers()
require.True(t, valid)
require.Equal(t, 2, allActive)

activeSeriesPostings := NewNativeHistogramPostings(activeSeries, storagePostings)

// Seek to a series that is not active.
require.True(t, activeSeriesPostings.Seek(3))
// The next active series is 4, but it's not a native histogram.
require.Equal(t, storage.SeriesRef(5), activeSeriesPostings.At())
// Check the bucket count as well.
ref, count := activeSeriesPostings.AtBucketCount()
require.Equal(t, storage.SeriesRef(5), ref)
require.Equal(t, 40, count)
}

func TestNativeHistogramPostings_Seek(t *testing.T) {
ttl := 3
mockedTime := time.Unix(int64(ttl), 0)
series := []labels.Labels{
labels.FromStrings("a", "1"),
labels.FromStrings("a", "2"),
labels.FromStrings("a", "3"),
labels.FromStrings("a", "4"),
labels.FromStrings("a", "5"),
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))

// Update each series at a different time according to its index.
for i := range allStorageRefs {
buckets := i * 10
activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), buckets)
}

valid := activeSeries.Purge(mockedTime)
allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers()
require.True(t, valid)
require.Equal(t, 2, allActive)

activeSeriesPostings := NewNativeHistogramPostings(activeSeries, storagePostings)

// Seek to a series that is active.
require.True(t, activeSeriesPostings.Seek(4))
// The next active series is 4.
require.Equal(t, storage.SeriesRef(4), activeSeriesPostings.At())
// Check the bucket count as well.
ref, count := activeSeriesPostings.AtBucketCount()
require.Equal(t, storage.SeriesRef(4), ref)
require.Equal(t, 30, count)
}

func TestNativeHistogramPostings_SeekToEnd(t *testing.T) {
ttl := 5
mockedTime := time.Unix(int64(ttl), 0)
series := []labels.Labels{
labels.FromStrings("a", "1"),
labels.FromStrings("a", "2"),
labels.FromStrings("a", "3"),
labels.FromStrings("a", "4"),
labels.FromStrings("a", "5"),
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))

// Update each series at a different time according to its index.
for i := range allStorageRefs {
activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), 10)
}

valid := activeSeries.Purge(mockedTime)
allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers()
require.True(t, valid)
require.Equal(t, 0, allActive)

activeSeriesPostings := NewNativeHistogramPostings(activeSeries, storagePostings)

// Seek to a series that is not active.
// There are no active series after 3, so Seek should return false.
require.False(t, activeSeriesPostings.Seek(3))
}
4 changes: 2 additions & 2 deletions pkg/ingester/activeseries/active_postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestPostings_Seek(t *testing.T) {

activeSeriesPostings := NewPostings(activeSeries, storagePostings)

// See to a series that is not active.
// Seek to a series that is not active.
require.True(t, activeSeriesPostings.Seek(3))
// The next active series is 4.
require.Equal(t, storage.SeriesRef(4), activeSeriesPostings.At())
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestPostings_SeekToEnd(t *testing.T) {

activeSeriesPostings := NewPostings(activeSeries, storagePostings)

// See to a series that is not active.
// Seek to a series that is not active.
// There are no active series after 3, so Seek should return false.
require.False(t, activeSeriesPostings.Seek(3))
}
16 changes: 16 additions & 0 deletions pkg/ingester/activeseries/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ func (c *ActiveSeries) ContainsRef(ref storage.SeriesRef) bool {
return c.stripes[stripeID].containsRef(ref)
}

func (c *ActiveSeries) NativeHistogramBuckets(ref storage.SeriesRef) (int, bool) {
stripeID := ref % numStripes
return c.stripes[stripeID].nativeHistogramBuckets(ref)
}

// Active returns the total numbers of active series, active native
// histogram series, and buckets of those native histogram series.
// This method does not purge expired entries, so Purge should be
Expand Down Expand Up @@ -213,6 +218,17 @@ func (s *seriesStripe) containsRef(ref storage.SeriesRef) bool {
return ok
}

// nativeHistogramBuckets returns the active buckets for a series if it is active and is a native histogram series.
func (s *seriesStripe) nativeHistogramBuckets(ref storage.SeriesRef) (int, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
if entry, ok := s.refs[ref]; ok && entry.numNativeHistogramBuckets >= 0 {
return entry.numNativeHistogramBuckets, true
}

return 0, false
}

func (s *seriesStripe) markDeleted(ref storage.SeriesRef, lbls labels.Labels) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down

0 comments on commit 36b35d1

Please sign in to comment.