diff --git a/CHANGELOG.md b/CHANGELOG.md index 619d68a35d..e4c8a2304c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -153,6 +153,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6692](https://github.com/thanos-io/thanos/pull/6692) Store: Fix matching bug when using empty alternative in regex matcher, for example (a||b). - [#6679](https://github.com/thanos-io/thanos/pull/6697) Store: Fix block deduplication - [#6706](https://github.com/thanos-io/thanos/pull/6706) Store: Series responses should always be sorted +- [#7286](https://github.com/thanos-io/thanos/pull/7286) Query: Propagate instant query warnings in distributed execution mode. ### Added diff --git a/pkg/query/remote_engine.go b/pkg/query/remote_engine.go index 8e27d28f53..2b0e67e056 100644 --- a/pkg/query/remote_engine.go +++ b/pkg/query/remote_engine.go @@ -256,8 +256,9 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result { return &promql.Result{Err: err} } var ( - result = make(promql.Vector, 0) - builder = labels.NewScratchBuilder(8) + result = make(promql.Vector, 0) + warnings annotations.Annotations + builder = labels.NewScratchBuilder(8) ) for { msg, err := qry.Recv() @@ -269,7 +270,8 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result { } if warn := msg.GetWarnings(); warn != "" { - return &promql.Result{Err: errors.New(warn)} + warnings.Add(errors.New(warn)) + continue } ts := msg.GetTimeseries() @@ -287,7 +289,10 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result { } } - return &promql.Result{Value: result} + return &promql.Result{ + Value: result, + Warnings: warnings, + } } request := &querypb.QueryRangeRequest{ diff --git a/pkg/query/remote_engine_test.go b/pkg/query/remote_engine_test.go index 1831f2356d..bca79598e5 100644 --- a/pkg/query/remote_engine_test.go +++ b/pkg/query/remote_engine_test.go @@ -25,7 +25,7 @@ import ( ) func TestRemoteEngine_Warnings(t *testing.T) { - client := NewClient(&queryWarnClient{}, "", nil) + client := NewClient(&warnClient{}, "", nil) engine := NewRemoteEngine(log.NewNopLogger(), client, Opts{ Timeout: 1 * time.Second, }) @@ -41,11 +41,23 @@ func TestRemoteEngine_Warnings(t *testing.T) { Start: time.Now(), End: time.Now().Add(2 * time.Hour), }, logicalplan.PlanOptions{}) - qry, err := engine.NewRangeQuery(context.Background(), nil, plan.Root(), start, end, step) - testutil.Ok(t, err) - res := qry.Exec(context.Background()) - testutil.Ok(t, res.Err) - testutil.Equals(t, 1, len(res.Warnings)) + + t.Run("instant_query", func(t *testing.T) { + qry, err := engine.NewInstantQuery(context.Background(), nil, plan.Root(), start) + testutil.Ok(t, err) + res := qry.Exec(context.Background()) + testutil.Ok(t, res.Err) + testutil.Equals(t, 1, len(res.Warnings)) + }) + + t.Run("range_query", func(t *testing.T) { + qry, err := engine.NewRangeQuery(context.Background(), nil, plan.Root(), start, end, step) + testutil.Ok(t, err) + res := qry.Exec(context.Background()) + testutil.Ok(t, res.Err) + testutil.Equals(t, 1, len(res.Warnings)) + }) + } func TestRemoteEngine_LabelSets(t *testing.T) { @@ -198,11 +210,15 @@ func zLabelSetFromStrings(ss ...string) labelpb.ZLabelSet { } } -type queryWarnClient struct { +type warnClient struct { querypb.QueryClient } -func (m queryWarnClient) QueryRange(ctx context.Context, in *querypb.QueryRangeRequest, opts ...grpc.CallOption) (querypb.Query_QueryRangeClient, error) { +func (m warnClient) Query(ctx context.Context, in *querypb.QueryRequest, opts ...grpc.CallOption) (querypb.Query_QueryClient, error) { + return &queryWarnClient{}, nil +} + +func (m warnClient) QueryRange(ctx context.Context, in *querypb.QueryRangeRequest, opts ...grpc.CallOption) (querypb.Query_QueryRangeClient, error) { return &queryRangeWarnClient{}, nil } @@ -218,3 +234,16 @@ func (m *queryRangeWarnClient) Recv() (*querypb.QueryRangeResponse, error) { m.warnSent = true return querypb.NewQueryRangeWarningsResponse(errors.New("warning")), nil } + +type queryWarnClient struct { + querypb.Query_QueryClient + warnSent bool +} + +func (m *queryWarnClient) Recv() (*querypb.QueryResponse, error) { + if m.warnSent { + return nil, io.EOF + } + m.warnSent = true + return querypb.NewQueryWarningsResponse(errors.New("warning")), nil +}