Skip to content

Commit

Permalink
Cache HelmRepository index files
Browse files Browse the repository at this point in the history
If implemented, will provide user with a way to cache index files.

This addresses issues where the index file is loaded and unmarshalled in
conncurrent reconciliation resulting in a heavy memory footprint.

The caching strategy used is cahe aside, and teh cache is a k/v store
with expiration.

The cache number of entries and ttl for entries are configurable.

The cache is optional and is disable dby default

Signed-off-by: Soule BA <soule@weave.works>
  • Loading branch information
souleb committed Mar 20, 2022
1 parent e4d0b53 commit 0551e72
Show file tree
Hide file tree
Showing 8 changed files with 363 additions and 4 deletions.
3 changes: 3 additions & 0 deletions api/v1beta2/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,7 @@ const (

// SymlinkUpdateFailedReason signals a failure in updating a symlink.
SymlinkUpdateFailedReason string = "SymlinkUpdateFailed"

// CacheOperationFailedReason signals a failure in cache operation.
CacheOperationFailedReason string = "CacheOperationFailed"
)
Binary file added controllers/.DS_Store
Binary file not shown.
31 changes: 31 additions & 0 deletions controllers/helmchart_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

securejoin "github.com/cyphar/filepath-securejoin"
helmgetter "helm.sh/helm/v3/pkg/getter"
helmrepo "helm.sh/helm/v3/pkg/repo"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -55,6 +56,7 @@ import (
"github.com/fluxcd/pkg/untar"

sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/internal/cache"
serror "github.com/fluxcd/source-controller/internal/error"
"github.com/fluxcd/source-controller/internal/helm/chart"
"github.com/fluxcd/source-controller/internal/helm/getter"
Expand Down Expand Up @@ -109,6 +111,9 @@ type HelmChartReconciler struct {
Storage *Storage
Getters helmgetter.Providers
ControllerName string

Cache *cache.Cache
TTL time.Duration
}

func (r *HelmChartReconciler) SetupWithManager(mgr ctrl.Manager) error {
Expand Down Expand Up @@ -451,6 +456,15 @@ func (r *HelmChartReconciler) buildFromHelmRepository(ctx context.Context, obj *
}
}

// Try to retrieve the repository index from the cache
if r.Cache != nil {
if index, found := r.Cache.Get(r.Storage.LocalPath(*repo.GetArtifact())); err == nil {
if found {
chartRepo.Index = index.(*helmrepo.IndexFile)
}
}
}

// Construct the chart builder with scoped configuration
cb := chart.NewRemoteBuilder(chartRepo)
opts := chart.BuildOptions{
Expand All @@ -474,6 +488,23 @@ func (r *HelmChartReconciler) buildFromHelmRepository(ctx context.Context, obj *
return sreconcile.ResultEmpty, err
}

defer func() {
// Cache the index if it was successfully retrieved
// and the chart was successfully built
if r.Cache != nil && chartRepo.Index != nil {
err := r.Cache.Set(r.Storage.LocalPath(*repo.GetArtifact()), chartRepo.Index, r.TTL)
if err != nil {
r.eventLogf(ctx, obj, events.EventTypeTrace, sourcev1.CacheOperationFailedReason, "failed to cache index: %v", err)
}

}

// Delete the index reference
if chartRepo.Index != nil {
chartRepo.Unload()
}
}()

*b = *build
return sreconcile.ResultSuccess, nil
}
Expand Down
4 changes: 4 additions & 0 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/fluxcd/pkg/testserver"

sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/internal/cache"
// +kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -126,12 +127,15 @@ func TestMain(m *testing.M) {
panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err))
}

cache := cache.New(5, 1*time.Second)
if err := (&HelmChartReconciler{
Client: testEnv,
EventRecorder: record.NewFakeRecorder(32),
Metrics: testMetricsH,
Getters: testGetters,
Storage: testStorage,
Cache: cache,
TTL: 1 * time.Second,
}).SetupWithManager(testEnv); err != nil {
panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err))
}
Expand Down
218 changes: 218 additions & 0 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package cache

import (
"fmt"
"runtime"
"sync"
"time"
)

// NOTE: this is heavily based on patrickmn/go-cache:
// https://github.com/patrickmn/go-cache

// Cache is a thread-safe in-memory key/value store.
type Cache struct {
*cache
}

// Item is an item stored in the cache.
type Item struct {
Object interface{}
Expiration int64
}

type cache struct {
// Items holds the elements in the cache.
Items map[string]Item
// Maximum number of items the cache can hold.
MaxItems int
mu sync.RWMutex
janitor *janitor
}

// ItemCount returns the number of items in the cache.
// This may include items that have expired, but have not yet been cleaned up.
func (c *cache) ItemCount() int {
c.mu.RLock()
n := len(c.Items)
c.mu.RUnlock()
return n
}

func (c *cache) set(key string, value interface{}, expiration time.Duration) {
var e int64
if expiration > 0 {
e = time.Now().Add(expiration).UnixNano()
}

c.Items[key] = Item{
Object: value,
Expiration: e,
}
}

// Set adds an item to the cache, replacing any existing item.
// If expiration is zero, the item never expires.
// If the cache is full, Set will return an error.
func (c *cache) Set(key string, value interface{}, expiration time.Duration) error {
c.mu.Lock()
_, found := c.Items[key]
if found {
c.set(key, value, expiration)
c.mu.Unlock()
return nil
}

if c.MaxItems > 0 && len(c.Items) < c.MaxItems {
c.set(key, value, expiration)
c.mu.Unlock()
return nil
}

c.mu.Unlock()
return fmt.Errorf("Cache is full")
}

func (c *cache) Add(key string, value interface{}, expiration time.Duration) error {
c.mu.Lock()
_, found := c.Items[key]
if found {
c.mu.Unlock()
return fmt.Errorf("Item %s already exists", key)
}

if c.MaxItems > 0 && len(c.Items) < c.MaxItems {
c.set(key, value, expiration)
c.mu.Unlock()
return nil
}

c.mu.Unlock()
return fmt.Errorf("Cache is full")
}

func (c *cache) Get(key string) (interface{}, bool) {
c.mu.RLock()
item, found := c.Items[key]
if !found {
c.mu.RUnlock()
return nil, false
}
if item.Expiration > 0 {
if item.Expiration < time.Now().UnixNano() {
c.mu.RUnlock()
return nil, false
}
}
c.mu.RUnlock()
return item.Object, true
}

func (c *cache) Delete(key string) {
c.mu.Lock()
delete(c.Items, key)
c.mu.Unlock()
}

func (c *cache) Clear() {
c.mu.Lock()
c.Items = make(map[string]Item)
c.mu.Unlock()
}

func (c *cache) HasExpired(key string) bool {
c.mu.RLock()
item, ok := c.Items[key]
if !ok {
c.mu.RUnlock()
return true
}
if item.Expiration > 0 {
if item.Expiration < time.Now().UnixNano() {
c.mu.RUnlock()
return true
}
}
c.mu.RUnlock()
return false
}

func (c *cache) SetExpiration(key string, expiration time.Duration) {
c.mu.Lock()
item, ok := c.Items[key]
if !ok {
c.mu.Unlock()
return
}
item.Expiration = time.Now().Add(expiration).UnixNano()
c.mu.Unlock()
}

func (c *cache) GetExpiration(key string) time.Duration {
c.mu.RLock()
item, ok := c.Items[key]
if !ok {
c.mu.RUnlock()
return 0
}
if item.Expiration > 0 {
if item.Expiration < time.Now().UnixNano() {
c.mu.RUnlock()
return 0
}
}
c.mu.RUnlock()
return time.Duration(item.Expiration - time.Now().UnixNano())
}

func (c *cache) DeleteExpired() {
c.mu.Lock()
for k, v := range c.Items {
if v.Expiration > 0 && v.Expiration < time.Now().UnixNano() {
delete(c.Items, k)
}
}
c.mu.Unlock()
}

type janitor struct {
Interval time.Duration
stop chan bool
}

func (j *janitor) Run(c *cache) {
ticker := time.NewTicker(j.Interval)
for {
select {
case <-ticker.C:
c.DeleteExpired()
case <-j.stop:
ticker.Stop()
return
}
}
}

func stopJanitor(c *Cache) {
c.janitor.stop <- true
}

func New(maxItems int, interval time.Duration) *Cache {
c := &cache{
Items: make(map[string]Item),
MaxItems: maxItems,
janitor: &janitor{
Interval: interval,
stop: make(chan bool),
},
}

C := &Cache{c}

if interval > 0 {
go c.janitor.Run(c)
runtime.SetFinalizer(C, stopJanitor)
}

return C
}
71 changes: 71 additions & 0 deletions internal/cache/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package cache

import (
"testing"
"time"

. "github.com/onsi/gomega"
)

func TestCache(t *testing.T) {
g := NewWithT(t)
// create a cache that can hold 2 items and have no cleanup
cache := New(2, 0)

// Get an Item from the cache
if _, found := cache.Get("key1"); found {
t.Error("Item should not be found")
}

// Add an item to the cache
err := cache.Add("key1", "value1", 0)
g.Expect(err).ToNot(HaveOccurred())

// Get the item from the cache
item, found := cache.Get("key1")
g.Expect(found).To(BeTrue())
g.Expect(item).To(Equal("value1"))

// Add another item to the cache
err = cache.Add("key2", "value2", 0)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(cache.ItemCount()).To(Equal(2))

// Get the item from the cache
item, found = cache.Get("key2")
g.Expect(found).To(BeTrue())
g.Expect(item).To(Equal("value2"))

//Add an item to the cache
err = cache.Add("key3", "value3", 0)
g.Expect(err).To(HaveOccurred())

// Replace an item in the cache
err = cache.Set("key2", "value3", 0)
g.Expect(err).ToNot(HaveOccurred())

// Get the item from the cache
item, found = cache.Get("key2")
g.Expect(found).To(BeTrue())
g.Expect(item).To(Equal("value3"))

// new cache with a cleanup interval of 1 second
cache = New(2, 1*time.Second)

// Add an item to the cache
err = cache.Add("key1", "value1", 2*time.Second)
g.Expect(err).ToNot(HaveOccurred())

// Get the item from the cache
item, found = cache.Get("key1")
g.Expect(found).To(BeTrue())
g.Expect(item).To(Equal("value1"))

// wait for the item to expire
time.Sleep(3 * time.Second)

// Get the item from the cache
item, found = cache.Get("key1")
g.Expect(found).To(BeFalse())
g.Expect(item).To(BeNil())
}
Loading

0 comments on commit 0551e72

Please sign in to comment.