Skip to content

Commit

Permalink
Cache: support redis cache backend
Browse files Browse the repository at this point in the history
Signed-off-by: Jimmie Han <hanjinming@outlook.com>
  • Loading branch information
hanjm committed Nov 22, 2021
1 parent 243526d commit dace033
Show file tree
Hide file tree
Showing 12 changed files with 494 additions and 31 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/Azure/go-autorest/autorest/azure/auth v0.5.8
github.com/NYTimes/gziphandler v1.1.1
github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a
github.com/alicebob/miniredis/v2 v2.14.3
github.com/aliyun/aliyun-oss-go-sdk v2.0.4+incompatible
github.com/baidubce/bce-sdk-go v0.9.81
github.com/blang/semver/v4 v4.0.0
Expand All @@ -29,11 +30,13 @@ require (
github.com/fsnotify/fsnotify v1.4.9
github.com/go-kit/kit v0.11.0
github.com/go-openapi/strfmt v0.20.3
github.com/go-redis/redis/v8 v8.11.4
github.com/gogo/protobuf v1.3.2
github.com/gogo/status v1.1.0
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/golang/snappy v0.0.4
github.com/googleapis/gax-go v2.0.2+incompatible
github.com/grafana/dskit v0.0.0-20210908150159-fcf48cb19aa4
github.com/grpc-ecosystem/go-grpc-middleware/providers/kit/v2 v2.0.0-20201002093600-73cf2ae9d891
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.2.0.20201207153454-9f6bf00c00a7
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
// MemcachedCache is a memcached-based cache.
type MemcachedCache struct {
logger log.Logger
memcached cacheutil.MemcachedClient
memcached cacheutil.RemoteCacheClient
name string

// Metrics.
Expand All @@ -27,7 +27,7 @@ type MemcachedCache struct {
}

// NewMemcachedCache makes a new MemcachedCache.
func NewMemcachedCache(name string, logger log.Logger, memcached cacheutil.MemcachedClient, reg prometheus.Registerer) *MemcachedCache {
func NewMemcachedCache(name string, logger log.Logger, memcached cacheutil.RemoteCacheClient, reg prometheus.Registerer) *MemcachedCache {
c := &MemcachedCache{
logger: logger,
memcached: memcached,
Expand Down
69 changes: 69 additions & 0 deletions pkg/cache/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cache

import (
"context"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/cacheutil"
)

// RedisCache is a redis cache.
type RedisCache struct {
logger log.Logger
redisClient *cacheutil.RedisClient
name string

// Metrics.
requests prometheus.Counter
hits prometheus.Counter
}

// NewRedisCache makes a new RedisCache.
func NewRedisCache(name string, logger log.Logger, redisClient *cacheutil.RedisClient, reg prometheus.Registerer) *RedisCache {
c := &RedisCache{
logger: logger,
redisClient: redisClient,
name: name,
}

c.requests = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_cache_redis_requests_total",
Help: "Total number of items requests to redis.",
ConstLabels: prometheus.Labels{"name": name},
})

c.hits = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_cache_redis_hits_total",
Help: "Total number of items requests to the cache that were a hit.",
ConstLabels: prometheus.Labels{"name": name},
})

level.Info(logger).Log("msg", "created redis cache")

return c
}

// Store data identified by keys.
func (c *RedisCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
c.redisClient.SetMulti(ctx, data, ttl)
}

// Fetch fetches multiple keys and returns a map containing cache hits, along with a list of missing keys.
// In case of error, it logs and return an empty cache hits map.
func (c *RedisCache) Fetch(ctx context.Context, keys []string) map[string][]byte {
c.requests.Add(float64(len(keys)))
results := c.redisClient.GetMulti(ctx, keys)
c.hits.Add(float64(len(results)))
return results
}

func (c *RedisCache) Name() string {
return c.name
}
120 changes: 120 additions & 0 deletions pkg/cache/redis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package cache

import (
"context"
"os"
"testing"
"time"

"github.com/alicebob/miniredis/v2"
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/thanos-io/thanos/pkg/cacheutil"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestRedisCache(t *testing.T) {
// Init some data to conveniently define test cases later one.
key1 := "key1"
key2 := "key2"
key3 := "key3"
value1 := []byte{1}
value2 := []byte{2}
value3 := []byte{3}

type args struct {
data map[string][]byte
fetchKeys []string
}
type want struct {
hits map[string][]byte
}
tests := []struct {
name string
args args
want want
}{
{
name: "all hit",
args: args{
data: map[string][]byte{
key1: value1,
key2: value2,
key3: value3,
},
fetchKeys: []string{key1, key2, key3},
},
want: want{
hits: map[string][]byte{
key1: value1,
key2: value2,
key3: value3,
},
},
},
{
name: "partial hit",
args: args{
data: map[string][]byte{
key1: value1,
key2: value2,
},
fetchKeys: []string{key1, key2, key3},
},
want: want{
hits: map[string][]byte{
key1: value1,
key2: value2,
},
},
},
{
name: "not hit",
args: args{
data: map[string][]byte{},
fetchKeys: []string{key1, key2, key3},
},
want: want{
hits: map[string][]byte{},
},
},
}
s, err := miniredis.Run()
if err != nil {
testutil.Ok(t, err)
}
defer s.Close()
logger := log.NewLogfmtLogger(os.Stderr)
reg := prometheus.NewRegistry()
cfg := cacheutil.RedisClientConfig{
Addr: s.Addr(),
DialTimeout: time.Second,
ReadTimeout: time.Second,
WriteTimeout: time.Second,
MinIdleConns: 10,
MaxConnAge: time.Minute * 10,
IdleTimeout: time.Minute * 5,
}
c, err := cacheutil.NewRedisClientWithConfig(logger, t.Name(), cfg, reg)
if err != nil {
testutil.Ok(t, err)
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defer s.FlushAll()
c := NewRedisCache(tt.name, logger, c, reg)
// Store the cache expected before running the test.
ctx := context.Background()
c.Store(ctx, tt.args.data, time.Hour)

// Fetch postings from cached and assert on it.
hits := c.Fetch(ctx, tt.args.fetchKeys)
testutil.Equals(t, tt.want.hits, hits)

// Assert on metrics.
testutil.Equals(t, float64(len(tt.args.fetchKeys)), prom_testutil.ToFloat64(c.requests))
testutil.Equals(t, float64(len(tt.want.hits)), prom_testutil.ToFloat64(c.hits))
})
}
}
21 changes: 15 additions & 6 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

const (
opSet = "set"
opSetMulti = "setmulti"
opGetMulti = "getmulti"
reasonMaxItemSize = "max-item-size"
reasonAsyncBufferFull = "async-buffer-full"
Expand Down Expand Up @@ -58,9 +59,14 @@ var (
}
)

// MemcachedClient is a high level client to interact with memcached.
type MemcachedClient interface {
// GetMulti fetches multiple keys at once from memcached. In case of error,
var (
_ RemoteCacheClient = (*memcachedClient)(nil)
_ RemoteCacheClient = (*RedisClient)(nil)
)

// RemoteCacheClient is a high level client to interact with remote cache.
type RemoteCacheClient interface {
// GetMulti fetches multiple keys at once from remoteCache. In case of error,
// an empty map is returned and the error tracked/logged.
GetMulti(ctx context.Context, keys []string) map[string][]byte

Expand All @@ -73,13 +79,16 @@ type MemcachedClient interface {
Stop()
}

// MemcachedClient for compatible.
type MemcachedClient = RemoteCacheClient

// memcachedClientBackend is an interface used to mock the underlying client in tests.
type memcachedClientBackend interface {
GetMulti(keys []string) (map[string]*memcache.Item, error)
Set(item *memcache.Item) error
}

// MemcachedClientConfig is the config accepted by MemcachedClient.
// MemcachedClientConfig is the config accepted by RemoteCacheClient.
type MemcachedClientConfig struct {
// Addresses specifies the list of memcached addresses. The addresses get
// resolved with the DNS provider.
Expand Down Expand Up @@ -196,7 +205,7 @@ type memcachedGetMultiResult struct {
err error
}

// NewMemcachedClient makes a new MemcachedClient.
// NewMemcachedClient makes a new RemoteCacheClient.
func NewMemcachedClient(logger log.Logger, name string, conf []byte, reg prometheus.Registerer) (*memcachedClient, error) {
config, err := parseMemcachedClientConfig(conf)
if err != nil {
Expand All @@ -206,7 +215,7 @@ func NewMemcachedClient(logger log.Logger, name string, conf []byte, reg prometh
return NewMemcachedClientWithConfig(logger, name, config, reg)
}

// NewMemcachedClientWithConfig makes a new MemcachedClient.
// NewMemcachedClientWithConfig makes a new RemoteCacheClient.
func NewMemcachedClientWithConfig(logger log.Logger, name string, config MemcachedClientConfig, reg prometheus.Registerer) (*memcachedClient, error) {
if err := config.validate(); err != nil {
return nil, err
Expand Down
Loading

0 comments on commit dace033

Please sign in to comment.