Skip to content

Commit

Permalink
extend postings cache key with codec
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 committed May 31, 2023
1 parent 541a59a commit 33f6924
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 56 deletions.
8 changes: 4 additions & 4 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ func (s *BucketStore) validate() error {

type noopCache struct{}

func (noopCache) StorePostings(ulid.ULID, labels.Label, []byte) {}
func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) {
func (noopCache) StorePostings(ulid.ULID, labels.Label, []byte, storecache.PostingsCodec) {}
func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []labels.Label, _ storecache.PostingsCodec) (map[labels.Label][]byte, []labels.Label) {
return map[labels.Label][]byte{}, keys
}

Expand Down Expand Up @@ -2388,7 +2388,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
output := make([]index.Postings, len(keys))

// Fetch postings from the cache with a single call.
fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys)
fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys, storecache.CodecHeaderStreamedSnappy)
for _, dataFromCache := range fromCache {
if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil {
return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading postings from index cache: %s", err)
Expand Down Expand Up @@ -2526,7 +2526,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
// Truncate first 4 bytes which are length of posting.
output[p.keyID] = newBigEndianPostings(pBytes[4:])

r.block.indexCache.StorePostings(r.block.meta.ULID, keys[p.keyID], dataToCache)
r.block.indexCache.StorePostings(r.block.meta.ULID, keys[p.keyID], dataToCache, storecache.CodecHeaderStreamedSnappy)

// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
Expand Down
8 changes: 4 additions & 4 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ func (c *swappableCache) SwapWith(ptr2 storecache.IndexCache) {
c.ptr = ptr2
}

func (c *swappableCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
c.ptr.StorePostings(blockID, l, v)
func (c *swappableCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, codec storecache.PostingsCodec) {
c.ptr.StorePostings(blockID, l, v, codec)
}

func (c *swappableCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) {
return c.ptr.FetchMultiPostings(ctx, blockID, keys)
func (c *swappableCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, codec storecache.PostingsCodec) (map[labels.Label][]byte, []labels.Label) {
return c.ptr.FetchMultiPostings(ctx, blockID, keys, codec)
}

func (c *swappableCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {
Expand Down
11 changes: 9 additions & 2 deletions pkg/store/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,25 @@ var (
ulidSize = uint64(len(ulid.ULID{}))
)

// PostingsCodec identifies the compression codec used to encode cached
// postings. It is threaded through StorePostings/FetchMultiPostings so the
// codec can be mixed into the cache key, preventing entries encoded with
// different codecs from colliding on the same key.
type PostingsCodec string

const (
CodecHeaderSnappy PostingsCodec = "dvs" // As in "diff+varint+snappy".
CodecHeaderStreamedSnappy PostingsCodec = "dss" // As in "diff+varint+streamed snappy".
)

// IndexCache is the interface exported by index cache backends.
// Store operations do not support context.Context, deadlines need to be
// supported by the backends themselves. This is because Set operations are
// run async and it does not make sense to attach same context
// (potentially with a deadline) as in the original user's request.
type IndexCache interface {
// StorePostings stores postings for a single series.
StorePostings(blockID ulid.ULID, l labels.Label, v []byte)
StorePostings(blockID ulid.ULID, l labels.Label, v []byte, codec PostingsCodec)

// FetchMultiPostings fetches multiple postings - each identified by a label -
// and returns a map containing cache hits, along with a list of missing keys.
FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label)
FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, codec PostingsCodec) (hits map[labels.Label][]byte, misses []labels.Label)

// StoreSeries stores a single series.
StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte)
Expand Down
4 changes: 2 additions & 2 deletions pkg/store/cache/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,13 +289,13 @@ func copyToKey(l labels.Label) cacheKeyPostings {

// StorePostings sets the postings identified by the ulid and label to the value v,
// if the postings already exists in the cache it is not mutated.
func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, _ PostingsCodec) {
c.set(cacheTypePostings, cacheKey{block: blockID.String(), key: copyToKey(l)}, v)
}

// FetchMultiPostings fetches multiple postings - each identified by a label -
// and returns a map containing cache hits, along with a list of missing keys.
func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) {
func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label, _ PostingsCodec) (hits map[labels.Label][]byte, misses []labels.Label) {
hits = map[labels.Label][]byte{}

blockIDKey := blockID.String()
Expand Down
44 changes: 22 additions & 22 deletions pkg/store/cache/inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ func TestInMemoryIndexCache_AvoidsDeadlock(t *testing.T) {
testutil.Ok(t, err)
cache.lru = l

cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test2", Value: "1"}, []byte{42, 33, 14, 67, 11})
cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test2", Value: "1"}, []byte{42, 33, 14, 67, 11}, CodecHeaderStreamedSnappy)

testutil.Equals(t, uint64(sliceHeaderSize+5), cache.curSize)
testutil.Equals(t, float64(cache.curSize), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings)))

// This triggers deadlock logic.
cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test1", Value: "1"}, []byte{42})
cache.StorePostings(ulid.MustNew(0, nil), labels.Label{Name: "test1", Value: "1"}, []byte{42}, CodecHeaderStreamedSnappy)

testutil.Equals(t, uint64(sliceHeaderSize+1), cache.curSize)
testutil.Equals(t, float64(cache.curSize), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings)))
Expand Down Expand Up @@ -131,9 +131,9 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) {
}{
{
typ: cacheTypePostings,
set: func(id storage.SeriesRef, b []byte) { cache.StorePostings(uid(id), lbl, b) },
set: func(id storage.SeriesRef, b []byte) { cache.StorePostings(uid(id), lbl, b, CodecHeaderStreamedSnappy) },
get: func(id storage.SeriesRef) ([]byte, bool) {
hits, _ := cache.FetchMultiPostings(ctx, uid(id), []labels.Label{lbl})
hits, _ := cache.FetchMultiPostings(ctx, uid(id), []labels.Label{lbl}, CodecHeaderStreamedSnappy)
b, ok := hits[lbl]

return b, ok
Expand Down Expand Up @@ -210,9 +210,9 @@ func TestInMemoryIndexCache_MaxNumberOfItemsHit(t *testing.T) {

id := ulid.MustNew(0, nil)

cache.StorePostings(id, labels.Label{Name: "test", Value: "123"}, []byte{42, 33})
cache.StorePostings(id, labels.Label{Name: "test", Value: "124"}, []byte{42, 33})
cache.StorePostings(id, labels.Label{Name: "test", Value: "125"}, []byte{42, 33})
cache.StorePostings(id, labels.Label{Name: "test", Value: "123"}, []byte{42, 33}, CodecHeaderStreamedSnappy)
cache.StorePostings(id, labels.Label{Name: "test", Value: "124"}, []byte{42, 33}, CodecHeaderStreamedSnappy)
cache.StorePostings(id, labels.Label{Name: "test", Value: "125"}, []byte{42, 33}, CodecHeaderStreamedSnappy)

testutil.Equals(t, uint64(2*sliceHeaderSize+4), cache.curSize)
testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings)))
Expand Down Expand Up @@ -243,12 +243,12 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) {
emptySeriesHits := map[storage.SeriesRef][]byte{}
emptySeriesMisses := []storage.SeriesRef(nil)

pHits, pMisses := cache.FetchMultiPostings(ctx, id, []labels.Label{lbls})
pHits, pMisses := cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}, CodecHeaderStreamedSnappy)
testutil.Equals(t, emptyPostingsHits, pHits, "no such key")
testutil.Equals(t, []labels.Label{lbls}, pMisses)

// Add sliceHeaderSize + 2 bytes.
cache.StorePostings(id, lbls, []byte{42, 33})
cache.StorePostings(id, lbls, []byte{42, 33}, CodecHeaderStreamedSnappy)
testutil.Equals(t, uint64(sliceHeaderSize+2), cache.curSize)
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(sliceHeaderSize+2), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings)))
Expand All @@ -261,15 +261,15 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) {
testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries)))

pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls})
pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}, CodecHeaderStreamedSnappy)
testutil.Equals(t, map[labels.Label][]byte{lbls: {42, 33}}, pHits, "key exists")
testutil.Equals(t, emptyPostingsMisses, pMisses)

pHits, pMisses = cache.FetchMultiPostings(ctx, ulid.MustNew(1, nil), []labels.Label{lbls})
pHits, pMisses = cache.FetchMultiPostings(ctx, ulid.MustNew(1, nil), []labels.Label{lbls}, CodecHeaderStreamedSnappy)
testutil.Equals(t, emptyPostingsHits, pHits, "no such key")
testutil.Equals(t, []labels.Label{lbls}, pMisses)

pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{{Name: "test", Value: "124"}})
pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{{Name: "test", Value: "124"}}, CodecHeaderStreamedSnappy)
testutil.Equals(t, emptyPostingsHits, pHits, "no such key")
testutil.Equals(t, []labels.Label{{Name: "test", Value: "124"}}, pMisses)

Expand Down Expand Up @@ -298,7 +298,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) {
for i := 0; i < sliceHeaderSize; i++ {
v = append(v, 3)
}
cache.StorePostings(id, lbls2, v)
cache.StorePostings(id, lbls2, v, CodecHeaderStreamedSnappy)

testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize)
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings)))
Expand All @@ -313,20 +313,20 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) {
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) // Eviction.

// Evicted.
pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls})
pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls}, CodecHeaderStreamedSnappy)
testutil.Equals(t, emptyPostingsHits, pHits, "no such key")
testutil.Equals(t, []labels.Label{lbls}, pMisses)

sHits, sMisses = cache.FetchMultiSeries(ctx, id, []storage.SeriesRef{1234})
testutil.Equals(t, emptySeriesHits, sHits, "no such key")
testutil.Equals(t, []storage.SeriesRef{1234}, sMisses)

pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls2})
pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls2}, CodecHeaderStreamedSnappy)
testutil.Equals(t, map[labels.Label][]byte{lbls2: v}, pHits)
testutil.Equals(t, emptyPostingsMisses, pMisses)

// Add same item again.
cache.StorePostings(id, lbls2, v)
cache.StorePostings(id, lbls2, v, CodecHeaderStreamedSnappy)

testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize)
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings)))
Expand All @@ -340,12 +340,12 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) {
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries)))

pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls2})
pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls2}, CodecHeaderStreamedSnappy)
testutil.Equals(t, map[labels.Label][]byte{lbls2: v}, pHits)
testutil.Equals(t, emptyPostingsMisses, pMisses)

// Add too big item.
cache.StorePostings(id, labels.Label{Name: "test", Value: "toobig"}, append(v, 5))
cache.StorePostings(id, labels.Label{Name: "test", Value: "toobig"}, append(v, 5), CodecHeaderStreamedSnappy)
testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize)
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(2*sliceHeaderSize+5), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings)))
Expand Down Expand Up @@ -378,7 +378,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) {

lbls3 := labels.Label{Name: "test", Value: "124"}

cache.StorePostings(id, lbls3, []byte{})
cache.StorePostings(id, lbls3, []byte{}, CodecHeaderStreamedSnappy)

testutil.Equals(t, uint64(sliceHeaderSize), cache.curSize)
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings)))
Expand All @@ -392,13 +392,13 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) {
testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries)))

pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls3})
pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls3}, CodecHeaderStreamedSnappy)
testutil.Equals(t, map[labels.Label][]byte{lbls3: {}}, pHits, "key exists")
testutil.Equals(t, emptyPostingsMisses, pMisses)

// nil works and still allocates empty slice.
lbls4 := labels.Label{Name: "test", Value: "125"}
cache.StorePostings(id, lbls4, []byte(nil))
cache.StorePostings(id, lbls4, []byte(nil), CodecHeaderStreamedSnappy)

testutil.Equals(t, 2*uint64(sliceHeaderSize), cache.curSize)
testutil.Equals(t, float64(2), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings)))
Expand All @@ -412,7 +412,7 @@ func TestInMemoryIndexCache_Eviction_WithMetrics(t *testing.T) {
testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries)))

pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls4})
pHits, pMisses = cache.FetchMultiPostings(ctx, id, []labels.Label{lbls4}, CodecHeaderStreamedSnappy)
testutil.Equals(t, map[labels.Label][]byte{lbls4: {}}, pHits, "key exists")
testutil.Equals(t, emptyPostingsMisses, pMisses)

Expand Down
13 changes: 10 additions & 3 deletions pkg/store/cache/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli
// StorePostings sets the postings identified by the ulid and label to the value v.
// The function enqueues the request and returns immediately: the entry will be
// asynchronously stored in the cache.
func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, codec PostingsCodec) {
key := cacheKey{blockID.String(), cacheKeyPostings(l)}.string()
if codec == CodecHeaderStreamedSnappy {
key += ":" + string(codec)
}

if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err)
Expand All @@ -74,12 +77,16 @@ func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []
// FetchMultiPostings fetches multiple postings - each identified by a label -
// and returns a map containing cache hits, along with a list of missing keys.
// In case of error, it logs and return an empty cache hits map.
func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, lbls []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) {
func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, lbls []labels.Label, codec PostingsCodec) (hits map[labels.Label][]byte, misses []labels.Label) {
keys := make([]string, 0, len(lbls))

blockIDKey := blockID.String()
suffix := ""
if codec == CodecHeaderStreamedSnappy {
suffix = ":" + string(codec)
}
for _, lbl := range lbls {
key := cacheKey{blockIDKey, cacheKeyPostings(lbl)}.string()
key := cacheKey{blockIDKey, cacheKeyPostings(lbl)}.string() + suffix
keys = append(keys, key)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/store/cache/memcached_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ func TestMemcachedIndexCache_FetchMultiPostings(t *testing.T) {
// Store the postings expected before running the test.
ctx := context.Background()
for _, p := range testData.setup {
c.StorePostings(p.block, p.label, p.value)
c.StorePostings(p.block, p.label, p.value, CodecHeaderStreamedSnappy)
}

// Fetch postings from cached and assert on it.
hits, misses := c.FetchMultiPostings(ctx, testData.fetchBlockID, testData.fetchLabels)
hits, misses := c.FetchMultiPostings(ctx, testData.fetchBlockID, testData.fetchLabels, CodecHeaderStreamedSnappy)
testutil.Equals(t, testData.expectedHits, hits)
testutil.Equals(t, testData.expectedMisses, misses)

Expand Down
Loading

0 comments on commit 33f6924

Please sign in to comment.