Skip to content

Commit

Permalink
bbolt-cache: debug log when cleaning objects, and throttle to once pe…
Browse files Browse the repository at this point in the history
…r 1s

Signed-off-by: Dr. Stefan Schimanski <stefan.schimanski@gmail.com>
  • Loading branch information
sttts committed Nov 21, 2023
1 parent bd125c4 commit d7ea773
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 3 deletions.
2 changes: 1 addition & 1 deletion cmd/xgql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func main() { //nolint:gocyclo
var camid []clients.NewCacheMiddlewareFn
// wrap client.Cache in cache.*BBoltCache if cacheFile is specified.
if *cacheFile != "" {
camid = append(camid, cache.WithBBoltCache(*cacheFile))
camid = append(camid, cache.WithBBoltCache(*cacheFile, cache.WithLogger(log)))
}
// enable live queries
camid = append(camid, clients.WithLiveQueries)
Expand Down
12 changes: 11 additions & 1 deletion internal/cache/bbolt_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

"github.com/crossplane/crossplane-runtime/pkg/errors"
"github.com/crossplane/crossplane-runtime/pkg/logging"

"github.com/upbound/xgql/internal/clients"
)
Expand Down Expand Up @@ -152,6 +153,13 @@ func wrapCacheTranform(prev, next toolscache.TransformFunc) toolscache.Transform
// Option is an option for a cache.
type Option func(*BBoltCache)

// WithLogger wires a logger into the bbolt cache.
func WithLogger(o logging.Logger) Option {
return func(c *BBoltCache) {
c.log = o
}
}

// Cache implements cache.Cache.
var _ cache.Cache = &BBoltCache{}

Expand All @@ -173,6 +181,8 @@ type BBoltCache struct {
unmarshalFn UnmarshalFn

running atomic.Bool

log logging.Logger
}

// NewBBoltCache creates a new cache.
Expand All @@ -196,7 +206,7 @@ func NewBBoltCache(cache cache.Cache, scheme *runtime.Scheme, file string, opts
ca.db = db
}
if ca.cleaner == nil {
ca.cleaner = NewCleaner[client.Object, string](getKey, ca.cleanup)
ca.cleaner = NewCleaner[client.Object, string](getKey, ca.cleanup, WithLoggerCleanerOpt[client.Object, string](ca.log))
}
return ca, nil
}
Expand Down
20 changes: 19 additions & 1 deletion internal/cache/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"time"

"k8s.io/utils/clock"

"github.com/crossplane/crossplane-runtime/pkg/logging"
)

// Cleaner cleans up objects after a given duration.
Expand Down Expand Up @@ -92,10 +94,19 @@ type cleaner[T any, K cmp.Ordered] struct {
mu sync.Mutex
exps []expKey[K]
refs map[K]expRef[T]

log logging.Logger
}

type CleanerOpt[T any, K cmp.Ordered] func(*cleaner[T, K])

// WithLoggerCleanerOpt wires the logger into the cleaner.
func WithLoggerCleanerOpt[T any, K cmp.Ordered](log logging.Logger) CleanerOpt[T, K] {
return func(c *cleaner[T, K]) {
c.log = log
}
}

// NewCleaner creates a cleaner for objects of type T, identified by comparable key K,
// using cleanFn for cleanup.
func NewCleaner[T any, K cmp.Ordered](keyFn func(T) K, cleanFn func([]T) error, opts ...CleanerOpt[T, K]) *cleaner[T, K] {
Expand All @@ -105,6 +116,7 @@ func NewCleaner[T any, K cmp.Ordered](keyFn func(T) K, cleanFn func([]T) error,
clock: clock.RealClock{},
signal: make(chan struct{}),
refs: make(map[K]expRef[T]),
log: logging.NewNopLogger(),
}
for _, opt := range opts {
opt(c)
Expand Down Expand Up @@ -142,12 +154,18 @@ func (c *cleaner[T, K]) Start(ctx context.Context) error {
}
objs, exp := c.collect(c.clock.Now())
if len(objs) > 0 {
c.log.Debug("cleaning up objects", "count", len(objs), "next", exp)
if err := c.cleanFn(objs); err != nil {
// exit Start and stop the cache
return err
}
}
if !exp.IsZero() {
wakeup = c.clock.After(exp.Sub(c.clock.Now()))
after := exp.Sub(c.clock.Now())
if after < time.Second {
after = time.Second
}
wakeup = c.clock.After(after)
}
}
}
Expand Down

0 comments on commit d7ea773

Please sign in to comment.