Skip to content

Commit

Permalink
[fswatcher] File Reconciler w/ Resync Interval
Browse files Browse the repository at this point in the history
Signed-off-by: hfuss <hayden.fuss@kaleido.io>
  • Loading branch information
onelapahead committed Sep 10, 2024
1 parent 9cc0e2c commit 60b8dc2
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 8 deletions.
35 changes: 32 additions & 3 deletions pkg/fswatcher/fswatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"os"
"path"
"time"

"github.com/fsnotify/fsnotify"
"github.com/hyperledger/firefly-common/pkg/fftypes"
Expand All @@ -35,9 +36,19 @@ import (
// - Only fires if the data in the file is different to the last notification
// - Does not reload the config - that's the caller's responsibility
func Watch(ctx context.Context, fullFilePath string, onChange, onClose func()) error {
return sync(ctx, fullFilePath, onChange, onClose, nil, nil)
}

// Reconcile behaves the same as Watch, except it allows for running the onSync func on a provided
// interval. The default re-sync internal is 1m.
func Reconcile(ctx context.Context, fullFilePath string, onChange, onClose, onSync func(), resyncInterval *time.Duration) error {
return sync(ctx, fullFilePath, onChange, onClose, onSync, resyncInterval)
}

func sync(ctx context.Context, fullFilePath string, onChange, onClose, onSync func(), resyncInterval *time.Duration) error {
filePath := path.Dir(fullFilePath)
fileName := path.Base(fullFilePath)
log.L(ctx).Debugf("Starting file listener for '%s' in directory '%s'", fileName, filePath)
log.L(ctx).Debugf("Starting file reconciler for '%s' in directory '%s'", fileName, filePath)

watcher, err := fsnotify.NewWatcher()
if err == nil {
Expand All @@ -46,7 +57,7 @@ func Watch(ctx context.Context, fullFilePath string, onChange, onClose func()) e
if onClose != nil {
onClose()
}
}, watcher.Events, watcher.Errors)
}, onSync, resyncInterval, watcher.Events, watcher.Errors)
err = watcher.Add(filePath)
}
if err != nil {
Expand All @@ -56,9 +67,18 @@ func Watch(ctx context.Context, fullFilePath string, onChange, onClose func()) e
return nil
}

func fsListenerLoop(ctx context.Context, fullFilePath string, onChange, onClose func(), events chan fsnotify.Event, errors chan error) {
func fsListenerLoop(ctx context.Context, fullFilePath string, onChange, onClose, onSync func(), resyncInterval *time.Duration, events chan fsnotify.Event, errors chan error) {
defer onClose()

timeout := resyncInterval
if timeout == nil {
timeout = func() *time.Duration {
defaultTimeout := time.Minute
return &defaultTimeout
}()
}
log.L(ctx).Debugf("re-sync interval set to '%s'", *timeout)

var lastHash *fftypes.Bytes32
for {
select {
Expand All @@ -83,6 +103,15 @@ func fsListenerLoop(ctx context.Context, fullFilePath string, onChange, onClose
lastHash = dataHash
}
}
case <-time.After(*timeout):
if onSync != nil {
data, err := os.ReadFile(fullFilePath)
if err == nil {
dataHash := fftypes.HashString(string(data))
log.L(ctx).Infof("Config file re-sync. Event=Resync Name=%s Size=%d Hash=%s", fullFilePath, len(data), dataHash)
onSync()
}
}
case err, ok := <-errors:
if ok {
log.L(ctx).Errorf("FSEvent error: %s", err)
Expand Down
18 changes: 13 additions & 5 deletions pkg/fswatcher/fswatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"fmt"
"os"
"testing"
"time"

"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
)

func TestFileListenerE2E(t *testing.T) {
func TestFileReconcilerE2E(t *testing.T) {

logrus.SetLevel(logrus.DebugLevel)
tmpDir := t.TempDir()
Expand All @@ -47,14 +48,20 @@ func TestFileListenerE2E(t *testing.T) {
// Start listener on config file
fsListenerDone := make(chan struct{})
fsListenerFired := make(chan bool)
reSyncFired := make(chan bool)
reSyncInterval := 1 * time.Second
ctx, cancelCtx := context.WithCancel(context.Background())
err := Watch(ctx, filePath, func() {
err := Reconcile(ctx, filePath, func() {
err := viper.ReadInConfig()
assert.NoError(t, err)
fsListenerFired <- true
}, func() {
close(fsListenerDone)
})
}, func() {
err := viper.ReadInConfig()
assert.NoError(t, err)
reSyncFired <- true
}, &reSyncInterval)
assert.NoError(t, err)

// Delete and rename in another file
Expand All @@ -63,6 +70,7 @@ func TestFileListenerE2E(t *testing.T) {
os.Rename(fmt.Sprintf("%s/another.yaml", tmpDir), fmt.Sprintf("%s/test.yaml", tmpDir))
<-fsListenerFired
assert.Equal(t, "two", viper.Get("ut_conf"))
<-reSyncFired

defer func() {
cancelCtx()
Expand All @@ -74,7 +82,7 @@ func TestFileListenerE2E(t *testing.T) {

}

func TestFileListenerFail(t *testing.T) {
func TestFileWatcherFail(t *testing.T) {

logrus.SetLevel(logrus.DebugLevel)
tmpDir := t.TempDir()
Expand All @@ -95,7 +103,7 @@ func TestFileListenerLogError(t *testing.T) {
defer cancelCtx()
errors := make(chan error)
fsListenerDone := make(chan struct{})
go fsListenerLoop(ctx, "somefile", func() {}, func() { close(fsListenerDone) }, make(chan fsnotify.Event), errors)
go fsListenerLoop(ctx, "somefile", func() {}, func() { close(fsListenerDone) }, nil, nil, make(chan fsnotify.Event), errors)

errors <- fmt.Errorf("pop")
cancelCtx()
Expand Down

0 comments on commit 60b8dc2

Please sign in to comment.