Skip to content

Commit

Permalink
Added file sd to query
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivan Valkov committed Oct 5, 2018
1 parent 566d90e commit 6c9aca9
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 1 deletion.
87 changes: 86 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"gopkg.in/alecthomas/kingpin.v2"
"github.com/improbable-eng/thanos/pkg/discovery"
"sync"
)

// registerQuery registers a query command.
Expand Down Expand Up @@ -64,6 +66,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
stores := cmd.Flag("store", "Addresses of statically configured store API servers (repeatable).").
PlaceHolder("<store>").Strings()

filesToWatch := cmd.Flag("filesd", "Path to file that contain addresses of store API servers (repeatable).").
PlaceHolder("<path>").Strings()

enableAutodownsampling := cmd.Flag("query.auto-downsampling", "Enable automatic adjustment (step / 5) to what source of data should be used in store gateways if no max_source_resolution param is specified. ").
Default("false").Bool()

Expand All @@ -86,6 +91,15 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
lookupStores[s] = struct{}{}
}

var filesd *discovery.FileDiscoverer
if len(*filesToWatch) > 0 {
conf := &discovery.SDConfig{
Files: *filesToWatch,
RefreshInterval: 5 * time.Second,
}
filesd = discovery.NewFileDiscoverer(conf, logger)
}

return runQuery(
g,
logger,
Expand All @@ -107,6 +121,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
selectorLset,
*stores,
*enableAutodownsampling,
filesd,
)
}
}
Expand Down Expand Up @@ -211,6 +226,7 @@ func runQuery(
selectorLset labels.Labels,
storeAddrs []string,
enableAutodownsampling bool,
fileSD *discovery.FileDiscoverer,
) error {
var staticSpecs []query.StoreSpec
for _, addr := range storeAddrs {
Expand All @@ -226,13 +242,14 @@ func runQuery(
return errors.Wrap(err, "building gRPC client")
}

addrFromFileSD := newFileSDAddrs()

var (
stores = query.NewStoreSet(
logger,
reg,
func() (specs []query.StoreSpec) {
specs = append(staticSpecs)

for id, ps := range peer.PeerStates(cluster.PeerTypesStoreAPIs()...) {
if ps.StoreAPIAddr == "" {
level.Error(logger).Log("msg", "Gossip found peer that propagates empty address, ignoring.", "lset", fmt.Sprintf("%v", ps.Metadata.Labels))
Expand All @@ -241,6 +258,15 @@ func runQuery(

specs = append(specs, &gossipSpec{id: id, addr: ps.StoreAPIAddr, peer: peer})
}

addrFromFileSD.mtx.Lock()
defer addrFromFileSD.mtx.Unlock()
for _, addresses := range addrFromFileSD.addrs {
for _, addr := range addresses {
specs = append(specs, query.NewGRPCStoreSpec(addr))
}
}

return specs
},
dialOpts,
Expand All @@ -264,6 +290,48 @@ func runQuery(
stores.Close()
})
}
// Run File Service Discovery and update the store set when the files are modified
{
if fileSD != nil {
var fileSDUpdates chan *discovery.Discoverable
ctx, cancel := context.WithCancel(context.Background())

fileSDUpdates = make(chan *discovery.Discoverable)

g.Add(func() error {
fileSD.Run(ctx, fileSDUpdates)
return nil
}, func(error) {
cancel()
})

g.Add(func() error {
for {
select {
case update, ok := <-fileSDUpdates:
// Handle the case that a discoverer exits and closes the channel
// before the context is done.
if !ok {
return nil
}
// Discoverers sometimes send nil updates so need to check for it to avoid panics
if update == nil {
continue
}
// TODO(ivan): resolve dns here maybe?
addrFromFileSD.update(update.Source, update.Services)
stores.Update(ctx)
case <-ctx.Done():
return nil
}
}
}, func(error) {
cancel()
stores.Close()
close(fileSDUpdates)
})
}
}
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
Expand Down Expand Up @@ -340,6 +408,23 @@ type gossipSpec struct {
peer *cluster.Peer
}

type fileSDAddrs struct {
addrs map[string][]string
mtx sync.Mutex
}

func newFileSDAddrs() *fileSDAddrs {
return &fileSDAddrs{
addrs: make(map[string][]string),
}
}

func (f *fileSDAddrs) update(source string, addrs []string) {
f.mtx.Lock()
defer f.mtx.Unlock()
f.addrs[source] = addrs
}

func (s *gossipSpec) Addr() string {
return s.addr
}
Expand Down
2 changes: 2 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ Flags:
info endpoint (repeated).
--store=<store> ... Addresses of statically configured store API
servers (repeatable).
--filesd=<path> ... Path to file that contain addresses of store
API servers (repeatable).
--query.auto-downsampling Enable automatic adjustment (step / 5) to what
source of data should be used in store gateways
if no max_source_resolution param is specified.
Expand Down

0 comments on commit 6c9aca9

Please sign in to comment.