Skip to content

Commit

Permalink
ingester: ActiveSeriesRequest can return native histograms information
Browse files Browse the repository at this point in the history
Allow ActiveSeriesRequest to return only native histograms and
include active bucket count in the label __nh_bucket_count__

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
  • Loading branch information
krajorama committed Apr 29, 2024
1 parent f739d47 commit e19a6b2
Show file tree
Hide file tree
Showing 7 changed files with 345 additions and 147 deletions.
64 changes: 51 additions & 13 deletions pkg/ingester/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"

"github.com/grafana/mimir/pkg/ingester/activeseries"
"github.com/grafana/mimir/pkg/ingester/client"
Expand Down Expand Up @@ -56,15 +58,30 @@ func (i *Ingester) ActiveSeries(request *client.ActiveSeriesRequest, stream clie
return nil
}

series, err := listActiveSeries(ctx, db, matchers)
idx, err := db.Head().Index()
if err != nil {
return fmt.Errorf("error getting index: %w", err)
}

isNativeHistogram := request.GetType() == client.NATIVE_HISTOGRAM_SERIES
postings, err := getPostings(ctx, db, idx, matchers, isNativeHistogram)
if err != nil {
return fmt.Errorf("error listing active series: %w", err)
}

buf := labels.NewScratchBuilder(10)
resp := &client.ActiveSeriesResponse{}
currentSize := 0
for series.Next() {
m := &mimirpb.Metric{Labels: mimirpb.FromLabelsToLabelAdapters(series.At())}
for postings.Next() {
seriesRef, count := postings.AtBucketCount()
err = idx.Series(seriesRef, &buf, nil)
if err != nil {
return fmt.Errorf("error getting series: %w", err)
}
if isNativeHistogram {
buf.Add("__nh_bucket_count__", fmt.Sprintf("%d", count))
}
m := &mimirpb.Metric{Labels: mimirpb.FromLabelsToLabelAdapters(buf.Labels())}
mSize := m.Size()
if currentSize+mSize > activeSeriesMaxSizeBytes {
if err := client.SendActiveSeriesResponse(stream, resp); err != nil {
Expand All @@ -76,7 +93,7 @@ func (i *Ingester) ActiveSeries(request *client.ActiveSeriesRequest, stream clie
resp.Metric = append(resp.Metric, m)
currentSize += mSize
}
if err := series.Err(); err != nil {
if err := postings.Err(); err != nil {
return fmt.Errorf("error iterating over series: %w", err)
}

Expand All @@ -89,13 +106,7 @@ func (i *Ingester) ActiveSeries(request *client.ActiveSeriesRequest, stream clie
return nil
}

// listActiveSeries returns an iterator over the active series matching the given matchers.
func listActiveSeries(ctx context.Context, db *userTSDB, matchers []*labels.Matcher) (series *Series, err error) {
idx, err := db.Head().Index()
if err != nil {
return nil, fmt.Errorf("error getting index: %w", err)
}

func getPostings(ctx context.Context, db *userTSDB, idx tsdb.IndexReader, matchers []*labels.Matcher, isNativeHistogram bool) (activeseries.BucketCountPostings, error) {
if db.activeSeries == nil {
return nil, fmt.Errorf("active series tracker is not initialized")
}
Expand All @@ -110,11 +121,38 @@ func listActiveSeries(ctx context.Context, db *userTSDB, matchers []*labels.Matc
return nil, fmt.Errorf("error getting postings: %w", err)
}

postings = activeseries.NewPostings(db.activeSeries, postings)

if shard != nil {
postings = idx.ShardedPostings(postings, shard.ShardIndex, shard.ShardCount)
}

if isNativeHistogram {
return activeseries.NewNativeHistogramPostings(db.activeSeries, postings), nil
}

return &ZeroBucketCountPostings{*activeseries.NewPostings(db.activeSeries, postings)}, nil
}

// listActiveSeries is used for testing purposes, builds the whole array of active series in memory.
func listActiveSeries(ctx context.Context, db *userTSDB, matchers []*labels.Matcher) (series *Series, err error) {
idx, err := db.Head().Index()
if err != nil {
return nil, fmt.Errorf("error getting index: %w", err)
}
postings, err := getPostings(ctx, db, idx, matchers, false)
if err != nil {
return nil, err
}
return NewSeries(postings, idx), nil
}

type ZeroBucketCountPostings struct {
activeseries.Postings
}

func (z *ZeroBucketCountPostings) AtBucketCount() (storage.SeriesRef, int) {
return z.At(), 0
}

// Type check.
var _ index.Postings = &ZeroBucketCountPostings{}
var _ activeseries.BucketCountPostings = &ZeroBucketCountPostings{}
77 changes: 75 additions & 2 deletions pkg/ingester/active_series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (

"github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
util_test "github.com/grafana/mimir/pkg/util/test"
"github.com/grafana/mimir/pkg/util/validation"
)

func TestIngester_ActiveSeries(t *testing.T) {
Expand Down Expand Up @@ -46,7 +48,7 @@ func TestIngester_ActiveSeries(t *testing.T) {
}

// Write the series.
ingesterClient := prepareHealthyIngester(t)
ingesterClient := prepareHealthyIngester(t, nil)
ctx := user.InjectOrgID(context.Background(), userID)
_, err := ingesterClient.Push(ctx, writeReq)
require.NoError(t, err)
Expand All @@ -65,21 +67,92 @@ func TestIngester_ActiveSeries(t *testing.T) {
returnedSeriesCount := 0
for _, res := range server.responses {
returnedSeriesCount += len(res.Metric)
// Check that all series has the expected number of labels.
for _, m := range res.Metric {
assert.Equal(t, 2, len(m.Labels))
}
}
assert.Equal(t, len(writeReq.Timeseries), returnedSeriesCount)

// Check that we got the correct number of messages.
assert.Equal(t, expectedMessageCount, len(server.responses))
}

func TestIngester_ActiveNativeHistogramSeries(t *testing.T) {
samples := []mimirpb.Sample{{TimestampMs: 1_000, Value: 1}}
histograms := []mimirpb.Histogram{mimirpb.FromHistogramToHistogramProto(1_000, util_test.GenerateTestHistogram(1))}

seriesWithLabelsOfSize := func(size, index int, isHistogram bool) mimirpb.PreallocTimeseries {
// 24 bytes of static strings and slice overhead, the remaining bytes are used to
// pad the value of the "lbl" label.
require.Greater(t, size, 24, "minimum message size is 24 bytes")
tpl := fmt.Sprintf("%%0%dd", size-24)
ts := &mimirpb.TimeSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(labels.MetricName, "test", "lbl", fmt.Sprintf(tpl, index))),
}
if isHistogram {
ts.Histograms = histograms
} else {
ts.Samples = samples
}
return mimirpb.PreallocTimeseries{TimeSeries: ts}
}

expectedMessageCount := 4
totalSeriesSize := expectedMessageCount * activeSeriesMaxSizeBytes

writeReq := &mimirpb.WriteRequest{Source: mimirpb.API}
currentSize := 0
for i := 0; currentSize < totalSeriesSize; i++ {
isHistogram := i%2 != 0 // Half of the series will be float and the other half will be native histograms.
s := seriesWithLabelsOfSize(1024, i, isHistogram)
writeReq.Timeseries = append(writeReq.Timeseries, s)
if isHistogram {
currentSize += s.Size()
}
}

// Write the series.
ingesterClient := prepareHealthyIngester(t, func(limits *validation.Limits) { limits.NativeHistogramsIngestionEnabled = true })
ctx := user.InjectOrgID(context.Background(), userID)
_, err := ingesterClient.Push(ctx, writeReq)
require.NoError(t, err)

// Get active series
req, err := client.ToActiveSeriesRequest([]*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test"),
})
req.Type = client.NATIVE_HISTOGRAM_SERIES
require.NoError(t, err)

server := &mockActiveSeriesServer{ctx: ctx}
err = ingesterClient.ActiveSeries(req, server)
require.NoError(t, err)

// Check that all series were returned.
returnedSeriesCount := 0
for _, res := range server.responses {
returnedSeriesCount += len(res.Metric)
// Check that all series contain the special label "__nh_bucket_count__".
for _, m := range res.Metric {
assert.Equal(t, 3, len(m.Labels))
assert.Equal(t, "8", mimirpb.FromLabelAdaptersToLabels(m.Labels).Get("__nh_bucket_count__"))
}
}
assert.Equal(t, len(writeReq.Timeseries)/2, returnedSeriesCount)

// Check that we got the correct number of messages.
assert.Equal(t, expectedMessageCount, len(server.responses))
}

func BenchmarkIngester_ActiveSeries(b *testing.B) {
const (
userID = "test"
numSeries = 2e6
metricName = "metric_name"
)

in := prepareHealthyIngester(b)
in := prepareHealthyIngester(b, nil)
ctx := user.InjectOrgID(context.Background(), userID)

samples := []mimirpb.Sample{{TimestampMs: 1_000, Value: 1}}
Expand Down
8 changes: 8 additions & 0 deletions pkg/ingester/activeseries/active_native_histogram_postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ import (
"github.com/prometheus/prometheus/tsdb/index"
)

type BucketCountPostings interface {
index.Postings
// NativeHistogramBuckets returns the number of buckets for the series reference.
// If the series reference is not active, it returns false.
AtBucketCount() (storage.SeriesRef, int)
}

// NativeHistogramPostings is a wrapper around ActiveSeries and index.Postings.
// Similar to Postings, but filters its output to native histogram series.
// Implements index.Postings interface and returns only series references that
Expand All @@ -29,6 +36,7 @@ func NewNativeHistogramPostings(activeSeries *ActiveSeries, postings index.Posti

// Type check.
var _ index.Postings = &NativeHistogramPostings{}
var _ BucketCountPostings = &NativeHistogramPostings{}

// At implements index.Postings.
func (a *NativeHistogramPostings) At() storage.SeriesRef {
Expand Down
Loading

0 comments on commit e19a6b2

Please sign in to comment.