Skip to content

Commit

Permalink
*: convert to annotations
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
  • Loading branch information
MichaHoffmann committed Oct 4, 2023
1 parent c28ed8b commit ee4fbdc
Show file tree
Hide file tree
Showing 17 changed files with 77 additions and 75 deletions.
4 changes: 2 additions & 2 deletions pkg/api/query/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer
}

if len(result.Warnings) != 0 {
if err := server.Send(querypb.NewQueryWarningsResponse(result.Warnings...)); err != nil {
if err := server.Send(querypb.NewQueryWarningsResponse(result.Warnings.AsErrors()...)); err != nil {
return err
}
}
Expand Down Expand Up @@ -231,7 +231,7 @@ func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Que
}

if len(result.Warnings) != 0 {
if err := srv.Send(querypb.NewQueryRangeWarningsResponse(result.Warnings...)); err != nil {
if err := srv.Send(querypb.NewQueryRangeWarningsResponse(result.Warnings.AsErrors()...)); err != nil {
return err
}
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/api/query/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"
v1 "github.com/prometheus/prometheus/web/api/v1"

"github.com/thanos-io/thanos/pkg/api/query/querypb"
Expand Down Expand Up @@ -41,7 +42,7 @@ func TestGRPCQueryAPIErrorHandling(t *testing.T) {
{
name: "error response",
engine: &engineStub{
warns: []error{errors.New("warn stub")},
warns: annotations.New().Add(errors.New("warn stub")),
},
},
}
Expand All @@ -68,7 +69,7 @@ func TestGRPCQueryAPIErrorHandling(t *testing.T) {
if len(test.engine.warns) > 0 {
testutil.Ok(t, err)
for i, resp := range srv.responses {
testutil.Equals(t, test.engine.warns[i].Error(), resp.GetWarnings())
testutil.Equals(t, test.engine.warns.AsErrors()[i].Error(), resp.GetWarnings())
}
}
})
Expand All @@ -87,7 +88,7 @@ func TestGRPCQueryAPIErrorHandling(t *testing.T) {
if len(test.engine.warns) > 0 {
testutil.Ok(t, err)
for i, resp := range srv.responses {
testutil.Equals(t, test.engine.warns[i].Error(), resp.GetWarnings())
testutil.Equals(t, test.engine.warns.AsErrors()[i].Error(), resp.GetWarnings())
}
}
})
Expand All @@ -97,7 +98,7 @@ func TestGRPCQueryAPIErrorHandling(t *testing.T) {
type engineStub struct {
v1.QueryEngine
err error
warns []error
warns annotations.Annotations
}

func (e engineStub) NewInstantQuery(_ context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
Expand All @@ -111,7 +112,7 @@ func (e engineStub) NewRangeQuery(_ context.Context, q storage.Queryable, opts p
type queryStub struct {
promql.Query
err error
warns []error
warns annotations.Annotations
}

func (q queryStub) Close() {}
Expand Down
41 changes: 21 additions & 20 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/stats"
v1 "github.com/prometheus/prometheus/web/api/v1"
promqlapi "github.com/thanos-io/promql-engine/api"
Expand Down Expand Up @@ -589,7 +590,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
Result: res.Value,
Stats: qs,
QueryExplanation: explanation,
}, res.Warnings, nil, qry.Close
}, res.Warnings.AsErrors(), nil, qry.Close
}

func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.ApiError, func()) {
Expand Down Expand Up @@ -759,7 +760,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
Result: res.Value,
Stats: qs,
QueryExplanation: explanation,
}, res.Warnings, nil, qry.Close
}, res.Warnings.AsErrors(), nil, qry.Close
}

func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.ApiError, func()) {
Expand Down Expand Up @@ -811,7 +812,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
true,
nil,
query.NoopSeriesStatsReporter,
).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
).Querier(timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {}
}
Expand All @@ -825,11 +826,11 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
var callWarnings annotations.Annotations
labelValuesSet := make(map[string]struct{})
for _, matchers := range matcherSets {
vals, callWarnings, err = q.LabelValues(name, matchers...)
vals, callWarnings, err = q.LabelValues(ctx, name, matchers...)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {}
}
warnings = append(warnings, callWarnings...)
warnings.Merge(callWarnings)
for _, val := range vals {
labelValuesSet[val] = struct{}{}
}
Expand All @@ -841,7 +842,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
}
sort.Strings(vals)
} else {
vals, warnings, err = q.LabelValues(name)
vals, warnings, err = q.LabelValues(ctx, name)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {}
}
Expand All @@ -851,7 +852,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
vals = make([]string, 0)
}

return vals, warnings, nil, func() {}
return vals, warnings.AsErrors(), nil, func() {}
}

func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiError, func()) {
Expand Down Expand Up @@ -914,7 +915,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
true,
nil,
query.NoopSeriesStatsReporter,
).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
).Querier(timestamp.FromTime(start), timestamp.FromTime(end))

if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {}
Expand All @@ -926,7 +927,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
sets []storage.SeriesSet
)
for _, mset := range matcherSets {
sets = append(sets, q.Select(false, nil, mset...))
sets = append(sets, q.Select(ctx, false, nil, mset...))
}

set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
Expand All @@ -936,7 +937,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
if set.Err() != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: set.Err()}, func() {}
}
return metrics, set.Warnings(), nil, func() {}
return metrics, set.Warnings().AsErrors(), nil, func() {}
}

func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.ApiError, func()) {
Expand Down Expand Up @@ -981,7 +982,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
true,
nil,
query.NoopSeriesStatsReporter,
).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
).Querier(timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {}
}
Expand All @@ -996,11 +997,11 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
var callWarnings annotations.Annotations
labelNamesSet := make(map[string]struct{})
for _, matchers := range matcherSets {
names, callWarnings, err = q.LabelNames(matchers...)
names, callWarnings, err = q.LabelNames(ctx, matchers...)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {}
}
warnings = append(warnings, callWarnings...)
warnings.Merge(callWarnings)
for _, val := range names {
labelNamesSet[val] = struct{}{}
}
Expand All @@ -1012,7 +1013,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
}
sort.Strings(names)
} else {
names, warnings, err = q.LabelNames()
names, warnings, err = q.LabelNames(ctx)
}

if err != nil {
Expand All @@ -1022,7 +1023,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
names = make([]string, 0)
}

return names, warnings, nil, func() {}
return names, warnings.AsErrors(), nil, func() {}
}

func (qapi *QueryAPI) stores(_ *http.Request) (interface{}, []error, *api.ApiError, func()) {
Expand Down Expand Up @@ -1065,7 +1066,7 @@ func NewTargetsHandler(client targets.UnaryClient, enablePartialResponse bool) f
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "retrieving targets")}, func() {}
}

return t, warnings, nil, func() {}
return t, warnings.AsErrors(), nil, func() {}
}
}

Expand Down Expand Up @@ -1111,7 +1112,7 @@ func NewAlertsHandler(client rules.UnaryClient, enablePartialResponse bool) func
resp.Alerts = append(resp.Alerts, a.Alerts...)
}
}
return resp, warnings, nil, func() {}
return resp, warnings.AsErrors(), nil, func() {}
}
}

Expand Down Expand Up @@ -1158,7 +1159,7 @@ func NewRulesHandler(client rules.UnaryClient, enablePartialResponse bool) func(
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Errorf("error retrieving rules: %v", err)}, func() {}
}
return groups, warnings, nil, func() {}
return groups, warnings.AsErrors(), nil, func() {}
}
}

Expand Down Expand Up @@ -1203,7 +1204,7 @@ func NewExemplarsHandler(client exemplars.UnaryClient, enablePartialResponse boo
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "retrieving exemplars")}, func() {}
}
return data, warnings, nil, func() {}
return data, warnings.AsErrors(), nil, func() {}
}
}

Expand Down Expand Up @@ -1321,6 +1322,6 @@ func NewMetricMetadataHandler(client metadata.UnaryClient, enablePartialResponse
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "retrieving metadata")}, func() {}
}

return t, warnings, nil, func() {}
return t, warnings.AsErrors(), nil, func() {}
}
}
7 changes: 4 additions & 3 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/util/annotations"
promgate "github.com/prometheus/prometheus/util/gate"
"github.com/prometheus/prometheus/util/stats"
baseAPI "github.com/thanos-io/thanos/pkg/api"
Expand Down Expand Up @@ -691,7 +692,7 @@ func TestMetadataEndpoints(t *testing.T) {
var series []storage.Series

for _, lbl := range old {
var samples []tsdbutil.Sample
var samples []chunks.Sample

for i := int64(0); i < 10; i++ {
samples = append(samples, sample{
Expand Down Expand Up @@ -1797,7 +1798,7 @@ func TestRulesHandler(t *testing.T) {
}
res, errors, apiError, releaseResources := endpoint(req.WithContext(ctx))
defer releaseResources()
if errors != nil {
if len(errors) > 0 {
t.Fatalf("Unexpected errors: %s", errors)
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/exemplars/exemplars.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type exemplarsServer struct {
exemplarspb.Exemplars_ExemplarsServer
ctx context.Context

warnings []error
warnings annotations.Annotations
data []*exemplarspb.ExemplarData
mu sync.Mutex
}
Expand All @@ -46,7 +46,7 @@ func (srv *exemplarsServer) Send(res *exemplarspb.ExemplarsResponse) error {
if res.GetWarning() != "" {
srv.mu.Lock()
defer srv.mu.Unlock()
srv.warnings = append(srv.warnings, errors.New(res.GetWarning()))
srv.warnings.Add(errors.New(res.GetWarning()))
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type metadataServer struct {
metric string
limit int

warnings []error
warnings annotations.Annotations
metadataMap map[string][]metadatapb.Meta
mu sync.Mutex
}
Expand All @@ -75,7 +75,7 @@ func (srv *metadataServer) Send(res *metadatapb.MetricMetadataResponse) error {
if res.GetWarning() != "" {
srv.mu.Lock()
defer srv.mu.Unlock()
srv.warnings = append(srv.warnings, errors.New(res.GetWarning()))
srv.warnings.Add(errors.New(res.GetWarning()))
return nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/metadata/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/util/annotations"
"github.com/thanos-io/thanos/pkg/metadata/metadatapb"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/runutil"
Expand Down
15 changes: 6 additions & 9 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ package query

import (
"context"
"github.com/prometheus/prometheus/util/annotations"
"strings"
"sync"
"time"

"github.com/prometheus/prometheus/util/annotations"

"github.com/go-kit/log"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand Down Expand Up @@ -201,12 +202,12 @@ type seriesServer struct {

seriesSet []storepb.Series
seriesSetStats storepb.SeriesStatsCounter
warnings []string
warnings annotations.Annotations
}

func (s *seriesServer) Send(r *storepb.SeriesResponse) error {
if r.GetWarning() != "" {
s.warnings = append(s.warnings, r.GetWarning())
s.warnings.Add(errors.New(r.GetWarning()))
return nil
}

Expand Down Expand Up @@ -277,7 +278,7 @@ func (q *querier) Select(ctx context.Context, _ bool, hints *storage.SelectHints
// The querier has a context, but it gets canceled as soon as query evaluation is completed by the engine.
// We want to prevent this from happening for the async store API calls we make while preserving tracing context.
// TODO(bwplotka): Does the above still is true? It feels weird to leave unfinished calls behind query API.
ctx := tracing.CopyTraceContext(context.Background(), q.ctx)
ctx = tracing.CopyTraceContext(context.Background(), ctx)
ctx, cancel := context.WithTimeout(ctx, q.selectTimeout)
span, ctx := tracing.StartSpan(ctx, "querier_select", opentracing.Tags{
"minTime": hints.Start,
Expand Down Expand Up @@ -362,11 +363,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
if err := q.proxy.Series(&req, resp); err != nil {
return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "proxy Series()")
}

var warns annotations.Annotations
for _, w := range resp.warnings {
warns.Add(errors.New(w))
}
warns := annotations.New().Merge(resp.warnings)

if q.enableQueryPushdown && (hints.Func == "max_over_time" || hints.Func == "min_over_time") {
// On query pushdown, delete the metric's name from the result because that's what the
Expand Down
Loading

0 comments on commit ee4fbdc

Please sign in to comment.