Skip to content

Commit

Permalink
query-frontend: add /api/v1/cardinality/active_native_histogram_metrics
Browse files Browse the repository at this point in the history
The new endpoint gives a list of metric names of active native
histogram series with associated statistics about active bucket counts.

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
  • Loading branch information
krajorama committed May 3, 2024
1 parent de4b334 commit 815d55d
Show file tree
Hide file tree
Showing 18 changed files with 1,147 additions and 266 deletions.
55 changes: 55 additions & 0 deletions integration/e2emimir/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
yaml "gopkg.in/yaml.v3"

"github.com/grafana/mimir/pkg/alertmanager"
"github.com/grafana/mimir/pkg/cardinality"
"github.com/grafana/mimir/pkg/distributor"
"github.com/grafana/mimir/pkg/frontend/querymiddleware"
"github.com/grafana/mimir/pkg/mimirpb"
Expand Down Expand Up @@ -591,6 +592,60 @@ func (c *Client) ActiveSeries(selector string, options ...ActiveSeriesOption) (*
return res, nil
}

func (c *Client) ActiveNativeHistogramMetrics(selector string, options ...ActiveSeriesOption) (*cardinality.ActiveNativeHistogramMetricsResponse, error) {
cfg := activeSeriesRequestConfig{method: http.MethodGet, header: http.Header{"X-Scope-OrgID": []string{c.orgID}}}
for _, option := range options {
option(&cfg)
}

req, err := http.NewRequest(cfg.method, fmt.Sprintf("http://%s/prometheus/api/v1/cardinality/active_native_histogram_metrics", c.querierAddress), nil)
if err != nil {
return nil, err
}
req.Header = cfg.header

q := req.URL.Query()
q.Set("selector", selector)
switch cfg.method {
case http.MethodGet:
req.URL.RawQuery = q.Encode()
case http.MethodPost:
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
req.Body = io.NopCloser(strings.NewReader(q.Encode()))
default:
return nil, fmt.Errorf("invalid method %s", cfg.method)
}

ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

resp, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}
defer func(body io.ReadCloser) {
_, _ = io.ReadAll(body)
_ = body.Close()
}(resp.Body)

var bodyReader io.Reader = resp.Body
if resp.Header.Get("Content-Encoding") == "x-snappy-framed" {
bodyReader = s2.NewReader(bodyReader)
}

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(bodyReader)
return nil, fmt.Errorf("unexpected status code %d, body: %s", resp.StatusCode, body)
}

res := &cardinality.ActiveNativeHistogramMetricsResponse{}
err = json.NewDecoder(bodyReader).Decode(res)
if err != nil {
return nil, fmt.Errorf("error decoding active native histograms response: %w", err)
}
return res, nil
}

// GetPrometheusMetadata fetches the metadata from the Prometheus endpoint /api/v1/metadata.
func (c *Client) GetPrometheusMetadata() (*http.Response, error) {
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s/prometheus/api/v1/metadata", c.querierAddress), nil)
Expand Down
111 changes: 84 additions & 27 deletions integration/query_frontend_active_series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/grafana/mimir/integration/e2emimir"
"github.com/grafana/mimir/pkg/cardinality"
)

func TestActiveSeriesWithQuerySharding(t *testing.T) {
Expand Down Expand Up @@ -50,16 +51,89 @@ func TestActiveSeriesWithQuerySharding(t *testing.T) {
testName := fmt.Sprintf("query scheduler=%v/query sharding=%v",
tc.querySchedulerEnabled, tc.shardingEnabled,
)
t.Run(testName, func(t *testing.T) {
t.Run("active series/"+testName, func(t *testing.T) {
runTestActiveSeriesWithQueryShardingHTTPTest(t, config)
})

t.Run("active native histograms/"+testName, func(t *testing.T) {
runTestActiveNativeHistogramMetricsWithQueryShardingHTTPTest(t, config)
})
}
}

const (
metricName = "test_metric"
)

func runTestActiveSeriesWithQueryShardingHTTPTest(t *testing.T, cfg queryFrontendTestConfig) {
numSeries := 100
s, c := setupComponentsForActiveSeriesTest(t, cfg, numSeries)
defer s.Close()

// Query active series.
for _, options := range [][]e2emimir.ActiveSeriesOption{
{e2emimir.WithRequestMethod(http.MethodGet)},
{e2emimir.WithRequestMethod(http.MethodPost)},
{e2emimir.WithRequestMethod(http.MethodGet), e2emimir.WithEnableCompression()},
{e2emimir.WithRequestMethod(http.MethodPost), e2emimir.WithEnableCompression()},
{e2emimir.WithQueryShards(1)},
{e2emimir.WithQueryShards(12)},
} {
response, err := c.ActiveSeries(metricName, options...)
require.NoError(t, err)
require.Len(t, response.Data, numSeries)
}

var err error
_, err = c.ActiveSeries(metricName, e2emimir.WithQueryShards(512))
if cfg.shardActiveSeriesQueries {
require.Error(t, err)
require.Contains(t, err.Error(), "shard count 512 exceeds allowed maximum (128)")
} else {
require.NoError(t, err)
}
}

func runTestActiveNativeHistogramMetricsWithQueryShardingHTTPTest(t *testing.T, cfg queryFrontendTestConfig) {
numSeries := 100
s, c := setupComponentsForActiveSeriesTest(t, cfg, numSeries)
defer s.Close()

// Query active series.
for _, options := range [][]e2emimir.ActiveSeriesOption{
{e2emimir.WithRequestMethod(http.MethodGet)},
{e2emimir.WithRequestMethod(http.MethodPost)},
{e2emimir.WithRequestMethod(http.MethodGet), e2emimir.WithEnableCompression()},
{e2emimir.WithRequestMethod(http.MethodPost), e2emimir.WithEnableCompression()},
{e2emimir.WithQueryShards(1)},
{e2emimir.WithQueryShards(12)},
} {
response, err := c.ActiveNativeHistogramMetrics(metricName, options...)
require.NoError(t, err)
require.Len(t, response.Data, 1)
require.Equal(t, response.Data[0], cardinality.ActiveMetricWithBucketCount{
Metric: metricName,
SeriesCount: uint64(numSeries / 2), // Only half of the series are native histograms.
BucketCount: uint64(numSeries / 2 * 8), // We add up the bucket count of all histograms.
AvgBucketCount: 8.0,
MinBucketCount: 8,
MaxBucketCount: 8,
})
}

var err error
_, err = c.ActiveNativeHistogramMetrics(metricName, e2emimir.WithQueryShards(512))
if cfg.shardActiveSeriesQueries {
require.Error(t, err)
require.Contains(t, err.Error(), "shard count 512 exceeds allowed maximum (128)")
} else {
require.NoError(t, err)
}
}

func setupComponentsForActiveSeriesTest(t *testing.T, cfg queryFrontendTestConfig, numSeries int) (*e2e.Scenario, *e2emimir.Client) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

memcached := e2ecache.NewMemcached()
consul := e2edb.NewConsul()
Expand Down Expand Up @@ -130,38 +204,21 @@ func runTestActiveSeriesWithQueryShardingHTTPTest(t *testing.T, cfg queryFronten
// Push series for the test user to Mimir.
c, err := e2emimir.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)
numSeries := 100
metricName := "test_metric"
now := time.Now()
series := make([]prompb.TimeSeries, numSeries)
for i := 0; i < numSeries; i++ {
ts, _, _ := e2e.GenerateSeries(metricName, now, prompb.Label{Name: "index", Value: strconv.Itoa(i)})
var ts []prompb.TimeSeries
if i%2 == 0 {
// Let half of the series be float samples and the other half native histograms.
ts, _, _ = e2e.GenerateSeries(metricName, now, prompb.Label{Name: "index", Value: strconv.Itoa(i)})
} else {
ts, _, _ = generateHistogramSeries(metricName, now, prompb.Label{Name: "index", Value: strconv.Itoa(i)})
}
series[i] = ts[0]
}
res, err := c.Push(series)
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode)

// Query active series.
for _, options := range [][]e2emimir.ActiveSeriesOption{
{e2emimir.WithRequestMethod(http.MethodGet)},
{e2emimir.WithRequestMethod(http.MethodPost)},
{e2emimir.WithRequestMethod(http.MethodGet), e2emimir.WithEnableCompression()},
{e2emimir.WithRequestMethod(http.MethodPost), e2emimir.WithEnableCompression()},
{e2emimir.WithQueryShards(1)},
{e2emimir.WithQueryShards(12)},
} {
response, err := c.ActiveSeries(metricName, options...)
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode)
require.Len(t, response.Data, numSeries)
}

_, err = c.ActiveSeries(metricName, e2emimir.WithQueryShards(512))
if cfg.shardActiveSeriesQueries {
require.Error(t, err)
require.Contains(t, err.Error(), "shard count 512 exceeds allowed maximum (128)")
} else {
require.NoError(t, err)
}
return s, c
}
1 change: 1 addition & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ func (a *API) RegisterQueryAPI(handler http.Handler, buildInfoHandler http.Handl
a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/cardinality/label_names"), handler, true, true, "GET", "POST")
a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/cardinality/label_values"), handler, true, true, "GET", "POST")
a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/cardinality/active_series"), handler, true, true, "GET", "POST")
a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/cardinality/active_native_histogram_metrics"), handler, true, true, "GET", "POST")
a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/format_query"), handler, true, true, "GET", "POST")
}

Expand Down
1 change: 1 addition & 0 deletions pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ func NewQuerierHandler(
router.Path(path.Join(prefix, "/api/v1/cardinality/label_names")).Methods("GET", "POST").Handler(cardinalityQueryStats.Wrap(querier.LabelNamesCardinalityHandler(distributor, limits)))
router.Path(path.Join(prefix, "/api/v1/cardinality/label_values")).Methods("GET", "POST").Handler(cardinalityQueryStats.Wrap(querier.LabelValuesCardinalityHandler(distributor, limits)))
router.Path(path.Join(prefix, "/api/v1/cardinality/active_series")).Methods("GET", "POST").Handler(cardinalityQueryStats.Wrap(querier.ActiveSeriesCardinalityHandler(distributor, limits)))
router.Path(path.Join(prefix, "/api/v1/cardinality/active_native_histogram_metrics")).Methods("GET", "POST").Handler(cardinalityQueryStats.Wrap(querier.ActiveNativeHistogramMetricsHandler(distributor, limits)))
router.Path(path.Join(prefix, "/api/v1/format_query")).Methods("GET", "POST").Handler(formattingQueryStats.Wrap(promRouter))

// Track execution time.
Expand Down
13 changes: 13 additions & 0 deletions pkg/cardinality/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,19 @@ type ActiveSeriesRequest struct {
Matchers []*labels.Matcher
}

type ActiveMetricWithBucketCount struct {
Metric string `json:"metric"`
SeriesCount uint64 `json:"series_count"`
BucketCount uint64 `json:"bucket_count"`
AvgBucketCount float64 `json:"avg_bucket_count"`
MinBucketCount uint64 `json:"min_bucket_count"`
MaxBucketCount uint64 `json:"max_bucket_count"`
}

type ActiveNativeHistogramMetricsResponse struct {
Data []ActiveMetricWithBucketCount `json:"data"`
}

// DecodeActiveSeriesRequest decodes the input http.Request into an ActiveSeriesRequest.
func DecodeActiveSeriesRequest(r *http.Request) (*ActiveSeriesRequest, error) {
if err := r.ParseForm(); err != nil {
Expand Down
Loading

0 comments on commit 815d55d

Please sign in to comment.