Skip to content

Commit

Permalink
Add resync on interval support and config
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksontj committed Mar 16, 2023
1 parent 8f719d3 commit efed011
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 18 deletions.
89 changes: 76 additions & 13 deletions pkg/promclient/labelfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,107 @@ package promclient

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql/parser"
"github.com/sirupsen/logrus"
)

// TODO: config
func NewLabelFilterClient(a API) (*LabelFilterClient, error) {
type LabelFilterConfig struct {
LabelsToFilter []string `yaml:"labels_to_filter"`
SyncInterval time.Duration `yaml:"sync_interval"`
}

func (c *LabelFilterConfig) Validate() error {
for _, l := range c.LabelsToFilter {
if !model.IsValidMetricName(model.LabelValue(l)) {
return fmt.Errorf("%s is not a valid label name", l)
}
}

return nil
}

// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *LabelFilterConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
type plain LabelFilterConfig
if err := unmarshal((*plain)(c)); err != nil {
return err
}

return c.Validate()
}

// NewLabelFilterClient returns a LabelFilterClient which will filter the queries sent downstream based
// on a filter of labels maintained in memory from the downstream API.
func NewLabelFilterClient(ctx context.Context, a API, cfg *LabelFilterConfig) (*LabelFilterClient, error) {
c := &LabelFilterClient{
API: a,
LabelsToFilter: []string{"__name__", "job", "version"},
API: a,
ctx: ctx,
cfg: cfg,
}

// TODO; background
if err := c.Sync(context.TODO()); err != nil {
// Do an initial sync
if err := c.Sync(ctx); err != nil {
return nil, err
}

if cfg.SyncInterval > 0 {
go func() {
ticker := time.NewTicker(cfg.SyncInterval)
for {
select {
case <-ticker.C:
// TODO: metric on sync status
if err := c.Sync(ctx); err != nil {
logrus.Errorf("error syncing in label_filter from downstream: %#v", err)
}
case <-ctx.Done():
ticker.Stop()
return
}
}
}()
}

return c, nil
}

// LabelFilterClient proxies a client and adds the given labels to all results
// LabelFilterClient filters out calls to the downstream based on a label filter
// which is pulled and maintained from the downstream API.
type LabelFilterClient struct {
API

LabelsToFilter []string // Which labels we want to pull to check

// TODO: move to a local block (to disk)?? or optionally so?
// labelFilter is a map of labelName -> labelValue -> nothing (for quick lookups)
labelFilter map[string]map[string]struct{}
// filter is an atomic to hold the LabelFilter which is a map of labelName -> labelValue -> nothing (for quick lookups)
filter atomic.Value

// Used as the background context for this client
ctx context.Context

// cfg is a pointer to the config for this client
cfg *LabelFilterConfig
}

// State returns the current ServerGroupState
func (c *LabelFilterClient) LabelFilter() map[string]map[string]struct{} {
tmp := c.filter.Load()
if ret, ok := tmp.(map[string]map[string]struct{}); ok {
return ret
}
return nil
}

func (c *LabelFilterClient) Sync(ctx context.Context) error {
filter := make(map[string]map[string]struct{})

for _, label := range c.LabelsToFilter {
for _, label := range c.cfg.LabelsToFilter {
labelFilter := make(map[string]struct{})
// TODO: warn?
vals, _, err := c.LabelValues(ctx, label, nil, model.Time(0).Time(), model.Now().Time())
Expand All @@ -52,7 +115,7 @@ func (c *LabelFilterClient) Sync(ctx context.Context) error {
filter[label] = labelFilter
}

c.labelFilter = filter
c.filter.Store(filter)

return nil
}
Expand All @@ -65,7 +128,7 @@ func (c *LabelFilterClient) Query(ctx context.Context, query string, ts time.Tim
return nil, nil, err
}

filterVisitor := NewFilterLabelVisitor(c.labelFilter)
filterVisitor := NewFilterLabelVisitor(c.LabelFilter())
if _, err := parser.Walk(ctx, filterVisitor, &parser.EvalStmt{Expr: e}, e, nil, nil); err != nil {
return nil, nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/servergroup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ type Config struct {
// An example use-case would be if a specific servergroup was was "deprecated" and wasn't getting
// any new data after a specific given point in time
AbsoluteTimeRangeConfig *AbsoluteTimeRangeConfig `yaml:"absolute_time_range"`

// TODO: docs
LabelFilterConfig *promclient.LabelFilterConfig `yaml:"label_filter"`
}

// GetScheme returns the scheme for this servergroup
Expand Down
29 changes: 24 additions & 5 deletions pkg/servergroup/servergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type ServerGroupState struct {
// Targets is the list of target URLs for this discovery round
Targets []string
apiClient promclient.API

ctx context.Context
ctxCancel context.CancelFunc
}

// ServerGroup encapsulates a set of prometheus downstreams to query/aggregate
Expand Down Expand Up @@ -132,10 +135,17 @@ func (s *ServerGroup) Sync() {
}
}

func (s *ServerGroup) loadTargetGroupMap(targetGroupMap map[string][]*targetgroup.Group) error {
func (s *ServerGroup) loadTargetGroupMap(targetGroupMap map[string][]*targetgroup.Group) (err error) {
targets := make([]string, 0)
apiClients := make([]promclient.API, 0)

ctx, ctxCancel := context.WithCancel(context.Background())
defer func() {
if err != nil {
ctxCancel()
}
}()

for _, targetGroupList := range targetGroupMap {
for _, targetGroup := range targetGroupList {
for _, target := range targetGroup.Targets {
Expand Down Expand Up @@ -252,9 +262,11 @@ func (s *ServerGroup) loadTargetGroupMap(targetGroupMap map[string][]*targetgrou
apiClient = &promclient.DebugAPI{apiClient, u.String()}
}

apiClient, err = promclient.NewLabelFilterClient(apiClient)
if err != nil {
return err
if s.Cfg.LabelFilterConfig != nil {
apiClient, err = promclient.NewLabelFilterClient(ctx, apiClient, s.Cfg.LabelFilterConfig)
if err != nil {
return err
}
}

apiClients = append(apiClients, apiClient)
Expand All @@ -271,16 +283,23 @@ func (s *ServerGroup) loadTargetGroupMap(targetGroupMap map[string][]*targetgrou
if err != nil {
return err
}

newState := &ServerGroupState{
Targets: targets,
apiClient: apiClient,
ctx: ctx,
ctxCancel: ctxCancel,
}

if s.Cfg.IgnoreError {
newState.apiClient = &promclient.IgnoreErrorAPI{newState.apiClient}
}

s.state.Store(newState)
oldState := s.State() // Fetch the current state (so we can stop it)
s.state.Store(newState) // Store new state
if oldState != nil {
oldState.ctxCancel() // Cancel the old state
}

if !s.loaded {
s.loaded = true
Expand Down

0 comments on commit efed011

Please sign in to comment.