Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingester: ActiveSeriesRequest can return native histograms information #7986

Merged
merged 6 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 54 additions & 13 deletions pkg/ingester/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"

"github.com/grafana/mimir/pkg/ingester/activeseries"
"github.com/grafana/mimir/pkg/ingester/client"
Expand Down Expand Up @@ -56,16 +58,31 @@ func (i *Ingester) ActiveSeries(request *client.ActiveSeriesRequest, stream clie
return nil
}

series, err := listActiveSeries(ctx, db, matchers)
idx, err := db.Head().Index()
if err != nil {
return fmt.Errorf("error getting index: %w", err)
}

isNativeHistogram := request.GetType() == client.NATIVE_HISTOGRAM_SERIES
postings, err := getPostings(ctx, db, idx, matchers, isNativeHistogram)
if err != nil {
return fmt.Errorf("error listing active series: %w", err)
}

buf := labels.NewScratchBuilder(10)
resp := &client.ActiveSeriesResponse{}
currentSize := 0
for series.Next() {
m := &mimirpb.Metric{Labels: mimirpb.FromLabelsToLabelAdapters(series.At())}
for postings.Next() {
seriesRef, count := postings.AtBucketCount()
err = idx.Series(seriesRef, &buf, nil)
if err != nil {
return fmt.Errorf("error getting series: %w", err)
}
m := &mimirpb.Metric{Labels: mimirpb.FromLabelsToLabelAdapters(buf.Labels())}
mSize := m.Size()
if isNativeHistogram {
mSize += 8 // 8 bytes for the bucket count.
}
if currentSize+mSize > activeSeriesMaxSizeBytes {
if err := client.SendActiveSeriesResponse(stream, resp); err != nil {
return fmt.Errorf("error sending response: %w", err)
Expand All @@ -74,9 +91,12 @@ func (i *Ingester) ActiveSeries(request *client.ActiveSeriesRequest, stream clie
currentSize = 0
}
resp.Metric = append(resp.Metric, m)
if isNativeHistogram {
resp.BucketCount = append(resp.BucketCount, uint64(count))
}
currentSize += mSize
}
if err := series.Err(); err != nil {
if err := postings.Err(); err != nil {
return fmt.Errorf("error iterating over series: %w", err)
}

Expand All @@ -89,13 +109,7 @@ func (i *Ingester) ActiveSeries(request *client.ActiveSeriesRequest, stream clie
return nil
}

// listActiveSeries returns an iterator over the active series matching the given matchers.
func listActiveSeries(ctx context.Context, db *userTSDB, matchers []*labels.Matcher) (series *Series, err error) {
idx, err := db.Head().Index()
if err != nil {
return nil, fmt.Errorf("error getting index: %w", err)
}

func getPostings(ctx context.Context, db *userTSDB, idx tsdb.IndexReader, matchers []*labels.Matcher, isNativeHistogram bool) (activeseries.BucketCountPostings, error) {
if db.activeSeries == nil {
return nil, fmt.Errorf("active series tracker is not initialized")
}
Expand All @@ -110,11 +124,38 @@ func listActiveSeries(ctx context.Context, db *userTSDB, matchers []*labels.Matc
return nil, fmt.Errorf("error getting postings: %w", err)
}

postings = activeseries.NewPostings(db.activeSeries, postings)

if shard != nil {
postings = idx.ShardedPostings(postings, shard.ShardIndex, shard.ShardCount)
}

if isNativeHistogram {
return activeseries.NewNativeHistogramPostings(db.activeSeries, postings), nil
}

return &ZeroBucketCountPostings{*activeseries.NewPostings(db.activeSeries, postings)}, nil
}

// listActiveSeries is used for testing purposes, builds the whole array of active series in memory.
func listActiveSeries(ctx context.Context, db *userTSDB, matchers []*labels.Matcher) (series *Series, err error) {
idx, err := db.Head().Index()
if err != nil {
return nil, fmt.Errorf("error getting index: %w", err)
}
postings, err := getPostings(ctx, db, idx, matchers, false)
if err != nil {
return nil, err
}
return NewSeries(postings, idx), nil
}

type ZeroBucketCountPostings struct {
activeseries.Postings
}

func (z *ZeroBucketCountPostings) AtBucketCount() (storage.SeriesRef, int) {
return z.At(), 0
}

// Type check.
var _ index.Postings = &ZeroBucketCountPostings{}
var _ activeseries.BucketCountPostings = &ZeroBucketCountPostings{}
77 changes: 75 additions & 2 deletions pkg/ingester/active_series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (

"github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
util_test "github.com/grafana/mimir/pkg/util/test"
"github.com/grafana/mimir/pkg/util/validation"
)

func TestIngester_ActiveSeries(t *testing.T) {
Expand Down Expand Up @@ -46,7 +48,7 @@ func TestIngester_ActiveSeries(t *testing.T) {
}

// Write the series.
ingesterClient := prepareHealthyIngester(t)
ingesterClient := prepareHealthyIngester(t, nil)
ctx := user.InjectOrgID(context.Background(), userID)
_, err := ingesterClient.Push(ctx, writeReq)
require.NoError(t, err)
Expand All @@ -65,21 +67,92 @@ func TestIngester_ActiveSeries(t *testing.T) {
returnedSeriesCount := 0
for _, res := range server.responses {
returnedSeriesCount += len(res.Metric)
// Check that all series have the expected number of labels.
for _, m := range res.Metric {
assert.Equal(t, 2, len(m.Labels))
}
}
assert.Equal(t, len(writeReq.Timeseries), returnedSeriesCount)

// Check that we got the correct number of messages.
assert.Equal(t, expectedMessageCount, len(server.responses))
}

func TestIngester_ActiveNativeHistogramSeries(t *testing.T) {
samples := []mimirpb.Sample{{TimestampMs: 1_000, Value: 1}}
histograms := []mimirpb.Histogram{mimirpb.FromHistogramToHistogramProto(1_000, util_test.GenerateTestHistogram(1))}

seriesWithLabelsOfSize := func(size, index int, isHistogram bool) mimirpb.PreallocTimeseries {
// 24 bytes of static strings and slice overhead, the remaining bytes are used to
// pad the value of the "lbl" label.
require.Greater(t, size, 24, "minimum message size is 24 bytes")
tpl := fmt.Sprintf("%%0%dd", size-24)
ts := &mimirpb.TimeSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(labels.MetricName, "test", "lbl", fmt.Sprintf(tpl, index))),
}
if isHistogram {
ts.Histograms = histograms
} else {
ts.Samples = samples
}
return mimirpb.PreallocTimeseries{TimeSeries: ts}
}

expectedMessageCount := 4
totalSeriesSize := expectedMessageCount * activeSeriesMaxSizeBytes

writeReq := &mimirpb.WriteRequest{Source: mimirpb.API}
currentSize := 0
for i := 0; currentSize < totalSeriesSize; i++ {
isHistogram := i%2 != 0 // Half of the series will be float and the other half will be native histograms.
s := seriesWithLabelsOfSize(1024, i, isHistogram)
writeReq.Timeseries = append(writeReq.Timeseries, s)
if isHistogram {
currentSize += s.Size()
}
}

// Write the series.
ingesterClient := prepareHealthyIngester(t, func(limits *validation.Limits) { limits.NativeHistogramsIngestionEnabled = true })
ctx := user.InjectOrgID(context.Background(), userID)
_, err := ingesterClient.Push(ctx, writeReq)
require.NoError(t, err)

// Get active series
req, err := client.ToActiveSeriesRequest([]*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test"),
})
req.Type = client.NATIVE_HISTOGRAM_SERIES
require.NoError(t, err)

server := &mockActiveSeriesServer{ctx: ctx}
err = ingesterClient.ActiveSeries(req, server)
require.NoError(t, err)

// Check that all series were returned.
returnedSeriesCount := 0
for _, res := range server.responses {
returnedSeriesCount += len(res.Metric)
// Check that all series have a corresponding bucket count.
assert.Equal(t, len(res.Metric), len(res.BucketCount), "All series should have a bucket count.")
for _, bc := range res.BucketCount {
assert.Equal(t, uint64(8), bc)
}
}
assert.Equal(t, len(writeReq.Timeseries)/2, returnedSeriesCount)

// Check that we got the correct number of messages.
assert.Equal(t, expectedMessageCount, len(server.responses))
}

func BenchmarkIngester_ActiveSeries(b *testing.B) {
const (
userID = "test"
numSeries = 2e6
metricName = "metric_name"
)

in := prepareHealthyIngester(b)
in := prepareHealthyIngester(b, nil)
ctx := user.InjectOrgID(context.Background(), userID)

samples := []mimirpb.Sample{{TimestampMs: 1_000, Value: 1}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ import (
"github.com/prometheus/prometheus/tsdb/index"
)

type BucketCountPostings interface {
index.Postings
// AtBucketCount returns the series reference currently pointed to and its bucket count (if it's a native histogram series).
AtBucketCount() (storage.SeriesRef, int)
}

// NativeHistogramPostings is a wrapper around ActiveSeries and index.Postings.
// Similar to Postings, but filters its output to native histogram series.
// Implements index.Postings interface and returns only series references that
Expand All @@ -29,6 +35,7 @@ func NewNativeHistogramPostings(activeSeries *ActiveSeries, postings index.Posti

// Type check.
var _ index.Postings = &NativeHistogramPostings{}
var _ BucketCountPostings = &NativeHistogramPostings{}

// At implements index.Postings.
func (a *NativeHistogramPostings) At() storage.SeriesRef {
Expand Down
Loading
Loading