From b85f452ce010ea8b397d7394ce699fa3eb393f5d Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Mon, 6 Mar 2023 17:48:02 +0530 Subject: [PATCH] Support native histograms in pkg/ruler Signed-off-by: Ganesh Vernekar --- pkg/ruler/compat.go | 28 +++++-- pkg/ruler/compat_test.go | 160 ++++++++++++++++++++++++++++++++------- 2 files changed, 154 insertions(+), 34 deletions(-) diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 371a79682d9..0d3e4c6b797 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -41,11 +41,13 @@ type PusherAppender struct { failedWrites prometheus.Counter totalWrites prometheus.Counter - ctx context.Context - pusher Pusher - labels []labels.Labels - samples []mimirpb.Sample - userID string + ctx context.Context + pusher Pusher + labels []labels.Labels + samples []mimirpb.Sample + histogramLabels []labels.Labels + histograms []mimirpb.Histogram + userID string } func (a *PusherAppender) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { @@ -65,8 +67,16 @@ func (a *PusherAppender) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ return 0, errors.New("metadata updates are unsupported") } -func (a *PusherAppender) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { - return 0, errors.New("histograms are unsupported") +func (a *PusherAppender) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + a.histogramLabels = append(a.histogramLabels, l) + var hp mimirpb.Histogram + if h != nil { + hp = mimirpb.FromHistogramToHistogramProto(t, h) + } else { + hp = mimirpb.FromFloatHistogramToHistogramProto(t, fh) + } + a.histograms = append(a.histograms, hp) + return 0, nil } func (a *PusherAppender) Commit() error { @@ -74,7 +84,9 @@ func (a *PusherAppender) Commit() error { // Since a.pusher is distributor, client.ReuseSlice will be called in a.pusher.Push. // We shouldn't call client.ReuseSlice here. - _, err := a.pusher.Push(user.InjectOrgID(a.ctx, a.userID), mimirpb.ToWriteRequest(a.labels, a.samples, nil, nil, mimirpb.RULE)) + req := mimirpb.ToWriteRequest(a.labels, a.samples, nil, nil, mimirpb.RULE) + req.AddHistogramSeries(a.histogramLabels, a.histograms, nil) + _, err := a.pusher.Push(user.InjectOrgID(a.ctx, a.userID), req) if err != nil { // Don't report errors that ended with 4xx HTTP status code (series limits, duplicate samples, out of order, etc.) diff --git a/pkg/ruler/compat_test.go b/pkg/ruler/compat_test.go index 5e700d4b12b..b8f1ad18a7d 100644 --- a/pkg/ruler/compat_test.go +++ b/pkg/ruler/compat_test.go @@ -18,6 +18,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/rulefmt" "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/notifier" @@ -30,6 +32,7 @@ import ( "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/ruler/rulespb" + "github.com/grafana/mimir/pkg/util/test" "github.com/grafana/mimir/pkg/util/validation" ) @@ -48,52 +51,157 @@ func TestPusherAppendable(t *testing.T) { pusher := &fakePusher{} pa := NewPusherAppendable(pusher, "user-1", nil, promauto.With(nil).NewCounter(prometheus.CounterOpts{}), promauto.With(nil).NewCounter(prometheus.CounterOpts{})) + type sample struct { + series string + value float64 + histogram *histogram.Histogram + floatHistogram *histogram.FloatHistogram + ts int64 + } + for _, tc := range []struct { - name string - series string - value float64 - expectedTS int64 + name string + hasNanSample bool // If true, it will be a single float sample with NaN. + samples []sample }{ { - name: "tenant without delay, normal value", - series: "foo_bar", - value: 1.234, - expectedTS: 120_000, + name: "tenant without delay, normal value", + samples: []sample{ + { + series: "foo_bar", + value: 1.234, + ts: 120_000, + }, + }, }, { - name: "tenant without delay, stale nan value", - series: "foo_bar", - value: math.Float64frombits(value.StaleNaN), - expectedTS: 120_000, + name: "tenant without delay, stale nan value", + hasNanSample: true, + samples: []sample{ + { + series: "foo_bar", + value: math.Float64frombits(value.StaleNaN), + ts: 120_000, + }, + }, }, { - name: "ALERTS, normal value", - series: `ALERTS{alertname="boop"}`, - value: 1.234, - expectedTS: 120_000, + name: "ALERTS, normal value", + samples: []sample{ + { + series: `ALERTS{alertname="boop"}`, + value: 1.234, + ts: 120_000, + }, + }, }, { - name: "ALERTS, stale nan value", - series: `ALERTS{alertname="boop"}`, - value: math.Float64frombits(value.StaleNaN), - expectedTS: 120_000, + name: "ALERTS, stale nan value", + hasNanSample: true, + samples: []sample{ + { + series: `ALERTS{alertname="boop"}`, + value: math.Float64frombits(value.StaleNaN), + ts: 120_000, + }, + }, + }, + { + name: "tenant without delay, histogram value", + samples: []sample{ + { + series: "foo_bar", + histogram: test.GenerateTestHistogram(10), + ts: 200_000, + }, + }, + }, + { + name: "tenant without delay, float histogram value", + samples: []sample{ + { + series: "foo_bar", + floatHistogram: test.GenerateTestFloatHistogram(10), + ts: 230_000, + }, + }, + }, + { + name: "mix of float and float histogram", + samples: []sample{ + { + series: "foo_bar1", + value: 999, + ts: 230_000, + }, + { + series: "foo_bar3", + value: 888, + ts: 230_000, + }, + { + series: "foo_bar2", + floatHistogram: test.GenerateTestFloatHistogram(10), + ts: 230_000, + }, + { + series: "foo_bar4", + floatHistogram: test.GenerateTestFloatHistogram(99), + ts: 230_000, + }, + }, }, } { t.Run(tc.name, func(t *testing.T) { ctx := context.Background() - lbls, err := parser.ParseMetric(tc.series) - require.NoError(t, err) + var expReq []mimirpb.PreallocTimeseries pusher.response = &mimirpb.WriteResponse{} a := pa.Appender(ctx) - _, err = a.Append(0, lbls, 120_000, tc.value) - require.NoError(t, err) - + for _, sm := range tc.samples { + lbls, err := parser.ParseMetric(sm.series) + require.NoError(t, err) + timeseries := mimirpb.PreallocTimeseries{ + TimeSeries: &mimirpb.TimeSeries{ + Labels: mimirpb.FromLabelsToLabelAdapters(lbls), + Exemplars: []mimirpb.Exemplar{}, + Samples: []mimirpb.Sample{}, + }, + } + expReq = append(expReq, timeseries) + + if sm.histogram != nil || sm.floatHistogram != nil { + _, err = a.AppendHistogram(0, lbls, sm.ts, sm.histogram, sm.floatHistogram) + if sm.histogram != nil { + timeseries.Histograms = append(timeseries.Histograms, mimirpb.FromHistogramToHistogramProto(sm.ts, sm.histogram)) + } else { + timeseries.Histograms = append(timeseries.Histograms, mimirpb.FromFloatHistogramToHistogramProto(sm.ts, sm.floatHistogram)) + } + } else { + _, err = a.Append(0, lbls, sm.ts, sm.value) + timeseries.Samples = append(timeseries.Samples, mimirpb.Sample{ + TimestampMs: sm.ts, + Value: sm.value, + }) + } + require.NoError(t, err) + } require.NoError(t, a.Commit()) - require.Equal(t, tc.expectedTS, pusher.request.Timeseries[0].Samples[0].TimestampMs) + if !tc.hasNanSample { + require.Equal(t, expReq, pusher.request.Timeseries) + return + } + // For NaN, we cannot use require.Equal. + require.Len(t, pusher.request.Timeseries, 1) + require.Len(t, pusher.request.Timeseries[0].Samples, 1) + lbls, err := parser.ParseMetric(tc.samples[0].series) + require.NoError(t, err) + require.Equal(t, 0, labels.Compare(mimirpb.FromLabelAdaptersToLabels(pusher.request.Timeseries[0].Labels), lbls)) + require.Equal(t, tc.samples[0].ts, pusher.request.Timeseries[0].Samples[0].TimestampMs) + require.True(t, math.IsNaN(pusher.request.Timeseries[0].Samples[0].Value)) }) } }