Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chunks caching at bucket level #2532

Merged
merged 34 commits into from
May 7, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
93343ca
Added generic cache interface.
pstibrany Apr 27, 2020
515a0f7
Added memcached implementation of Cache.
pstibrany Apr 27, 2020
967dd7a
Chunks-caching bucket.
pstibrany Apr 27, 2020
c45f271
Fix sentences
pstibrany Apr 30, 2020
4739c12
Fix sentences
pstibrany Apr 30, 2020
f4c4783
Fix sentences
pstibrany Apr 30, 2020
1f6b415
Rename config objects.
pstibrany May 1, 2020
616891e
Review feedback.
pstibrany May 4, 2020
e0ffb00
Review feedback.
pstibrany May 4, 2020
000b5c3
Added metrics for object size.
pstibrany May 4, 2020
7eb18db
Added requested chunk bytes metric.
pstibrany May 4, 2020
2b42af4
Caching bucket docs.
pstibrany May 4, 2020
1baa24c
Fixed tests.
pstibrany May 4, 2020
801bc6d
Fix test.
pstibrany May 4, 2020
2997437
Update docs/components/store.md
pstibrany May 5, 2020
0c75120
Dots
pstibrany May 5, 2020
9821c29
Always set lastBlockOffset.
pstibrany May 5, 2020
c35f077
Merged cached metric into fetched metric, added labels.
pstibrany May 5, 2020
c9b51b1
Added CHANGELOG.md entry
pstibrany May 5, 2020
65af4fb
Reworded help for thanos_store_bucket_cache_fetched_chunk_bytes_total
pstibrany May 5, 2020
05f2b4a
Added tracing around getRangeChunkFile method.
pstibrany May 5, 2020
015b025
Updated CHANGELOG.md
pstibrany May 5, 2020
3386747
Options
pstibrany May 5, 2020
7269899
Fix parameter name. (store. got dropped by accident)
pstibrany May 6, 2020
13ac200
Use embedded Bucket
pstibrany May 6, 2020
3e96b7c
Added comments.
pstibrany May 6, 2020
2f1b68d
Fixed comment.
pstibrany May 6, 2020
c26bd56
Hide store.caching-bucket.config flags.
pstibrany May 7, 2020
db75725
Renamed block to subrange.
pstibrany May 7, 2020
ba3b5d9
Renamed block to subrange.
pstibrany May 7, 2020
267ce9c
Header
pstibrany May 7, 2020
ec564b6
Added TODO
pstibrany May 7, 2020
2c21dfe
Removed TODO, in favor of creating issue.
pstibrany May 7, 2020
d1eabf7
Use NopCloser.
pstibrany May 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
"YAML file that contains index cache configuration. See format details: https://thanos.io/components/store.md/#index-cache",
false)

cachingBucketConfig := extflag.RegisterPathOrContent(cmd, "experimental.caching-bucket.config",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we were doing opposite but now when I think of it I think it would be nice to have this is hidden but normal flag name.

It's painful to change flag names for users ... (:

so maybe just

Suggested change
cachingBucketConfig := extflag.RegisterPathOrContent(cmd, "experimental.caching-bucket.config",
cachingBucketConfig := extflag.RegisterPathOrContent(cmd, "store.caching-bucket.config",

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we hide it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess by adding RegisterHiddenPathOrContent or something (:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, are you suggesting to add RegisterHiddenPathOrContent that will "hide" the command line option – how? By giving it "hidden" prefix? I'm little confused about what you're suggesting here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hidden by using new extflag.HiddenCmdClause(cmd) like this:

	cachingBucketConfig := extflag.RegisterPathOrContent(extflag.HiddenCmdClause(cmd), "store.caching-bucket.config", ...)

WDYT?

"YAML file that contains configuration for caching bucket. Experimental feature, with high risk of changes. See format details: https://thanos.io/components/store.md/#TBD",
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
false)

chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes reserved strictly to reuse for chunks in memory.").
Default("2GB").Bytes()

Expand Down Expand Up @@ -149,6 +153,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
*webExternalPrefix,
*webPrefixHeaderName,
*postingOffsetsInMemSampling,
cachingBucketConfig,
)
}
}
Expand Down Expand Up @@ -179,6 +184,7 @@ func runStore(
ignoreDeletionMarksDelay time.Duration,
externalPrefix, prefixHeader string,
postingOffsetsInMemSampling int,
cachingBucketConfig *extflag.PathOrContent,
) error {
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
Expand Down Expand Up @@ -214,6 +220,18 @@ func runStore(
return errors.Wrap(err, "create bucket client")
}

cachingBucketConfigYaml, err := cachingBucketConfig.Content()
if err != nil {
return errors.Wrap(err, "get caching bucket configuration")
}
if len(cachingBucketConfigYaml) > 0 {
newbkt, err := storecache.NewCachingBucketFromYaml(cachingBucketConfigYaml, bkt, logger, reg)
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Wrap(err, "create caching bucket")
}
bkt = newbkt
}

relabelContentYaml, err := selectorRelabelConf.Content()
if err != nil {
return errors.Wrap(err, "get content of relabel configuration")
Expand Down
20 changes: 20 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cache

import (
"context"
"time"
)

// Generic best-effort cache.
type Cache interface {
// Store data into the cache.
//
// Note that individual byte buffers may be retained by the cache!
Store(ctx context.Context, data map[string][]byte, ttl time.Duration)

// Fetch multiple keys from cache.
Fetch(ctx context.Context, keys []string) (found map[string][]byte, missing []string)
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
}
84 changes: 84 additions & 0 deletions pkg/cache/memcached.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cache

import (
"context"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/thanos-io/thanos/pkg/cacheutil"
)

// MemcachedCache is a memcached-based cache.
type MemcachedCache struct {
logger log.Logger
memcached cacheutil.MemcachedClient

// Metrics.
requests prometheus.Counter
hits prometheus.Counter
}

// NewMemcachedCache makes a new MemcachedCache.
func NewMemcachedCache(logger log.Logger, memcached cacheutil.MemcachedClient, reg prometheus.Registerer) *MemcachedCache {
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
c := &MemcachedCache{
logger: logger,
memcached: memcached,
}

c.requests = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_cache_memcached_requests_total",
Help: "Total number of items requests to the memcached.",
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
})

c.hits = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_cache_memcached_hits_total",
Help: "Total number of items requests to the cache that were a hit.",
})

level.Info(logger).Log("msg", "created memcached cache")

return c
}

// Store data identified by keys.
// The function enqueues the request and returns immediately: the entry will be
// asynchronously stored in the cache.
func (c *MemcachedCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
for key, val := range data {
if err := c.memcached.SetAsync(ctx, key, val, ttl); err != nil {
level.Error(c.logger).Log("msg", "failed to store data into memcached", "err", err)
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

// Fetch fetches multiple keys and returns a map containing cache hits, along with a list of missing keys.
// In case of error, it logs and return an empty cache hits map.
func (c *MemcachedCache) Fetch(ctx context.Context, keys []string) (map[string][]byte, []string) {
// Fetch the keys from memcached in a single request.
c.requests.Add(float64(len(keys)))
results := c.memcached.GetMulti(ctx, keys)
if len(results) == 0 {
return nil, keys
}

// Construct the resulting hits map and list of missing keys. We iterate on the input
// list of labels to be able to easily create the list of ones in a single iteration.
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
var misses []string
for _, key := range keys {
_, ok := results[key]
if !ok {
misses = append(misses, key)
continue
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
}
}

c.hits.Add(float64(len(results)))
return results, misses
}
132 changes: 132 additions & 0 deletions pkg/cache/memcached_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cache

import (
"context"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"

"github.com/thanos-io/thanos/pkg/testutil"
)

func TestMemcachedIndexCache(t *testing.T) {
t.Parallel()

// Init some data to conveniently define test cases later one.
key1 := "key1"
key2 := "key2"
key3 := "key3"
value1 := []byte{1}
value2 := []byte{2}
value3 := []byte{3}

tests := map[string]struct {
setup map[string][]byte
mockedErr error
fetchKeys []string
expectedHits map[string][]byte
expectedMisses []string
}{
"should return no hits on empty cache": {
setup: nil,
fetchKeys: []string{key1, key2},
expectedHits: nil,
expectedMisses: []string{key1, key2},
},
"should return no misses on 100% hit ratio": {
setup: map[string][]byte{
key1: value1,
key2: value2,
key3: value3,
},
fetchKeys: []string{key1},
expectedHits: map[string][]byte{
key1: value1,
},
expectedMisses: nil,
},
"should return hits and misses on partial hits": {
setup: map[string][]byte{
key1: value1,
key2: value2,
},
fetchKeys: []string{key1, key3},
expectedHits: map[string][]byte{key1: value1},
expectedMisses: []string{key3},
},
"should return no hits on memcached error": {
setup: map[string][]byte{
key1: value1,
key2: value2,
key3: value3,
},
mockedErr: errors.New("mocked error"),
fetchKeys: []string{key1},
expectedHits: nil,
expectedMisses: []string{key1},
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
memcached := newMockedMemcachedClient(testData.mockedErr)
c := NewMemcachedCache(log.NewNopLogger(), memcached, nil)

// Store the postings expected before running the test.
ctx := context.Background()
c.Store(ctx, testData.setup, time.Hour)

// Fetch postings from cached and assert on it.
hits, misses := c.Fetch(ctx, testData.fetchKeys)
testutil.Equals(t, testData.expectedHits, hits)
testutil.Equals(t, testData.expectedMisses, misses)

// Assert on metrics.
testutil.Equals(t, float64(len(testData.fetchKeys)), prom_testutil.ToFloat64(c.requests))
testutil.Equals(t, float64(len(testData.expectedHits)), prom_testutil.ToFloat64(c.hits))
})
}
}

type mockedMemcachedClient struct {
cache map[string][]byte
mockedGetMultiErr error
}

func newMockedMemcachedClient(mockedGetMultiErr error) *mockedMemcachedClient {
return &mockedMemcachedClient{
cache: map[string][]byte{},
mockedGetMultiErr: mockedGetMultiErr,
}
}

func (c *mockedMemcachedClient) GetMulti(_ context.Context, keys []string) map[string][]byte {
if c.mockedGetMultiErr != nil {
return nil
}

hits := map[string][]byte{}

for _, key := range keys {
if value, ok := c.cache[key]; ok {
hits[key] = value
}
}

return hits
}

func (c *mockedMemcachedClient) SetAsync(_ context.Context, key string, value []byte, _ time.Duration) error {
c.cache[key] = value
return nil
}

func (c *mockedMemcachedClient) Stop() {
// Nothing to do.
}
Loading