Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query: add --store-strict flag #2337

Merged
merged 2 commits into from
Mar 30, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

### Added

- [#2252](https://github.com/thanos-io/thanos/pull/2252) Query: add new `--store-strict` flag. More information available [here](/docs/proposals/202001_thanos_query_health_handling.md).
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
- [#2265](https://github.com/thanos-io/thanos/pull/2265) Compactor: Add `--wait-interval` to specify compaction wait interval between consecutive compact runs when `--wait` enabled.
- [#2250](https://github.com/thanos-io/thanos/pull/2250) Compactor: Enable vertical compaction for offline deduplication (Experimental). Uses `--deduplication.replica-label` flag to specify the replica label to deduplicate on (Hidden). Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together). This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication. We plan to add a smarter algorithm in the following weeks.
- [#1714](https://github.com/thanos-io/thanos/pull/1714) Run the bucket web UI in the compact component when it is run as a long-lived process.
Expand Down
19 changes: 17 additions & 2 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
stores := cmd.Flag("store", "Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups.").
PlaceHolder("<store>").Strings()

strictStores := cmd.Flag("store-strict", "Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top.").
PlaceHolder("<staticstore>").Strings()

fileSDFiles := cmd.Flag("store.sd-files", "Path to files that contain addresses of store API servers. The path can be a glob pattern (repeatable).").
PlaceHolder("<path>").Strings()

Expand Down Expand Up @@ -162,6 +165,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
*dnsSDResolver,
time.Duration(*unhealthyStoreTimeout),
time.Duration(*instantDefaultMaxSourceResolution),
*strictStores,
component.Query,
)
}
Expand Down Expand Up @@ -202,6 +206,7 @@ func runQuery(
dnsSDResolver string,
unhealthyStoreTimeout time.Duration,
instantDefaultMaxSourceResolution time.Duration,
strictStores []string,
comp component.Component,
) error {
// TODO(bplotka in PR #513 review): Move arguments into struct.
Expand All @@ -222,14 +227,24 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
)

for _, store := range strictStores {
if dns.IsDynamicNode(store) {
return errors.Errorf("%s is a dynamically specified store i.e. it uses SD and that is not permitted under strict mode. Use --store for this", store)
}
}

var (
stores = query.NewStoreSet(
logger,
reg,
func() (specs []query.StoreSpec) {
// Add DNS resolved addresses from static flags and file SD.
// Add DNS resolved addresses.
for _, addr := range dnsProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr))
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
}
// Add strict & static nodes.
for _, addr := range strictStores {
specs = append(specs, query.NewGRPCStoreSpec(addr, true))
}

specs = removeDuplicateStoreSpecs(logger, duplicatedStores, specs)
Expand Down
5 changes: 5 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ Flags:
prefixed with 'dns+' or 'dnssrv+' to detect
store API servers through respective DNS
lookups.
--store-strict=<staticstore> ...
Addresses of only statically configured store
API servers that are always used, even if the
health check fails. Useful if you have a
caching layer on top.
--store.sd-files=<path> ...
Path to files that contain addresses of store
API servers. The path can be a glob pattern
Expand Down
11 changes: 6 additions & 5 deletions docs/proposals/202001_thanos_query_health_handling.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
title: Thanos Query store nodes healthiness handling
type: proposal
menu: proposals
status: accepted
status: complete
owner: GiedriusS
---

Expand Down Expand Up @@ -35,6 +35,7 @@ Thus, this logic needs to be changed somehow. There are a few possible options:
2. Another option could be introduced such as `--store.hold-timeout` which would be `--store.unhealthy-timeout`'s brother and we would hold the StoreAPI nodes for `max(hold_timeout, unhealthy_timeout)`.
3. Another option such as `--store.strict-mode` could be introduced which means that we would always retain the last information of the StoreAPI nodes of the last successful check.
4. The StoreAPI node specification format that is used in `--store` could be extended to include another flag which would let specify the previous option per-specific node.
5. Instead of extending the specification format, we could move the same information to the command line options themselves. This would increase the explicitness of this new mode i.e. that it only applies to statically defined nodes.

Lets look through their pros and cons:

Expand All @@ -47,10 +48,10 @@ If we were to graph these choices in terms of their incisiveness and complexity
```text
Most incisive / Least Complex ------------ Least incisive / Most Complex
#1 #2 #4
#3
#3 #5
```

After careful consideration and with the rationale in this proposal, we have decided to go with the third option. It should provide a sweet spot between being too invasive and providing our users the ability to fall-back to the old behavior.
After careful consideration and with the rationale in this proposal, we have decided to go with the fifth option. It should provide a sweet spot between being too invasive and providing our users the ability to fall-back to the old behavior.

## Goals

Expand All @@ -77,15 +78,15 @@ The way this will need to be done should be as generic as possible so the design

## Proposal

* Add a new flag to Thanos Query `--store.strict-mode` which will make it always retain the last successfully retrieved information via the `Info()` gRPC method of **statically** defined nodes and thus always consider them part of the active store set.
* Add a new flag to Thanos Query `--store-strict` which will only accept statically specified nodes and Thanos Query will always retain the last successfully retrieved information of them via the `Info()` gRPC method. Thus, they will always be considered as part of the active store set.

## Risk

* Users might have problems removing the store nodes from the active store set since they will be there forever with this option set. However, one might argue that if the nodes go down then something like DNS service discovery needs to be used which would dynamically add and remove those nodes.

## Work Plan

* Implement the new flag `--store.strict-mode` in Thanos Query which will make it keep around statically defined nodes. It will be disabled by default to reduce surprises when upgrading.
* Implement the new flag `--store-strict` in Thanos Query which will only accept statically defined nodes that will be permanently kept around. It is optional to use so there will be no surprises when upgrading.
* Implement tests with dummy store nodes.
* Document the new behavior.

Expand Down
27 changes: 21 additions & 6 deletions pkg/discovery/dns/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,23 @@ func (p *Provider) Clone() *Provider {
}
}

// IsDynamicNode returns whether the given StoreAPI addr relies on any
// kind of SD mechanism, i.e. whether it carries a QType prefix such as
// "dns+" or "dnssrv+".
func IsDynamicNode(addr string) bool {
	qtype, _ := GetQTypeName(addr)
	return len(qtype) > 0
}

// GetQTypeName splits the provided addr into two parts: the QType (if any)
// and the name. An addr without a "+" separator has no QType, so qtype is
// returned empty and name is the whole addr.
func GetQTypeName(addr string) (qtype string, name string) {
	sep := strings.Index(addr, "+")
	if sep < 0 {
		return "", addr
	}
	return addr[:sep], addr[sep+1:]
}

// Resolve stores a list of provided addresses or their DNS records if requested.
// Addresses prefixed with `dns+` or `dnssrv+` will be resolved through respective DNS lookup (A/AAAA or SRV).
// defaultPort is used for non-SRV records when a port is not supplied.
Expand All @@ -100,14 +117,12 @@ func (p *Provider) Resolve(ctx context.Context, addrs []string) {
resolvedAddrs := map[string][]string{}
for _, addr := range addrs {
var resolved []string
qtypeAndName := strings.SplitN(addr, "+", 2)
if len(qtypeAndName) != 2 {
// No lookup specified. Add to results and continue to the next address.
resolvedAddrs[addr] = []string{addr}
p.resolverAddrs.WithLabelValues(addr).Set(1.0)
qtype, name := GetQTypeName(addr)
if qtype == "" {
resolvedAddrs[name] = []string{name}
p.resolverAddrs.WithLabelValues(name).Set(1.0)
continue
}
qtype, name := qtypeAndName[0], qtypeAndName[1]

resolved, err := p.resolver.Resolve(ctx, name, QType(qtype))
p.resolverLookupsCount.Inc()
Expand Down
32 changes: 32 additions & 0 deletions pkg/discovery/dns/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,35 @@ func (d *mockResolver) Resolve(_ context.Context, name string, _ QType) ([]strin
}
return d.res[name], nil
}

// TestIsDynamicNode tests whether we properly catch dynamically defined nodes.
func TestIsDynamicNode(t *testing.T) {
for _, tcase := range []struct {
node string
isDynamic bool
}{
{
node: "1.2.3.4",
isDynamic: false,
},
{
node: "gibberish+1.1.1.1+noa",
isDynamic: true,
},
{
node: "",
isDynamic: false,
},
{
node: "dns+aaa",
isDynamic: true,
},
{
node: "dnssrv+asdasdsa",
isDynamic: true,
},
} {
isDynamic := IsDynamicNode(tcase.node)
testutil.Equals(t, tcase.isDynamic, isDynamic, "mismatch between results")
}
}
57 changes: 38 additions & 19 deletions pkg/query/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type StoreSpec interface {
// NOTE: It is implementation responsibility to retry until context timeout, but a caller responsibility to manage
// given store connection.
Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, storeType component.StoreAPI, err error)
// StrictStatic returns true if the StoreAPI has been statically defined and it is under a strict mode.
StrictStatic() bool
}

type StoreStatus struct {
Expand All @@ -49,13 +51,19 @@ type StoreStatus struct {
}

// grpcStoreSpec describes a single StoreAPI node reachable over gRPC.
// NOTE(review): the scraped diff interleaved the old and new struct bodies,
// duplicating the addr field; this is the reconstructed post-change struct.
type grpcStoreSpec struct {
	addr string
	// strictstatic marks a statically defined node under strict mode: it is
	// kept in the active store set even when its health check fails.
	strictstatic bool
}

// NewGRPCStoreSpec creates store pure gRPC spec.
// It uses Info gRPC call to get Metadata.
func NewGRPCStoreSpec(addr string) StoreSpec {
return &grpcStoreSpec{addr: addr}
func NewGRPCStoreSpec(addr string, strictstatic bool) StoreSpec {
return &grpcStoreSpec{addr: addr, strictstatic: strictstatic}
}

// StrictStatic returns true if the StoreAPI has been statically defined and it is under a strict mode.
// Such nodes are retained in the active store set even when their metadata
// check fails (see StoreSet update logic).
func (s *grpcStoreSpec) StrictStatic() bool {
	return s.strictstatic
}

func (s *grpcStoreSpec) Addr() string {
Expand Down Expand Up @@ -320,7 +328,7 @@ func newStoreAPIStats() map[component.StoreAPI]map[string]int {
}

// Update updates the store set. It fetches current list of store specs from function and updates the fresh metadata
// from all stores.
// from all stores. Keeps around statically defined nodes that were defined with the strict mode.
func (s *StoreSet) Update(ctx context.Context) {
s.updateMtx.Lock()
defer s.updateMtx.Unlock()
Expand All @@ -334,14 +342,14 @@ func (s *StoreSet) Update(ctx context.Context) {

level.Debug(s.logger).Log("msg", "starting updating storeAPIs", "cachedStores", len(stores))

healthyStores := s.getHealthyStores(ctx, stores)
level.Debug(s.logger).Log("msg", "checked requested storeAPIs", "healthyStores", len(healthyStores), "cachedStores", len(stores))
activeStores := s.getActiveStores(ctx, stores)
level.Debug(s.logger).Log("msg", "checked requested storeAPIs", "activeStores", len(activeStores), "cachedStores", len(stores))

stats := newStoreAPIStats()

// Close stores that where not healthy this time (are not in healthy stores map).
// Close stores that where not active this time (are not in active stores map).
for addr, st := range stores {
if _, ok := healthyStores[addr]; ok {
if _, ok := activeStores[addr]; ok {
stats[st.StoreType()][st.LabelSetsString()]++
continue
}
Expand All @@ -353,7 +361,7 @@ func (s *StoreSet) Update(ctx context.Context) {
}

// Add stores that are not yet in stores.
for addr, st := range healthyStores {
for addr, st := range activeStores {
if _, ok := stores[addr]; ok {
continue
}
Expand Down Expand Up @@ -384,15 +392,15 @@ func (s *StoreSet) Update(ctx context.Context) {
s.cleanUpStoreStatuses(stores)
}

func (s *StoreSet) getHealthyStores(ctx context.Context, stores map[string]*storeRef) map[string]*storeRef {
func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*storeRef) map[string]*storeRef {
var (
unique = make(map[string]struct{})
healthyStores = make(map[string]*storeRef, len(stores))
mtx sync.Mutex
wg sync.WaitGroup
unique = make(map[string]struct{})
activeStores = make(map[string]*storeRef, len(stores))
mtx sync.Mutex
wg sync.WaitGroup
)

// Gather healthy stores map concurrently. Build new store if does not exist already.
// Gather active stores map concurrently. Build new store if does not exist already.
for _, storeSpec := range s.storeSpecs() {
if _, ok := unique[storeSpec.Addr()]; ok {
level.Warn(s.logger).Log("msg", "duplicated address in store nodes", "address", storeSpec.Addr())
Expand All @@ -411,7 +419,7 @@ func (s *StoreSet) getHealthyStores(ctx context.Context, stores map[string]*stor

st, seenAlready := stores[addr]
if !seenAlready {
// New store or was unhealthy and was removed in the past - create new one.
// New store or was unactive and was removed in the past - create new one.
conn, err := grpc.DialContext(ctx, addr, s.dialOpts...)
if err != nil {
s.updateStoreStatus(&storeRef{addr: addr}, err)
Expand All @@ -425,25 +433,36 @@ func (s *StoreSet) getHealthyStores(ctx context.Context, stores map[string]*stor
labelSets, minTime, maxTime, storeType, err := spec.Metadata(ctx, st.StoreClient)
if err != nil {
if !seenAlready {
// Close only if new. Unhealthy `s.stores` will be closed later on.
// Close only if new. Unactive `s.stores` will be closed later on.
st.Close()
}
s.updateStoreStatus(st, err)
level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "getting metadata"), "address", addr)

if !spec.StrictStatic() {
return
}

// Still keep it around if static & strict mode enabled.
mtx.Lock()
defer mtx.Unlock()

activeStores[addr] = st
return
}

s.updateStoreStatus(st, nil)
st.Update(labelSets, minTime, maxTime, storeType)

mtx.Lock()
defer mtx.Unlock()

healthyStores[addr] = st
activeStores[addr] = st
}(storeSpec)
}
wg.Wait()

return healthyStores
return activeStores
}

func (s *StoreSet) updateStoreStatus(store *storeRef, err error) {
Expand Down
Loading