From 93343ca6d2509dd66eefb691cb08c25ae8bf8c1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 27 Apr 2020 09:57:12 +0200 Subject: [PATCH 01/34] Added generic cache interface. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/cache/cache.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 pkg/cache/cache.go diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go new file mode 100644 index 0000000000..0cba44f2ba --- /dev/null +++ b/pkg/cache/cache.go @@ -0,0 +1,16 @@ +package cache + +import ( + "context" + "time" +) + +// Generic cache +type Cache interface { + // Store data into the cache. If data for given key is nil, data is removed from cache instead. + // Only positive ttl values are used. + Store(ctx context.Context, data map[string][]byte, ttl time.Duration) + + // Fetch multiple keys from cache. + Fetch(ctx context.Context, keys []string) (found map[string][]byte, missing []string) +} From 515a0f7b6426f0b68e5ef1f336b312eae50f6c38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 27 Apr 2020 10:10:06 +0200 Subject: [PATCH 02/34] Added memcached implementation of Cache. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/cache/memcached.go | 85 +++++++++++++++++++++++ pkg/cache/memcached_test.go | 132 ++++++++++++++++++++++++++++++++++++ 2 files changed, 217 insertions(+) create mode 100644 pkg/cache/memcached.go create mode 100644 pkg/cache/memcached_test.go diff --git a/pkg/cache/memcached.go b/pkg/cache/memcached.go new file mode 100644 index 0000000000..71a6ba2705 --- /dev/null +++ b/pkg/cache/memcached.go @@ -0,0 +1,85 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package cache + +import ( + "context" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/thanos-io/thanos/pkg/cacheutil" +) + +// MemcachedCache is a memcached-based cache. +type MemcachedCache struct { + logger log.Logger + memcached cacheutil.MemcachedClient + + // Metrics. + requests prometheus.Counter + hits prometheus.Counter +} + +// NewMemcachedCache makes a new MemcachedCache. +func NewMemcachedCache(logger log.Logger, memcached cacheutil.MemcachedClient, reg prometheus.Registerer) *MemcachedCache { + c := &MemcachedCache{ + logger: logger, + memcached: memcached, + } + + c.requests = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_store_memcached_requests_total", + Help: "Total number of items requests to the memcached.", + }) + + c.hits = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_store_memcached_hits_total", + Help: "Total number of items requests to the cache that were a hit.", + }) + + level.Info(logger).Log("msg", "created memcached cache") + + return c +} + +// Store data identified by keys. +// The function enqueues the request and returns immediately: the entry will be +// asynchronously stored in the cache. 
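// ----------------------------------------------------------------------------
// Editor's note: illustrative sketch, not part of this patch. It shows how a
// caller might use the generic Cache contract introduced in PATCH 01 together
// with the asynchronous Store semantics described just above: Fetch returns
// the hits plus the missing keys (a later patch in this series changes Fetch
// to return only the hits map), and Store is best-effort. The package name,
// warmAndRead and the load callback are hypothetical.

package cacheexample

import (
	"context"
	"time"

	"github.com/thanos-io/thanos/pkg/cache"
)

// warmAndRead returns data for all keys, reading misses through load and
// writing them back to the cache on a best-effort basis.
func warmAndRead(ctx context.Context, c cache.Cache, keys []string,
	load func(context.Context, []string) (map[string][]byte, error)) (map[string][]byte, error) {
	found, missing := c.Fetch(ctx, keys)
	if found == nil {
		found = map[string][]byte{}
	}
	if len(missing) == 0 {
		return found, nil
	}
	loaded, err := load(ctx, missing)
	if err != nil {
		return nil, err
	}
	// Write-back may happen asynchronously (e.g. MemcachedCache.SetAsync);
	// a failed write only means a future Fetch will miss again.
	c.Store(ctx, loaded, time.Hour)
	for k, v := range loaded {
		found[k] = v
	}
	return found, nil
}
// ----------------------------------------------------------------------------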
+func (c *MemcachedCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) { + for key, val := range data { + if err := c.memcached.SetAsync(ctx, key, val, ttl); err != nil { + level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err) + } + } +} + +// FetchMultiPostings fetches multiple postings - each identified by a label - +// and returns a map containing cache hits, along with a list of missing keys. +// In case of error, it logs and return an empty cache hits map. +func (c *MemcachedCache) Fetch(ctx context.Context, keys []string) (map[string][]byte, []string) { + // Fetch the keys from memcached in a single request. + c.requests.Add(float64(len(keys))) + results := c.memcached.GetMulti(ctx, keys) + if len(results) == 0 { + return nil, keys + } + + // Construct the resulting hits map and list of missing keys. We iterate on the input + // list of labels to be able to easily create the list of ones in a single iteration. + var misses []string + for _, key := range keys { + _, ok := results[key] + if !ok { + misses = append(misses, key) + continue + } + } + + c.hits.Add(float64(len(results))) + return results, misses +} diff --git a/pkg/cache/memcached_test.go b/pkg/cache/memcached_test.go new file mode 100644 index 0000000000..11720d6a37 --- /dev/null +++ b/pkg/cache/memcached_test.go @@ -0,0 +1,132 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package cache + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" + + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestMemcachedIndexCache(t *testing.T) { + t.Parallel() + + // Init some data to conveniently define test cases later one. + key1 := "key1" + key2 := "key2" + key3 := "key3" + value1 := []byte{1} + value2 := []byte{2} + value3 := []byte{3} + + tests := map[string]struct { + setup map[string][]byte + mockedErr error + fetchKeys []string + expectedHits map[string][]byte + expectedMisses []string + }{ + "should return no hits on empty cache": { + setup: nil, + fetchKeys: []string{key1, key2}, + expectedHits: nil, + expectedMisses: []string{key1, key2}, + }, + "should return no misses on 100% hit ratio": { + setup: map[string][]byte{ + key1: value1, + key2: value2, + key3: value3, + }, + fetchKeys: []string{key1}, + expectedHits: map[string][]byte{ + key1: value1, + }, + expectedMisses: nil, + }, + "should return hits and misses on partial hits": { + setup: map[string][]byte{ + key1: value1, + key2: value2, + }, + fetchKeys: []string{key1, key3}, + expectedHits: map[string][]byte{key1: value1}, + expectedMisses: []string{key3}, + }, + "should return no hits on memcached error": { + setup: map[string][]byte{ + key1: value1, + key2: value2, + key3: value3, + }, + mockedErr: errors.New("mocked error"), + fetchKeys: []string{key1}, + expectedHits: nil, + expectedMisses: []string{key1}, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + memcached := newMockedMemcachedClient(testData.mockedErr) + c := NewMemcachedCache(log.NewNopLogger(), memcached, nil) + + // Store the postings expected before running the test. + ctx := context.Background() + c.Store(ctx, testData.setup, time.Hour) + + // Fetch postings from cached and assert on it. 
+ hits, misses := c.Fetch(ctx, testData.fetchKeys) + testutil.Equals(t, testData.expectedHits, hits) + testutil.Equals(t, testData.expectedMisses, misses) + + // Assert on metrics. + testutil.Equals(t, float64(len(testData.fetchKeys)), prom_testutil.ToFloat64(c.requests)) + testutil.Equals(t, float64(len(testData.expectedHits)), prom_testutil.ToFloat64(c.hits)) + }) + } +} + +type mockedMemcachedClient struct { + cache map[string][]byte + mockedGetMultiErr error +} + +func newMockedMemcachedClient(mockedGetMultiErr error) *mockedMemcachedClient { + return &mockedMemcachedClient{ + cache: map[string][]byte{}, + mockedGetMultiErr: mockedGetMultiErr, + } +} + +func (c *mockedMemcachedClient) GetMulti(_ context.Context, keys []string) map[string][]byte { + if c.mockedGetMultiErr != nil { + return nil + } + + hits := map[string][]byte{} + + for _, key := range keys { + if value, ok := c.cache[key]; ok { + hits[key] = value + } + } + + return hits +} + +func (c *mockedMemcachedClient) SetAsync(_ context.Context, key string, value []byte, _ time.Duration) error { + c.cache[key] = value + return nil +} + +func (c *mockedMemcachedClient) Stop() { + // Nothing to do. +} From 967dd7a1907d6a6e0ed31f54058fb59b28f8f197 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 27 Apr 2020 14:29:55 +0200 Subject: [PATCH 03/34] Chunks-caching bucket. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- cmd/thanos/store.go | 18 + pkg/cache/cache.go | 8 +- pkg/cache/memcached.go | 9 +- pkg/store/cache/caching_bucket.go | 389 ++++++++++++++++++++++ pkg/store/cache/caching_bucket_factory.go | 59 ++++ pkg/store/cache/caching_bucket_test.go | 336 +++++++++++++++++++ 6 files changed, 812 insertions(+), 7 deletions(-) create mode 100644 pkg/store/cache/caching_bucket.go create mode 100644 pkg/store/cache/caching_bucket_factory.go create mode 100644 pkg/store/cache/caching_bucket_test.go diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index fcd8f6da29..76bb73a4c1 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -55,6 +55,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { "YAML file that contains index cache configuration. See format details: https://thanos.io/components/store.md/#index-cache", false) + cachingBucketConfig := extflag.RegisterPathOrContent(cmd, "experimental.caching-bucket.config", + "YAML file that contains configuration for caching bucket. Experimental feature, with high risk of changes. See format details: https://thanos.io/components/store.md/#TBD", + false) + chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes reserved strictly to reuse for chunks in memory."). 
Default("2GB").Bytes() @@ -149,6 +153,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { *webExternalPrefix, *webPrefixHeaderName, *postingOffsetsInMemSampling, + cachingBucketConfig, ) } } @@ -179,6 +184,7 @@ func runStore( ignoreDeletionMarksDelay time.Duration, externalPrefix, prefixHeader string, postingOffsetsInMemSampling int, + cachingBucketConfig *extflag.PathOrContent, ) error { grpcProbe := prober.NewGRPC() httpProbe := prober.NewHTTP() @@ -214,6 +220,18 @@ func runStore( return errors.Wrap(err, "create bucket client") } + cachingBucketConfigYaml, err := cachingBucketConfig.Content() + if err != nil { + return errors.Wrap(err, "get caching bucket configuration") + } + if len(cachingBucketConfigYaml) > 0 { + newbkt, err := storecache.NewCachingBucketFromYaml(cachingBucketConfigYaml, bkt, logger, reg) + if err != nil { + return errors.Wrap(err, "create caching bucket") + } + bkt = newbkt + } + relabelContentYaml, err := selectorRelabelConf.Content() if err != nil { return errors.Wrap(err, "get content of relabel configuration") diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 0cba44f2ba..804a966e9d 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package cache import ( @@ -7,8 +10,9 @@ import ( // Generic cache type Cache interface { - // Store data into the cache. If data for given key is nil, data is removed from cache instead. - // Only positive ttl values are used. + // Store data into the cache. + // + // Note that individual byte buffers may be retained by the cache! Store(ctx context.Context, data map[string][]byte, ttl time.Duration) // Fetch multiple keys from cache. diff --git a/pkg/cache/memcached.go b/pkg/cache/memcached.go index 71a6ba2705..2da9b5bf18 100644 --- a/pkg/cache/memcached.go +++ b/pkg/cache/memcached.go @@ -33,12 +33,12 @@ func NewMemcachedCache(logger log.Logger, memcached cacheutil.MemcachedClient, r } c.requests = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_store_memcached_requests_total", + Name: "thanos_cache_memcached_requests_total", Help: "Total number of items requests to the memcached.", }) c.hits = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_store_memcached_hits_total", + Name: "thanos_cache_memcached_hits_total", Help: "Total number of items requests to the cache that were a hit.", }) @@ -53,13 +53,12 @@ func NewMemcachedCache(logger log.Logger, memcached cacheutil.MemcachedClient, r func (c *MemcachedCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) { for key, val := range data { if err := c.memcached.SetAsync(ctx, key, val, ttl); err != nil { - level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err) + level.Error(c.logger).Log("msg", "failed to store data into memcached", "err", err) } } } -// FetchMultiPostings fetches multiple postings - each identified by a label - -// and returns a map containing cache hits, along with a list of missing keys. +// Fetch fetches multiple keys and returns a map containing cache hits, along with a list of missing keys. // In case of error, it logs and return an empty cache hits map. func (c *MemcachedCache) Fetch(ctx context.Context, keys []string) (map[string][]byte, []string) { // Fetch the keys from memcached in a single request. 
diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go new file mode 100644 index 0000000000..7b544945ad --- /dev/null +++ b/pkg/store/cache/caching_bucket.go @@ -0,0 +1,389 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storecache + +import ( + "context" + "encoding/binary" + "fmt" + "io" + "regexp" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "golang.org/x/sync/errgroup" + + "github.com/thanos-io/thanos/pkg/cache" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/runutil" +) + +type ChunksCachingConfig struct { + // Basic unit used to cache chunks. + ChunkBlockSize int64 `yaml:"chunk_block_size"` + + // Maximum number of GetRange requests issued by this bucket for single GetRange call. Zero or negative value = unlimited. + MaxChunksGetRangeRequests int `yaml:"max_chunks_get_range_requests"` + + // TTLs for various cache items. + ChunkObjectSizeTTL time.Duration `yaml:"chunk_object_size_ttl"` + ChunkBlockTTL time.Duration `yaml:"chunk_block_ttl"` +} + +func DefaultChunksCachingConfig() ChunksCachingConfig { + return ChunksCachingConfig{ + ChunkBlockSize: 16000, // equal to max chunk size + ChunkObjectSizeTTL: 24 * time.Hour, + ChunkBlockTTL: 24 * time.Hour, + MaxChunksGetRangeRequests: 1, + } +} + +// Bucket implementation that provides some caching features, using knowledge about how Thanos accesses data. +type CachingBucket struct { + bucket objstore.Bucket + cache cache.Cache + + chunks ChunksCachingConfig + + logger log.Logger + cachedChunkBytes prometheus.Counter + fetchedChunkBytes prometheus.Counter +} + +func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks ChunksCachingConfig, logger log.Logger, reg prometheus.Registerer) (*CachingBucket, error) { + if b == nil { + return nil, errors.New("bucket is nil") + } + if c == nil { + return nil, errors.New("cache is nil") + } + + return &CachingBucket{ + chunks: chunks, + bucket: b, + cache: c, + logger: logger, + + cachedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_store_caching_bucket_cached_chunk_bytes_total", + Help: "Total number of chunk bytes used from cache", + }), + fetchedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_store_caching_bucket_fetched_chunk_bytes_total", + Help: "Total number of chunk bytes fetched from storage", + }), + }, nil +} + +func (cb *CachingBucket) Close() error { + return cb.bucket.Close() +} + +func (cb *CachingBucket) Upload(ctx context.Context, name string, r io.Reader) error { + return cb.bucket.Upload(ctx, name, r) +} + +func (cb *CachingBucket) Delete(ctx context.Context, name string) error { + return cb.bucket.Delete(ctx, name) +} + +func (cb *CachingBucket) Name() string { + return "caching: " + cb.bucket.Name() +} + +func (cb *CachingBucket) WithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.Bucket { + if ib, ok := cb.bucket.(objstore.InstrumentedBucket); ok { + // make a copy, but replace bucket + res := &CachingBucket{} + *res = *cb + res.bucket = ib.WithExpectedErrs(expectedFunc) + return res + } + + // nothing else to do (?) 
+ return cb +} + +func (cb *CachingBucket) ReaderWithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.BucketReader { + return cb.WithExpectedErrs(expectedFunc) +} + +func (cb *CachingBucket) Iter(ctx context.Context, dir string, f func(string) error) error { + return cb.bucket.Iter(ctx, dir, f) +} + +func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { + return cb.bucket.Get(ctx, name) +} + +var chunksMatcher = regexp.MustCompile(`^.*/chunks/\d+$`) + +func isTSDBChunkFile(name string) bool { + return chunksMatcher.MatchString(name) +} + +func (cb *CachingBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + if isTSDBChunkFile(name) && off >= 0 && length > 0 { + return cb.getRangeChunkFile(ctx, name, off, length) + } + + return cb.bucket.GetRange(ctx, name, off, length) +} + +func (cb *CachingBucket) Exists(ctx context.Context, name string) (bool, error) { + return cb.bucket.Exists(ctx, name) +} + +func (cb *CachingBucket) IsObjNotFoundErr(err error) bool { + return cb.bucket.IsObjNotFoundErr(err) +} + +func (cb *CachingBucket) ObjectSize(ctx context.Context, name string) (uint64, error) { + return cb.bucket.ObjectSize(ctx, name) +} + +func (cb *CachingBucket) cachedObjectSize(ctx context.Context, name string, ttl time.Duration) (uint64, error) { + key := cachingKeyObjectSize(name) + + hits, _ := cb.cache.Fetch(ctx, []string{key}) + if s := hits[key]; len(s) == 8 { + return binary.BigEndian.Uint64(s), nil + } + + size, err := cb.bucket.ObjectSize(ctx, name) + if err != nil { + return 0, err + } + + var buf [8]byte + binary.BigEndian.PutUint64(buf[:], size) + cb.cache.Store(ctx, map[string][]byte{key: buf[:]}, ttl) + + return size, nil +} + +func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, offset, length int64) (io.ReadCloser, error) { + size, err := cb.cachedObjectSize(ctx, name, cb.chunks.ChunkObjectSizeTTL) + if err != nil { + return nil, errors.Wrapf(err, "failed to get size of chunk file: %s", name) + } + + // If length goes over object size, adjust length. We use it later to limit number of read bytes. + if uint64(offset+length) > size { + length = int64(size - uint64(offset)) + } + + startOffset := (offset / cb.chunks.ChunkBlockSize) * cb.chunks.ChunkBlockSize + endOffset := ((offset + length) / cb.chunks.ChunkBlockSize) * cb.chunks.ChunkBlockSize + if (offset+length)%cb.chunks.ChunkBlockSize > 0 { + endOffset += cb.chunks.ChunkBlockSize + } + + blocks := (endOffset - startOffset) / cb.chunks.ChunkBlockSize + + offsetKeys := make(map[int64]string, blocks) + keys := make([]string, 0, blocks) + lastBlockLength := 0 + + for o := startOffset; o < endOffset; o += cb.chunks.ChunkBlockSize { + end := o + cb.chunks.ChunkBlockSize + if end > int64(size) { + end = int64(size) + } + + lastBlockLength = int(end - o) + + k := cachingKeyObjectBlock(name, o, end) + keys = append(keys, k) + offsetKeys[o] = k + } + + // Try to get all blocks from the cache. 
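// ----------------------------------------------------------------------------
// Editor's note: illustrative sketch, not part of this patch. It restates the
// block-alignment arithmetic computed just above with concrete numbers, using
// the same 16000-byte block size as the tests later in this series. The
// package and function names are hypothetical.

package alignexample

// alignedRange widens a request [offset, offset+length) to block boundaries,
// mirroring startOffset/endOffset above, and reports how many blocks it spans.
func alignedRange(offset, length, blockSize int64) (start, end, blocks int64) {
	start = (offset / blockSize) * blockSize
	end = ((offset + length) / blockSize) * blockSize
	if (offset+length)%blockSize > 0 {
		end += blockSize
	}
	return start, end, (end - start) / blockSize
}

// For example, alignedRange(555555, 55555, 16000) = (544000, 624000, 5),
// which is why the "basic test" later in this series expects 5*blockSize
// fetched bytes for that request.
// ----------------------------------------------------------------------------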
+ hits, _ := cb.cache.Fetch(ctx, keys) + + for _, b := range hits { + cb.cachedChunkBytes.Add(float64(len(b))) + } + + if len(hits) < len(keys) { + if hits == nil { + hits = map[string][]byte{} + } + + err := cb.fetchMissingChunkBlocks(ctx, name, startOffset, endOffset, offsetKeys, hits, lastBlockLength) + if err != nil { + return nil, err + } + } + + return newBlocksReader(cb.chunks.ChunkBlockSize, offsetKeys, hits, offset, length), nil +} + +type rng struct { + start, end int64 +} + +// Fetches missing blocks and stores them into "hits" map. +func (cb *CachingBucket) fetchMissingChunkBlocks(ctx context.Context, name string, startOffset, endOffset int64, cacheKeys map[int64]string, hits map[string][]byte, lastBlockLength int) error { + // Ordered list of missing sub-ranges. + var missing []rng + + for off := startOffset; off < endOffset; off += cb.chunks.ChunkBlockSize { + if hits[cacheKeys[off]] == nil { + missing = append(missing, rng{start: off, end: off + cb.chunks.ChunkBlockSize}) + } + } + + missing = mergeRanges(missing, 0) // Merge adjacent ranges. + // Keep merging until we have only max number of ranges (= requests). + for limit := cb.chunks.ChunkBlockSize; cb.chunks.MaxChunksGetRangeRequests > 0 && len(missing) > cb.chunks.MaxChunksGetRangeRequests; limit = limit * 2 { + missing = mergeRanges(missing, limit) + } + + var hitsMutex sync.Mutex + + // Run parallel queries for each missing range. Fetched data is stored into 'hits' map, protected by hitsMutex. + eg, nctx := errgroup.WithContext(ctx) + for _, m := range missing { + m := m + eg.Go(func() error { + r, err := cb.bucket.GetRange(nctx, name, m.start, m.end-m.start) + if err != nil { + return errors.Wrapf(err, "fetching range [%d, %d]", m.start, m.end) + } + defer runutil.CloseWithLogOnErr(cb.logger, r, "fetching range [%d, %d]", m.start, m.end) + + for off := m.start; off < m.end && nctx.Err() == nil; off += cb.chunks.ChunkBlockSize { + key := cacheKeys[off] + if key == "" { + return errors.Errorf("fetching range [%d, %d]: caching key for offset %d not found", m.start, m.end, off) + } + + // We need a new buffer for each block, both for storing into hits, and also for caching. + blockData := make([]byte, cb.chunks.ChunkBlockSize) + n, err := io.ReadFull(r, blockData) + if err == io.ErrUnexpectedEOF && (off+cb.chunks.ChunkBlockSize) == endOffset && n == lastBlockLength { + // Last block can be shorter. + err = nil + blockData = blockData[:n] + } + if err != nil { + return errors.Wrapf(err, "fetching range [%d, %d]", m.start, m.end) + } + + hitsMutex.Lock() + hits[key] = blockData + hitsMutex.Unlock() + + cb.fetchedChunkBytes.Add(float64(len(blockData))) + cb.cache.Store(nctx, map[string][]byte{key: blockData}, cb.chunks.ChunkBlockTTL) + } + + return nctx.Err() + }) + } + + return eg.Wait() +} + +// Merges ranges that are close to each other. Modifies input. +func mergeRanges(input []rng, limit int64) []rng { + if len(input) == 0 { + return input + } + + last := 0 + for ix := 1; ix < len(input); ix++ { + if (input[ix].start - input[last].end) <= limit { + input[last].end = input[ix].end + } else { + last++ + input[last] = input[ix] + } + } + return input[:last+1] +} + +func cachingKeyObjectSize(name string) string { + return fmt.Sprintf("size:%s", name) +} + +func cachingKeyObjectBlock(name string, start int64, end int64) string { + return fmt.Sprintf("block:%s:%d:%d", name, start, end) +} + +// io.ReadCloser implementation that uses in-memory blocks. 
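// ----------------------------------------------------------------------------
// Editor's note: illustrative sketch, not part of this patch. It shows how the
// blocksReader type declared just below stitches blockSize-aligned cache
// entries back into one contiguous stream. It assumes it lives in package
// storecache (the identifiers are unexported); the function name and the
// values used are hypothetical.

package storecache

import "io/ioutil"

// exampleBlocksReader reads 20 bytes that straddle a block boundary: 10 bytes
// come from the end of the first cached block and 10 from the start of the
// second one.
func exampleBlocksReader() ([]byte, error) {
	const blockSize = 16000
	name := "/test/chunks/000001"
	blocks := map[string][]byte{
		cachingKeyObjectBlock(name, 0, blockSize):           make([]byte, blockSize),
		cachingKeyObjectBlock(name, blockSize, 2*blockSize): make([]byte, blockSize),
	}
	offsetsKeys := map[int64]string{
		0:         cachingKeyObjectBlock(name, 0, blockSize),
		blockSize: cachingKeyObjectBlock(name, blockSize, 2*blockSize),
	}
	r := newBlocksReader(blockSize, offsetsKeys, blocks, blockSize-10, 20)
	return ioutil.ReadAll(r)
}
// ----------------------------------------------------------------------------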
+type blocksReader struct { + blockSize int64 + + // Mapping of blockSize-aligned offsets to keys in hits. + offsetsKeys map[int64]string + blocks map[string][]byte + + // Offset for next read, used to find correct block to return data from. + readOffset int64 + + // Remaining data to return from this reader. Once zero, this reader reports EOF. + remaining int64 +} + +func newBlocksReader(blockSize int64, offsetsKeys map[int64]string, blocks map[string][]byte, readOffset, remaining int64) *blocksReader { + return &blocksReader{ + blockSize: blockSize, + offsetsKeys: offsetsKeys, + blocks: blocks, + + readOffset: readOffset, + remaining: remaining, + } +} + +func (c *blocksReader) Close() error { + return nil +} + +func (c *blocksReader) Read(p []byte) (n int, err error) { + if c.remaining <= 0 { + return 0, io.EOF + } + + currentBlockOffset := (c.readOffset / c.blockSize) * c.blockSize + currentBlock, err := c.blockAt(currentBlockOffset) + if err != nil { + return 0, errors.Wrapf(err, "read position: %d", c.readOffset) + } + + offsetInBlock := int(c.readOffset - currentBlockOffset) + toCopy := len(currentBlock) - offsetInBlock + if toCopy <= 0 { + // This can only happen if block's length is not blockSize, and reader is told to read more data. + return 0, errors.Errorf("no more data left in block at position %d, block length %d, reading position %d", currentBlockOffset, len(currentBlock), c.readOffset) + } + + if len(p) < toCopy { + toCopy = len(p) + } + if c.remaining < int64(toCopy) { + toCopy = int(c.remaining) // Conversion is safe, c.remaining is small enough. + } + + copy(p, currentBlock[offsetInBlock:offsetInBlock+toCopy]) + c.readOffset += int64(toCopy) + c.remaining -= int64(toCopy) + + return toCopy, nil +} + +func (c *blocksReader) blockAt(offset int64) ([]byte, error) { + b := c.blocks[c.offsetsKeys[offset]] + if b == nil { + return nil, errors.Errorf("block for offset %d not found", offset) + } + return b, nil +} diff --git a/pkg/store/cache/caching_bucket_factory.go b/pkg/store/cache/caching_bucket_factory.go new file mode 100644 index 0000000000..d9f1840af6 --- /dev/null +++ b/pkg/store/cache/caching_bucket_factory.go @@ -0,0 +1,59 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package storecache + +import ( + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "gopkg.in/yaml.v2" + + cache "github.com/thanos-io/thanos/pkg/cache" + "github.com/thanos-io/thanos/pkg/cacheutil" + "github.com/thanos-io/thanos/pkg/objstore" +) + +type BucketCacheProvider string + +const MemcachedBucketCacheProvider BucketCacheProvider = "memcached" + +type CachingBucketConfig struct { + Type BucketCacheProvider `yaml:"backend"` + CacheConfig interface{} `yaml:"backend_config"` + + ChunksCachingConfig ChunksCachingConfig `yaml:"chunks_caching_config"` +} + +func NewCachingBucketFromYaml(yamlContent []byte, bucket objstore.Bucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { + level.Info(logger).Log("msg", "loading caching bucket configuration") + + config := &CachingBucketConfig{} + config.ChunksCachingConfig = DefaultChunksCachingConfig() + + if err := yaml.UnmarshalStrict(yamlContent, config); err != nil { + return nil, errors.Wrap(err, "parsing config YAML file") + } + + backendConfig, err := yaml.Marshal(config.CacheConfig) + if err != nil { + return nil, errors.Wrap(err, "marshal content of cache backend configuration") + } + + var c cache.Cache + + switch config.Type { + case MemcachedBucketCacheProvider: + var memcached cacheutil.MemcachedClient + memcached, err := cacheutil.NewMemcachedClient(logger, "cache-bucket", backendConfig, reg) + if err != nil { + return nil, errors.Wrapf(err, "failed to create memcached client") + } + c = cache.NewMemcachedCache(logger, memcached, reg) + default: + return nil, errors.Errorf("unsupported cache type: %s", config.Type) + } + + return NewCachingBucket(bucket, c, config.ChunksCachingConfig, logger, reg) +} diff --git a/pkg/store/cache/caching_bucket_test.go b/pkg/store/cache/caching_bucket_test.go new file mode 100644 index 0000000000..68dbd8ad0c --- /dev/null +++ b/pkg/store/cache/caching_bucket_test.go @@ -0,0 +1,336 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storecache + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "sync" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestCachingBucket(t *testing.T) { + length := int64(1024 * 1024) + blockSize := int64(16000) // all tests are based on this value + + data := make([]byte, length) + for ix := 0; ix < len(data); ix++ { + data[ix] = byte(ix) + } + + name := "/test/chunks/000001" + + inmem := objstore.NewInMemBucket() + testutil.Ok(t, inmem.Upload(context.Background(), name, bytes.NewReader(data))) + + cache := &mockCache{cache: make(map[string][]byte)} + + cachingBucket, err := NewCachingBucket(inmem, cache, DefaultChunksCachingConfig(), nil, nil) + testutil.Ok(t, err) + cachingBucket.chunks.ChunkBlockSize = blockSize + + // Warning, these tests must be run in order, they depend cache state from previous test. 
+ for _, tc := range []struct { + name string + init func() + offset int64 + length int64 + maxGetRangeRequests int + expectedLength int64 + expectedFetchedBytes int64 + expectedCachedBytes int64 + }{ + { + name: "basic test", + offset: 555555, + length: 55555, + expectedLength: 55555, + expectedFetchedBytes: 5 * blockSize, + }, + + { + name: "same request will hit all blocks in the cache", + offset: 555555, + length: 55555, + expectedLength: 55555, + expectedCachedBytes: 5 * blockSize, + }, + + { + name: "request data close to the end of object", + offset: length - 10, + length: 3000, + expectedLength: 10, + expectedFetchedBytes: 8576, // last block is fetched + }, + + { + name: "another request data close to the end of object, cached by previous test", + offset: 1040100, + length: blockSize, + expectedLength: 8476, + expectedCachedBytes: 8576, + }, + + { + name: "entire object, combination of cached and uncached blocks", + offset: 0, + length: length, + expectedLength: length, + expectedCachedBytes: 5*blockSize + 8576, // 5 block cached from first test, plus last incomplete block + expectedFetchedBytes: 60 * blockSize, + }, + + { + name: "entire object again, everything is cached", + offset: 0, + length: length, + expectedLength: length, + expectedCachedBytes: length, // entire file is now cached + }, + + { + name: "entire object again, nothing is cached", + offset: 0, + length: length, + expectedLength: length, + expectedFetchedBytes: length, // cache is flushed + init: func() { + cache.cache = map[string][]byte{} // flush cache + }, + }, + + { + name: "missing first blocks", + offset: 0, + length: 10 * blockSize, + expectedLength: 10 * blockSize, + expectedFetchedBytes: 3 * blockSize, + expectedCachedBytes: 7 * blockSize, + init: func() { + // delete first 3 blocks + delete(cache.cache, cachingKeyObjectBlock(name, 0*blockSize, 1*blockSize)) + delete(cache.cache, cachingKeyObjectBlock(name, 1*blockSize, 2*blockSize)) + delete(cache.cache, cachingKeyObjectBlock(name, 2*blockSize, 3*blockSize)) + }, + }, + + { + name: "missing last blocks", + offset: 0, + length: 10 * blockSize, + expectedLength: 10 * blockSize, + expectedFetchedBytes: 3 * blockSize, + expectedCachedBytes: 7 * blockSize, + init: func() { + // delete last 3 blocks + delete(cache.cache, cachingKeyObjectBlock(name, 7*blockSize, 8*blockSize)) + delete(cache.cache, cachingKeyObjectBlock(name, 8*blockSize, 9*blockSize)) + delete(cache.cache, cachingKeyObjectBlock(name, 9*blockSize, 10*blockSize)) + }, + }, + + { + name: "missing middle blocks", + offset: 0, + length: 10 * blockSize, + expectedLength: 10 * blockSize, + expectedFetchedBytes: 3 * blockSize, + expectedCachedBytes: 7 * blockSize, + init: func() { + // Delete 3 blocks in the middle. + delete(cache.cache, cachingKeyObjectBlock(name, 3*blockSize, 4*blockSize)) + delete(cache.cache, cachingKeyObjectBlock(name, 4*blockSize, 5*blockSize)) + delete(cache.cache, cachingKeyObjectBlock(name, 5*blockSize, 6*blockSize)) + }, + }, + + { + name: "missing everything except middle blocks", + offset: 0, + length: 10 * blockSize, + expectedLength: 10 * blockSize, + expectedFetchedBytes: 7 * blockSize, + expectedCachedBytes: 3 * blockSize, + init: func() { + // Delete all but 3 blocks in the middle, and keep unlimited number of ranged subrequests. 
+ for i := int64(0); i < 10; i++ { + if i > 0 && i%3 == 0 { + continue + } + delete(cache.cache, cachingKeyObjectBlock(name, i*blockSize, (i+1)*blockSize)) + } + }, + }, + + { + name: "missing everything except middle blocks, one subrequest only", + offset: 0, + length: 10 * blockSize, + expectedLength: 10 * blockSize, + expectedFetchedBytes: 10 * blockSize, // We need to fetch beginning and end in single request. + expectedCachedBytes: 3 * blockSize, + maxGetRangeRequests: 1, + init: func() { + // Delete all but 3 blocks in the middle, but only allow 1 subrequest. + for i := int64(0); i < 10; i++ { + if i == 3 || i == 5 || i == 7 { + continue + } + delete(cache.cache, cachingKeyObjectBlock(name, i*blockSize, (i+1)*blockSize)) + } + }, + }, + + { + name: "missing everything except middle blocks, two subrequests", + offset: 0, + length: 10 * blockSize, + expectedLength: 10 * blockSize, + expectedFetchedBytes: 7 * blockSize, + expectedCachedBytes: 3 * blockSize, + maxGetRangeRequests: 2, + init: func() { + // Delete all but one blocks in the middle, and allow 2 subrequests. They will be: 0-80000, 128000-160000. + for i := int64(0); i < 10; i++ { + if i == 5 || i == 6 || i == 7 { + continue + } + delete(cache.cache, cachingKeyObjectBlock(name, i*blockSize, (i+1)*blockSize)) + } + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + if tc.init != nil { + tc.init() + } + cachedBytes, fetchedBytes := &counter{}, &counter{} + cachingBucket.cachedChunkBytes = cachedBytes + cachingBucket.fetchedChunkBytes = fetchedBytes + cachingBucket.chunks.MaxChunksGetRangeRequests = tc.maxGetRangeRequests + + verifyGetRange(t, cachingBucket, name, tc.offset, tc.length, tc.expectedLength) + testutil.Equals(t, tc.expectedCachedBytes, int64(cachedBytes.Get())) + testutil.Equals(t, tc.expectedFetchedBytes, int64(fetchedBytes.Get())) + }) + } +} + +func verifyGetRange(t *testing.T, cachingBucket *CachingBucket, name string, offset, length int64, expectedLength int64) { + t.Helper() + + r, err := cachingBucket.GetRange(context.Background(), name, offset, length) + testutil.Ok(t, err) + + read, err := ioutil.ReadAll(r) + testutil.Ok(t, err) + testutil.Equals(t, expectedLength, int64(len(read))) + + for ix := 0; ix < len(read); ix++ { + if byte(ix)+byte(offset) != read[ix] { + t.Fatalf("bytes differ at position %d", ix) + } + } +} + +type mockCache struct { + mu sync.Mutex + cache map[string][]byte +} + +func (m *mockCache) Store(_ context.Context, data map[string][]byte, _ time.Duration) { + m.mu.Lock() + defer m.mu.Unlock() + for key, val := range data { + m.cache[key] = val + } +} + +func (m *mockCache) Fetch(_ context.Context, keys []string) (found map[string][]byte, missing []string) { + m.mu.Lock() + defer m.mu.Unlock() + + found = make(map[string][]byte) + + for _, k := range keys { + v, ok := m.cache[k] + if ok { + found[k] = v + } else { + missing = append(missing, k) + } + } + + return +} + +func TestMergeRanges(t *testing.T) { + for ix, tc := range []struct { + input []rng + limit int64 + expected []rng + }{ + { + input: nil, + limit: 0, + expected: nil, + }, + + { + input: []rng{{start: 0, end: 100}, {start: 100, end: 200}}, + limit: 0, + expected: []rng{{start: 0, end: 200}}, + }, + + { + input: []rng{{start: 0, end: 100}, {start: 500, end: 1000}}, + limit: 300, + expected: []rng{{start: 0, end: 100}, {start: 500, end: 1000}}, + }, + { + input: []rng{{start: 0, end: 100}, {start: 500, end: 1000}}, + limit: 400, + expected: []rng{{start: 0, end: 1000}}, + }, + } { + t.Run(fmt.Sprintf("%d", ix), func(t 
*testing.T) { + testutil.Equals(t, tc.expected, mergeRanges(tc.input, tc.limit)) + }) + } +} + +type counter struct { + mu sync.Mutex + count float64 +} + +func (c *counter) Desc() *prometheus.Desc { return nil } +func (c *counter) Write(_ *dto.Metric) error { return nil } +func (c *counter) Describe(_ chan<- *prometheus.Desc) {} +func (c *counter) Collect(_ chan<- prometheus.Metric) {} +func (c *counter) Inc() { + c.mu.Lock() + c.count++ + c.mu.Unlock() +} +func (c *counter) Add(f float64) { + c.mu.Lock() + c.count += f + c.mu.Unlock() +} +func (c *counter) Get() float64 { + c.mu.Lock() + defer c.mu.Unlock() + return c.count +} From c45f271b921474bf181cc5f6b2bb32ccfe857587 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 30 Apr 2020 13:42:05 +0200 Subject: [PATCH 04/34] Fix sentences MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/cache/caching_bucket.go | 4 ++-- pkg/store/cache/caching_bucket_test.go | 15 ++++++++------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index 7b544945ad..b76a134036 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -37,7 +37,7 @@ type ChunksCachingConfig struct { func DefaultChunksCachingConfig() ChunksCachingConfig { return ChunksCachingConfig{ - ChunkBlockSize: 16000, // equal to max chunk size + ChunkBlockSize: 16000, // Equal to max chunk size. ChunkObjectSizeTTL: 24 * time.Hour, ChunkBlockTTL: 24 * time.Hour, MaxChunksGetRangeRequests: 1, @@ -99,7 +99,7 @@ func (cb *CachingBucket) Name() string { func (cb *CachingBucket) WithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.Bucket { if ib, ok := cb.bucket.(objstore.InstrumentedBucket); ok { - // make a copy, but replace bucket + // Make a copy, but replace bucket with instrumented one. res := &CachingBucket{} *res = *cb res.bucket = ib.WithExpectedErrs(expectedFunc) diff --git a/pkg/store/cache/caching_bucket_test.go b/pkg/store/cache/caching_bucket_test.go index 68dbd8ad0c..1834a8db10 100644 --- a/pkg/store/cache/caching_bucket_test.go +++ b/pkg/store/cache/caching_bucket_test.go @@ -21,7 +21,7 @@ import ( func TestCachingBucket(t *testing.T) { length := int64(1024 * 1024) - blockSize := int64(16000) // all tests are based on this value + blockSize := int64(16000) // All tests are based on this value. data := make([]byte, length) for ix := 0; ix < len(data); ix++ { @@ -71,7 +71,7 @@ func TestCachingBucket(t *testing.T) { offset: length - 10, length: 3000, expectedLength: 10, - expectedFetchedBytes: 8576, // last block is fetched + expectedFetchedBytes: 8576, // Last (incomplete) block is fetched. }, { @@ -87,7 +87,7 @@ func TestCachingBucket(t *testing.T) { offset: 0, length: length, expectedLength: length, - expectedCachedBytes: 5*blockSize + 8576, // 5 block cached from first test, plus last incomplete block + expectedCachedBytes: 5*blockSize + 8576, // 5 block cached from first test, plus last incomplete block. expectedFetchedBytes: 60 * blockSize, }, @@ -96,7 +96,7 @@ func TestCachingBucket(t *testing.T) { offset: 0, length: length, expectedLength: length, - expectedCachedBytes: length, // entire file is now cached + expectedCachedBytes: length, // Entire file is now cached. 
}, { @@ -104,7 +104,8 @@ func TestCachingBucket(t *testing.T) { offset: 0, length: length, expectedLength: length, - expectedFetchedBytes: length, // cache is flushed + expectedFetchedBytes: length, + expectedCachedBytes: 0, // Cache is flushed. init: func() { cache.cache = map[string][]byte{} // flush cache }, @@ -118,7 +119,7 @@ func TestCachingBucket(t *testing.T) { expectedFetchedBytes: 3 * blockSize, expectedCachedBytes: 7 * blockSize, init: func() { - // delete first 3 blocks + // Delete first 3 blocks. delete(cache.cache, cachingKeyObjectBlock(name, 0*blockSize, 1*blockSize)) delete(cache.cache, cachingKeyObjectBlock(name, 1*blockSize, 2*blockSize)) delete(cache.cache, cachingKeyObjectBlock(name, 2*blockSize, 3*blockSize)) @@ -133,7 +134,7 @@ func TestCachingBucket(t *testing.T) { expectedFetchedBytes: 3 * blockSize, expectedCachedBytes: 7 * blockSize, init: func() { - // delete last 3 blocks + // Delete last 3 blocks. delete(cache.cache, cachingKeyObjectBlock(name, 7*blockSize, 8*blockSize)) delete(cache.cache, cachingKeyObjectBlock(name, 8*blockSize, 9*blockSize)) delete(cache.cache, cachingKeyObjectBlock(name, 9*blockSize, 10*blockSize)) From 4739c128acbdedd3379fe5cd22b7ea8a78bc0f62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 30 Apr 2020 13:54:36 +0200 Subject: [PATCH 05/34] Fix sentences MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/cache/caching_bucket_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/cache/caching_bucket_test.go b/pkg/store/cache/caching_bucket_test.go index 1834a8db10..47a8378647 100644 --- a/pkg/store/cache/caching_bucket_test.go +++ b/pkg/store/cache/caching_bucket_test.go @@ -107,7 +107,7 @@ func TestCachingBucket(t *testing.T) { expectedFetchedBytes: length, expectedCachedBytes: 0, // Cache is flushed. init: func() { - cache.cache = map[string][]byte{} // flush cache + cache.cache = map[string][]byte{} // Flush cache. }, }, From f4c4783525c35475e36d23781c95c942806ddaf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 30 Apr 2020 14:01:44 +0200 Subject: [PATCH 06/34] Fix sentences MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/cache/cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 804a966e9d..2c7aebe3ea 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -8,7 +8,7 @@ import ( "time" ) -// Generic cache +// Generic best-effort cache. type Cache interface { // Store data into the cache. // From 1f6b4155b320063605c0fdb4b6f6fc9fb3af083b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 1 May 2020 12:52:06 +0200 Subject: [PATCH 07/34] Rename config objects. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/cache/caching_bucket.go | 44 +++++++++++------------ pkg/store/cache/caching_bucket_factory.go | 16 ++++----- pkg/store/cache/caching_bucket_test.go | 6 ++-- 3 files changed, 33 insertions(+), 33 deletions(-) diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index b76a134036..85db3dc689 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -23,7 +23,7 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" ) -type ChunksCachingConfig struct { +type CachingBucketConfig struct { // Basic unit used to cache chunks. ChunkBlockSize int64 `yaml:"chunk_block_size"` @@ -35,8 +35,8 @@ type ChunksCachingConfig struct { ChunkBlockTTL time.Duration `yaml:"chunk_block_ttl"` } -func DefaultChunksCachingConfig() ChunksCachingConfig { - return ChunksCachingConfig{ +func DefaultCachingBucketConfig() CachingBucketConfig { + return CachingBucketConfig{ ChunkBlockSize: 16000, // Equal to max chunk size. ChunkObjectSizeTTL: 24 * time.Hour, ChunkBlockTTL: 24 * time.Hour, @@ -49,14 +49,14 @@ type CachingBucket struct { bucket objstore.Bucket cache cache.Cache - chunks ChunksCachingConfig + config CachingBucketConfig logger log.Logger cachedChunkBytes prometheus.Counter fetchedChunkBytes prometheus.Counter } -func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks ChunksCachingConfig, logger log.Logger, reg prometheus.Registerer) (*CachingBucket, error) { +func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConfig, logger log.Logger, reg prometheus.Registerer) (*CachingBucket, error) { if b == nil { return nil, errors.New("bucket is nil") } @@ -65,7 +65,7 @@ func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks ChunksCachingConf } return &CachingBucket{ - chunks: chunks, + config: chunks, bucket: b, cache: c, logger: logger, @@ -169,7 +169,7 @@ func (cb *CachingBucket) cachedObjectSize(ctx context.Context, name string, ttl } func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, offset, length int64) (io.ReadCloser, error) { - size, err := cb.cachedObjectSize(ctx, name, cb.chunks.ChunkObjectSizeTTL) + size, err := cb.cachedObjectSize(ctx, name, cb.config.ChunkObjectSizeTTL) if err != nil { return nil, errors.Wrapf(err, "failed to get size of chunk file: %s", name) } @@ -179,20 +179,20 @@ func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, off length = int64(size - uint64(offset)) } - startOffset := (offset / cb.chunks.ChunkBlockSize) * cb.chunks.ChunkBlockSize - endOffset := ((offset + length) / cb.chunks.ChunkBlockSize) * cb.chunks.ChunkBlockSize - if (offset+length)%cb.chunks.ChunkBlockSize > 0 { - endOffset += cb.chunks.ChunkBlockSize + startOffset := (offset / cb.config.ChunkBlockSize) * cb.config.ChunkBlockSize + endOffset := ((offset + length) / cb.config.ChunkBlockSize) * cb.config.ChunkBlockSize + if (offset+length)%cb.config.ChunkBlockSize > 0 { + endOffset += cb.config.ChunkBlockSize } - blocks := (endOffset - startOffset) / cb.chunks.ChunkBlockSize + blocks := (endOffset - startOffset) / cb.config.ChunkBlockSize offsetKeys := make(map[int64]string, blocks) keys := make([]string, 0, blocks) lastBlockLength := 0 - for o := startOffset; o < endOffset; o += cb.chunks.ChunkBlockSize { - end := o + cb.chunks.ChunkBlockSize + for o := startOffset; o < endOffset; o += cb.config.ChunkBlockSize { + end := o + 
cb.config.ChunkBlockSize if end > int64(size) { end = int64(size) } @@ -222,7 +222,7 @@ func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, off } } - return newBlocksReader(cb.chunks.ChunkBlockSize, offsetKeys, hits, offset, length), nil + return newBlocksReader(cb.config.ChunkBlockSize, offsetKeys, hits, offset, length), nil } type rng struct { @@ -234,15 +234,15 @@ func (cb *CachingBucket) fetchMissingChunkBlocks(ctx context.Context, name strin // Ordered list of missing sub-ranges. var missing []rng - for off := startOffset; off < endOffset; off += cb.chunks.ChunkBlockSize { + for off := startOffset; off < endOffset; off += cb.config.ChunkBlockSize { if hits[cacheKeys[off]] == nil { - missing = append(missing, rng{start: off, end: off + cb.chunks.ChunkBlockSize}) + missing = append(missing, rng{start: off, end: off + cb.config.ChunkBlockSize}) } } missing = mergeRanges(missing, 0) // Merge adjacent ranges. // Keep merging until we have only max number of ranges (= requests). - for limit := cb.chunks.ChunkBlockSize; cb.chunks.MaxChunksGetRangeRequests > 0 && len(missing) > cb.chunks.MaxChunksGetRangeRequests; limit = limit * 2 { + for limit := cb.config.ChunkBlockSize; cb.config.MaxChunksGetRangeRequests > 0 && len(missing) > cb.config.MaxChunksGetRangeRequests; limit = limit * 2 { missing = mergeRanges(missing, limit) } @@ -259,16 +259,16 @@ func (cb *CachingBucket) fetchMissingChunkBlocks(ctx context.Context, name strin } defer runutil.CloseWithLogOnErr(cb.logger, r, "fetching range [%d, %d]", m.start, m.end) - for off := m.start; off < m.end && nctx.Err() == nil; off += cb.chunks.ChunkBlockSize { + for off := m.start; off < m.end && nctx.Err() == nil; off += cb.config.ChunkBlockSize { key := cacheKeys[off] if key == "" { return errors.Errorf("fetching range [%d, %d]: caching key for offset %d not found", m.start, m.end, off) } // We need a new buffer for each block, both for storing into hits, and also for caching. - blockData := make([]byte, cb.chunks.ChunkBlockSize) + blockData := make([]byte, cb.config.ChunkBlockSize) n, err := io.ReadFull(r, blockData) - if err == io.ErrUnexpectedEOF && (off+cb.chunks.ChunkBlockSize) == endOffset && n == lastBlockLength { + if err == io.ErrUnexpectedEOF && (off+cb.config.ChunkBlockSize) == endOffset && n == lastBlockLength { // Last block can be shorter. 
err = nil blockData = blockData[:n] @@ -282,7 +282,7 @@ func (cb *CachingBucket) fetchMissingChunkBlocks(ctx context.Context, name strin hitsMutex.Unlock() cb.fetchedChunkBytes.Add(float64(len(blockData))) - cb.cache.Store(nctx, map[string][]byte{key: blockData}, cb.chunks.ChunkBlockTTL) + cb.cache.Store(nctx, map[string][]byte{key: blockData}, cb.config.ChunkBlockTTL) } return nctx.Err() diff --git a/pkg/store/cache/caching_bucket_factory.go b/pkg/store/cache/caching_bucket_factory.go index d9f1840af6..449e0d6c0e 100644 --- a/pkg/store/cache/caching_bucket_factory.go +++ b/pkg/store/cache/caching_bucket_factory.go @@ -19,24 +19,24 @@ type BucketCacheProvider string const MemcachedBucketCacheProvider BucketCacheProvider = "memcached" -type CachingBucketConfig struct { - Type BucketCacheProvider `yaml:"backend"` - CacheConfig interface{} `yaml:"backend_config"` +type CachingBucketWithBackendConfig struct { + Type BucketCacheProvider `yaml:"backend"` + BackendConfig interface{} `yaml:"backend_config"` - ChunksCachingConfig ChunksCachingConfig `yaml:"chunks_caching_config"` + CachingBucketConfig CachingBucketConfig `yaml:"caching_config"` } func NewCachingBucketFromYaml(yamlContent []byte, bucket objstore.Bucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { level.Info(logger).Log("msg", "loading caching bucket configuration") - config := &CachingBucketConfig{} - config.ChunksCachingConfig = DefaultChunksCachingConfig() + config := &CachingBucketWithBackendConfig{} + config.CachingBucketConfig = DefaultCachingBucketConfig() if err := yaml.UnmarshalStrict(yamlContent, config); err != nil { return nil, errors.Wrap(err, "parsing config YAML file") } - backendConfig, err := yaml.Marshal(config.CacheConfig) + backendConfig, err := yaml.Marshal(config.BackendConfig) if err != nil { return nil, errors.Wrap(err, "marshal content of cache backend configuration") } @@ -55,5 +55,5 @@ func NewCachingBucketFromYaml(yamlContent []byte, bucket objstore.Bucket, logger return nil, errors.Errorf("unsupported cache type: %s", config.Type) } - return NewCachingBucket(bucket, c, config.ChunksCachingConfig, logger, reg) + return NewCachingBucket(bucket, c, config.CachingBucketConfig, logger, reg) } diff --git a/pkg/store/cache/caching_bucket_test.go b/pkg/store/cache/caching_bucket_test.go index 47a8378647..008c2269f8 100644 --- a/pkg/store/cache/caching_bucket_test.go +++ b/pkg/store/cache/caching_bucket_test.go @@ -35,9 +35,9 @@ func TestCachingBucket(t *testing.T) { cache := &mockCache{cache: make(map[string][]byte)} - cachingBucket, err := NewCachingBucket(inmem, cache, DefaultChunksCachingConfig(), nil, nil) + cachingBucket, err := NewCachingBucket(inmem, cache, DefaultCachingBucketConfig(), nil, nil) testutil.Ok(t, err) - cachingBucket.chunks.ChunkBlockSize = blockSize + cachingBucket.config.ChunkBlockSize = blockSize // Warning, these tests must be run in order, they depend cache state from previous test. 
for _, tc := range []struct { @@ -219,7 +219,7 @@ func TestCachingBucket(t *testing.T) { cachedBytes, fetchedBytes := &counter{}, &counter{} cachingBucket.cachedChunkBytes = cachedBytes cachingBucket.fetchedChunkBytes = fetchedBytes - cachingBucket.chunks.MaxChunksGetRangeRequests = tc.maxGetRangeRequests + cachingBucket.config.MaxChunksGetRangeRequests = tc.maxGetRangeRequests verifyGetRange(t, cachingBucket, name, tc.offset, tc.length, tc.expectedLength) testutil.Equals(t, tc.expectedCachedBytes, int64(cachedBytes.Get())) From 616891e520de5ccee19fa1c6c06f4586d3e2ff61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 4 May 2020 15:06:20 +0200 Subject: [PATCH 08/34] Review feedback. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- cmd/thanos/store.go | 3 +- pkg/cache/cache.go | 5 +- pkg/cache/memcached.go | 45 +++++++++-------- pkg/cache/memcached_test.go | 2 +- pkg/store/cache/caching_bucket.go | 59 +++++++++++------------ pkg/store/cache/caching_bucket_factory.go | 4 +- pkg/store/cache/caching_bucket_test.go | 52 +++++--------------- 7 files changed, 68 insertions(+), 102 deletions(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 76bb73a4c1..fc294c888e 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -225,11 +225,10 @@ func runStore( return errors.Wrap(err, "get caching bucket configuration") } if len(cachingBucketConfigYaml) > 0 { - newbkt, err := storecache.NewCachingBucketFromYaml(cachingBucketConfigYaml, bkt, logger, reg) + bkt, err = storecache.NewCachingBucketFromYaml(cachingBucketConfigYaml, bkt, logger, reg) if err != nil { return errors.Wrap(err, "create caching bucket") } - bkt = newbkt } relabelContentYaml, err := selectorRelabelConf.Content() diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 2c7aebe3ea..acaa0e159d 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -15,6 +15,7 @@ type Cache interface { // Note that individual byte buffers may be retained by the cache! Store(ctx context.Context, data map[string][]byte, ttl time.Duration) - // Fetch multiple keys from cache. - Fetch(ctx context.Context, keys []string) (found map[string][]byte, missing []string) + // Fetch multiple keys from cache. Returns map of input keys to data. + // If key isn't in the map, data for given key was not found. + Fetch(ctx context.Context, keys []string) map[string][]byte } diff --git a/pkg/cache/memcached.go b/pkg/cache/memcached.go index 2da9b5bf18..04c249576e 100644 --- a/pkg/cache/memcached.go +++ b/pkg/cache/memcached.go @@ -26,20 +26,22 @@ type MemcachedCache struct { } // NewMemcachedCache makes a new MemcachedCache. 
-func NewMemcachedCache(logger log.Logger, memcached cacheutil.MemcachedClient, reg prometheus.Registerer) *MemcachedCache { +func NewMemcachedCache(name string, logger log.Logger, memcached cacheutil.MemcachedClient, reg prometheus.Registerer) *MemcachedCache { c := &MemcachedCache{ logger: logger, memcached: memcached, } c.requests = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_cache_memcached_requests_total", - Help: "Total number of items requests to the memcached.", + Name: "thanos_cache_memcached_requests_total", + Help: "Total number of items requests to memcached.", + ConstLabels: prometheus.Labels{"name": name}, }) c.hits = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_cache_memcached_hits_total", - Help: "Total number of items requests to the cache that were a hit.", + Name: "thanos_cache_memcached_hits_total", + Help: "Total number of items requests to the cache that were a hit.", + ConstLabels: prometheus.Labels{"name": name}, }) level.Info(logger).Log("msg", "created memcached cache") @@ -51,34 +53,31 @@ func NewMemcachedCache(logger log.Logger, memcached cacheutil.MemcachedClient, r // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. func (c *MemcachedCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) { + var ( + firstErr error + failed int + ) + for key, val := range data { if err := c.memcached.SetAsync(ctx, key, val, ttl); err != nil { - level.Error(c.logger).Log("msg", "failed to store data into memcached", "err", err) + failed++ + if firstErr == nil { + firstErr = err + } } } + + if firstErr != nil { + level.Warn(c.logger).Log("msg", "failed to store one or more items into memcached", "failed", failed, "firstErr", firstErr) + } } // Fetch fetches multiple keys and returns a map containing cache hits, along with a list of missing keys. // In case of error, it logs and return an empty cache hits map. -func (c *MemcachedCache) Fetch(ctx context.Context, keys []string) (map[string][]byte, []string) { +func (c *MemcachedCache) Fetch(ctx context.Context, keys []string) map[string][]byte { // Fetch the keys from memcached in a single request. c.requests.Add(float64(len(keys))) results := c.memcached.GetMulti(ctx, keys) - if len(results) == 0 { - return nil, keys - } - - // Construct the resulting hits map and list of missing keys. We iterate on the input - // list of labels to be able to easily create the list of ones in a single iteration. - var misses []string - for _, key := range keys { - _, ok := results[key] - if !ok { - misses = append(misses, key) - continue - } - } - c.hits.Add(float64(len(results))) - return results, misses + return results } diff --git a/pkg/cache/memcached_test.go b/pkg/cache/memcached_test.go index 11720d6a37..27dbc6c240 100644 --- a/pkg/cache/memcached_test.go +++ b/pkg/cache/memcached_test.go @@ -76,7 +76,7 @@ func TestMemcachedIndexCache(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { memcached := newMockedMemcachedClient(testData.mockedErr) - c := NewMemcachedCache(log.NewNopLogger(), memcached, nil) + c := NewMemcachedCache("test", log.NewNopLogger(), memcached, nil) // Store the postings expected before running the test. 
ctx := context.Background() diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index 85db3dc689..4c983bf7a7 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -40,7 +40,7 @@ func DefaultCachingBucketConfig() CachingBucketConfig { ChunkBlockSize: 16000, // Equal to max chunk size. ChunkObjectSizeTTL: 24 * time.Hour, ChunkBlockTTL: 24 * time.Hour, - MaxChunksGetRangeRequests: 1, + MaxChunksGetRangeRequests: 3, } } @@ -71,11 +71,11 @@ func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConf logger: logger, cachedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_store_caching_bucket_cached_chunk_bytes_total", + Name: "thanos_store_bucket_cache_cached_chunk_bytes_total", Help: "Total number of chunk bytes used from cache", }), fetchedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_store_caching_bucket_fetched_chunk_bytes_total", + Name: "thanos_store_bucket_cache_fetched_chunk_bytes_total", Help: "Total number of chunk bytes fetched from storage", }), }, nil @@ -106,7 +106,6 @@ func (cb *CachingBucket) WithExpectedErrs(expectedFunc objstore.IsOpFailureExpec return res } - // nothing else to do (?) return cb } @@ -151,7 +150,7 @@ func (cb *CachingBucket) ObjectSize(ctx context.Context, name string) (uint64, e func (cb *CachingBucket) cachedObjectSize(ctx context.Context, name string, ttl time.Duration) (uint64, error) { key := cachingKeyObjectSize(name) - hits, _ := cb.cache.Fetch(ctx, []string{key}) + hits := cb.cache.Fetch(ctx, []string{key}) if s := hits[key]; len(s) == 8 { return binary.BigEndian.Uint64(s), nil } @@ -179,34 +178,32 @@ func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, off length = int64(size - uint64(offset)) } - startOffset := (offset / cb.config.ChunkBlockSize) * cb.config.ChunkBlockSize - endOffset := ((offset + length) / cb.config.ChunkBlockSize) * cb.config.ChunkBlockSize + startRange := (offset / cb.config.ChunkBlockSize) * cb.config.ChunkBlockSize + endRange := ((offset + length) / cb.config.ChunkBlockSize) * cb.config.ChunkBlockSize + lastBlockLength := 0 if (offset+length)%cb.config.ChunkBlockSize > 0 { - endOffset += cb.config.ChunkBlockSize + lastBlockLength = int((offset + length) - endRange) + endRange += cb.config.ChunkBlockSize } - blocks := (endOffset - startOffset) / cb.config.ChunkBlockSize + numBlocks := (endRange - startRange) / cb.config.ChunkBlockSize - offsetKeys := make(map[int64]string, blocks) - keys := make([]string, 0, blocks) - lastBlockLength := 0 + offsetKeys := make(map[int64]string, numBlocks) + keys := make([]string, 0, numBlocks) - for o := startOffset; o < endOffset; o += cb.config.ChunkBlockSize { - end := o + cb.config.ChunkBlockSize + for off := startRange; off < endRange; off += cb.config.ChunkBlockSize { + end := off + cb.config.ChunkBlockSize if end > int64(size) { end = int64(size) } - lastBlockLength = int(end - o) - - k := cachingKeyObjectBlock(name, o, end) + k := cachingKeyObjectBlock(name, off, end) keys = append(keys, k) - offsetKeys[o] = k + offsetKeys[off] = k } // Try to get all blocks from the cache. 
- hits, _ := cb.cache.Fetch(ctx, keys) - + hits := cb.cache.Fetch(ctx, keys) for _, b := range hits { cb.cachedChunkBytes.Add(float64(len(b))) } @@ -216,7 +213,7 @@ func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, off hits = map[string][]byte{} } - err := cb.fetchMissingChunkBlocks(ctx, name, startOffset, endOffset, offsetKeys, hits, lastBlockLength) + err := cb.fetchMissingChunkBlocks(ctx, name, startRange, endRange, offsetKeys, hits, lastBlockLength) if err != nil { return nil, err } @@ -230,11 +227,11 @@ type rng struct { } // Fetches missing blocks and stores them into "hits" map. -func (cb *CachingBucket) fetchMissingChunkBlocks(ctx context.Context, name string, startOffset, endOffset int64, cacheKeys map[int64]string, hits map[string][]byte, lastBlockLength int) error { +func (cb *CachingBucket) fetchMissingChunkBlocks(ctx context.Context, name string, startRange, endRange int64, cacheKeys map[int64]string, hits map[string][]byte, lastBlockLength int) error { // Ordered list of missing sub-ranges. var missing []rng - for off := startOffset; off < endOffset; off += cb.config.ChunkBlockSize { + for off := startRange; off < endRange; off += cb.config.ChunkBlockSize { if hits[cacheKeys[off]] == nil { missing = append(missing, rng{start: off, end: off + cb.config.ChunkBlockSize}) } @@ -249,17 +246,17 @@ func (cb *CachingBucket) fetchMissingChunkBlocks(ctx context.Context, name strin var hitsMutex sync.Mutex // Run parallel queries for each missing range. Fetched data is stored into 'hits' map, protected by hitsMutex. - eg, nctx := errgroup.WithContext(ctx) + g, gctx := errgroup.WithContext(ctx) for _, m := range missing { m := m - eg.Go(func() error { - r, err := cb.bucket.GetRange(nctx, name, m.start, m.end-m.start) + g.Go(func() error { + r, err := cb.bucket.GetRange(gctx, name, m.start, m.end-m.start) if err != nil { return errors.Wrapf(err, "fetching range [%d, %d]", m.start, m.end) } defer runutil.CloseWithLogOnErr(cb.logger, r, "fetching range [%d, %d]", m.start, m.end) - for off := m.start; off < m.end && nctx.Err() == nil; off += cb.config.ChunkBlockSize { + for off := m.start; off < m.end && gctx.Err() == nil; off += cb.config.ChunkBlockSize { key := cacheKeys[off] if key == "" { return errors.Errorf("fetching range [%d, %d]: caching key for offset %d not found", m.start, m.end, off) @@ -268,7 +265,7 @@ func (cb *CachingBucket) fetchMissingChunkBlocks(ctx context.Context, name strin // We need a new buffer for each block, both for storing into hits, and also for caching. blockData := make([]byte, cb.config.ChunkBlockSize) n, err := io.ReadFull(r, blockData) - if err == io.ErrUnexpectedEOF && (off+cb.config.ChunkBlockSize) == endOffset && n == lastBlockLength { + if err == io.ErrUnexpectedEOF && (off+cb.config.ChunkBlockSize) == endRange && n == lastBlockLength { // Last block can be shorter. err = nil blockData = blockData[:n] @@ -282,14 +279,14 @@ func (cb *CachingBucket) fetchMissingChunkBlocks(ctx context.Context, name strin hitsMutex.Unlock() cb.fetchedChunkBytes.Add(float64(len(blockData))) - cb.cache.Store(nctx, map[string][]byte{key: blockData}, cb.config.ChunkBlockTTL) + cb.cache.Store(gctx, map[string][]byte{key: blockData}, cb.config.ChunkBlockTTL) } - return nctx.Err() + return gctx.Err() }) } - return eg.Wait() + return g.Wait() } // Merges ranges that are close to each other. Modifies input. 
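The `mergeRanges` helper referenced by the comment above is not shown in this diff. A minimal sketch of such gap-based merging, assuming the `rng` type from the hunk above (an illustration only, not necessarily the exact Thanos implementation):

```go
type rng struct{ start, end int64 } // as declared in caching_bucket.go

// mergeRanges merges sorted, non-overlapping ranges whose gap is at most
// limit bytes, modifying the input in place and returning the shortened
// prefix. With limit == 0, only directly adjacent ranges are merged.
func mergeRanges(input []rng, limit int64) []rng {
	if len(input) == 0 {
		return input
	}

	last := 0
	for _, r := range input[1:] {
		if r.start-input[last].end <= limit {
			// Close enough to the previously kept range: extend it.
			input[last].end = r.end
		} else {
			// Gap too large: keep it as a separate range.
			last++
			input[last] = r
		}
	}
	return input[:last+1]
}
```

This behaviour is consistent with the cases exercised by `TestMergeRanges` later in the series, e.g. `{0,100},{100,200},{500,1000}` with `limit: 0` merging to `{0,200},{500,1000}`.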
diff --git a/pkg/store/cache/caching_bucket_factory.go b/pkg/store/cache/caching_bucket_factory.go index 449e0d6c0e..fbf4c1836b 100644 --- a/pkg/store/cache/caching_bucket_factory.go +++ b/pkg/store/cache/caching_bucket_factory.go @@ -46,11 +46,11 @@ func NewCachingBucketFromYaml(yamlContent []byte, bucket objstore.Bucket, logger switch config.Type { case MemcachedBucketCacheProvider: var memcached cacheutil.MemcachedClient - memcached, err := cacheutil.NewMemcachedClient(logger, "cache-bucket", backendConfig, reg) + memcached, err := cacheutil.NewMemcachedClient(logger, "caching-bucket", backendConfig, reg) if err != nil { return nil, errors.Wrapf(err, "failed to create memcached client") } - c = cache.NewMemcachedCache(logger, memcached, reg) + c = cache.NewMemcachedCache("caching-bucket", logger, memcached, reg) default: return nil, errors.Errorf("unsupported cache type: %s", config.Type) } diff --git a/pkg/store/cache/caching_bucket_test.go b/pkg/store/cache/caching_bucket_test.go index 008c2269f8..6b0cb64329 100644 --- a/pkg/store/cache/caching_bucket_test.go +++ b/pkg/store/cache/caching_bucket_test.go @@ -12,8 +12,7 @@ import ( "testing" "time" - "github.com/prometheus/client_golang/prometheus" - dto "github.com/prometheus/client_model/go" + promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/testutil" @@ -33,12 +32,9 @@ func TestCachingBucket(t *testing.T) { inmem := objstore.NewInMemBucket() testutil.Ok(t, inmem.Upload(context.Background(), name, bytes.NewReader(data))) + // We reuse cache between tests (!) cache := &mockCache{cache: make(map[string][]byte)} - cachingBucket, err := NewCachingBucket(inmem, cache, DefaultCachingBucketConfig(), nil, nil) - testutil.Ok(t, err) - cachingBucket.config.ChunkBlockSize = blockSize - // Warning, these tests must be run in order, they depend cache state from previous test. 
for _, tc := range []struct { name string @@ -216,14 +212,15 @@ func TestCachingBucket(t *testing.T) { if tc.init != nil { tc.init() } - cachedBytes, fetchedBytes := &counter{}, &counter{} - cachingBucket.cachedChunkBytes = cachedBytes - cachingBucket.fetchedChunkBytes = fetchedBytes + + cachingBucket, err := NewCachingBucket(inmem, cache, DefaultCachingBucketConfig(), nil, nil) + testutil.Ok(t, err) + cachingBucket.config.ChunkBlockSize = blockSize cachingBucket.config.MaxChunksGetRangeRequests = tc.maxGetRangeRequests verifyGetRange(t, cachingBucket, name, tc.offset, tc.length, tc.expectedLength) - testutil.Equals(t, tc.expectedCachedBytes, int64(cachedBytes.Get())) - testutil.Equals(t, tc.expectedFetchedBytes, int64(fetchedBytes.Get())) + testutil.Equals(t, tc.expectedCachedBytes, int64(promtest.ToFloat64(cachingBucket.cachedChunkBytes))) + testutil.Equals(t, tc.expectedFetchedBytes, int64(promtest.ToFloat64(cachingBucket.fetchedChunkBytes))) }) } } @@ -258,22 +255,20 @@ func (m *mockCache) Store(_ context.Context, data map[string][]byte, _ time.Dura } } -func (m *mockCache) Fetch(_ context.Context, keys []string) (found map[string][]byte, missing []string) { +func (m *mockCache) Fetch(_ context.Context, keys []string) map[string][]byte { m.mu.Lock() defer m.mu.Unlock() - found = make(map[string][]byte) + found := make(map[string][]byte, len(keys)) for _, k := range keys { v, ok := m.cache[k] if ok { found[k] = v - } else { - missing = append(missing, k) } } - return + return found } func TestMergeRanges(t *testing.T) { @@ -310,28 +305,3 @@ func TestMergeRanges(t *testing.T) { }) } } - -type counter struct { - mu sync.Mutex - count float64 -} - -func (c *counter) Desc() *prometheus.Desc { return nil } -func (c *counter) Write(_ *dto.Metric) error { return nil } -func (c *counter) Describe(_ chan<- *prometheus.Desc) {} -func (c *counter) Collect(_ chan<- prometheus.Metric) {} -func (c *counter) Inc() { - c.mu.Lock() - c.count++ - c.mu.Unlock() -} -func (c *counter) Add(f float64) { - c.mu.Lock() - c.count += f - c.mu.Unlock() -} -func (c *counter) Get() float64 { - c.mu.Lock() - defer c.mu.Unlock() - return c.count -} From e0ffb008cf7b5930fda8cd73005a96ba73e0d979 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 4 May 2020 16:08:20 +0200 Subject: [PATCH 09/34] Review feedback. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/cache/caching_bucket.go | 54 ++++++++++++------ pkg/store/cache/caching_bucket_test.go | 77 +++++++++++++++++++------- 2 files changed, 95 insertions(+), 36 deletions(-) diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index 4c983bf7a7..51108b2e16 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -51,9 +51,10 @@ type CachingBucket struct { config CachingBucketConfig - logger log.Logger - cachedChunkBytes prometheus.Counter - fetchedChunkBytes prometheus.Counter + logger log.Logger + cachedChunkBytes prometheus.Counter + fetchedChunkBytes prometheus.Counter + refetchedChunkBytes prometheus.Counter } func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConfig, logger log.Logger, reg prometheus.Registerer) (*CachingBucket, error) { @@ -78,6 +79,10 @@ func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConf Name: "thanos_store_bucket_cache_fetched_chunk_bytes_total", Help: "Total number of chunk bytes fetched from storage", }), + refetchedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_store_bucket_cache_refetched_chunk_bytes_total", + Help: "Total number of chunk bytes re-fetched from storage, despite being in cache already.", + }), }, nil } @@ -178,14 +183,21 @@ func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, off length = int64(size - uint64(offset)) } + // Start and end range are block-aligned offsets into object, that we're going to read. startRange := (offset / cb.config.ChunkBlockSize) * cb.config.ChunkBlockSize endRange := ((offset + length) / cb.config.ChunkBlockSize) * cb.config.ChunkBlockSize - lastBlockLength := 0 if (offset+length)%cb.config.ChunkBlockSize > 0 { - lastBlockLength = int((offset + length) - endRange) endRange += cb.config.ChunkBlockSize } + // The very last block in the object may have length that is not divisible by block size. + lastBlockOffset := int64(-1) + lastBlockLength := int(cb.config.ChunkBlockSize) + if uint64(endRange) > size { + lastBlockOffset = (int64(size) / cb.config.ChunkBlockSize) * cb.config.ChunkBlockSize + lastBlockLength = int(int64(size) - lastBlockOffset) + } + numBlocks := (endRange - startRange) / cb.config.ChunkBlockSize offsetKeys := make(map[int64]string, numBlocks) @@ -213,7 +225,7 @@ func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, off hits = map[string][]byte{} } - err := cb.fetchMissingChunkBlocks(ctx, name, startRange, endRange, offsetKeys, hits, lastBlockLength) + err := cb.fetchMissingChunkBlocks(ctx, name, startRange, endRange, offsetKeys, hits, lastBlockOffset, lastBlockLength) if err != nil { return nil, err } @@ -227,7 +239,7 @@ type rng struct { } // Fetches missing blocks and stores them into "hits" map. -func (cb *CachingBucket) fetchMissingChunkBlocks(ctx context.Context, name string, startRange, endRange int64, cacheKeys map[int64]string, hits map[string][]byte, lastBlockLength int) error { +func (cb *CachingBucket) fetchMissingChunkBlocks(ctx context.Context, name string, startRange, endRange int64, cacheKeys map[int64]string, hits map[string][]byte, lastBlockOffset int64, lastBlockLength int) error { // Ordered list of missing sub-ranges. 
var missing []rng @@ -263,23 +275,33 @@ func (cb *CachingBucket) fetchMissingChunkBlocks(ctx context.Context, name strin } // We need a new buffer for each block, both for storing into hits, and also for caching. - blockData := make([]byte, cb.config.ChunkBlockSize) - n, err := io.ReadFull(r, blockData) - if err == io.ErrUnexpectedEOF && (off+cb.config.ChunkBlockSize) == endRange && n == lastBlockLength { - // Last block can be shorter. - err = nil - blockData = blockData[:n] + var blockData []byte + if off == lastBlockOffset { + // The very last block in the object may have different length, + // if object length isn't divisible by block size. + blockData = make([]byte, lastBlockLength) + } else { + blockData = make([]byte, cb.config.ChunkBlockSize) } + _, err := io.ReadFull(r, blockData) if err != nil { return errors.Wrapf(err, "fetching range [%d, %d]", m.start, m.end) } + storeToCache := false hitsMutex.Lock() - hits[key] = blockData + if _, ok := hits[key]; !ok { + storeToCache = true + hits[key] = blockData + } hitsMutex.Unlock() - cb.fetchedChunkBytes.Add(float64(len(blockData))) - cb.cache.Store(gctx, map[string][]byte{key: blockData}, cb.config.ChunkBlockTTL) + if storeToCache { + cb.fetchedChunkBytes.Add(float64(len(blockData))) + cb.cache.Store(gctx, map[string][]byte{key: blockData}, cb.config.ChunkBlockTTL) + } else { + cb.refetchedChunkBytes.Add(float64(len(blockData))) + } } return gctx.Err() diff --git a/pkg/store/cache/caching_bucket_test.go b/pkg/store/cache/caching_bucket_test.go index 6b0cb64329..c7317e1091 100644 --- a/pkg/store/cache/caching_bucket_test.go +++ b/pkg/store/cache/caching_bucket_test.go @@ -7,11 +7,13 @@ import ( "bytes" "context" "fmt" + "io" "io/ioutil" "sync" "testing" "time" + "github.com/pkg/errors" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/thanos-io/thanos/pkg/objstore" @@ -37,14 +39,15 @@ func TestCachingBucket(t *testing.T) { // Warning, these tests must be run in order, they depend cache state from previous test. for _, tc := range []struct { - name string - init func() - offset int64 - length int64 - maxGetRangeRequests int - expectedLength int64 - expectedFetchedBytes int64 - expectedCachedBytes int64 + name string + init func() + offset int64 + length int64 + maxGetRangeRequests int + expectedLength int64 + expectedFetchedBytes int64 + expectedCachedBytes int64 + expectedRefetchedBytes int64 }{ { name: "basic test", @@ -171,13 +174,14 @@ func TestCachingBucket(t *testing.T) { }, { - name: "missing everything except middle blocks, one subrequest only", - offset: 0, - length: 10 * blockSize, - expectedLength: 10 * blockSize, - expectedFetchedBytes: 10 * blockSize, // We need to fetch beginning and end in single request. - expectedCachedBytes: 3 * blockSize, - maxGetRangeRequests: 1, + name: "missing everything except middle blocks, one subrequest only", + offset: 0, + length: 10 * blockSize, + expectedLength: 10 * blockSize, + expectedFetchedBytes: 7 * blockSize, + expectedCachedBytes: 3 * blockSize, + expectedRefetchedBytes: 3 * blockSize, // Entire object fetched, 3 blocks are "refetched". + maxGetRangeRequests: 1, init: func() { // Delete all but 3 blocks in the middle, but only allow 1 subrequest. 
for i := int64(0); i < 10; i++ { @@ -213,14 +217,17 @@ func TestCachingBucket(t *testing.T) { tc.init() } - cachingBucket, err := NewCachingBucket(inmem, cache, DefaultCachingBucketConfig(), nil, nil) + cfg := DefaultCachingBucketConfig() + cfg.ChunkBlockSize = blockSize + cfg.MaxChunksGetRangeRequests = tc.maxGetRangeRequests + + cachingBucket, err := NewCachingBucket(inmem, cache, cfg, nil, nil) testutil.Ok(t, err) - cachingBucket.config.ChunkBlockSize = blockSize - cachingBucket.config.MaxChunksGetRangeRequests = tc.maxGetRangeRequests verifyGetRange(t, cachingBucket, name, tc.offset, tc.length, tc.expectedLength) testutil.Equals(t, tc.expectedCachedBytes, int64(promtest.ToFloat64(cachingBucket.cachedChunkBytes))) testutil.Equals(t, tc.expectedFetchedBytes, int64(promtest.ToFloat64(cachingBucket.fetchedChunkBytes))) + testutil.Equals(t, tc.expectedRefetchedBytes, int64(promtest.ToFloat64(cachingBucket.refetchedChunkBytes))) }) } } @@ -284,9 +291,9 @@ func TestMergeRanges(t *testing.T) { }, { - input: []rng{{start: 0, end: 100}, {start: 100, end: 200}}, + input: []rng{{start: 0, end: 100}, {start: 100, end: 200}, {start: 500, end: 1000}}, limit: 0, - expected: []rng{{start: 0, end: 200}}, + expected: []rng{{start: 0, end: 200}, {start: 500, end: 1000}}, }, { @@ -305,3 +312,33 @@ func TestMergeRanges(t *testing.T) { }) } } + +func TestInvalidOffsetAndLength(t *testing.T) { + b := &testBucket{objstore.NewInMemBucket()} + c, err := NewCachingBucket(b, &mockCache{cache: make(map[string][]byte)}, DefaultCachingBucketConfig(), nil, nil) + testutil.Ok(t, err) + + r, err := c.GetRange(context.Background(), "test", -1, 1000) + testutil.Equals(t, nil, r) + testutil.NotOk(t, err) + + r, err = c.GetRange(context.Background(), "test", 100, -1) + testutil.Equals(t, nil, r) + testutil.NotOk(t, err) +} + +type testBucket struct { + *objstore.InMemBucket +} + +func (b *testBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + if off < 0 { + return nil, errors.Errorf("invalid offset: %d", off) + } + + if length <= 0 { + return nil, errors.Errorf("invalid length: %d", length) + } + + return b.InMemBucket.GetRange(ctx, name, off, length) +} From 000b5c3330f177e6957b7d5bc88ee8d354118636 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 4 May 2020 16:20:46 +0200 Subject: [PATCH 10/34] Added metrics for object size. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/cache/caching_bucket.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index 51108b2e16..90d888dd9b 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -55,6 +55,9 @@ type CachingBucket struct { cachedChunkBytes prometheus.Counter fetchedChunkBytes prometheus.Counter refetchedChunkBytes prometheus.Counter + + objectSizeRequests prometheus.Counter + objectSizeHits prometheus.Counter } func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConfig, logger log.Logger, reg prometheus.Registerer) (*CachingBucket, error) { @@ -83,6 +86,14 @@ func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConf Name: "thanos_store_bucket_cache_refetched_chunk_bytes_total", Help: "Total number of chunk bytes re-fetched from storage, despite being in cache already.", }), + objectSizeRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_store_bucket_cache_objectsize_requests_total", + Help: "Number of object size requests for objects", + }), + objectSizeHits: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_store_bucket_cache_objectsize_hits_total", + Help: "Number of object size hits for objects", + }), }, nil } @@ -155,8 +166,11 @@ func (cb *CachingBucket) ObjectSize(ctx context.Context, name string) (uint64, e func (cb *CachingBucket) cachedObjectSize(ctx context.Context, name string, ttl time.Duration) (uint64, error) { key := cachingKeyObjectSize(name) + cb.objectSizeRequests.Add(1) + hits := cb.cache.Fetch(ctx, []string{key}) if s := hits[key]; len(s) == 8 { + cb.objectSizeHits.Add(1) return binary.BigEndian.Uint64(s), nil } From 7eb18db4fb50e3aa5e425065de60cc205e1bca73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 4 May 2020 16:29:43 +0200 Subject: [PATCH 11/34] Added requested chunk bytes metric. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/cache/caching_bucket.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index 90d888dd9b..956f27418b 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -51,7 +51,9 @@ type CachingBucket struct { config CachingBucketConfig - logger log.Logger + logger log.Logger + + requestedChunkBytes prometheus.Counter cachedChunkBytes prometheus.Counter fetchedChunkBytes prometheus.Counter refetchedChunkBytes prometheus.Counter @@ -74,6 +76,10 @@ func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConf cache: c, logger: logger, + requestedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_store_bucket_cache_requested_chunk_bytes_total", + Help: "Total number of requested bytes for chunk data ", + }), cachedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_store_bucket_cache_cached_chunk_bytes_total", Help: "Total number of chunk bytes used from cache", @@ -187,6 +193,8 @@ func (cb *CachingBucket) cachedObjectSize(ctx context.Context, name string, ttl } func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, offset, length int64) (io.ReadCloser, error) { + cb.requestedChunkBytes.Add(float64(length)) + size, err := cb.cachedObjectSize(ctx, name, cb.config.ChunkObjectSizeTTL) if err != nil { return nil, errors.Wrapf(err, "failed to get size of chunk file: %s", name) From 2b42af443d49d742cb749053499dcad18ae2c9b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 4 May 2020 16:56:21 +0200 Subject: [PATCH 12/34] Caching bucket docs. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- cmd/thanos/store.go | 2 +- docs/components/store.md | 28 ++++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index fc294c888e..5ac4f8b640 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -56,7 +56,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { false) cachingBucketConfig := extflag.RegisterPathOrContent(cmd, "experimental.caching-bucket.config", - "YAML file that contains configuration for caching bucket. Experimental feature, with high risk of changes. See format details: https://thanos.io/components/store.md/#TBD", + "YAML file that contains configuration for caching bucket. Experimental feature, with high risk of changes. See format details: https://thanos.io/components/store.md/#caching-bucket", false) chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes reserved strictly to reuse for chunks in memory."). diff --git a/docs/components/store.md b/docs/components/store.md index 20a919daaa..8cb3bec53b 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -232,6 +232,34 @@ While the remaining settings are **optional**: - `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value of memcached `-I` flag (defaults to 1MB) in order to avoid wasting network round trips to store items larger than the max item size allowed in memcached. If set to `0`, the item size is unlimited. 
- `dns_provider_update_interval`: the DNS discovery update interval. +## Caching Bucket + +Thanos Store Gateway supports a "caching bucket" with chunks caching to speed up loading of chunks from TSDB blocks. Currently only memcached "backend" is supported: + +```yaml +backend: memcached +backend_config: + addresses: + - localhost:11211 + +caching_config: + chunk_block_size: 16000 + max_chunks_get_range_requests: 3 + chunk_object_size_ttl: 24h + chunk_block_ttl: 24h +``` + +`backend_config` field for memcached supports all the same configuration as memcached for [index cache](#memcached-index-cache). + +`caching_config` is a configuration for chunks cache. + +- `chunk_block_size`: size of segment of chunks object that is stored to the cache. This is the smallest unit that chunks cache is working with. +- `max_chunks_get_range_requests`: how many "get range" sub-requests may cache perform to fetch missing blocks. +- `chunk_object_size_ttl`: how long to keep information about chunk file length in the cache. +- `chunk_block_ttl`: how long to keep individual blocks in the cache. + +Note that chunks cache is an experimental feature, and these fields may be renamed or removed completely in the future. + ## Index Header In order to query series inside blocks from object storage, Store Gateway has to know certain initial info about each block such as: From 1baa24cb85f793f9f39d6535f14c5092811f5b12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 4 May 2020 17:03:24 +0200 Subject: [PATCH 13/34] Fixed tests. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/cache/memcached_test.go | 32 +++++++++++++------------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/pkg/cache/memcached_test.go b/pkg/cache/memcached_test.go index 27dbc6c240..7806594241 100644 --- a/pkg/cache/memcached_test.go +++ b/pkg/cache/memcached_test.go @@ -27,17 +27,15 @@ func TestMemcachedIndexCache(t *testing.T) { value3 := []byte{3} tests := map[string]struct { - setup map[string][]byte - mockedErr error - fetchKeys []string - expectedHits map[string][]byte - expectedMisses []string + setup map[string][]byte + mockedErr error + fetchKeys []string + expectedHits map[string][]byte }{ "should return no hits on empty cache": { - setup: nil, - fetchKeys: []string{key1, key2}, - expectedHits: nil, - expectedMisses: []string{key1, key2}, + setup: nil, + fetchKeys: []string{key1, key2}, + expectedHits: nil, }, "should return no misses on 100% hit ratio": { setup: map[string][]byte{ @@ -49,16 +47,14 @@ func TestMemcachedIndexCache(t *testing.T) { expectedHits: map[string][]byte{ key1: value1, }, - expectedMisses: nil, }, "should return hits and misses on partial hits": { setup: map[string][]byte{ key1: value1, key2: value2, }, - fetchKeys: []string{key1, key3}, - expectedHits: map[string][]byte{key1: value1}, - expectedMisses: []string{key3}, + fetchKeys: []string{key1, key3}, + expectedHits: map[string][]byte{key1: value1}, }, "should return no hits on memcached error": { setup: map[string][]byte{ @@ -66,10 +62,9 @@ func TestMemcachedIndexCache(t *testing.T) { key2: value2, key3: value3, }, - mockedErr: errors.New("mocked error"), - fetchKeys: []string{key1}, - expectedHits: nil, - expectedMisses: []string{key1}, + mockedErr: errors.New("mocked error"), + fetchKeys: []string{key1}, + expectedHits: nil, }, } @@ -83,9 +78,8 @@ func TestMemcachedIndexCache(t *testing.T) { c.Store(ctx, testData.setup, time.Hour) 
// Fetch postings from cached and assert on it. - hits, misses := c.Fetch(ctx, testData.fetchKeys) + hits := c.Fetch(ctx, testData.fetchKeys) testutil.Equals(t, testData.expectedHits, hits) - testutil.Equals(t, testData.expectedMisses, misses) // Assert on metrics. testutil.Equals(t, float64(len(testData.fetchKeys)), prom_testutil.ToFloat64(c.requests)) From 801bc6dd7f182ca6d07708baef310e504e332ed0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 4 May 2020 17:51:09 +0200 Subject: [PATCH 14/34] Fix test. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/cache/memcached_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cache/memcached_test.go b/pkg/cache/memcached_test.go index 7806594241..b5f6bcf83c 100644 --- a/pkg/cache/memcached_test.go +++ b/pkg/cache/memcached_test.go @@ -35,7 +35,7 @@ func TestMemcachedIndexCache(t *testing.T) { "should return no hits on empty cache": { setup: nil, fetchKeys: []string{key1, key2}, - expectedHits: nil, + expectedHits: map[string][]byte{}, }, "should return no misses on 100% hit ratio": { setup: map[string][]byte{ From 2997437126497c395a868ad6076de8224bcd063a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Tue, 5 May 2020 08:43:56 +0200 Subject: [PATCH 15/34] Update docs/components/store.md Update pkg/store/cache/caching_bucket.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Marco Pracucci Signed-off-by: Peter Štibraný --- docs/components/store.md | 2 +- pkg/store/cache/caching_bucket.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/components/store.md b/docs/components/store.md index 8cb3bec53b..db19ff92fa 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -251,7 +251,7 @@ caching_config: `backend_config` field for memcached supports all the same configuration as memcached for [index cache](#memcached-index-cache). -`caching_config` is a configuration for chunks cache. +`caching_config` is a configuration for chunks cache and supports the following optional settings: - `chunk_block_size`: size of segment of chunks object that is stored to the cache. This is the smallest unit that chunks cache is working with. - `max_chunks_get_range_requests`: how many "get range" sub-requests may cache perform to fetch missing blocks. 
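To make the `chunk_block_size` setting described above concrete: a `GetRange` request is mapped onto block-aligned cache keys using the `block:<object>:<start>:<end>` format from `cachingKeyObjectBlock` (renamed to "subrange" later in this series). A rough, self-contained illustration with the 16000-byte default and a hypothetical object name; the real code additionally clamps the last block's end to the object size:

```go
package main

import "fmt"

func main() {
	blockSize := int64(16000)
	name := "01ABC/chunks/000001" // hypothetical object name
	offset, length := int64(40000), int64(10000)

	// Align the requested range down/up to block boundaries.
	start := (offset / blockSize) * blockSize // 32000
	end := ((offset + length) / blockSize) * blockSize
	if (offset+length)%blockSize > 0 {
		end += blockSize // 64000
	}

	// One cache key per block covering the request.
	for off := start; off < end; off += blockSize {
		fmt.Printf("block:%s:%d:%d\n", name, off, off+blockSize)
	}
	// Output:
	// block:01ABC/chunks/000001:32000:48000
	// block:01ABC/chunks/000001:48000:64000
}
```

Each such key is the unit fetched from and stored to memcached, which is why `chunk_block_size` is described as the smallest unit the chunks cache works with.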
diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index 956f27418b..78a59d81be 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -78,7 +78,7 @@ func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConf requestedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_store_bucket_cache_requested_chunk_bytes_total", - Help: "Total number of requested bytes for chunk data ", + Help: "Total number of requested bytes for chunk data", }), cachedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_store_bucket_cache_cached_chunk_bytes_total", @@ -86,7 +86,7 @@ func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConf }), fetchedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_store_bucket_cache_fetched_chunk_bytes_total", - Help: "Total number of chunk bytes fetched from storage", + Help: "Total number of chunk bytes fetched from storage because missing from cache", }), refetchedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_store_bucket_cache_refetched_chunk_bytes_total", From 0c75120426c53ab820e411a0cd033a4c7e92f492 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 5 May 2020 08:46:49 +0200 Subject: [PATCH 16/34] Dots MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/cache/caching_bucket.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index 78a59d81be..8b525a30f6 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -78,15 +78,15 @@ func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConf requestedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_store_bucket_cache_requested_chunk_bytes_total", - Help: "Total number of requested bytes for chunk data", + Help: "Total number of requested bytes for chunk data.", }), cachedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_store_bucket_cache_cached_chunk_bytes_total", - Help: "Total number of chunk bytes used from cache", + Help: "Total number of chunk bytes used from cache.", }), fetchedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_store_bucket_cache_fetched_chunk_bytes_total", - Help: "Total number of chunk bytes fetched from storage because missing from cache", + Help: "Total number of chunk bytes fetched from storage because missing from cache.", }), refetchedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_store_bucket_cache_refetched_chunk_bytes_total", @@ -94,11 +94,11 @@ func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConf }), objectSizeRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_store_bucket_cache_objectsize_requests_total", - Help: "Number of object size requests for objects", + Help: "Number of object size requests for objects.", }), objectSizeHits: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_store_bucket_cache_objectsize_hits_total", - Help: "Number of object size hits for objects", + Help: "Number of object size hits for objects.", }), }, nil } From 9821c2998d3302b1606d8889aaa2754aff5375b6 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 5 May 2020 08:47:45 +0200 Subject: [PATCH 17/34] Always set lastBlockOffset. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/cache/caching_bucket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index 8b525a30f6..437b37b816 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -213,7 +213,7 @@ func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, off } // The very last block in the object may have length that is not divisible by block size. - lastBlockOffset := int64(-1) + lastBlockOffset := endRange - cb.config.ChunkBlockSize lastBlockLength := int(cb.config.ChunkBlockSize) if uint64(endRange) > size { lastBlockOffset = (int64(size) / cb.config.ChunkBlockSize) * cb.config.ChunkBlockSize From c35f077efa122fe4d6150569b69bda795d6b5ced Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 5 May 2020 10:39:10 +0200 Subject: [PATCH 18/34] Merged cached metric into fetched metric, added labels. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/cache/caching_bucket.go | 38 +++++++++++++++----------- pkg/store/cache/caching_bucket_test.go | 6 ++-- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index 437b37b816..11222f4f84 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -23,6 +23,11 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" ) +const ( + originCache = "cache" + originBucket = "bucket" +) + type CachingBucketConfig struct { // Basic unit used to cache chunks. 
ChunkBlockSize int64 `yaml:"chunk_block_size"` @@ -54,9 +59,8 @@ type CachingBucket struct { logger log.Logger requestedChunkBytes prometheus.Counter - cachedChunkBytes prometheus.Counter - fetchedChunkBytes prometheus.Counter - refetchedChunkBytes prometheus.Counter + fetchedChunkBytes *prometheus.CounterVec + refetchedChunkBytes *prometheus.CounterVec objectSizeRequests prometheus.Counter objectSizeHits prometheus.Counter @@ -70,7 +74,7 @@ func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConf return nil, errors.New("cache is nil") } - return &CachingBucket{ + cb := &CachingBucket{ config: chunks, bucket: b, cache: c, @@ -80,18 +84,14 @@ func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConf Name: "thanos_store_bucket_cache_requested_chunk_bytes_total", Help: "Total number of requested bytes for chunk data.", }), - cachedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_store_bucket_cache_cached_chunk_bytes_total", - Help: "Total number of chunk bytes used from cache.", - }), - fetchedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + fetchedChunkBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_bucket_cache_fetched_chunk_bytes_total", Help: "Total number of chunk bytes fetched from storage because missing from cache.", - }), - refetchedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + }, []string{"origin"}), + refetchedChunkBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_bucket_cache_refetched_chunk_bytes_total", Help: "Total number of chunk bytes re-fetched from storage, despite being in cache already.", - }), + }, []string{"origin"}), objectSizeRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_store_bucket_cache_objectsize_requests_total", Help: "Number of object size requests for objects.", @@ -100,7 +100,13 @@ func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConf Name: "thanos_store_bucket_cache_objectsize_hits_total", Help: "Number of object size hits for objects.", }), - }, nil + } + + cb.fetchedChunkBytes.WithLabelValues(originBucket) + cb.fetchedChunkBytes.WithLabelValues(originCache) + cb.refetchedChunkBytes.WithLabelValues(originCache) + + return cb, nil } func (cb *CachingBucket) Close() error { @@ -239,7 +245,7 @@ func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, off // Try to get all blocks from the cache. 
hits := cb.cache.Fetch(ctx, keys) for _, b := range hits { - cb.cachedChunkBytes.Add(float64(len(b))) + cb.fetchedChunkBytes.WithLabelValues(originCache).Add(float64(len(b))) } if len(hits) < len(keys) { @@ -319,10 +325,10 @@ func (cb *CachingBucket) fetchMissingChunkBlocks(ctx context.Context, name strin hitsMutex.Unlock() if storeToCache { - cb.fetchedChunkBytes.Add(float64(len(blockData))) + cb.fetchedChunkBytes.WithLabelValues(originBucket).Add(float64(len(blockData))) cb.cache.Store(gctx, map[string][]byte{key: blockData}, cb.config.ChunkBlockTTL) } else { - cb.refetchedChunkBytes.Add(float64(len(blockData))) + cb.refetchedChunkBytes.WithLabelValues(originCache).Add(float64(len(blockData))) } } diff --git a/pkg/store/cache/caching_bucket_test.go b/pkg/store/cache/caching_bucket_test.go index c7317e1091..213b05d496 100644 --- a/pkg/store/cache/caching_bucket_test.go +++ b/pkg/store/cache/caching_bucket_test.go @@ -225,9 +225,9 @@ func TestCachingBucket(t *testing.T) { testutil.Ok(t, err) verifyGetRange(t, cachingBucket, name, tc.offset, tc.length, tc.expectedLength) - testutil.Equals(t, tc.expectedCachedBytes, int64(promtest.ToFloat64(cachingBucket.cachedChunkBytes))) - testutil.Equals(t, tc.expectedFetchedBytes, int64(promtest.ToFloat64(cachingBucket.fetchedChunkBytes))) - testutil.Equals(t, tc.expectedRefetchedBytes, int64(promtest.ToFloat64(cachingBucket.refetchedChunkBytes))) + testutil.Equals(t, tc.expectedCachedBytes, int64(promtest.ToFloat64(cachingBucket.fetchedChunkBytes.WithLabelValues(originCache)))) + testutil.Equals(t, tc.expectedFetchedBytes, int64(promtest.ToFloat64(cachingBucket.fetchedChunkBytes.WithLabelValues(originBucket)))) + testutil.Equals(t, tc.expectedRefetchedBytes, int64(promtest.ToFloat64(cachingBucket.refetchedChunkBytes.WithLabelValues(originCache)))) }) } } From c9b51b13731b7b47b7c69d20f37ad9b45375e7c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 5 May 2020 11:07:42 +0200 Subject: [PATCH 19/34] Added CHANGELOG.md entry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 017857ea55..b4c73c4650 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2502](https://github.com/thanos-io/thanos/pull/2502) Added `hints` field to `SeriesResponse`. Hints in an opaque data structure that can be used to carry additional information from the store and its content is implementation specific. - [#2521](https://github.com/thanos-io/thanos/pull/2521) Sidecar: add `thanos_sidecar_reloader_reloads_failed_total`, `thanos_sidecar_reloader_reloads_total`, `thanos_sidecar_reloader_watch_errors_total`, `thanos_sidecar_reloader_watch_events_total` and `thanos_sidecar_reloader_watches` metrics. +- [#2532](https://github.com/thanos-io/thanos/pull/2532) Store: Added caching bucket, that can cache chunks into shared memcached. This can speed up querying and reduce number of requests to object storage. 
### Changed From 65af4fba1fd92311ff304e1dcef3301e46356472 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 5 May 2020 11:18:10 +0200 Subject: [PATCH 20/34] Reworded help for thanos_store_bucket_cache_fetched_chunk_bytes_total MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/cache/caching_bucket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index 11222f4f84..f0197b5bf4 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -86,7 +86,7 @@ func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConf }), fetchedChunkBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_bucket_cache_fetched_chunk_bytes_total", - Help: "Total number of chunk bytes fetched from storage because missing from cache.", + Help: "Total number of fetched chunk bytes. Data from bucket is then stored to cache.", }, []string{"origin"}), refetchedChunkBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_bucket_cache_refetched_chunk_bytes_total", From 05f2b4ab1752dea9785d43675f74e9f017d68a8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 5 May 2020 11:43:12 +0200 Subject: [PATCH 21/34] Added tracing around getRangeChunkFile method. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/cache/caching_bucket.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index f0197b5bf4..653dced3c8 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/thanos/pkg/cache" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/tracing" ) const ( @@ -157,7 +158,14 @@ func isTSDBChunkFile(name string) bool { func (cb *CachingBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { if isTSDBChunkFile(name) && off >= 0 && length > 0 { - return cb.getRangeChunkFile(ctx, name, off, length) + var ( + r io.ReadCloser + err error + ) + tracing.DoInSpan(ctx, "cachingbucket_getrange_chunkfile", func(ctx context.Context) { + r, err = cb.getRangeChunkFile(ctx, name, off, length) + }) + return r, err } return cb.bucket.GetRange(ctx, name, off, length) From 015b025eb5224f006f9ec1f37ec727d106822f4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 5 May 2020 13:42:41 +0200 Subject: [PATCH 22/34] Updated CHANGELOG.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b4c73c4650..a020842006 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2502](https://github.com/thanos-io/thanos/pull/2502) Added `hints` field to `SeriesResponse`. Hints in an opaque data structure that can be used to carry additional information from the store and its content is implementation specific. 
- [#2521](https://github.com/thanos-io/thanos/pull/2521) Sidecar: add `thanos_sidecar_reloader_reloads_failed_total`, `thanos_sidecar_reloader_reloads_total`, `thanos_sidecar_reloader_watch_errors_total`, `thanos_sidecar_reloader_watch_events_total` and `thanos_sidecar_reloader_watches` metrics. -- [#2532](https://github.com/thanos-io/thanos/pull/2532) Store: Added caching bucket, that can cache chunks into shared memcached. This can speed up querying and reduce number of requests to object storage. +- [#2532](https://github.com/thanos-io/thanos/pull/2532) Store: Added hidden option for experimental caching bucket, that can cache chunks into shared memcached. This can speed up querying and reduce number of requests to object storage. ### Changed From 3386747a1301940143831732b6e8aa4af77e8dc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 5 May 2020 13:47:40 +0200 Subject: [PATCH 23/34] Options MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- cmd/thanos/store.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 5ac4f8b640..36f4d81d0d 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -55,8 +55,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { "YAML file that contains index cache configuration. See format details: https://thanos.io/components/store.md/#index-cache", false) - cachingBucketConfig := extflag.RegisterPathOrContent(cmd, "experimental.caching-bucket.config", - "YAML file that contains configuration for caching bucket. Experimental feature, with high risk of changes. See format details: https://thanos.io/components/store.md/#caching-bucket", + cachingBucketConfig := extflag.RegisterPathOrContent(cmd, "caching-bucket.config", + "YAML that contains configuration for caching bucket. Experimental feature, with high risk of changes. See format details: https://thanos.io/components/store.md/#caching-bucket", false) chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes reserved strictly to reuse for chunks in memory."). From 72698999f985f3f2ad8d7dbf0251883f19b16c4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 6 May 2020 10:20:04 +0200 Subject: [PATCH 24/34] Fix parameter name. (store. got dropped by accident) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- cmd/thanos/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 36f4d81d0d..dbe3737b66 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -55,7 +55,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { "YAML file that contains index cache configuration. See format details: https://thanos.io/components/store.md/#index-cache", false) - cachingBucketConfig := extflag.RegisterPathOrContent(cmd, "caching-bucket.config", + cachingBucketConfig := extflag.RegisterPathOrContent(cmd, "store.caching-bucket.config", "YAML that contains configuration for caching bucket. Experimental feature, with high risk of changes. 
See format details: https://thanos.io/components/store.md/#caching-bucket", false) From 13ac20054b4b6b4998468dc091b1fba990776065 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 6 May 2020 10:26:05 +0200 Subject: [PATCH 25/34] Use embedded Bucket MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/cache/caching_bucket.go | 51 ++++++------------------------- 1 file changed, 10 insertions(+), 41 deletions(-) diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index 653dced3c8..ec035a9757 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -52,8 +52,9 @@ func DefaultCachingBucketConfig() CachingBucketConfig { // Bucket implementation that provides some caching features, using knowledge about how Thanos accesses data. type CachingBucket struct { - bucket objstore.Bucket - cache cache.Cache + objstore.Bucket + + cache cache.Cache config CachingBucketConfig @@ -76,8 +77,8 @@ func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConf } cb := &CachingBucket{ + Bucket: b, config: chunks, - bucket: b, cache: c, logger: logger, @@ -110,28 +111,16 @@ func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConf return cb, nil } -func (cb *CachingBucket) Close() error { - return cb.bucket.Close() -} - -func (cb *CachingBucket) Upload(ctx context.Context, name string, r io.Reader) error { - return cb.bucket.Upload(ctx, name, r) -} - -func (cb *CachingBucket) Delete(ctx context.Context, name string) error { - return cb.bucket.Delete(ctx, name) -} - func (cb *CachingBucket) Name() string { - return "caching: " + cb.bucket.Name() + return "caching: " + cb.Bucket.Name() } func (cb *CachingBucket) WithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.Bucket { - if ib, ok := cb.bucket.(objstore.InstrumentedBucket); ok { + if ib, ok := cb.Bucket.(objstore.InstrumentedBucket); ok { // Make a copy, but replace bucket with instrumented one. 
res := &CachingBucket{} *res = *cb - res.bucket = ib.WithExpectedErrs(expectedFunc) + res.Bucket = ib.WithExpectedErrs(expectedFunc) return res } @@ -142,14 +131,6 @@ func (cb *CachingBucket) ReaderWithExpectedErrs(expectedFunc objstore.IsOpFailur return cb.WithExpectedErrs(expectedFunc) } -func (cb *CachingBucket) Iter(ctx context.Context, dir string, f func(string) error) error { - return cb.bucket.Iter(ctx, dir, f) -} - -func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { - return cb.bucket.Get(ctx, name) -} - var chunksMatcher = regexp.MustCompile(`^.*/chunks/\d+$`) func isTSDBChunkFile(name string) bool { @@ -168,19 +149,7 @@ func (cb *CachingBucket) GetRange(ctx context.Context, name string, off, length return r, err } - return cb.bucket.GetRange(ctx, name, off, length) -} - -func (cb *CachingBucket) Exists(ctx context.Context, name string) (bool, error) { - return cb.bucket.Exists(ctx, name) -} - -func (cb *CachingBucket) IsObjNotFoundErr(err error) bool { - return cb.bucket.IsObjNotFoundErr(err) -} - -func (cb *CachingBucket) ObjectSize(ctx context.Context, name string) (uint64, error) { - return cb.bucket.ObjectSize(ctx, name) + return cb.Bucket.GetRange(ctx, name, off, length) } func (cb *CachingBucket) cachedObjectSize(ctx context.Context, name string, ttl time.Duration) (uint64, error) { @@ -194,7 +163,7 @@ func (cb *CachingBucket) cachedObjectSize(ctx context.Context, name string, ttl return binary.BigEndian.Uint64(s), nil } - size, err := cb.bucket.ObjectSize(ctx, name) + size, err := cb.Bucket.ObjectSize(ctx, name) if err != nil { return 0, err } @@ -298,7 +267,7 @@ func (cb *CachingBucket) fetchMissingChunkBlocks(ctx context.Context, name strin for _, m := range missing { m := m g.Go(func() error { - r, err := cb.bucket.GetRange(gctx, name, m.start, m.end-m.start) + r, err := cb.Bucket.GetRange(gctx, name, m.start, m.end-m.start) if err != nil { return errors.Wrapf(err, "fetching range [%d, %d]", m.start, m.end) } From 3e96b7ca8dc9d0ba0a3c65756c9372c19d6958ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 6 May 2020 11:01:45 +0200 Subject: [PATCH 26/34] Added comments. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/cache/caching_bucket_factory.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/store/cache/caching_bucket_factory.go b/pkg/store/cache/caching_bucket_factory.go index fbf4c1836b..0bc0a78458 100644 --- a/pkg/store/cache/caching_bucket_factory.go +++ b/pkg/store/cache/caching_bucket_factory.go @@ -15,10 +15,14 @@ import ( "github.com/thanos-io/thanos/pkg/objstore" ) +// BucketCacheProvider is a type used to evaluate all bucket cache providers. type BucketCacheProvider string -const MemcachedBucketCacheProvider BucketCacheProvider = "memcached" +const ( + MemcachedBucketCacheProvider BucketCacheProvider = "memcached" // Memcached cache-provider for caching bucket. +) +// CachingBucketWithBackendConfig is a configuration of caching bucket used by Store component. type CachingBucketWithBackendConfig struct { Type BucketCacheProvider `yaml:"backend"` BackendConfig interface{} `yaml:"backend_config"` @@ -26,6 +30,7 @@ type CachingBucketWithBackendConfig struct { CachingBucketConfig CachingBucketConfig `yaml:"caching_config"` } +// NewCachingBucketFromYaml uses YAML configuration to create new caching bucket. 
func NewCachingBucketFromYaml(yamlContent []byte, bucket objstore.Bucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { level.Info(logger).Log("msg", "loading caching bucket configuration") From 2f1b68d71f859a91469c021fa1f224bb96a7f2d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 6 May 2020 11:03:42 +0200 Subject: [PATCH 27/34] Fixed comment. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/cache/caching_bucket.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index ec035a9757..ea4a15e64d 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -243,7 +243,8 @@ type rng struct { start, end int64 } -// Fetches missing blocks and stores them into "hits" map. +// fetchMissingChunkBlocks fetches missing blocks, stores them into "hits" map +// and into cache as well (using provided cacheKeys). func (cb *CachingBucket) fetchMissingChunkBlocks(ctx context.Context, name string, startRange, endRange int64, cacheKeys map[int64]string, hits map[string][]byte, lastBlockOffset int64, lastBlockLength int) error { // Ordered list of missing sub-ranges. var missing []rng From c26bd564a3a735f2006a6bf44b14bd8121b38293 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 7 May 2020 11:05:24 +0200 Subject: [PATCH 28/34] Hide store.caching-bucket.config flags. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- cmd/thanos/store.go | 2 +- pkg/extflag/hidden.go | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) create mode 100644 pkg/extflag/hidden.go diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index dbe3737b66..9425e36dda 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -55,7 +55,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { "YAML file that contains index cache configuration. See format details: https://thanos.io/components/store.md/#index-cache", false) - cachingBucketConfig := extflag.RegisterPathOrContent(cmd, "store.caching-bucket.config", + cachingBucketConfig := extflag.RegisterPathOrContent(extflag.HiddenCmdClause(cmd), "store.caching-bucket.config", "YAML that contains configuration for caching bucket. Experimental feature, with high risk of changes. See format details: https://thanos.io/components/store.md/#caching-bucket", false) diff --git a/pkg/extflag/hidden.go b/pkg/extflag/hidden.go new file mode 100644 index 0000000000..0493e9c6fc --- /dev/null +++ b/pkg/extflag/hidden.go @@ -0,0 +1,18 @@ +package extflag + +import ( + "gopkg.in/alecthomas/kingpin.v2" +) + +// HiddenCmdClause returns CmdClause that hides created flags. +func HiddenCmdClause(c CmdClause) CmdClause { + return hidden{c: c} +} + +type hidden struct { + c CmdClause +} + +func (h hidden) Flag(name, help string) *kingpin.FlagClause { + return h.c.Flag(name, help).Hidden() +} From db757257b5c2975bd7f0422b0b752af9ba41aad2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 7 May 2020 11:08:58 +0200 Subject: [PATCH 29/34] Renamed block to subrange. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/cache/caching_bucket.go | 132 ++++++++++++------------- pkg/store/cache/caching_bucket_test.go | 118 +++++++++++----------- 2 files changed, 125 insertions(+), 125 deletions(-) diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index ea4a15e64d..43b54186bc 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -31,21 +31,21 @@ const ( type CachingBucketConfig struct { // Basic unit used to cache chunks. - ChunkBlockSize int64 `yaml:"chunk_block_size"` + ChunkSubrangeSize int64 `yaml:"chunk_subrange_size"` // Maximum number of GetRange requests issued by this bucket for single GetRange call. Zero or negative value = unlimited. MaxChunksGetRangeRequests int `yaml:"max_chunks_get_range_requests"` // TTLs for various cache items. ChunkObjectSizeTTL time.Duration `yaml:"chunk_object_size_ttl"` - ChunkBlockTTL time.Duration `yaml:"chunk_block_ttl"` + ChunkSubrangeTTL time.Duration `yaml:"chunk_subrange_ttl"` } func DefaultCachingBucketConfig() CachingBucketConfig { return CachingBucketConfig{ - ChunkBlockSize: 16000, // Equal to max chunk size. + ChunkSubrangeSize: 16000, // Equal to max chunk size. ChunkObjectSizeTTL: 24 * time.Hour, - ChunkBlockTTL: 24 * time.Hour, + ChunkSubrangeTTL: 24 * time.Hour, MaxChunksGetRangeRequests: 3, } } @@ -188,38 +188,38 @@ func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, off length = int64(size - uint64(offset)) } - // Start and end range are block-aligned offsets into object, that we're going to read. - startRange := (offset / cb.config.ChunkBlockSize) * cb.config.ChunkBlockSize - endRange := ((offset + length) / cb.config.ChunkBlockSize) * cb.config.ChunkBlockSize - if (offset+length)%cb.config.ChunkBlockSize > 0 { - endRange += cb.config.ChunkBlockSize + // Start and end range are subrange-aligned offsets into object, that we're going to read. + startRange := (offset / cb.config.ChunkSubrangeSize) * cb.config.ChunkSubrangeSize + endRange := ((offset + length) / cb.config.ChunkSubrangeSize) * cb.config.ChunkSubrangeSize + if (offset+length)%cb.config.ChunkSubrangeSize > 0 { + endRange += cb.config.ChunkSubrangeSize } - // The very last block in the object may have length that is not divisible by block size. - lastBlockOffset := endRange - cb.config.ChunkBlockSize - lastBlockLength := int(cb.config.ChunkBlockSize) + // The very last subrange in the object may have length that is not divisible by subrange size. 
+ lastSubrangeOffset := endRange - cb.config.ChunkSubrangeSize + lastSubrangeLength := int(cb.config.ChunkSubrangeSize) if uint64(endRange) > size { - lastBlockOffset = (int64(size) / cb.config.ChunkBlockSize) * cb.config.ChunkBlockSize - lastBlockLength = int(int64(size) - lastBlockOffset) + lastSubrangeOffset = (int64(size) / cb.config.ChunkSubrangeSize) * cb.config.ChunkSubrangeSize + lastSubrangeLength = int(int64(size) - lastSubrangeOffset) } - numBlocks := (endRange - startRange) / cb.config.ChunkBlockSize + numSubranges := (endRange - startRange) / cb.config.ChunkSubrangeSize - offsetKeys := make(map[int64]string, numBlocks) - keys := make([]string, 0, numBlocks) + offsetKeys := make(map[int64]string, numSubranges) + keys := make([]string, 0, numSubranges) - for off := startRange; off < endRange; off += cb.config.ChunkBlockSize { - end := off + cb.config.ChunkBlockSize + for off := startRange; off < endRange; off += cb.config.ChunkSubrangeSize { + end := off + cb.config.ChunkSubrangeSize if end > int64(size) { end = int64(size) } - k := cachingKeyObjectBlock(name, off, end) + k := cachingKeyObjectSubrange(name, off, end) keys = append(keys, k) offsetKeys[off] = k } - // Try to get all blocks from the cache. + // Try to get all subranges from the cache. hits := cb.cache.Fetch(ctx, keys) for _, b := range hits { cb.fetchedChunkBytes.WithLabelValues(originCache).Add(float64(len(b))) @@ -230,34 +230,34 @@ func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, off hits = map[string][]byte{} } - err := cb.fetchMissingChunkBlocks(ctx, name, startRange, endRange, offsetKeys, hits, lastBlockOffset, lastBlockLength) + err := cb.fetchMissingChunkSubranges(ctx, name, startRange, endRange, offsetKeys, hits, lastSubrangeOffset, lastSubrangeLength) if err != nil { return nil, err } } - return newBlocksReader(cb.config.ChunkBlockSize, offsetKeys, hits, offset, length), nil + return newSubrangesReader(cb.config.ChunkSubrangeSize, offsetKeys, hits, offset, length), nil } type rng struct { start, end int64 } -// fetchMissingChunkBlocks fetches missing blocks, stores them into "hits" map +// fetchMissingChunkSubranges fetches missing subranges, stores them into "hits" map // and into cache as well (using provided cacheKeys). -func (cb *CachingBucket) fetchMissingChunkBlocks(ctx context.Context, name string, startRange, endRange int64, cacheKeys map[int64]string, hits map[string][]byte, lastBlockOffset int64, lastBlockLength int) error { +func (cb *CachingBucket) fetchMissingChunkSubranges(ctx context.Context, name string, startRange, endRange int64, cacheKeys map[int64]string, hits map[string][]byte, lastSubrangeOffset int64, lastSubrangeLength int) error { // Ordered list of missing sub-ranges. var missing []rng - for off := startRange; off < endRange; off += cb.config.ChunkBlockSize { + for off := startRange; off < endRange; off += cb.config.ChunkSubrangeSize { if hits[cacheKeys[off]] == nil { - missing = append(missing, rng{start: off, end: off + cb.config.ChunkBlockSize}) + missing = append(missing, rng{start: off, end: off + cb.config.ChunkSubrangeSize}) } } missing = mergeRanges(missing, 0) // Merge adjacent ranges. // Keep merging until we have only max number of ranges (= requests). 
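// Illustrative aside, not part of the patch: mergeRanges itself is defined elsewhere
// in caching_bucket.go and is not shown in this hunk. A sketch consistent with how it
// is used here (join consecutive missing ranges whose gap is at most limit); this is
// an assumption about its behaviour, not the actual implementation:
func mergeRangesSketch(in []rng, limit int64) []rng {
	if len(in) == 0 {
		return in
	}
	out := []rng{in[0]}
	for _, r := range in[1:] {
		last := &out[len(out)-1]
		if r.start-last.end <= limit {
			// Close enough: extend the previous range instead of issuing another request.
			last.end = r.end
		} else {
			out = append(out, r)
		}
	}
	return out
}
// With the limit doubling on every pass of the loop below, distant gaps eventually fall
// under it; that is how the "one subrequest only" test case ends up fetching the entire
// object and re-fetching three already-cached subranges.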
- for limit := cb.config.ChunkBlockSize; cb.config.MaxChunksGetRangeRequests > 0 && len(missing) > cb.config.MaxChunksGetRangeRequests; limit = limit * 2 { + for limit := cb.config.ChunkSubrangeSize; cb.config.MaxChunksGetRangeRequests > 0 && len(missing) > cb.config.MaxChunksGetRangeRequests; limit = limit * 2 { missing = mergeRanges(missing, limit) } @@ -274,22 +274,22 @@ func (cb *CachingBucket) fetchMissingChunkBlocks(ctx context.Context, name strin } defer runutil.CloseWithLogOnErr(cb.logger, r, "fetching range [%d, %d]", m.start, m.end) - for off := m.start; off < m.end && gctx.Err() == nil; off += cb.config.ChunkBlockSize { + for off := m.start; off < m.end && gctx.Err() == nil; off += cb.config.ChunkSubrangeSize { key := cacheKeys[off] if key == "" { return errors.Errorf("fetching range [%d, %d]: caching key for offset %d not found", m.start, m.end, off) } - // We need a new buffer for each block, both for storing into hits, and also for caching. - var blockData []byte - if off == lastBlockOffset { - // The very last block in the object may have different length, - // if object length isn't divisible by block size. - blockData = make([]byte, lastBlockLength) + // We need a new buffer for each subrange, both for storing into hits, and also for caching. + var subrangeData []byte + if off == lastSubrangeOffset { + // The very last subrange in the object may have different length, + // if object length isn't divisible by subrange size. + subrangeData = make([]byte, lastSubrangeLength) } else { - blockData = make([]byte, cb.config.ChunkBlockSize) + subrangeData = make([]byte, cb.config.ChunkSubrangeSize) } - _, err := io.ReadFull(r, blockData) + _, err := io.ReadFull(r, subrangeData) if err != nil { return errors.Wrapf(err, "fetching range [%d, %d]", m.start, m.end) } @@ -298,15 +298,15 @@ func (cb *CachingBucket) fetchMissingChunkBlocks(ctx context.Context, name strin hitsMutex.Lock() if _, ok := hits[key]; !ok { storeToCache = true - hits[key] = blockData + hits[key] = subrangeData } hitsMutex.Unlock() if storeToCache { - cb.fetchedChunkBytes.WithLabelValues(originBucket).Add(float64(len(blockData))) - cb.cache.Store(gctx, map[string][]byte{key: blockData}, cb.config.ChunkBlockTTL) + cb.fetchedChunkBytes.WithLabelValues(originBucket).Add(float64(len(subrangeData))) + cb.cache.Store(gctx, map[string][]byte{key: subrangeData}, cb.config.ChunkSubrangeTTL) } else { - cb.refetchedChunkBytes.WithLabelValues(originCache).Add(float64(len(blockData))) + cb.refetchedChunkBytes.WithLabelValues(originCache).Add(float64(len(subrangeData))) } } @@ -339,56 +339,56 @@ func cachingKeyObjectSize(name string) string { return fmt.Sprintf("size:%s", name) } -func cachingKeyObjectBlock(name string, start int64, end int64) string { - return fmt.Sprintf("block:%s:%d:%d", name, start, end) +func cachingKeyObjectSubrange(name string, start int64, end int64) string { + return fmt.Sprintf("subrange:%s:%d:%d", name, start, end) } -// io.ReadCloser implementation that uses in-memory blocks. -type blocksReader struct { - blockSize int64 +// io.ReadCloser implementation that uses in-memory subranges. +type subrangesReader struct { + subrangeSize int64 - // Mapping of blockSize-aligned offsets to keys in hits. + // Mapping of subrangeSize-aligned offsets to keys in hits. offsetsKeys map[int64]string - blocks map[string][]byte + subranges map[string][]byte - // Offset for next read, used to find correct block to return data from. + // Offset for next read, used to find correct subrange to return data from. 
readOffset int64 // Remaining data to return from this reader. Once zero, this reader reports EOF. remaining int64 } -func newBlocksReader(blockSize int64, offsetsKeys map[int64]string, blocks map[string][]byte, readOffset, remaining int64) *blocksReader { - return &blocksReader{ - blockSize: blockSize, - offsetsKeys: offsetsKeys, - blocks: blocks, +func newSubrangesReader(subrangeSize int64, offsetsKeys map[int64]string, subranges map[string][]byte, readOffset, remaining int64) *subrangesReader { + return &subrangesReader{ + subrangeSize: subrangeSize, + offsetsKeys: offsetsKeys, + subranges: subranges, readOffset: readOffset, remaining: remaining, } } -func (c *blocksReader) Close() error { +func (c *subrangesReader) Close() error { return nil } -func (c *blocksReader) Read(p []byte) (n int, err error) { +func (c *subrangesReader) Read(p []byte) (n int, err error) { if c.remaining <= 0 { return 0, io.EOF } - currentBlockOffset := (c.readOffset / c.blockSize) * c.blockSize - currentBlock, err := c.blockAt(currentBlockOffset) + currentSubrangeOffset := (c.readOffset / c.subrangeSize) * c.subrangeSize + currentSubrange, err := c.subrangeAt(currentSubrangeOffset) if err != nil { return 0, errors.Wrapf(err, "read position: %d", c.readOffset) } - offsetInBlock := int(c.readOffset - currentBlockOffset) - toCopy := len(currentBlock) - offsetInBlock + offsetInSubrange := int(c.readOffset - currentSubrangeOffset) + toCopy := len(currentSubrange) - offsetInSubrange if toCopy <= 0 { - // This can only happen if block's length is not blockSize, and reader is told to read more data. - return 0, errors.Errorf("no more data left in block at position %d, block length %d, reading position %d", currentBlockOffset, len(currentBlock), c.readOffset) + // This can only happen if subrange's length is not subrangeSize, and reader is told to read more data. + return 0, errors.Errorf("no more data left in subrange at position %d, subrange length %d, reading position %d", currentSubrangeOffset, len(currentSubrange), c.readOffset) } if len(p) < toCopy { @@ -398,17 +398,17 @@ func (c *blocksReader) Read(p []byte) (n int, err error) { toCopy = int(c.remaining) // Conversion is safe, c.remaining is small enough. } - copy(p, currentBlock[offsetInBlock:offsetInBlock+toCopy]) + copy(p, currentSubrange[offsetInSubrange:offsetInSubrange+toCopy]) c.readOffset += int64(toCopy) c.remaining -= int64(toCopy) return toCopy, nil } -func (c *blocksReader) blockAt(offset int64) ([]byte, error) { - b := c.blocks[c.offsetsKeys[offset]] +func (c *subrangesReader) subrangeAt(offset int64) ([]byte, error) { + b := c.subranges[c.offsetsKeys[offset]] if b == nil { - return nil, errors.Errorf("block for offset %d not found", offset) + return nil, errors.Errorf("subrange for offset %d not found", offset) } return b, nil } diff --git a/pkg/store/cache/caching_bucket_test.go b/pkg/store/cache/caching_bucket_test.go index 213b05d496..2bbc37c887 100644 --- a/pkg/store/cache/caching_bucket_test.go +++ b/pkg/store/cache/caching_bucket_test.go @@ -22,7 +22,7 @@ import ( func TestCachingBucket(t *testing.T) { length := int64(1024 * 1024) - blockSize := int64(16000) // All tests are based on this value. + subrangeSize := int64(16000) // All tests are based on this value. 
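// Illustrative aside, not part of the patch: how the Read method above locates data
// for a given read position, using the same 16000-byte subrange size as this test.
exReadOffset := int64(555560)
exSubrangeOffset := (exReadOffset / int64(16000)) * int64(16000) // 544000: offset whose cache key holds this byte
exWithinSubrange := int(exReadOffset - exSubrangeOffset)         // 11560: position inside that subrange
// Read copies from that subrange starting at exWithinSubrange, then advances readOffset
// and decrements remaining by the number of bytes copied.
_ = exWithinSubrange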
data := make([]byte, length) for ix := 0; ix < len(data); ix++ { @@ -54,15 +54,15 @@ func TestCachingBucket(t *testing.T) { offset: 555555, length: 55555, expectedLength: 55555, - expectedFetchedBytes: 5 * blockSize, + expectedFetchedBytes: 5 * subrangeSize, }, { - name: "same request will hit all blocks in the cache", + name: "same request will hit all subranges in the cache", offset: 555555, length: 55555, expectedLength: 55555, - expectedCachedBytes: 5 * blockSize, + expectedCachedBytes: 5 * subrangeSize, }, { @@ -70,24 +70,24 @@ func TestCachingBucket(t *testing.T) { offset: length - 10, length: 3000, expectedLength: 10, - expectedFetchedBytes: 8576, // Last (incomplete) block is fetched. + expectedFetchedBytes: 8576, // Last (incomplete) subrange is fetched. }, { name: "another request data close to the end of object, cached by previous test", offset: 1040100, - length: blockSize, + length: subrangeSize, expectedLength: 8476, expectedCachedBytes: 8576, }, { - name: "entire object, combination of cached and uncached blocks", + name: "entire object, combination of cached and uncached subranges", offset: 0, length: length, expectedLength: length, - expectedCachedBytes: 5*blockSize + 8576, // 5 block cached from first test, plus last incomplete block. - expectedFetchedBytes: 60 * blockSize, + expectedCachedBytes: 5*subrangeSize + 8576, // 5 subrange cached from first test, plus last incomplete subrange. + expectedFetchedBytes: 60 * subrangeSize, }, { @@ -111,103 +111,103 @@ func TestCachingBucket(t *testing.T) { }, { - name: "missing first blocks", + name: "missing first subranges", offset: 0, - length: 10 * blockSize, - expectedLength: 10 * blockSize, - expectedFetchedBytes: 3 * blockSize, - expectedCachedBytes: 7 * blockSize, + length: 10 * subrangeSize, + expectedLength: 10 * subrangeSize, + expectedFetchedBytes: 3 * subrangeSize, + expectedCachedBytes: 7 * subrangeSize, init: func() { - // Delete first 3 blocks. - delete(cache.cache, cachingKeyObjectBlock(name, 0*blockSize, 1*blockSize)) - delete(cache.cache, cachingKeyObjectBlock(name, 1*blockSize, 2*blockSize)) - delete(cache.cache, cachingKeyObjectBlock(name, 2*blockSize, 3*blockSize)) + // Delete first 3 subranges. + delete(cache.cache, cachingKeyObjectSubrange(name, 0*subrangeSize, 1*subrangeSize)) + delete(cache.cache, cachingKeyObjectSubrange(name, 1*subrangeSize, 2*subrangeSize)) + delete(cache.cache, cachingKeyObjectSubrange(name, 2*subrangeSize, 3*subrangeSize)) }, }, { - name: "missing last blocks", + name: "missing last subranges", offset: 0, - length: 10 * blockSize, - expectedLength: 10 * blockSize, - expectedFetchedBytes: 3 * blockSize, - expectedCachedBytes: 7 * blockSize, + length: 10 * subrangeSize, + expectedLength: 10 * subrangeSize, + expectedFetchedBytes: 3 * subrangeSize, + expectedCachedBytes: 7 * subrangeSize, init: func() { - // Delete last 3 blocks. - delete(cache.cache, cachingKeyObjectBlock(name, 7*blockSize, 8*blockSize)) - delete(cache.cache, cachingKeyObjectBlock(name, 8*blockSize, 9*blockSize)) - delete(cache.cache, cachingKeyObjectBlock(name, 9*blockSize, 10*blockSize)) + // Delete last 3 subranges. 
+ delete(cache.cache, cachingKeyObjectSubrange(name, 7*subrangeSize, 8*subrangeSize)) + delete(cache.cache, cachingKeyObjectSubrange(name, 8*subrangeSize, 9*subrangeSize)) + delete(cache.cache, cachingKeyObjectSubrange(name, 9*subrangeSize, 10*subrangeSize)) }, }, { - name: "missing middle blocks", + name: "missing middle subranges", offset: 0, - length: 10 * blockSize, - expectedLength: 10 * blockSize, - expectedFetchedBytes: 3 * blockSize, - expectedCachedBytes: 7 * blockSize, + length: 10 * subrangeSize, + expectedLength: 10 * subrangeSize, + expectedFetchedBytes: 3 * subrangeSize, + expectedCachedBytes: 7 * subrangeSize, init: func() { - // Delete 3 blocks in the middle. - delete(cache.cache, cachingKeyObjectBlock(name, 3*blockSize, 4*blockSize)) - delete(cache.cache, cachingKeyObjectBlock(name, 4*blockSize, 5*blockSize)) - delete(cache.cache, cachingKeyObjectBlock(name, 5*blockSize, 6*blockSize)) + // Delete 3 subranges in the middle. + delete(cache.cache, cachingKeyObjectSubrange(name, 3*subrangeSize, 4*subrangeSize)) + delete(cache.cache, cachingKeyObjectSubrange(name, 4*subrangeSize, 5*subrangeSize)) + delete(cache.cache, cachingKeyObjectSubrange(name, 5*subrangeSize, 6*subrangeSize)) }, }, { - name: "missing everything except middle blocks", + name: "missing everything except middle subranges", offset: 0, - length: 10 * blockSize, - expectedLength: 10 * blockSize, - expectedFetchedBytes: 7 * blockSize, - expectedCachedBytes: 3 * blockSize, + length: 10 * subrangeSize, + expectedLength: 10 * subrangeSize, + expectedFetchedBytes: 7 * subrangeSize, + expectedCachedBytes: 3 * subrangeSize, init: func() { - // Delete all but 3 blocks in the middle, and keep unlimited number of ranged subrequests. + // Delete all but 3 subranges in the middle, and keep unlimited number of ranged subrequests. for i := int64(0); i < 10; i++ { if i > 0 && i%3 == 0 { continue } - delete(cache.cache, cachingKeyObjectBlock(name, i*blockSize, (i+1)*blockSize)) + delete(cache.cache, cachingKeyObjectSubrange(name, i*subrangeSize, (i+1)*subrangeSize)) } }, }, { - name: "missing everything except middle blocks, one subrequest only", + name: "missing everything except middle subranges, one subrequest only", offset: 0, - length: 10 * blockSize, - expectedLength: 10 * blockSize, - expectedFetchedBytes: 7 * blockSize, - expectedCachedBytes: 3 * blockSize, - expectedRefetchedBytes: 3 * blockSize, // Entire object fetched, 3 blocks are "refetched". + length: 10 * subrangeSize, + expectedLength: 10 * subrangeSize, + expectedFetchedBytes: 7 * subrangeSize, + expectedCachedBytes: 3 * subrangeSize, + expectedRefetchedBytes: 3 * subrangeSize, // Entire object fetched, 3 subranges are "refetched". maxGetRangeRequests: 1, init: func() { - // Delete all but 3 blocks in the middle, but only allow 1 subrequest. + // Delete all but 3 subranges in the middle, but only allow 1 subrequest. 
for i := int64(0); i < 10; i++ { if i == 3 || i == 5 || i == 7 { continue } - delete(cache.cache, cachingKeyObjectBlock(name, i*blockSize, (i+1)*blockSize)) + delete(cache.cache, cachingKeyObjectSubrange(name, i*subrangeSize, (i+1)*subrangeSize)) } }, }, { - name: "missing everything except middle blocks, two subrequests", + name: "missing everything except middle subranges, two subrequests", offset: 0, - length: 10 * blockSize, - expectedLength: 10 * blockSize, - expectedFetchedBytes: 7 * blockSize, - expectedCachedBytes: 3 * blockSize, + length: 10 * subrangeSize, + expectedLength: 10 * subrangeSize, + expectedFetchedBytes: 7 * subrangeSize, + expectedCachedBytes: 3 * subrangeSize, maxGetRangeRequests: 2, init: func() { - // Delete all but one blocks in the middle, and allow 2 subrequests. They will be: 0-80000, 128000-160000. + // Delete all but one subranges in the middle, and allow 2 subrequests. They will be: 0-80000, 128000-160000. for i := int64(0); i < 10; i++ { if i == 5 || i == 6 || i == 7 { continue } - delete(cache.cache, cachingKeyObjectBlock(name, i*blockSize, (i+1)*blockSize)) + delete(cache.cache, cachingKeyObjectSubrange(name, i*subrangeSize, (i+1)*subrangeSize)) } }, }, @@ -218,7 +218,7 @@ func TestCachingBucket(t *testing.T) { } cfg := DefaultCachingBucketConfig() - cfg.ChunkBlockSize = blockSize + cfg.ChunkSubrangeSize = subrangeSize cfg.MaxChunksGetRangeRequests = tc.maxGetRangeRequests cachingBucket, err := NewCachingBucket(inmem, cache, cfg, nil, nil) From ba3b5d9774ab9cac871a5528af410b0182c4af4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 7 May 2020 11:13:56 +0200 Subject: [PATCH 30/34] Renamed block to subrange. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- docs/components/store.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/components/store.md b/docs/components/store.md index db19ff92fa..fa9b1546dd 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -243,20 +243,20 @@ backend_config: - localhost:11211 caching_config: - chunk_block_size: 16000 + chunk_subrange_size: 16000 max_chunks_get_range_requests: 3 chunk_object_size_ttl: 24h - chunk_block_ttl: 24h + chunk_subrange_ttl: 24h ``` `backend_config` field for memcached supports all the same configuration as memcached for [index cache](#memcached-index-cache). `caching_config` is a configuration for chunks cache and supports the following optional settings: -- `chunk_block_size`: size of segment of chunks object that is stored to the cache. This is the smallest unit that chunks cache is working with. -- `max_chunks_get_range_requests`: how many "get range" sub-requests may cache perform to fetch missing blocks. +- `chunk_subrange_size`: size of segment of chunks object that is stored to the cache. This is the smallest unit that chunks cache is working with. +- `max_chunks_get_range_requests`: how many "get range" sub-requests may cache perform to fetch missing subranges. - `chunk_object_size_ttl`: how long to keep information about chunk file length in the cache. -- `chunk_block_ttl`: how long to keep individual blocks in the cache. +- `chunk_subrange_ttl`: how long to keep individual subranges in the cache. Note that chunks cache is an experimental feature, and these fields may be renamed or removed completely in the future. 
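For readers wiring this up in Go rather than through YAML, here is a short sketch of the equivalent programmatic configuration. It assumes the usual `storecache` alias for the `pkg/store/cache` package, an existing `objstore.Bucket` named `bkt`, a chunks `cache.Cache` named `chunksCache`, plus `logger` and `reg`; error handling is trimmed. Only the field names and the `NewCachingBucket` call come from the patches above; the surrounding names are placeholders.

cfg := storecache.DefaultCachingBucketConfig()
cfg.ChunkSubrangeSize = 16000           // smallest unit kept in the cache, equal to max chunk size
cfg.MaxChunksGetRangeRequests = 3       // cap on "get range" sub-requests per GetRange call
cfg.ChunkObjectSizeTTL = 24 * time.Hour // how long cached chunk-file lengths live
cfg.ChunkSubrangeTTL = 24 * time.Hour   // how long cached subranges live

cb, err := storecache.NewCachingBucket(bkt, chunksCache, cfg, logger, reg)
if err != nil {
	return err
}
// cb wraps bkt: GetRange calls on chunk files are now served from the cache where possible.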
From 267ce9ce16ed0093a904ad3cbc9b58c87af0ea70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 7 May 2020 11:18:48 +0200 Subject: [PATCH 31/34] Header MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/extflag/hidden.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/extflag/hidden.go b/pkg/extflag/hidden.go index 0493e9c6fc..cf8de957a5 100644 --- a/pkg/extflag/hidden.go +++ b/pkg/extflag/hidden.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package extflag import ( From ec564b67808c0b16920cdb7f4aa9efcaab3dce86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 7 May 2020 11:36:33 +0200 Subject: [PATCH 32/34] Added TODO MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/cache/factory.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/store/cache/factory.go b/pkg/store/cache/factory.go index e67251d3c7..c41c19188f 100644 --- a/pkg/store/cache/factory.go +++ b/pkg/store/cache/factory.go @@ -24,6 +24,7 @@ const ( // IndexCacheConfig specifies the index cache config. type IndexCacheConfig struct { + // TODO: rename to backend and backend_config. Type IndexCacheProvider `yaml:"type"` Config interface{} `yaml:"config"` } From 2c21dfe75d4c73affb403dbd58bc0cdafc72bfdd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 7 May 2020 11:37:09 +0200 Subject: [PATCH 33/34] Removed TODO, in favor of creating issue. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/cache/factory.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/store/cache/factory.go b/pkg/store/cache/factory.go index c41c19188f..e67251d3c7 100644 --- a/pkg/store/cache/factory.go +++ b/pkg/store/cache/factory.go @@ -24,7 +24,6 @@ const ( // IndexCacheConfig specifies the index cache config. type IndexCacheConfig struct { - // TODO: rename to backend and backend_config. Type IndexCacheProvider `yaml:"type"` Config interface{} `yaml:"config"` } From d1eabf7297aac70f8660e7d2c638c894fab96eab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 7 May 2020 13:46:12 +0200 Subject: [PATCH 34/34] Use NopCloser. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/cache/caching_bucket.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index 43b54186bc..2ce7e6dd11 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -8,6 +8,7 @@ import ( "encoding/binary" "fmt" "io" + "io/ioutil" "regexp" "sync" "time" @@ -236,7 +237,7 @@ func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, off } } - return newSubrangesReader(cb.config.ChunkSubrangeSize, offsetKeys, hits, offset, length), nil + return ioutil.NopCloser(newSubrangesReader(cb.config.ChunkSubrangeSize, offsetKeys, hits, offset, length)), nil } type rng struct { @@ -343,7 +344,7 @@ func cachingKeyObjectSubrange(name string, start int64, end int64) string { return fmt.Sprintf("subrange:%s:%d:%d", name, start, end) } -// io.ReadCloser implementation that uses in-memory subranges. 
+// Reader implementation that uses in-memory subranges.
 type subrangesReader struct {
 	subrangeSize int64
 
@@ -369,10 +370,6 @@ func newSubrangesReader(subrangeSize int64, offsetsKeys map[int64]string, subran
 	}
 }
 
-func (c *subrangesReader) Close() error {
-	return nil
-}
-
 func (c *subrangesReader) Read(p []byte) (n int, err error) {
 	if c.remaining <= 0 {
 		return 0, io.EOF
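The effect of this last change is that subrangesReader no longer needs a hand-written no-op Close: it stays a plain io.Reader, and the io.ReadCloser that GetRange callers expect is produced at the call site with ioutil.NopCloser. In isolation, the pattern looks like this (a sketch of the general idiom, not additional patch code; on Go 1.16+ the same helper is available as io.NopCloser):

// Any io.Reader can be handed out where an io.ReadCloser is required.
func asReadCloser(r io.Reader) io.ReadCloser {
	return ioutil.NopCloser(r) // Close() becomes a no-op
}

Dropping the method also keeps subrangesReader honest about owning no resources that need releasing.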