From 03441a3faeaef3720b54281411974adc69b5ecda Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Tue, 4 Apr 2023 20:58:41 +0200 Subject: [PATCH 01/18] Cache unapplied matchers in ExpandedPostings Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket.go | 7 +- pkg/storegateway/bucket_index_postings.go | 79 +++++--- pkg/storegateway/bucket_index_reader.go | 188 +++++++++++++------ pkg/storegateway/bucket_test.go | 43 +++-- pkg/storegateway/indexcache/cache.go | 4 +- pkg/storegateway/indexcache/inmemory.go | 15 +- pkg/storegateway/indexcache/inmemory_test.go | 4 +- pkg/storegateway/indexcache/remote.go | 12 +- pkg/storegateway/indexcache/remote_test.go | 52 ++--- pkg/storegateway/indexcache/tracing.go | 9 +- pkg/storegateway/postings_codec.go | 146 +++++++++++++- pkg/storegateway/postings_codec_test.go | 104 +++++++++- 12 files changed, 505 insertions(+), 158 deletions(-) diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index bd6a390f963..7909f122ceb 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -147,10 +147,10 @@ func (noopCache) FetchMultiSeriesForRefs(_ context.Context, _ string, _ ulid.ULI return map[storage.SeriesRef][]byte{}, ids } -func (c noopCache) StoreExpandedPostings(_ string, _ ulid.ULID, _ indexcache.LabelMatchersKey, _ []byte) { +func (c noopCache) StoreExpandedPostings(_ string, _ ulid.ULID, _ indexcache.LabelMatchersKey, _ string, _ []byte) { } -func (c noopCache) FetchExpandedPostings(_ context.Context, _ string, _ ulid.ULID, _ indexcache.LabelMatchersKey) ([]byte, bool) { +func (c noopCache) FetchExpandedPostings(_ context.Context, _ string, _ ulid.ULID, _ indexcache.LabelMatchersKey, _ string) ([]byte, bool) { return nil, false } @@ -1482,7 +1482,8 @@ func (b *bucketBlock) chunkRangeReader(ctx context.Context, seq int, off, length func (b *bucketBlock) indexReader() *bucketIndexReader { b.pendingReaders.Add(1) - return newBucketIndexReader(b) + // This will be replaced with a strategy 
selected via a CLI flag. + return newBucketIndexReader(b, selectAllStrategy{}) } func (b *bucketBlock) chunkReader(ctx context.Context) *bucketChunkReader { diff --git a/pkg/storegateway/bucket_index_postings.go b/pkg/storegateway/bucket_index_postings.go index c140a59cc6c..17cef1efab3 100644 --- a/pkg/storegateway/bucket_index_postings.go +++ b/pkg/storegateway/bucket_index_postings.go @@ -31,9 +31,9 @@ type rawPostingGroup struct { labelName string keys []labels.Label - isLazy bool - lazyMatcher func(string) bool - prefix string + isLazy bool + originalMatcher *labels.Matcher + prefix string } func newRawIntersectingPostingGroup(labelName string, keys []labels.Label) rawPostingGroup { @@ -52,70 +52,85 @@ func newRawSubtractingPostingGroup(labelName string, keys []labels.Label) rawPos } } -func newLazyIntersectingPostingGroup(labelName string, prefix string, matcher func(string) bool) rawPostingGroup { +func newLazyIntersectingPostingGroup(m *labels.Matcher) rawPostingGroup { return rawPostingGroup{ - isLazy: true, - isSubtract: false, - labelName: labelName, - lazyMatcher: matcher, - prefix: prefix, + isLazy: true, + isSubtract: false, + labelName: m.Name, + prefix: m.Prefix(), + originalMatcher: m, } } -func newLazySubtractingPostingGroup(labelName string, prefix string, matcher func(string) bool) rawPostingGroup { +func newLazySubtractingPostingGroup(m *labels.Matcher) rawPostingGroup { return rawPostingGroup{ - isLazy: true, - isSubtract: true, - labelName: labelName, - lazyMatcher: matcher, - prefix: prefix, + isLazy: true, + isSubtract: true, + labelName: m.Name, + prefix: m.Prefix(), + originalMatcher: m, } } // toPostingGroup returns a postingGroup which shares the underlying keys slice with g. // This means that after calling toPostingGroup g.keys will be modified. 
func (g rawPostingGroup) toPostingGroup(r indexheader.Reader) (postingGroup, error) { - var keys []labels.Label + var ( + keys []labels.Label + totalSize int64 + ) if g.isLazy { - vals, err := r.LabelValues(g.labelName, g.prefix, g.lazyMatcher) + filter := g.originalMatcher.Matches + if g.isSubtract { + filter = not(filter) + } + vals, err := r.LabelValuesOffsets(g.labelName, g.prefix, filter) if err != nil { return postingGroup{}, err } keys = make([]labels.Label, len(vals)) for i := range vals { - keys[i] = labels.Label{Name: g.labelName, Value: vals[i]} + keys[i] = labels.Label{Name: g.labelName, Value: vals[i].LabelValue} + totalSize += vals[i].Off.End - vals[i].Off.Start } } else { var err error - keys, err = g.filterNonExistingKeys(r) + keys, totalSize, err = g.filterNonExistingKeys(r) if err != nil { return postingGroup{}, errors.Wrap(err, "filter posting keys") } } return postingGroup{ - isSubtract: g.isSubtract, - keys: keys, + isSubtract: g.isSubtract, + originalMatcher: g.originalMatcher, + keys: keys, + totalSize: totalSize, }, nil } // filterNonExistingKeys uses the indexheader.Reader to filter out any label values that do not exist in this index. // modifies the underlying keys slice of the group. Do not use the rawPostingGroup after calling toPostingGroup. -func (g rawPostingGroup) filterNonExistingKeys(r indexheader.Reader) ([]labels.Label, error) { - writeIdx := 0 +func (g rawPostingGroup) filterNonExistingKeys(r indexheader.Reader) ([]labels.Label, int64, error) { + var ( + writeIdx int + totalSize int64 + ) for _, l := range g.keys { - if _, err := r.PostingsOffset(l.Name, l.Value); errors.Is(err, indexheader.NotFoundRangeErr) { + offset, err := r.PostingsOffset(l.Name, l.Value) + if errors.Is(err, indexheader.NotFoundRangeErr) { // This label name and value doesn't exist in this block, so there are 0 postings we can match. // Try with the rest of the set matchers, maybe they can match some series. 
// Continue so we overwrite it next time there's an existing value. continue } else if err != nil { - return nil, err + return nil, 0, err } g.keys[writeIdx] = l writeIdx++ + totalSize += offset.End - offset.Start } - return g.keys[:writeIdx], nil + return g.keys[:writeIdx], totalSize, nil } func toRawPostingGroup(m *labels.Matcher) rawPostingGroup { @@ -153,12 +168,12 @@ func toRawPostingGroup(m *labels.Matcher) rawPostingGroup { // have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575 // and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555. if m.Matches("") { - return newLazySubtractingPostingGroup(m.Name, m.Prefix(), not(m.Matches)) + return newLazySubtractingPostingGroup(m) } // Our matcher does not match the empty value, so we just need the postings that correspond // to label values matched by the matcher. - return newLazyIntersectingPostingGroup(m.Name, m.Prefix(), m.Matches) + return newLazyIntersectingPostingGroup(m) } func not(filter func(string) bool) func(string) bool { @@ -171,8 +186,12 @@ func not(filter func(string) bool) func(string) bool { // All the labels in keys should have a corresponding postings list in the index. // This computation happens in expandedPostings. type postingGroup struct { - isSubtract bool - keys []labels.Label + isSubtract bool + originalMatcher *labels.Matcher + keys []labels.Label + + // totalSize is the size in bytes of all the posting lists for keys + totalSize int64 } type postingPtr struct { diff --git a/pkg/storegateway/bucket_index_reader.go b/pkg/storegateway/bucket_index_reader.go index 3b3ce934c0c..0950a529aea 100644 --- a/pkg/storegateway/bucket_index_reader.go +++ b/pkg/storegateway/bucket_index_reader.go @@ -32,18 +32,35 @@ import ( // expandedPostingsPromise is the promise returned by bucketIndexReader.expandedPostingsPromise. // The second return value indicates whether the returned data comes from the cache. 
-type expandedPostingsPromise func(ctx context.Context) ([]storage.SeriesRef, bool, error) +type expandedPostingsPromise func(ctx context.Context) ([]storage.SeriesRef, []*labels.Matcher, bool, error) + +type postingsSelectionStrategy interface { + name() string + selectPostings([]postingGroup) (selected, omitted []postingGroup) +} + +type selectAllStrategy struct{} + +func (selectAllStrategy) name() string { + return "selectAll" +} + +func (selectAllStrategy) selectPostings(groups []postingGroup) (selected, omitted []postingGroup) { + return groups, nil +} // bucketIndexReader is a custom index reader (not conforming index.Reader interface) that reads index that is stored in // object storage without having to fully download it. type bucketIndexReader struct { - block *bucketBlock - dec *index.Decoder + block *bucketBlock + postingsStrat postingsSelectionStrategy + dec *index.Decoder } -func newBucketIndexReader(block *bucketBlock) *bucketIndexReader { +func newBucketIndexReader(block *bucketBlock, postingsStrat postingsSelectionStrategy) *bucketIndexReader { r := &bucketIndexReader{ - block: block, + block: block, + postingsStrat: postingsStrat, dec: &index.Decoder{ LookupSymbol: block.indexHeaderReader.LookupSymbol, }, @@ -62,8 +79,10 @@ func newBucketIndexReader(block *bucketBlock) *bucketIndexReader { // single label name=value. 
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, stats *safeQueryStats) (returnRefs []storage.SeriesRef, returnErr error) { var ( - loaded bool - cached bool + loaded bool + cached bool + promise expandedPostingsPromise + unappliedMatchers []*labels.Matcher ) span, ctx := tracing.StartSpan(ctx, "ExpandedPostings()") defer func() { @@ -73,9 +92,15 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M } span.Finish() }() - var promise expandedPostingsPromise promise, loaded = r.expandedPostingsPromise(ctx, ms, stats) - returnRefs, cached, returnErr = promise(ctx) + returnRefs, unappliedMatchers, cached, returnErr = promise(ctx) + if len(unappliedMatchers) > 0 { + // This shouldn't be reached since we do not have any postingsSelectionStrategy at the moment + // and all matchers should be applied via posting lists. + // This will change once the clients of ExpandedPostings can work with unapplied matchers. + return nil, fmt.Errorf("there are unapplied matchers (%s) for query (%s)", indexcache.CanonicalLabelMatchersKey(unappliedMatchers), indexcache.CanonicalLabelMatchersKey(ms)) + } + return returnRefs, returnErr } @@ -88,28 +113,29 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M // TODO: https://github.com/grafana/mimir/issues/331 func (r *bucketIndexReader) expandedPostingsPromise(ctx context.Context, ms []*labels.Matcher, stats *safeQueryStats) (promise expandedPostingsPromise, loaded bool) { var ( - refs []storage.SeriesRef - err error - done = make(chan struct{}) - cached bool + refs []storage.SeriesRef + unappliedMatchers []*labels.Matcher + err error + done = make(chan struct{}) + cached bool ) - promise = func(ctx context.Context) ([]storage.SeriesRef, bool, error) { + promise = func(ctx context.Context) ([]storage.SeriesRef, []*labels.Matcher, bool, error) { select { case <-ctx.Done(): - return nil, false, ctx.Err() + return nil, nil, false, ctx.Err() case 
<-done: } if err != nil { - return nil, false, err + return nil, nil, false, err } // We must make a copy of refs to return, because caller can modify the postings slice in place. refsCopy := make([]storage.SeriesRef, len(refs)) copy(refsCopy, refs) - return refsCopy, cached, nil + return refsCopy, unappliedMatchers, cached, nil } key := indexcache.CanonicalLabelMatchersKey(ms) @@ -122,62 +148,66 @@ func (r *bucketIndexReader) expandedPostingsPromise(ctx context.Context, ms []*l defer close(done) defer r.block.expandedPostingsPromises.Delete(key) - refs, cached = r.fetchCachedExpandedPostings(ctx, r.block.userID, key, stats) + refs, unappliedMatchers, cached = r.fetchCachedExpandedPostings(ctx, r.block.userID, key, stats) if cached { return promise, false } - refs, err = r.expandedPostings(ctx, ms, stats) + refs, unappliedMatchers, err = r.expandedPostings(ctx, ms, stats) if err != nil { return promise, false } - r.cacheExpandedPostings(r.block.userID, key, refs) + r.cacheExpandedPostings(r.block.userID, key, refs, unappliedMatchers) return promise, false } -func (r *bucketIndexReader) cacheExpandedPostings(userID string, key indexcache.LabelMatchersKey, refs []storage.SeriesRef) { - data, err := diffVarintSnappyEncode(index.NewListPostings(refs), len(refs)) +func (r *bucketIndexReader) cacheExpandedPostings(userID string, key indexcache.LabelMatchersKey, refs []storage.SeriesRef, unappliedMatchers []*labels.Matcher) { + data, err := diffVarintSnappyMatchersEncode(index.NewListPostings(refs), len(refs), unappliedMatchers) if err != nil { level.Warn(r.block.logger).Log("msg", "can't encode expanded postings cache", "err", err, "matchers_key", key, "block", r.block.meta.ULID) return } - r.block.indexCache.StoreExpandedPostings(userID, r.block.meta.ULID, key, data) + r.block.indexCache.StoreExpandedPostings(userID, r.block.meta.ULID, key, r.postingsStrat.name(), data) } -func (r *bucketIndexReader) fetchCachedExpandedPostings(ctx context.Context, userID string, key 
indexcache.LabelMatchersKey, stats *safeQueryStats) ([]storage.SeriesRef, bool) { - data, ok := r.block.indexCache.FetchExpandedPostings(ctx, userID, r.block.meta.ULID, key) +func (r *bucketIndexReader) fetchCachedExpandedPostings(ctx context.Context, userID string, key indexcache.LabelMatchersKey, stats *safeQueryStats) ([]storage.SeriesRef, []*labels.Matcher, bool) { + data, ok := r.block.indexCache.FetchExpandedPostings(ctx, userID, r.block.meta.ULID, key, r.postingsStrat.name()) if !ok { - return nil, false + return nil, nil, false } - p, err := r.decodePostings(data, stats) + p, unappliedMatchers, err := r.decodePostings(data, stats) if err != nil { level.Warn(r.block.logger).Log("msg", "can't decode expanded postings cache", "err", err, "matchers_key", key, "block", r.block.meta.ULID) - return nil, false + return nil, nil, false } refs, err := index.ExpandPostings(p) if err != nil { level.Warn(r.block.logger).Log("msg", "can't expand decoded expanded postings cache", "err", err, "matchers_key", key, "block", r.block.meta.ULID) - return nil, false + return nil, nil, false } - return refs, true + return refs, unappliedMatchers, true } // expandedPostings is the main logic of ExpandedPostings, without the promise wrapper. 
-func (r *bucketIndexReader) expandedPostings(ctx context.Context, ms []*labels.Matcher, stats *safeQueryStats) (returnRefs []storage.SeriesRef, returnErr error) { - postingGroups, keys, err := toPostingGroups(ms, r.block.indexHeaderReader) +func (r *bucketIndexReader) expandedPostings(ctx context.Context, ms []*labels.Matcher, stats *safeQueryStats) (returnRefs []storage.SeriesRef, unappliedMatchers []*labels.Matcher, returnErr error) { + postingGroups, err := toPostingGroups(ms, r.block.indexHeaderReader) if err != nil { - return nil, errors.Wrap(err, "toPostingGroups") + return nil, nil, errors.Wrap(err, "toPostingGroups") } - if len(keys) == 0 { - return nil, nil + if len(postingGroups) == 0 { + return nil, nil, nil } - fetchedPostings, err := r.fetchPostings(ctx, keys, stats) + postingGroups, omittedPostingGroups := r.postingsStrat.selectPostings(postingGroups) + + fetchedPostings, err := r.fetchPostings(ctx, extractLabels(postingGroups), stats) if err != nil { - return nil, errors.Wrap(err, "get postings") + return nil, nil, errors.Wrap(err, "get postings") } + // The order of the fetched postings is the same as the order of the requested keys. + // This is guaranteed by extractLabels. postingIndex := 0 var groupAdds, groupRemovals []index.Postings @@ -202,14 +232,14 @@ func (r *bucketIndexReader) expandedPostings(ctx context.Context, ms []*labels.M ps, err := index.ExpandPostings(result) if err != nil { - return nil, errors.Wrap(err, "expand") + return nil, nil, errors.Wrap(err, "expand") } // As of version two all series entries are 16 byte padded. All references // we get have to account for that to get the correct offset. 
version, err := r.block.indexHeaderReader.IndexVersion() if err != nil { - return nil, errors.Wrap(err, "get index version") + return nil, nil, errors.Wrap(err, "get index version") } if version >= 2 { for i, id := range ps { @@ -217,13 +247,36 @@ } } - return ps, nil + return ps, extractLabelMatchers(omittedPostingGroups), nil +} + +func extractLabelMatchers(groups []postingGroup) []*labels.Matcher { + if len(groups) == 0 { + return nil + } + m := make([]*labels.Matcher, len(groups)) + for i := range groups { + m[i] = groups[i].originalMatcher + } + return m +} + +// extractLabels returns the keys of the posting groups in the order in which they are found in each posting group. +func extractLabels(groups []postingGroup) []labels.Label { + numKeys := 0 + for _, pg := range groups { + numKeys += len(pg.keys) + } + keys := make([]labels.Label, 0, numKeys) + for _, pg := range groups { + keys = append(keys, pg.keys...) + } + return keys } // toPostingGroups returns a set of labels for which to look up postings lists. It guarantees that -// each postingGroup's keys exist in the index. The order of the returned labels -// is the same as iterating through the posting groups and for each group adding all keys. +// each postingGroup's keys exist in the index. +func toPostingGroups(ms []*labels.Matcher, indexhdr indexheader.Reader) ([]postingGroup, error) { var ( rawPostingGroups = make([]rawPostingGroup, 0, len(ms)) allRequested = false @@ -262,22 +315,20 @@ // Next we check whether the posting groups won't select an empty set of postings. // Based on the previous sorting, we start with the ones that have a known set of values because it's less expensive to check them in // the index header. 
- numKeys := 0 for _, rawGroup := range rawPostingGroups { pg, err := rawGroup.toPostingGroup(indexhdr) if err != nil { - return nil, nil, errors.Wrap(err, "filtering posting group") + return nil, errors.Wrap(err, "filtering posting group") } // If this group has no keys to work though and is not a subtract group, then it's an empty group. // We can shortcut this, since intersection with empty postings would return no postings. // E.g. `label="non-existent-value"` returns empty group. if !pg.isSubtract && len(pg.keys) == 0 { - return nil, nil, nil + return nil, nil } postingGroups = append(postingGroups, pg) - numKeys += len(pg.keys) // If the group is a subtraction group, we must fetch all postings and remove the ones that this matcher selects. allRequested = allRequested || pg.isSubtract hasAdds = hasAdds || !pg.isSubtract @@ -291,7 +342,6 @@ func toPostingGroups(ms []*labels.Matcher, indexhdr indexheader.Reader) ([]posti allPostingsLabel := labels.Label{Name: name, Value: value} postingGroups = append(postingGroups, postingGroup{isSubtract: false, keys: []labels.Label{allPostingsLabel}}) - numKeys++ } // If hasAdds is false, then there were no posting lists for any labels that we will intersect. @@ -299,15 +349,10 @@ func toPostingGroups(ms []*labels.Matcher, indexhdr indexheader.Reader) ([]posti // Shortcut doing any set operations and just return an empty set here. // A query that might end up in this case is `{pod=~"non-existent-value.*"}`. if !allRequested && !hasAdds { - return nil, nil, nil - } - - keys := make([]labels.Label, 0, numKeys) - for _, pg := range postingGroups { - keys = append(keys, pg.keys...) + return nil, nil } - return postingGroups, keys, nil + return postingGroups, nil } // FetchPostings fills postings requested by posting groups. 
@@ -359,7 +404,11 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab stats.postingsTouchedSizeSum += len(b) }) - l, err := r.decodePostings(b, stats) + l, unappliedMatchers, err := r.decodePostings(b, stats) + if len(unappliedMatchers) > 0 { + return nil, fmt.Errorf("not expecting matchers on non-expanded postings for %s=%s in block %s, but got %s", + key.Name, key.Value, r.block.meta.ULID, indexcache.CanonicalLabelMatchersKey(unappliedMatchers)) + } if err == nil { output[ix] = l continue @@ -482,14 +531,17 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab return output, g.Wait() } -func (r *bucketIndexReader) decodePostings(b []byte, stats *safeQueryStats) (index.Postings, error) { +// decodePostings may retain pointers to the buffer. +func (r *bucketIndexReader) decodePostings(b []byte, stats *safeQueryStats) (index.Postings, []*labels.Matcher, error) { // Even if this instance is not using compression, there may be compressed // entries in the cache written by other stores. 
var ( - l index.Postings - err error + l index.Postings + matchers []*labels.Matcher + err error ) - if isDiffVarintSnappyEncodedPostings(b) { + switch { + case isDiffVarintSnappyEncodedPostings(b): s := time.Now() l, err = diffVarintSnappyDecode(b) @@ -500,10 +552,22 @@ func (r *bucketIndexReader) decodePostings(b []byte, stats *safeQueryStats) (ind stats.cachedPostingsDecompressionErrors++ } }) - } else { + + case isDiffVarintSnappyWithmatchersEncodedPostings(b): + s := time.Now() + l, matchers, err = diffVarintSnappyMatchersDecode(b) + + stats.update(func(stats *queryStats) { + stats.cachedPostingsDecompressions++ + stats.cachedPostingsDecompressionTimeSum += time.Since(s) + if err != nil { + stats.cachedPostingsDecompressionErrors++ + } + }) + default: _, l, err = r.dec.Postings(b) } - return l, err + return l, matchers, err } func (r *bucketIndexReader) preloadSeries(ctx context.Context, ids []storage.SeriesRef, stats *safeQueryStats) (*bucketIndexLoadedSeries, error) { span, ctx := tracing.StartSpan(ctx, "preloadSeries()") diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index 2d026c52bcd..a308abc75a2 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -62,6 +62,7 @@ import ( "github.com/grafana/mimir/pkg/storegateway/hintspb" "github.com/grafana/mimir/pkg/storegateway/indexcache" "github.com/grafana/mimir/pkg/storegateway/indexheader" + "github.com/grafana/mimir/pkg/storegateway/indexheader/index" "github.com/grafana/mimir/pkg/storegateway/storepb" "github.com/grafana/mimir/pkg/storegateway/testhelper" "github.com/grafana/mimir/pkg/util/pool" @@ -320,8 +321,8 @@ func TestBlockLabelNames(t *testing.T) { t.Run("error with matchers", func(t *testing.T) { b := newTestBucketBlock() b.indexHeaderReader = &interceptedIndexReader{ - Reader: b.indexHeaderReader, - onLabelValuesCalled: func(_ string) error { return context.DeadlineExceeded }, + Reader: b.indexHeaderReader, + onLabelValuesOffsetsCalled: 
func(_ string) error { return context.DeadlineExceeded }, } b.indexCache = cacheNotExpectingToStoreLabelNames{t: t} @@ -340,7 +341,7 @@ func TestBlockLabelNames(t *testing.T) { onLabelNamesCalled: func() error { return fmt.Errorf("not expected the LabelNames() calls with matchers") }, - onLabelValuesCalled: func(name string) error { + onLabelValuesOffsetsCalled: func(name string) error { expectedCalls-- if expectedCalls < 0 { return fmt.Errorf("didn't expect another index.Reader.LabelValues() call") @@ -405,7 +406,7 @@ func TestBlockLabelValues(t *testing.T) { b := newTestBucketBlock() b.indexHeaderReader = &interceptedIndexReader{ Reader: b.indexHeaderReader, - onLabelValuesCalled: func(name string) error { + onLabelValuesOffsetsCalled: func(name string) error { expectedCalls-- if expectedCalls < 0 { return fmt.Errorf("didn't expect another index.Reader.LabelValues() call") @@ -428,8 +429,8 @@ func TestBlockLabelValues(t *testing.T) { t.Run("error with matchers", func(t *testing.T) { b := newTestBucketBlock() b.indexHeaderReader = &interceptedIndexReader{ - Reader: b.indexHeaderReader, - onLabelValuesCalled: func(_ string) error { return context.DeadlineExceeded }, + Reader: b.indexHeaderReader, + onLabelValuesOffsetsCalled: func(_ string) error { return context.DeadlineExceeded }, } b.indexCache = cacheNotExpectingToStoreLabelValues{t: t} @@ -525,8 +526,8 @@ func TestBucketIndexReader_ExpandedPostings(t *testing.T) { b := newTestBucketBlock() b.indexHeaderReader = &interceptedIndexReader{ - Reader: b.indexHeaderReader, - onLabelValuesCalled: onlabelValuesCalled, + Reader: b.indexHeaderReader, + onLabelValuesOffsetsCalled: onlabelValuesCalled, } // we're building a scenario where: @@ -653,8 +654,8 @@ func TestBucketIndexReader_ExpandedPostings(t *testing.T) { b := newTestBucketBlock() b.indexHeaderReader = &interceptedIndexReader{ - Reader: b.indexHeaderReader, - onLabelValuesCalled: onLabelValuesCalled, + Reader: b.indexHeaderReader, + onLabelValuesOffsetsCalled: 
onLabelValuesCalled, } b.indexCache = newInMemoryIndexCache(t) @@ -693,7 +694,7 @@ func TestBucketIndexReader_ExpandedPostings(t *testing.T) { b := newTestBucketBlock() b.indexHeaderReader = &interceptedIndexReader{ Reader: b.indexHeaderReader, - onLabelValuesCalled: func(_ string) error { + onLabelValuesOffsetsCalled: func(_ string) error { return context.Canceled // alwaysFails }, } @@ -752,8 +753,9 @@ func newInMemoryIndexCache(t testing.TB) indexcache.IndexCache { type interceptedIndexReader struct { indexheader.Reader - onLabelNamesCalled func() error - onLabelValuesCalled func(name string) error + onLabelNamesCalled func() error + onLabelValuesCalled func(name string) error + onLabelValuesOffsetsCalled func(name string) error } func (iir *interceptedIndexReader) LabelNames() ([]string, error) { @@ -774,6 +776,15 @@ func (iir *interceptedIndexReader) LabelValues(name string, prefix string, filte return iir.Reader.LabelValues(name, prefix, filter) } +func (iir *interceptedIndexReader) LabelValuesOffsets(name string, prefix string, filter func(string) bool) ([]index.PostingListOffset, error) { + if iir.onLabelValuesOffsetsCalled != nil { + if err := iir.onLabelValuesOffsetsCalled(name); err != nil { + return nil, err + } + } + return iir.Reader.LabelValuesOffsets(name, prefix, filter) +} + type contextNotifyingOnDoneWaiting struct { context.Context once sync.Once @@ -789,7 +800,7 @@ func (w *contextNotifyingOnDoneWaiting) Done() <-chan struct{} { type corruptedExpandedPostingsCache struct{ noopCache } -func (c corruptedExpandedPostingsCache) FetchExpandedPostings(ctx context.Context, userID string, blockID ulid.ULID, key indexcache.LabelMatchersKey) ([]byte, bool) { +func (c corruptedExpandedPostingsCache) FetchExpandedPostings(ctx context.Context, userID string, blockID ulid.ULID, key indexcache.LabelMatchersKey, postingsSelectionStrategy string) ([]byte, bool) { return []byte(codecHeaderSnappy + "corrupted"), true } @@ -808,7 +819,7 @@ type 
cacheNotExpectingToStoreExpandedPostings struct { t *testing.T } -func (c cacheNotExpectingToStoreExpandedPostings) StoreExpandedPostings(userID string, blockID ulid.ULID, key indexcache.LabelMatchersKey, v []byte) { +func (c cacheNotExpectingToStoreExpandedPostings) StoreExpandedPostings(userID string, blockID ulid.ULID, key indexcache.LabelMatchersKey, postingsSelectionStrategy string, v []byte) { c.t.Fatalf("StoreExpandedPostings should not be called") } @@ -953,7 +964,7 @@ func benchmarkExpandedPostings( for _, testCase := range seriesSelectionTestCases(tb, series) { tb.Run(testCase.name, func(tb test.TB) { - indexr := newBucketIndexReader(newTestBucketBlock()) + indexr := newBucketIndexReader(newTestBucketBlock(), selectAllStrategy{}) indexrStats := newSafeQueryStats() tb.ResetTimer() diff --git a/pkg/storegateway/indexcache/cache.go b/pkg/storegateway/indexcache/cache.go index 92d236ce5c0..a29da8ab8d4 100644 --- a/pkg/storegateway/indexcache/cache.go +++ b/pkg/storegateway/indexcache/cache.go @@ -59,10 +59,10 @@ type IndexCache interface { FetchMultiSeriesForRefs(ctx context.Context, userID string, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) // StoreExpandedPostings stores the result of ExpandedPostings, encoded with an unspecified codec. - StoreExpandedPostings(userID string, blockID ulid.ULID, key LabelMatchersKey, v []byte) + StoreExpandedPostings(userID string, blockID ulid.ULID, key LabelMatchersKey, postingsSelectionStrategy string, v []byte) // FetchExpandedPostings fetches the result of ExpandedPostings, encoded with an unspecified codec. - FetchExpandedPostings(ctx context.Context, userID string, blockID ulid.ULID, key LabelMatchersKey) ([]byte, bool) + FetchExpandedPostings(ctx context.Context, userID string, blockID ulid.ULID, key LabelMatchersKey, postingsSelectionStrategy string) ([]byte, bool) // StoreSeriesForPostings stores a series set for the provided postings. 
StoreSeriesForPostings(userID string, blockID ulid.ULID, shard *sharding.ShardSelector, postingsKey PostingsKey, v []byte) diff --git a/pkg/storegateway/indexcache/inmemory.go b/pkg/storegateway/indexcache/inmemory.go index 9134e3e58b6..c9550c020fe 100644 --- a/pkg/storegateway/indexcache/inmemory.go +++ b/pkg/storegateway/indexcache/inmemory.go @@ -336,13 +336,13 @@ func (c *InMemoryIndexCache) FetchMultiSeriesForRefs(_ context.Context, userID s } // StoreExpandedPostings stores the encoded result of ExpandedPostings for specified matchers identified by the provided LabelMatchersKey. -func (c *InMemoryIndexCache) StoreExpandedPostings(userID string, blockID ulid.ULID, key LabelMatchersKey, v []byte) { - c.set(cacheKeyExpandedPostings{userID, blockID, key}, v) +func (c *InMemoryIndexCache) StoreExpandedPostings(userID string, blockID ulid.ULID, key LabelMatchersKey, postingsSelectionStrategy string, v []byte) { + c.set(cacheKeyExpandedPostings{userID, blockID, key, postingsSelectionStrategy}, v) } // FetchExpandedPostings fetches the encoded result of ExpandedPostings for specified matchers identified by the provided LabelMatchersKey. -func (c *InMemoryIndexCache) FetchExpandedPostings(_ context.Context, userID string, blockID ulid.ULID, key LabelMatchersKey) ([]byte, bool) { - return c.get(cacheKeyExpandedPostings{userID, blockID, key}) +func (c *InMemoryIndexCache) FetchExpandedPostings(_ context.Context, userID string, blockID ulid.ULID, key LabelMatchersKey, postingsSelectionStrategy string) ([]byte, bool) { + return c.get(cacheKeyExpandedPostings{userID, blockID, key, postingsSelectionStrategy}) } // StoreSeriesForPostings stores a series set for the provided postings. @@ -412,9 +412,10 @@ func (c cacheKeySeriesForRef) size() uint64 { // cacheKeyPostings implements cacheKey and is used to reference an expanded postings cache entry in the inmemory cache. 
type cacheKeyExpandedPostings struct { - userID string - block ulid.ULID - matchersKey LabelMatchersKey + userID string + block ulid.ULID + matchersKey LabelMatchersKey + postingsSelectionStrategy string } func (c cacheKeyExpandedPostings) typ() string { return cacheTypeExpandedPostings } diff --git a/pkg/storegateway/indexcache/inmemory_test.go b/pkg/storegateway/indexcache/inmemory_test.go index ecea60a83d9..4c1ace2af19 100644 --- a/pkg/storegateway/indexcache/inmemory_test.go +++ b/pkg/storegateway/indexcache/inmemory_test.go @@ -162,10 +162,10 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) { { typ: cacheTypeExpandedPostings, set: func(id uint64, b []byte) { - cache.StoreExpandedPostings(user, uid(id), CanonicalLabelMatchersKey(matchers), b) + cache.StoreExpandedPostings(user, uid(id), CanonicalLabelMatchersKey(matchers), "strategy", b) }, get: func(id uint64) ([]byte, bool) { - return cache.FetchExpandedPostings(ctx, user, uid(id), CanonicalLabelMatchersKey(matchers)) + return cache.FetchExpandedPostings(ctx, user, uid(id), CanonicalLabelMatchersKey(matchers), "strategy") }, }, { diff --git a/pkg/storegateway/indexcache/remote.go b/pkg/storegateway/indexcache/remote.go index af92aec7f71..609ae6f1950 100644 --- a/pkg/storegateway/indexcache/remote.go +++ b/pkg/storegateway/indexcache/remote.go @@ -206,18 +206,18 @@ func seriesForRefCacheKey(userID string, blockID ulid.ULID, id storage.SeriesRef } // StoreExpandedPostings stores the encoded result of ExpandedPostings for specified matchers identified by the provided LabelMatchersKey. 
-func (c *RemoteIndexCache) StoreExpandedPostings(userID string, blockID ulid.ULID, lmKey LabelMatchersKey, v []byte) { - c.set(cacheTypeExpandedPostings, expandedPostingsCacheKey(userID, blockID, lmKey), v) +func (c *RemoteIndexCache) StoreExpandedPostings(userID string, blockID ulid.ULID, lmKey LabelMatchersKey, postingsSelectionStrategy string, v []byte) { + c.set(cacheTypeExpandedPostings, expandedPostingsCacheKey(userID, blockID, lmKey, postingsSelectionStrategy), v) } // FetchExpandedPostings fetches the encoded result of ExpandedPostings for specified matchers identified by the provided LabelMatchersKey. -func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, userID string, blockID ulid.ULID, lmKey LabelMatchersKey) ([]byte, bool) { - return c.get(ctx, cacheTypeExpandedPostings, expandedPostingsCacheKey(userID, blockID, lmKey)) +func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, userID string, blockID ulid.ULID, lmKey LabelMatchersKey, postingsSelectionStrategy string) ([]byte, bool) { + return c.get(ctx, cacheTypeExpandedPostings, expandedPostingsCacheKey(userID, blockID, lmKey, postingsSelectionStrategy)) } -func expandedPostingsCacheKey(userID string, blockID ulid.ULID, lmKey LabelMatchersKey) string { +func expandedPostingsCacheKey(userID string, blockID ulid.ULID, lmKey LabelMatchersKey, postingsSelectionStrategy string) string { hash := blake2b.Sum256([]byte(lmKey)) - return "E:" + userID + ":" + blockID.String() + ":" + base64.RawURLEncoding.EncodeToString(hash[0:]) + return "E:" + userID + ":" + blockID.String() + ":" + base64.RawURLEncoding.EncodeToString(hash[0:]) + ":" + postingsSelectionStrategy } // StoreSeriesForPostings stores a series set for the provided postings. 
diff --git a/pkg/storegateway/indexcache/remote_test.go b/pkg/storegateway/indexcache/remote_test.go index cbefc1c38d1..d1100d083a3 100644 --- a/pkg/storegateway/indexcache/remote_test.go +++ b/pkg/storegateway/indexcache/remote_test.go @@ -241,15 +241,18 @@ func TestRemoteIndexCache_FetchExpandedPostings(t *testing.T) { value1 := []byte{1} value2 := []byte{2} value3 := []byte{3} + postingsStrategy1 := "s1" + postingsStrategy2 := "s2" tests := map[string]struct { - setup []mockedExpandedPostings - mockedErr error - fetchUserID string - fetchBlockID ulid.ULID - fetchKey LabelMatchersKey - expectedData []byte - expectedOk bool + setup []mockedExpandedPostings + mockedErr error + fetchUserID string + fetchBlockID ulid.ULID + fetchKey LabelMatchersKey + fetchStrategy string + expectedData []byte + expectedOk bool }{ "should return no hit on empty cache": { setup: []mockedExpandedPostings{}, @@ -261,16 +264,18 @@ func TestRemoteIndexCache_FetchExpandedPostings(t *testing.T) { }, "should return no miss on hit": { setup: []mockedExpandedPostings{ - {userID: user1, block: block1, matchers: matchers1, value: value1}, - {userID: user2, block: block1, matchers: matchers1, value: value2}, - {userID: user1, block: block1, matchers: matchers2, value: value2}, - {userID: user1, block: block2, matchers: matchers1, value: value3}, + {userID: user1, block: block1, matchers: matchers1, value: value1, postingsStrategy: postingsStrategy1}, + {userID: user2, block: block1, matchers: matchers1, value: value2, postingsStrategy: postingsStrategy1}, + {userID: user1, block: block1, matchers: matchers2, value: value2, postingsStrategy: postingsStrategy1}, + {userID: user1, block: block2, matchers: matchers1, value: value3, postingsStrategy: postingsStrategy1}, + {userID: user1, block: block1, matchers: matchers1, value: value1, postingsStrategy: postingsStrategy2}, }, - fetchUserID: user1, - fetchBlockID: block1, - fetchKey: CanonicalLabelMatchersKey(matchers1), - expectedData: value1, - 
expectedOk: true, + fetchUserID: user1, + fetchBlockID: block1, + fetchKey: CanonicalLabelMatchersKey(matchers1), + fetchStrategy: postingsStrategy1, + expectedData: value1, + expectedOk: true, }, "should return no hit on remote cache error": { setup: []mockedExpandedPostings{ @@ -296,11 +301,11 @@ func TestRemoteIndexCache_FetchExpandedPostings(t *testing.T) { // Store the postings expected before running the test. ctx := context.Background() for _, p := range testData.setup { - c.StoreExpandedPostings(p.userID, p.block, CanonicalLabelMatchersKey(p.matchers), p.value) + c.StoreExpandedPostings(p.userID, p.block, CanonicalLabelMatchersKey(p.matchers), testData.fetchStrategy, p.value) } // Fetch postings from cached and assert on it. - data, ok := c.FetchExpandedPostings(ctx, testData.fetchUserID, testData.fetchBlockID, testData.fetchKey) + data, ok := c.FetchExpandedPostings(ctx, testData.fetchUserID, testData.fetchBlockID, testData.fetchKey, testData.fetchStrategy) assert.Equal(t, testData.expectedData, data) assert.Equal(t, testData.expectedOk, ok) @@ -700,7 +705,7 @@ func BenchmarkStringCacheKeys(b *testing.B) { b.Run("expanded postings", func(b *testing.B) { for i := 0; i < b.N; i++ { - expandedPostingsCacheKey(userID, uid, lmKey) + expandedPostingsCacheKey(userID, uid, lmKey, "strategy") } }) } @@ -720,10 +725,11 @@ type mockedSeriesForRef struct { } type mockedExpandedPostings struct { - userID string - block ulid.ULID - matchers []*labels.Matcher - value []byte + userID string + block ulid.ULID + matchers []*labels.Matcher + postingsStrategy string + value []byte } type mockedLabelNames struct { diff --git a/pkg/storegateway/indexcache/tracing.go b/pkg/storegateway/indexcache/tracing.go index c08ee02a8e6..3a183c975b1 100644 --- a/pkg/storegateway/indexcache/tracing.go +++ b/pkg/storegateway/indexcache/tracing.go @@ -70,18 +70,19 @@ func (t *TracingIndexCache) FetchMultiSeriesForRefs(ctx context.Context, userID return hits, misses } -func (t 
*TracingIndexCache) StoreExpandedPostings(userID string, blockID ulid.ULID, key LabelMatchersKey, v []byte) { - t.c.StoreExpandedPostings(userID, blockID, key, v) +func (t *TracingIndexCache) StoreExpandedPostings(userID string, blockID ulid.ULID, key LabelMatchersKey, postingsSelectionStrategy string, v []byte) { + t.c.StoreExpandedPostings(userID, blockID, key, postingsSelectionStrategy, v) } -func (t *TracingIndexCache) FetchExpandedPostings(ctx context.Context, userID string, blockID ulid.ULID, key LabelMatchersKey) ([]byte, bool) { +func (t *TracingIndexCache) FetchExpandedPostings(ctx context.Context, userID string, blockID ulid.ULID, key LabelMatchersKey, postingsSelectionStrategy string) ([]byte, bool) { t0 := time.Now() - data, found := t.c.FetchExpandedPostings(ctx, userID, blockID, key) + data, found := t.c.FetchExpandedPostings(ctx, userID, blockID, key, postingsSelectionStrategy) spanLogger := spanlogger.FromContext(ctx, t.logger) level.Debug(spanLogger).Log( "msg", "IndexCache.FetchExpandedPostings", "requested key", key, + "postings selection strategy", postingsSelectionStrategy, "found", found, "time elapsed", time.Since(t0), "returned bytes", len(data), diff --git a/pkg/storegateway/postings_codec.go b/pkg/storegateway/postings_codec.go index 44ff17e0628..ce8b4337f0c 100644 --- a/pkg/storegateway/postings_codec.go +++ b/pkg/storegateway/postings_codec.go @@ -7,9 +7,14 @@ package storegateway import ( "bytes" + "encoding/binary" + "fmt" + "unsafe" + "github.com/dennwc/varint" "github.com/golang/snappy" "github.com/pkg/errors" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/index" @@ -24,8 +29,11 @@ import ( // single byte, values < 16384 are encoded as two bytes). Diff + varint reduces postings size // significantly (to about 20% of original), snappy then halves it to ~10% of the original. 
+type codec string + const ( - codecHeaderSnappy = "dvs" // As in "diff+varint+snappy". + codecHeaderSnappy codec = "dvs" // As in "diff+varint+snappy". + codecHeaderSnappyWithmatchers codec = "dm" // As in "dvs+matchers" ) // isDiffVarintSnappyEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy codec. @@ -86,7 +94,7 @@ func diffVarintEncodeNoHeader(p index.Postings, length int) ([]byte, error) { func diffVarintSnappyDecode(input []byte) (index.Postings, error) { if !isDiffVarintSnappyEncodedPostings(input) { - return nil, errors.New("header not found") + return nil, errors.New("dvs header not found") } raw, err := snappy.Decode(nil, input[len(codecHeaderSnappy):]) @@ -97,6 +105,140 @@ func diffVarintSnappyDecode(input []byte) (index.Postings, error) { return newDiffVarintPostings(raw), nil } +// isDiffVarintSnappyEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy+matchers codec. +func isDiffVarintSnappyWithmatchersEncodedPostings(input []byte) bool { + return bytes.HasPrefix(input, []byte(codecHeaderSnappyWithmatchers)) +} + +// diffVarintSnappyMatchersEncode encodes postings into snappy-encoded diff+varint representation, +// prepended with any unapplied matchers to the result. +// Returned byte slice starts with codecHeaderSnappyWithmatchers header. +// Length argument is expected number of postings, used for preallocating buffer. 
+func diffVarintSnappyMatchersEncode(p index.Postings, length int, unappliedMatchers []*labels.Matcher) ([]byte, error) { + varintPostings, err := diffVarintEncodeNoHeader(p, length) + if err != nil { + return nil, err + } + + // Estimate sizes + matchersLen := encodedMatchersLen(unappliedMatchers) + codecLen := len(codecHeaderSnappyWithmatchers) + + // Preallocate buffer + result := make([]byte, codecLen+matchersLen+snappy.MaxEncodedLen(len(varintPostings))) + + // Codec + copy(result, codecHeaderSnappyWithmatchers) + + // Matchers size + matchers + matchersWritten, err := encodeMatchers(matchersLen, unappliedMatchers, result[codecLen:]) + if err != nil { + return nil, err + } + if matchersWritten != matchersLen { + return nil, fmt.Errorf("encoding matchers wrote unexpected number of bytes: wrote %d, expected %d", matchersWritten, matchersLen) + } + + // Compressed postings + compressedPostings := snappy.Encode(result[codecLen+matchersWritten:], varintPostings) + + result = result[:codecLen+matchersWritten+len(compressedPostings)] + return result, nil +} + +// encodeMatchers needs to be called with the precomputed length of the encoded matchers from encodedMatchersLen +func encodeMatchers(totalLen int, matchers []*labels.Matcher, dest []byte) (written int, _ error) { + if len(dest) < totalLen { + return 0, fmt.Errorf("too small buffer to encode matchers: need at least %d, got %d", totalLen, dest) + } + written += binary.PutUvarint(dest, uint64(len(matchers))) + for _, m := range matchers { + written += binary.PutUvarint(dest[written:], uint64(len(m.Name))) + written += copy(dest[written:], m.Name) + + dest[written] = byte(m.Type) // TODO dimitarvdimitrov add a test which breaks when m.Type changes its values (i.e. 
Equal is not 0, NotEqual is not 1) + written++ + + written += binary.PutUvarint(dest[written:], uint64(len(m.Value))) + written += copy(dest[written:], m.Value) + } + return written, nil +} + +func encodedMatchersLen(matchers []*labels.Matcher) int { + matchersLen := varint.UvarintSize(uint64(len(matchers))) + for _, m := range matchers { + matchersLen += varint.UvarintSize(uint64(len(m.Name))) + matchersLen += len(m.Name) + matchersLen++ // 1 byte for the type + matchersLen += varint.UvarintSize(uint64(len(m.Value))) + matchersLen += len(m.Value) + } + return matchersLen +} + +// diffVarintSnappyMatchersDecode retains pointers to the buffer. +func diffVarintSnappyMatchersDecode(input []byte) (index.Postings, []*labels.Matcher, error) { + if !isDiffVarintSnappyWithmatchersEncodedPostings(input) { + return nil, nil, errors.New("dm header not found") + } + + codecLen := len(codecHeaderSnappyWithmatchers) + matchers, matchersLen, err := decodeMatchers(input[codecLen:]) + if err != nil { + return nil, nil, errors.Wrap(err, "decoding matchers") + } + raw, err := snappy.Decode(nil, input[codecLen+matchersLen:]) + if err != nil { + return nil, nil, errors.Wrap(err, "snappy decode") + } + + return newDiffVarintPostings(raw), matchers, nil +} + +// decodeMatchers retains points to the buffer +func decodeMatchers(src []byte) ([]*labels.Matcher, int, error) { + initialLength := len(src) + numMatchers, numMatchersLen := varint.Uvarint(src) + src = src[numMatchersLen:] + if numMatchers == 0 { + return nil, numMatchersLen, nil + } + matchers := make([]*labels.Matcher, 0, numMatchers) + + var ( + typ labels.MatchType + labelName string + value string + ) + for i := uint64(0); i < numMatchers; i++ { + n, nLen := varint.Uvarint(src) + src = src[nLen:] + labelName = yoloString(src[:n]) + src = src[n:] + + typ = labels.MatchType(src[0]) + src = src[1:] + + n, nLen = varint.Uvarint(src) + src = src[nLen:] + value = yoloString(src[:n]) + src = src[n:] + + m, err := 
labels.NewMatcher(typ, labelName, value) + if err != nil { + return nil, 0, err + } + matchers = append(matchers, m) + } + + return matchers, initialLength - len(src), nil +} + +func yoloString(buf []byte) string { + return *((*string)(unsafe.Pointer(&buf))) +} + func newDiffVarintPostings(input []byte) *diffVarintPostings { return &diffVarintPostings{buf: &encoding.Decbuf{B: input}} } diff --git a/pkg/storegateway/postings_codec_test.go b/pkg/storegateway/postings_codec_test.go index eb19dd6cde2..59bddde45f5 100644 --- a/pkg/storegateway/postings_codec_test.go +++ b/pkg/storegateway/postings_codec_test.go @@ -21,6 +21,8 @@ import ( "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/index" "github.com/stretchr/testify/assert" + + "github.com/grafana/mimir/pkg/storegateway/indexcache" ) func TestDiffVarintCodec(t *testing.T) { @@ -53,7 +55,7 @@ func TestDiffVarintCodec(t *testing.T) { `i=~".+"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", ".+")), `i=~"1.+"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "1.+")), `i=~"^$"'`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "^$")), - `i!~""`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "i", "")), + `i!=""`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "i", "")), `n!="2"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+labelLongSuffix)), `i!~"2.*"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$")), } @@ -94,6 +96,106 @@ func TestDiffVarintCodec(t *testing.T) { } } +func TestLabelMatchersTypeValues(t *testing.T) { + expectedValues := map[labels.MatchType]int{ + labels.MatchEqual: 0, + labels.MatchNotEqual: 1, + labels.MatchRegexp: 2, + labels.MatchNotRegexp: 3, + } + + for matcherType, val := range expectedValues { + assert.Equal(t, int(labels.MustNewMatcher(matcherType, "", "").Type), val, + "diffVarintSnappyMatchersEncode 
relies on the number values of hte matchers not changing. "+ + "It caches each matcher type as these integer values. "+ + "If the integer values change, then the already cached values in the index cache will be improperly decoded.") + } +} + +func TestDiffVarintMatchersCodec(t *testing.T) { + chunksDir := t.TempDir() + + headOpts := tsdb.DefaultHeadOptions() + headOpts.ChunkDirRoot = chunksDir + headOpts.ChunkRange = 1000 + h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil) + assert.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, h.Close()) + assert.NoError(t, os.RemoveAll(chunksDir)) + }) + + appendTestSeries(1e4)(t, h.Appender(context.Background())) + + idx, err := h.Index() + assert.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, idx.Close()) + }) + + postingsMap := map[string]index.Postings{ + `no postings`: index.EmptyPostings(), + `single posting`: index.NewListPostings([]storage.SeriesRef{123}), + `n="1"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "n", "1"+labelLongSuffix)), + `j="foo"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "j", "foo")), + `j!="foo"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo")), + `i=~".*"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", ".*")), + `i=~".+"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", ".+")), + `i=~"1.+"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "1.+")), + `i=~"^$"'`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "^$")), + `i!=""`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "i", "")), + `n!="2"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+labelLongSuffix)), + `i!~"2.*"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$")), + } + + unappliedMatchersList := [][]*labels.Matcher{ + nil, + {}, + {{}}, + {labels.MustNewMatcher(labels.MatchEqual, 
labels.MetricName, "cpu_seconds")}, + {labels.MustNewMatcher(labels.MatchNotEqual, labels.MetricName, "cpu_seconds")}, + {labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, "^cpu_.*$")}, + {labels.MustNewMatcher(labels.MatchNotRegexp, labels.MetricName, "^cpu_.*$")}, + {labels.MustNewMatcher(labels.MatchEqual, "n", "1"+labelLongSuffix)}, + {labels.MustNewMatcher(labels.MatchEqual, labelLongSuffix, "1"+labelLongSuffix)}, + {labels.MustNewMatcher(labels.MatchEqual, "n", "")}, + {labels.MustNewMatcher(labels.MatchEqual, "n", "1"), labels.MustNewMatcher(labels.MatchEqual, "i", "2")}, + {labels.MustNewMatcher(labels.MatchRegexp, "n", ""), labels.MustNewMatcher(labels.MatchEqual, "i", "1")}, + {labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "\n")}, + } + + for postingsName, postings := range postingsMap { + p, err := toUint64Postings(postings) + assert.NoError(t, err) + + t.Run(postingsName, func(t *testing.T) { + for _, matchers := range unappliedMatchersList { + t.Run(string(indexcache.CanonicalLabelMatchersKey(matchers)), func(t *testing.T) { + + t.Log("postings entries:", p.len()) + t.Log("original size (4*entries):", 4*p.len(), "bytes") + p.reset() // We reuse postings between runs, so we need to reset iterator. 
+ + data, err := diffVarintSnappyMatchersEncode(p, p.len(), matchers) + assert.NoError(t, err) + + t.Log("encoded size", len(data), "bytes") + t.Logf("ratio: %0.3f", float64(len(data))/float64(4*p.len())) + + decodedPostings, decodedMatchers, err := diffVarintSnappyMatchersDecode(data) + assert.NoError(t, err) + if assert.Len(t, decodedMatchers, len(matchers)) && len(matchers) > 0 { + assert.Equal(t, matchers, decodedMatchers) + } + + p.reset() + comparePostings(t, p, decodedPostings) + }) + } + }) + } +} + func comparePostings(t *testing.T, p1, p2 index.Postings) { for p1.Next() { if !p2.Next() { From 2c7c3afc14063d17fbe61826ee8432b4281da07e Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 5 Apr 2023 11:26:10 +0200 Subject: [PATCH 02/18] Update TODOs Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/postings_codec.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/storegateway/postings_codec.go b/pkg/storegateway/postings_codec.go index ce8b4337f0c..cae3cfd2501 100644 --- a/pkg/storegateway/postings_codec.go +++ b/pkg/storegateway/postings_codec.go @@ -156,7 +156,7 @@ func encodeMatchers(totalLen int, matchers []*labels.Matcher, dest []byte) (writ written += binary.PutUvarint(dest[written:], uint64(len(m.Name))) written += copy(dest[written:], m.Name) - dest[written] = byte(m.Type) // TODO dimitarvdimitrov add a test which breaks when m.Type changes its values (i.e. Equal is not 0, NotEqual is not 1) + dest[written] = byte(m.Type) written++ written += binary.PutUvarint(dest[written:], uint64(len(m.Value))) @@ -225,6 +225,7 @@ func decodeMatchers(src []byte) ([]*labels.Matcher, int, error) { value = yoloString(src[:n]) src = src[n:] + // TODO dimitarvdimitrov add a test to make sure the matcher is compiled (maybe we can even take it from the compiled query matchers?) 
m, err := labels.NewMatcher(typ, labelName, value) if err != nil { return nil, 0, err From 2358c41faf416dfa99098cd76698e65a8c6d7342 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 5 Apr 2023 11:38:27 +0200 Subject: [PATCH 03/18] Add original matcher to non-lazy posting groups too Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket_index_postings.go | 26 ++++++++++++----------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/pkg/storegateway/bucket_index_postings.go b/pkg/storegateway/bucket_index_postings.go index 17cef1efab3..045f495ac5a 100644 --- a/pkg/storegateway/bucket_index_postings.go +++ b/pkg/storegateway/bucket_index_postings.go @@ -36,19 +36,21 @@ type rawPostingGroup struct { prefix string } -func newRawIntersectingPostingGroup(labelName string, keys []labels.Label) rawPostingGroup { +func newRawIntersectingPostingGroup(m *labels.Matcher, keys []labels.Label) rawPostingGroup { return rawPostingGroup{ - isSubtract: false, - labelName: labelName, - keys: keys, + isSubtract: false, + labelName: m.Name, + keys: keys, + originalMatcher: m, } } -func newRawSubtractingPostingGroup(labelName string, keys []labels.Label) rawPostingGroup { +func newRawSubtractingPostingGroup(m *labels.Matcher, keys []labels.Label) rawPostingGroup { return rawPostingGroup{ - isSubtract: true, - labelName: labelName, - keys: keys, + isSubtract: true, + labelName: m.Name, + keys: keys, + originalMatcher: m, } } @@ -140,16 +142,16 @@ func toRawPostingGroup(m *labels.Matcher) rawPostingGroup { keys = append(keys, labels.Label{Name: m.Name, Value: val}) } if m.Type == labels.MatchNotRegexp { - return newRawSubtractingPostingGroup(m.Name, keys) + return newRawSubtractingPostingGroup(m, keys) } - return newRawIntersectingPostingGroup(m.Name, keys) + return newRawIntersectingPostingGroup(m, keys) } if m.Value != "" { // Fast-path for equal matching. // Works for every case except for `foo=""`, which is a special case, see below. 
if m.Type == labels.MatchEqual { - return newRawIntersectingPostingGroup(m.Name, []labels.Label{{Name: m.Name, Value: m.Value}}) + return newRawIntersectingPostingGroup(m, []labels.Label{{Name: m.Name, Value: m.Value}}) } // If matcher is `label!="foo"`, we select an empty label value too, @@ -157,7 +159,7 @@ func toRawPostingGroup(m *labels.Matcher) rawPostingGroup { // So this matcher selects all series in the storage, // except for the ones that do have `label="foo"` if m.Type == labels.MatchNotEqual { - return newRawSubtractingPostingGroup(m.Name, []labels.Label{{Name: m.Name, Value: m.Value}}) + return newRawSubtractingPostingGroup(m, []labels.Label{{Name: m.Name, Value: m.Value}}) } } From 9982382f0c3ddaaf02905d2a9d695e509dd39bf4 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 5 Apr 2023 20:15:36 +0200 Subject: [PATCH 04/18] Copy matcher values and names Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket_index_reader.go | 1 - pkg/storegateway/postings_codec.go | 15 +++++---------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/pkg/storegateway/bucket_index_reader.go b/pkg/storegateway/bucket_index_reader.go index 0950a529aea..942d2c2cf07 100644 --- a/pkg/storegateway/bucket_index_reader.go +++ b/pkg/storegateway/bucket_index_reader.go @@ -531,7 +531,6 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab return output, g.Wait() } -// decodePostings may retain pointers to the buffer. func (r *bucketIndexReader) decodePostings(b []byte, stats *safeQueryStats) (index.Postings, []*labels.Matcher, error) { // Even if this instance is not using compression, there may be compressed // entries in the cache written by other stores. 
diff --git a/pkg/storegateway/postings_codec.go b/pkg/storegateway/postings_codec.go index cae3cfd2501..5fba713b3e7 100644 --- a/pkg/storegateway/postings_codec.go +++ b/pkg/storegateway/postings_codec.go @@ -177,7 +177,6 @@ func encodedMatchersLen(matchers []*labels.Matcher) int { return matchersLen } -// diffVarintSnappyMatchersDecode retains pointers to the buffer. func diffVarintSnappyMatchersDecode(input []byte) (index.Postings, []*labels.Matcher, error) { if !isDiffVarintSnappyWithmatchersEncodedPostings(input) { return nil, nil, errors.New("dm header not found") @@ -196,7 +195,6 @@ func diffVarintSnappyMatchersDecode(input []byte) (index.Postings, []*labels.Mat return newDiffVarintPostings(raw), matchers, nil } -// decodeMatchers retains points to the buffer func decodeMatchers(src []byte) ([]*labels.Matcher, int, error) { initialLength := len(src) numMatchers, numMatchersLen := varint.Uvarint(src) @@ -206,23 +204,20 @@ func decodeMatchers(src []byte) ([]*labels.Matcher, int, error) { } matchers := make([]*labels.Matcher, 0, numMatchers) - var ( - typ labels.MatchType - labelName string - value string - ) for i := uint64(0); i < numMatchers; i++ { n, nLen := varint.Uvarint(src) src = src[nLen:] - labelName = yoloString(src[:n]) + // We should copy the string so that we don't retain a reference to the original slice, which may be large. + labelName := string(src[:n]) src = src[n:] - typ = labels.MatchType(src[0]) + typ := labels.MatchType(src[0]) src = src[1:] n, nLen = varint.Uvarint(src) src = src[nLen:] - value = yoloString(src[:n]) + // We should copy the string so that we don't retain a reference to the original slice, which may be large. + value := string(src[:n]) src = src[n:] // TODO dimitarvdimitrov add a test to make sure the matcher is compiled (maybe we can even take it from the compiled query matchers?) 
From 700dd616257ea2e1a66633512229e6b5477f6423 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 5 Apr 2023 20:17:08 +0200 Subject: [PATCH 05/18] Renames Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/postings_codec.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/storegateway/postings_codec.go b/pkg/storegateway/postings_codec.go index 5fba713b3e7..a593004b9d0 100644 --- a/pkg/storegateway/postings_codec.go +++ b/pkg/storegateway/postings_codec.go @@ -121,35 +121,35 @@ func diffVarintSnappyMatchersEncode(p index.Postings, length int, unappliedMatch } // Estimate sizes - matchersLen := encodedMatchersLen(unappliedMatchers) + estimatedMatchersLen := encodedMatchersLen(unappliedMatchers) codecLen := len(codecHeaderSnappyWithmatchers) // Preallocate buffer - result := make([]byte, codecLen+matchersLen+snappy.MaxEncodedLen(len(varintPostings))) + result := make([]byte, codecLen+estimatedMatchersLen+snappy.MaxEncodedLen(len(varintPostings))) // Codec copy(result, codecHeaderSnappyWithmatchers) // Matchers size + matchers - matchersWritten, err := encodeMatchers(matchersLen, unappliedMatchers, result[codecLen:]) + actualMatchersLen, err := encodeMatchers(estimatedMatchersLen, unappliedMatchers, result[codecLen:]) if err != nil { return nil, err } - if matchersWritten != matchersLen { - return nil, fmt.Errorf("encoding matchers wrote unexpected number of bytes: wrote %d, expected %d", matchersWritten, matchersLen) + if actualMatchersLen != estimatedMatchersLen { + return nil, fmt.Errorf("encoding matchers wrote unexpected number of bytes: wrote %d, expected %d", actualMatchersLen, estimatedMatchersLen) } // Compressed postings - compressedPostings := snappy.Encode(result[codecLen+matchersWritten:], varintPostings) + compressedPostings := snappy.Encode(result[codecLen+actualMatchersLen:], varintPostings) - result = result[:codecLen+matchersWritten+len(compressedPostings)] + result = 
result[:codecLen+actualMatchersLen+len(compressedPostings)] return result, nil } // encodeMatchers needs to be called with the precomputed length of the encoded matchers from encodedMatchersLen -func encodeMatchers(totalLen int, matchers []*labels.Matcher, dest []byte) (written int, _ error) { - if len(dest) < totalLen { - return 0, fmt.Errorf("too small buffer to encode matchers: need at least %d, got %d", totalLen, dest) +func encodeMatchers(expectedLen int, matchers []*labels.Matcher, dest []byte) (written int, _ error) { + if len(dest) < expectedLen { + return 0, fmt.Errorf("too small buffer to encode matchers: need at least %d, got %d", expectedLen, dest) } written += binary.PutUvarint(dest, uint64(len(matchers))) for _, m := range matchers { From c467f94514730c997ba3640eca227a5ce682f4fb Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 5 Apr 2023 20:18:02 +0200 Subject: [PATCH 06/18] Use constants instead of hardcoding vals Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/postings_codec.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/storegateway/postings_codec.go b/pkg/storegateway/postings_codec.go index a593004b9d0..d3aa2342787 100644 --- a/pkg/storegateway/postings_codec.go +++ b/pkg/storegateway/postings_codec.go @@ -94,7 +94,7 @@ func diffVarintEncodeNoHeader(p index.Postings, length int) ([]byte, error) { func diffVarintSnappyDecode(input []byte) (index.Postings, error) { if !isDiffVarintSnappyEncodedPostings(input) { - return nil, errors.New("dvs header not found") + return nil, errors.New(string(codecHeaderSnappy) + " header not found") } raw, err := snappy.Decode(nil, input[len(codecHeaderSnappy):]) @@ -179,7 +179,7 @@ func encodedMatchersLen(matchers []*labels.Matcher) int { func diffVarintSnappyMatchersDecode(input []byte) (index.Postings, []*labels.Matcher, error) { if !isDiffVarintSnappyWithmatchersEncodedPostings(input) { - return nil, nil, errors.New("dm header not found") + return nil, nil, 
errors.New(string(codecHeaderSnappyWithmatchers) + " header not found") } codecLen := len(codecHeaderSnappyWithmatchers) From 7f5dc6d03a9699f5697a764e7c8543db6ee10f5f Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 5 Apr 2023 20:19:40 +0200 Subject: [PATCH 07/18] Rename strategy name Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket_index_reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storegateway/bucket_index_reader.go b/pkg/storegateway/bucket_index_reader.go index 942d2c2cf07..ee332ee9d4e 100644 --- a/pkg/storegateway/bucket_index_reader.go +++ b/pkg/storegateway/bucket_index_reader.go @@ -42,7 +42,7 @@ type postingsSelectionStrategy interface { type selectAllStrategy struct{} func (selectAllStrategy) name() string { - return "selectAll" + return "all" } func (selectAllStrategy) selectPostings(groups []postingGroup) (selected, omitted []postingGroup) { From 651b18f56f26e9cb5c98032a53c55528983a9b2d Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 5 Apr 2023 20:21:09 +0200 Subject: [PATCH 08/18] Rename/fix encoding Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket_index_reader.go | 4 ++-- pkg/storegateway/postings_codec.go | 22 +++++++++++----------- pkg/storegateway/postings_codec_test.go | 4 ++-- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/pkg/storegateway/bucket_index_reader.go b/pkg/storegateway/bucket_index_reader.go index ee332ee9d4e..d3b069a2b65 100644 --- a/pkg/storegateway/bucket_index_reader.go +++ b/pkg/storegateway/bucket_index_reader.go @@ -161,7 +161,7 @@ func (r *bucketIndexReader) expandedPostingsPromise(ctx context.Context, ms []*l } func (r *bucketIndexReader) cacheExpandedPostings(userID string, key indexcache.LabelMatchersKey, refs []storage.SeriesRef, unappliedMatchers []*labels.Matcher) { - data, err := diffVarintSnappyMatchersEncode(index.NewListPostings(refs), len(refs), unappliedMatchers) + data, err := 
diffVarintSnappyWithMatchersEncode(index.NewListPostings(refs), len(refs), unappliedMatchers) if err != nil { level.Warn(r.block.logger).Log("msg", "can't encode expanded postings cache", "err", err, "matchers_key", key, "block", r.block.meta.ULID) return @@ -552,7 +552,7 @@ func (r *bucketIndexReader) decodePostings(b []byte, stats *safeQueryStats) (ind } }) - case isDiffVarintSnappyWithmatchersEncodedPostings(b): + case isDiffVarintSnappyWithMatchersEncodedPostings(b): s := time.Now() l, matchers, err = diffVarintSnappyMatchersDecode(b) diff --git a/pkg/storegateway/postings_codec.go b/pkg/storegateway/postings_codec.go index d3aa2342787..a32f7c938a7 100644 --- a/pkg/storegateway/postings_codec.go +++ b/pkg/storegateway/postings_codec.go @@ -33,7 +33,7 @@ type codec string const ( codecHeaderSnappy codec = "dvs" // As in "diff+varint+snappy". - codecHeaderSnappyWithmatchers codec = "dm" // As in "dvs+matchers" + codecHeaderSnappyWithMatchers codec = "dm" // As in "dvs+matchers" ) // isDiffVarintSnappyEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy codec. @@ -106,15 +106,15 @@ func diffVarintSnappyDecode(input []byte) (index.Postings, error) { } // isDiffVarintSnappyEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy+matchers codec. -func isDiffVarintSnappyWithmatchersEncodedPostings(input []byte) bool { - return bytes.HasPrefix(input, []byte(codecHeaderSnappyWithmatchers)) +func isDiffVarintSnappyWithMatchersEncodedPostings(input []byte) bool { + return bytes.HasPrefix(input, []byte(codecHeaderSnappyWithMatchers)) } -// diffVarintSnappyMatchersEncode encodes postings into snappy-encoded diff+varint representation, +// diffVarintSnappyWithMatchersEncode encodes postings into snappy-encoded diff+varint representation, // prepended with any unapplied matchers to the result. -// Returned byte slice starts with codecHeaderSnappyWithmatchers header. 
+// Returned byte slice starts with codecHeaderSnappyWithMatchers header. // Length argument is expected number of postings, used for preallocating buffer. -func diffVarintSnappyMatchersEncode(p index.Postings, length int, unappliedMatchers []*labels.Matcher) ([]byte, error) { +func diffVarintSnappyWithMatchersEncode(p index.Postings, length int, unappliedMatchers []*labels.Matcher) ([]byte, error) { varintPostings, err := diffVarintEncodeNoHeader(p, length) if err != nil { return nil, err @@ -122,13 +122,13 @@ func diffVarintSnappyMatchersEncode(p index.Postings, length int, unappliedMatch // Estimate sizes estimatedMatchersLen := encodedMatchersLen(unappliedMatchers) - codecLen := len(codecHeaderSnappyWithmatchers) + codecLen := len(codecHeaderSnappyWithMatchers) // Preallocate buffer result := make([]byte, codecLen+estimatedMatchersLen+snappy.MaxEncodedLen(len(varintPostings))) // Codec - copy(result, codecHeaderSnappyWithmatchers) + copy(result, codecHeaderSnappyWithMatchers) // Matchers size + matchers actualMatchersLen, err := encodeMatchers(estimatedMatchersLen, unappliedMatchers, result[codecLen:]) @@ -178,11 +178,11 @@ func encodedMatchersLen(matchers []*labels.Matcher) int { } func diffVarintSnappyMatchersDecode(input []byte) (index.Postings, []*labels.Matcher, error) { - if !isDiffVarintSnappyWithmatchersEncodedPostings(input) { - return nil, nil, errors.New(string(codecHeaderSnappyWithmatchers) + " header not found") + if !isDiffVarintSnappyWithMatchersEncodedPostings(input) { + return nil, nil, errors.New(string(codecHeaderSnappyWithMatchers) + " header not found") } - codecLen := len(codecHeaderSnappyWithmatchers) + codecLen := len(codecHeaderSnappyWithMatchers) matchers, matchersLen, err := decodeMatchers(input[codecLen:]) if err != nil { return nil, nil, errors.Wrap(err, "decoding matchers") diff --git a/pkg/storegateway/postings_codec_test.go b/pkg/storegateway/postings_codec_test.go index 59bddde45f5..3e00f167932 100644 --- 
a/pkg/storegateway/postings_codec_test.go +++ b/pkg/storegateway/postings_codec_test.go @@ -106,7 +106,7 @@ func TestLabelMatchersTypeValues(t *testing.T) { for matcherType, val := range expectedValues { assert.Equal(t, int(labels.MustNewMatcher(matcherType, "", "").Type), val, - "diffVarintSnappyMatchersEncode relies on the number values of hte matchers not changing. "+ + "diffVarintSnappyWithMatchersEncode relies on the number values of the matchers not changing. "+ "It caches each matcher type as these integer values. "+ "If the integer values change, then the already cached values in the index cache will be improperly decoded.") } @@ -176,7 +176,7 @@ func TestDiffVarintMatchersCodec(t *testing.T) { t.Log("original size (4*entries):", 4*p.len(), "bytes") p.reset() // We reuse postings between runs, so we need to reset iterator. - data, err := diffVarintSnappyMatchersEncode(p, p.len(), matchers) + data, err := diffVarintSnappyWithMatchersEncode(p, p.len(), matchers) assert.NoError(t, err) t.Log("encoded size", len(data), "bytes") From f4837d93f761862aadefda925e4799cef8f4d566 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 5 Apr 2023 20:22:55 +0200 Subject: [PATCH 09/18] Update cacheKeyExpandedPostings.size Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/indexcache/inmemory.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storegateway/indexcache/inmemory.go b/pkg/storegateway/indexcache/inmemory.go index c9550c020fe..07cc075fc13 100644 --- a/pkg/storegateway/indexcache/inmemory.go +++ b/pkg/storegateway/indexcache/inmemory.go @@ -421,7 +421,7 @@ type cacheKeyExpandedPostings struct { func (c cacheKeyExpandedPostings) typ() string { return cacheTypeExpandedPostings } func (c cacheKeyExpandedPostings) size() uint64 { - return stringSize(c.userID) + ulidSize + stringSize(string(c.matchersKey)) + return stringSize(c.userID) + ulidSize + stringSize(string(c.matchersKey)) + stringSize(c.postingsSelectionStrategy) } type
cacheKeySeriesForPostings struct { From 911363fd1df53d1a8c258e829e137a10eccb20de Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 5 Apr 2023 20:23:17 +0200 Subject: [PATCH 10/18] Rename strat to strategy Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket_index_reader.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/storegateway/bucket_index_reader.go b/pkg/storegateway/bucket_index_reader.go index d3b069a2b65..e2199e2d4f6 100644 --- a/pkg/storegateway/bucket_index_reader.go +++ b/pkg/storegateway/bucket_index_reader.go @@ -52,15 +52,15 @@ func (selectAllStrategy) selectPostings(groups []postingGroup) (selected, omitte // bucketIndexReader is a custom index reader (not conforming index.Reader interface) that reads index that is stored in // object storage without having to fully download it. type bucketIndexReader struct { - block *bucketBlock - postingsStrat postingsSelectionStrategy - dec *index.Decoder + block *bucketBlock + postingsStrategy postingsSelectionStrategy + dec *index.Decoder } -func newBucketIndexReader(block *bucketBlock, postingsStrat postingsSelectionStrategy) *bucketIndexReader { +func newBucketIndexReader(block *bucketBlock, postingsStrategy postingsSelectionStrategy) *bucketIndexReader { r := &bucketIndexReader{ - block: block, - postingsStrat: postingsStrat, + block: block, + postingsStrategy: postingsStrategy, dec: &index.Decoder{ LookupSymbol: block.indexHeaderReader.LookupSymbol, }, @@ -166,11 +166,11 @@ func (r *bucketIndexReader) cacheExpandedPostings(userID string, key indexcache. 
level.Warn(r.block.logger).Log("msg", "can't encode expanded postings cache", "err", err, "matchers_key", key, "block", r.block.meta.ULID) return } - r.block.indexCache.StoreExpandedPostings(userID, r.block.meta.ULID, key, r.postingsStrat.name(), data) + r.block.indexCache.StoreExpandedPostings(userID, r.block.meta.ULID, key, r.postingsStrategy.name(), data) } func (r *bucketIndexReader) fetchCachedExpandedPostings(ctx context.Context, userID string, key indexcache.LabelMatchersKey, stats *safeQueryStats) ([]storage.SeriesRef, []*labels.Matcher, bool) { - data, ok := r.block.indexCache.FetchExpandedPostings(ctx, userID, r.block.meta.ULID, key, r.postingsStrat.name()) + data, ok := r.block.indexCache.FetchExpandedPostings(ctx, userID, r.block.meta.ULID, key, r.postingsStrategy.name()) if !ok { return nil, nil, false } @@ -199,7 +199,7 @@ func (r *bucketIndexReader) expandedPostings(ctx context.Context, ms []*labels.M return nil, nil, nil } - postingGroups, omittedPostingGroups := r.postingsStrat.selectPostings(postingGroups) + postingGroups, omittedPostingGroups := r.postingsStrategy.selectPostings(postingGroups) fetchedPostings, err := r.fetchPostings(ctx, extractLabels(postingGroups), stats) if err != nil { From f3ec40cede02c2c429f4d2d861c25b72b8f4d836 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 5 Apr 2023 20:23:49 +0200 Subject: [PATCH 11/18] Rename originalMatcher to matcher Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket_index_postings.go | 58 +++++++++++------------ pkg/storegateway/bucket_index_reader.go | 2 +- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/pkg/storegateway/bucket_index_postings.go b/pkg/storegateway/bucket_index_postings.go index 045f495ac5a..589dbf38760 100644 --- a/pkg/storegateway/bucket_index_postings.go +++ b/pkg/storegateway/bucket_index_postings.go @@ -31,46 +31,46 @@ type rawPostingGroup struct { labelName string keys []labels.Label - isLazy bool - originalMatcher *labels.Matcher - prefix 
string + isLazy bool + matcher *labels.Matcher + prefix string } func newRawIntersectingPostingGroup(m *labels.Matcher, keys []labels.Label) rawPostingGroup { return rawPostingGroup{ - isSubtract: false, - labelName: m.Name, - keys: keys, - originalMatcher: m, + isSubtract: false, + labelName: m.Name, + keys: keys, + matcher: m, } } func newRawSubtractingPostingGroup(m *labels.Matcher, keys []labels.Label) rawPostingGroup { return rawPostingGroup{ - isSubtract: true, - labelName: m.Name, - keys: keys, - originalMatcher: m, + isSubtract: true, + labelName: m.Name, + keys: keys, + matcher: m, } } func newLazyIntersectingPostingGroup(m *labels.Matcher) rawPostingGroup { return rawPostingGroup{ - isLazy: true, - isSubtract: false, - labelName: m.Name, - prefix: m.Prefix(), - originalMatcher: m, + isLazy: true, + isSubtract: false, + labelName: m.Name, + prefix: m.Prefix(), + matcher: m, } } func newLazySubtractingPostingGroup(m *labels.Matcher) rawPostingGroup { return rawPostingGroup{ - isLazy: true, - isSubtract: true, - labelName: m.Name, - prefix: m.Prefix(), - originalMatcher: m, + isLazy: true, + isSubtract: true, + labelName: m.Name, + prefix: m.Prefix(), + matcher: m, } } @@ -82,7 +82,7 @@ func (g rawPostingGroup) toPostingGroup(r indexheader.Reader) (postingGroup, err totalSize int64 ) if g.isLazy { - filter := g.originalMatcher.Matches + filter := g.matcher.Matches if g.isSubtract { filter = not(filter) } @@ -104,10 +104,10 @@ func (g rawPostingGroup) toPostingGroup(r indexheader.Reader) (postingGroup, err } return postingGroup{ - isSubtract: g.isSubtract, - originalMatcher: g.originalMatcher, - keys: keys, - totalSize: totalSize, + isSubtract: g.isSubtract, + matcher: g.matcher, + keys: keys, + totalSize: totalSize, }, nil } @@ -188,9 +188,9 @@ func not(filter func(string) bool) func(string) bool { // All the labels in keys should have a corresponding postings list in the index. // This computation happens in expandedPostings. 
type postingGroup struct { - isSubtract bool - originalMatcher *labels.Matcher - keys []labels.Label + isSubtract bool + matcher *labels.Matcher + keys []labels.Label // totalSize is the size in bytes of all the posting lists for keys totalSize int64 diff --git a/pkg/storegateway/bucket_index_reader.go b/pkg/storegateway/bucket_index_reader.go index e2199e2d4f6..f210ea6606c 100644 --- a/pkg/storegateway/bucket_index_reader.go +++ b/pkg/storegateway/bucket_index_reader.go @@ -256,7 +256,7 @@ func extractLabelMatchers(groups []postingGroup) []*labels.Matcher { } m := make([]*labels.Matcher, len(groups)) for i := range groups { - m[i] = groups[i].originalMatcher + m[i] = groups[i].matcher } return m } From 0515336fa4357c868d4efe3edda88edcfe5572a2 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 5 Apr 2023 20:25:23 +0200 Subject: [PATCH 12/18] Fix comments Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket_index_postings.go | 2 +- pkg/storegateway/bucket_index_reader.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/storegateway/bucket_index_postings.go b/pkg/storegateway/bucket_index_postings.go index 589dbf38760..1c15819affe 100644 --- a/pkg/storegateway/bucket_index_postings.go +++ b/pkg/storegateway/bucket_index_postings.go @@ -192,7 +192,7 @@ type postingGroup struct { matcher *labels.Matcher keys []labels.Label - // totalSize is the size in bytes of all the posting lists for keys + // totalSize is the size in bytes of all the posting lists for keys. totalSize int64 } diff --git a/pkg/storegateway/bucket_index_reader.go b/pkg/storegateway/bucket_index_reader.go index f210ea6606c..324ebdd0fe2 100644 --- a/pkg/storegateway/bucket_index_reader.go +++ b/pkg/storegateway/bucket_index_reader.go @@ -261,7 +261,7 @@ func extractLabelMatchers(groups []postingGroup) []*labels.Matcher { return m } -// extractLabels returns the keys of the posting groups in the that they are found in each posting group. 
+// extractLabels returns the keys of the posting groups in the order that they are found in each posting group. func extractLabels(groups []postingGroup) []labels.Label { numKeys := 0 for _, pg := range groups { From 856ab8a0af5f520bdaba2ec80fdb6d7585fa2f81 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 5 Apr 2023 20:26:37 +0200 Subject: [PATCH 13/18] Add changelog entry for invalidating cache entry Signed-off-by: Dimitar Dimitrov --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 32ee7161f56..8a9fbce43f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ * [CHANGE] Query-frontend: use protobuf internal query result payload format by default. This feature is no longer considered experimental. #4557 * [CHANGE] Ruler: reject creating federated rule groups while tenant federation is disabled. Previously the rule groups would be silently dropped during bucket sync. #4555 * [CHANGE] Compactor: the `/api/v1/upload/block/{block}/finish` endpoint now returns a `429` status code when the compactor has reached the limit specified by `-compactor.max-block-upload-validation-concurrency`. #4598 +* [CHANGE] Store-gateway: cache key format for expanded postings. This will invalidate the expanded postings in the index cache when deployed. #4667 * [FEATURE] Cache: Introduce experimental support for using Redis for results, chunks, index, and metadata caches. #4371 * [FEATURE] Vault: Introduce experimental integration with Vault to fetch secrets used to configure TLS for clients. Server TLS secrets will still be read from a file. `tls-ca-path`, `tls-cert-path` and `tls-key-path` will denote the path in Vault for the following CLI flags when `-vault.enabled` is true: #4446. 
* `-distributor.ha-tracker.etcd.*` From 101b674c32a98b097649f65f3bfb46d0ddac2a50 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 5 Apr 2023 20:35:07 +0200 Subject: [PATCH 14/18] Remove deadcode + TODO Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/postings_codec.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/storegateway/postings_codec.go b/pkg/storegateway/postings_codec.go index a32f7c938a7..9ab60c4a83f 100644 --- a/pkg/storegateway/postings_codec.go +++ b/pkg/storegateway/postings_codec.go @@ -9,7 +9,6 @@ import ( "bytes" "encoding/binary" "fmt" - "unsafe" "github.com/dennwc/varint" "github.com/golang/snappy" @@ -220,7 +219,6 @@ func decodeMatchers(src []byte) ([]*labels.Matcher, int, error) { value := string(src[:n]) src = src[n:] - // TODO dimitarvdimitrov add a test to make sure the matcher is compiled (maybe we can even take it from the compiled query matchers?) m, err := labels.NewMatcher(typ, labelName, value) if err != nil { return nil, 0, err @@ -231,10 +229,6 @@ func decodeMatchers(src []byte) ([]*labels.Matcher, int, error) { return matchers, initialLength - len(src), nil } -func yoloString(buf []byte) string { - return *((*string)(unsafe.Pointer(&buf))) -} - func newDiffVarintPostings(input []byte) *diffVarintPostings { return &diffVarintPostings{buf: &encoding.Decbuf{B: input}} } From c7442d01ca9dcda5706ace7c181a210b67701c0e Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Thu, 6 Apr 2023 12:02:37 +0200 Subject: [PATCH 15/18] Update CHANGELOG.md Co-authored-by: Marco Pracucci --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a9fbce43f4..b707cdf4baa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,7 @@ * [CHANGE] Query-frontend: use protobuf internal query result payload format by default. This feature is no longer considered experimental. #4557 * [CHANGE] Ruler: reject creating federated rule groups while tenant federation is disabled. 
Previously the rule groups would be silently dropped during bucket sync. #4555 * [CHANGE] Compactor: the `/api/v1/upload/block/{block}/finish` endpoint now returns a `429` status code when the compactor has reached the limit specified by `-compactor.max-block-upload-validation-concurrency`. #4598 -* [CHANGE] Store-gateway: cache key format for expanded postings. This will invalidate the expanded postings in the index cache when deployed. #4667 +* [CHANGE] Store-gateway: cache key format for expanded postings has changed. This will invalidate the expanded postings in the index cache when deployed. #4667 * [FEATURE] Cache: Introduce experimental support for using Redis for results, chunks, index, and metadata caches. #4371 * [FEATURE] Vault: Introduce experimental integration with Vault to fetch secrets used to configure TLS for clients. Server TLS secrets will still be read from a file. `tls-ca-path`, `tls-cert-path` and `tls-key-path` will denote the path in Vault for the following CLI flags when `-vault.enabled` is true: #4446. * `-distributor.ha-tracker.etcd.*` From 131acfef06d6a5f12dda87c218afc5bd9d42b5cd Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Thu, 6 Apr 2023 12:04:16 +0200 Subject: [PATCH 16/18] s/unapplied/deferred/ in /pkg/storegateway Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket_index_reader.go | 50 ++++++++++++------------- pkg/storegateway/postings_codec.go | 8 ++-- pkg/storegateway/postings_codec_test.go | 4 +- 3 files changed, 31 insertions(+), 31 deletions(-) diff --git a/pkg/storegateway/bucket_index_reader.go b/pkg/storegateway/bucket_index_reader.go index 324ebdd0fe2..2f545dfcbca 100644 --- a/pkg/storegateway/bucket_index_reader.go +++ b/pkg/storegateway/bucket_index_reader.go @@ -79,10 +79,10 @@ func newBucketIndexReader(block *bucketBlock, postingsStrategy postingsSelection // single label name=value. 
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, stats *safeQueryStats) (returnRefs []storage.SeriesRef, returnErr error) { var ( - loaded bool - cached bool - promise expandedPostingsPromise - unappliedMatchers []*labels.Matcher + loaded bool + cached bool + promise expandedPostingsPromise + deferredMatchers []*labels.Matcher ) span, ctx := tracing.StartSpan(ctx, "ExpandedPostings()") defer func() { @@ -93,12 +93,12 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M span.Finish() }() promise, loaded = r.expandedPostingsPromise(ctx, ms, stats) - returnRefs, unappliedMatchers, cached, returnErr = promise(ctx) - if len(unappliedMatchers) > 0 { + returnRefs, deferredMatchers, cached, returnErr = promise(ctx) + if len(deferredMatchers) > 0 { // This shouldn't be reached since we do not have any postingsSelectionStrategy at the moment // and all matchers should be applied via posting lists. - // This will change once the clients of ExpandedPostings can work with unapplied matchers. - return nil, fmt.Errorf("there are unapplied matchers (%s) for query (%s)", indexcache.CanonicalLabelMatchersKey(unappliedMatchers), indexcache.CanonicalLabelMatchersKey(ms)) + // This will change once the clients of ExpandedPostings can work with deferred matchers. 
+ return nil, fmt.Errorf("there are deferred matchers (%s) for query (%s)", indexcache.CanonicalLabelMatchersKey(deferredMatchers), indexcache.CanonicalLabelMatchersKey(ms)) } return returnRefs, returnErr @@ -113,11 +113,11 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M // TODO: https://github.com/grafana/mimir/issues/331 func (r *bucketIndexReader) expandedPostingsPromise(ctx context.Context, ms []*labels.Matcher, stats *safeQueryStats) (promise expandedPostingsPromise, loaded bool) { var ( - refs []storage.SeriesRef - unappliedMatchers []*labels.Matcher - err error - done = make(chan struct{}) - cached bool + refs []storage.SeriesRef + deferredMatchers []*labels.Matcher + err error + done = make(chan struct{}) + cached bool ) promise = func(ctx context.Context) ([]storage.SeriesRef, []*labels.Matcher, bool, error) { @@ -135,7 +135,7 @@ func (r *bucketIndexReader) expandedPostingsPromise(ctx context.Context, ms []*l refsCopy := make([]storage.SeriesRef, len(refs)) copy(refsCopy, refs) - return refsCopy, unappliedMatchers, cached, nil + return refsCopy, deferredMatchers, cached, nil } key := indexcache.CanonicalLabelMatchersKey(ms) @@ -148,20 +148,20 @@ func (r *bucketIndexReader) expandedPostingsPromise(ctx context.Context, ms []*l defer close(done) defer r.block.expandedPostingsPromises.Delete(key) - refs, unappliedMatchers, cached = r.fetchCachedExpandedPostings(ctx, r.block.userID, key, stats) + refs, deferredMatchers, cached = r.fetchCachedExpandedPostings(ctx, r.block.userID, key, stats) if cached { return promise, false } - refs, unappliedMatchers, err = r.expandedPostings(ctx, ms, stats) + refs, deferredMatchers, err = r.expandedPostings(ctx, ms, stats) if err != nil { return promise, false } - r.cacheExpandedPostings(r.block.userID, key, refs, unappliedMatchers) + r.cacheExpandedPostings(r.block.userID, key, refs, deferredMatchers) return promise, false } -func (r *bucketIndexReader) cacheExpandedPostings(userID string, 
key indexcache.LabelMatchersKey, refs []storage.SeriesRef, unappliedMatchers []*labels.Matcher) { - data, err := diffVarintSnappyWithMatchersEncode(index.NewListPostings(refs), len(refs), unappliedMatchers) +func (r *bucketIndexReader) cacheExpandedPostings(userID string, key indexcache.LabelMatchersKey, refs []storage.SeriesRef, deferredMatchers []*labels.Matcher) { + data, err := diffVarintSnappyWithMatchersEncode(index.NewListPostings(refs), len(refs), deferredMatchers) if err != nil { level.Warn(r.block.logger).Log("msg", "can't encode expanded postings cache", "err", err, "matchers_key", key, "block", r.block.meta.ULID) return @@ -175,7 +175,7 @@ func (r *bucketIndexReader) fetchCachedExpandedPostings(ctx context.Context, use return nil, nil, false } - p, unappliedMatchers, err := r.decodePostings(data, stats) + p, deferredMatchers, err := r.decodePostings(data, stats) if err != nil { level.Warn(r.block.logger).Log("msg", "can't decode expanded postings cache", "err", err, "matchers_key", key, "block", r.block.meta.ULID) return nil, nil, false @@ -186,11 +186,11 @@ func (r *bucketIndexReader) fetchCachedExpandedPostings(ctx context.Context, use level.Warn(r.block.logger).Log("msg", "can't expand decoded expanded postings cache", "err", err, "matchers_key", key, "block", r.block.meta.ULID) return nil, nil, false } - return refs, unappliedMatchers, true + return refs, deferredMatchers, true } // expandedPostings is the main logic of ExpandedPostings, without the promise wrapper. 
-func (r *bucketIndexReader) expandedPostings(ctx context.Context, ms []*labels.Matcher, stats *safeQueryStats) (returnRefs []storage.SeriesRef, unappliedMatchers []*labels.Matcher, returnErr error) { +func (r *bucketIndexReader) expandedPostings(ctx context.Context, ms []*labels.Matcher, stats *safeQueryStats) (returnRefs []storage.SeriesRef, deferredMatchers []*labels.Matcher, returnErr error) { postingGroups, err := toPostingGroups(ms, r.block.indexHeaderReader) if err != nil { return nil, nil, errors.Wrap(err, "toPostingGroups") @@ -404,10 +404,10 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab stats.postingsTouchedSizeSum += len(b) }) - l, unappliedMatchers, err := r.decodePostings(b, stats) - if len(unappliedMatchers) > 0 { + l, deferredMatchers, err := r.decodePostings(b, stats) + if len(deferredMatchers) > 0 { return nil, fmt.Errorf("not expecting matchers on non-expanded postings for %s=%s in block %s, but got %s", - key.Name, key.Value, r.block.meta.ULID, indexcache.CanonicalLabelMatchersKey(unappliedMatchers)) + key.Name, key.Value, r.block.meta.ULID, indexcache.CanonicalLabelMatchersKey(deferredMatchers)) } if err == nil { output[ix] = l diff --git a/pkg/storegateway/postings_codec.go b/pkg/storegateway/postings_codec.go index 9ab60c4a83f..acb94f4b5a4 100644 --- a/pkg/storegateway/postings_codec.go +++ b/pkg/storegateway/postings_codec.go @@ -110,17 +110,17 @@ func isDiffVarintSnappyWithMatchersEncodedPostings(input []byte) bool { } // diffVarintSnappyWithMatchersEncode encodes postings into snappy-encoded diff+varint representation, -// prepended with any unapplied matchers to the result. +// prepended with any deferred matchers to the result. // Returned byte slice starts with codecHeaderSnappyWithMatchers header. // Length argument is expected number of postings, used for preallocating buffer. 
-func diffVarintSnappyWithMatchersEncode(p index.Postings, length int, unappliedMatchers []*labels.Matcher) ([]byte, error) { +func diffVarintSnappyWithMatchersEncode(p index.Postings, length int, deferredMatchers []*labels.Matcher) ([]byte, error) { varintPostings, err := diffVarintEncodeNoHeader(p, length) if err != nil { return nil, err } // Estimate sizes - estimatedMatchersLen := encodedMatchersLen(unappliedMatchers) + estimatedMatchersLen := encodedMatchersLen(deferredMatchers) codecLen := len(codecHeaderSnappyWithMatchers) // Preallocate buffer @@ -130,7 +130,7 @@ func diffVarintSnappyWithMatchersEncode(p index.Postings, length int, unappliedM copy(result, codecHeaderSnappyWithMatchers) // Matchers size + matchers - actualMatchersLen, err := encodeMatchers(estimatedMatchersLen, unappliedMatchers, result[codecLen:]) + actualMatchersLen, err := encodeMatchers(estimatedMatchersLen, deferredMatchers, result[codecLen:]) if err != nil { return nil, err } diff --git a/pkg/storegateway/postings_codec_test.go b/pkg/storegateway/postings_codec_test.go index 3e00f167932..71940df68e7 100644 --- a/pkg/storegateway/postings_codec_test.go +++ b/pkg/storegateway/postings_codec_test.go @@ -148,7 +148,7 @@ func TestDiffVarintMatchersCodec(t *testing.T) { `i!~"2.*"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$")), } - unappliedMatchersList := [][]*labels.Matcher{ + deferredMatchersList := [][]*labels.Matcher{ nil, {}, {{}}, @@ -169,7 +169,7 @@ func TestDiffVarintMatchersCodec(t *testing.T) { assert.NoError(t, err) t.Run(postingsName, func(t *testing.T) { - for _, matchers := range unappliedMatchersList { + for _, matchers := range deferredMatchersList { t.Run(string(indexcache.CanonicalLabelMatchersKey(matchers)), func(t *testing.T) { t.Log("postings entries:", p.len()) From ef67e3e2343103885a5cd168054d511b0232e50e Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Thu, 6 Apr 2023 16:02:48 +0200 Subject: [PATCH 17/18] Add test for deferred 
matchers Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket_test.go | 50 +++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index a308abc75a2..3bffaa0896d 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -478,6 +478,26 @@ func (c cacheNotExpectingToStoreLabelValues) StoreLabelValues(userID string, blo c.t.Fatalf("StoreLabelValues should not be called") } +type omitMatcherStrategy struct { + m *labels.Matcher +} + +func (o omitMatcherStrategy) name() string { + return "omitExact" +} + +func (o omitMatcherStrategy) selectPostings(groups []postingGroup) (selected, omitted []postingGroup) { + for _, g := range groups { + m := g.matcher + if m.Value == o.m.Value && m.Name == o.m.Name && m.Type == o.m.Type { + omitted = append(omitted, g) + } else { + selected = append(selected, g) + } + } + return +} + func TestBucketIndexReader_ExpandedPostings(t *testing.T) { tb := test.NewTB(t) const series = 500 @@ -741,6 +761,36 @@ func TestBucketIndexReader_ExpandedPostings(t *testing.T) { mockBucket.Mock.AssertNotCalled(t, "Get") mockBucket.Mock.AssertNotCalled(t, "GetRange") }) + + t.Run("postings selection strategy is respected", func(t *testing.T) { + b := newTestBucketBlock() + ctx := context.Background() + stats := newSafeQueryStats() + + // Construct two matchers that select inverse series. + // When combined they should select 0 series. + // But our selection strategy will omit the inverted one, so we will get some series. + // They should be the same series as if we passed only the non-inverted one. + matcher := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.+$") + inverseMatcher, err := matcher.Inverse() + require.NoError(t, err) + + matchers := []*labels.Matcher{matcher, inverseMatcher} + + // We're using the unexported method expandedPostings because currently ExpandedPostings doesn't support + // deferred matchers. 
We can switch to that method once it does. + refsWithDeferredMatchers, deferredMatchers, err := newBucketIndexReader(b, omitMatcherStrategy{inverseMatcher}).expandedPostings(ctx, matchers, stats) + require.NoError(t, err) + if assert.Len(t, deferredMatchers, 1) { + assert.Equal(t, inverseMatcher, deferredMatchers[0]) + } + + refsWithoutDeferredMatchers, deferredMatchers, err := newBucketIndexReader(b, selectAllStrategy{}).expandedPostings(ctx, matchers[:1], stats) + require.NoError(t, err) + assert.Empty(t, deferredMatchers) + assert.Equal(t, refsWithoutDeferredMatchers, refsWithDeferredMatchers) + }) + }) } } From 1f9187b19283c43557e63ef85803ea96aa58238e Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Tue, 11 Apr 2023 16:00:39 +0200 Subject: [PATCH 18/18] s/deferred/pending in pkg/storegateway Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket_index_reader.go | 50 ++++++++++++------------- pkg/storegateway/bucket_test.go | 14 +++---- pkg/storegateway/postings_codec.go | 8 ++-- pkg/storegateway/postings_codec_test.go | 4 +- 4 files changed, 38 insertions(+), 38 deletions(-) diff --git a/pkg/storegateway/bucket_index_reader.go b/pkg/storegateway/bucket_index_reader.go index 2f545dfcbca..5f3dd26846b 100644 --- a/pkg/storegateway/bucket_index_reader.go +++ b/pkg/storegateway/bucket_index_reader.go @@ -79,10 +79,10 @@ func newBucketIndexReader(block *bucketBlock, postingsStrategy postingsSelection // single label name=value. 
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, stats *safeQueryStats) (returnRefs []storage.SeriesRef, returnErr error) { var ( - loaded bool - cached bool - promise expandedPostingsPromise - deferredMatchers []*labels.Matcher + loaded bool + cached bool + promise expandedPostingsPromise + pendingMatchers []*labels.Matcher ) span, ctx := tracing.StartSpan(ctx, "ExpandedPostings()") defer func() { @@ -93,12 +93,12 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M span.Finish() }() promise, loaded = r.expandedPostingsPromise(ctx, ms, stats) - returnRefs, deferredMatchers, cached, returnErr = promise(ctx) - if len(deferredMatchers) > 0 { + returnRefs, pendingMatchers, cached, returnErr = promise(ctx) + if len(pendingMatchers) > 0 { // This shouldn't be reached since we do not have any postingsSelectionStrategy at the moment // and all matchers should be applied via posting lists. - // This will change once the clients of ExpandedPostings can work with deferred matchers. - return nil, fmt.Errorf("there are deferred matchers (%s) for query (%s)", indexcache.CanonicalLabelMatchersKey(deferredMatchers), indexcache.CanonicalLabelMatchersKey(ms)) + // This will change once the clients of ExpandedPostings can work with pending matchers. 
+ return nil, fmt.Errorf("there are pending matchers (%s) for query (%s)", indexcache.CanonicalLabelMatchersKey(pendingMatchers), indexcache.CanonicalLabelMatchersKey(ms)) } return returnRefs, returnErr @@ -113,11 +113,11 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M // TODO: https://github.com/grafana/mimir/issues/331 func (r *bucketIndexReader) expandedPostingsPromise(ctx context.Context, ms []*labels.Matcher, stats *safeQueryStats) (promise expandedPostingsPromise, loaded bool) { var ( - refs []storage.SeriesRef - deferredMatchers []*labels.Matcher - err error - done = make(chan struct{}) - cached bool + refs []storage.SeriesRef + pendingMatchers []*labels.Matcher + err error + done = make(chan struct{}) + cached bool ) promise = func(ctx context.Context) ([]storage.SeriesRef, []*labels.Matcher, bool, error) { @@ -135,7 +135,7 @@ func (r *bucketIndexReader) expandedPostingsPromise(ctx context.Context, ms []*l refsCopy := make([]storage.SeriesRef, len(refs)) copy(refsCopy, refs) - return refsCopy, deferredMatchers, cached, nil + return refsCopy, pendingMatchers, cached, nil } key := indexcache.CanonicalLabelMatchersKey(ms) @@ -148,20 +148,20 @@ func (r *bucketIndexReader) expandedPostingsPromise(ctx context.Context, ms []*l defer close(done) defer r.block.expandedPostingsPromises.Delete(key) - refs, deferredMatchers, cached = r.fetchCachedExpandedPostings(ctx, r.block.userID, key, stats) + refs, pendingMatchers, cached = r.fetchCachedExpandedPostings(ctx, r.block.userID, key, stats) if cached { return promise, false } - refs, deferredMatchers, err = r.expandedPostings(ctx, ms, stats) + refs, pendingMatchers, err = r.expandedPostings(ctx, ms, stats) if err != nil { return promise, false } - r.cacheExpandedPostings(r.block.userID, key, refs, deferredMatchers) + r.cacheExpandedPostings(r.block.userID, key, refs, pendingMatchers) return promise, false } -func (r *bucketIndexReader) cacheExpandedPostings(userID string, key 
indexcache.LabelMatchersKey, refs []storage.SeriesRef, deferredMatchers []*labels.Matcher) { - data, err := diffVarintSnappyWithMatchersEncode(index.NewListPostings(refs), len(refs), deferredMatchers) +func (r *bucketIndexReader) cacheExpandedPostings(userID string, key indexcache.LabelMatchersKey, refs []storage.SeriesRef, pendingMatchers []*labels.Matcher) { + data, err := diffVarintSnappyWithMatchersEncode(index.NewListPostings(refs), len(refs), pendingMatchers) if err != nil { level.Warn(r.block.logger).Log("msg", "can't encode expanded postings cache", "err", err, "matchers_key", key, "block", r.block.meta.ULID) return @@ -175,7 +175,7 @@ func (r *bucketIndexReader) fetchCachedExpandedPostings(ctx context.Context, use return nil, nil, false } - p, deferredMatchers, err := r.decodePostings(data, stats) + p, pendingMatchers, err := r.decodePostings(data, stats) if err != nil { level.Warn(r.block.logger).Log("msg", "can't decode expanded postings cache", "err", err, "matchers_key", key, "block", r.block.meta.ULID) return nil, nil, false @@ -186,11 +186,11 @@ func (r *bucketIndexReader) fetchCachedExpandedPostings(ctx context.Context, use level.Warn(r.block.logger).Log("msg", "can't expand decoded expanded postings cache", "err", err, "matchers_key", key, "block", r.block.meta.ULID) return nil, nil, false } - return refs, deferredMatchers, true + return refs, pendingMatchers, true } // expandedPostings is the main logic of ExpandedPostings, without the promise wrapper. 
-func (r *bucketIndexReader) expandedPostings(ctx context.Context, ms []*labels.Matcher, stats *safeQueryStats) (returnRefs []storage.SeriesRef, deferredMatchers []*labels.Matcher, returnErr error) { +func (r *bucketIndexReader) expandedPostings(ctx context.Context, ms []*labels.Matcher, stats *safeQueryStats) (returnRefs []storage.SeriesRef, pendingMatchers []*labels.Matcher, returnErr error) { postingGroups, err := toPostingGroups(ms, r.block.indexHeaderReader) if err != nil { return nil, nil, errors.Wrap(err, "toPostingGroups") @@ -404,10 +404,10 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab stats.postingsTouchedSizeSum += len(b) }) - l, deferredMatchers, err := r.decodePostings(b, stats) - if len(deferredMatchers) > 0 { + l, pendingMatchers, err := r.decodePostings(b, stats) + if len(pendingMatchers) > 0 { return nil, fmt.Errorf("not expecting matchers on non-expanded postings for %s=%s in block %s, but got %s", - key.Name, key.Value, r.block.meta.ULID, indexcache.CanonicalLabelMatchersKey(deferredMatchers)) + key.Name, key.Value, r.block.meta.ULID, indexcache.CanonicalLabelMatchersKey(pendingMatchers)) } if err == nil { output[ix] = l diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index 3bffaa0896d..71bfc4201d9 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -778,17 +778,17 @@ func TestBucketIndexReader_ExpandedPostings(t *testing.T) { matchers := []*labels.Matcher{matcher, inverseMatcher} // We're using the unexported method expandedPostings because currently ExpandedPostings doesn't support - // deferred matchers. We can switch to that method once it does. - refsWithDeferredMatchers, deferredMatchers, err := newBucketIndexReader(b, omitMatcherStrategy{inverseMatcher}).expandedPostings(ctx, matchers, stats) + // pending matchers. We can switch to that method once it does. 
+ refsWithPendingMatchers, pendingMatchers, err := newBucketIndexReader(b, omitMatcherStrategy{inverseMatcher}).expandedPostings(ctx, matchers, stats) require.NoError(t, err) - if assert.Len(t, deferredMatchers, 1) { - assert.Equal(t, inverseMatcher, deferredMatchers[0]) + if assert.Len(t, pendingMatchers, 1) { + assert.Equal(t, inverseMatcher, pendingMatchers[0]) } - refsWithoutDeferredMatchers, deferredMatchers, err := newBucketIndexReader(b, selectAllStrategy{}).expandedPostings(ctx, matchers[:1], stats) + refsWithoutPendingMatchers, pendingMatchers, err := newBucketIndexReader(b, selectAllStrategy{}).expandedPostings(ctx, matchers[:1], stats) require.NoError(t, err) - assert.Empty(t, deferredMatchers) - assert.Equal(t, refsWithoutDeferredMatchers, refsWithDeferredMatchers) + assert.Empty(t, pendingMatchers) + assert.Equal(t, refsWithoutPendingMatchers, refsWithPendingMatchers) }) }) diff --git a/pkg/storegateway/postings_codec.go b/pkg/storegateway/postings_codec.go index acb94f4b5a4..ca6d7f0c6eb 100644 --- a/pkg/storegateway/postings_codec.go +++ b/pkg/storegateway/postings_codec.go @@ -110,17 +110,17 @@ func isDiffVarintSnappyWithMatchersEncodedPostings(input []byte) bool { } // diffVarintSnappyWithMatchersEncode encodes postings into snappy-encoded diff+varint representation, -// prepended with any deferred matchers to the result. +// prepended with any pending matchers to the result. // Returned byte slice starts with codecHeaderSnappyWithMatchers header. // Length argument is expected number of postings, used for preallocating buffer. 
-func diffVarintSnappyWithMatchersEncode(p index.Postings, length int, deferredMatchers []*labels.Matcher) ([]byte, error) { +func diffVarintSnappyWithMatchersEncode(p index.Postings, length int, pendingMatchers []*labels.Matcher) ([]byte, error) { varintPostings, err := diffVarintEncodeNoHeader(p, length) if err != nil { return nil, err } // Estimate sizes - estimatedMatchersLen := encodedMatchersLen(deferredMatchers) + estimatedMatchersLen := encodedMatchersLen(pendingMatchers) codecLen := len(codecHeaderSnappyWithMatchers) // Preallocate buffer @@ -130,7 +130,7 @@ func diffVarintSnappyWithMatchersEncode(p index.Postings, length int, deferredMa copy(result, codecHeaderSnappyWithMatchers) // Matchers size + matchers - actualMatchersLen, err := encodeMatchers(estimatedMatchersLen, deferredMatchers, result[codecLen:]) + actualMatchersLen, err := encodeMatchers(estimatedMatchersLen, pendingMatchers, result[codecLen:]) if err != nil { return nil, err } diff --git a/pkg/storegateway/postings_codec_test.go b/pkg/storegateway/postings_codec_test.go index 71940df68e7..4a5300e312a 100644 --- a/pkg/storegateway/postings_codec_test.go +++ b/pkg/storegateway/postings_codec_test.go @@ -148,7 +148,7 @@ func TestDiffVarintMatchersCodec(t *testing.T) { `i!~"2.*"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$")), } - deferredMatchersList := [][]*labels.Matcher{ + pendingMatchersList := [][]*labels.Matcher{ nil, {}, {{}}, @@ -169,7 +169,7 @@ func TestDiffVarintMatchersCodec(t *testing.T) { assert.NoError(t, err) t.Run(postingsName, func(t *testing.T) { - for _, matchers := range deferredMatchersList { + for _, matchers := range pendingMatchersList { t.Run(string(indexcache.CanonicalLabelMatchersKey(matchers)), func(t *testing.T) { t.Log("postings entries:", p.len())