From 18ba93d2c6808e041e9e54886a4e98776a38e2d6 Mon Sep 17 00:00:00 2001 From: Sergiusz Urbaniak Date: Thu, 21 Jan 2021 11:40:44 +0100 Subject: [PATCH] pkg/rules/proxy: fix hotlooping when receiving client errors Currently, if we receive an error from the underlying client stream, we continue with trying to receive additional data. This causes a hotloop as we will receive the same error again. This fixes it by returning in the error case and adds a unit test for the proxy logic. Fixes #3717 Signed-off-by: Sergiusz Urbaniak --- pkg/rules/proxy.go | 7 +- pkg/rules/proxy_test.go | 299 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 305 insertions(+), 1 deletion(-) create mode 100644 pkg/rules/proxy_test.go diff --git a/pkg/rules/proxy.go b/pkg/rules/proxy.go index 7910f0ef687..1952224bcaa 100644 --- a/pkg/rules/proxy.go +++ b/pkg/rules/proxy.go @@ -108,6 +108,9 @@ func (stream *rulesStream) receive(ctx context.Context) error { } if err != nil { + // An error happened in Recv(), hence the underlying stream is aborted + // as per https://github.com/grpc/grpc-go/blob/7f2581f910fc21497091c4109b56d310276fc943/stream.go#L117-L125. + // We must not continue receiving additional data from it and must return. err = errors.Wrapf(err, "receiving rules from rules client %v", stream.client) if stream.request.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT { @@ -118,13 +121,15 @@ func (stream *rulesStream) receive(ctx context.Context) error { return errors.Wrapf(err, "sending rules error to server %v", stream.server) } - continue + // Return no error if response strategy is warning. + return nil } if w := rule.GetWarning(); w != "" { if err := stream.server.Send(rulespb.NewWarningRulesResponse(errors.New(w))); err != nil { return errors.Wrapf(err, "sending rules warning to server %v", stream.server) } + // client stream is not aborted, it is ok to receive additional data. continue } diff --git a/pkg/rules/proxy_test.go b/pkg/rules/proxy_test.go new file mode 100644 index 00000000000..f8fc8018fc2 --- /dev/null +++ b/pkg/rules/proxy_test.go @@ -0,0 +1,299 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package rules + +import ( + "context" + "io" + "os" + "reflect" + "testing" + + "github.com/thanos-io/thanos/pkg/store/storepb" + + "google.golang.org/grpc/metadata" + + "google.golang.org/grpc" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/rules/rulespb" +) + +type testRulesClient struct { + rulesErr, recvErr error + response *rulespb.RulesResponse + sentResponse bool +} + +func (t *testRulesClient) String() string { + return "test" +} + +func (t *testRulesClient) Recv() (*rulespb.RulesResponse, error) { + // a naive simulation of underlying grpc Recv behavior as per https://github.com/grpc/grpc-go/blob/7f2581f910fc21497091c4109b56d310276fc943/stream.go#L117-L125. + if t.recvErr != nil { + return nil, t.recvErr + } + + if t.sentResponse { + return nil, io.EOF + } + t.sentResponse = true + + return t.response, nil +} + +func (t *testRulesClient) Header() (metadata.MD, error) { + panic("not implemented") +} + +func (t *testRulesClient) Trailer() metadata.MD { + panic("not implemented") +} + +func (t *testRulesClient) CloseSend() error { + panic("not implemented") +} + +func (t *testRulesClient) Context() context.Context { + panic("not implemented") +} + +func (t *testRulesClient) SendMsg(m interface{}) error { + panic("not implemented") +} + +func (t *testRulesClient) RecvMsg(m interface{}) error { + panic("not implemented") +} + +func (t *testRulesClient) Rules(ctx context.Context, in *rulespb.RulesRequest, opts ...grpc.CallOption) (rulespb.Rules_RulesClient, error) { + return t, t.rulesErr +} + +var _ rulespb.RulesClient = &testRulesClient{} + +type testRulesServer struct { + sendErr error + response *rulespb.RulesResponse +} + +func (t *testRulesServer) String() string { + return "test" +} + +func (t *testRulesServer) Send(response *rulespb.RulesResponse) error { + if t.sendErr != nil { + return t.sendErr + } + t.response = response + return nil +} + +func (t *testRulesServer) SetHeader(md metadata.MD) error { + panic("not implemented") +} + +func (t *testRulesServer) SendHeader(md metadata.MD) error { + panic("not implemented") +} + +func (t *testRulesServer) SetTrailer(md metadata.MD) { + panic("not implemented") +} + +func (t *testRulesServer) Context() context.Context { + return context.Background() +} + +func (t *testRulesServer) SendMsg(m interface{}) error { + panic("not implemented") +} + +func (t *testRulesServer) RecvMsg(m interface{}) error { + panic("not implemented") +} + +func TestProxy(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stderr) + + for _, tc := range []struct { + name string + request *rulespb.RulesRequest + client rulespb.RulesClient + server *testRulesServer + wantResponse *rulespb.RulesResponse + wantError error + }{ + { + name: "rule group proxy success", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + client: &testRulesClient{ + response: rulespb.NewRuleGroupRulesResponse(&rulespb.RuleGroup{ + Name: "foo", + }), + recvErr: nil, + }, + server: &testRulesServer{}, + wantResponse: rulespb.NewRuleGroupRulesResponse(&rulespb.RuleGroup{ + Name: "foo", + }), + }, + { + name: "warning proxy success", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + client: &testRulesClient{ + response: rulespb.NewWarningRulesResponse(errors.New("warning from client")), + recvErr: nil, + }, + server: &testRulesServer{}, + wantResponse: rulespb.NewWarningRulesResponse(errors.New("warning from client")), + }, + { + name: "warn: retreiving rules client failed", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + client: &testRulesClient{ + response: nil, + rulesErr: errors.New("retreiving rules failed"), + }, + server: &testRulesServer{}, + wantResponse: rulespb.NewWarningRulesResponse(errors.New("fetching rules from rules client test: retreiving rules failed")), + }, + { + name: "warn: retreiving rules client failed, forward warning failed", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + client: &testRulesClient{ + response: nil, + rulesErr: errors.New("retreiving rules failed"), + }, + server: &testRulesServer{ + sendErr: errors.New("forwarding warning response failed"), + }, + wantError: errors.New("forwarding warning response failed"), + }, + { + name: "abort: retreiving rules client failed", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, + client: &testRulesClient{ + response: nil, + rulesErr: errors.New("retreiving rules failed"), + }, + server: &testRulesServer{}, + wantError: errors.New("fetching rules from rules client test: retreiving rules failed"), + }, + { + name: "warn: receive failed", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + client: &testRulesClient{ + response: nil, + recvErr: errors.New("503 from Prometheus"), + }, + server: &testRulesServer{}, + wantResponse: rulespb.NewWarningRulesResponse(errors.New("receiving rules from rules client test: 503 from Prometheus")), + }, + { + name: "warn: receive failed, forward warning failed", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + client: &testRulesClient{ + response: nil, + recvErr: errors.New("503 from Prometheus"), + }, + server: &testRulesServer{ + sendErr: errors.New("forwarding warning response failed"), + }, + wantError: errors.New("sending rules error to server test: forwarding warning response failed"), + }, + { + name: "abort: receive failed", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, + client: &testRulesClient{ + response: nil, + recvErr: errors.New("503 from Prometheus"), + }, + server: &testRulesServer{}, + wantError: errors.New("receiving rules from rules client test: 503 from Prometheus"), + }, + { + name: "send failed", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + client: &testRulesClient{ + response: rulespb.NewRuleGroupRulesResponse(&rulespb.RuleGroup{ + Name: "foo", + }), + recvErr: nil, + }, + server: &testRulesServer{ + sendErr: errors.New("sending message failed"), + }, + wantError: errors.New("rpc error: code = Unknown desc = send rules response: sending message failed"), + }, + { + name: "sending warning response failed", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + client: &testRulesClient{ + response: rulespb.NewWarningRulesResponse(errors.New("warning from client")), + recvErr: nil, + }, + server: &testRulesServer{ + sendErr: errors.New("sending message failed"), + }, + wantError: errors.New("sending rules warning to server test: sending message failed"), + }, + } { + t.Run(tc.name, func(t *testing.T) { + p := NewProxy(logger, func() []rulespb.RulesClient { + return []rulespb.RulesClient{tc.client} + }) + + err := p.Rules(tc.request, tc.server) + gotErr := "" + if err != nil { + gotErr = err.Error() + } + wantErr := "" + if tc.wantError != nil { + wantErr = tc.wantError.Error() + } + + if gotErr != wantErr { + t.Errorf("want error %q, got %q", wantErr, gotErr) + } + + if !reflect.DeepEqual(tc.wantResponse, tc.server.response) { + t.Errorf("want response %v, got %v", tc.wantResponse, tc.server.response) + } + }) + } +}