From 5c68eaadadf1df8edb8331f00991e43d80c40461 Mon Sep 17 00:00:00 2001 From: chlins Date: Sat, 17 Jun 2023 20:55:42 +0800 Subject: [PATCH] refactor: migrate the redis command keys to scan Refine the cache interface, migrate the Keys to Scan, change the redis underlying keys command to scan. Signed-off-by: chlins --- src/go.mod | 1 - src/lib/cache/cache.go | 11 ++- src/lib/cache/memory/memory.go | 71 ++++++++++++++----- src/lib/cache/memory/memory_test.go | 71 +++++++++++++------ src/lib/cache/redis/redis.go | 50 ++++++++----- src/lib/cache/redis/redis_test.go | 70 ++++++++++++------ src/pkg/cached/base_manager.go | 19 +++-- .../cached/project_metadata/redis/manager.go | 8 +-- src/pkg/task/dao/execution.go | 7 +- src/testing/lib/cache/cache.go | 64 ++++++++--------- 10 files changed, 243 insertions(+), 129 deletions(-) diff --git a/src/go.mod b/src/go.mod index b12401187fc2..04d4529db35b 100644 --- a/src/go.mod +++ b/src/go.mod @@ -47,7 +47,6 @@ require ( github.com/opencontainers/go-digest v1.0.0 github.com/opencontainers/image-spec v1.1.0-rc2.0.20221005185240-3a7f492d3f1b github.com/pkg/errors v0.9.1 - github.com/prometheus/client_golang v1.14.0 github.com/robfig/cron/v3 v3.0.0 github.com/spf13/viper v1.8.1 diff --git a/src/lib/cache/cache.go b/src/lib/cache/cache.go index aa23efaaa647..c44f13631a82 100644 --- a/src/lib/cache/cache.go +++ b/src/lib/cache/cache.go @@ -40,6 +40,12 @@ var ( ErrNotFound = errors.New("key not found") ) +// Iterator iterates over the keys matched by Scan +type Iterator interface { + Next(ctx context.Context) bool + Val() string +} + // Cache cache interface type Cache interface { // Contains returns true if key exists @@ -57,8 +63,9 @@ type Cache interface { // Save cache the value by key Save(ctx context.Context, key string, value interface{}, expiration ...time.Duration) error - // Keys returns the key matched by prefixes - Keys(ctx context.Context, prefixes ...string) ([]string, error) + // Scan scans the keys matched by match string + // NOTICE: 
the memory cache does not support wildcards; it matches keys with strings.Contains + Scan(ctx context.Context, match string) (Iterator, error) } var ( diff --git a/src/lib/cache/memory/memory.go b/src/lib/cache/memory/memory.go index 6f65f23686d8..a92d07660b8e 100644 --- a/src/lib/cache/memory/memory.go +++ b/src/lib/cache/memory/memory.go @@ -117,27 +117,64 @@ func (c *Cache) Save(ctx context.Context, key string, value interface{}, expirat return nil } -// Keys returns the key matched by prefixes. -func (c *Cache) Keys(ctx context.Context, prefixes ...string) ([]string, error) { - // if no prefix, means match all keys. - matchAll := len(prefixes) == 0 - // range map to get all keys - keys := make([]string, 0) - c.storage.Range(func(k, v interface{}) bool { - ks := k.(string) - if matchAll { - keys = append(keys, ks) - } else { - for _, p := range prefixes { - if strings.HasPrefix(ks, c.opts.Key(p)) { - keys = append(keys, strings.TrimPrefix(ks, c.opts.Prefix)) +// Scan scans the keys matched by match string +func (c *Cache) Scan(ctx context.Context, match string) (cache.Iterator, error) { + return &ScanIterator{storage: &c.storage, prefix: c.opts.Prefix, match: match}, nil +} + +// ScanIterator is the cache.Iterator implementation for the memory cache +type ScanIterator struct { + once sync.Once + storage *sync.Map + mu sync.Mutex + pos int + keys []string + match string + prefix string +} + +// Next advances the iterator and reports whether there is a next element +func (i *ScanIterator) Next(ctx context.Context) bool { + i.once.Do(func() { + i.storage.Range(func(k, v interface{}) bool { + matched := true + if i.match != "" { + matched = strings.Contains(k.(string), i.match) + } + + if matched { + if v.(*entry).isExpirated() { + i.storage.Delete(k) + } else { + i.keys = append(i.keys, strings.TrimPrefix(k.(string), i.prefix)) } } - } - return true + return true + }) }) - return keys, nil + i.mu.Lock() + defer i.mu.Unlock() + + i.pos++ + if i.pos > len(i.keys) { + return false + } + + return true +} + +// Val returns the current key; Next must be called before Val 
+func (i *ScanIterator) Val() string { + var val string + i.mu.Lock() + defer i.mu.Unlock() + + if i.pos <= len(i.keys) { + val = i.keys[i.pos-1] + } + + return val } // New returns memory cache diff --git a/src/lib/cache/memory/memory_test.go b/src/lib/cache/memory/memory_test.go index 9d6c6119de1a..883045b5cc68 100644 --- a/src/lib/cache/memory/memory_test.go +++ b/src/lib/cache/memory/memory_test.go @@ -16,6 +16,7 @@ package memory import ( "context" + "fmt" "testing" "time" @@ -109,28 +110,54 @@ func (suite *CacheTestSuite) TestPing() { suite.NoError(suite.cache.Ping(suite.ctx)) } -func (suite *CacheTestSuite) TestKeys() { - key1 := "p1" - key2 := "p2" - - var err error - err = suite.cache.Save(suite.ctx, key1, "hello, p1") - suite.Nil(err) - err = suite.cache.Save(suite.ctx, key2, "hello, p2") - suite.Nil(err) - - // should match all - keys, err := suite.cache.Keys(suite.ctx, "p") - suite.Nil(err) - suite.ElementsMatch([]string{"p1", "p2"}, keys) - // only get p1 - keys, err = suite.cache.Keys(suite.ctx, key1) - suite.Nil(err) - suite.Equal([]string{"p1"}, keys) - // only get p2 - keys, err = suite.cache.Keys(suite.ctx, key2) - suite.Nil(err) - suite.Equal([]string{"p2"}, keys) +func (suite *CacheTestSuite) TestScan() { + seed := func(n int) { + for i := 0; i < n; i++ { + key := fmt.Sprintf("test-scan-%d", i) + err := suite.cache.Save(suite.ctx, key, "") + suite.NoError(err) + } + } + clean := func(n int) { + for i := 0; i < n; i++ { + key := fmt.Sprintf("test-scan-%d", i) + err := suite.cache.Delete(suite.ctx, key) + suite.NoError(err) + } + } + { + // no match should return all keys + expect := []string{"test-scan-0", "test-scan-1", "test-scan-2"} + // seed data + seed(3) + // test scan + iter, err := suite.cache.Scan(suite.ctx, "") + suite.NoError(err) + got := []string{} + for iter.Next(suite.ctx) { + got = append(got, iter.Val()) + } + suite.ElementsMatch(expect, got) + // clean up + clean(3) + } + + { + // with match should return matched keys + expect 
:= []string{"test-scan-1", "test-scan-10"} + // seed data + seed(11) + // test scan + iter, err := suite.cache.Scan(suite.ctx, "test-scan-1") + suite.NoError(err) + got := []string{} + for iter.Next(suite.ctx) { + got = append(got, iter.Val()) + } + suite.ElementsMatch(expect, got) + // clean up + clean(11) + } } func TestCacheTestSuite(t *testing.T) { diff --git a/src/lib/cache/redis/redis.go b/src/lib/cache/redis/redis.go index 3e7d5e433aaa..a72b05e9c877 100644 --- a/src/lib/cache/redis/redis.go +++ b/src/lib/cache/redis/redis.go @@ -25,6 +25,7 @@ import ( "github.com/goharbor/harbor/src/lib/cache" "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/log" ) var _ cache.Cache = (*Cache)(nil) @@ -89,30 +90,41 @@ func (c *Cache) Save(ctx context.Context, key string, value interface{}, expirat return c.Client.Set(ctx, c.opts.Key(key), data, exp).Err() } -// Keys returns the key matched by prefixes. -func (c *Cache) Keys(ctx context.Context, prefixes ...string) ([]string, error) { - patterns := make([]string, 0, len(prefixes)) - if len(prefixes) == 0 { - patterns = append(patterns, "*") - } else { - for _, p := range prefixes { - patterns = append(patterns, c.opts.Key(p)+"*") - } +// Scan scans the keys matched by match string +func (c *Cache) Scan(ctx context.Context, match string) (cache.Iterator, error) { + // the cursor and count are used for scan from redis, do not expose to outside + // by performance concern. 
+ // cursor should start from 0 + cursor := uint64(0) + count := int64(1000) + match = fmt.Sprintf("%s*%s*", c.opts.Prefix, match) + iter := c.Client.Scan(ctx, cursor, match, count).Iterator() + if iter.Err() != nil { + return nil, iter.Err() } - keys := make([]string, 0) - for _, pattern := range patterns { - cmd := c.Client.Keys(ctx, pattern) - if err := cmd.Err(); err != nil { - return nil, err - } + return &ScanIterator{iter: iter, prefix: c.opts.Prefix}, nil +} - for _, k := range cmd.Val() { - keys = append(keys, strings.TrimPrefix(k, c.opts.Prefix)) - } +// ScanIterator is a wrapper for redis ScanIterator +type ScanIterator struct { + iter *redis.ScanIterator + prefix string +} + +// Next advances the iterator and reports whether there is a next element; a scan error is logged and reported as false +func (i *ScanIterator) Next(ctx context.Context) bool { + hasNext := i.iter.Next(ctx) + if !hasNext && i.iter.Err() != nil { + log.Errorf("error occurred when scan redis: %v", i.iter.Err()) } - return keys, nil + return hasNext +} + +// Val returns the current key with the cache prefix trimmed +func (i *ScanIterator) Val() string { + return strings.TrimPrefix(i.iter.Val(), i.prefix) } // New returns redis cache diff --git a/src/lib/cache/redis/redis_test.go b/src/lib/cache/redis/redis_test.go index cd8a67ea5f74..1170543aab10 100644 --- a/src/lib/cache/redis/redis_test.go +++ b/src/lib/cache/redis/redis_test.go @@ -110,28 +110,54 @@ func (suite *CacheTestSuite) TestPing() { suite.NoError(suite.cache.Ping(suite.ctx)) } -func (suite *CacheTestSuite) TestKeys() { - key1 := "p1" - key2 := "p2" - - var err error - err = suite.cache.Save(suite.ctx, key1, "hello, p1") - suite.Nil(err) - err = suite.cache.Save(suite.ctx, key2, "hello, p2") - suite.Nil(err) - - // should match all - keys, err := suite.cache.Keys(suite.ctx, "p") - suite.Nil(err) - suite.ElementsMatch([]string{"p1", "p2"}, keys) - // only get p1 - keys, err = suite.cache.Keys(suite.ctx, key1) - suite.Nil(err) - suite.Equal([]string{"p1"}, keys) - // only get p2 - keys, err = suite.cache.Keys(suite.ctx, key2) - 
suite.Nil(err) - suite.Equal([]string{"p2"}, keys) +func (suite *CacheTestSuite) TestScan() { + seed := func(n int) { + for i := 0; i < n; i++ { + key := fmt.Sprintf("test-scan-%d", i) + err := suite.cache.Save(suite.ctx, key, "") + suite.NoError(err) + } + } + clean := func(n int) { + for i := 0; i < n; i++ { + key := fmt.Sprintf("test-scan-%d", i) + err := suite.cache.Delete(suite.ctx, key) + suite.NoError(err) + } + } + { + // no match should return all keys + expect := []string{"test-scan-0", "test-scan-1", "test-scan-2"} + // seed data + seed(3) + // test scan + iter, err := suite.cache.Scan(suite.ctx, "") + suite.NoError(err) + got := []string{} + for iter.Next(suite.ctx) { + got = append(got, iter.Val()) + } + suite.ElementsMatch(expect, got) + // clean up + clean(3) + } + + { + // with match should return matched keys + expect := []string{"test-scan-1", "test-scan-10"} + // seed data + seed(11) + // test scan + iter, err := suite.cache.Scan(suite.ctx, "*test-scan-1*") + suite.NoError(err) + got := []string{} + for iter.Next(suite.ctx) { + got = append(got, iter.Val()) + } + suite.ElementsMatch(expect, got) + // clean up + clean(11) + } } func TestCacheTestSuite(t *testing.T) { diff --git a/src/pkg/cached/base_manager.go b/src/pkg/cached/base_manager.go index aa5cd8f4f8e6..bc8103728496 100644 --- a/src/pkg/cached/base_manager.go +++ b/src/pkg/cached/base_manager.go @@ -60,8 +60,8 @@ func (*cacheClient) Save(ctx context.Context, key string, value interface{}, exp return cache.Default().Save(ctx, key, value, expiration...) } -func (*cacheClient) Keys(ctx context.Context, prefixes ...string) ([]string, error) { - return cache.Default().Keys(ctx, prefixes...) 
+func (*cacheClient) Scan(ctx context.Context, match string) (cache.Iterator, error) { + return cache.Default().Scan(ctx, match) } var _ Manager = &BaseManager{} @@ -98,13 +98,18 @@ func (bm *BaseManager) ResourceType(ctx context.Context) string { // CountCache returns current this resource occupied cache count. func (bm *BaseManager) CountCache(ctx context.Context) (int64, error) { + var count int64 // prefix is resource type - keys, err := bm.CacheClient(ctx).Keys(ctx, bm.ResourceType(ctx)) + iter, err := bm.CacheClient(ctx).Scan(ctx, bm.ResourceType(ctx)) if err != nil { return 0, err } - return int64(len(keys)), nil + for iter.Next(ctx) { + count++ + } + + return count, nil } // DeleteCache deletes specific cache by key. @@ -115,14 +120,14 @@ func (bm *BaseManager) DeleteCache(ctx context.Context, key string) error { // FlushAll flush this resource's all cache. func (bm *BaseManager) FlushAll(ctx context.Context) error { // prefix is resource type - keys, err := bm.CacheClient(ctx).Keys(ctx, bm.ResourceType(ctx)) + iter, err := bm.CacheClient(ctx).Scan(ctx, bm.ResourceType(ctx)) if err != nil { return err } var errs errors.Errors - for _, key := range keys { - if err = bm.CacheClient(ctx).Delete(ctx, key); err != nil { + for iter.Next(ctx) { + if err = bm.CacheClient(ctx).Delete(ctx, iter.Val()); err != nil { errs = append(errs, err) } } diff --git a/src/pkg/cached/project_metadata/redis/manager.go b/src/pkg/cached/project_metadata/redis/manager.go index 07658296c083..1bf1ee6df072 100644 --- a/src/pkg/cached/project_metadata/redis/manager.go +++ b/src/pkg/cached/project_metadata/redis/manager.go @@ -119,14 +119,14 @@ func (m *Manager) Update(ctx context.Context, projectID int64, meta map[string]s return err } // lookup all keys with projectID prefix - keys, err := m.CacheClient(ctx).Keys(ctx, prefix) + iter, err := m.CacheClient(ctx).Scan(ctx, prefix) if err != nil { return err } - for _, key := range keys { - if err = retry.Retry(func() error { return 
m.CacheClient(ctx).Delete(ctx, key) }); err != nil { - log.Errorf("delete project metadata cache key %s error: %v", key, err) + for iter.Next(ctx) { + if err = retry.Retry(func() error { return m.CacheClient(ctx).Delete(ctx, iter.Val()) }); err != nil { + log.Errorf("delete project metadata cache key %s error: %v", iter.Val(), err) } } diff --git a/src/pkg/task/dao/execution.go b/src/pkg/task/dao/execution.go index 611dca1fcef8..cd26e792e56e 100644 --- a/src/pkg/task/dao/execution.go +++ b/src/pkg/task/dao/execution.go @@ -447,11 +447,16 @@ func (e *executionDAO) AsyncRefreshStatus(ctx context.Context, id int64, vendor // scanAndRefreshOutdateStatus scans the outdate execution status from redis and then refresh the status to db, // do not want to expose to external use so keep it as private. func scanAndRefreshOutdateStatus(ctx context.Context) { - keys, err := cache.Default().Keys(ctx, "execution:id:") + iter, err := cache.Default().Scan(ctx, "execution:id:*vendor:*status_outdate") if err != nil { log.Errorf("failed to scan the outdate executions, error: %v", err) return } + + var keys []string + for iter.Next(ctx) { + keys = append(keys, iter.Val()) + } // return earlier if no keys found which represents no outdate execution if len(keys) == 0 { log.Debug("skip to refresh, no outdate execution status found") diff --git a/src/testing/lib/cache/cache.go b/src/testing/lib/cache/cache.go index efac85b0a833..230d170832d9 100644 --- a/src/testing/lib/cache/cache.go +++ b/src/testing/lib/cache/cache.go @@ -4,9 +4,12 @@ package cache import ( context "context" - time "time" + + cache "github.com/goharbor/harbor/src/lib/cache" mock "github.com/stretchr/testify/mock" + + time "time" ) // Cache is an autogenerated mock type for the Cache type @@ -56,39 +59,6 @@ func (_m *Cache) Fetch(ctx context.Context, key string, value interface{}) error return r0 } -// Keys provides a mock function with given fields: ctx, prefixes -func (_m *Cache) Keys(ctx context.Context, prefixes 
...string) ([]string, error) { - _va := make([]interface{}, len(prefixes)) - for _i := range prefixes { - _va[_i] = prefixes[_i] - } - var _ca []interface{} - _ca = append(_ca, ctx) - _ca = append(_ca, _va...) - ret := _m.Called(_ca...) - - var r0 []string - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, ...string) ([]string, error)); ok { - return rf(ctx, prefixes...) - } - if rf, ok := ret.Get(0).(func(context.Context, ...string) []string); ok { - r0 = rf(ctx, prefixes...) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]string) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, ...string) error); ok { - r1 = rf(ctx, prefixes...) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // Ping provides a mock function with given fields: ctx func (_m *Cache) Ping(ctx context.Context) error { ret := _m.Called(ctx) @@ -124,6 +94,32 @@ func (_m *Cache) Save(ctx context.Context, key string, value interface{}, expira return r0 } +// Scan provides a mock function with given fields: ctx, match +func (_m *Cache) Scan(ctx context.Context, match string) (cache.Iterator, error) { + ret := _m.Called(ctx, match) + + var r0 cache.Iterator + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (cache.Iterator, error)); ok { + return rf(ctx, match) + } + if rf, ok := ret.Get(0).(func(context.Context, string) cache.Iterator); ok { + r0 = rf(ctx, match) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(cache.Iterator) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, match) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + type mockConstructorTestingTNewCache interface { mock.TestingT Cleanup(func())