Skip to content

Commit

Permalink
Reloader: Add support for watching and decompressing Prometheus confi…
Browse files Browse the repository at this point in the history
…guration directories (#7199)

Signed-off-by: Daniel Hrabovcak <thespiritxiii@gmail.com>
  • Loading branch information
TheSpiritXIII committed Mar 12, 2024
1 parent 7acce0c commit 7eda7ff
Show file tree
Hide file tree
Showing 3 changed files with 666 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7105](https://github.com/thanos-io/thanos/pull/7105) Rule: add flag `--query.enable-x-functions` to allow usage of extended promql functions (xrate, xincrease, xdelta) in loaded rules
- [#6867](https://github.com/thanos-io/thanos/pull/6867) Query UI: Tenant input box added to the Query UI, in order to be able to specify which tenant the query should use.
- [#7175](https://github.com/thanos-io/thanos/pull/7175): Query: Add `--query.mode=distributed` which enables the new distributed mode of the Thanos query engine.
- [#7199](https://github.com/thanos-io/thanos/pull/7199): Reloader: Add support for watching and decompressing Prometheus configuration directories

### Changed

Expand Down
183 changes: 148 additions & 35 deletions pkg/reloader/reloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"fmt"
"hash"
"io"
"maps"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -89,6 +90,7 @@ type Reloader struct {
logger log.Logger
cfgFile string
cfgOutputFile string
cfgDirs []CfgDirOption
watchInterval time.Duration
retryInterval time.Duration
watchedDirs []string
Expand All @@ -97,7 +99,9 @@ type Reloader struct {
tr TriggerReloader

lastCfgHash []byte
lastCfgDirsHash [][]byte
lastWatchedDirsHash []byte
lastCfgDirFiles []map[string]struct{}
forceReload bool

reloads prometheus.Counter
Expand All @@ -114,6 +118,22 @@ type TriggerReloader interface {
TriggerReload(ctx context.Context) error
}

// CfgDirOption contains options for watching directories containing configurations. For example, a
// directory could contain additional scrape config files or rule files listed in the main
// Prometheus configuration. Sub-directories are ignored.
type CfgDirOption struct {
// Dir is the path containing the Prometheus configurations to watch.
Dir string

// OutputDir is a directory path to output configurations. If OutputDir is not empty,
// then all config files in the Dir directory are decompressed if needed, environment
// variables will be substituted and the output written into the given path. Prometheus
// should then use OutputDir as its config path.
OutputDir string

// TODO: https://github.com/thanos-io/thanos/issues/7201
}

// Options bundles options for the Reloader.
type Options struct {
// ReloadURL is the Prometheus URL to trigger reloads.
Expand All @@ -133,13 +153,15 @@ type Options struct {
// successful.
RuntimeInfoURL *url.URL

// CfgFile is a path to the prometheus config file to watch.
// CfgFile is a path to the Prometheus config file to watch.
CfgFile string
// CfgOutputFile is a path for the output config file.
// If cfgOutputFile is not empty the config file will be decompressed if needed, environment variables
// will be substituted and the output written into the given path. Prometheus should then use
// cfgOutputFile as its config file path.
CfgOutputFile string
// CfgDirs is an array of paths to directories containing Prometheus configs to watch.
CfgDirs []CfgDirOption
// WatchedDirs is a collection of paths for the reloader to watch over.
WatchedDirs []string
// DelayInterval controls how long the reloader will wait without receiving
Expand All @@ -161,13 +183,15 @@ func New(logger log.Logger, reg prometheus.Registerer, o *Options) *Reloader {
logger = log.NewNopLogger()
}
r := &Reloader{
logger: logger,
cfgFile: o.CfgFile,
cfgOutputFile: o.CfgOutputFile,
watcher: newWatcher(logger, reg, o.DelayInterval),
watchedDirs: o.WatchedDirs,
watchInterval: o.WatchInterval,
retryInterval: o.RetryInterval,
logger: logger,
cfgFile: o.CfgFile,
cfgOutputFile: o.CfgOutputFile,
cfgDirs: o.CfgDirs,
lastCfgDirFiles: make([]map[string]struct{}, len(o.CfgDirs)),
watcher: newWatcher(logger, reg, o.DelayInterval),
watchedDirs: o.WatchedDirs,
watchInterval: o.WatchInterval,
retryInterval: o.RetryInterval,

reloads: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Expand Down Expand Up @@ -234,7 +258,7 @@ func New(logger log.Logger, reg prometheus.Registerer, o *Options) *Reloader {
// Because some edge cases might be missing, the reloader also relies on the
// watch interval.
func (r *Reloader) Watch(ctx context.Context) error {
if r.cfgFile == "" && len(r.watchedDirs) == 0 {
if r.cfgFile == "" && len(r.cfgDirs) == 0 && len(r.watchedDirs) == 0 {
level.Info(r.logger).Log("msg", "nothing to be watched")
<-ctx.Done()
return nil
Expand All @@ -260,6 +284,13 @@ func (r *Reloader) Watch(ctx context.Context) error {
}
}

for _, cfgDir := range r.cfgDirs {
dir := cfgDir.Dir
if err := r.watcher.addDirectory(dir); err != nil {
return errors.Wrapf(err, "add directory %s to watcher", dir)
}
}

if r.watchInterval == 0 {
// Skip watching the file-system.
return nil
Expand All @@ -279,9 +310,15 @@ func (r *Reloader) Watch(ctx context.Context) error {
wg.Done()
}()

cfgDirsNames := make([]string, 0, len(r.cfgDirs))
for _, cfgDir := range r.cfgDirs {
cfgDirsNames = append(cfgDirsNames, cfgDir.Dir)
}

level.Info(r.logger).Log(
"msg", "started watching config file and directories for changes",
"cfg", r.cfgFile,
"cfgDirs", strings.Join(cfgDirsNames, ","),
"out", r.cfgOutputFile,
"dirs", strings.Join(r.watchedDirs, ","))

Expand Down Expand Up @@ -311,6 +348,44 @@ func (r *Reloader) Watch(ctx context.Context) error {
}
}

func normalize(logger log.Logger, inputFile, outputFile string) error {
b, err := os.ReadFile(inputFile)
if err != nil {
return errors.Wrap(err, "read file")
}

// Detect and extract gzipped file.
if bytes.Equal(b[0:3], firstGzipBytes) {
zr, err := gzip.NewReader(bytes.NewReader(b))
if err != nil {
return errors.Wrap(err, "create gzip reader")
}
defer runutil.CloseWithLogOnErr(logger, zr, "gzip reader close")

b, err = io.ReadAll(zr)
if err != nil {
return errors.Wrap(err, "read compressed config file")
}
}

b, err = expandEnv(b)
if err != nil {
return errors.Wrap(err, "expand environment variables")
}

tmpFile := outputFile + ".tmp"
defer func() {
_ = os.Remove(tmpFile)
}()
if err := os.WriteFile(tmpFile, b, 0644); err != nil {
return errors.Wrap(err, "write file")
}
if err := os.Rename(tmpFile, outputFile); err != nil {
return errors.Wrap(err, "rename file")
}
return nil
}

// apply triggers Prometheus reload if rules or config changed. If cfgOutputFile is set, we also
// expand env vars into config file before reloading.
// Reload is retried in retryInterval until watchInterval.
Expand All @@ -327,41 +402,72 @@ func (r *Reloader) apply(ctx context.Context) error {
}
cfgHash = h.Sum(nil)
if r.cfgOutputFile != "" {
b, err := os.ReadFile(r.cfgFile)
if err != nil {
return errors.Wrap(err, "read file")
if err := normalize(r.logger, r.cfgFile, r.cfgOutputFile); err != nil {
return err
}
}
}

// Detect and extract gzipped file.
if bytes.Equal(b[0:3], firstGzipBytes) {
zr, err := gzip.NewReader(bytes.NewReader(b))
if err != nil {
return errors.Wrap(err, "create gzip reader")
}
defer runutil.CloseWithLogOnErr(r.logger, zr, "gzip reader close")
cfgDirsHash := make([][]byte, len(r.cfgDirs))
cfgDirsChanged := len(r.lastCfgDirsHash) == 0 && len(r.cfgDirs) > 0
for i, cfgDir := range r.cfgDirs {
h := sha256.New()

b, err = io.ReadAll(zr)
if err != nil {
return errors.Wrap(err, "read compressed config file")
}
}
walkDir, err := filepath.EvalSymlinks(cfgDir.Dir)
if err != nil {
return errors.Wrap(err, "dir symlink eval")
}
outDir, err := filepath.EvalSymlinks(cfgDir.OutputDir)
if err != nil {
return errors.Wrap(err, "dir symlink eval")
}

cfgDirFiles := map[string]struct{}{}
entries, err := os.ReadDir(walkDir)
if err != nil {
return errors.Wrapf(err, "read dir: %s", walkDir)
}
for _, entry := range entries {
path := filepath.Join(walkDir, entry.Name())

b, err = expandEnv(b)
// Make sure to follow a symlink before checking if it is a directory.
targetFile, err := os.Stat(path)
if err != nil {
return errors.Wrap(err, "expand environment variables")
return errors.Wrapf(err, "stat file: %s", path)
}

if targetFile.IsDir() {
continue
}

tmpFile := r.cfgOutputFile + ".tmp"
defer func() {
_ = os.Remove(tmpFile)
}()
if err := os.WriteFile(tmpFile, b, 0644); err != nil {
return errors.Wrap(err, "write file")
if err := hashFile(h, path); err != nil {
return errors.Wrapf(err, "build hash for file: %s", path)
}
if err := os.Rename(tmpFile, r.cfgOutputFile); err != nil {
return errors.Wrap(err, "rename file")

outFile := filepath.Join(outDir, targetFile.Name())
cfgDirFiles[outFile] = struct{}{}
if err := normalize(r.logger, path, outFile); err != nil {
return errors.Wrapf(err, "move file: %s", path)
}
}
if r.lastCfgDirFiles[i] != nil {
if !maps.Equal(r.lastCfgDirFiles[i], cfgDirFiles) {
for outFile := range r.lastCfgDirFiles[i] {
if _, ok := cfgDirFiles[outFile]; !ok {
if err := os.Remove(outFile); err != nil {
return err
}
}
}
}
}
r.lastCfgDirFiles[i] = cfgDirFiles

cfgDirsHash[i] = h.Sum(nil)
// Skip comparing bytes if we already set the flag.
if !cfgDirsChanged && !bytes.Equal(r.lastCfgDirsHash[i], cfgDirsHash[i]) {
cfgDirsChanged = true
}
}

h := sha256.New()
Expand Down Expand Up @@ -401,7 +507,7 @@ func (r *Reloader) apply(ctx context.Context) error {
watchedDirsHash = h.Sum(nil)
}

if !r.forceReload && bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastWatchedDirsHash, watchedDirsHash) {
if !r.forceReload && !cfgDirsChanged && bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastWatchedDirsHash, watchedDirsHash) {
// Nothing to do.
return nil
}
Expand All @@ -420,11 +526,18 @@ func (r *Reloader) apply(ctx context.Context) error {

r.forceReload = false
r.lastCfgHash = cfgHash
r.lastCfgDirsHash = cfgDirsHash
r.lastWatchedDirsHash = watchedDirsHash

cfgDirsNames := make([]string, 0, len(r.cfgDirs))
for _, cfgDir := range r.cfgDirs {
cfgDirsNames = append(cfgDirsNames, cfgDir.Dir)
}
level.Info(r.logger).Log(
"msg", "Reload triggered",
"cfg_in", r.cfgFile,
"cfg_out", r.cfgOutputFile,
"cfg_dirs", strings.Join(cfgDirsNames, ", "),
"watched_dirs", strings.Join(r.watchedDirs, ", "))
r.lastReloadSuccess.Set(1)
r.lastReloadSuccessTimestamp.SetToCurrentTime()
Expand Down
Loading

0 comments on commit 7eda7ff

Please sign in to comment.