From 3d6dd071899861f570d1fb9bed985637b6e99721 Mon Sep 17 00:00:00 2001 From: utukj Date: Tue, 18 Oct 2022 16:57:11 +0100 Subject: [PATCH] Revert "Query: add query metrics to calls going through the Store API (#5741)" This reverts commit eec4fd0053b841ee5fe5284d0fe25e6cbe14a738. Signed-off-by: utukj --- CHANGELOG.md | 1 - cmd/thanos/query.go | 19 +---- docs/components/query.md | 19 ----- pkg/api/query/grpc.go | 2 - pkg/api/query/v1.go | 99 +++------------------- pkg/api/query/v1_test.go | 6 +- pkg/query/querier.go | 79 +++-------------- pkg/query/querier_test.go | 37 ++------ pkg/query/query_bench_test.go | 13 ++- pkg/query/query_test.go | 11 +-- pkg/store/telemetry.go | 88 ------------------- test/e2e/query_test.go | 155 +++------------------------------- 12 files changed, 51 insertions(+), 478 deletions(-) delete mode 100644 pkg/store/telemetry.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e2e854ce2..6e1d2143c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,7 +29,6 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5734](https://github.com/thanos-io/thanos/pull/5734) Store: Support disable block viewer UI. - [#5411](https://github.com/thanos-io/thanos/pull/5411) Tracing: Add OpenTelemetry Protocol exporter. - [#5779](https://github.com/thanos-io/thanos/pull/5779) Objstore: Support specifying S3 storage class. -- [#5741](https://github.com/thanos-io/thanos/pull/5741) Query: add metrics on how much data is being selected by downstream Store APIs. - [#5673](https://github.com/thanos-io/thanos/pull/5673) Receive: Reload tenant limit configuration on file change. ### Changed diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 5e5a7fc7cd..54724f59a6 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -25,8 +25,6 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" - "google.golang.org/grpc" - v1 "github.com/prometheus/prometheus/web/api/v1" "github.com/thanos-community/promql-engine/engine" apiv1 "github.com/thanos-io/thanos/pkg/api/query" @@ -56,6 +54,7 @@ import ( "github.com/thanos-io/thanos/pkg/targets" "github.com/thanos-io/thanos/pkg/tls" "github.com/thanos-io/thanos/pkg/ui" + "google.golang.org/grpc" ) const ( @@ -206,10 +205,6 @@ func registerQuery(app *extkingpin.App) { alertQueryURL := cmd.Flag("alert.query-url", "The external Thanos Query URL that would be set in all alerts 'Source' field.").String() grpcProxyStrategy := cmd.Flag("grpc.proxy-strategy", "Strategy to use when proxying Series requests to leaf nodes. Hidden and only used for testing, will be removed after lazy becomes the default.").Default(string(store.EagerRetrieval)).Hidden().Enum(string(store.EagerRetrieval), string(store.LazyRetrieval)) - queryTelemetryDurationQuantiles := cmd.Flag("query.telemetry.request-duration-seconds-quantiles", "The quantiles for exporting metrics about the request duration quantiles.").Default("0.1", "0.25", "0.75", "1.25", "1.75", "2.5", "3", "5", "10").Float64List() - queryTelemetrySamplesQuantiles := cmd.Flag("query.telemetry.request-samples-quantiles", "The quantiles for exporting metrics about the samples count quantiles.").Default("100", "1000", "10000", "100000", "1000000").Int64List() - queryTelemetrySeriesQuantiles := cmd.Flag("query.telemetry.request-series-seconds-quantiles", "The quantiles for exporting metrics about the series count quantiles.").Default("10", "100", "1000", "10000", "100000").Int64List() - cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { selectorLset, err := parseFlagLabels(*selectorLabels) if err != nil { @@ -322,9 +317,6 @@ func registerQuery(app *extkingpin.App) { *alertQueryURL, *grpcProxyStrategy, component.Query, - *queryTelemetryDurationQuantiles, - *queryTelemetrySamplesQuantiles, - *queryTelemetrySeriesQuantiles, promqlEngineType(*promqlEngine), ) }) @@ -398,9 +390,6 @@ func runQuery( alertQueryURL string, grpcProxyStrategy string, comp component.Component, - queryTelemetryDurationQuantiles []float64, - queryTelemetrySamplesQuantiles []int64, - queryTelemetrySeriesQuantiles []int64, promqlEngine promqlEngineType, ) error { if alertQueryURL == "" { @@ -705,12 +694,6 @@ func runQuery( extprom.WrapRegistererWithPrefix("thanos_query_concurrent_", reg), maxConcurrentQueries, ), - store.NewSeriesStatsAggregator( - reg, - queryTelemetryDurationQuantiles, - queryTelemetrySamplesQuantiles, - queryTelemetrySeriesQuantiles, - ), reg, ) diff --git a/docs/components/query.md b/docs/components/query.md index c3690ca05a..1a028ee3ed 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -381,15 +381,6 @@ Flags: be able to query without deduplication using 'dedup=false' parameter. Data includes time series, recording rules, and alerting rules. - --query.telemetry.request-duration-seconds-quantiles=0.1... ... - The quantiles for exporting metrics about the - request duration quantiles. - --query.telemetry.request-samples-quantiles=100... ... - The quantiles for exporting metrics about the - samples count quantiles. - --query.telemetry.request-series-seconds-quantiles=10... ... - The quantiles for exporting metrics about the - series count quantiles. --query.timeout=2m Maximum time to process query by query node. --request.logging-config= Alternative to 'request.logging-config-file' @@ -472,13 +463,3 @@ Flags: of Prometheus. ``` - -## Exported metrics - -Thanos Query also exports metrics about its own performance. You can find a list with these metrics below. - -**Disclaimer**: this list is incomplete. The remaining metrics will be added over time. - -| Name | Type | Labels | Description | -|-----------------------------------------|-----------|-----------------------|-------------------------------------------------------------------------------------------------------------------| -| thanos_store_api_query_duration_seconds | Histogram | samples_le, series_le | Duration of the Thanos Store API select phase for a query according to the amount of samples and series selected. | diff --git a/pkg/api/query/grpc.go b/pkg/api/query/grpc.go index 8848cd2ffe..144166f57b 100644 --- a/pkg/api/query/grpc.go +++ b/pkg/api/query/grpc.go @@ -94,7 +94,6 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer request.EnableQueryPushdown, false, request.ShardInfo, - query.NoopSeriesStatsReporter, ) qry, err := g.queryEngine.NewInstantQuery(queryable, &promql.QueryOpts{LookbackDelta: lookbackDelta}, request.Query, ts) if err != nil { @@ -169,7 +168,6 @@ func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Que request.EnableQueryPushdown, false, request.ShardInfo, - query.NoopSeriesStatsReporter, ) startTime := time.Unix(request.StartTimeSeconds, 0) diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index 918bcbf5fd..cbe1327a36 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -41,8 +41,10 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/util/stats" v1 "github.com/prometheus/prometheus/web/api/v1" + + "github.com/prometheus/prometheus/util/stats" + "github.com/thanos-io/thanos/pkg/api" "github.com/thanos-io/thanos/pkg/exemplars" "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" @@ -55,7 +57,6 @@ import ( "github.com/thanos-io/thanos/pkg/rules" "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/runutil" - "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/targets" "github.com/thanos-io/thanos/pkg/targets/targetspb" @@ -106,13 +107,6 @@ type QueryAPI struct { defaultMetadataTimeRange time.Duration queryRangeHist prometheus.Histogram - - seriesStatsAggregator seriesQueryPerformanceMetricsAggregator -} - -type seriesQueryPerformanceMetricsAggregator interface { - Aggregate(seriesStats storepb.SeriesStatsCounter) - Observe(duration float64) } // NewQueryAPI returns an initialized QueryAPI type. @@ -140,12 +134,8 @@ func NewQueryAPI( defaultMetadataTimeRange time.Duration, disableCORS bool, gate gate.Gate, - statsAggregator seriesQueryPerformanceMetricsAggregator, reg *prometheus.Registry, ) *QueryAPI { - if statsAggregator == nil { - statsAggregator = &store.NoopSeriesStatsAggregator{} - } return &QueryAPI{ baseAPI: api.NewBaseAPI(logger, disableCORS, flagsMap), logger: logger, @@ -170,7 +160,6 @@ func NewQueryAPI( defaultInstantQueryMaxSourceResolution: defaultInstantQueryMaxSourceResolution, defaultMetadataTimeRange: defaultMetadataTimeRange, disableCORS: disableCORS, - seriesStatsAggregator: statsAggregator, queryRangeHist: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "thanos_query_range_requested_timespan_duration_seconds", @@ -407,24 +396,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro span, ctx := tracing.StartSpan(ctx, "promql_instant_query") defer span.Finish() - var seriesStats []storepb.SeriesStatsCounter - qry, err := qapi.queryEngine.NewInstantQuery( - qapi.queryableCreate( - enableDedup, - replicaLabels, - storeDebugMatchers, - maxSourceResolution, - enablePartialResponse, - qapi.enableQueryPushdown, - false, - shardInfo, - query.NewAggregateStatsReporter(&seriesStats), - ), - &promql.QueryOpts{LookbackDelta: lookbackDelta}, - r.FormValue("query"), - ts, - ) - + qry, err := qapi.queryEngine.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false, shardInfo), &promql.QueryOpts{LookbackDelta: lookbackDelta}, r.FormValue("query"), ts) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {} } @@ -437,7 +409,6 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro } defer qapi.gate.Done() - beforeRange := time.Now() res := qry.Exec(ctx) if res.Err != nil { switch res.Err.(type) { @@ -450,10 +421,6 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro } return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: res.Err}, qry.Close } - for i := range seriesStats { - qapi.seriesStatsAggregator.Aggregate(seriesStats[i]) - } - qapi.seriesStatsAggregator.Observe(time.Since(beforeRange).Seconds()) // Optional stats field in response if parameter "stats" is not empty. var qs stats.QueryStats @@ -558,19 +525,8 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap span, ctx := tracing.StartSpan(ctx, "promql_range_query") defer span.Finish() - var seriesStats []storepb.SeriesStatsCounter qry, err := qapi.queryEngine.NewRangeQuery( - qapi.queryableCreate( - enableDedup, - replicaLabels, - storeDebugMatchers, - maxSourceResolution, - enablePartialResponse, - qapi.enableQueryPushdown, - false, - shardInfo, - query.NewAggregateStatsReporter(&seriesStats), - ), + qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false, shardInfo), &promql.QueryOpts{LookbackDelta: lookbackDelta}, r.FormValue("query"), start, @@ -589,7 +545,6 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap } defer qapi.gate.Done() - beforeRange := time.Now() res := qry.Exec(ctx) if res.Err != nil { switch res.Err.(type) { @@ -600,10 +555,6 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap } return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: res.Err}, qry.Close } - for i := range seriesStats { - qapi.seriesStatsAggregator.Aggregate(seriesStats[i]) - } - qapi.seriesStatsAggregator.Observe(time.Since(beforeRange).Seconds()) // Optional stats field in response if parameter "stats" is not empty. var qs stats.QueryStats @@ -649,17 +600,8 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A matcherSets = append(matcherSets, matchers) } - q, err := qapi.queryableCreate( - true, - nil, - storeDebugMatchers, - 0, - enablePartialResponse, - qapi.enableQueryPushdown, - true, - nil, - query.NoopSeriesStatsReporter, - ).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end)) + q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true, nil). + Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {} } @@ -745,18 +687,8 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr return nil, nil, apiErr, func() {} } - q, err := qapi.queryableCreate( - enableDedup, - replicaLabels, - storeDebugMatchers, - math.MaxInt64, - enablePartialResponse, - qapi.enableQueryPushdown, - true, - nil, - query.NoopSeriesStatsReporter, - ).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) - + q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, qapi.enableQueryPushdown, true, nil). + Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {} } @@ -805,17 +737,8 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap matcherSets = append(matcherSets, matchers) } - q, err := qapi.queryableCreate( - true, - nil, - storeDebugMatchers, - 0, - enablePartialResponse, - qapi.enableQueryPushdown, - true, - nil, - query.NoopSeriesStatsReporter, - ).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) + q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true, nil). + Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {} } diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index 07c562af9c..000410ddbd 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -44,8 +44,9 @@ import ( "github.com/prometheus/prometheus/tsdb/tsdbutil" promgate "github.com/prometheus/prometheus/util/gate" "github.com/prometheus/prometheus/util/stats" - baseAPI "github.com/thanos-io/thanos/pkg/api" "github.com/thanos-io/thanos/pkg/compact" + + baseAPI "github.com/thanos-io/thanos/pkg/api" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/query" @@ -197,7 +198,6 @@ func TestQueryEndpoints(t *testing.T) { queryRangeHist: promauto.With(prometheus.NewRegistry()).NewHistogram(prometheus.HistogramOpts{ Name: "query_range_hist", }), - seriesStatsAggregator: &store.NoopSeriesStatsAggregator{}, } start := time.Unix(0, 0) @@ -737,7 +737,6 @@ func TestMetadataEndpoints(t *testing.T) { queryRangeHist: promauto.With(prometheus.NewRegistry()).NewHistogram(prometheus.HistogramOpts{ Name: "query_range_hist", }), - seriesStatsAggregator: &store.NoopSeriesStatsAggregator{}, } apiWithLabelLookback := &QueryAPI{ baseAPI: &baseAPI.BaseAPI{ @@ -751,7 +750,6 @@ func TestMetadataEndpoints(t *testing.T) { queryRangeHist: promauto.With(prometheus.NewRegistry()).NewHistogram(prometheus.HistogramOpts{ Name: "query_range_hist", }), - seriesStatsAggregator: &store.NoopSeriesStatsAggregator{}, } var tests = []endpointTestCase{ diff --git a/pkg/query/querier.go b/pkg/query/querier.go index b094cbd45c..361834c07d 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -7,7 +7,6 @@ import ( "context" "sort" "strings" - "sync" "time" "github.com/go-kit/log" @@ -29,60 +28,21 @@ import ( "github.com/thanos-io/thanos/pkg/tracing" ) -type seriesStatsReporter func(seriesStats storepb.SeriesStatsCounter) - -var NoopSeriesStatsReporter seriesStatsReporter = func(_ storepb.SeriesStatsCounter) {} - -func NewAggregateStatsReporter(stats *[]storepb.SeriesStatsCounter) seriesStatsReporter { - var mutex sync.Mutex - return func(s storepb.SeriesStatsCounter) { - mutex.Lock() - defer mutex.Unlock() - *stats = append(*stats, s) - } -} - // QueryableCreator returns implementation of promql.Queryable that fetches data from the proxy store API endpoints. // If deduplication is enabled, all data retrieved from it will be deduplicated along all replicaLabels by default. // When the replicaLabels argument is not empty it overwrites the global replicaLabels flag. This allows specifying // replicaLabels at query time. // maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds). // partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behavior of proxy. -type QueryableCreator func( - deduplicate bool, - replicaLabels []string, - storeDebugMatchers [][]*labels.Matcher, - maxResolutionMillis int64, - partialResponse, - enableQueryPushdown, - skipChunks bool, - shardInfo *storepb.ShardInfo, - seriesStatsReporter seriesStatsReporter, -) storage.Queryable +type QueryableCreator func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, enableQueryPushdown, skipChunks bool, shardInfo *storepb.ShardInfo) storage.Queryable // NewQueryableCreator creates QueryableCreator. -func NewQueryableCreator( - logger log.Logger, - reg prometheus.Registerer, - proxy storepb.StoreServer, - maxConcurrentSelects int, - selectTimeout time.Duration, -) QueryableCreator { +func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int, selectTimeout time.Duration) QueryableCreator { duration := promauto.With( extprom.WrapRegistererWithPrefix("concurrent_selects_", reg), ).NewHistogram(gate.DurationHistogramOpts) - return func( - deduplicate bool, - replicaLabels []string, - storeDebugMatchers [][]*labels.Matcher, - maxResolutionMillis int64, - partialResponse, - enableQueryPushdown, - skipChunks bool, - shardInfo *storepb.ShardInfo, - seriesStatsReporter seriesStatsReporter, - ) storage.Queryable { + return func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, enableQueryPushdown, skipChunks bool, shardInfo *storepb.ShardInfo) storage.Queryable { return &queryable{ logger: logger, replicaLabels: replicaLabels, @@ -99,7 +59,6 @@ func NewQueryableCreator( selectTimeout: selectTimeout, enableQueryPushdown: enableQueryPushdown, shardInfo: shardInfo, - seriesStatsReporter: seriesStatsReporter, } } } @@ -118,12 +77,11 @@ type queryable struct { selectTimeout time.Duration enableQueryPushdown bool shardInfo *storepb.ShardInfo - seriesStatsReporter seriesStatsReporter } // Querier returns a new storage querier against the underlying proxy store API. func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.enableQueryPushdown, q.skipChunks, q.gateProviderFn(), q.selectTimeout, q.shardInfo, q.seriesStatsReporter), nil + return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.enableQueryPushdown, q.skipChunks, q.gateProviderFn(), q.selectTimeout, q.shardInfo), nil } type querier struct { @@ -142,7 +100,6 @@ type querier struct { selectGate gate.Gate selectTimeout time.Duration shardInfo *storepb.ShardInfo - seriesStatsReporter seriesStatsReporter } // newQuerier creates implementation of storage.Querier that fetches data from the proxy @@ -150,20 +107,16 @@ type querier struct { func newQuerier( ctx context.Context, logger log.Logger, - mint, - maxt int64, + mint, maxt int64, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, proxy storepb.StoreServer, deduplicate bool, maxResolutionMillis int64, - partialResponse, - enableQueryPushdown, - skipChunks bool, + partialResponse, enableQueryPushdown bool, skipChunks bool, selectGate gate.Gate, selectTimeout time.Duration, shardInfo *storepb.ShardInfo, - seriesStatsReporter seriesStatsReporter, ) *querier { if logger == nil { logger = log.NewNopLogger() @@ -192,7 +145,6 @@ func newQuerier( skipChunks: skipChunks, enableQueryPushdown: enableQueryPushdown, shardInfo: shardInfo, - seriesStatsReporter: seriesStatsReporter, } } @@ -205,9 +157,8 @@ type seriesServer struct { storepb.Store_SeriesServer ctx context.Context - seriesSet []storepb.Series - seriesSetStats storepb.SeriesStatsCounter - warnings []string + seriesSet []storepb.Series + warnings []string } func (s *seriesServer) Send(r *storepb.SeriesResponse) error { @@ -218,7 +169,6 @@ func (s *seriesServer) Send(r *storepb.SeriesResponse) error { if r.GetSeries() != nil { s.seriesSet = append(s.seriesSet, *r.GetSeries()) - s.seriesSetStats.Count(r.GetSeries()) return nil } @@ -307,12 +257,11 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match span, ctx := tracing.StartSpan(ctx, "querier_select_select_fn") defer span.Finish() - set, stats, err := q.selectFn(ctx, hints, ms...) + set, err := q.selectFn(ctx, hints, ms...) if err != nil { promise <- storage.ErrSeriesSet(err) return } - q.seriesStatsReporter(stats) promise <- set }() @@ -330,10 +279,10 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match }} } -func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storepb.SeriesStatsCounter, error) { +func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, error) { sms, err := storepb.PromMatchersToMatchers(ms...) if err != nil { - return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "convert matchers") + return nil, errors.Wrap(err, "convert matchers") } aggrs := aggrsFromFunc(hints.Func) @@ -361,7 +310,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . Step: hints.Step, Range: hints.Range, }, resp); err != nil { - return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "proxy Series()") + return nil, errors.Wrap(err, "proxy Series()") } var warns storage.Warnings @@ -393,7 +342,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . set: newStoreSeriesSet(resp.seriesSet), aggrs: aggrs, warns: warns, - }, resp.seriesSetStats, nil + }, nil } // TODO(fabxc): this could potentially pushed further down into the store API to make true streaming possible. @@ -408,7 +357,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . // The merged series set assembles all potentially-overlapping time ranges of the same series into a single one. // TODO(bwplotka): We could potentially dedup on chunk level, use chunk iterator for that when available. - return dedup.NewSeriesSet(set, q.replicaLabels, hints.Func, q.enableQueryPushdown), resp.seriesSetStats, nil + return dedup.NewSeriesSet(set, q.replicaLabels, hints.Func, q.enableQueryPushdown), nil } // sortDedupLabels re-sorts the set so that the same series with different replica diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index 2e31fa65a0..a43c75e7a5 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -44,17 +44,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) { queryableCreator := NewQueryableCreator(nil, nil, testProxy, 2, 5*time.Second) oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond) - queryable := queryableCreator( - false, - nil, - nil, - oneHourMillis, - false, - false, - false, - nil, - NoopSeriesStatsReporter, - ) + queryable := queryableCreator(false, nil, nil, oneHourMillis, false, false, false, nil) q, err := queryable.Querier(context.Background(), 0, 42) testutil.Ok(t, err) @@ -81,22 +71,7 @@ func TestQuerier_DownsampledData(t *testing.T) { } timeout := 10 * time.Second - q := NewQueryableCreator( - nil, - nil, - testProxy, - 2, - timeout, - )(false, - nil, - nil, - 9999999, - false, - false, - false, - nil, - NoopSeriesStatsReporter, - ) + q := NewQueryableCreator(nil, nil, testProxy, 2, timeout)(false, nil, nil, 9999999, false, false, false, nil) engine := promql.NewEngine( promql.EngineOpts{ MaxSamples: math.MaxInt32, @@ -390,7 +365,7 @@ func TestQuerier_Select_AfterPromQL(t *testing.T) { g := gate.New(2) mq := &mockedQueryable{ Creator: func(mint, maxt int64) storage.Querier { - return newQuerier(context.Background(), nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter) + return newQuerier(context.Background(), nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout, nil) }, } t.Cleanup(func() { @@ -634,7 +609,7 @@ func TestQuerier_Select(t *testing.T) { {dedup: true, expected: []series{tcase.expectedAfterDedup}}, } { g := gate.New(2) - q := newQuerier(context.Background(), nil, tcase.mint, tcase.maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout, nil, func(i storepb.SeriesStatsCounter) {}) + q := newQuerier(context.Background(), nil, tcase.mint, tcase.maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout, nil) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) t.Run(fmt.Sprintf("dedup=%v", sc.dedup), func(t *testing.T) { @@ -863,7 +838,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { timeout := 100 * time.Second g := gate.New(2) - q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter) + q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, false, g, timeout, nil) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) @@ -933,7 +908,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { timeout := 5 * time.Second g := gate.New(2) - q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter) + q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, false, g, timeout, nil) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) diff --git a/pkg/query/query_bench_test.go b/pkg/query/query_bench_test.go index 84efb46820..301c880877 100644 --- a/pkg/query/query_bench_test.go +++ b/pkg/query/query_bench_test.go @@ -80,13 +80,12 @@ func benchQuerySelect(t testutil.TB, totalSamples, totalSeries int, dedup bool) logger := log.NewNopLogger() q := &querier{ - ctx: context.Background(), - logger: logger, - proxy: &mockedStoreServer{responses: resps}, - replicaLabels: map[string]struct{}{"a_replica": {}}, - deduplicate: dedup, - selectGate: gate.NewNoop(), - seriesStatsReporter: NoopSeriesStatsReporter, + ctx: context.Background(), + logger: logger, + proxy: &mockedStoreServer{responses: resps}, + replicaLabels: map[string]struct{}{"a_replica": {}}, + deduplicate: dedup, + selectGate: gate.NewNoop(), } testSelect(t, q, expectedSeries) } diff --git a/pkg/query/query_test.go b/pkg/query/query_test.go index 060571fc70..99e29be66f 100644 --- a/pkg/query/query_test.go +++ b/pkg/query/query_test.go @@ -54,16 +54,7 @@ func TestQuerier_Proxy(t *testing.T) { name: fmt.Sprintf("store number %v", i), }) } - return q(true, - nil, - nil, - 0, - false, - false, - false, - nil, - NoopSeriesStatsReporter, - ) + return q(true, nil, nil, 0, false, false, false, nil) } for _, fn := range files { diff --git a/pkg/store/telemetry.go b/pkg/store/telemetry.go deleted file mode 100644 index a854daaf0c..0000000000 --- a/pkg/store/telemetry.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package store - -import ( - "strconv" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/thanos-io/thanos/pkg/store/storepb" -) - -// seriesStatsAggregator aggregates results from fanned-out queries into a histogram given their -// response's shape. -type seriesStatsAggregator struct { - queryDuration *prometheus.HistogramVec - - seriesLeBuckets []int64 - samplesLeBuckets []int64 - seriesStats storepb.SeriesStatsCounter -} - -// NewSeriesStatsAggregator is a constructor for seriesStatsAggregator. -func NewSeriesStatsAggregator( - reg prometheus.Registerer, - durationQuantiles []float64, - sampleQuantiles []int64, - seriesQuantiles []int64, -) *seriesStatsAggregator { - return &seriesStatsAggregator{ - queryDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ - Name: "thanos_store_api_query_duration_seconds", - Help: "Duration of the Thanos Store API select phase for a query.", - Buckets: durationQuantiles, - }, []string{"series_le", "samples_le"}), - seriesLeBuckets: seriesQuantiles, - samplesLeBuckets: sampleQuantiles, - seriesStats: storepb.SeriesStatsCounter{}, - } -} - -// Aggregate is an aggregator for merging `storepb.SeriesStatsCounter` for each incoming fanned out query. -func (s *seriesStatsAggregator) Aggregate(stats storepb.SeriesStatsCounter) { - s.seriesStats.Series += stats.Series - s.seriesStats.Samples += stats.Samples - s.seriesStats.Chunks += stats.Chunks -} - -// Observe commits the aggregated SeriesStatsCounter as an observation. -func (s *seriesStatsAggregator) Observe(duration float64) { - if s.seriesStats.Series == 0 || s.seriesStats.Samples == 0 || s.seriesStats.Chunks == 0 { - return - } - // Bucket matching for series/labels matchSeriesBucket/matchSamplesBucket => float64, float64 - seriesLeBucket := s.findBucket(float64(s.seriesStats.Series), s.seriesLeBuckets) - samplesLeBucket := s.findBucket(float64(s.seriesStats.Samples), s.samplesLeBuckets) - s.queryDuration.With(prometheus.Labels{ - "series_le": strconv.Itoa(int(seriesLeBucket)), - "samples_le": strconv.Itoa(int(samplesLeBucket)), - }).Observe(duration) - s.reset() -} - -func (s *seriesStatsAggregator) reset() { - s.seriesStats = storepb.SeriesStatsCounter{} -} - -func (s *seriesStatsAggregator) findBucket(value float64, quantiles []int64) int64 { - if len(quantiles) == 0 { - return 0 - } - var foundBucket int64 - for _, bucket := range quantiles { - foundBucket = bucket - if value < float64(bucket) { - break - } - } - return foundBucket -} - -// NoopSeriesStatsAggregator is a query performance series aggregator that does nothing. -type NoopSeriesStatsAggregator struct{} - -func (s *NoopSeriesStatsAggregator) Aggregate(_ storepb.SeriesStatsCounter) {} - -func (s *NoopSeriesStatsAggregator) Observe(_ float64) {} diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 04b425061a..7fc56bda97 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -23,7 +23,6 @@ import ( "github.com/chromedp/cdproto/network" "github.com/chromedp/chromedp" "github.com/efficientgo/e2e" - e2edb "github.com/efficientgo/e2e/db" e2emon "github.com/efficientgo/e2e/monitoring" "github.com/go-kit/log" "github.com/gogo/protobuf/proto" @@ -579,130 +578,6 @@ func newSample(s fakeMetricSample) model.Sample { } } -func TestQueryStoreMetrics(t *testing.T) { - t.Parallel() - - // Build up. - e, err := e2e.New(e2e.WithName("storemetrics01")) - testutil.Ok(t, err) - t.Cleanup(e2ethanos.CleanScenario(t, e)) - - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) - t.Cleanup(cancel) - - bucket := "store-gw-test" - minio := e2ethanos.NewMinio(e, "thanos-minio", bucket) - testutil.Ok(t, e2e.StartAndWaitReady(minio)) - - l := log.NewLogfmtLogger(os.Stdout) - bkt, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket, minio.Endpoint("https"), minio.Dir()), "test") - testutil.Ok(t, err) - - // Preparing 2 different blocks for the tests. - { - blockSizes := []struct { - samples int - series int - name string - }{ - {samples: 10, series: 1, name: "one_series"}, - {samples: 10, series: 1001, name: "thousand_one_series"}, - } - now := time.Now() - externalLabels := labels.FromStrings("prometheus", "p1", "replica", "0") - dir := filepath.Join(e.SharedDir(), "tmp") - testutil.Ok(t, os.MkdirAll(filepath.Join(e.SharedDir(), dir), os.ModePerm)) - for _, blockSize := range blockSizes { - series := make([]labels.Labels, blockSize.series) - for i := 0; i < blockSize.series; i++ { - bigSeriesLabels := labels.FromStrings("__name__", blockSize.name, "instance", fmt.Sprintf("foo_%d", i)) - series[i] = bigSeriesLabels - } - blockID, err := e2eutil.CreateBlockWithBlockDelay(ctx, - dir, - series, - blockSize.samples, - timestamp.FromTime(now), - timestamp.FromTime(now.Add(2*time.Hour)), - 30*time.Minute, - externalLabels, - 0, - metadata.NoneFunc, - ) - testutil.Ok(t, err) - testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, blockID.String()), blockID.String())) - } - } - - storeGW := e2ethanos.NewStoreGW( - e, - "s1", - client.BucketConfig{ - Type: client.S3, - Config: e2ethanos.NewS3Config(bucket, minio.InternalEndpoint("https"), minio.InternalDir()), - }, - "", - nil, - ) - querier := e2ethanos.NewQuerierBuilder(e, "1", storeGW.InternalEndpoint("grpc")).Init() - testutil.Ok(t, e2e.StartAndWaitReady(storeGW, querier)) - testutil.Ok(t, storeGW.WaitSumMetrics(e2emon.Equals(2), "thanos_blocks_meta_synced")) - - // Querying the series in the previously created blocks to ensure we produce Store API query metrics. - { - instantQuery(t, ctx, querier.Endpoint("http"), func() string { - return "max_over_time(one_series{instance='foo_0'}[2h])" - }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, 1) - testutil.Ok(t, err) - - instantQuery(t, ctx, querier.Endpoint("http"), func() string { - return "max_over_time(thousand_one_series[2h])" - }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, 1001) - testutil.Ok(t, err) - } - - mon, err := e2emon.Start(e) - testutil.Ok(t, err) - - queryWaitAndAssert(t, ctx, mon.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { - return "thanos_store_api_query_duration_seconds_count{samples_le='100000',series_le='10000'}" - }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, model.Vector{ - &model.Sample{ - Metric: model.Metric{ - "__name__": "thanos_store_api_query_duration_seconds_count", - "instance": "storemetrics01-querier-1:8080", - "job": "querier-1", - "samples_le": "100000", - "series_le": "10000", - }, - Value: model.SampleValue(1), - }, - }) - - queryWaitAndAssert(t, ctx, mon.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { - return "thanos_store_api_query_duration_seconds_count{samples_le='100',series_le='10'}" - }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, model.Vector{ - &model.Sample{ - Metric: model.Metric{ - "__name__": "thanos_store_api_query_duration_seconds_count", - "instance": "storemetrics01-querier-1:8080", - "job": "querier-1", - "samples_le": "100", - "series_le": "10", - }, - Value: model.SampleValue(1), - }, - }) -} - // Regression test for https://github.com/thanos-io/thanos/issues/5033. // Tests whether queries work with mixed sources, and with functions // that we are pushing down: min, max, min_over_time, max_over_time, @@ -1007,10 +882,18 @@ func instantQuery(t testing.TB, ctx context.Context, addr string, q func() strin "msg", fmt.Sprintf("Waiting for %d results for query %s", expectedSeriesLen, q()), ) testutil.Ok(t, runutil.RetryWithLog(logger, 5*time.Second, ctx.Done(), func() error { - res, err := simpleInstantQuery(t, ctx, addr, q, ts, opts, expectedSeriesLen) + res, warnings, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+addr), q(), ts(), opts) if err != nil { return err } + + if len(warnings) > 0 { + return errors.Errorf("unexpected warnings %s", warnings) + } + + if len(res) != expectedSeriesLen { + return errors.Errorf("unexpected result size, expected %d; result %d: %v", expectedSeriesLen, len(res), res) + } result = res return nil })) @@ -1018,24 +901,6 @@ func instantQuery(t testing.TB, ctx context.Context, addr string, q func() strin return result } -func simpleInstantQuery(t testing.TB, ctx context.Context, addr string, q func() string, ts func() time.Time, opts promclient.QueryOptions, expectedSeriesLen int) (model.Vector, error) { - res, warnings, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+addr), q(), ts(), opts) - if err != nil { - return nil, err - } - - if len(warnings) > 0 { - return nil, errors.Errorf("unexpected warnings %s", warnings) - } - - if len(res) != expectedSeriesLen { - return nil, errors.Errorf("unexpected result size, expected %d; result %d: %v", expectedSeriesLen, len(res), res) - } - - sortResults(res) - return res, nil -} - func queryWaitAndAssert(t *testing.T, ctx context.Context, addr string, q func() string, ts func() time.Time, opts promclient.QueryOptions, expected model.Vector) { t.Helper() @@ -1047,7 +912,7 @@ func queryWaitAndAssert(t *testing.T, ctx context.Context, addr string, q func() "caller", "queryWaitAndAssert", "msg", fmt.Sprintf("Waiting for %d results for query %s", len(expected), q()), ) - testutil.Ok(t, runutil.RetryWithLog(logger, 10*time.Second, ctx.Done(), func() error { + testutil.Ok(t, runutil.RetryWithLog(logger, 5*time.Second, ctx.Done(), func() error { res, warnings, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+addr), q(), ts(), opts) if err != nil { return err