Skip to content

Commit

Permalink
refactor: migrate the redis command keys to scan
Browse files Browse the repository at this point in the history
Refine the cache interface, migrate the Keys to Scan, change the redis
underlying keys command to scan.

Signed-off-by: chlins <chenyuzh@vmware.com>
  • Loading branch information
chlins committed Jun 19, 2023
1 parent b822952 commit 5c68eaa
Show file tree
Hide file tree
Showing 10 changed files with 243 additions and 129 deletions.
1 change: 0 additions & 1 deletion src/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ require (
github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/image-spec v1.1.0-rc2.0.20221005185240-3a7f492d3f1b
github.com/pkg/errors v0.9.1

github.com/prometheus/client_golang v1.14.0
github.com/robfig/cron/v3 v3.0.0
github.com/spf13/viper v1.8.1
Expand Down
11 changes: 9 additions & 2 deletions src/lib/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ var (
ErrNotFound = errors.New("key not found")
)

// Iterator returns the ScanIterator
type Iterator interface {
Next(ctx context.Context) bool
Val() string
}

// Cache cache interface
type Cache interface {
// Contains returns true if key exists
Expand All @@ -57,8 +63,9 @@ type Cache interface {
// Save cache the value by key
Save(ctx context.Context, key string, value interface{}, expiration ...time.Duration) error

// Keys returns the key matched by prefixes
Keys(ctx context.Context, prefixes ...string) ([]string, error)
// Scan scans the keys matched by match string
// NOTICE: memory cache does not support use wildcard, compared by strings.Contains
Scan(ctx context.Context, match string) (Iterator, error)
}

var (
Expand Down
71 changes: 54 additions & 17 deletions src/lib/cache/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,27 +117,64 @@ func (c *Cache) Save(ctx context.Context, key string, value interface{}, expirat
return nil
}

// Keys returns the key matched by prefixes.
func (c *Cache) Keys(ctx context.Context, prefixes ...string) ([]string, error) {
// if no prefix, means match all keys.
matchAll := len(prefixes) == 0
// range map to get all keys
keys := make([]string, 0)
c.storage.Range(func(k, v interface{}) bool {
ks := k.(string)
if matchAll {
keys = append(keys, ks)
} else {
for _, p := range prefixes {
if strings.HasPrefix(ks, c.opts.Key(p)) {
keys = append(keys, strings.TrimPrefix(ks, c.opts.Prefix))
// Scan scans the keys matched by match string
func (c *Cache) Scan(ctx context.Context, match string) (cache.Iterator, error) {
return &ScanIterator{storage: &c.storage, prefix: c.opts.Prefix, match: match}, nil
}

// ScanIterator is a ScanIterator for memory cache
type ScanIterator struct {
once sync.Once
storage *sync.Map
mu sync.Mutex
pos int
keys []string
match string
prefix string
}

// Next checks whether has the next element
func (i *ScanIterator) Next(ctx context.Context) bool {
i.once.Do(func() {
i.storage.Range(func(k, v interface{}) bool {
matched := true
if i.match != "" {
matched = strings.Contains(k.(string), i.match)
}

if matched {
if v.(*entry).isExpirated() {
i.storage.Delete(k)
} else {
i.keys = append(i.keys, strings.TrimPrefix(k.(string), i.prefix))
}
}
}
return true
return true
})
})

return keys, nil
i.mu.Lock()
defer i.mu.Unlock()

i.pos++
if i.pos > len(i.keys) {
return false
}

return true
}

// Val returns the key
func (i *ScanIterator) Val() string {
var val string
i.mu.Lock()
defer i.mu.Unlock()

if i.pos <= len(i.keys) {
val = i.keys[i.pos-1]
}

return val
}

// New returns memory cache
Expand Down
71 changes: 49 additions & 22 deletions src/lib/cache/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package memory

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -109,28 +110,54 @@ func (suite *CacheTestSuite) TestPing() {
suite.NoError(suite.cache.Ping(suite.ctx))
}

func (suite *CacheTestSuite) TestKeys() {
key1 := "p1"
key2 := "p2"

var err error
err = suite.cache.Save(suite.ctx, key1, "hello, p1")
suite.Nil(err)
err = suite.cache.Save(suite.ctx, key2, "hello, p2")
suite.Nil(err)

// should match all
keys, err := suite.cache.Keys(suite.ctx, "p")
suite.Nil(err)
suite.ElementsMatch([]string{"p1", "p2"}, keys)
// only get p1
keys, err = suite.cache.Keys(suite.ctx, key1)
suite.Nil(err)
suite.Equal([]string{"p1"}, keys)
// only get p2
keys, err = suite.cache.Keys(suite.ctx, key2)
suite.Nil(err)
suite.Equal([]string{"p2"}, keys)
func (suite *CacheTestSuite) TestScan() {
seed := func(n int) {
for i := 0; i < n; i++ {
key := fmt.Sprintf("test-scan-%d", i)
err := suite.cache.Save(suite.ctx, key, "")
suite.NoError(err)
}
}
clean := func(n int) {
for i := 0; i < n; i++ {
key := fmt.Sprintf("test-scan-%d", i)
err := suite.cache.Delete(suite.ctx, key)
suite.NoError(err)
}
}
{
// no match should return all keys
expect := []string{"test-scan-0", "test-scan-1", "test-scan-2"}
// seed data
seed(3)
// test scan
iter, err := suite.cache.Scan(suite.ctx, "")
suite.NoError(err)
got := []string{}
for iter.Next(suite.ctx) {
got = append(got, iter.Val())
}
suite.ElementsMatch(expect, got)
// clean up
clean(3)
}

{
// with match should return matched keys
expect := []string{"test-scan-1", "test-scan-10"}
// seed data
seed(11)
// test scan
iter, err := suite.cache.Scan(suite.ctx, "test-scan-1")
suite.NoError(err)
got := []string{}
for iter.Next(suite.ctx) {
got = append(got, iter.Val())
}
suite.ElementsMatch(expect, got)
// clean up
clean(11)
}
}

func TestCacheTestSuite(t *testing.T) {
Expand Down
50 changes: 31 additions & 19 deletions src/lib/cache/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/goharbor/harbor/src/lib/cache"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
)

var _ cache.Cache = (*Cache)(nil)
Expand Down Expand Up @@ -89,30 +90,41 @@ func (c *Cache) Save(ctx context.Context, key string, value interface{}, expirat
return c.Client.Set(ctx, c.opts.Key(key), data, exp).Err()
}

// Keys returns the key matched by prefixes.
func (c *Cache) Keys(ctx context.Context, prefixes ...string) ([]string, error) {
patterns := make([]string, 0, len(prefixes))
if len(prefixes) == 0 {
patterns = append(patterns, "*")
} else {
for _, p := range prefixes {
patterns = append(patterns, c.opts.Key(p)+"*")
}
// Scan scans the keys matched by match string
func (c *Cache) Scan(ctx context.Context, match string) (cache.Iterator, error) {
// the cursor and count are used for scan from redis, do not expose to outside
// by performance concern.
// cursor should start from 0
cursor := uint64(0)
count := int64(1000)
match = fmt.Sprintf("%s*%s*", c.opts.Prefix, match)
iter := c.Client.Scan(ctx, cursor, match, count).Iterator()
if iter.Err() != nil {
return nil, iter.Err()
}

keys := make([]string, 0)
for _, pattern := range patterns {
cmd := c.Client.Keys(ctx, pattern)
if err := cmd.Err(); err != nil {
return nil, err
}
return &ScanIterator{iter: iter, prefix: c.opts.Prefix}, nil
}

for _, k := range cmd.Val() {
keys = append(keys, strings.TrimPrefix(k, c.opts.Prefix))
}
// ScanIterator is a wrapper for redis ScanIterator
type ScanIterator struct {
iter *redis.ScanIterator
prefix string
}

// Next check whether has the next element
func (i *ScanIterator) Next(ctx context.Context) bool {
hasNext := i.iter.Next(ctx)
if !hasNext && i.iter.Err() != nil {
log.Errorf("error occurred when scan redis: %v", i.iter.Err())
}

return keys, nil
return hasNext
}

// Val returns the key
func (i *ScanIterator) Val() string {
return strings.TrimPrefix(i.iter.Val(), i.prefix)
}

// New returns redis cache
Expand Down
70 changes: 48 additions & 22 deletions src/lib/cache/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,28 +110,54 @@ func (suite *CacheTestSuite) TestPing() {
suite.NoError(suite.cache.Ping(suite.ctx))
}

func (suite *CacheTestSuite) TestKeys() {
key1 := "p1"
key2 := "p2"

var err error
err = suite.cache.Save(suite.ctx, key1, "hello, p1")
suite.Nil(err)
err = suite.cache.Save(suite.ctx, key2, "hello, p2")
suite.Nil(err)

// should match all
keys, err := suite.cache.Keys(suite.ctx, "p")
suite.Nil(err)
suite.ElementsMatch([]string{"p1", "p2"}, keys)
// only get p1
keys, err = suite.cache.Keys(suite.ctx, key1)
suite.Nil(err)
suite.Equal([]string{"p1"}, keys)
// only get p2
keys, err = suite.cache.Keys(suite.ctx, key2)
suite.Nil(err)
suite.Equal([]string{"p2"}, keys)
func (suite *CacheTestSuite) TestScan() {
seed := func(n int) {
for i := 0; i < n; i++ {
key := fmt.Sprintf("test-scan-%d", i)
err := suite.cache.Save(suite.ctx, key, "")
suite.NoError(err)
}
}
clean := func(n int) {
for i := 0; i < n; i++ {
key := fmt.Sprintf("test-scan-%d", i)
err := suite.cache.Delete(suite.ctx, key)
suite.NoError(err)
}
}
{
// no match should return all keys
expect := []string{"test-scan-0", "test-scan-1", "test-scan-2"}
// seed data
seed(3)
// test scan
iter, err := suite.cache.Scan(suite.ctx, "")
suite.NoError(err)
got := []string{}
for iter.Next(suite.ctx) {
got = append(got, iter.Val())
}
suite.ElementsMatch(expect, got)
// clean up
clean(3)
}

{
// with match should return matched keys
expect := []string{"test-scan-1", "test-scan-10"}
// seed data
seed(11)
// test scan
iter, err := suite.cache.Scan(suite.ctx, "*test-scan-1*")
suite.NoError(err)
got := []string{}
for iter.Next(suite.ctx) {
got = append(got, iter.Val())
}
suite.ElementsMatch(expect, got)
// clean up
clean(11)
}
}

func TestCacheTestSuite(t *testing.T) {
Expand Down
19 changes: 12 additions & 7 deletions src/pkg/cached/base_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func (*cacheClient) Save(ctx context.Context, key string, value interface{}, exp
return cache.Default().Save(ctx, key, value, expiration...)
}

func (*cacheClient) Keys(ctx context.Context, prefixes ...string) ([]string, error) {
return cache.Default().Keys(ctx, prefixes...)
func (*cacheClient) Scan(ctx context.Context, match string) (cache.Iterator, error) {
return cache.Default().Scan(ctx, match)
}

var _ Manager = &BaseManager{}
Expand Down Expand Up @@ -98,13 +98,18 @@ func (bm *BaseManager) ResourceType(ctx context.Context) string {

// CountCache returns current this resource occupied cache count.
func (bm *BaseManager) CountCache(ctx context.Context) (int64, error) {
var count int64
// prefix is resource type
keys, err := bm.CacheClient(ctx).Keys(ctx, bm.ResourceType(ctx))
iter, err := bm.CacheClient(ctx).Scan(ctx, bm.ResourceType(ctx))
if err != nil {
return 0, err
}

return int64(len(keys)), nil
for iter.Next(ctx) {
count++
}

return count, nil
}

// DeleteCache deletes specific cache by key.
Expand All @@ -115,14 +120,14 @@ func (bm *BaseManager) DeleteCache(ctx context.Context, key string) error {
// FlushAll flush this resource's all cache.
func (bm *BaseManager) FlushAll(ctx context.Context) error {
// prefix is resource type
keys, err := bm.CacheClient(ctx).Keys(ctx, bm.ResourceType(ctx))
iter, err := bm.CacheClient(ctx).Scan(ctx, bm.ResourceType(ctx))
if err != nil {
return err
}

var errs errors.Errors
for _, key := range keys {
if err = bm.CacheClient(ctx).Delete(ctx, key); err != nil {
for iter.Next(ctx) {
if err = bm.CacheClient(ctx).Delete(ctx, iter.Val()); err != nil {
errs = append(errs, err)
}
}
Expand Down
Loading

0 comments on commit 5c68eaa

Please sign in to comment.