pkg/reloader: improve detection of directory changes
When watching for changes in directories, the reloader used to rely only
on the watch interval and not the inotify events. This commit implements
a more efficient detection of changes for watched directories.

The change also adds a new `DelayInterval` option that allows delaying the
config reload until no additional events have been received.

Finally, a new metric,
`thanos_sidecar_reloader_config_apply_operations_total`, is added and
`thanos_sidecar_reloader_config_apply_errors_total` has been renamed
to `thanos_sidecar_reloader_config_apply_operations_failed_total` for
consistency.

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
simonpasquier committed Sep 8, 2020
1 parent 03b6991 commit 434e274
Showing 4 changed files with 441 additions and 94 deletions.
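
The commit message describes the new `DelayInterval` behaviour: once a file-system event arrives, the reload is held back until no further events have been seen for the configured delay. Below is a minimal, self-contained sketch of that debouncing idea, assuming a plain event channel; the `debounce` helper and the names in it are illustrative only and do not appear in the diff.

```go
package main

import (
	"fmt"
	"time"
)

// debounce calls apply only after the events channel has been quiet for the
// given delay; each new event stops the pending timer and starts a new one.
func debounce(events <-chan struct{}, delay time.Duration, apply func()) {
	var timer *time.Timer
	for range events {
		if timer != nil {
			timer.Stop()
		}
		timer = time.AfterFunc(delay, apply)
	}
}

func main() {
	events := make(chan struct{})
	go debounce(events, 500*time.Millisecond, func() { fmt.Println("reload applied") })

	// A burst of three events results in a single reload.
	for i := 0; i < 3; i++ {
		events <- struct{}{}
		time.Sleep(100 * time.Millisecond)
	}
	time.Sleep(time.Second)
}
```
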
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

## Unreleased

### Changed

- [#3136](https://github.com/thanos-io/thanos/pull/3136) Sidecar: Add metric `thanos_sidecar_reloader_config_apply_operations_total` and rename metric `thanos_sidecar_reloader_config_apply_errors_total` to `thanos_sidecar_reloader_config_apply_operations_failed_total`.

## [v0.15.0](https://github.com/thanos-io/thanos/releases) - 2020.09.07

Highlights:
1 change: 1 addition & 0 deletions pkg/reloader/example_test.go
@@ -27,6 +27,7 @@ func ExampleReloader() {
WatchedDirs: []string{"/path/to/dirs"},
WatchInterval: 3 * time.Minute,
RetryInterval: 5 * time.Second,
DelayInterval: 1 * time.Second,
})

ctx, cancel := context.WithCancel(context.Background())
273 changes: 206 additions & 67 deletions pkg/reloader/reloader.go
@@ -67,6 +67,7 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/fsnotify/fsnotify"
@@ -86,19 +87,18 @@ type Reloader struct {
reloadURL *url.URL
cfgFile string
cfgOutputFile string
watchedDirs []string
watchInterval time.Duration
retryInterval time.Duration
watchedDirs []string
watcher *watcher

lastCfgHash []byte
lastWatchedDirsHash []byte

reloads prometheus.Counter
reloadErrors prometheus.Counter
watches prometheus.Gauge
watchEvents prometheus.Counter
watchErrors prometheus.Counter
configErrors prometheus.Counter
reloads prometheus.Counter
reloadErrors prometheus.Counter
configApplyErrors prometheus.Counter
configApply prometheus.Counter
}

// Options bundles options for the Reloader.
@@ -112,11 +112,15 @@ type Options struct {
// will be substituted and the output written into the given path. Prometheus should then use
// cfgOutputFile as its config file path.
CfgOutputFile string
// WatchedDirs is a collection of paths for this reloader to watch over.
// WatchedDirs is a collection of paths for the reloader to watch over.
WatchedDirs []string
// DelayInterval controls how long the reloader will wait without receiving
// new file-system events before it applies the reload.
DelayInterval time.Duration
// WatchInterval controls how often reloader re-reads config and directories.
WatchInterval time.Duration
// RetryInterval controls how often reloader retries config reload in case of error.
// RetryInterval controls how often the reloader retries a reloading of the
// configuration in case the endpoint returned an error.
RetryInterval time.Duration
}

@@ -133,6 +137,7 @@ func New(logger log.Logger, reg prometheus.Registerer, o *Options) *Reloader {
reloadURL: o.ReloadURL,
cfgFile: o.CfgFile,
cfgOutputFile: o.CfgOutputFile,
watcher: newWatcher(logger, reg, o.DelayInterval),
watchedDirs: o.WatchedDirs,
watchInterval: o.WatchInterval,
retryInterval: o.RetryInterval,
@@ -149,56 +154,44 @@ func New(logger log.Logger, reg prometheus.Registerer, o *Options) *Reloader {
Help: "Total number of reload requests that failed.",
},
),
configErrors: promauto.With(reg).NewCounter(
configApply: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "reloader_config_apply_errors_total",
Help: "Total number of config applies that failed.",
Name: "reloader_config_apply_operations_total",
Help: "Total number of config apply operations.",
},
),
watches: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Name: "reloader_watches",
Help: "Number of resources watched by the reloader.",
},
),
watchEvents: promauto.With(reg).NewCounter(
configApplyErrors: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "reloader_watch_events_total",
Help: "Total number of events received by the reloader from the watcher.",
},
),
watchErrors: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "reloader_watch_errors_total",
Help: "Total number of errors received by the reloader from the watcher.",
Name: "reloader_config_apply_operations_failed_total",
Help: "Total number of config apply operations that failed.",
},
),
}
return r
}

// We cannot detect everything via watch. Watch interval controls how often we re-read given dirs non-recursively.
func (r *Reloader) WithWatchInterval(duration time.Duration) {
r.watchInterval = duration
}

// Watch starts to watch periodically the config file and directories and process them until the context
// gets canceled. Config file gets env expanded if cfgOutputFile is specified and reload is trigger if
// config or directories changed.
// Watch watchers periodically based on r.watchInterval.
// For config file it watches it directly as well via fsnotify.
// It watches directories as well, but lot's of edge cases are missing, so rely on interval mostly.
// Watch detects any change made to the watched config file and directories. It
// returns when the context is canceled.
// Whenever a filesystem change is detected or the watch interval has elapsed,
// the reloader expands the config file (if cfgOutputFile is specified) and
// triggers a reload if the configuration file or files in the watched
// directories have changed.
// Because some edge cases might be missing, the reloader also relies on the
// watch interval.
func (r *Reloader) Watch(ctx context.Context) error {
if r.cfgFile == "" && len(r.watchedDirs) == 0 {
level.Info(r.logger).Log("msg", "nothing to be watched")
return nil
}

watcher, err := fsnotify.NewWatcher()
if err != nil {
return errors.Wrap(err, "create watcher")
}
defer runutil.CloseWithLogOnErr(r.logger, watcher, "config watcher close")

watchables := map[string]struct{}{}
if r.cfgFile != "" {
watchables[filepath.Dir(r.cfgFile)] = struct{}{}
if err := watcher.Add(r.cfgFile); err != nil {
if err := r.watcher.addFile(r.cfgFile); err != nil {
return errors.Wrapf(err, "add config file %s to watcher", r.cfgFile)
}

@@ -207,42 +200,46 @@ func (r *Reloader) Watch(ctx context.Context) error {
}
}

// Watch directories in best effort manner.
for _, dir := range r.watchedDirs {
watchables[filepath.Dir(dir)] = struct{}{}
if err := watcher.Add(dir); err != nil {
return errors.Wrapf(err, "add dir %s to watcher", dir)
if err := r.watcher.addDirectory(dir); err != nil {
return errors.Wrapf(err, "add directory %s to watcher", dir)
}
}

tick := time.NewTicker(r.watchInterval)
defer tick.Stop()
// Start watching the file-system.
var wg sync.WaitGroup
wg.Add(1)
go func() {
r.watcher.run(ctx)
wg.Done()
}()

r.watches.Set(float64(len(watchables)))
level.Info(r.logger).Log(
"msg", "started watching config file and directories for changes",
"cfg", r.cfgFile,
"out", r.cfgOutputFile,
"dirs", strings.Join(r.watchedDirs, ","))

applyCtx, cancel := context.WithTimeout(ctx, r.watchInterval)

for {
select {
case <-ctx.Done():
return nil
case <-tick.C:
case event := <-watcher.Events:
r.watchEvents.Inc()
if _, ok := watchables[filepath.Dir(event.Name)]; !ok {
continue
case <-applyCtx.Done():
if ctx.Err() != nil {
cancel()
wg.Wait()
return nil
}
case err := <-watcher.Errors:
r.watchErrors.Inc()
level.Error(r.logger).Log("msg", "watch error", "err", err)
continue
case <-r.watcher.notify:
}

if err := r.apply(ctx); err != nil {
r.configErrors.Inc()
// Reset the watch timeout.
cancel()
applyCtx, cancel = context.WithTimeout(ctx, r.watchInterval)

r.configApply.Inc()
if err := r.apply(applyCtx); err != nil {
r.configApplyErrors.Inc()
level.Error(r.logger).Log("msg", "apply error", "err", err)
}
}
@@ -341,11 +338,7 @@ func (r *Reloader) apply(ctx context.Context) error {
return nil
}

// Retry trigger reload until it succeeded or next tick is near.
retryCtx, cancel := context.WithTimeout(ctx, r.watchInterval)
defer cancel()

if err := runutil.RetryWithLog(r.logger, r.retryInterval, retryCtx.Done(), func() error {
if err := runutil.RetryWithLog(r.logger, r.retryInterval, ctx.Done(), func() error {
r.reloads.Inc()
if err := r.triggerReload(ctx); err != nil {
r.reloadErrors.Inc()
Expand All @@ -355,7 +348,7 @@ func (r *Reloader) apply(ctx context.Context) error {
r.lastCfgHash = cfgHash
r.lastWatchedDirsHash = watchedDirsHash
level.Info(r.logger).Log(
"msg", "Prometheus reload triggered",
"msg", "Reload triggered",
"cfg_in", r.cfgFile,
"cfg_out", r.cfgOutputFile,
"watched_dirs", strings.Join(r.watchedDirs, ", "))
@@ -434,3 +427,149 @@ func expandEnv(b []byte) (r []byte, err error) {
})
return r, err
}

type watcher struct {
notify chan struct{}

w *fsnotify.Watcher
watchedDirs map[string]struct{}
delayInterval time.Duration

logger log.Logger
watchedItems prometheus.Gauge
watchEvents prometheus.Counter
watchErrors prometheus.Counter
}

func newWatcher(logger log.Logger, reg prometheus.Registerer, delayInterval time.Duration) *watcher {
return &watcher{
logger: logger,
delayInterval: delayInterval,
notify: make(chan struct{}),
watchedDirs: make(map[string]struct{}),

watchedItems: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Name: "reloader_watches",
Help: "Number of resources watched by the reloader.",
},
),
watchEvents: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "reloader_watch_events_total",
Help: "Total number of events received by the reloader from the watcher.",
},
),
watchErrors: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "reloader_watch_errors_total",
Help: "Total number of errors received by the reloader from the watcher.",
},
),
}
}

func (w *watcher) addPath(name string) error {
if w.w == nil {
fsWatcher, err := fsnotify.NewWatcher()
if err != nil {
return errors.Wrap(err, "create watcher")
}
w.w = fsWatcher
}

if err := w.w.Add(name); err != nil {
return err
}

w.watchedDirs[name] = struct{}{}
w.watchedItems.Set(float64(len(w.watchedDirs)))

return nil
}

func (w *watcher) addDirectory(name string) error {
w.watchedDirs[name] = struct{}{}
return w.addPath(name)
}

func (w *watcher) addFile(name string) error {
w.watchedDirs[filepath.Dir(name)] = struct{}{}
return w.addPath(name)
}

func (w *watcher) run(ctx context.Context) {
defer runutil.CloseWithLogOnErr(w.logger, w.w, "config watcher close")

var wg sync.WaitGroup
notify := make(chan struct{})

wg.Add(1)
go func() {
defer wg.Done()

var (
delayCtx context.Context
cancel context.CancelFunc
)

for {
select {
case <-ctx.Done():
if cancel != nil {
cancel()
}
return

case <-notify:
if cancel != nil {
cancel()
}

delayCtx, cancel = context.WithCancel(ctx)

wg.Add(1)
go func(ctx context.Context) {
defer wg.Done()

if w.delayInterval > 0 {
t := time.NewTicker(w.delayInterval)
defer t.Stop()

select {
case <-ctx.Done():
return
case <-t.C:
}
}

select {
case w.notify <- struct{}{}:
case <-ctx.Done():
}
}(delayCtx)
}
}
}()

for {
select {
case <-ctx.Done():
wg.Wait()
return

case event := <-w.w.Events:
w.watchEvents.Inc()
if _, ok := w.watchedDirs[filepath.Dir(event.Name)]; ok {
select {
case notify <- struct{}{}:
default:
}
}

case err := <-w.w.Errors:
w.watchErrors.Inc()
level.Error(w.logger).Log("msg", "watch error", "err", err)
}
}
}
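
A note on the metric names in the CHANGELOG entry: the counters defined above are registered as `reloader_config_apply_operations_total` and `reloader_config_apply_operations_failed_total`, while the CHANGELOG refers to `thanos_sidecar_`-prefixed names, presumably because the sidecar passes a prefixed registerer into `reloader.New`. The sketch below illustrates that wiring under that assumption; it is not part of this diff, and the reload URL and file paths are placeholders.

```go
package main

import (
	"net/url"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/thanos-io/thanos/pkg/reloader"
)

func main() {
	reg := prometheus.NewRegistry()
	u, _ := url.Parse("http://localhost:9090/-/reload")

	// Wrapping the registerer with a prefix is what would turn the package-level
	// reloader_config_apply_operations_total into
	// thanos_sidecar_reloader_config_apply_operations_total.
	rl := reloader.New(
		log.NewNopLogger(),
		prometheus.WrapRegistererWithPrefix("thanos_sidecar_", reg),
		&reloader.Options{
			ReloadURL:     u,
			CfgFile:       "/path/to/prometheus.yml",
			CfgOutputFile: "/path/to/prometheus.out.yml",
			WatchedDirs:   []string{"/path/to/dirs"},
			WatchInterval: 3 * time.Minute,
			RetryInterval: 5 * time.Second,
			DelayInterval: 1 * time.Second,
		},
	)
	_ = rl // rl.Watch(ctx) would start the watch loop.
}
```
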
