Skip to content

Commit

Permalink
Addressed PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivan Valkov committed Oct 9, 2018
1 parent 138b9ff commit 1ff8e29
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 41 deletions.
98 changes: 57 additions & 41 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
stores := cmd.Flag("store", "Addresses of statically configured store API servers (repeatable).").
PlaceHolder("<store>").Strings()

filesToWatch := cmd.Flag("filesd", "Path to file that contain addresses of store API servers (repeatable).").
filesToWatch := cmd.Flag("store-sd-file", "Path to files that contain addresses of store API servers. The path can be a glob pattern (repeatable).").
PlaceHolder("<path>").Strings()

enableAutodownsampling := cmd.Flag("query.auto-downsampling", "Enable automatic adjustment (step / 5) to what source of data should be used in store gateways if no max_source_resolution param is specified. ").
Expand Down Expand Up @@ -252,14 +252,17 @@ func runQuery(
return errors.Wrap(err, "building gRPC client")
}

fileSDProvider := newFileSDProvider()
fileSDProvider := newFileSDCache()

var (
stores = query.NewStoreSet(
logger,
reg,
func() (specs []query.StoreSpec) {
// Add store specs from static flags.
specs = append(staticSpecs)

// Add store specs from gossip.
for id, ps := range peer.PeerStates(cluster.PeerTypesStoreAPIs()...) {
if ps.StoreAPIAddr == "" {
level.Error(logger).Log("msg", "Gossip found peer that propagates empty address, ignoring.", "lset", fmt.Sprintf("%v", ps.Metadata.Labels))
Expand All @@ -269,10 +272,13 @@ func runQuery(
specs = append(specs, &gossipSpec{id: id, addr: ps.StoreAPIAddr, peer: peer})
}

// Add store specs from file sd.
for _, addr := range fileSDProvider.addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr))
}

specs = removeDuplicates(specs)

return specs
},
dialOpts,
Expand All @@ -296,46 +302,45 @@ func runQuery(
stores.Close()
})
}
// Run File Service Discovery and update the store set when the files are modified
{
if fileSD != nil {
var fileSDUpdates chan []*targetgroup.Group
ctx, cancel := context.WithCancel(context.Background())
// Run File Service Discovery and update the store set when the files are modified.
if fileSD != nil {
var fileSDUpdates chan []*targetgroup.Group
ctxRun, cancelRun := context.WithCancel(context.Background())

fileSDUpdates = make(chan []*targetgroup.Group)
fileSDUpdates = make(chan []*targetgroup.Group)

g.Add(func() error {
fileSD.Run(ctx, fileSDUpdates)
return nil
}, func(error) {
cancel()
})
g.Add(func() error {
fileSD.Run(ctxRun, fileSDUpdates)
return nil
}, func(error) {
cancelRun()
})

g.Add(func() error {
for {
select {
case update, ok := <-fileSDUpdates:
// Handle the case that a discoverer exits and closes the channel
// before the context is done.
if !ok {
return nil
}
// Discoverers sometimes send nil updates so need to check for it to avoid panics
if update == nil {
continue
}
// TODO(ivan): resolve dns here maybe?
fileSDProvider.update(update)
stores.Update(ctx)
case <-ctx.Done():
ctxUpdate, cancelUpdate := context.WithCancel(context.Background())
g.Add(func() error {
for {
select {
case update, ok := <-fileSDUpdates:
// Handle the case that a discoverer exits and closes the channel
// before the context is done.
if !ok {
return nil
}
// Discoverers sometimes send nil updates so need to check for it to avoid panics.
if update == nil {
continue
}
// TODO(ivan): resolve dns here maybe?
fileSDProvider.update(update)
stores.Update(ctxUpdate)
case <-ctxUpdate.Done():
return nil
}
}, func(error) {
cancel()
close(fileSDUpdates)
})
}
}
}, func(error) {
cancelUpdate()
close(fileSDUpdates)
})
}
{
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -405,6 +410,17 @@ func runQuery(
level.Info(logger).Log("msg", "starting query node")
return nil
}
func removeDuplicates(specs []query.StoreSpec) []query.StoreSpec {
set := make(map[string]query.StoreSpec)
for _, spec := range specs {
set[spec.Addr()] = spec
}
deduplicated := make([]query.StoreSpec, 0, len(set))
for _, value := range set {
deduplicated = append(deduplicated, value)
}
return deduplicated
}

type gossipSpec struct {
id string
Expand All @@ -413,18 +429,18 @@ type gossipSpec struct {
peer *cluster.Peer
}

type fileSDProvider struct {
type fileSDCache struct {
tgs map[string]*targetgroup.Group
sync.Mutex
}

func newFileSDProvider() *fileSDProvider {
return &fileSDProvider{
func newFileSDCache() *fileSDCache {
return &fileSDCache{
tgs: make(map[string]*targetgroup.Group),
}
}

func (f *fileSDProvider) update(tgs []*targetgroup.Group) {
func (f *fileSDCache) update(tgs []*targetgroup.Group) {
f.Lock()
defer f.Unlock()
for _, tg := range tgs {
Expand All @@ -435,7 +451,7 @@ func (f *fileSDProvider) update(tgs []*targetgroup.Group) {
}
}

func (f *fileSDProvider) addresses() []string {
func (f *fileSDCache) addresses() []string {
f.Lock()
defer f.Unlock()
var addresses []string
Expand Down
4 changes: 4 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ Flags:
info endpoint (repeated).
--store=<store> ... Addresses of statically configured store API
servers (repeatable).
--store-sd-file=<path> ...
Path to files that contain addresses of store
API servers. The path can be a glob pattern
(repeatable).
--query.auto-downsampling Enable automatic adjustment (step / 5) to what
source of data should be used in store gateways
if no max_source_resolution param is specified.
Expand Down

0 comments on commit 1ff8e29

Please sign in to comment.