Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chunks caching at bucket level #2532

Merged
merged 34 commits into from
May 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
93343ca
Added generic cache interface.
pstibrany Apr 27, 2020
515a0f7
Added memcached implementation of Cache.
pstibrany Apr 27, 2020
967dd7a
Chunks-caching bucket.
pstibrany Apr 27, 2020
c45f271
Fix sentences
pstibrany Apr 30, 2020
4739c12
Fix sentences
pstibrany Apr 30, 2020
f4c4783
Fix sentences
pstibrany Apr 30, 2020
1f6b415
Rename config objects.
pstibrany May 1, 2020
616891e
Review feedback.
pstibrany May 4, 2020
e0ffb00
Review feedback.
pstibrany May 4, 2020
000b5c3
Added metrics for object size.
pstibrany May 4, 2020
7eb18db
Added requested chunk bytes metric.
pstibrany May 4, 2020
2b42af4
Caching bucket docs.
pstibrany May 4, 2020
1baa24c
Fixed tests.
pstibrany May 4, 2020
801bc6d
Fix test.
pstibrany May 4, 2020
2997437
Update docs/components/store.md
pstibrany May 5, 2020
0c75120
Dots
pstibrany May 5, 2020
9821c29
Always set lastBlockOffset.
pstibrany May 5, 2020
c35f077
Merged cached metric into fetched metric, added labels.
pstibrany May 5, 2020
c9b51b1
Added CHANGELOG.md entry
pstibrany May 5, 2020
65af4fb
Reworded help for thanos_store_bucket_cache_fetched_chunk_bytes_total
pstibrany May 5, 2020
05f2b4a
Added tracing around getRangeChunkFile method.
pstibrany May 5, 2020
015b025
Updated CHANGELOG.md
pstibrany May 5, 2020
3386747
Options
pstibrany May 5, 2020
7269899
Fix parameter name. (store. got dropped by accident)
pstibrany May 6, 2020
13ac200
Use embedded Bucket
pstibrany May 6, 2020
3e96b7c
Added comments.
pstibrany May 6, 2020
2f1b68d
Fixed comment.
pstibrany May 6, 2020
c26bd56
Hide store.caching-bucket.config flags.
pstibrany May 7, 2020
db75725
Renamed block to subrange.
pstibrany May 7, 2020
ba3b5d9
Renamed block to subrange.
pstibrany May 7, 2020
267ce9c
Header
pstibrany May 7, 2020
ec564b6
Added TODO
pstibrany May 7, 2020
2c21dfe
Removed TODO, in favor of creating issue.
pstibrany May 7, 2020
d1eabf7
Use NopCloser.
pstibrany May 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#2502](https://github.com/thanos-io/thanos/pull/2502) Added `hints` field to `SeriesResponse`. Hints in an opaque data structure that can be used to carry additional information from the store and its content is implementation specific.
- [#2521](https://github.com/thanos-io/thanos/pull/2521) Sidecar: add `thanos_sidecar_reloader_reloads_failed_total`, `thanos_sidecar_reloader_reloads_total`, `thanos_sidecar_reloader_watch_errors_total`, `thanos_sidecar_reloader_watch_events_total` and `thanos_sidecar_reloader_watches` metrics.
- [#2532](https://github.com/thanos-io/thanos/pull/2532) Store: Added hidden option for experimental caching bucket, that can cache chunks into shared memcached. This can speed up querying and reduce number of requests to object storage.

### Changed

Expand Down
17 changes: 17 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
"YAML file that contains index cache configuration. See format details: https://thanos.io/components/store.md/#index-cache",
false)

cachingBucketConfig := extflag.RegisterPathOrContent(extflag.HiddenCmdClause(cmd), "store.caching-bucket.config",
"YAML that contains configuration for caching bucket. Experimental feature, with high risk of changes. See format details: https://thanos.io/components/store.md/#caching-bucket",
false)

chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes reserved strictly to reuse for chunks in memory.").
Default("2GB").Bytes()

Expand Down Expand Up @@ -149,6 +153,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
*webExternalPrefix,
*webPrefixHeaderName,
*postingOffsetsInMemSampling,
cachingBucketConfig,
)
}
}
Expand Down Expand Up @@ -179,6 +184,7 @@ func runStore(
ignoreDeletionMarksDelay time.Duration,
externalPrefix, prefixHeader string,
postingOffsetsInMemSampling int,
cachingBucketConfig *extflag.PathOrContent,
) error {
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
Expand Down Expand Up @@ -214,6 +220,17 @@ func runStore(
return errors.Wrap(err, "create bucket client")
}

cachingBucketConfigYaml, err := cachingBucketConfig.Content()
if err != nil {
return errors.Wrap(err, "get caching bucket configuration")
}
if len(cachingBucketConfigYaml) > 0 {
bkt, err = storecache.NewCachingBucketFromYaml(cachingBucketConfigYaml, bkt, logger, reg)
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Wrap(err, "create caching bucket")
}
}

relabelContentYaml, err := selectorRelabelConf.Content()
if err != nil {
return errors.Wrap(err, "get content of relabel configuration")
Expand Down
28 changes: 28 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,34 @@ While the remaining settings are **optional**:
- `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value of memcached `-I` flag (defaults to 1MB) in order to avoid wasting network round trips to store items larger than the max item size allowed in memcached. If set to `0`, the item size is unlimited.
- `dns_provider_update_interval`: the DNS discovery update interval.

## Caching Bucket
pstibrany marked this conversation as resolved.
Show resolved Hide resolved

Thanos Store Gateway supports a "caching bucket" with chunks caching to speed up loading of chunks from TSDB blocks. Currently only memcached "backend" is supported:

```yaml
backend: memcached
backend_config:
addresses:
- localhost:11211

caching_config:
chunk_subrange_size: 16000
max_chunks_get_range_requests: 3
chunk_object_size_ttl: 24h
chunk_subrange_ttl: 24h
```

`backend_config` field for memcached supports all the same configuration as memcached for [index cache](#memcached-index-cache).

`caching_config` is a configuration for chunks cache and supports the following optional settings:

- `chunk_subrange_size`: size of segment of chunks object that is stored to the cache. This is the smallest unit that chunks cache is working with.
- `max_chunks_get_range_requests`: how many "get range" sub-requests may cache perform to fetch missing subranges.
- `chunk_object_size_ttl`: how long to keep information about chunk file length in the cache.
- `chunk_subrange_ttl`: how long to keep individual subranges in the cache.

Note that chunks cache is an experimental feature, and these fields may be renamed or removed completely in the future.

## Index Header

In order to query series inside blocks from object storage, Store Gateway has to know certain initial info about each block such as:
Expand Down
21 changes: 21 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cache

import (
"context"
"time"
)

// Generic best-effort cache.
type Cache interface {
// Store data into the cache.
//
// Note that individual byte buffers may be retained by the cache!
Store(ctx context.Context, data map[string][]byte, ttl time.Duration)

// Fetch multiple keys from cache. Returns map of input keys to data.
// If key isn't in the map, data for given key was not found.
Fetch(ctx context.Context, keys []string) map[string][]byte
}
83 changes: 83 additions & 0 deletions pkg/cache/memcached.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cache

import (
"context"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/thanos-io/thanos/pkg/cacheutil"
)

// MemcachedCache is a memcached-based cache.
type MemcachedCache struct {
logger log.Logger
memcached cacheutil.MemcachedClient

// Metrics.
requests prometheus.Counter
hits prometheus.Counter
}

// NewMemcachedCache makes a new MemcachedCache.
func NewMemcachedCache(name string, logger log.Logger, memcached cacheutil.MemcachedClient, reg prometheus.Registerer) *MemcachedCache {
c := &MemcachedCache{
logger: logger,
memcached: memcached,
}

c.requests = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_cache_memcached_requests_total",
Help: "Total number of items requests to memcached.",
ConstLabels: prometheus.Labels{"name": name},
})

c.hits = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_cache_memcached_hits_total",
Help: "Total number of items requests to the cache that were a hit.",
ConstLabels: prometheus.Labels{"name": name},
})

level.Info(logger).Log("msg", "created memcached cache")

return c
}

// Store data identified by keys.
// The function enqueues the request and returns immediately: the entry will be
// asynchronously stored in the cache.
func (c *MemcachedCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
var (
firstErr error
failed int
)

for key, val := range data {
if err := c.memcached.SetAsync(ctx, key, val, ttl); err != nil {
failed++
if firstErr == nil {
firstErr = err
}
}
}

if firstErr != nil {
level.Warn(c.logger).Log("msg", "failed to store one or more items into memcached", "failed", failed, "firstErr", firstErr)
}
}

// Fetch fetches multiple keys and returns a map containing cache hits, along with a list of missing keys.
// In case of error, it logs and return an empty cache hits map.
func (c *MemcachedCache) Fetch(ctx context.Context, keys []string) map[string][]byte {
// Fetch the keys from memcached in a single request.
c.requests.Add(float64(len(keys)))
results := c.memcached.GetMulti(ctx, keys)
c.hits.Add(float64(len(results)))
return results
}
126 changes: 126 additions & 0 deletions pkg/cache/memcached_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cache

import (
"context"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"

"github.com/thanos-io/thanos/pkg/testutil"
)

func TestMemcachedIndexCache(t *testing.T) {
t.Parallel()

// Init some data to conveniently define test cases later one.
key1 := "key1"
key2 := "key2"
key3 := "key3"
value1 := []byte{1}
value2 := []byte{2}
value3 := []byte{3}

tests := map[string]struct {
setup map[string][]byte
mockedErr error
fetchKeys []string
expectedHits map[string][]byte
}{
"should return no hits on empty cache": {
setup: nil,
fetchKeys: []string{key1, key2},
expectedHits: map[string][]byte{},
},
"should return no misses on 100% hit ratio": {
setup: map[string][]byte{
key1: value1,
key2: value2,
key3: value3,
},
fetchKeys: []string{key1},
expectedHits: map[string][]byte{
key1: value1,
},
},
"should return hits and misses on partial hits": {
setup: map[string][]byte{
key1: value1,
key2: value2,
},
fetchKeys: []string{key1, key3},
expectedHits: map[string][]byte{key1: value1},
},
"should return no hits on memcached error": {
setup: map[string][]byte{
key1: value1,
key2: value2,
key3: value3,
},
mockedErr: errors.New("mocked error"),
fetchKeys: []string{key1},
expectedHits: nil,
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
memcached := newMockedMemcachedClient(testData.mockedErr)
c := NewMemcachedCache("test", log.NewNopLogger(), memcached, nil)

// Store the postings expected before running the test.
ctx := context.Background()
c.Store(ctx, testData.setup, time.Hour)

// Fetch postings from cached and assert on it.
hits := c.Fetch(ctx, testData.fetchKeys)
testutil.Equals(t, testData.expectedHits, hits)

// Assert on metrics.
testutil.Equals(t, float64(len(testData.fetchKeys)), prom_testutil.ToFloat64(c.requests))
testutil.Equals(t, float64(len(testData.expectedHits)), prom_testutil.ToFloat64(c.hits))
})
}
}

type mockedMemcachedClient struct {
cache map[string][]byte
mockedGetMultiErr error
}

func newMockedMemcachedClient(mockedGetMultiErr error) *mockedMemcachedClient {
return &mockedMemcachedClient{
cache: map[string][]byte{},
mockedGetMultiErr: mockedGetMultiErr,
}
}

func (c *mockedMemcachedClient) GetMulti(_ context.Context, keys []string) map[string][]byte {
if c.mockedGetMultiErr != nil {
return nil
}

hits := map[string][]byte{}

for _, key := range keys {
if value, ok := c.cache[key]; ok {
hits[key] = value
}
}

return hits
}

func (c *mockedMemcachedClient) SetAsync(_ context.Context, key string, value []byte, _ time.Duration) error {
c.cache[key] = value
return nil
}

func (c *mockedMemcachedClient) Stop() {
// Nothing to do.
}
21 changes: 21 additions & 0 deletions pkg/extflag/hidden.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package extflag

import (
"gopkg.in/alecthomas/kingpin.v2"
)

// HiddenCmdClause returns CmdClause that hides created flags.
func HiddenCmdClause(c CmdClause) CmdClause {
return hidden{c: c}
}

type hidden struct {
c CmdClause
}

func (h hidden) Flag(name, help string) *kingpin.FlagClause {
return h.c.Flag(name, help).Hidden()
}
Loading