store-gateway: add support for partial expanded postings #4667

Merged · 18 commits · Apr 11, 2023
Changes from 17 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@
* [CHANGE] Query-frontend: use protobuf internal query result payload format by default. This feature is no longer considered experimental. #4557
* [CHANGE] Ruler: reject creating federated rule groups while tenant federation is disabled. Previously the rule groups would be silently dropped during bucket sync. #4555
* [CHANGE] Compactor: the `/api/v1/upload/block/{block}/finish` endpoint now returns a `429` status code when the compactor has reached the limit specified by `-compactor.max-block-upload-validation-concurrency`. #4598
* [CHANGE] Store-gateway: cache key format for expanded postings has changed. This will invalidate the expanded postings in the index cache when deployed. #4667
* [FEATURE] Cache: Introduce experimental support for using Redis for results, chunks, index, and metadata caches. #4371
* [FEATURE] Vault: Introduce experimental integration with Vault to fetch secrets used to configure TLS for clients. Server TLS secrets will still be read from a file. `tls-ca-path`, `tls-cert-path` and `tls-key-path` will denote the path in Vault for the following CLI flags when `-vault.enabled` is true: #4446.
* `-distributor.ha-tracker.etcd.*`
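The CHANGELOG entry above notes that the cache key format for expanded postings changes with this PR, which invalidates entries already in the index cache. As a rough illustration, the sketch below shows one way the extra strategy string (visible as the new `string` parameter on the `noopCache` methods further down) could be folded into the cache key; the `EP2` prefix, names, and layout are assumptions for illustration only, not Mimir's actual key format.

```go
// Minimal, hypothetical sketch of an expanded-postings cache key that
// includes a postings-selection strategy identifier. Not Mimir's real format.
package main

import (
	"fmt"

	"github.com/oklog/ulid"
)

// expandedPostingsCacheKey combines tenant, block, the canonical matchers key
// and the postings-selection strategy name. Changing any component changes
// the key, which is why deploying this change invalidates previously cached
// expanded postings.
func expandedPostingsCacheKey(userID string, blockID ulid.ULID, matchersKey, strategy string) string {
	return fmt.Sprintf("EP2:%s:%s:%s:%s", userID, blockID.String(), strategy, matchersKey)
}

func main() {
	fmt.Println(expandedPostingsCacheKey("tenant-1", ulid.ULID{}, `{job="api"}`, "all"))
}
```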
7 changes: 4 additions & 3 deletions pkg/storegateway/bucket.go
@@ -147,10 +147,10 @@ func (noopCache) FetchMultiSeriesForRefs(_ context.Context, _ string, _ ulid.ULI
return map[storage.SeriesRef][]byte{}, ids
}

func (c noopCache) StoreExpandedPostings(_ string, _ ulid.ULID, _ indexcache.LabelMatchersKey, _ []byte) {
func (c noopCache) StoreExpandedPostings(_ string, _ ulid.ULID, _ indexcache.LabelMatchersKey, _ string, _ []byte) {
}

func (c noopCache) FetchExpandedPostings(_ context.Context, _ string, _ ulid.ULID, _ indexcache.LabelMatchersKey) ([]byte, bool) {
func (c noopCache) FetchExpandedPostings(_ context.Context, _ string, _ ulid.ULID, _ indexcache.LabelMatchersKey, _ string) ([]byte, bool) {
return nil, false
}

@@ -1482,7 +1482,8 @@ func (b *bucketBlock) chunkRangeReader(ctx context.Context, seq int, off, length

func (b *bucketBlock) indexReader() *bucketIndexReader {
b.pendingReaders.Add(1)
return newBucketIndexReader(b)
// This will be replaced with a strategy selected via a CLI flag.
return newBucketIndexReader(b, selectAllStrategy{})
}

func (b *bucketBlock) chunkReader(ctx context.Context) *bucketChunkReader {
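The new comment in `indexReader()` says the hard-coded `selectAllStrategy{}` is a placeholder until a strategy can be chosen via a CLI flag. Neither the strategy interface nor `selectAllStrategy` is defined in the hunks shown on this page, so the following is only a hedged guess at the shape of that seam, reusing the `postingGroup` type added in `pkg/storegateway/bucket_index_postings.go` below.

```go
// Hypothetical sketch of the strategy seam used above. The interface name,
// method signature and behaviour are assumptions based on this diff; the
// PR's actual definitions are not shown in these hunks.
package storegateway

// postingsSelectionStrategy decides which posting groups are fetched from the
// index and which are omitted (to be re-applied later as plain matchers on
// the series fetched for the selected groups).
type postingsSelectionStrategy interface {
	selectPostings(groups []postingGroup) (selected, omitted []postingGroup)
}

// selectAllStrategy keeps the pre-existing behaviour: fetch every group and
// omit none.
type selectAllStrategy struct{}

func (selectAllStrategy) selectPostings(groups []postingGroup) ([]postingGroup, []postingGroup) {
	return groups, nil
}
```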
89 changes: 55 additions & 34 deletions pkg/storegateway/bucket_index_postings.go
@@ -31,91 +31,108 @@ type rawPostingGroup struct {
labelName string
keys []labels.Label

isLazy bool
lazyMatcher func(string) bool
prefix string
isLazy bool
matcher *labels.Matcher
prefix string
}

func newRawIntersectingPostingGroup(labelName string, keys []labels.Label) rawPostingGroup {
func newRawIntersectingPostingGroup(m *labels.Matcher, keys []labels.Label) rawPostingGroup {
return rawPostingGroup{
isSubtract: false,
labelName: labelName,
labelName: m.Name,
keys: keys,
matcher: m,
}
}

func newRawSubtractingPostingGroup(labelName string, keys []labels.Label) rawPostingGroup {
func newRawSubtractingPostingGroup(m *labels.Matcher, keys []labels.Label) rawPostingGroup {
return rawPostingGroup{
isSubtract: true,
labelName: labelName,
labelName: m.Name,
keys: keys,
matcher: m,
}
}

func newLazyIntersectingPostingGroup(labelName string, prefix string, matcher func(string) bool) rawPostingGroup {
func newLazyIntersectingPostingGroup(m *labels.Matcher) rawPostingGroup {
return rawPostingGroup{
isLazy: true,
isSubtract: false,
labelName: labelName,
lazyMatcher: matcher,
prefix: prefix,
isLazy: true,
isSubtract: false,
labelName: m.Name,
prefix: m.Prefix(),
matcher: m,
}
}

func newLazySubtractingPostingGroup(labelName string, prefix string, matcher func(string) bool) rawPostingGroup {
func newLazySubtractingPostingGroup(m *labels.Matcher) rawPostingGroup {
return rawPostingGroup{
isLazy: true,
isSubtract: true,
labelName: labelName,
lazyMatcher: matcher,
prefix: prefix,
isLazy: true,
isSubtract: true,
labelName: m.Name,
prefix: m.Prefix(),
matcher: m,
}
}

// toPostingGroup returns a postingGroup which shares the underlying keys slice with g.
// This means that after calling toPostingGroup g.keys will be modified.
func (g rawPostingGroup) toPostingGroup(r indexheader.Reader) (postingGroup, error) {
var keys []labels.Label
var (
keys []labels.Label
totalSize int64
)
if g.isLazy {
vals, err := r.LabelValues(g.labelName, g.prefix, g.lazyMatcher)
filter := g.matcher.Matches
if g.isSubtract {
filter = not(filter)
}
vals, err := r.LabelValuesOffsets(g.labelName, g.prefix, filter)
if err != nil {
return postingGroup{}, err
}
keys = make([]labels.Label, len(vals))
for i := range vals {
keys[i] = labels.Label{Name: g.labelName, Value: vals[i]}
keys[i] = labels.Label{Name: g.labelName, Value: vals[i].LabelValue}
totalSize += vals[i].Off.End - vals[i].Off.Start
}
} else {
var err error
keys, err = g.filterNonExistingKeys(r)
keys, totalSize, err = g.filterNonExistingKeys(r)
if err != nil {
return postingGroup{}, errors.Wrap(err, "filter posting keys")
}
}

return postingGroup{
isSubtract: g.isSubtract,
matcher: g.matcher,
keys: keys,
totalSize: totalSize,
}, nil
}

// filterNonExistingKeys uses the indexheader.Reader to filter out any label values that do not exist in this index.
// modifies the underlying keys slice of the group. Do not use the rawPostingGroup after calling toPostingGroup.
func (g rawPostingGroup) filterNonExistingKeys(r indexheader.Reader) ([]labels.Label, error) {
writeIdx := 0
func (g rawPostingGroup) filterNonExistingKeys(r indexheader.Reader) ([]labels.Label, int64, error) {
var (
writeIdx int
totalSize int64
)
for _, l := range g.keys {
if _, err := r.PostingsOffset(l.Name, l.Value); errors.Is(err, indexheader.NotFoundRangeErr) {
offset, err := r.PostingsOffset(l.Name, l.Value)
if errors.Is(err, indexheader.NotFoundRangeErr) {
// This label name and value doesn't exist in this block, so there are 0 postings we can match.
// Try with the rest of the set matchers, maybe they can match some series.
// Continue so we overwrite it next time there's an existing value.
continue
} else if err != nil {
return nil, err
return nil, 0, err
}
g.keys[writeIdx] = l
writeIdx++
totalSize += offset.End - offset.Start
}
return g.keys[:writeIdx], nil
return g.keys[:writeIdx], totalSize, nil
}

func toRawPostingGroup(m *labels.Matcher) rawPostingGroup {
@@ -125,24 +142,24 @@ func toRawPostingGroup(m *labels.Matcher) rawPostingGroup {
keys = append(keys, labels.Label{Name: m.Name, Value: val})
}
if m.Type == labels.MatchNotRegexp {
return newRawSubtractingPostingGroup(m.Name, keys)
return newRawSubtractingPostingGroup(m, keys)
}
return newRawIntersectingPostingGroup(m.Name, keys)
return newRawIntersectingPostingGroup(m, keys)
}

if m.Value != "" {
// Fast-path for equal matching.
// Works for every case except for `foo=""`, which is a special case, see below.
if m.Type == labels.MatchEqual {
return newRawIntersectingPostingGroup(m.Name, []labels.Label{{Name: m.Name, Value: m.Value}})
return newRawIntersectingPostingGroup(m, []labels.Label{{Name: m.Name, Value: m.Value}})
}

// If matcher is `label!="foo"`, we select an empty label value too,
// i.e., series that don't have this label.
// So this matcher selects all series in the storage,
// except for the ones that do have `label="foo"`
if m.Type == labels.MatchNotEqual {
return newRawSubtractingPostingGroup(m.Name, []labels.Label{{Name: m.Name, Value: m.Value}})
return newRawSubtractingPostingGroup(m, []labels.Label{{Name: m.Name, Value: m.Value}})
}
}

Expand All @@ -153,12 +170,12 @@ func toRawPostingGroup(m *labels.Matcher) rawPostingGroup {
// have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575
// and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555.
if m.Matches("") {
return newLazySubtractingPostingGroup(m.Name, m.Prefix(), not(m.Matches))
return newLazySubtractingPostingGroup(m)
}

// Our matcher does not match the empty value, so we just need the postings that correspond
// to label values matched by the matcher.
return newLazyIntersectingPostingGroup(m.Name, m.Prefix(), m.Matches)
return newLazyIntersectingPostingGroup(m)
}

func not(filter func(string) bool) func(string) bool {
@@ -172,7 +189,11 @@ func not(filter func(string) bool) func(string) bool {
// This computation happens in expandedPostings.
type postingGroup struct {
isSubtract bool
matcher *labels.Matcher
keys []labels.Label

// totalSize is the size in bytes of all the posting lists for keys.
totalSize int64
}

type postingPtr struct {
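The `totalSize` field added to `postingGroup` records how many bytes of posting lists each group refers to, accumulated from the `PostingsOffset` and `LabelValuesOffsets` ranges above. That size is the signal a "partial expanded postings" strategy can use to skip disproportionately expensive groups. The sketch below is purely illustrative and is not the strategy this PR ships: the type name and threshold policy are assumptions, and a production strategy needs more care (for example, it must keep at least one intersecting group so there is something left to fetch and filter).

```go
// Illustrative only: a size-aware strategy built on postingGroup.totalSize.
// Names and the threshold policy are assumptions, not this PR's code.
package storegateway

// sizeLimitedStrategy fetches groups whose posting lists are small enough and
// omits the rest; the omitted groups' matchers must later be applied to the
// series resolved from the selected groups.
type sizeLimitedStrategy struct {
	maxGroupBytes int64
}

func (s sizeLimitedStrategy) selectPostings(groups []postingGroup) (selected, omitted []postingGroup) {
	for _, g := range groups {
		// Simplification: a real strategy must not omit every intersecting
		// group, otherwise there are no postings left to intersect against.
		if g.totalSize <= s.maxGroupBytes {
			selected = append(selected, g)
			continue
		}
		omitted = append(omitted, g)
	}
	return selected, omitted
}
```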