diff --git a/CHANGELOG.md b/CHANGELOG.md
index 017857ea55..a020842006 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
 - [#2502](https://github.com/thanos-io/thanos/pull/2502) Added `hints` field to `SeriesResponse`. Hints in an opaque data structure that can be used to carry additional information from the store and its content is implementation specific.
 - [#2521](https://github.com/thanos-io/thanos/pull/2521) Sidecar: add `thanos_sidecar_reloader_reloads_failed_total`, `thanos_sidecar_reloader_reloads_total`, `thanos_sidecar_reloader_watch_errors_total`, `thanos_sidecar_reloader_watch_events_total` and `thanos_sidecar_reloader_watches` metrics.
+- [#2532](https://github.com/thanos-io/thanos/pull/2532) Store: Added a hidden option for an experimental caching bucket that can cache chunks in shared memcached. This can speed up querying and reduce the number of requests to object storage.

 ### Changed

diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index fcd8f6da29..9425e36dda 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -55,6 +55,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
 		"YAML file that contains index cache configuration. See format details: https://thanos.io/components/store.md/#index-cache",
 		false)

+	cachingBucketConfig := extflag.RegisterPathOrContent(extflag.HiddenCmdClause(cmd), "store.caching-bucket.config",
+		"YAML that contains configuration for the caching bucket. This is an experimental feature, with a high risk of breaking changes. See format details: https://thanos.io/components/store.md/#caching-bucket",
+		false)
+
 	chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes reserved strictly to reuse for chunks in memory.").
 		Default("2GB").Bytes()

@@ -149,6 +153,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
 			*webExternalPrefix,
 			*webPrefixHeaderName,
 			*postingOffsetsInMemSampling,
+			cachingBucketConfig,
 		)
 	}
 }
@@ -179,6 +184,7 @@ func runStore(
 	ignoreDeletionMarksDelay time.Duration,
 	externalPrefix, prefixHeader string,
 	postingOffsetsInMemSampling int,
+	cachingBucketConfig *extflag.PathOrContent,
 ) error {
 	grpcProbe := prober.NewGRPC()
 	httpProbe := prober.NewHTTP()
@@ -214,6 +220,17 @@ func runStore(
 		return errors.Wrap(err, "create bucket client")
 	}

+	cachingBucketConfigYaml, err := cachingBucketConfig.Content()
+	if err != nil {
+		return errors.Wrap(err, "get caching bucket configuration")
+	}
+	if len(cachingBucketConfigYaml) > 0 {
+		bkt, err = storecache.NewCachingBucketFromYaml(cachingBucketConfigYaml, bkt, logger, reg)
+		if err != nil {
+			return errors.Wrap(err, "create caching bucket")
+		}
+	}
+
 	relabelContentYaml, err := selectorRelabelConf.Content()
 	if err != nil {
 		return errors.Wrap(err, "get content of relabel configuration")
diff --git a/docs/components/store.md b/docs/components/store.md
index 20a919daaa..fa9b1546dd 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -232,6 +232,34 @@ While the remaining settings are **optional**:

 - `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value of memcached `-I` flag (defaults to 1MB) in order to avoid wasting network round trips to store items larger than the max item size allowed in memcached. If set to `0`, the item size is unlimited.
 - `dns_provider_update_interval`: the DNS discovery update interval.
+## Caching Bucket
+
+Thanos Store Gateway supports a "caching bucket" that caches chunks in order to speed up loading them from TSDB blocks. Currently memcached is the only supported backend:
+
+```yaml
+backend: memcached
+backend_config:
+  addresses:
+    - localhost:11211
+
+caching_config:
+  chunk_subrange_size: 16000
+  max_chunks_get_range_requests: 3
+  chunk_object_size_ttl: 24h
+  chunk_subrange_ttl: 24h
+```
+
+The `backend_config` field for memcached accepts the same configuration options as the memcached [index cache](#memcached-index-cache).
+
+`caching_config` configures the chunks cache and supports the following optional settings:
+
+- `chunk_subrange_size`: size of each segment of a chunks object stored in the cache. This is the smallest unit the chunks cache works with.
+- `max_chunks_get_range_requests`: maximum number of "get range" sub-requests the cache may issue to fetch missing subranges.
+- `chunk_object_size_ttl`: how long to keep information about chunk file length in the cache.
+- `chunk_subrange_ttl`: how long to keep individual subranges in the cache.
+
+Note that the chunks cache is an experimental feature; these fields may be renamed or removed completely in the future.
+
 ## Index Header

 In order to query series inside blocks from object storage, Store Gateway has to know certain initial info about each block such as:
diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go
new file mode 100644
index 0000000000..acaa0e159d
--- /dev/null
+++ b/pkg/cache/cache.go
@@ -0,0 +1,21 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package cache
+
+import (
+	"context"
+	"time"
+)
+
+// Generic best-effort cache.
+type Cache interface {
+	// Store data into the cache.
+	//
+	// Note that individual byte buffers may be retained by the cache!
+	Store(ctx context.Context, data map[string][]byte, ttl time.Duration)
+
+	// Fetch multiple keys from cache. Returns map of input keys to data.
+	// If a key is missing from the returned map, data for that key was not found.
+	Fetch(ctx context.Context, keys []string) map[string][]byte
+}
diff --git a/pkg/cache/memcached.go b/pkg/cache/memcached.go
new file mode 100644
index 0000000000..04c249576e
--- /dev/null
+++ b/pkg/cache/memcached.go
@@ -0,0 +1,83 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package cache
+
+import (
+	"context"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+
+	"github.com/thanos-io/thanos/pkg/cacheutil"
+)
+
+// MemcachedCache is a memcached-based cache.
+type MemcachedCache struct {
+	logger    log.Logger
+	memcached cacheutil.MemcachedClient
+
+	// Metrics.
+	requests prometheus.Counter
+	hits     prometheus.Counter
+}
+
+// NewMemcachedCache makes a new MemcachedCache.
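+// A minimal usage sketch (illustrative only: variable names are assumptions
+// and error handling is elided):
+//
+//	client, err := cacheutil.NewMemcachedClient(logger, "example", configYaml, reg)
+//	if err != nil {
+//		return err
+//	}
+//	c := cache.NewMemcachedCache("example", logger, client, reg)
+//	c.Store(ctx, map[string][]byte{"key": []byte("value")}, time.Hour)
+//	hits := c.Fetch(ctx, []string{"key"})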
+func NewMemcachedCache(name string, logger log.Logger, memcached cacheutil.MemcachedClient, reg prometheus.Registerer) *MemcachedCache {
+	c := &MemcachedCache{
+		logger:    logger,
+		memcached: memcached,
+	}
+
+	c.requests = promauto.With(reg).NewCounter(prometheus.CounterOpts{
+		Name:        "thanos_cache_memcached_requests_total",
+		Help:        "Total number of item requests to memcached.",
+		ConstLabels: prometheus.Labels{"name": name},
+	})
+
+	c.hits = promauto.With(reg).NewCounter(prometheus.CounterOpts{
+		Name:        "thanos_cache_memcached_hits_total",
+		Help:        "Total number of item requests to the cache that were a hit.",
+		ConstLabels: prometheus.Labels{"name": name},
+	})
+
+	level.Info(logger).Log("msg", "created memcached cache")
+
+	return c
+}
+
+// Store data identified by keys.
+// The function enqueues the request and returns immediately: the entry will be
+// asynchronously stored in the cache.
+func (c *MemcachedCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
+	var (
+		firstErr error
+		failed   int
+	)
+
+	for key, val := range data {
+		if err := c.memcached.SetAsync(ctx, key, val, ttl); err != nil {
+			failed++
+			if firstErr == nil {
+				firstErr = err
+			}
+		}
+	}
+
+	if firstErr != nil {
+		level.Warn(c.logger).Log("msg", "failed to store one or more items into memcached", "failed", failed, "firstErr", firstErr)
+	}
+}
+
+// Fetch fetches multiple keys and returns a map containing cache hits.
+// In case of error, it logs the error and returns an empty hits map.
+func (c *MemcachedCache) Fetch(ctx context.Context, keys []string) map[string][]byte {
+	// Fetch the keys from memcached in a single request.
+	c.requests.Add(float64(len(keys)))
+	results := c.memcached.GetMulti(ctx, keys)
+	c.hits.Add(float64(len(results)))
+	return results
+}
diff --git a/pkg/cache/memcached_test.go b/pkg/cache/memcached_test.go
new file mode 100644
index 0000000000..b5f6bcf83c
--- /dev/null
+++ b/pkg/cache/memcached_test.go
@@ -0,0 +1,126 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package cache
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/pkg/errors"
+	prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
+
+	"github.com/thanos-io/thanos/pkg/testutil"
+)
+
+func TestMemcachedCache(t *testing.T) {
+	t.Parallel()
+
+	// Init some data to conveniently define test cases later on.
+	key1 := "key1"
+	key2 := "key2"
+	key3 := "key3"
+	value1 := []byte{1}
+	value2 := []byte{2}
+	value3 := []byte{3}
+
+	tests := map[string]struct {
+		setup        map[string][]byte
+		mockedErr    error
+		fetchKeys    []string
+		expectedHits map[string][]byte
+	}{
+		"should return no hits on empty cache": {
+			setup:        nil,
+			fetchKeys:    []string{key1, key2},
+			expectedHits: map[string][]byte{},
+		},
+		"should return no misses on 100% hit ratio": {
+			setup: map[string][]byte{
+				key1: value1,
+				key2: value2,
+				key3: value3,
+			},
+			fetchKeys: []string{key1},
+			expectedHits: map[string][]byte{
+				key1: value1,
+			},
+		},
+		"should return hits and misses on partial hits": {
+			setup: map[string][]byte{
+				key1: value1,
+				key2: value2,
+			},
+			fetchKeys:    []string{key1, key3},
+			expectedHits: map[string][]byte{key1: value1},
+		},
+		"should return no hits on memcached error": {
+			setup: map[string][]byte{
+				key1: value1,
+				key2: value2,
+				key3: value3,
+			},
+			mockedErr:    errors.New("mocked error"),
+			fetchKeys:    []string{key1},
+			expectedHits: nil,
+		},
+	}
+
+	for testName, testData := range tests {
+		t.Run(testName, func(t *testing.T) {
+			memcached := newMockedMemcachedClient(testData.mockedErr)
+			c := NewMemcachedCache("test", log.NewNopLogger(), memcached, nil)
+
+			// Store the test data before running the test.
+			ctx := context.Background()
+			c.Store(ctx, testData.setup, time.Hour)
+
+			// Fetch data from the cache and assert on the hits.
+			hits := c.Fetch(ctx, testData.fetchKeys)
+			testutil.Equals(t, testData.expectedHits, hits)
+
+			// Assert on metrics.
+			testutil.Equals(t, float64(len(testData.fetchKeys)), prom_testutil.ToFloat64(c.requests))
+			testutil.Equals(t, float64(len(testData.expectedHits)), prom_testutil.ToFloat64(c.hits))
+		})
+	}
+}
+
+type mockedMemcachedClient struct {
+	cache             map[string][]byte
+	mockedGetMultiErr error
+}
+
+func newMockedMemcachedClient(mockedGetMultiErr error) *mockedMemcachedClient {
+	return &mockedMemcachedClient{
+		cache:             map[string][]byte{},
+		mockedGetMultiErr: mockedGetMultiErr,
+	}
+}
+
+func (c *mockedMemcachedClient) GetMulti(_ context.Context, keys []string) map[string][]byte {
+	if c.mockedGetMultiErr != nil {
+		return nil
+	}
+
+	hits := map[string][]byte{}
+
+	for _, key := range keys {
+		if value, ok := c.cache[key]; ok {
+			hits[key] = value
+		}
+	}
+
+	return hits
+}
+
+func (c *mockedMemcachedClient) SetAsync(_ context.Context, key string, value []byte, _ time.Duration) error {
+	c.cache[key] = value
+	return nil
+}
+
+func (c *mockedMemcachedClient) Stop() {
+	// Nothing to do.
+}
diff --git a/pkg/extflag/hidden.go b/pkg/extflag/hidden.go
new file mode 100644
index 0000000000..cf8de957a5
--- /dev/null
+++ b/pkg/extflag/hidden.go
@@ -0,0 +1,21 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package extflag
+
+import (
+	"gopkg.in/alecthomas/kingpin.v2"
+)
+
+// HiddenCmdClause returns a CmdClause that hides created flags.
+func HiddenCmdClause(c CmdClause) CmdClause {
+	return hidden{c: c}
+}
+
+type hidden struct {
+	c CmdClause
+}
+
+func (h hidden) Flag(name, help string) *kingpin.FlagClause {
+	return h.c.Flag(name, help).Hidden()
+}
diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go
new file mode 100644
index 0000000000..2ce7e6dd11
--- /dev/null
+++ b/pkg/store/cache/caching_bucket.go
@@ -0,0 +1,411 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
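+
+// Overview: GetRange requests for TSDB chunk files are split into fixed-size
+// subranges, each cached under its own key. Only missing subranges are fetched
+// from the underlying bucket, merged into at most MaxChunksGetRangeRequests
+// "get range" calls; everything else is served from the cache.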
+ +package storecache + +import ( + "context" + "encoding/binary" + "fmt" + "io" + "io/ioutil" + "regexp" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "golang.org/x/sync/errgroup" + + "github.com/thanos-io/thanos/pkg/cache" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/tracing" +) + +const ( + originCache = "cache" + originBucket = "bucket" +) + +type CachingBucketConfig struct { + // Basic unit used to cache chunks. + ChunkSubrangeSize int64 `yaml:"chunk_subrange_size"` + + // Maximum number of GetRange requests issued by this bucket for single GetRange call. Zero or negative value = unlimited. + MaxChunksGetRangeRequests int `yaml:"max_chunks_get_range_requests"` + + // TTLs for various cache items. + ChunkObjectSizeTTL time.Duration `yaml:"chunk_object_size_ttl"` + ChunkSubrangeTTL time.Duration `yaml:"chunk_subrange_ttl"` +} + +func DefaultCachingBucketConfig() CachingBucketConfig { + return CachingBucketConfig{ + ChunkSubrangeSize: 16000, // Equal to max chunk size. + ChunkObjectSizeTTL: 24 * time.Hour, + ChunkSubrangeTTL: 24 * time.Hour, + MaxChunksGetRangeRequests: 3, + } +} + +// Bucket implementation that provides some caching features, using knowledge about how Thanos accesses data. +type CachingBucket struct { + objstore.Bucket + + cache cache.Cache + + config CachingBucketConfig + + logger log.Logger + + requestedChunkBytes prometheus.Counter + fetchedChunkBytes *prometheus.CounterVec + refetchedChunkBytes *prometheus.CounterVec + + objectSizeRequests prometheus.Counter + objectSizeHits prometheus.Counter +} + +func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConfig, logger log.Logger, reg prometheus.Registerer) (*CachingBucket, error) { + if b == nil { + return nil, errors.New("bucket is nil") + } + if c == nil { + return nil, errors.New("cache is nil") + } + + cb := &CachingBucket{ + Bucket: b, + config: chunks, + cache: c, + logger: logger, + + requestedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_store_bucket_cache_requested_chunk_bytes_total", + Help: "Total number of requested bytes for chunk data.", + }), + fetchedChunkBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_bucket_cache_fetched_chunk_bytes_total", + Help: "Total number of fetched chunk bytes. 
Data from bucket is then stored to cache.", + }, []string{"origin"}), + refetchedChunkBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_bucket_cache_refetched_chunk_bytes_total", + Help: "Total number of chunk bytes re-fetched from storage, despite being in cache already.", + }, []string{"origin"}), + objectSizeRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_store_bucket_cache_objectsize_requests_total", + Help: "Number of object size requests for objects.", + }), + objectSizeHits: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_store_bucket_cache_objectsize_hits_total", + Help: "Number of object size hits for objects.", + }), + } + + cb.fetchedChunkBytes.WithLabelValues(originBucket) + cb.fetchedChunkBytes.WithLabelValues(originCache) + cb.refetchedChunkBytes.WithLabelValues(originCache) + + return cb, nil +} + +func (cb *CachingBucket) Name() string { + return "caching: " + cb.Bucket.Name() +} + +func (cb *CachingBucket) WithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.Bucket { + if ib, ok := cb.Bucket.(objstore.InstrumentedBucket); ok { + // Make a copy, but replace bucket with instrumented one. + res := &CachingBucket{} + *res = *cb + res.Bucket = ib.WithExpectedErrs(expectedFunc) + return res + } + + return cb +} + +func (cb *CachingBucket) ReaderWithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.BucketReader { + return cb.WithExpectedErrs(expectedFunc) +} + +var chunksMatcher = regexp.MustCompile(`^.*/chunks/\d+$`) + +func isTSDBChunkFile(name string) bool { + return chunksMatcher.MatchString(name) +} + +func (cb *CachingBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + if isTSDBChunkFile(name) && off >= 0 && length > 0 { + var ( + r io.ReadCloser + err error + ) + tracing.DoInSpan(ctx, "cachingbucket_getrange_chunkfile", func(ctx context.Context) { + r, err = cb.getRangeChunkFile(ctx, name, off, length) + }) + return r, err + } + + return cb.Bucket.GetRange(ctx, name, off, length) +} + +func (cb *CachingBucket) cachedObjectSize(ctx context.Context, name string, ttl time.Duration) (uint64, error) { + key := cachingKeyObjectSize(name) + + cb.objectSizeRequests.Add(1) + + hits := cb.cache.Fetch(ctx, []string{key}) + if s := hits[key]; len(s) == 8 { + cb.objectSizeHits.Add(1) + return binary.BigEndian.Uint64(s), nil + } + + size, err := cb.Bucket.ObjectSize(ctx, name) + if err != nil { + return 0, err + } + + var buf [8]byte + binary.BigEndian.PutUint64(buf[:], size) + cb.cache.Store(ctx, map[string][]byte{key: buf[:]}, ttl) + + return size, nil +} + +func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, offset, length int64) (io.ReadCloser, error) { + cb.requestedChunkBytes.Add(float64(length)) + + size, err := cb.cachedObjectSize(ctx, name, cb.config.ChunkObjectSizeTTL) + if err != nil { + return nil, errors.Wrapf(err, "failed to get size of chunk file: %s", name) + } + + // If length goes over object size, adjust length. We use it later to limit number of read bytes. + if uint64(offset+length) > size { + length = int64(size - uint64(offset)) + } + + // Start and end range are subrange-aligned offsets into object, that we're going to read. 
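+	// For example, with the default 16000-byte subranges, a request for
+	// offset=555555 and length=55555 covers bytes [555555, 611110):
+	// startRange=544000 (34*16000) and endRange=624000 (39*16000), i.e. 5 subranges.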
+ startRange := (offset / cb.config.ChunkSubrangeSize) * cb.config.ChunkSubrangeSize + endRange := ((offset + length) / cb.config.ChunkSubrangeSize) * cb.config.ChunkSubrangeSize + if (offset+length)%cb.config.ChunkSubrangeSize > 0 { + endRange += cb.config.ChunkSubrangeSize + } + + // The very last subrange in the object may have length that is not divisible by subrange size. + lastSubrangeOffset := endRange - cb.config.ChunkSubrangeSize + lastSubrangeLength := int(cb.config.ChunkSubrangeSize) + if uint64(endRange) > size { + lastSubrangeOffset = (int64(size) / cb.config.ChunkSubrangeSize) * cb.config.ChunkSubrangeSize + lastSubrangeLength = int(int64(size) - lastSubrangeOffset) + } + + numSubranges := (endRange - startRange) / cb.config.ChunkSubrangeSize + + offsetKeys := make(map[int64]string, numSubranges) + keys := make([]string, 0, numSubranges) + + for off := startRange; off < endRange; off += cb.config.ChunkSubrangeSize { + end := off + cb.config.ChunkSubrangeSize + if end > int64(size) { + end = int64(size) + } + + k := cachingKeyObjectSubrange(name, off, end) + keys = append(keys, k) + offsetKeys[off] = k + } + + // Try to get all subranges from the cache. + hits := cb.cache.Fetch(ctx, keys) + for _, b := range hits { + cb.fetchedChunkBytes.WithLabelValues(originCache).Add(float64(len(b))) + } + + if len(hits) < len(keys) { + if hits == nil { + hits = map[string][]byte{} + } + + err := cb.fetchMissingChunkSubranges(ctx, name, startRange, endRange, offsetKeys, hits, lastSubrangeOffset, lastSubrangeLength) + if err != nil { + return nil, err + } + } + + return ioutil.NopCloser(newSubrangesReader(cb.config.ChunkSubrangeSize, offsetKeys, hits, offset, length)), nil +} + +type rng struct { + start, end int64 +} + +// fetchMissingChunkSubranges fetches missing subranges, stores them into "hits" map +// and into cache as well (using provided cacheKeys). +func (cb *CachingBucket) fetchMissingChunkSubranges(ctx context.Context, name string, startRange, endRange int64, cacheKeys map[int64]string, hits map[string][]byte, lastSubrangeOffset int64, lastSubrangeLength int) error { + // Ordered list of missing sub-ranges. + var missing []rng + + for off := startRange; off < endRange; off += cb.config.ChunkSubrangeSize { + if hits[cacheKeys[off]] == nil { + missing = append(missing, rng{start: off, end: off + cb.config.ChunkSubrangeSize}) + } + } + + missing = mergeRanges(missing, 0) // Merge adjacent ranges. + // Keep merging until we have only max number of ranges (= requests). + for limit := cb.config.ChunkSubrangeSize; cb.config.MaxChunksGetRangeRequests > 0 && len(missing) > cb.config.MaxChunksGetRangeRequests; limit = limit * 2 { + missing = mergeRanges(missing, limit) + } + + var hitsMutex sync.Mutex + + // Run parallel queries for each missing range. Fetched data is stored into 'hits' map, protected by hitsMutex. 
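+	// For example, with MaxChunksGetRangeRequests=1 and missing subranges at both
+	// ends of the requested range, the merging above collapses everything into a
+	// single request spanning the whole range; cached subranges read again this
+	// way are counted as refetchedChunkBytes below.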
+ g, gctx := errgroup.WithContext(ctx) + for _, m := range missing { + m := m + g.Go(func() error { + r, err := cb.Bucket.GetRange(gctx, name, m.start, m.end-m.start) + if err != nil { + return errors.Wrapf(err, "fetching range [%d, %d]", m.start, m.end) + } + defer runutil.CloseWithLogOnErr(cb.logger, r, "fetching range [%d, %d]", m.start, m.end) + + for off := m.start; off < m.end && gctx.Err() == nil; off += cb.config.ChunkSubrangeSize { + key := cacheKeys[off] + if key == "" { + return errors.Errorf("fetching range [%d, %d]: caching key for offset %d not found", m.start, m.end, off) + } + + // We need a new buffer for each subrange, both for storing into hits, and also for caching. + var subrangeData []byte + if off == lastSubrangeOffset { + // The very last subrange in the object may have different length, + // if object length isn't divisible by subrange size. + subrangeData = make([]byte, lastSubrangeLength) + } else { + subrangeData = make([]byte, cb.config.ChunkSubrangeSize) + } + _, err := io.ReadFull(r, subrangeData) + if err != nil { + return errors.Wrapf(err, "fetching range [%d, %d]", m.start, m.end) + } + + storeToCache := false + hitsMutex.Lock() + if _, ok := hits[key]; !ok { + storeToCache = true + hits[key] = subrangeData + } + hitsMutex.Unlock() + + if storeToCache { + cb.fetchedChunkBytes.WithLabelValues(originBucket).Add(float64(len(subrangeData))) + cb.cache.Store(gctx, map[string][]byte{key: subrangeData}, cb.config.ChunkSubrangeTTL) + } else { + cb.refetchedChunkBytes.WithLabelValues(originCache).Add(float64(len(subrangeData))) + } + } + + return gctx.Err() + }) + } + + return g.Wait() +} + +// Merges ranges that are close to each other. Modifies input. +func mergeRanges(input []rng, limit int64) []rng { + if len(input) == 0 { + return input + } + + last := 0 + for ix := 1; ix < len(input); ix++ { + if (input[ix].start - input[last].end) <= limit { + input[last].end = input[ix].end + } else { + last++ + input[last] = input[ix] + } + } + return input[:last+1] +} + +func cachingKeyObjectSize(name string) string { + return fmt.Sprintf("size:%s", name) +} + +func cachingKeyObjectSubrange(name string, start int64, end int64) string { + return fmt.Sprintf("subrange:%s:%d:%d", name, start, end) +} + +// Reader implementation that uses in-memory subranges. +type subrangesReader struct { + subrangeSize int64 + + // Mapping of subrangeSize-aligned offsets to keys in hits. + offsetsKeys map[int64]string + subranges map[string][]byte + + // Offset for next read, used to find correct subrange to return data from. + readOffset int64 + + // Remaining data to return from this reader. Once zero, this reader reports EOF. 
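+	// (E.g. for a GetRange call with offset=555555 and length=55555, remaining
+	// starts at 55555 and counts down to zero as Read copies data out.)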
+ remaining int64 +} + +func newSubrangesReader(subrangeSize int64, offsetsKeys map[int64]string, subranges map[string][]byte, readOffset, remaining int64) *subrangesReader { + return &subrangesReader{ + subrangeSize: subrangeSize, + offsetsKeys: offsetsKeys, + subranges: subranges, + + readOffset: readOffset, + remaining: remaining, + } +} + +func (c *subrangesReader) Read(p []byte) (n int, err error) { + if c.remaining <= 0 { + return 0, io.EOF + } + + currentSubrangeOffset := (c.readOffset / c.subrangeSize) * c.subrangeSize + currentSubrange, err := c.subrangeAt(currentSubrangeOffset) + if err != nil { + return 0, errors.Wrapf(err, "read position: %d", c.readOffset) + } + + offsetInSubrange := int(c.readOffset - currentSubrangeOffset) + toCopy := len(currentSubrange) - offsetInSubrange + if toCopy <= 0 { + // This can only happen if subrange's length is not subrangeSize, and reader is told to read more data. + return 0, errors.Errorf("no more data left in subrange at position %d, subrange length %d, reading position %d", currentSubrangeOffset, len(currentSubrange), c.readOffset) + } + + if len(p) < toCopy { + toCopy = len(p) + } + if c.remaining < int64(toCopy) { + toCopy = int(c.remaining) // Conversion is safe, c.remaining is small enough. + } + + copy(p, currentSubrange[offsetInSubrange:offsetInSubrange+toCopy]) + c.readOffset += int64(toCopy) + c.remaining -= int64(toCopy) + + return toCopy, nil +} + +func (c *subrangesReader) subrangeAt(offset int64) ([]byte, error) { + b := c.subranges[c.offsetsKeys[offset]] + if b == nil { + return nil, errors.Errorf("subrange for offset %d not found", offset) + } + return b, nil +} diff --git a/pkg/store/cache/caching_bucket_factory.go b/pkg/store/cache/caching_bucket_factory.go new file mode 100644 index 0000000000..0bc0a78458 --- /dev/null +++ b/pkg/store/cache/caching_bucket_factory.go @@ -0,0 +1,64 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storecache + +import ( + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "gopkg.in/yaml.v2" + + cache "github.com/thanos-io/thanos/pkg/cache" + "github.com/thanos-io/thanos/pkg/cacheutil" + "github.com/thanos-io/thanos/pkg/objstore" +) + +// BucketCacheProvider is a type used to evaluate all bucket cache providers. +type BucketCacheProvider string + +const ( + MemcachedBucketCacheProvider BucketCacheProvider = "memcached" // Memcached cache-provider for caching bucket. +) + +// CachingBucketWithBackendConfig is a configuration of caching bucket used by Store component. +type CachingBucketWithBackendConfig struct { + Type BucketCacheProvider `yaml:"backend"` + BackendConfig interface{} `yaml:"backend_config"` + + CachingBucketConfig CachingBucketConfig `yaml:"caching_config"` +} + +// NewCachingBucketFromYaml uses YAML configuration to create new caching bucket. 
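+// A minimal YAML example, matching the format documented in
+// docs/components/store.md (the memcached address is an assumption):
+//
+//	backend: memcached
+//	backend_config:
+//	  addresses:
+//	    - localhost:11211
+//
+//	caching_config:
+//	  chunk_subrange_size: 16000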
+func NewCachingBucketFromYaml(yamlContent []byte, bucket objstore.Bucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
+	level.Info(logger).Log("msg", "loading caching bucket configuration")
+
+	config := &CachingBucketWithBackendConfig{}
+	config.CachingBucketConfig = DefaultCachingBucketConfig()
+
+	if err := yaml.UnmarshalStrict(yamlContent, config); err != nil {
+		return nil, errors.Wrap(err, "parsing config YAML file")
+	}
+
+	backendConfig, err := yaml.Marshal(config.BackendConfig)
+	if err != nil {
+		return nil, errors.Wrap(err, "marshal content of cache backend configuration")
+	}
+
+	var c cache.Cache
+
+	switch config.Type {
+	case MemcachedBucketCacheProvider:
+		var memcached cacheutil.MemcachedClient
+		memcached, err := cacheutil.NewMemcachedClient(logger, "caching-bucket", backendConfig, reg)
+		if err != nil {
+			return nil, errors.Wrap(err, "failed to create memcached client")
+		}
+		c = cache.NewMemcachedCache("caching-bucket", logger, memcached, reg)
+	default:
+		return nil, errors.Errorf("unsupported cache type: %s", config.Type)
+	}
+
+	return NewCachingBucket(bucket, c, config.CachingBucketConfig, logger, reg)
+}
diff --git a/pkg/store/cache/caching_bucket_test.go b/pkg/store/cache/caching_bucket_test.go
new file mode 100644
index 0000000000..2bbc37c887
--- /dev/null
+++ b/pkg/store/cache/caching_bucket_test.go
@@ -0,0 +1,344 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package storecache
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/pkg/errors"
+	promtest "github.com/prometheus/client_golang/prometheus/testutil"
+
+	"github.com/thanos-io/thanos/pkg/objstore"
+	"github.com/thanos-io/thanos/pkg/testutil"
+)
+
+func TestCachingBucket(t *testing.T) {
+	length := int64(1024 * 1024)
+	subrangeSize := int64(16000) // All tests are based on this value.
+
+	data := make([]byte, length)
+	for ix := 0; ix < len(data); ix++ {
+		data[ix] = byte(ix)
+	}
+
+	name := "/test/chunks/000001"
+
+	inmem := objstore.NewInMemBucket()
+	testutil.Ok(t, inmem.Upload(context.Background(), name, bytes.NewReader(data)))
+
+	// We reuse the cache between tests (!)
+	cache := &mockCache{cache: make(map[string][]byte)}
+
+	// Warning: these tests must be run in order; they depend on cache state from previous tests.
+	for _, tc := range []struct {
+		name                   string
+		init                   func()
+		offset                 int64
+		length                 int64
+		maxGetRangeRequests    int
+		expectedLength         int64
+		expectedFetchedBytes   int64
+		expectedCachedBytes    int64
+		expectedRefetchedBytes int64
+	}{
+		{
+			name:                 "basic test",
+			offset:               555555,
+			length:               55555,
+			expectedLength:       55555,
+			expectedFetchedBytes: 5 * subrangeSize,
+		},
+
+		{
+			name:                "same request will hit all subranges in the cache",
+			offset:              555555,
+			length:              55555,
+			expectedLength:      55555,
+			expectedCachedBytes: 5 * subrangeSize,
+		},
+
+		{
+			name:                 "request data close to the end of object",
+			offset:               length - 10,
+			length:               3000,
+			expectedLength:       10,
+			expectedFetchedBytes: 8576, // Last (incomplete) subrange is fetched.
+		},
+
+		{
+			name:                "another request data close to the end of object, cached by previous test",
+			offset:              1040100,
+			length:              subrangeSize,
+			expectedLength:      8476,
+			expectedCachedBytes: 8576,
+		},
+
+		{
+			name:                "entire object, combination of cached and uncached subranges",
+			offset:              0,
+			length:              length,
+			expectedLength:      length,
+			expectedCachedBytes: 5*subrangeSize + 8576, // 5 subranges cached from the first test, plus the last incomplete subrange.
+ expectedFetchedBytes: 60 * subrangeSize, + }, + + { + name: "entire object again, everything is cached", + offset: 0, + length: length, + expectedLength: length, + expectedCachedBytes: length, // Entire file is now cached. + }, + + { + name: "entire object again, nothing is cached", + offset: 0, + length: length, + expectedLength: length, + expectedFetchedBytes: length, + expectedCachedBytes: 0, // Cache is flushed. + init: func() { + cache.cache = map[string][]byte{} // Flush cache. + }, + }, + + { + name: "missing first subranges", + offset: 0, + length: 10 * subrangeSize, + expectedLength: 10 * subrangeSize, + expectedFetchedBytes: 3 * subrangeSize, + expectedCachedBytes: 7 * subrangeSize, + init: func() { + // Delete first 3 subranges. + delete(cache.cache, cachingKeyObjectSubrange(name, 0*subrangeSize, 1*subrangeSize)) + delete(cache.cache, cachingKeyObjectSubrange(name, 1*subrangeSize, 2*subrangeSize)) + delete(cache.cache, cachingKeyObjectSubrange(name, 2*subrangeSize, 3*subrangeSize)) + }, + }, + + { + name: "missing last subranges", + offset: 0, + length: 10 * subrangeSize, + expectedLength: 10 * subrangeSize, + expectedFetchedBytes: 3 * subrangeSize, + expectedCachedBytes: 7 * subrangeSize, + init: func() { + // Delete last 3 subranges. + delete(cache.cache, cachingKeyObjectSubrange(name, 7*subrangeSize, 8*subrangeSize)) + delete(cache.cache, cachingKeyObjectSubrange(name, 8*subrangeSize, 9*subrangeSize)) + delete(cache.cache, cachingKeyObjectSubrange(name, 9*subrangeSize, 10*subrangeSize)) + }, + }, + + { + name: "missing middle subranges", + offset: 0, + length: 10 * subrangeSize, + expectedLength: 10 * subrangeSize, + expectedFetchedBytes: 3 * subrangeSize, + expectedCachedBytes: 7 * subrangeSize, + init: func() { + // Delete 3 subranges in the middle. + delete(cache.cache, cachingKeyObjectSubrange(name, 3*subrangeSize, 4*subrangeSize)) + delete(cache.cache, cachingKeyObjectSubrange(name, 4*subrangeSize, 5*subrangeSize)) + delete(cache.cache, cachingKeyObjectSubrange(name, 5*subrangeSize, 6*subrangeSize)) + }, + }, + + { + name: "missing everything except middle subranges", + offset: 0, + length: 10 * subrangeSize, + expectedLength: 10 * subrangeSize, + expectedFetchedBytes: 7 * subrangeSize, + expectedCachedBytes: 3 * subrangeSize, + init: func() { + // Delete all but 3 subranges in the middle, and keep unlimited number of ranged subrequests. + for i := int64(0); i < 10; i++ { + if i > 0 && i%3 == 0 { + continue + } + delete(cache.cache, cachingKeyObjectSubrange(name, i*subrangeSize, (i+1)*subrangeSize)) + } + }, + }, + + { + name: "missing everything except middle subranges, one subrequest only", + offset: 0, + length: 10 * subrangeSize, + expectedLength: 10 * subrangeSize, + expectedFetchedBytes: 7 * subrangeSize, + expectedCachedBytes: 3 * subrangeSize, + expectedRefetchedBytes: 3 * subrangeSize, // Entire object fetched, 3 subranges are "refetched". + maxGetRangeRequests: 1, + init: func() { + // Delete all but 3 subranges in the middle, but only allow 1 subrequest. 
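+				// Subranges 3, 5 and 7 stay cached; with a single subrequest the
+				// whole 0-160000 range is re-read, so those three cached subranges
+				// are counted as refetched bytes.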
+				for i := int64(0); i < 10; i++ {
+					if i == 3 || i == 5 || i == 7 {
+						continue
+					}
+					delete(cache.cache, cachingKeyObjectSubrange(name, i*subrangeSize, (i+1)*subrangeSize))
+				}
+			},
+		},
+
+		{
+			name:                 "missing everything except middle subranges, two subrequests",
+			offset:               0,
+			length:               10 * subrangeSize,
+			expectedLength:       10 * subrangeSize,
+			expectedFetchedBytes: 7 * subrangeSize,
+			expectedCachedBytes:  3 * subrangeSize,
+			maxGetRangeRequests:  2,
+			init: func() {
+				// Delete all but one contiguous range of subranges in the middle, and allow 2 subrequests. They will be: 0-80000, 128000-160000.
+				for i := int64(0); i < 10; i++ {
+					if i == 5 || i == 6 || i == 7 {
+						continue
+					}
+					delete(cache.cache, cachingKeyObjectSubrange(name, i*subrangeSize, (i+1)*subrangeSize))
+				}
+			},
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			if tc.init != nil {
+				tc.init()
+			}
+
+			cfg := DefaultCachingBucketConfig()
+			cfg.ChunkSubrangeSize = subrangeSize
+			cfg.MaxChunksGetRangeRequests = tc.maxGetRangeRequests
+
+			cachingBucket, err := NewCachingBucket(inmem, cache, cfg, nil, nil)
+			testutil.Ok(t, err)
+
+			verifyGetRange(t, cachingBucket, name, tc.offset, tc.length, tc.expectedLength)
+			testutil.Equals(t, tc.expectedCachedBytes, int64(promtest.ToFloat64(cachingBucket.fetchedChunkBytes.WithLabelValues(originCache))))
+			testutil.Equals(t, tc.expectedFetchedBytes, int64(promtest.ToFloat64(cachingBucket.fetchedChunkBytes.WithLabelValues(originBucket))))
+			testutil.Equals(t, tc.expectedRefetchedBytes, int64(promtest.ToFloat64(cachingBucket.refetchedChunkBytes.WithLabelValues(originCache))))
+		})
+	}
+}
+
+func verifyGetRange(t *testing.T, cachingBucket *CachingBucket, name string, offset, length int64, expectedLength int64) {
+	t.Helper()
+
+	r, err := cachingBucket.GetRange(context.Background(), name, offset, length)
+	testutil.Ok(t, err)
+
+	read, err := ioutil.ReadAll(r)
+	testutil.Ok(t, err)
+	testutil.Equals(t, expectedLength, int64(len(read)))
+
+	for ix := 0; ix < len(read); ix++ {
+		if byte(ix)+byte(offset) != read[ix] {
+			t.Fatalf("bytes differ at position %d", ix)
+		}
+	}
+}
+
+type mockCache struct {
+	mu    sync.Mutex
+	cache map[string][]byte
+}
+
+func (m *mockCache) Store(_ context.Context, data map[string][]byte, _ time.Duration) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	for key, val := range data {
+		m.cache[key] = val
+	}
+}
+
+func (m *mockCache) Fetch(_ context.Context, keys []string) map[string][]byte {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	found := make(map[string][]byte, len(keys))
+
+	for _, k := range keys {
+		v, ok := m.cache[k]
+		if ok {
+			found[k] = v
+		}
+	}
+
+	return found
+}
+
+func TestMergeRanges(t *testing.T) {
+	for ix, tc := range []struct {
+		input    []rng
+		limit    int64
+		expected []rng
+	}{
+		{
+			input:    nil,
+			limit:    0,
+			expected: nil,
+		},
+
+		{
+			input:    []rng{{start: 0, end: 100}, {start: 100, end: 200}, {start: 500, end: 1000}},
+			limit:    0,
+			expected: []rng{{start: 0, end: 200}, {start: 500, end: 1000}},
+		},
+
+		{
+			input:    []rng{{start: 0, end: 100}, {start: 500, end: 1000}},
+			limit:    300,
+			expected: []rng{{start: 0, end: 100}, {start: 500, end: 1000}},
+		},
+		{
+			input:    []rng{{start: 0, end: 100}, {start: 500, end: 1000}},
+			limit:    400,
+			expected: []rng{{start: 0, end: 1000}},
+		},
+	} {
+		t.Run(fmt.Sprintf("%d", ix), func(t *testing.T) {
+			testutil.Equals(t, tc.expected, mergeRanges(tc.input, tc.limit))
+		})
+	}
+}
+
+func TestInvalidOffsetAndLength(t *testing.T) {
+	b := &testBucket{objstore.NewInMemBucket()}
+	c, err := NewCachingBucket(b, &mockCache{cache: make(map[string][]byte)},
DefaultCachingBucketConfig(), nil, nil) + testutil.Ok(t, err) + + r, err := c.GetRange(context.Background(), "test", -1, 1000) + testutil.Equals(t, nil, r) + testutil.NotOk(t, err) + + r, err = c.GetRange(context.Background(), "test", 100, -1) + testutil.Equals(t, nil, r) + testutil.NotOk(t, err) +} + +type testBucket struct { + *objstore.InMemBucket +} + +func (b *testBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + if off < 0 { + return nil, errors.Errorf("invalid offset: %d", off) + } + + if length <= 0 { + return nil, errors.Errorf("invalid length: %d", length) + } + + return b.InMemBucket.GetRange(ctx, name, off, length) +}
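+
+// TestIsTSDBChunkFile is a small additional sketch of a test, verifying which
+// object names are treated as TSDB chunk files by the caching bucket.
+func TestIsTSDBChunkFile(t *testing.T) {
+	testutil.Assert(t, isTSDBChunkFile("01ABC/chunks/000001"), "expected chunk file to match")
+	testutil.Assert(t, !isTSDBChunkFile("01ABC/index"), "index must not match")
+	testutil.Assert(t, !isTSDBChunkFile("01ABC/chunks/foo"), "non-numeric chunk segment must not match")
+}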