Reloader: Add support for watching and decompressing Prometheus sub-configuration directories
TheSpiritXIII committed Mar 8, 2024
1 parent 5910ed6 commit 5e29732
Showing 3 changed files with 669 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7105](https://github.com/thanos-io/thanos/pull/7105) Rule: add flag `--query.enable-x-functions` to allow usage of extended promql functions (xrate, xincrease, xdelta) in loaded rules
- [#6867](https://github.com/thanos-io/thanos/pull/6867) Query UI: Tenant input box added to the Query UI, in order to be able to specify which tenant the query should use.
- [#7175](https://github.com/thanos-io/thanos/pull/7175): Query: Add `--query.mode=distributed` which enables the new distributed mode of the Thanos query engine.
- [#7199](https://github.com/thanos-io/thanos/pull/7199): Reloader: Add support for watching and decompressing Prometheus sub-configuration directories

### Changed

187 changes: 151 additions & 36 deletions pkg/reloader/reloader.go
@@ -59,6 +59,7 @@ import (
"fmt"
"hash"
"io"
"maps"
"net/http"
"net/url"
"os"
@@ -89,6 +90,7 @@ type Reloader struct {
logger log.Logger
cfgFile string
cfgOutputFile string
subCfgs []SubConfigOptions
watchInterval time.Duration
retryInterval time.Duration
watchedDirs []string
@@ -97,7 +99,9 @@ tr TriggerReloader
tr TriggerReloader

lastCfgHash []byte
lastSubCfgsHash []byte
lastWatchedDirsHash []byte
lastSubCfgFiles []map[string]struct{}
forceReload bool

reloads prometheus.Counter
@@ -114,6 +118,20 @@ type TriggerReloader interface {
TriggerReload(ctx context.Context) error
}
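// Illustrative sketch (not part of this commit): a minimal TriggerReloader that POSTs to
// a reload endpoint. The type name, field and URL below are assumptions, not the
// reloader's built-in implementation.
//
//	type httpTrigger struct{ reloadURL string }
//
//	func (t *httpTrigger) TriggerReload(ctx context.Context) error {
//		req, err := http.NewRequestWithContext(ctx, http.MethodPost, t.reloadURL, nil)
//		if err != nil {
//			return err
//		}
//		resp, err := http.DefaultClient.Do(req)
//		if err != nil {
//			return err
//		}
//		defer resp.Body.Close()
//		if resp.StatusCode != http.StatusOK {
//			return fmt.Errorf("unexpected status %s", resp.Status)
//		}
//		return nil
//	}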

// SubConfigOptions contains options for a sub-config, for example additional scrape config
// files or rule files listed in the main Prometheus configuration. Sub-directories are ignored.
type SubConfigOptions struct {
// Dir is the path containing the Prometheus configurations to watch.
Dir string

// OutputDir is a directory path to output configurations. If OutputDir is not empty,
// then every config file in Dir is decompressed if needed, environment variables are
// substituted and the result is written into OutputDir. Prometheus should then use
// OutputDir as its config path.
}
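// Illustrative example (not part of this commit): watch rule files in one directory and
// write normalized copies to a sibling directory. The paths are hypothetical.
//
//	sub := SubConfigOptions{
//		Dir:       "/etc/prometheus/rules",
//		OutputDir: "/etc/prometheus/rules_out",
//	}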

// Options bundles options for the Reloader.
type Options struct {
// ReloadURL is the Prometheus URL to trigger reloads.
@@ -133,13 +151,15 @@ type Options struct {
// successful.
RuntimeInfoURL *url.URL

// CfgFile is a path to the prometheus config file to watch.
// CfgFile is a path to the Prometheus config file to watch.
CfgFile string
// CfgOutputFile is a path for the output config file.
// If CfgOutputFile is not empty, the config file will be decompressed if needed, environment variables
// will be substituted and the output written into the given path. Prometheus should then use
// CfgOutputFile as its config file path.
CfgOutputFile string
// SubCfgs is a collection of Prometheus sub-config directories to watch.
SubCfgs []SubConfigOptions
// WatchedDirs is a collection of paths for the reloader to watch over.
WatchedDirs []string
// DelayInterval controls how long the reloader will wait without receiving
@@ -161,13 +181,15 @@ func New(logger log.Logger, reg prometheus.Registerer, o *Options) *Reloader {
logger = log.NewNopLogger()
}
r := &Reloader{
logger: logger,
cfgFile: o.CfgFile,
cfgOutputFile: o.CfgOutputFile,
watcher: newWatcher(logger, reg, o.DelayInterval),
watchedDirs: o.WatchedDirs,
watchInterval: o.WatchInterval,
retryInterval: o.RetryInterval,
logger: logger,
cfgFile: o.CfgFile,
cfgOutputFile: o.CfgOutputFile,
subCfgs: o.SubCfgs,
lastSubCfgFiles: make([]map[string]struct{}, len(o.SubCfgs)),
watcher: newWatcher(logger, reg, o.DelayInterval),
watchedDirs: o.WatchedDirs,
watchInterval: o.WatchInterval,
retryInterval: o.RetryInterval,

reloads: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
@@ -234,7 +256,7 @@ func New(logger log.Logger, reg prometheus.Registerer, o *Options) *Reloader {
// Because some edge cases might be missing, the reloader also relies on the
// watch interval.
func (r *Reloader) Watch(ctx context.Context) error {
if r.cfgFile == "" && len(r.watchedDirs) == 0 {
if r.cfgFile == "" && len(r.subCfgs) == 0 && len(r.watchedDirs) == 0 {
level.Info(r.logger).Log("msg", "nothing to be watched")
<-ctx.Done()
return nil
@@ -260,6 +282,13 @@ }
}
}

for _, subCfg := range r.subCfgs {
dir := subCfg.Dir
if err := r.watcher.addDirectory(dir); err != nil {
return errors.Wrapf(err, "add directory %s to watcher", dir)
}
}

if r.watchInterval == 0 {
// Skip watching the file-system.
return nil
@@ -279,9 +308,18 @@ wg.Done()
wg.Done()
}()

var subCfgsBuilder strings.Builder
for i, subCfg := range r.subCfgs {
subCfgsBuilder.WriteString(subCfg.Dir)
if i != len(r.subCfgs)-1 {
subCfgsBuilder.WriteRune(',')
}
}

level.Info(r.logger).Log(
"msg", "started watching config file and directories for changes",
"cfg", r.cfgFile,
"subCfgs", subCfgsBuilder.String(),
"out", r.cfgOutputFile,
"dirs", strings.Join(r.watchedDirs, ","))

@@ -311,12 +349,51 @@ }
}
}

func normalize(logger log.Logger, inputFile, outputFile string) error {
b, err := os.ReadFile(inputFile)
if err != nil {
return errors.Wrap(err, "read file")
}

// Detect and extract gzipped file.
if bytes.Equal(b[0:3], firstGzipBytes) {
zr, err := gzip.NewReader(bytes.NewReader(b))
if err != nil {
return errors.Wrap(err, "create gzip reader")
}
defer runutil.CloseWithLogOnErr(logger, zr, "gzip reader close")

b, err = io.ReadAll(zr)
if err != nil {
return errors.Wrap(err, "read compressed config file")
}
}

b, err = expandEnv(b)
if err != nil {
return errors.Wrap(err, "expand environment variables")
}

tmpFile := outputFile + ".tmp"
defer func() {
_ = os.Remove(tmpFile)
}()
if err := os.WriteFile(tmpFile, b, 0644); err != nil {
return errors.Wrap(err, "write file")
}
if err := os.Rename(tmpFile, outputFile); err != nil {
return errors.Wrap(err, "rename file")
}
return nil
}
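// Worked example (illustrative; assumes expandEnv substitutes $(VAR)-style references):
// given an input file whose (possibly gzip-compressed) content is
//
//	scrape_timeout: $(SCRAPE_TIMEOUT)
//
// and SCRAPE_TIMEOUT=10s in the environment, normalize writes the plain-text output
// "scrape_timeout: 10s" to outputFile through a temporary file followed by a rename.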

// apply triggers a Prometheus reload if the rules or config changed. If cfgOutputFile is set, we also
// expand env vars into the config file before reloading.
// The reload is retried every retryInterval until watchInterval elapses.
func (r *Reloader) apply(ctx context.Context) error {
var (
cfgHash []byte
subCfgsHash []byte
watchedDirsHash []byte
)

@@ -327,44 +404,70 @@ func (r *Reloader) apply(ctx context.Context) error {
}
cfgHash = h.Sum(nil)
if r.cfgOutputFile != "" {
b, err := os.ReadFile(r.cfgFile)
if err != nil {
return errors.Wrap(err, "read file")
if err := normalize(r.logger, r.cfgFile, r.cfgOutputFile); err != nil {
return err
}
}
}

// Detect and extract gzipped file.
if bytes.Equal(b[0:3], firstGzipBytes) {
zr, err := gzip.NewReader(bytes.NewReader(b))
if err != nil {
return errors.Wrap(err, "create gzip reader")
}
defer runutil.CloseWithLogOnErr(r.logger, zr, "gzip reader close")
h := sha256.New()
for i, subCfg := range r.subCfgs {
walkDir, err := filepath.EvalSymlinks(subCfg.Dir)
if err != nil {
return errors.Wrap(err, "dir symlink eval")
}
outDir, err := filepath.EvalSymlinks(subCfg.OutputDir)
if err != nil {
return errors.Wrap(err, "dir symlink eval")
}

b, err = io.ReadAll(zr)
if err != nil {
return errors.Wrap(err, "read compressed config file")
}
}
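// Track which output files this sub-config directory produces so that stale outputs can
// be pruned after the directory walk.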
subCfgFiles := map[string]struct{}{}
entries, err := os.ReadDir(walkDir)
if err != nil {
return errors.Wrapf(err, "read dir: %s", walkDir)
}
for _, entry := range entries {
path := filepath.Join(walkDir, entry.Name())

b, err = expandEnv(b)
// Make sure to follow a symlink before checking if it is a directory.
targetFile, err := os.Stat(path)
if err != nil {
return errors.Wrap(err, "expand environment variables")
return errors.Wrapf(err, "stat file: %s", path)
}

tmpFile := r.cfgOutputFile + ".tmp"
defer func() {
_ = os.Remove(tmpFile)
}()
if err := os.WriteFile(tmpFile, b, 0644); err != nil {
return errors.Wrap(err, "write file")
if targetFile.IsDir() {
continue
}
if err := os.Rename(tmpFile, r.cfgOutputFile); err != nil {
return errors.Wrap(err, "rename file")

if err := hashFile(h, path); err != nil {
return errors.Wrapf(err, "build hash for file: %s", path)
}

outFile := filepath.Join(outDir, targetFile.Name())
subCfgFiles[outFile] = struct{}{}
if err := normalize(r.logger, path, outFile); err != nil {
return errors.Wrapf(err, "move file: %s", path)
}
}
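// Remove previously generated output files whose source file no longer exists in the
// sub-config directory.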
if r.lastSubCfgFiles[i] != nil {
if !maps.Equal(r.lastSubCfgFiles[i], subCfgFiles) {
for outFile := range r.lastSubCfgFiles[i] {
if _, ok := subCfgFiles[outFile]; !ok {
if err := os.Remove(outFile); err != nil {
return err
}
}
}
}
}
r.lastSubCfgFiles[i] = subCfgFiles
}

h := sha256.New()
if len(r.subCfgs) > 0 {
subCfgsHash = h.Sum(nil)
}

h = sha256.New()
for _, dir := range r.watchedDirs {
walkDir, err := filepath.EvalSymlinks(dir)
if err != nil {
@@ -401,7 +504,7 @@ func (r *Reloader) apply(ctx context.Context) error {
watchedDirsHash = h.Sum(nil)
}

if !r.forceReload && bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastWatchedDirsHash, watchedDirsHash) {
if !r.forceReload && bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastSubCfgsHash, subCfgsHash) && bytes.Equal(r.lastWatchedDirsHash, watchedDirsHash) {
// Nothing to do.
return nil
}
@@ -420,11 +523,23 @@

r.forceReload = false
r.lastCfgHash = cfgHash
r.lastSubCfgsHash = subCfgsHash
r.lastWatchedDirsHash = watchedDirsHash

var subCfgsBuilder strings.Builder
for i, subCfg := range r.subCfgs {
subCfgsBuilder.WriteString(subCfg.Dir)
subCfgsBuilder.WriteRune('→')
subCfgsBuilder.WriteString(subCfg.OutputDir)
if i != len(r.subCfgs)-1 {
subCfgsBuilder.WriteRune(',')
}
}
level.Info(r.logger).Log(
"msg", "Reload triggered",
"cfg_in", r.cfgFile,
"cfg_out", r.cfgOutputFile,
"subcfgs", subCfgsBuilder.String(),
"watched_dirs", strings.Join(r.watchedDirs, ", "))
r.lastReloadSuccess.Set(1)
r.lastReloadSuccessTimestamp.SetToCurrentTime()

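For reference, a minimal sketch of how a caller might wire up the new sub-config support, based on the Options and SubConfigOptions fields introduced in this commit. The URL, paths and intervals are placeholders, and error handling is abbreviated.

	package main

	import (
		"context"
		"net/url"
		"time"

		"github.com/go-kit/log"
		"github.com/prometheus/client_golang/prometheus"

		"github.com/thanos-io/thanos/pkg/reloader"
	)

	func main() {
		logger := log.NewNopLogger()
		reloadURL, _ := url.Parse("http://localhost:9090/-/reload")

		rel := reloader.New(logger, prometheus.NewRegistry(), &reloader.Options{
			ReloadURL:     reloadURL,
			CfgFile:       "/etc/prometheus/prometheus.yml.gz",
			CfgOutputFile: "/etc/prometheus/prometheus.yml",
			SubCfgs: []reloader.SubConfigOptions{
				{Dir: "/etc/prometheus/rules", OutputDir: "/etc/prometheus/rules_out"},
			},
			WatchInterval: 3 * time.Minute,
			RetryInterval: 5 * time.Second,
		})

		// Watch blocks until the context is cancelled.
		if err := rel.Watch(context.Background()); err != nil {
			panic(err)
		}
	}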