diff --git a/integration/e2emimir/client.go b/integration/e2emimir/client.go index f61de8b04fe..e2c9fbb023e 100644 --- a/integration/e2emimir/client.go +++ b/integration/e2emimir/client.go @@ -33,6 +33,7 @@ import ( yaml "gopkg.in/yaml.v3" "github.com/grafana/mimir/pkg/alertmanager" + "github.com/grafana/mimir/pkg/cardinality" "github.com/grafana/mimir/pkg/distributor" "github.com/grafana/mimir/pkg/frontend/querymiddleware" "github.com/grafana/mimir/pkg/mimirpb" @@ -591,6 +592,60 @@ func (c *Client) ActiveSeries(selector string, options ...ActiveSeriesOption) (* return res, nil } +func (c *Client) ActiveNativeHistogramMetrics(selector string, options ...ActiveSeriesOption) (*cardinality.ActiveNativeHistogramMetricsResponse, error) { + cfg := activeSeriesRequestConfig{method: http.MethodGet, header: http.Header{"X-Scope-OrgID": []string{c.orgID}}} + for _, option := range options { + option(&cfg) + } + + req, err := http.NewRequest(cfg.method, fmt.Sprintf("http://%s/prometheus/api/v1/cardinality/active_native_histogram_metrics", c.querierAddress), nil) + if err != nil { + return nil, err + } + req.Header = cfg.header + + q := req.URL.Query() + q.Set("selector", selector) + switch cfg.method { + case http.MethodGet: + req.URL.RawQuery = q.Encode() + case http.MethodPost: + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + req.Body = io.NopCloser(strings.NewReader(q.Encode())) + default: + return nil, fmt.Errorf("invalid method %s", cfg.method) + } + + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + resp, err := c.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return nil, err + } + defer func(body io.ReadCloser) { + _, _ = io.ReadAll(body) + _ = body.Close() + }(resp.Body) + + var bodyReader io.Reader = resp.Body + if resp.Header.Get("Content-Encoding") == "x-snappy-framed" { + bodyReader = s2.NewReader(bodyReader) + } + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(bodyReader) + return nil, fmt.Errorf("unexpected status code %d, body: %s", resp.StatusCode, body) + } + + res := &cardinality.ActiveNativeHistogramMetricsResponse{} + err = json.NewDecoder(bodyReader).Decode(res) + if err != nil { + return nil, fmt.Errorf("error decoding active native histograms response: %w", err) + } + return res, nil +} + // GetPrometheusMetadata fetches the metadata from the Prometheus endpoint /api/v1/metadata. func (c *Client) GetPrometheusMetadata() (*http.Response, error) { req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s/prometheus/api/v1/metadata", c.querierAddress), nil) diff --git a/integration/query_frontend_active_series_test.go b/integration/query_frontend_active_series_test.go index 8354975ada9..d7d5c13b74e 100644 --- a/integration/query_frontend_active_series_test.go +++ b/integration/query_frontend_active_series_test.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/mimir/integration/e2emimir" + "github.com/grafana/mimir/pkg/cardinality" ) func TestActiveSeriesWithQuerySharding(t *testing.T) { @@ -50,16 +51,89 @@ func TestActiveSeriesWithQuerySharding(t *testing.T) { testName := fmt.Sprintf("query scheduler=%v/query sharding=%v", tc.querySchedulerEnabled, tc.shardingEnabled, ) - t.Run(testName, func(t *testing.T) { + t.Run("active series/"+testName, func(t *testing.T) { runTestActiveSeriesWithQueryShardingHTTPTest(t, config) }) + + t.Run("active native histograms/"+testName, func(t *testing.T) { + runTestActiveNativeHistogramMetricsWithQueryShardingHTTPTest(t, config) + }) } } +const ( + metricName = "test_metric" +) + func runTestActiveSeriesWithQueryShardingHTTPTest(t *testing.T, cfg queryFrontendTestConfig) { + numSeries := 100 + s, c := setupComponentsForActiveSeriesTest(t, cfg, numSeries) + defer s.Close() + + // Query active series. + for _, options := range [][]e2emimir.ActiveSeriesOption{ + {e2emimir.WithRequestMethod(http.MethodGet)}, + {e2emimir.WithRequestMethod(http.MethodPost)}, + {e2emimir.WithRequestMethod(http.MethodGet), e2emimir.WithEnableCompression()}, + {e2emimir.WithRequestMethod(http.MethodPost), e2emimir.WithEnableCompression()}, + {e2emimir.WithQueryShards(1)}, + {e2emimir.WithQueryShards(12)}, + } { + response, err := c.ActiveSeries(metricName, options...) + require.NoError(t, err) + require.Len(t, response.Data, numSeries) + } + + var err error + _, err = c.ActiveSeries(metricName, e2emimir.WithQueryShards(512)) + if cfg.shardActiveSeriesQueries { + require.Error(t, err) + require.Contains(t, err.Error(), "shard count 512 exceeds allowed maximum (128)") + } else { + require.NoError(t, err) + } +} + +func runTestActiveNativeHistogramMetricsWithQueryShardingHTTPTest(t *testing.T, cfg queryFrontendTestConfig) { + numSeries := 100 + s, c := setupComponentsForActiveSeriesTest(t, cfg, numSeries) + defer s.Close() + + // Query active series. + for _, options := range [][]e2emimir.ActiveSeriesOption{ + {e2emimir.WithRequestMethod(http.MethodGet)}, + {e2emimir.WithRequestMethod(http.MethodPost)}, + {e2emimir.WithRequestMethod(http.MethodGet), e2emimir.WithEnableCompression()}, + {e2emimir.WithRequestMethod(http.MethodPost), e2emimir.WithEnableCompression()}, + {e2emimir.WithQueryShards(1)}, + {e2emimir.WithQueryShards(12)}, + } { + response, err := c.ActiveNativeHistogramMetrics(metricName, options...) + require.NoError(t, err) + require.Len(t, response.Data, 1) + require.Equal(t, response.Data[0], cardinality.ActiveMetricWithBucketCount{ + Metric: metricName, + SeriesCount: uint64(numSeries / 2), // Only half of the series are native histograms. + BucketCount: uint64(numSeries / 2 * 8), // We add up the bucket count of all histograms. + AvgBucketCount: 8.0, + MinBucketCount: 8, + MaxBucketCount: 8, + }) + } + + var err error + _, err = c.ActiveNativeHistogramMetrics(metricName, e2emimir.WithQueryShards(512)) + if cfg.shardActiveSeriesQueries { + require.Error(t, err) + require.Contains(t, err.Error(), "shard count 512 exceeds allowed maximum (128)") + } else { + require.NoError(t, err) + } +} + +func setupComponentsForActiveSeriesTest(t *testing.T, cfg queryFrontendTestConfig, numSeries int) (*e2e.Scenario, *e2emimir.Client) { s, err := e2e.NewScenario(networkName) require.NoError(t, err) - defer s.Close() memcached := e2ecache.NewMemcached() consul := e2edb.NewConsul() @@ -130,38 +204,21 @@ func runTestActiveSeriesWithQueryShardingHTTPTest(t *testing.T, cfg queryFronten // Push series for the test user to Mimir. c, err := e2emimir.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", userID) require.NoError(t, err) - numSeries := 100 - metricName := "test_metric" now := time.Now() series := make([]prompb.TimeSeries, numSeries) for i := 0; i < numSeries; i++ { - ts, _, _ := e2e.GenerateSeries(metricName, now, prompb.Label{Name: "index", Value: strconv.Itoa(i)}) + var ts []prompb.TimeSeries + if i%2 == 0 { + // Let half of the series be float samples and the other half native histograms. + ts, _, _ = e2e.GenerateSeries(metricName, now, prompb.Label{Name: "index", Value: strconv.Itoa(i)}) + } else { + ts, _, _ = generateHistogramSeries(metricName, now, prompb.Label{Name: "index", Value: strconv.Itoa(i)}) + } series[i] = ts[0] } res, err := c.Push(series) require.NoError(t, err) require.Equal(t, http.StatusOK, res.StatusCode) - // Query active series. - for _, options := range [][]e2emimir.ActiveSeriesOption{ - {e2emimir.WithRequestMethod(http.MethodGet)}, - {e2emimir.WithRequestMethod(http.MethodPost)}, - {e2emimir.WithRequestMethod(http.MethodGet), e2emimir.WithEnableCompression()}, - {e2emimir.WithRequestMethod(http.MethodPost), e2emimir.WithEnableCompression()}, - {e2emimir.WithQueryShards(1)}, - {e2emimir.WithQueryShards(12)}, - } { - response, err := c.ActiveSeries(metricName, options...) - require.NoError(t, err) - require.Equal(t, http.StatusOK, res.StatusCode) - require.Len(t, response.Data, numSeries) - } - - _, err = c.ActiveSeries(metricName, e2emimir.WithQueryShards(512)) - if cfg.shardActiveSeriesQueries { - require.Error(t, err) - require.Contains(t, err.Error(), "shard count 512 exceeds allowed maximum (128)") - } else { - require.NoError(t, err) - } + return s, c } diff --git a/pkg/api/api.go b/pkg/api/api.go index 24c072f65d1..e9edc912d5c 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -443,6 +443,7 @@ func (a *API) RegisterQueryAPI(handler http.Handler, buildInfoHandler http.Handl a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/cardinality/label_names"), handler, true, true, "GET", "POST") a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/cardinality/label_values"), handler, true, true, "GET", "POST") a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/cardinality/active_series"), handler, true, true, "GET", "POST") + a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/cardinality/active_native_histogram_metrics"), handler, true, true, "GET", "POST") a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/format_query"), handler, true, true, "GET", "POST") } diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go index b1cfd1529e8..e359c150b89 100644 --- a/pkg/api/handlers.go +++ b/pkg/api/handlers.go @@ -328,6 +328,7 @@ func NewQuerierHandler( router.Path(path.Join(prefix, "/api/v1/cardinality/label_names")).Methods("GET", "POST").Handler(cardinalityQueryStats.Wrap(querier.LabelNamesCardinalityHandler(distributor, limits))) router.Path(path.Join(prefix, "/api/v1/cardinality/label_values")).Methods("GET", "POST").Handler(cardinalityQueryStats.Wrap(querier.LabelValuesCardinalityHandler(distributor, limits))) router.Path(path.Join(prefix, "/api/v1/cardinality/active_series")).Methods("GET", "POST").Handler(cardinalityQueryStats.Wrap(querier.ActiveSeriesCardinalityHandler(distributor, limits))) + router.Path(path.Join(prefix, "/api/v1/cardinality/active_native_histogram_metrics")).Methods("GET", "POST").Handler(cardinalityQueryStats.Wrap(querier.ActiveNativeHistogramMetricsHandler(distributor, limits))) router.Path(path.Join(prefix, "/api/v1/format_query")).Methods("GET", "POST").Handler(formattingQueryStats.Wrap(promRouter)) // Track execution time. diff --git a/pkg/cardinality/request.go b/pkg/cardinality/request.go index efcc9c8074a..10a24644e88 100644 --- a/pkg/cardinality/request.go +++ b/pkg/cardinality/request.go @@ -279,6 +279,19 @@ type ActiveSeriesRequest struct { Matchers []*labels.Matcher } +type ActiveMetricWithBucketCount struct { + Metric string `json:"metric"` + SeriesCount uint64 `json:"series_count"` + BucketCount uint64 `json:"bucket_count"` + AvgBucketCount float64 `json:"avg_bucket_count"` + MinBucketCount uint64 `json:"min_bucket_count"` + MaxBucketCount uint64 `json:"max_bucket_count"` +} + +type ActiveNativeHistogramMetricsResponse struct { + Data []ActiveMetricWithBucketCount `json:"data"` +} + // DecodeActiveSeriesRequest decodes the input http.Request into an ActiveSeriesRequest. func DecodeActiveSeriesRequest(r *http.Request) (*ActiveSeriesRequest, error) { if err := r.ParseForm(); err != nil { diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index fba21ee5deb..2f0ab7fc62c 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -2079,6 +2079,34 @@ func (cm *labelValuesCardinalityConcurrentMap) toResponse(approximateFromZonesFu // ActiveSeries queries the ingester replication set for active series matching // the given selector. It combines and deduplicates the results. func (d *Distributor) ActiveSeries(ctx context.Context, matchers []*labels.Matcher) ([]labels.Labels, error) { + res, err := d.deduplicateActiveSeries(ctx, matchers, false) + if err != nil { + return nil, err + } + + deduplicatedSeries, fetchedSeries := res.result() + + reqStats := stats.FromContext(ctx) + reqStats.AddFetchedSeries(fetchedSeries) + + return deduplicatedSeries, nil +} + +func (d *Distributor) ActiveNativeHistogramMetrics(ctx context.Context, matchers []*labels.Matcher) (*cardinality.ActiveNativeHistogramMetricsResponse, error) { + res, err := d.deduplicateActiveSeries(ctx, matchers, true) + if err != nil { + return nil, err + } + + metrics, fetchedSeries := res.metricResult() + + reqStats := stats.FromContext(ctx) + reqStats.AddFetchedSeries(fetchedSeries) + + return &cardinality.ActiveNativeHistogramMetricsResponse{Data: metrics}, nil +} + +func (d *Distributor) deduplicateActiveSeries(ctx context.Context, matchers []*labels.Matcher, nativeHistograms bool) (*activeSeriesResponse, error) { replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx) if err != nil { return nil, err @@ -2094,6 +2122,9 @@ func (d *Distributor) ActiveSeries(ctx context.Context, matchers []*labels.Match if err != nil { return nil, err } + if nativeHistograms { + req.Type = ingester_client.NATIVE_HISTOGRAM_SERIES + } tenantID, err := tenant.TenantID(ctx) if err != nil { @@ -2101,10 +2132,13 @@ func (d *Distributor) ActiveSeries(ctx context.Context, matchers []*labels.Match } maxResponseSize := math.MaxInt - if limit := d.limits.ActiveSeriesResultsMaxSizeBytes(tenantID); limit > 0 { - maxResponseSize = limit + if !nativeHistograms { + // We are going to return metrics level information, this limit doesn't make much sense in that case + if limit := d.limits.ActiveSeriesResultsMaxSizeBytes(tenantID); limit > 0 { + maxResponseSize = limit + } } - res := newActiveSeriesResponse(d.hashCollisionCount, maxResponseSize) + res := newActiveSeriesResponse(d.hashCollisionCount, maxResponseSize, !nativeHistograms) ingesterQuery := func(ctx context.Context, client ingester_client.IngesterClient) (any, error) { // This function is invoked purely for its side effects on the captured @@ -2145,7 +2179,7 @@ func (d *Distributor) ActiveSeries(ctx context.Context, matchers []*labels.Match return nil, err } - err = res.add(msg.Metric) + err = res.add(msg.Metric, msg.BucketCount) if err != nil { return nil, err } @@ -2159,21 +2193,22 @@ func (d *Distributor) ActiveSeries(ctx context.Context, matchers []*labels.Match return nil, err } - deduplicatedSeries := res.result() - - reqStats := stats.FromContext(ctx) - reqStats.AddFetchedSeries(uint64(len(deduplicatedSeries))) + return res, nil +} - return deduplicatedSeries, nil +type labelsWithBucketCount struct { + labels.Labels + bucketCount uint64 } type entry struct { - first labels.Labels - collisions []labels.Labels + first labelsWithBucketCount + collisions []labelsWithBucketCount } // activeSeriesResponse is a helper to merge/deduplicate ActiveSeries responses from ingesters. type activeSeriesResponse struct { + ignoreBucketCount bool m sync.Mutex series map[uint64]entry builder labels.ScratchBuilder @@ -2185,8 +2220,9 @@ type activeSeriesResponse struct { maxSize int } -func newActiveSeriesResponse(hashCollisionCount prometheus.Counter, maxSize int) *activeSeriesResponse { +func newActiveSeriesResponse(hashCollisionCount prometheus.Counter, maxSize int, ignoreBucketCount bool) *activeSeriesResponse { return &activeSeriesResponse{ + ignoreBucketCount: ignoreBucketCount, series: map[uint64]entry{}, builder: labels.NewScratchBuilder(40), hashCollisionCount: hashCollisionCount, @@ -2196,12 +2232,12 @@ func newActiveSeriesResponse(hashCollisionCount prometheus.Counter, maxSize int) var ErrResponseTooLarge = errors.New("response too large") -func (r *activeSeriesResponse) add(series []*mimirpb.Metric) error { +func (r *activeSeriesResponse) add(series []*mimirpb.Metric, bucketCounts []uint64) error { r.m.Lock() defer r.m.Unlock() - for _, metric := range series { + for i, metric := range series { mimirpb.FromLabelAdaptersOverwriteLabels(&r.builder, metric.Labels, &r.lbls) lblHash := r.lbls.Hash() if e, ok := r.series[lblHash]; !ok { @@ -2212,18 +2248,21 @@ func (r *activeSeriesResponse) add(series []*mimirpb.Metric) error { if r.size > r.maxSize { return ErrResponseTooLarge } - - r.series[lblHash] = entry{first: l} + if r.ignoreBucketCount { + r.series[lblHash] = entry{first: labelsWithBucketCount{labels.Labels(l), 0}} + } else { + r.series[lblHash] = entry{first: labelsWithBucketCount{labels.Labels(l), bucketCounts[i]}} + } } else { // A series with this hash is already present in the result set, we need to // detect potential hash collisions by comparing the labels of the candidate to // the labels in the result set and add the candidate if it's not present. - present := labels.Equal(e.first, r.lbls) + present := labels.Equal(e.first.Labels, r.lbls) for _, lbls := range e.collisions { if present { break } - present = labels.Equal(lbls, r.lbls) + present = labels.Equal(lbls.Labels, r.lbls) } if !present { @@ -2234,8 +2273,11 @@ func (r *activeSeriesResponse) add(series []*mimirpb.Metric) error { if r.size > r.maxSize { return ErrResponseTooLarge } - - e.collisions = append(e.collisions, l) + if r.ignoreBucketCount { + e.collisions = append(e.collisions, labelsWithBucketCount{labels.Labels(l), 0}) + } else { + e.collisions = append(e.collisions, labelsWithBucketCount{labels.Labels(l), bucketCounts[i]}) + } r.series[lblHash] = e r.hashCollisionCount.Inc() } @@ -2245,16 +2287,79 @@ func (r *activeSeriesResponse) add(series []*mimirpb.Metric) error { return nil } -func (r *activeSeriesResponse) result() []labels.Labels { +func (r *activeSeriesResponse) result() ([]labels.Labels, uint64) { r.m.Lock() defer r.m.Unlock() result := make([]labels.Labels, 0, len(r.series)) for _, series := range r.series { - result = append(result, series.first) - result = append(result, series.collisions...) + result = append(result, series.first.Labels) + for _, collision := range series.collisions { + result = append(result, collision.Labels) + } + } + return result, uint64(len(result)) +} + +type metricBucketCounts struct { + SeriesCount uint64 + BucketCount uint64 + MaxBucketCount uint64 + MinBucketCount uint64 +} + +// Aggregate native histogram bucket counts on metric level. +func (r *activeSeriesResponse) metricResult() ([]cardinality.ActiveMetricWithBucketCount, uint64) { + metrics, fetchedSeries := r.getMetricCounts() + result := make([]cardinality.ActiveMetricWithBucketCount, 0, len(metrics)) + for name, metric := range metrics { + result = append(result, cardinality.ActiveMetricWithBucketCount{ + Metric: name, + SeriesCount: metric.SeriesCount, + BucketCount: metric.BucketCount, + MaxBucketCount: metric.MaxBucketCount, + MinBucketCount: metric.MinBucketCount, + AvgBucketCount: float64(metric.BucketCount) / float64(metric.SeriesCount), + }) + } + return result, fetchedSeries +} + +func (r *activeSeriesResponse) getMetricCounts() (map[string]*metricBucketCounts, uint64) { + r.m.Lock() + defer r.m.Unlock() + + fetchedSeries := uint64(0) + metrics := map[string]*metricBucketCounts{} + for _, series := range r.series { + fetchedSeries += 1 + uint64(len(series.collisions)) + updateMetricCounts(metrics, series.first) + for _, collision := range series.collisions { + updateMetricCounts(metrics, collision) + } + } + return metrics, fetchedSeries +} + +func updateMetricCounts(metrics map[string]*metricBucketCounts, series labelsWithBucketCount) { + bucketCount := series.bucketCount + if m, ok := metrics[series.Get(model.MetricNameLabel)]; ok { + m.SeriesCount++ + m.BucketCount += bucketCount + if m.MaxBucketCount < bucketCount { + m.MaxBucketCount = bucketCount + } + if m.MinBucketCount > bucketCount { + m.MinBucketCount = bucketCount + } + } else { + metrics[series.Get(model.MetricNameLabel)] = &metricBucketCounts{ + SeriesCount: 1, + BucketCount: bucketCount, + MaxBucketCount: bucketCount, + MinBucketCount: bucketCount, + } } - return result } // approximateFromZones computes a zonal value while factoring in replication; diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 00826f9263e..247d0d3e52f 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -2505,6 +2505,153 @@ func TestDistributor_ActiveSeries(t *testing.T) { } } +func TestDistributor_ActiveNativeHistogramSeries(t *testing.T) { + const numIngesters = 5 + const responseSizeLimitBytes = 1024 + + collision1, collision2 := labelsWithHashCollision() + + pushedData := []struct { + lbls labels.Labels + value float64 + timestamp int64 + }{ + {labels.FromStrings(labels.MetricName, "test_1", "team", "a"), 1, 100000}, + {labels.FromStrings(labels.MetricName, "test_1", "team", "b"), 1, 110000}, + {labels.FromStrings(labels.MetricName, "test_2"), 2, 200000}, + {collision1, 3, 300000}, + {collision2, 4, 300000}, + {labels.FromStrings(labels.MetricName, "large_metric", "label", strings.Repeat("1", 2*responseSizeLimitBytes)), 5, 400000}, + } + + tests := map[string]struct { + shuffleShardSize int + requestMatchers []*labels.Matcher + expectedFetchedSeries uint64 + expectedMetrics []cardinality.ActiveMetricWithBucketCount + expectedNumQueriedIngesters int + }{ + "should return an empty response if no metric match": { + requestMatchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "unknown")}, + expectedMetrics: []cardinality.ActiveMetricWithBucketCount{}, + expectedNumQueriedIngesters: numIngesters, + }, + "should return all matching metrics": { + requestMatchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1")}, + expectedFetchedSeries: 2, + expectedMetrics: []cardinality.ActiveMetricWithBucketCount{{Metric: "test_1", SeriesCount: 2, BucketCount: 16, MaxBucketCount: 8, MinBucketCount: 8, AvgBucketCount: 8.0}}, + expectedNumQueriedIngesters: numIngesters, + }, + "should honour shuffle shard size": { + shuffleShardSize: 3, + requestMatchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2")}, + expectedFetchedSeries: 1, + expectedMetrics: []cardinality.ActiveMetricWithBucketCount{{Metric: "test_2", SeriesCount: 1, BucketCount: 8, MaxBucketCount: 8, MinBucketCount: 8, AvgBucketCount: 8.0}}, + expectedNumQueriedIngesters: 3, + }, + "should return all matching series even if their hash collides": { + requestMatchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "metric")}, + expectedFetchedSeries: 2, + expectedMetrics: []cardinality.ActiveMetricWithBucketCount{{Metric: "metric", SeriesCount: 2, BucketCount: 16, MaxBucketCount: 8, MinBucketCount: 8, AvgBucketCount: 8.0}}, + expectedNumQueriedIngesters: numIngesters, + }, + } + + // Programmatically build different scenarios under which run the tests. + type scenario struct { + ingestStorageEnabled bool + minimizeIngesterRequests bool + } + + scenarios := map[string]scenario{} + for _, minimizeIngesterRequests := range []bool{false, true} { + for _, ingestStorageEnabled := range []bool{false, true} { + name := fmt.Sprintf("minimize ingester requests: %t, ingest storage enabled: %t", minimizeIngesterRequests, ingestStorageEnabled) + scenarios[name] = scenario{ + ingestStorageEnabled: ingestStorageEnabled, + minimizeIngesterRequests: minimizeIngesterRequests, + } + } + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + t.Parallel() + + for scenarioName, scenarioData := range scenarios { + scenarioData := scenarioData + + t.Run(scenarioName, func(t *testing.T) { + t.Parallel() + + testConfig := prepConfig{ + numIngesters: numIngesters, + happyIngesters: numIngesters, + numDistributors: 1, + ingestStorageEnabled: scenarioData.ingestStorageEnabled, + configure: func(config *Config) { + config.MinimizeIngesterRequests = scenarioData.minimizeIngesterRequests + }, + limits: func() *validation.Limits { + limits := prepareDefaultLimits() + limits.ActiveSeriesResultsMaxSizeBytes = responseSizeLimitBytes + limits.NativeHistogramsIngestionEnabled = true + return limits + }(), + } + + if scenarioData.ingestStorageEnabled { + testConfig.limits.IngestionPartitionsTenantShardSize = testData.shuffleShardSize + } else { + testConfig.shuffleShardSize = testData.shuffleShardSize + } + + // Create distributor and ingesters. + distributors, ingesters, _, _ := prepare(t, testConfig) + d := distributors[0] + + // Ensure strong read consistency, required to have no flaky tests when ingest storage is enabled. + ctx := user.InjectOrgID(context.Background(), "test") + ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong) + + // Push test data. + for _, series := range pushedData { + req := mockWriteHistogramRequest(series.lbls, series.value, series.timestamp) + _, err := d.Push(ctx, req) + require.NoError(t, err) + } + + // Prepare empty query stats. + qStats, ctx := stats.ContextWithEmptyStats(ctx) + + // Query active series. + series, err := d.ActiveNativeHistogramMetrics(ctx, testData.requestMatchers) + require.NoError(t, err) + assert.ElementsMatch(t, testData.expectedMetrics, series.Data) + + // Check that query stats are set correctly. + assert.Equal(t, testData.expectedFetchedSeries, qStats.GetFetchedSeriesCount()) + + // Check how many ingesters have been queried. + if scenarioData.ingestStorageEnabled { + // When ingest storage is enabled, we request quorum 1 for each partition. + // In this test each ingester owns a different partition, so we expect all + // ingesters to be queried. + assert.Equal(t, testData.expectedNumQueriedIngesters, countMockIngestersCalled(ingesters, "ActiveSeries")) + } else { + // Due to the quorum the distributor could cancel the last request towards ingesters + // if all other ones are successful, so we're good either has been queried X or X-1 + // ingesters. + assert.Contains(t, []int{testData.expectedNumQueriedIngesters, testData.expectedNumQueriedIngesters - 1}, countMockIngestersCalled(ingesters, "ActiveSeries")) + } + }) + } + }) + } +} + func TestDistributor_ActiveSeries_AvailabilityAndConsistency(t *testing.T) { // In this test we run all queries with a matcher which matches all series. reqMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+")} @@ -4868,6 +5015,13 @@ func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) *mim return mimirpb.ToWriteRequest([][]mimirpb.LabelAdapter{mimirpb.FromLabelsToLabelAdapters(lbls)}, samples, nil, nil, mimirpb.API) } +func mockWriteHistogramRequest(lbls labels.Labels, value float64, timestampMs int64) *mimirpb.WriteRequest { + histograms := []mimirpb.Histogram{mimirpb.FromHistogramToHistogramProto(timestampMs, util_test.GenerateTestHistogram(int(value)))} + + req := mimirpb.NewWriteRequest(nil, mimirpb.API) + return req.AddHistogramSeries([][]mimirpb.LabelAdapter{mimirpb.FromLabelsToLabelAdapters(lbls)}, histograms, nil) +} + type ingesterState int const ( @@ -6372,7 +6526,16 @@ func (i *mockIngester) ActiveSeries(ctx context.Context, req *client.ActiveSerie for _, series := range i.timeseries { if match(series.Labels, matchers) { - resp.Metric = append(resp.Metric, &mimirpb.Metric{Labels: series.Labels}) + lbls := series.Labels + if req.Type == client.NATIVE_HISTOGRAM_SERIES { + if len(series.Histograms) == 0 { + continue + } + h := series.Histograms[len(series.Histograms)-1] + bucketCount := len(h.NegativeCounts) + len(h.NegativeDeltas) + len(h.PositiveCounts) + len(h.PositiveDeltas) + resp.BucketCount = append(resp.BucketCount, uint64(bucketCount)) + } + resp.Metric = append(resp.Metric, &mimirpb.Metric{Labels: lbls}) } if len(resp.Metric) > 1 { results = append(results, resp) diff --git a/pkg/frontend/querymiddleware/roundtrip.go b/pkg/frontend/querymiddleware/roundtrip.go index 293b2c3e2c8..fb155131660 100644 --- a/pkg/frontend/querymiddleware/roundtrip.go +++ b/pkg/frontend/querymiddleware/roundtrip.go @@ -28,24 +28,26 @@ import ( ) const ( - day = 24 * time.Hour - queryRangePathSuffix = "/api/v1/query_range" - instantQueryPathSuffix = "/api/v1/query" - cardinalityLabelNamesPathSuffix = "/api/v1/cardinality/label_names" - cardinalityLabelValuesPathSuffix = "/api/v1/cardinality/label_values" - cardinalityActiveSeriesPathSuffix = "/api/v1/cardinality/active_series" - labelNamesPathSuffix = "/api/v1/labels" + day = 24 * time.Hour + queryRangePathSuffix = "/api/v1/query_range" + instantQueryPathSuffix = "/api/v1/query" + cardinalityLabelNamesPathSuffix = "/api/v1/cardinality/label_names" + cardinalityLabelValuesPathSuffix = "/api/v1/cardinality/label_values" + cardinalityActiveSeriesPathSuffix = "/api/v1/cardinality/active_series" + cardinalityActiveNativeHistogramMetricsPathSuffix = "/api/v1/cardinality/active_native_histogram_metrics" + labelNamesPathSuffix = "/api/v1/labels" // DefaultDeprecatedAlignQueriesWithStep is the default value for the deprecated querier frontend config DeprecatedAlignQueriesWithStep // which has been moved to a per-tenant limit; TODO remove in Mimir 2.14 DefaultDeprecatedAlignQueriesWithStep = false - queryTypeInstant = "query" - queryTypeRange = "query_range" - queryTypeCardinality = "cardinality" - queryTypeLabels = "label_names_and_values" - queryTypeActiveSeries = "active_series" - queryTypeOther = "other" + queryTypeInstant = "query" + queryTypeRange = "query_range" + queryTypeCardinality = "cardinality" + queryTypeLabels = "label_names_and_values" + queryTypeActiveSeries = "active_series" + queryTypeActiveNativeHistogramMetrics = "active_native_histogram_metrics" + queryTypeOther = "other" ) var ( @@ -327,6 +329,7 @@ func newQueryTripperware( next = newQueryDetailsStartEndRoundTripper(next) cardinality := next activeSeries := next + activeNativeHistogramMetrics := next labels := next // Inject the cardinality and labels query cache roundtripper only if the query results cache is enabled. @@ -337,6 +340,7 @@ func newQueryTripperware( if cfg.ShardActiveSeriesQueries { activeSeries = newShardActiveSeriesMiddleware(activeSeries, cfg.UseActiveSeriesDecoder, limits, log) + activeNativeHistogramMetrics = newShardActiveNativeHistogramMetricsMiddleware(activeNativeHistogramMetrics, limits, log) } return RoundTripFunc(func(r *http.Request) (*http.Response, error) { @@ -349,6 +353,8 @@ func newQueryTripperware( return cardinality.RoundTrip(r) case IsActiveSeriesQuery(r.URL.Path): return activeSeries.RoundTrip(r) + case IsActiveNativeHistogramMetricsQuery(r.URL.Path): + return activeNativeHistogramMetrics.RoundTrip(r) case IsLabelsQuery(r.URL.Path): return labels.RoundTrip(r) default: @@ -402,6 +408,8 @@ func newQueryCountTripperware(registerer prometheus.Registerer) Tripperware { op = queryTypeCardinality case IsActiveSeriesQuery(r.URL.Path): op = queryTypeActiveSeries + case IsActiveNativeHistogramMetricsQuery(r.URL.Path): + op = queryTypeActiveNativeHistogramMetrics case IsLabelsQuery(r.URL.Path): op = queryTypeLabels } @@ -448,3 +456,7 @@ func IsLabelsQuery(path string) bool { func IsActiveSeriesQuery(path string) bool { return strings.HasSuffix(path, cardinalityActiveSeriesPathSuffix) } + +func IsActiveNativeHistogramMetricsQuery(path string) bool { + return strings.HasSuffix(path, cardinalityActiveNativeHistogramMetricsPathSuffix) +} diff --git a/pkg/frontend/querymiddleware/roundtrip_test.go b/pkg/frontend/querymiddleware/roundtrip_test.go index 5b4e2b22c18..4689850c634 100644 --- a/pkg/frontend/querymiddleware/roundtrip_test.go +++ b/pkg/frontend/querymiddleware/roundtrip_test.go @@ -351,6 +351,18 @@ func TestTripperware_Metrics(t *testing.T) { cortex_query_frontend_queries_total{op="active_series",user="user-1"} 1 `, }, + "cardinality active native histogram metrics": { + path: "/api/v1/cardinality/active_native_histogram_metrics?selector=%7Bjob%3D%22test%22%7D", + expectedMetrics: ` + # HELP cortex_query_frontend_non_step_aligned_queries_total Total queries sent that are not step aligned. + # TYPE cortex_query_frontend_non_step_aligned_queries_total counter + cortex_query_frontend_non_step_aligned_queries_total 0 + + # HELP cortex_query_frontend_queries_total Total queries sent per tenant. + # TYPE cortex_query_frontend_queries_total counter + cortex_query_frontend_queries_total{op="active_native_histogram_metrics",user="user-1"} 1 + `, + }, "unknown query type": { path: "/api/v1/unknown", expectedMetrics: ` diff --git a/pkg/frontend/querymiddleware/shard_active_native_histogram_metrics.go b/pkg/frontend/querymiddleware/shard_active_native_histogram_metrics.go new file mode 100644 index 00000000000..a41a193eb8e --- /dev/null +++ b/pkg/frontend/querymiddleware/shard_active_native_histogram_metrics.go @@ -0,0 +1,227 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package querymiddleware + +import ( + "context" + "fmt" + "io" + "net/http" + "sort" + "sync" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + jsoniter "github.com/json-iterator/go" + "github.com/opentracing/opentracing-go" + otlog "github.com/opentracing/opentracing-go/log" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + + "github.com/grafana/mimir/pkg/cardinality" + "github.com/grafana/mimir/pkg/util/spanlogger" +) + +type shardActiveNativeHistogramMetricsMiddleware struct { + shardBySeriesBase +} + +func newShardActiveNativeHistogramMetricsMiddleware(upstream http.RoundTripper, limits Limits, logger log.Logger) http.RoundTripper { + return &shardActiveNativeHistogramMetricsMiddleware{shardBySeriesBase{ + upstream: upstream, + limits: limits, + logger: logger, + }} +} + +func (s *shardActiveNativeHistogramMetricsMiddleware) RoundTrip(r *http.Request) (*http.Response, error) { + spanLog, ctx := spanlogger.NewWithLogger(r.Context(), s.logger, "shardActiveNativeHistogramMetrics.RoundTrip") + defer spanLog.Finish() + + resp, err := s.shardBySeriesSelector(ctx, spanLog, r, s.mergeResponses) + + if err != nil { + return nil, err + } + return resp, nil +} + +func (s *shardActiveNativeHistogramMetricsMiddleware) mergeResponses(ctx context.Context, responses []*http.Response, encoding string) *http.Response { + reader, writer := io.Pipe() + + mtx := sync.Mutex{} + metricIdx := make(map[string]int, 0) + metricBucketCount := make([]cardinality.ActiveMetricWithBucketCount, 0) + + updateMetric := func(item *cardinality.ActiveMetricWithBucketCount) { + if item == nil || len(item.Metric) == 0 { + // Skip empty/unknown metrics. + return + } + mtx.Lock() + defer mtx.Unlock() + if idx, ok := metricIdx[item.Metric]; ok { + metricBucketCount[idx].SeriesCount += item.SeriesCount + metricBucketCount[idx].BucketCount += item.BucketCount + if item.MinBucketCount < metricBucketCount[idx].MinBucketCount { + metricBucketCount[idx].MinBucketCount = item.MinBucketCount + } + if item.MaxBucketCount > metricBucketCount[idx].MaxBucketCount { + metricBucketCount[idx].MaxBucketCount = item.MaxBucketCount + } + } else { + metricIdx[item.Metric] = len(metricBucketCount) + metricBucketCount = append(metricBucketCount, *item) + } + } + + g := new(errgroup.Group) + for _, res := range responses { + if res == nil { + continue + } + r := res + g.Go(func() error { + defer func(body io.ReadCloser) { + // drain body reader + _, _ = io.Copy(io.Discard, body) + _ = body.Close() + }(r.Body) + + bufPtr := jsoniterBufferPool.Get().(*[]byte) + defer jsoniterBufferPool.Put(bufPtr) + + it := jsoniter.ConfigFastest.BorrowIterator(*bufPtr) + it.Reset(r.Body) + defer func() { + jsoniter.ConfigFastest.ReturnIterator(it) + }() + + // Iterate over fields until we find data or error fields + foundDataField := false + for it.Error == nil { + field := it.ReadObject() + if field == "error" { + return fmt.Errorf("error in partial response: %s", it.ReadString()) + } + if field == "data" { + foundDataField = true + break + } + // If the field is neither data nor error, we skip it. + it.ReadAny() + } + if !foundDataField { + return fmt.Errorf("expected data field at top level, found %s", it.CurrentBuffer()) + } + + if it.WhatIsNext() != jsoniter.ArrayValue { + err := errors.New("expected data field to contain an array") + return err + } + + for it.ReadArray() { + if err := ctx.Err(); err != nil { + if cause := context.Cause(ctx); cause != nil { + return fmt.Errorf("aborted streaming because context was cancelled: %w", cause) + } + return ctx.Err() + } + + item := cardinality.ActiveMetricWithBucketCount{} + it.ReadVal(&item) + updateMetric(&item) + } + + return it.Error + }) + } + + // Cannot start streaming until we merged all results. + err := g.Wait() + + // Sort the results by metric name, unless there was an error. + if err == nil { + sort.Slice(metricBucketCount, func(i, j int) bool { + return metricBucketCount[i].Metric < metricBucketCount[j].Metric + }) + } else { + metricBucketCount = nil + } + + resp := &http.Response{Body: reader, StatusCode: http.StatusOK, Header: http.Header{}} + resp.Header.Set("Content-Type", "application/json") + if encoding == encodingTypeSnappyFramed { + resp.Header.Set("Content-Encoding", encodingTypeSnappyFramed) + } + + go s.writeMergedResponse(ctx, err, writer, metricBucketCount, encoding) + + return resp +} + +func (s *shardActiveNativeHistogramMetricsMiddleware) writeMergedResponse(ctx context.Context, mergeError error, w io.WriteCloser, metricBucketCount []cardinality.ActiveMetricWithBucketCount, encoding string) { + defer w.Close() + + span, _ := opentracing.StartSpanFromContext(ctx, "shardActiveNativeHistogramMetrics.writeMergedResponse") + defer span.Finish() + + var out io.Writer = w + if encoding == encodingTypeSnappyFramed { + span.LogFields(otlog.String("encoding", encodingTypeSnappyFramed)) + enc := getSnappyWriter(w) + out = enc + defer func() { + enc.Close() + // Reset the encoder before putting it back to pool to avoid it to hold the writer. + enc.Reset(nil) + snappyWriterPool.Put(enc) + }() + } else { + span.LogFields(otlog.String("encoding", "none")) + } + + stream := jsoniter.ConfigFastest.BorrowStream(out) + defer func(stream *jsoniter.Stream) { + _ = stream.Flush() + + if cap(stream.Buffer()) > jsoniterMaxBufferSize { + return + } + jsoniter.ConfigFastest.ReturnStream(stream) + }(stream) + + stream.WriteObjectStart() + defer stream.WriteObjectEnd() + + if mergeError != nil { + level.Error(s.logger).Log("msg", "error merging partial responses", "err", mergeError.Error()) + span.LogFields(otlog.Error(mergeError)) + stream.WriteMore() + stream.WriteObjectField("status") + stream.WriteString("error") + stream.WriteMore() + stream.WriteObjectField("error") + stream.WriteString(fmt.Sprintf("error merging partial responses: %s", mergeError.Error())) + return + } + + stream.WriteObjectField("data") + stream.WriteArrayStart() + firstItem := true + for idx := range metricBucketCount { + if firstItem { + firstItem = false + } else { + stream.WriteMore() + } + + stream.WriteVal(&metricBucketCount[idx]) + + // Flush the stream buffer if it's getting too large. + if stream.Buffered() > jsoniterMaxBufferSize { + _ = stream.Flush() + } + } + stream.WriteArrayEnd() +} diff --git a/pkg/frontend/querymiddleware/shard_active_series.go b/pkg/frontend/querymiddleware/shard_active_series.go index 54fc791411b..8038d5e3ef0 100644 --- a/pkg/frontend/querymiddleware/shard_active_series.go +++ b/pkg/frontend/querymiddleware/shard_active_series.go @@ -8,29 +8,20 @@ import ( "fmt" "io" "net/http" - "net/url" "os" - "strconv" "sync" "unsafe" "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/dskit/tenant" - "github.com/grafana/dskit/user" jsoniter "github.com/json-iterator/go" "github.com/klauspost/compress/s2" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/promql/parser" "golang.org/x/sync/errgroup" - apierror "github.com/grafana/mimir/pkg/api/error" - "github.com/grafana/mimir/pkg/querier/stats" - "github.com/grafana/mimir/pkg/storage/sharding" - "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -67,214 +58,36 @@ func getSnappyWriter(w io.Writer) *s2.Writer { } type shardActiveSeriesMiddleware struct { - upstream http.RoundTripper + shardBySeriesBase useZeroAllocationDecoder bool - limits Limits - logger log.Logger } func newShardActiveSeriesMiddleware(upstream http.RoundTripper, useZeroAllocationDecoder bool, limits Limits, logger log.Logger) http.RoundTripper { - return &shardActiveSeriesMiddleware{ - upstream: upstream, - useZeroAllocationDecoder: useZeroAllocationDecoder, - limits: limits, - logger: logger, - } + return &shardActiveSeriesMiddleware{shardBySeriesBase{ + upstream: upstream, + limits: limits, + logger: logger, + }, useZeroAllocationDecoder} } func (s *shardActiveSeriesMiddleware) RoundTrip(r *http.Request) (*http.Response, error) { spanLog, ctx := spanlogger.NewWithLogger(r.Context(), s.logger, "shardActiveSeries.RoundTrip") defer spanLog.Finish() - tenantID, err := tenant.TenantID(ctx) - if err != nil { - return nil, apierror.New(apierror.TypeBadData, err.Error()) - } - - defaultShardCount := s.limits.QueryShardingTotalShards(tenantID) - shardCount := setShardCountFromHeader(defaultShardCount, r, spanLog) - - if shardCount < 2 { - spanLog.DebugLog("msg", "query sharding disabled for request") - return s.upstream.RoundTrip(r) - } - - if maxShards := s.limits.QueryShardingMaxShardedQueries(tenantID); shardCount > maxShards { - return nil, apierror.New( - apierror.TypeBadData, - fmt.Sprintf("shard count %d exceeds allowed maximum (%d)", shardCount, maxShards), - ) - } - - selector, err := parseSelector(r) - if err != nil { - return nil, apierror.New(apierror.TypeBadData, err.Error()) - } - - spanLog.DebugLog( - "msg", "sharding active series query", - "shardCount", shardCount, "selector", selector.String(), + var ( + resp *http.Response + err error ) - - reqs, err := buildShardedRequests(ctx, r, shardCount, selector) - if err != nil { - return nil, apierror.New(apierror.TypeInternal, err.Error()) - } - - resp, err := doShardedRequests(ctx, reqs, s.upstream) - if err != nil { - if errors.Is(err, errShardCountTooLow) { - return nil, apierror.New(apierror.TypeBadData, fmt.Errorf("%w: try increasing the requested shard count", err).Error()) - } - return nil, apierror.New(apierror.TypeInternal, err.Error()) - } - acceptEncoding := r.Header.Get("Accept-Encoding") - if s.useZeroAllocationDecoder { - return s.mergeResponsesWithZeroAllocationDecoder(ctx, resp, acceptEncoding), nil - } - return s.mergeResponses(ctx, resp, acceptEncoding), nil -} - -func setShardCountFromHeader(origShardCount int, r *http.Request, spanLog *spanlogger.SpanLogger) int { - for _, value := range r.Header.Values(totalShardsControlHeader) { - shards, err := strconv.ParseInt(value, 10, 32) - if err != nil { - continue - } - if shards >= 0 { - spanLog.DebugLog("msg", fmt.Sprintf("using shard count from header %s: %d", totalShardsControlHeader, shards)) - return int(shards) - } - } - return origShardCount -} - -func parseSelector(req *http.Request) (*parser.VectorSelector, error) { - values, err := util.ParseRequestFormWithoutConsumingBody(req) - if err != nil { - return nil, err - } - valSelector := values.Get("selector") - if valSelector == "" { - return nil, errors.New("selector parameter is required") - } - parsed, err := parser.ParseExpr(valSelector) - if err != nil { - return nil, fmt.Errorf("invalid selector: %w", err) - } - selector, ok := parsed.(*parser.VectorSelector) - if !ok { - return nil, fmt.Errorf("invalid selector: %w", err) - } - - return selector, nil -} - -func buildShardedRequests(ctx context.Context, req *http.Request, numRequests int, selector parser.Expr) ([]*http.Request, error) { - reqs := make([]*http.Request, numRequests) - for i := 0; i < numRequests; i++ { - r, err := http.NewRequestWithContext(ctx, http.MethodGet, req.URL.Path, http.NoBody) - if err != nil { - return nil, err - } - - sharded, err := shardedSelector(numRequests, i, selector) - if err != nil { - return nil, err - } - - vals := url.Values{} - vals.Set("selector", sharded.String()) - r.URL.RawQuery = vals.Encode() - // This is the field read by httpgrpc.FromHTTPRequest, so we need to populate it - // here to ensure the request parameter makes it to the querier. - r.RequestURI = r.URL.String() - - if err := user.InjectOrgIDIntoHTTPRequest(ctx, r); err != nil { - return nil, err - } - - reqs[i] = r - } - - return reqs, nil -} - -func doShardedRequests(ctx context.Context, upstreamRequests []*http.Request, next http.RoundTripper) ([]*http.Response, error) { - resps := make([]*http.Response, len(upstreamRequests)) - - g, ctx := errgroup.WithContext(ctx) - queryStats := stats.FromContext(ctx) - for i, req := range upstreamRequests { - i, r := i, req - g.Go(func() error { - partialStats, childCtx := stats.ContextWithEmptyStats(ctx) - partialStats.AddShardedQueries(1) - - var span opentracing.Span - span, childCtx = opentracing.StartSpanFromContext(childCtx, "shardActiveSeries.doShardedRequest") - defer span.Finish() - - resp, err := next.RoundTrip(r.WithContext(childCtx)) - if err != nil { - span.LogFields(otlog.Error(err)) - return err - } - - if resp.StatusCode != http.StatusOK { - span.LogFields(otlog.Int("statusCode", resp.StatusCode)) - if resp.StatusCode == http.StatusRequestEntityTooLarge { - return errShardCountTooLow - } - var body []byte - if resp.Body != nil { - body, _ = io.ReadAll(resp.Body) - } - return fmt.Errorf("received unexpected response from upstream: status %d, body: %s", resp.StatusCode, string(body)) - } - - resps[i] = resp - - span.LogFields(otlog.Uint64("seriesCount", partialStats.LoadFetchedSeries())) - queryStats.Merge(partialStats) - - return nil - }) - } - - err := g.Wait() - if err != nil { - // If there was an error, we need to read and close all response bodies. - for _, resp := range resps { - if resp != nil { - _, _ = io.ReadAll(resp.Body) - _ = resp.Body.Close() - } - } - } - - return resps, err -} - -func shardedSelector(shardCount, currentShard int, expr parser.Expr) (parser.Expr, error) { - originalSelector, ok := expr.(*parser.VectorSelector) - if !ok { - return nil, errors.New("invalid selector") + resp, err = s.shardBySeriesSelector(ctx, spanLog, r, s.mergeResponsesWithZeroAllocationDecoder) + } else { + resp, err = s.shardBySeriesSelector(ctx, spanLog, r, s.mergeResponses) } - shardMatcher, err := labels.NewMatcher( - labels.MatchEqual, sharding.ShardLabel, - sharding.ShardSelector{ShardIndex: uint64(currentShard), ShardCount: uint64(shardCount)}.LabelValue(), - ) if err != nil { return nil, err } - - return &parser.VectorSelector{ - Name: originalSelector.Name, - LabelMatchers: append([]*labels.Matcher{shardMatcher}, originalSelector.LabelMatchers...), - }, nil + return resp, nil } func (s *shardActiveSeriesMiddleware) mergeResponses(ctx context.Context, responses []*http.Response, encoding string) *http.Response { diff --git a/pkg/frontend/querymiddleware/shard_by_series_base.go b/pkg/frontend/querymiddleware/shard_by_series_base.go new file mode 100644 index 00000000000..cea12230e91 --- /dev/null +++ b/pkg/frontend/querymiddleware/shard_by_series_base.go @@ -0,0 +1,223 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package querymiddleware + +import ( + "context" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + + "github.com/go-kit/log" + "github.com/grafana/dskit/tenant" + "github.com/grafana/dskit/user" + "github.com/opentracing/opentracing-go" + otlog "github.com/opentracing/opentracing-go/log" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql/parser" + "golang.org/x/sync/errgroup" + + apierror "github.com/grafana/mimir/pkg/api/error" + "github.com/grafana/mimir/pkg/querier/stats" + "github.com/grafana/mimir/pkg/storage/sharding" + "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/spanlogger" +) + +type shardBySeriesBase struct { + upstream http.RoundTripper + limits Limits + logger log.Logger +} + +func (s *shardBySeriesBase) shardBySeriesSelector(ctx context.Context, spanLog *spanlogger.SpanLogger, r *http.Request, mergeFn func(ctx context.Context, reponses []*http.Response, encoding string) *http.Response) (*http.Response, error) { + tenantID, err := tenant.TenantID(ctx) + if err != nil { + return nil, apierror.New(apierror.TypeBadData, err.Error()) + } + + defaultShardCount := s.limits.QueryShardingTotalShards(tenantID) + shardCount := setShardCountFromHeader(defaultShardCount, r, spanLog) + + if shardCount < 2 { + spanLog.DebugLog("msg", "query sharding disabled for request") + return s.upstream.RoundTrip(r) + } + + if maxShards := s.limits.QueryShardingMaxShardedQueries(tenantID); shardCount > maxShards { + return nil, apierror.New( + apierror.TypeBadData, + fmt.Sprintf("shard count %d exceeds allowed maximum (%d)", shardCount, maxShards), + ) + } + + selector, err := parseSelector(r) + if err != nil { + return nil, apierror.New(apierror.TypeBadData, err.Error()) + } + + spanLog.DebugLog( + "msg", "sharding active series query", + "shardCount", shardCount, "selector", selector.String(), + ) + + reqs, err := buildShardedRequests(ctx, r, shardCount, selector) + if err != nil { + return nil, apierror.New(apierror.TypeInternal, err.Error()) + } + + resp, err := doShardedRequests(ctx, reqs, s.upstream) + if err != nil { + if errors.Is(err, errShardCountTooLow) { + return nil, apierror.New(apierror.TypeBadData, fmt.Errorf("%w: try increasing the requested shard count", err).Error()) + } + return nil, apierror.New(apierror.TypeInternal, err.Error()) + } + acceptEncoding := r.Header.Get("Accept-Encoding") + + return mergeFn(ctx, resp, acceptEncoding), nil +} + +func setShardCountFromHeader(origShardCount int, r *http.Request, spanLog *spanlogger.SpanLogger) int { + for _, value := range r.Header.Values(totalShardsControlHeader) { + shards, err := strconv.ParseInt(value, 10, 32) + if err != nil { + continue + } + if shards >= 0 { + spanLog.DebugLog("msg", fmt.Sprintf("using shard count from header %s: %d", totalShardsControlHeader, shards)) + return int(shards) + } + } + return origShardCount +} + +func parseSelector(req *http.Request) (*parser.VectorSelector, error) { + values, err := util.ParseRequestFormWithoutConsumingBody(req) + if err != nil { + return nil, err + } + valSelector := values.Get("selector") + if valSelector == "" { + return nil, errors.New("selector parameter is required") + } + parsed, err := parser.ParseExpr(valSelector) + if err != nil { + return nil, fmt.Errorf("invalid selector: %w", err) + } + selector, ok := parsed.(*parser.VectorSelector) + if !ok { + return nil, fmt.Errorf("invalid selector: %w", err) + } + + return selector, nil +} + +func buildShardedRequests(ctx context.Context, req *http.Request, numRequests int, selector parser.Expr) ([]*http.Request, error) { + reqs := make([]*http.Request, numRequests) + for i := 0; i < numRequests; i++ { + r, err := http.NewRequestWithContext(ctx, http.MethodGet, req.URL.Path, http.NoBody) + if err != nil { + return nil, err + } + + sharded, err := shardedSelector(numRequests, i, selector) + if err != nil { + return nil, err + } + + vals := url.Values{} + vals.Set("selector", sharded.String()) + r.URL.RawQuery = vals.Encode() + // This is the field read by httpgrpc.FromHTTPRequest, so we need to populate it + // here to ensure the request parameter makes it to the querier. + r.RequestURI = r.URL.String() + + if err := user.InjectOrgIDIntoHTTPRequest(ctx, r); err != nil { + return nil, err + } + + reqs[i] = r + } + + return reqs, nil +} + +func doShardedRequests(ctx context.Context, upstreamRequests []*http.Request, next http.RoundTripper) ([]*http.Response, error) { + resps := make([]*http.Response, len(upstreamRequests)) + + g, ctx := errgroup.WithContext(ctx) + queryStats := stats.FromContext(ctx) + for i, req := range upstreamRequests { + i, r := i, req + g.Go(func() error { + partialStats, childCtx := stats.ContextWithEmptyStats(ctx) + partialStats.AddShardedQueries(1) + + var span opentracing.Span + span, childCtx = opentracing.StartSpanFromContext(childCtx, "shardBySeries.doShardedRequest") + defer span.Finish() + + resp, err := next.RoundTrip(r.WithContext(childCtx)) + if err != nil { + span.LogFields(otlog.Error(err)) + return err + } + + if resp.StatusCode != http.StatusOK { + span.LogFields(otlog.Int("statusCode", resp.StatusCode)) + if resp.StatusCode == http.StatusRequestEntityTooLarge { + return errShardCountTooLow + } + var body []byte + if resp.Body != nil { + body, _ = io.ReadAll(resp.Body) + } + return fmt.Errorf("received unexpected response from upstream: status %d, body: %s", resp.StatusCode, string(body)) + } + + resps[i] = resp + + span.LogFields(otlog.Uint64("seriesCount", partialStats.LoadFetchedSeries())) + queryStats.Merge(partialStats) + + return nil + }) + } + + err := g.Wait() + if err != nil { + // If there was an error, we need to read and close all response bodies. + for _, resp := range resps { + if resp != nil { + _, _ = io.ReadAll(resp.Body) + _ = resp.Body.Close() + } + } + } + + return resps, err +} + +func shardedSelector(shardCount, currentShard int, expr parser.Expr) (parser.Expr, error) { + originalSelector, ok := expr.(*parser.VectorSelector) + if !ok { + return nil, errors.New("invalid selector") + } + + shardMatcher, err := labels.NewMatcher( + labels.MatchEqual, sharding.ShardLabel, + sharding.ShardSelector{ShardIndex: uint64(currentShard), ShardCount: uint64(shardCount)}.LabelValue(), + ) + if err != nil { + return nil, err + } + + return &parser.VectorSelector{ + Name: originalSelector.Name, + LabelMatchers: append([]*labels.Matcher{shardMatcher}, originalSelector.LabelMatchers...), + }, nil +} diff --git a/pkg/frontend/v2/frontend_scheduler_adapter.go b/pkg/frontend/v2/frontend_scheduler_adapter.go index cc0fab3b56d..2556ea35fa1 100644 --- a/pkg/frontend/v2/frontend_scheduler_adapter.go +++ b/pkg/frontend/v2/frontend_scheduler_adapter.go @@ -85,7 +85,7 @@ func (a *frontendToSchedulerAdapter) extractAdditionalQueueDimensions( return a.queryComponentQueueDimensionFromTimeParams( tenantIDs, decodedRequest.GetStart(), decodedRequest.GetEnd(), now, ), nil - case querymiddleware.IsCardinalityQuery(httpRequest.URL.Path), querymiddleware.IsActiveSeriesQuery(httpRequest.URL.Path): + case querymiddleware.IsCardinalityQuery(httpRequest.URL.Path), querymiddleware.IsActiveSeriesQuery(httpRequest.URL.Path), querymiddleware.IsActiveNativeHistogramMetricsQuery(httpRequest.URL.Path): // cardinality only hits ingesters return []string{ShouldQueryIngestersQueueDimension}, nil default: diff --git a/pkg/querier/cardinality_analysis_handler.go b/pkg/querier/cardinality_analysis_handler.go index 72e8bbccd19..9a6d57081bd 100644 --- a/pkg/querier/cardinality_analysis_handler.go +++ b/pkg/querier/cardinality_analysis_handler.go @@ -132,6 +132,55 @@ func ActiveSeriesCardinalityHandler(d Distributor, limits *validation.Overrides) }) } +func ActiveNativeHistogramMetricsHandler(d Distributor, limits *validation.Overrides) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + // Guarantee request's context is for a single tenant id + tenantID, err := tenant.TenantID(ctx) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if !limits.CardinalityAnalysisEnabled(tenantID) { + http.Error(w, fmt.Sprintf("cardinality analysis is disabled for the tenant: %v", tenantID), http.StatusBadRequest) + return + } + + req, err := cardinality.DecodeActiveSeriesRequest(r) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + res, err := d.ActiveNativeHistogramMetrics(ctx, req.Matchers) + if err != nil { + if errors.Is(err, distributor.ErrResponseTooLarge) { + // http.StatusRequestEntityTooLarge (413) is about the request (not the response) + // body size, but it's the closest we have, and we're using the same status code + // in the query scheduler to express the same error condition. + http.Error(w, fmt.Errorf("%w: try increasing the requested shard count", err).Error(), http.StatusRequestEntityTooLarge) + return + } + respondFromError(err, w) + return + } + + var json = jsoniter.ConfigCompatibleWithStandardLibrary + bytes, err := json.Marshal(res) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Content-Length", strconv.Itoa(len(bytes))) + w.Header().Set(worker.ResponseStreamingEnabledHeader, "true") + + // Nothing we can do about this error, so ignore it. + _, _ = w.Write(bytes) + }) +} + func respondFromError(err error, w http.ResponseWriter) { httpResp, ok := httpgrpc.HTTPResponseFromError(err) if !ok { diff --git a/pkg/querier/cardinality_analysis_handler_test.go b/pkg/querier/cardinality_analysis_handler_test.go index 673b0fd919f..b18e9c0de07 100644 --- a/pkg/querier/cardinality_analysis_handler_test.go +++ b/pkg/querier/cardinality_analysis_handler_test.go @@ -902,6 +902,142 @@ func BenchmarkActiveSeriesHandler_ServeHTTP(b *testing.B) { } } +func TestActiveNativeHistogramMetricsCardinalityHandler(t *testing.T) { + tests := []struct { + name string + requestParams map[string][]string + expectMatcherSetSize int + returnedError error + expectStatusCode int + }{ + { + name: "should error on missing selector param", + expectStatusCode: http.StatusBadRequest, + }, + { + name: "should error on invalid selector", + requestParams: map[string][]string{"selector": {"-not-valid-"}}, + expectStatusCode: http.StatusBadRequest, + }, + { + name: "should error on multiple selectors", + requestParams: map[string][]string{"selector": {"a", "b"}}, + expectStatusCode: http.StatusBadRequest, + }, + { + name: "valid selector", + requestParams: map[string][]string{"selector": {`{job="prometheus"}`}}, + expectStatusCode: http.StatusOK, + }, + { + name: "upstream error: response too large", + requestParams: map[string][]string{"selector": {`{job="prometheus"}`}}, + returnedError: pkg_distributor.ErrResponseTooLarge, + expectStatusCode: http.StatusRequestEntityTooLarge, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + d := &mockDistributor{} + + dResp := cardinality.ActiveNativeHistogramMetricsResponse{ + Data: []cardinality.ActiveMetricWithBucketCount{ + { + Metric: "request_duration_seconds", + SeriesCount: 10, + BucketCount: 200, + AvgBucketCount: 20.0, + MinBucketCount: 1, + MaxBucketCount: 40, + }, + { + Metric: "store_duration_seconds", + SeriesCount: 10, + BucketCount: 100, + AvgBucketCount: 10.0, + MinBucketCount: 10, + MaxBucketCount: 10, + }, + }, + } + d.On("ActiveNativeHistogramMetrics", mock.Anything, mock.Anything).Return(&dResp, test.returnedError) + + handler := createEnabledHandler(t, ActiveNativeHistogramMetricsHandler, d) + ctx := user.InjectOrgID(context.Background(), "test") + + data := url.Values{} + for key, values := range test.requestParams { + for _, value := range values { + data.Add(key, value) + } + } + request, err := http.NewRequestWithContext(ctx, "POST", "/active_native_histogram_metrics", strings.NewReader(data.Encode())) + require.NoError(t, err) + request.Header.Add("Content-Type", "application/x-www-form-urlencoded") + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, request) + + assert.Equal(t, test.expectStatusCode, recorder.Result().StatusCode) + + if test.expectStatusCode != http.StatusOK { + return + } + + require.Equal(t, http.StatusOK, recorder.Result().StatusCode) + + body := recorder.Result().Body + defer func(body io.ReadCloser) { + err := body.Close() + require.NoError(t, err) + }(body) + bodyContent, err := io.ReadAll(body) + require.NoError(t, err) + + resp := cardinality.ActiveNativeHistogramMetricsResponse{} + err = json.Unmarshal(bodyContent, &resp) + require.NoError(t, err) + assert.Len(t, resp.Data, len(dResp.Data)) + }) + } +} + +func BenchmarkActiveNativeHistogramMetricsHandler_ServeHTTP(b *testing.B) { + const numResponseMetrics = 1000 + + d := &mockDistributor{} + + dResp := cardinality.ActiveNativeHistogramMetricsResponse{} + for i := 0; i < numResponseMetrics; i++ { + dResp.Data = append(dResp.Data, cardinality.ActiveMetricWithBucketCount{ + Metric: "request_duration_" + fmt.Sprint(i) + "_seconds", + SeriesCount: 10, + BucketCount: 200, + AvgBucketCount: 20.0, + MinBucketCount: 1, + MaxBucketCount: 40, + }) + } + d.On("ActiveNativeHistogramMetrics", mock.Anything, mock.Anything).Return(&dResp, nil) + + handler := createEnabledHandler(b, ActiveNativeHistogramMetricsHandler, d) + ctx := user.InjectOrgID(context.Background(), "test") + + for i := 0; i < b.N; i++ { + // Prepare a request. + r := httptest.NewRequest("POST", "/active_native_histogram_metrics", strings.NewReader("selector={job=\"prometheus\"}")) + r.Header.Add("Content-Type", "application/x-www-form-urlencoded") + + // Run the benchmark. + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, r.WithContext(ctx)) + + // Make sure we're not benchmarking error responses. + require.Equal(b, http.StatusOK, recorder.Result().StatusCode) + } +} + // createEnabledHandler creates a cardinalityHandler that can be either a LabelNamesCardinalityHandler or a LabelValuesCardinalityHandler func createEnabledHandler(t testing.TB, cardinalityHandler func(Distributor, *validation.Overrides) http.Handler, distributor *mockDistributor) http.Handler { limits := validation.Limits{CardinalityAnalysisEnabled: true} diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index 57168835a3e..61151b1bc04 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -40,6 +40,7 @@ type Distributor interface { LabelNamesAndValues(ctx context.Context, matchers []*labels.Matcher, countMethod cardinality.CountMethod) (*client.LabelNamesAndValuesResponse, error) LabelValuesCardinality(ctx context.Context, labelNames []model.LabelName, matchers []*labels.Matcher, countMethod cardinality.CountMethod) (uint64, *client.LabelValuesCardinalityResponse, error) ActiveSeries(ctx context.Context, matchers []*labels.Matcher) ([]labels.Labels, error) + ActiveNativeHistogramMetrics(ctx context.Context, matchers []*labels.Matcher) (*cardinality.ActiveNativeHistogramMetricsResponse, error) } func NewDistributorQueryable(distributor Distributor, cfgProvider distributorQueryableConfigProvider, queryMetrics *stats.QueryMetrics, logger log.Logger) storage.Queryable { diff --git a/pkg/querier/distributor_queryable_test.go b/pkg/querier/distributor_queryable_test.go index e20d74cd2e8..1d2e323a7c7 100644 --- a/pkg/querier/distributor_queryable_test.go +++ b/pkg/querier/distributor_queryable_test.go @@ -711,6 +711,11 @@ func (m *mockDistributor) ActiveSeries(ctx context.Context, matchers []*labels.M return args.Get(0).([]labels.Labels), args.Error(1) } +func (m *mockDistributor) ActiveNativeHistogramMetrics(ctx context.Context, matchers []*labels.Matcher) (*cardinality.ActiveNativeHistogramMetricsResponse, error) { + args := m.Called(ctx, matchers) + return args.Get(0).(*cardinality.ActiveNativeHistogramMetricsResponse), args.Error(1) +} + type mockConfigProvider struct { queryIngestersWithin time.Duration seenUserIDs []string diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 49de21b4905..400d6b452de 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -1164,6 +1164,10 @@ func (m *errDistributor) ActiveSeries(context.Context, []*labels.Matcher) ([]lab return nil, errDistributorError } +func (m *errDistributor) ActiveNativeHistogramMetrics(context.Context, []*labels.Matcher) (*cardinality.ActiveNativeHistogramMetricsResponse, error) { + return nil, errDistributorError +} + type emptyDistributor struct{} func (d *emptyDistributor) LabelNamesAndValues(_ context.Context, _ []*labels.Matcher, _ cardinality.CountMethod) (*client.LabelNamesAndValuesResponse, error) { @@ -1202,6 +1206,10 @@ func (d *emptyDistributor) ActiveSeries(context.Context, []*labels.Matcher) ([]l return nil, nil } +func (m *emptyDistributor) ActiveNativeHistogramMetrics(context.Context, []*labels.Matcher) (*cardinality.ActiveNativeHistogramMetricsResponse, error) { + return &cardinality.ActiveNativeHistogramMetricsResponse{}, nil +} + func TestQuerier_QueryStoreAfterConfig(t *testing.T) { testCases := []struct { name string