Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query/storeset: do not close the connection if strict mode enabled #2568

Merged
merged 2 commits into from
May 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#2536](https://github.com/thanos-io/thanos/pull/2536) minio-go: Fixed AWS STS endpoint url to https for Web Identity providers on AWS EKS
- [#2501](https://github.com/thanos-io/thanos/pull/2501) Query: gracefully handle additional fields in `SeriesResponse` protobuf message that may be added in the future.
- [#2568](https://github.com/thanos-io/thanos/pull/2568) Query: does not close the connection of strict, static nodes if establishing a connection had succeeded but Info() call failed

### Added

Expand Down
7 changes: 4 additions & 3 deletions pkg/query/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,14 +414,15 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store
level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "dialing connection"), "address", addr)
return
}
st = &storeRef{StoreClient: storepb.NewStoreClient(conn), cc: conn, addr: addr, logger: s.logger}
st = &storeRef{StoreClient: storepb.NewStoreClient(conn), storeType: component.UnknownStoreAPI, cc: conn, addr: addr, logger: s.logger}
}

// Check existing or new store. Is it healthy? What are current metadata?
labelSets, minTime, maxTime, storeType, err := spec.Metadata(ctx, st.StoreClient)
if err != nil {
if !seenAlready {
// Close only if new. Unactive `s.stores` will be closed later on.
if !seenAlready && !spec.StrictStatic() {
// Close only if new and not a strict static node.
// Unactive `s.stores` will be closed later on.
st.Close()
}
s.updateStoreStatus(st, err)
Expand Down
34 changes: 31 additions & 3 deletions pkg/query/storeset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ var testGRPCOpts = []grpc.DialOption{
}

type testStore struct {
info storepb.InfoResponse
infoDelay time.Duration
info storepb.InfoResponse
}

func (s *testStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.InfoResponse, error) {
if s.infoDelay > 0 {
time.Sleep(s.infoDelay)
}
return &s.info, nil
}

Expand All @@ -54,6 +58,7 @@ type testStoreMeta struct {
extlsetFn func(addr string) []storepb.LabelSet
storeType component.StoreAPI
minTime, maxTime int64
infoDelay time.Duration
}

type testStores struct {
Expand Down Expand Up @@ -82,6 +87,7 @@ func startTestStores(storeMetas []testStoreMeta) (*testStores, error) {
MaxTime: meta.maxTime,
MinTime: meta.minTime,
},
infoDelay: meta.infoDelay,
}
if meta.storeType != nil {
storeSrv.info.StoreType = meta.storeType.ToProto()
Expand Down Expand Up @@ -585,6 +591,25 @@ func TestQuerierStrict(t *testing.T) {
},
storeType: component.Sidecar,
},
// Slow store.
{
minTime: 65644,
maxTime: 77777,
extlsetFn: func(addr string) []storepb.LabelSet {
return []storepb.LabelSet{
{
Labels: []storepb.Label{
{
Name: "addr",
Value: addr,
},
},
},
}
},
storeType: component.Sidecar,
infoDelay: 2 * time.Second,
},
})

testutil.Ok(t, err)
Expand All @@ -595,14 +620,17 @@ func TestQuerierStrict(t *testing.T) {
return []StoreSpec{
NewGRPCStoreSpec(st.StoreAddresses()[0], true),
NewGRPCStoreSpec(st.StoreAddresses()[1], false),
NewGRPCStoreSpec(st.StoreAddresses()[2], true),
}
}, testGRPCOpts, time.Minute)
defer storeSet.Close()
storeSet.gRPCInfoCallTimeout = 1 * time.Second

// Initial update.
storeSet.Update(context.Background())
testutil.Equals(t, 2, len(storeSet.stores), "two clients must be available for running store nodes")
testutil.Equals(t, 3, len(storeSet.stores), "three clients must be available for running store nodes")

testutil.Assert(t, storeSet.stores[st.StoreAddresses()[2]].cc.GetState().String() != "SHUTDOWN", "slow store's connection should not be closed")

// The store is statically defined + strict mode is enabled
// so its client + information must be retained.
Expand All @@ -619,7 +647,7 @@ func TestQuerierStrict(t *testing.T) {
storeSet.Update(context.Background())

// Check that the information is the same.
testutil.Equals(t, 1, len(storeSet.stores), "one client must remain available for a store node that is down")
testutil.Equals(t, 2, len(storeSet.stores), "two static clients must remain available")
testutil.Equals(t, curMin, storeSet.stores[staticStoreAddr].minTime, "minimum time reported by the store node is different")
testutil.Equals(t, curMax, storeSet.stores[staticStoreAddr].maxTime, "minimum time reported by the store node is different")
testutil.NotOk(t, storeSet.storeStatuses[staticStoreAddr].LastError)
Expand Down