From 60b8dc2fb9b067cd0cd1c34e9c7ebf75d2de321b Mon Sep 17 00:00:00 2001 From: hfuss Date: Tue, 10 Sep 2024 17:11:11 -0400 Subject: [PATCH] [fswatcher] File Reconciler w/ Resync Interval Signed-off-by: hfuss --- pkg/fswatcher/fswatcher.go | 35 ++++++++++++++++++++++++++++++--- pkg/fswatcher/fswatcher_test.go | 18 ++++++++++++----- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/pkg/fswatcher/fswatcher.go b/pkg/fswatcher/fswatcher.go index c56edee..249ea80 100644 --- a/pkg/fswatcher/fswatcher.go +++ b/pkg/fswatcher/fswatcher.go @@ -20,6 +20,7 @@ import ( "context" "os" "path" + "time" "github.com/fsnotify/fsnotify" "github.com/hyperledger/firefly-common/pkg/fftypes" @@ -35,9 +36,19 @@ import ( // - Only fires if the data in the file is different to the last notification // - Does not reload the config - that's the caller's responsibility func Watch(ctx context.Context, fullFilePath string, onChange, onClose func()) error { + return sync(ctx, fullFilePath, onChange, onClose, nil, nil) +} + +// Reconcile behaves the same as Watch, except it allows for running the onSync func on a provided +// interval. The default re-sync internal is 1m. +func Reconcile(ctx context.Context, fullFilePath string, onChange, onClose, onSync func(), resyncInterval *time.Duration) error { + return sync(ctx, fullFilePath, onChange, onClose, onSync, resyncInterval) +} + +func sync(ctx context.Context, fullFilePath string, onChange, onClose, onSync func(), resyncInterval *time.Duration) error { filePath := path.Dir(fullFilePath) fileName := path.Base(fullFilePath) - log.L(ctx).Debugf("Starting file listener for '%s' in directory '%s'", fileName, filePath) + log.L(ctx).Debugf("Starting file reconciler for '%s' in directory '%s'", fileName, filePath) watcher, err := fsnotify.NewWatcher() if err == nil { @@ -46,7 +57,7 @@ func Watch(ctx context.Context, fullFilePath string, onChange, onClose func()) e if onClose != nil { onClose() } - }, watcher.Events, watcher.Errors) + }, onSync, resyncInterval, watcher.Events, watcher.Errors) err = watcher.Add(filePath) } if err != nil { @@ -56,9 +67,18 @@ func Watch(ctx context.Context, fullFilePath string, onChange, onClose func()) e return nil } -func fsListenerLoop(ctx context.Context, fullFilePath string, onChange, onClose func(), events chan fsnotify.Event, errors chan error) { +func fsListenerLoop(ctx context.Context, fullFilePath string, onChange, onClose, onSync func(), resyncInterval *time.Duration, events chan fsnotify.Event, errors chan error) { defer onClose() + timeout := resyncInterval + if timeout == nil { + timeout = func() *time.Duration { + defaultTimeout := time.Minute + return &defaultTimeout + }() + } + log.L(ctx).Debugf("re-sync interval set to '%s'", *timeout) + var lastHash *fftypes.Bytes32 for { select { @@ -83,6 +103,15 @@ func fsListenerLoop(ctx context.Context, fullFilePath string, onChange, onClose lastHash = dataHash } } + case <-time.After(*timeout): + if onSync != nil { + data, err := os.ReadFile(fullFilePath) + if err == nil { + dataHash := fftypes.HashString(string(data)) + log.L(ctx).Infof("Config file re-sync. Event=Resync Name=%s Size=%d Hash=%s", fullFilePath, len(data), dataHash) + onSync() + } + } case err, ok := <-errors: if ok { log.L(ctx).Errorf("FSEvent error: %s", err) diff --git a/pkg/fswatcher/fswatcher_test.go b/pkg/fswatcher/fswatcher_test.go index c38c588..d81b2df 100644 --- a/pkg/fswatcher/fswatcher_test.go +++ b/pkg/fswatcher/fswatcher_test.go @@ -21,6 +21,7 @@ import ( "fmt" "os" "testing" + "time" "github.com/fsnotify/fsnotify" "github.com/sirupsen/logrus" @@ -28,7 +29,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestFileListenerE2E(t *testing.T) { +func TestFileReconcilerE2E(t *testing.T) { logrus.SetLevel(logrus.DebugLevel) tmpDir := t.TempDir() @@ -47,14 +48,20 @@ func TestFileListenerE2E(t *testing.T) { // Start listener on config file fsListenerDone := make(chan struct{}) fsListenerFired := make(chan bool) + reSyncFired := make(chan bool) + reSyncInterval := 1 * time.Second ctx, cancelCtx := context.WithCancel(context.Background()) - err := Watch(ctx, filePath, func() { + err := Reconcile(ctx, filePath, func() { err := viper.ReadInConfig() assert.NoError(t, err) fsListenerFired <- true }, func() { close(fsListenerDone) - }) + }, func() { + err := viper.ReadInConfig() + assert.NoError(t, err) + reSyncFired <- true + }, &reSyncInterval) assert.NoError(t, err) // Delete and rename in another file @@ -63,6 +70,7 @@ func TestFileListenerE2E(t *testing.T) { os.Rename(fmt.Sprintf("%s/another.yaml", tmpDir), fmt.Sprintf("%s/test.yaml", tmpDir)) <-fsListenerFired assert.Equal(t, "two", viper.Get("ut_conf")) + <-reSyncFired defer func() { cancelCtx() @@ -74,7 +82,7 @@ func TestFileListenerE2E(t *testing.T) { } -func TestFileListenerFail(t *testing.T) { +func TestFileWatcherFail(t *testing.T) { logrus.SetLevel(logrus.DebugLevel) tmpDir := t.TempDir() @@ -95,7 +103,7 @@ func TestFileListenerLogError(t *testing.T) { defer cancelCtx() errors := make(chan error) fsListenerDone := make(chan struct{}) - go fsListenerLoop(ctx, "somefile", func() {}, func() { close(fsListenerDone) }, make(chan fsnotify.Event), errors) + go fsListenerLoop(ctx, "somefile", func() {}, func() { close(fsListenerDone) }, nil, nil, make(chan fsnotify.Event), errors) errors <- fmt.Errorf("pop") cancelCtx()