From 99427cbd99feb16aa57797cd4cf23a35ddbdffa0 Mon Sep 17 00:00:00 2001 From: Daniel Hrabovcak Date: Fri, 8 Mar 2024 11:02:47 -0500 Subject: [PATCH] Reloader: Add support for watching and decompressing Prometheus sub-configuration directories Signed-off-by: Daniel Hrabovcak --- CHANGELOG.md | 1 + pkg/reloader/reloader.go | 181 +++++++++--- pkg/reloader/reloader_test.go | 518 +++++++++++++++++++++++++++++++++- 3 files changed, 664 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 69b05049f84..77666149f2f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7105](https://github.com/thanos-io/thanos/pull/7105) Rule: add flag `--query.enable-x-functions` to allow usage of extended promql functions (xrate, xincrease, xdelta) in loaded rules - [#6867](https://github.com/thanos-io/thanos/pull/6867) Query UI: Tenant input box added to the Query UI, in order to be able to specify which tenant the query should use. - [#7175](https://github.com/thanos-io/thanos/pull/7175): Query: Add `--query.mode=distributed` which enables the new distributed mode of the Thanos query engine. 
+- [#7199](https://github.com/thanos-io/thanos/pull/7199): Reloader: Add support for watching and decompressing Prometheus sub-configuration directories ### Changed diff --git a/pkg/reloader/reloader.go b/pkg/reloader/reloader.go index f4ba89a0eca..de04ad4f709 100644 --- a/pkg/reloader/reloader.go +++ b/pkg/reloader/reloader.go @@ -59,6 +59,7 @@ import ( "fmt" "hash" "io" + "maps" "net/http" "net/url" "os" @@ -89,6 +90,7 @@ type Reloader struct { logger log.Logger cfgFile string cfgOutputFile string + cfgDirs []CfgDirOption watchInterval time.Duration retryInterval time.Duration watchedDirs []string @@ -97,7 +99,9 @@ type Reloader struct { tr TriggerReloader lastCfgHash []byte + lastCfgDirsHash [][]byte lastWatchedDirsHash []byte + lastCfgDirFiles []map[string]struct{} forceReload bool reloads prometheus.Counter @@ -114,6 +118,20 @@ type TriggerReloader interface { TriggerReload(ctx context.Context) error } +// CfgDirOption contains options for watching directories containing configurations. For +// example, a directory could contain additional scrape config files or rule +// files listed in the main Prometheus configuration. Sub-directories are ignored. +type CfgDirOption struct { + // Dir is the path containing the Prometheus configurations to watch. + Dir string + + // OutputDir is a directory path to output configurations. If OutputDir is not empty, + // then all config files in the Dir directory are decompressed if needed, environment + // variables will be substituted and the output written into the given path. Prometheus + // should then use OutputDir as its config path. + OutputDir string +} + // Options bundles options for the Reloader. type Options struct { // ReloadURL is the Prometheus URL to trigger reloads. @@ -133,13 +151,15 @@ type Options struct { // successful. RuntimeInfoURL *url.URL - // CfgFile is a path to the prometheus config file to watch. + // CfgFile is a path to the Prometheus config file to watch. 
CfgFile string // CfgOutputFile is a path for the output config file. // If cfgOutputFile is not empty the config file will be decompressed if needed, environment variables // will be substituted and the output written into the given path. Prometheus should then use // cfgOutputFile as its config file path. CfgOutputFile string + // CfgDirs is an array of paths to directories containing Prometheus configs to watch. + CfgDirs []CfgDirOption // WatchedDirs is a collection of paths for the reloader to watch over. WatchedDirs []string // DelayInterval controls how long the reloader will wait without receiving @@ -161,13 +181,15 @@ func New(logger log.Logger, reg prometheus.Registerer, o *Options) *Reloader { logger = log.NewNopLogger() } r := &Reloader{ - logger: logger, - cfgFile: o.CfgFile, - cfgOutputFile: o.CfgOutputFile, - watcher: newWatcher(logger, reg, o.DelayInterval), - watchedDirs: o.WatchedDirs, - watchInterval: o.WatchInterval, - retryInterval: o.RetryInterval, + logger: logger, + cfgFile: o.CfgFile, + cfgOutputFile: o.CfgOutputFile, + cfgDirs: o.CfgDirs, + lastCfgDirFiles: make([]map[string]struct{}, len(o.CfgDirs)), + watcher: newWatcher(logger, reg, o.DelayInterval), + watchedDirs: o.WatchedDirs, + watchInterval: o.WatchInterval, + retryInterval: o.RetryInterval, reloads: promauto.With(reg).NewCounter( prometheus.CounterOpts{ @@ -234,7 +256,7 @@ func New(logger log.Logger, reg prometheus.Registerer, o *Options) *Reloader { // Because some edge cases might be missing, the reloader also relies on the // watch interval. 
func (r *Reloader) Watch(ctx context.Context) error { - if r.cfgFile == "" && len(r.watchedDirs) == 0 { + if r.cfgFile == "" && len(r.cfgDirs) == 0 && len(r.watchedDirs) == 0 { level.Info(r.logger).Log("msg", "nothing to be watched") <-ctx.Done() return nil @@ -260,6 +282,13 @@ func (r *Reloader) Watch(ctx context.Context) error { } } + for _, cfgDir := range r.cfgDirs { + dir := cfgDir.Dir + if err := r.watcher.addDirectory(dir); err != nil { + return errors.Wrapf(err, "add directory %s to watcher", dir) + } + } + if r.watchInterval == 0 { // Skip watching the file-system. return nil @@ -279,9 +308,15 @@ func (r *Reloader) Watch(ctx context.Context) error { wg.Done() }() + cfgDirsNames := make([]string, 0, len(r.cfgDirs)) + for _, cfgDir := range r.cfgDirs { + cfgDirsNames = append(cfgDirsNames, cfgDir.Dir) + } + level.Info(r.logger).Log( "msg", "started watching config file and directories for changes", "cfg", r.cfgFile, + "cfgDirs", strings.Join(cfgDirsNames, ","), "out", r.cfgOutputFile, "dirs", strings.Join(r.watchedDirs, ",")) @@ -311,6 +346,44 @@ func (r *Reloader) Watch(ctx context.Context) error { } } +func normalize(logger log.Logger, inputFile, outputFile string) error { + b, err := os.ReadFile(inputFile) + if err != nil { + return errors.Wrap(err, "read file") + } + + // Detect and extract gzipped file. 
+ if bytes.Equal(b[0:3], firstGzipBytes) { + zr, err := gzip.NewReader(bytes.NewReader(b)) + if err != nil { + return errors.Wrap(err, "create gzip reader") + } + defer runutil.CloseWithLogOnErr(logger, zr, "gzip reader close") + + b, err = io.ReadAll(zr) + if err != nil { + return errors.Wrap(err, "read compressed config file") + } + } + + b, err = expandEnv(b) + if err != nil { + return errors.Wrap(err, "expand environment variables") + } + + tmpFile := outputFile + ".tmp" + defer func() { + _ = os.Remove(tmpFile) + }() + if err := os.WriteFile(tmpFile, b, 0644); err != nil { + return errors.Wrap(err, "write file") + } + if err := os.Rename(tmpFile, outputFile); err != nil { + return errors.Wrap(err, "rename file") + } + return nil +} + // apply triggers Prometheus reload if rules or config changed. If cfgOutputFile is set, we also // expand env vars into config file before reloading. // Reload is retried in retryInterval until watchInterval. @@ -327,41 +400,72 @@ func (r *Reloader) apply(ctx context.Context) error { } cfgHash = h.Sum(nil) if r.cfgOutputFile != "" { - b, err := os.ReadFile(r.cfgFile) - if err != nil { - return errors.Wrap(err, "read file") + if err := normalize(r.logger, r.cfgFile, r.cfgOutputFile); err != nil { + return err } + } + } - // Detect and extract gzipped file. 
- if bytes.Equal(b[0:3], firstGzipBytes) { - zr, err := gzip.NewReader(bytes.NewReader(b)) - if err != nil { - return errors.Wrap(err, "create gzip reader") - } - defer runutil.CloseWithLogOnErr(r.logger, zr, "gzip reader close") + cfgDirsHash := make([][]byte, len(r.cfgDirs)) + cfgDirsChanged := len(r.lastCfgDirsHash) == 0 && len(r.cfgDirs) > 0 + for i, cfgDir := range r.cfgDirs { + h := sha256.New() - b, err = io.ReadAll(zr) - if err != nil { - return errors.Wrap(err, "read compressed config file") - } - } + walkDir, err := filepath.EvalSymlinks(cfgDir.Dir) + if err != nil { + return errors.Wrap(err, "dir symlink eval") + } + outDir, err := filepath.EvalSymlinks(cfgDir.OutputDir) + if err != nil { + return errors.Wrap(err, "dir symlink eval") + } + + cfgDirFiles := map[string]struct{}{} + entries, err := os.ReadDir(walkDir) + if err != nil { + return errors.Wrapf(err, "read dir: %s", walkDir) + } + for _, entry := range entries { + path := filepath.Join(walkDir, entry.Name()) - b, err = expandEnv(b) + // Make sure to follow a symlink before checking if it is a directory. 
+ targetFile, err := os.Stat(path) if err != nil { - return errors.Wrap(err, "expand environment variables") + return errors.Wrapf(err, "stat file: %s", path) + } + + if targetFile.IsDir() { + continue } - tmpFile := r.cfgOutputFile + ".tmp" - defer func() { - _ = os.Remove(tmpFile) - }() - if err := os.WriteFile(tmpFile, b, 0644); err != nil { - return errors.Wrap(err, "write file") + if err := hashFile(h, path); err != nil { + return errors.Wrapf(err, "build hash for file: %s", path) } - if err := os.Rename(tmpFile, r.cfgOutputFile); err != nil { - return errors.Wrap(err, "rename file") + + outFile := filepath.Join(outDir, targetFile.Name()) + cfgDirFiles[outFile] = struct{}{} + if err := normalize(r.logger, path, outFile); err != nil { + return errors.Wrapf(err, "move file: %s", path) } } + if r.lastCfgDirFiles[i] != nil { + if !maps.Equal(r.lastCfgDirFiles[i], cfgDirFiles) { + for outFile := range r.lastCfgDirFiles[i] { + if _, ok := cfgDirFiles[outFile]; !ok { + if err := os.Remove(outFile); err != nil { + return err + } + } + } + } + } + r.lastCfgDirFiles[i] = cfgDirFiles + + cfgDirsHash[i] = h.Sum(nil) + // Skip comparing bytes if we already set the flag. + if !cfgDirsChanged && !bytes.Equal(r.lastCfgDirsHash[i], cfgDirsHash[i]) { + cfgDirsChanged = true + } } h := sha256.New() @@ -401,7 +505,7 @@ func (r *Reloader) apply(ctx context.Context) error { watchedDirsHash = h.Sum(nil) } - if !r.forceReload && bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastWatchedDirsHash, watchedDirsHash) { + if !r.forceReload && !cfgDirsChanged && bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastWatchedDirsHash, watchedDirsHash) { // Nothing to do. 
return nil } @@ -420,11 +524,18 @@ func (r *Reloader) apply(ctx context.Context) error { r.forceReload = false r.lastCfgHash = cfgHash + r.lastCfgDirsHash = cfgDirsHash r.lastWatchedDirsHash = watchedDirsHash + + cfgDirsNames := make([]string, 0, len(r.cfgDirs)) + for _, cfgDir := range r.cfgDirs { + cfgDirsNames = append(cfgDirsNames, cfgDir.Dir) + } level.Info(r.logger).Log( "msg", "Reload triggered", "cfg_in", r.cfgFile, "cfg_out", r.cfgOutputFile, + "cfg_dirs", strings.Join(cfgDirsNames, ", "), "watched_dirs", strings.Join(r.watchedDirs, ", ")) r.lastReloadSuccess.Set(1) r.lastReloadSuccessTimestamp.SetToCurrentTime() diff --git a/pkg/reloader/reloader_test.go b/pkg/reloader/reloader_test.go index df484ce1598..c2448b6ea12 100644 --- a/pkg/reloader/reloader_test.go +++ b/pkg/reloader/reloader_test.go @@ -13,6 +13,7 @@ import ( "path" "path/filepath" "runtime" + "slices" "strings" "sync" "testing" @@ -72,6 +73,7 @@ func TestReloader_ConfigApply(t *testing.T) { ReloadURL: reloadURL, CfgFile: input, CfgOutputFile: output, + CfgDirs: nil, WatchedDirs: nil, WatchInterval: 9999 * time.Hour, // Disable interval to test watch logic only. RetryInterval: 100 * time.Millisecond, @@ -216,6 +218,7 @@ faulty_config: ReloadURL: reloadURL, CfgFile: input, CfgOutputFile: output, + CfgDirs: nil, WatchedDirs: nil, WatchInterval: 10 * time.Second, // 10 seconds to make the reload of faulty config fail quick RetryInterval: 100 * time.Millisecond, @@ -275,6 +278,516 @@ faulty_config: g.Wait() } +func TestReloader_ConfigDirApply(t *testing.T) { + l, err := net.Listen("tcp", "localhost:0") + testutil.Ok(t, err) + + i := 0 + reloads := 0 + reloadsMtx := sync.Mutex{} + + srv := &http.Server{} + srv.Handler = http.HandlerFunc(func(resp http.ResponseWriter, r *http.Request) { + reloadsMtx.Lock() + defer reloadsMtx.Unlock() + + i++ + if i%2 == 0 { + // Fail every second request to ensure that retry works. 
+ resp.WriteHeader(http.StatusServiceUnavailable) + return + } + + reloads++ + resp.WriteHeader(http.StatusOK) + }) + go func() { + _ = srv.Serve(l) + }() + defer func() { testutil.Ok(t, srv.Close()) }() + + reloadURL, err := url.Parse(fmt.Sprintf("http://%s", l.Addr().String())) + testutil.Ok(t, err) + + ruleDir := t.TempDir() + tempRule1File := path.Join(ruleDir, "rule1.yaml") + tempRule3File := path.Join(ruleDir, "rule3.yaml") + tempRule4File := path.Join(ruleDir, "rule4.yaml") + + testutil.Ok(t, os.WriteFile(tempRule1File, []byte("rule1-changed"), os.ModePerm)) + testutil.Ok(t, os.WriteFile(tempRule3File, []byte("rule3-changed"), os.ModePerm)) + testutil.Ok(t, os.WriteFile(tempRule4File, []byte("rule4-changed"), os.ModePerm)) + + dir := t.TempDir() + dir2 := t.TempDir() + + outDir := t.TempDir() + outDir2 := t.TempDir() + + // dir + // └─ rule-dir -> dir2/rule-dir + // dir2 + // └─ rule-dir + testutil.Ok(t, os.Mkdir(path.Join(dir2, "rule-dir"), os.ModePerm)) + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule-dir"), path.Join(dir, "rule-dir"))) + + logger := log.NewNopLogger() + r := prometheus.NewRegistry() + reloader := New( + logger, + r, + &Options{ + ReloadURL: reloadURL, + CfgFile: "", + CfgOutputFile: "", + CfgDirs: []CfgDirOption{ + { + Dir: dir, + OutputDir: outDir, + }, + { + Dir: dir2, + OutputDir: outDir2, + }, + }, + WatchedDirs: nil, + WatchInterval: 9999 * time.Hour, // Disable interval to test watch logic only. + RetryInterval: 100 * time.Millisecond, + }) + + // dir + // ├─ rule-dir -> dir2/rule-dir + // └─ rule1.yaml + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml + // The reloader watches 2 directories: dir and dir/rule-dir. 
+ testutil.Ok(t, os.WriteFile(path.Join(dir, "rule1.yaml"), []byte("rule"), os.ModePerm)) + testutil.Ok(t, os.WriteFile(path.Join(dir2, "rule3-source.yaml"), []byte("rule3"), os.ModePerm)) + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-source.yaml"), path.Join(dir2, "rule3-001.yaml"))) + testutil.Ok(t, os.WriteFile(path.Join(dir2, "rule-dir", "rule4.yaml"), []byte("rule4"), os.ModePerm)) + + stepFunc := func(rel int) { + t.Log("Performing step number", rel) + switch rel { + case 0: + // Create rule2.yaml. + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // └─ rule2.yaml (*) + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml + testutil.Ok(t, os.WriteFile(path.Join(dir, "rule2.yaml"), []byte("rule2"), os.ModePerm)) + // out1 + // ├─ rule1.yaml + // └─ rule2.yaml + // out2 + // ├─ rule3-001.yaml + // └─ rule3-source.yaml + case 1: + // Update rule1.yaml. + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml (*) + // └─ rule2.yaml + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml + testutil.Ok(t, os.Rename(tempRule1File, path.Join(dir, "rule1.yaml"))) + // out1 + // ├─ rule1.yaml + // └─ rule2.yaml + // out2 + // ├─ rule3-001.yaml + // └─ rule3-source.yaml + case 2: + // Create dir/rule3.yaml (symlink to rule3-001.yaml). 
+ // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml -> dir2/rule3-001.yaml (*) + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-001.yaml"), path.Join(dir2, "rule3.yaml"))) + testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml"))) + // out1 + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml + // out2 + // ├─ rule3-001.yaml + // └─ rule3-source.yaml + case 3: + // Update the symlinked file and replace the symlink file to trigger fsnotify. + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml -> dir2/rule3-002.yaml (*) + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-002.yaml -> rule3-source.yaml (*) + // └─ rule3-source.yaml (*) + testutil.Ok(t, os.Rename(tempRule3File, path.Join(dir2, "rule3-source.yaml"))) + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-source.yaml"), path.Join(dir2, "rule3-002.yaml"))) + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-002.yaml"), path.Join(dir2, "rule3.yaml"))) + testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml"))) + testutil.Ok(t, os.Remove(path.Join(dir2, "rule3-001.yaml"))) + // out1 + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml + // out2 + // ├─ rule3-002.yaml + // └─ rule3-source.yaml + case 4: + // Update rule4.yaml in the symlinked directory. 
 + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml -> dir2/rule3-002.yaml + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml (*) + // ├─ rule3-002.yaml -> rule3-source.yaml + // └─ rule3-source.yaml + testutil.Ok(t, os.Rename(tempRule4File, path.Join(dir2, "rule-dir", "rule4.yaml"))) + // out1 + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml + // out2 + // ├─ rule3-002.yaml + // └─ rule3-source.yaml + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + g := sync.WaitGroup{} + g.Add(1) + go func() { + defer g.Done() + defer cancel() + + reloadsSeen := 0 + init := false + for { + runtime.Gosched() // Ensure during testing on small machine, other go routines have chance to continue. + + select { + case <-ctx.Done(): + return + case <-time.After(500 * time.Millisecond): + } + + reloadsMtx.Lock() + rel := reloads + reloadsMtx.Unlock() + if init && rel <= reloadsSeen { + continue + } + + // Catch up if reloader is step(s) ahead. + for skipped := rel - reloadsSeen - 1; skipped > 0; skipped-- { + stepFunc(rel - skipped) + } + + stepFunc(rel) + + init = true + reloadsSeen = rel + + if rel > 4 { + // All good. 
+ return + } + } + }() + err = reloader.Watch(ctx) + cancel() + g.Wait() + + testutil.Ok(t, err) + testutil.Equals(t, 12.0, promtest.ToFloat64(reloader.watcher.watchEvents)) + testutil.Equals(t, 0.0, promtest.ToFloat64(reloader.watcher.watchErrors)) + testutil.Equals(t, 3.0, promtest.ToFloat64(reloader.reloadErrors)) + testutil.Equals(t, 7.0, promtest.ToFloat64(reloader.reloads)) + testutil.Equals(t, 4, reloads) + + outEntries, err := os.ReadDir(outDir) + testutil.Ok(t, err) + outFiles := []string{} + for _, entry := range outEntries { + outFiles = append(outFiles, entry.Name()) + } + slices.Sort(outFiles) + expectedOutFiles := []string{ + "rule1.yaml", + "rule2.yaml", + "rule3.yaml", + } + slices.Sort(expectedOutFiles) + testutil.Equals(t, expectedOutFiles, outFiles) + + data, err := os.ReadFile(filepath.Join(outDir, "rule1.yaml")) + testutil.Ok(t, err) + testutil.Equals(t, "rule1-changed", string(data)) + data, err = os.ReadFile(filepath.Join(outDir, "rule2.yaml")) + testutil.Ok(t, err) + testutil.Equals(t, "rule2", string(data)) + data, err = os.ReadFile(filepath.Join(outDir, "rule3.yaml")) + testutil.Ok(t, err) + testutil.Equals(t, "rule3-changed", string(data)) + + outEntries2, err := os.ReadDir(outDir2) + testutil.Ok(t, err) + outFiles2 := []string{} + for _, entry := range outEntries2 { + outFiles2 = append(outFiles2, entry.Name()) + } + slices.Sort(outFiles2) + expectedOutFiles2 := []string{ + "rule3-002.yaml", + "rule3-source.yaml", + } + slices.Sort(expectedOutFiles2) + testutil.Equals(t, expectedOutFiles2, outFiles2) + + data, err = os.ReadFile(filepath.Join(outDir2, "rule3-002.yaml")) + testutil.Ok(t, err) + testutil.Equals(t, "rule3-changed", string(data)) + data, err = os.ReadFile(filepath.Join(outDir2, "rule3-source.yaml")) + testutil.Ok(t, err) + testutil.Equals(t, "rule3-changed", string(data)) +} + +func TestReloader_ConfigDirApplyBasedOnWatchInterval(t *testing.T) { + l, err := net.Listen("tcp", "localhost:0") + testutil.Ok(t, err) + + reloads := 
&atomic.Value{} + reloads.Store(0) + srv := &http.Server{} + srv.Handler = http.HandlerFunc(func(resp http.ResponseWriter, r *http.Request) { + reloads.Store(reloads.Load().(int) + 1) // The only writer. + resp.WriteHeader(http.StatusOK) + }) + go func() { + _ = srv.Serve(l) + }() + defer func() { testutil.Ok(t, srv.Close()) }() + + reloadURL, err := url.Parse(fmt.Sprintf("http://%s", l.Addr().String())) + testutil.Ok(t, err) + + dir := t.TempDir() + dir2 := t.TempDir() + + outDir := t.TempDir() + outDir2 := t.TempDir() + + // dir + // └─ rule-dir -> dir2/rule-dir + // dir2 + // └─ rule-dir + testutil.Ok(t, os.Mkdir(path.Join(dir2, "rule-dir"), os.ModePerm)) + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule-dir"), path.Join(dir, "rule-dir"))) + + logger := log.NewNopLogger() + reloader := New( + logger, + nil, + &Options{ + ReloadURL: reloadURL, + CfgFile: "", + CfgOutputFile: "", + CfgDirs: []CfgDirOption{ + { + Dir: dir, + OutputDir: outDir, + }, + { + Dir: dir2, + OutputDir: outDir2, + }, + }, + WatchedDirs: nil, + WatchInterval: 1 * time.Second, // use a small watch interval. + RetryInterval: 9999 * time.Hour, + }, + ) + + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // └─ rule2.yaml + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml + // + // The reloader watches 2 directories: dir and dir/rule-dir. 
+ testutil.Ok(t, os.WriteFile(path.Join(dir, "rule1.yaml"), []byte("rule"), os.ModePerm)) + testutil.Ok(t, os.WriteFile(path.Join(dir, "rule2.yaml"), []byte("rule2"), os.ModePerm)) + testutil.Ok(t, os.WriteFile(path.Join(dir2, "rule3-source.yaml"), []byte("rule3"), os.ModePerm)) + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-source.yaml"), path.Join(dir2, "rule3-001.yaml"))) + testutil.Ok(t, os.WriteFile(path.Join(dir2, "rule-dir", "rule4.yaml"), []byte("rule4"), os.ModePerm)) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + g := sync.WaitGroup{} + g.Add(1) + go func() { + defer g.Done() + defer cancel() + + reloadsSeen := 0 + init := false + for { + runtime.Gosched() // Ensure during testing on small machine, other go routines have chance to continue. + + select { + case <-ctx.Done(): + return + case <-time.After(500 * time.Millisecond): + } + + rel := reloads.Load().(int) + if init && rel <= reloadsSeen { + continue + } + init = true + reloadsSeen = rel + + t.Log("Performing step number", rel) + switch rel { + case 0: + // Create rule3.yaml (symlink to rule3-001.yaml). + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml -> dir2/rule3-001.yaml (*) + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-001.yaml"), path.Join(dir2, "rule3.yaml"))) + testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml"))) + // out1 + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml + // out2 + // ├─ rule3-001.yaml + // └─ rule3-source.yaml + case 1: + // Update the symlinked file but do not replace the symlink in dir. + // + // fsnotify shouldn't send any event because the change happens + // in a directory that isn't watched but the reloader should detect + // the update thanks to the watch interval. 
+ // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml -> dir2/rule3-001.yaml + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml (*) + testutil.Ok(t, os.WriteFile(path.Join(dir2, "rule3-source.yaml"), []byte("rule3-changed"), os.ModePerm)) + // out1 + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml + // out2 + // ├─ rule3-001.yaml + // └─ rule3-source.yaml + } + + if rel > 1 { + // All good. + return + } + } + }() + err = reloader.Watch(ctx) + cancel() + g.Wait() + + testutil.Ok(t, err) + testutil.Equals(t, 2, reloads.Load().(int)) + + outEntries, err := os.ReadDir(outDir) + testutil.Ok(t, err) + outFiles := []string{} + for _, entry := range outEntries { + outFiles = append(outFiles, entry.Name()) + } + slices.Sort(outFiles) + expectedOutFiles := []string{ + "rule1.yaml", + "rule2.yaml", + "rule3.yaml", + } + slices.Sort(expectedOutFiles) + testutil.Equals(t, expectedOutFiles, outFiles) + + data, err := os.ReadFile(filepath.Join(outDir, "rule1.yaml")) + testutil.Ok(t, err) + testutil.Equals(t, "rule", string(data)) + data, err = os.ReadFile(filepath.Join(outDir, "rule2.yaml")) + testutil.Ok(t, err) + testutil.Equals(t, "rule2", string(data)) + data, err = os.ReadFile(filepath.Join(outDir, "rule3.yaml")) + testutil.Ok(t, err) + testutil.Equals(t, "rule3-changed", string(data)) + + outEntries2, err := os.ReadDir(outDir2) + testutil.Ok(t, err) + outFiles2 := []string{} + for _, entry := range outEntries2 { + outFiles2 = append(outFiles2, entry.Name()) + } + slices.Sort(outFiles2) + expectedOutFiles2 := []string{ + "rule3-001.yaml", + "rule3-source.yaml", + } + slices.Sort(expectedOutFiles2) + testutil.Equals(t, expectedOutFiles2, outFiles2) + + data, err = os.ReadFile(filepath.Join(outDir2, "rule3-001.yaml")) + testutil.Ok(t, err) + testutil.Equals(t, "rule3-changed", string(data)) + data, err = os.ReadFile(filepath.Join(outDir2, 
"rule3-source.yaml")) + testutil.Ok(t, err) + testutil.Equals(t, "rule3-changed", string(data)) +} + func TestReloader_DirectoriesApply(t *testing.T) { l, err := net.Listen("tcp", "localhost:0") testutil.Ok(t, err) @@ -334,6 +847,7 @@ func TestReloader_DirectoriesApply(t *testing.T) { ReloadURL: reloadURL, CfgFile: "", CfgOutputFile: "", + CfgDirs: nil, WatchedDirs: []string{dir, path.Join(dir, "rule-dir")}, WatchInterval: 9999 * time.Hour, // Disable interval to test watch logic only. RetryInterval: 100 * time.Millisecond, @@ -484,7 +998,7 @@ func TestReloader_DirectoriesApply(t *testing.T) { testutil.Equals(t, 5, reloads) } -func TestReloaderDirectoriesApplyBasedOnWatchInterval(t *testing.T) { +func TestReloader_DirectoriesApplyBasedOnWatchInterval(t *testing.T) { l, err := net.Listen("tcp", "localhost:0") testutil.Ok(t, err) @@ -521,6 +1035,7 @@ func TestReloaderDirectoriesApplyBasedOnWatchInterval(t *testing.T) { ReloadURL: reloadURL, CfgFile: "", CfgOutputFile: "", + CfgDirs: nil, WatchedDirs: []string{dir, path.Join(dir, "rule-dir")}, WatchInterval: 1 * time.Second, // use a small watch interval. RetryInterval: 9999 * time.Hour, @@ -651,6 +1166,7 @@ func TestReloader_ConfigApplyWithWatchIntervalEqualsZero(t *testing.T) { ReloadURL: reloadURL, CfgFile: input, CfgOutputFile: output, + CfgDirs: nil, WatchedDirs: nil, WatchInterval: 0, // Set WatchInterval equals to 0 RetryInterval: 100 * time.Millisecond,