From 815921c6d140df7f7431c062c557583de6c5667d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 6 May 2020 13:51:50 +0300 Subject: [PATCH 1/2] query/storeset: do not close the connection if strict mode enabled MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Do not close the gRPC connection if establishing a connection has succeeded but we have failed to get response to a Info() call. Without this and with strict mode in such a case, we will always keep around a closed connection that won't work anymore unless the whole Thanos Query process will be restarted. Signed-off-by: Giedrius Statkevičius --- pkg/query/storeset.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/query/storeset.go b/pkg/query/storeset.go index aa9430178a..8db69d6a2c 100644 --- a/pkg/query/storeset.go +++ b/pkg/query/storeset.go @@ -420,8 +420,9 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store // Check existing or new store. Is it healthy? What are current metadata? labelSets, minTime, maxTime, storeType, err := spec.Metadata(ctx, st.StoreClient) if err != nil { - if !seenAlready { - // Close only if new. Unactive `s.stores` will be closed later on. + if !seenAlready && !spec.StrictStatic() { + // Close only if new and not static. + // Unactive `s.stores` will be closed later on. st.Close() } s.updateStoreStatus(st, err) From 2a06f160164bdafeea6738c5c35f231bf673bc67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 6 May 2020 14:19:40 +0300 Subject: [PATCH 2/2] query/storeset: add test, add CHANGELOG item MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Giedrius Statkevičius --- CHANGELOG.md | 1 + pkg/query/storeset.go | 4 ++-- pkg/query/storeset_test.go | 34 +++++++++++++++++++++++++++++++--- 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a91ab498a4..1ed7ad11a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2536](https://github.com/thanos-io/thanos/pull/2536) minio-go: Fixed AWS STS endpoint url to https for Web Identity providers on AWS EKS - [#2501](https://github.com/thanos-io/thanos/pull/2501) Query: gracefully handle additional fields in `SeriesResponse` protobuf message that may be added in the future. +- [#2568](https://github.com/thanos-io/thanos/pull/2568) Query: does not close the connection of strict, static nodes if establishing a connection had succeeded but Info() call failed ### Added diff --git a/pkg/query/storeset.go b/pkg/query/storeset.go index 8db69d6a2c..d91f549b50 100644 --- a/pkg/query/storeset.go +++ b/pkg/query/storeset.go @@ -414,14 +414,14 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "dialing connection"), "address", addr) return } - st = &storeRef{StoreClient: storepb.NewStoreClient(conn), cc: conn, addr: addr, logger: s.logger} + st = &storeRef{StoreClient: storepb.NewStoreClient(conn), storeType: component.UnknownStoreAPI, cc: conn, addr: addr, logger: s.logger} } // Check existing or new store. Is it healthy? What are current metadata? labelSets, minTime, maxTime, storeType, err := spec.Metadata(ctx, st.StoreClient) if err != nil { if !seenAlready && !spec.StrictStatic() { - // Close only if new and not static. + // Close only if new and not a strict static node. // Unactive `s.stores` will be closed later on. st.Close() } diff --git a/pkg/query/storeset_test.go b/pkg/query/storeset_test.go index f2091b39bd..293c7dd87b 100644 --- a/pkg/query/storeset_test.go +++ b/pkg/query/storeset_test.go @@ -27,10 +27,14 @@ var testGRPCOpts = []grpc.DialOption{ } type testStore struct { - info storepb.InfoResponse + infoDelay time.Duration + info storepb.InfoResponse } func (s *testStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.InfoResponse, error) { + if s.infoDelay > 0 { + time.Sleep(s.infoDelay) + } return &s.info, nil } @@ -54,6 +58,7 @@ type testStoreMeta struct { extlsetFn func(addr string) []storepb.LabelSet storeType component.StoreAPI minTime, maxTime int64 + infoDelay time.Duration } type testStores struct { @@ -82,6 +87,7 @@ func startTestStores(storeMetas []testStoreMeta) (*testStores, error) { MaxTime: meta.maxTime, MinTime: meta.minTime, }, + infoDelay: meta.infoDelay, } if meta.storeType != nil { storeSrv.info.StoreType = meta.storeType.ToProto() @@ -585,6 +591,25 @@ func TestQuerierStrict(t *testing.T) { }, storeType: component.Sidecar, }, + // Slow store. + { + minTime: 65644, + maxTime: 77777, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + { + Name: "addr", + Value: addr, + }, + }, + }, + } + }, + storeType: component.Sidecar, + infoDelay: 2 * time.Second, + }, }) testutil.Ok(t, err) @@ -595,6 +620,7 @@ func TestQuerierStrict(t *testing.T) { return []StoreSpec{ NewGRPCStoreSpec(st.StoreAddresses()[0], true), NewGRPCStoreSpec(st.StoreAddresses()[1], false), + NewGRPCStoreSpec(st.StoreAddresses()[2], true), } }, testGRPCOpts, time.Minute) defer storeSet.Close() @@ -602,7 +628,9 @@ func TestQuerierStrict(t *testing.T) { // Initial update. storeSet.Update(context.Background()) - testutil.Equals(t, 2, len(storeSet.stores), "two clients must be available for running store nodes") + testutil.Equals(t, 3, len(storeSet.stores), "three clients must be available for running store nodes") + + testutil.Assert(t, storeSet.stores[st.StoreAddresses()[2]].cc.GetState().String() != "SHUTDOWN", "slow store's connection should not be closed") // The store is statically defined + strict mode is enabled // so its client + information must be retained. @@ -619,7 +647,7 @@ func TestQuerierStrict(t *testing.T) { storeSet.Update(context.Background()) // Check that the information is the same. - testutil.Equals(t, 1, len(storeSet.stores), "one client must remain available for a store node that is down") + testutil.Equals(t, 2, len(storeSet.stores), "two static clients must remain available") testutil.Equals(t, curMin, storeSet.stores[staticStoreAddr].minTime, "minimum time reported by the store node is different") testutil.Equals(t, curMax, storeSet.stores[staticStoreAddr].maxTime, "minimum time reported by the store node is different") testutil.NotOk(t, storeSet.storeStatuses[staticStoreAddr].LastError)