Skip to content

Commit

Permalink
Store: add escape hatch to skip store resorting
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
  • Loading branch information
MichaHoffmann committed Sep 11, 2023
1 parent 6638072 commit b1bba54
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 80 deletions.
71 changes: 37 additions & 34 deletions pkg/store/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_EQ, Name: "n", Value: "1"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
},
},
{
Expand All @@ -251,7 +251,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_EQ, Name: "i", Value: "a"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
},
},
{
Expand All @@ -270,9 +270,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_EQ, Name: "missing", Value: ""},
},
expectedLabels: []labels.Labels{
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("n", "2", "region", "eu-west"),
labels.FromStrings("n", "2.5", "region", "eu-west"),
},
Expand All @@ -295,8 +295,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "i", Value: ".+"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
},
},
{
Expand All @@ -306,9 +306,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "i", Value: ".*"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("n", "2", "region", "eu-west"),
labels.FromStrings("n", "2.5", "region", "eu-west"),
},
Expand All @@ -332,8 +332,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_NEQ, Name: "i", Value: ""},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
},
},
{
Expand All @@ -352,8 +352,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_NEQ, Name: "i", Value: "a"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
},
},
{
Expand All @@ -363,9 +363,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "n", Value: "^1$"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
},
},
{
Expand All @@ -376,7 +376,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "i", Value: "^a$"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
},
},
{
Expand All @@ -387,8 +387,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "i", Value: "^a?$"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
},
},
{
Expand Down Expand Up @@ -422,9 +422,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "i", Value: "^.*$"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
},
},
{
Expand All @@ -435,8 +435,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "i", Value: "^.+$"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
},
},
{
Expand Down Expand Up @@ -489,8 +489,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^a$"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
},
},
{
Expand All @@ -501,7 +501,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^a?$"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
},
},
{
Expand All @@ -512,8 +512,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^$"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
},
},
{
Expand Down Expand Up @@ -545,7 +545,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_EQ, Name: "i", Value: "a"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
},
},
{
Expand All @@ -557,7 +557,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "i", Value: "^(b|a).*$"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
},
},
{
Expand All @@ -567,9 +567,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "n", Value: "(1|2)"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("n", "2", "region", "eu-west"),
},
},
Expand All @@ -580,8 +580,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "i", Value: "a|b"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
},
},
{
Expand All @@ -591,8 +591,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "i", Value: "(a|b)"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
},
},
{
Expand Down Expand Up @@ -706,12 +706,15 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
}
testutil.Ok(t, err)

testutil.Equals(t, true, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) bool {
return labels.Compare(x.PromLabels(), y.PromLabels()) < 0
}))

receivedLabels := make([]labels.Labels, 0)
for _, s := range srv.SeriesSet {
receivedLabels = append(receivedLabels, s.PromLabels())
}
slices.SortFunc(c.expectedLabels, func(a, b labels.Labels) bool { return labels.Compare(a, b) < 0 })
slices.SortFunc(receivedLabels, func(a, b labels.Labels) bool { return labels.Compare(a, b) < 0 })

testutil.Equals(t, c.expectedLabels, receivedLabels)
})
}
Expand Down
54 changes: 15 additions & 39 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1334,7 +1334,8 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill

// Series implements the storepb.StoreServer interface.
func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) {
srv := newFlushableServer(seriesSrv)
srv := newFlushableServer(seriesSrv, sortingStrategyNone)

if s.queryGate != nil {
tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) {
err = s.queryGate.Start(srv.Context())
Expand Down Expand Up @@ -1464,44 +1465,19 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
return errors.Wrapf(err, "fetch postings for block %s", blk.meta.ULID)
}

// If we have inner replica labels we need to resort.
s.mtx.Lock()
needsEagerRetrival := len(req.WithoutReplicaLabels) > 0 && s.labelNamesSet.HasAny(req.WithoutReplicaLabels)
s.mtx.Unlock()

var resp respSet
if needsEagerRetrival {
labelsToRemove := make(map[string]struct{})
for _, replicaLabel := range req.WithoutReplicaLabels {
labelsToRemove[replicaLabel] = struct{}{}
}
resp = newEagerRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
[]labels.Labels{blk.extLset},
onClose,
blockClient,
shardMatcher,
false,
s.metrics.emptyPostingCount,
labelsToRemove,
)
} else {
resp = newLazyRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
[]labels.Labels{blk.extLset},
onClose,
blockClient,
shardMatcher,
false,
s.metrics.emptyPostingCount,
)
}
resp := newEagerRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
[]labels.Labels{blk.extLset},
onClose,
blockClient,
shardMatcher,
false,
s.metrics.emptyPostingCount,
nil,
)

mtx.Lock()
respSets = append(respSets, resp)
Expand Down
3 changes: 3 additions & 0 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3484,5 +3484,8 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) {
},
}, srv))

testutil.Equals(t, true, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) bool {
return labels.Compare(x.PromLabels(), y.PromLabels()) < 0
}))
testutil.Equals(t, 2, len(srv.SeriesSet))
}
27 changes: 26 additions & 1 deletion pkg/store/flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,43 @@ import (
"github.com/thanos-io/thanos/pkg/store/storepb"
)

type sortingStrategy uint64

const (
sortingStrategyStore sortingStrategy = iota + 1
sortingStrategyNone
)

// flushableServer is an extension of storepb.Store_SeriesServer with a Flush method.
type flushableServer interface {
storepb.Store_SeriesServer

Flush() error
}

func newFlushableServer(
upstream storepb.Store_SeriesServer,
sortingsortingStrategy sortingStrategy,
) flushableServer {
return &resortingServer{Store_SeriesServer: upstream}
switch sortingsortingStrategy {
case sortingStrategyStore:
return &resortingServer{Store_SeriesServer: upstream}
case sortingStrategyNone:
return &passthroughServer{Store_SeriesServer: upstream}
default:
// should not happen.
panic("unexpected sorting strategy")
}
}

// passthroughServer is a flushableServer that forwards all data to
// an upstream server without additional processing.
type passthroughServer struct {
storepb.Store_SeriesServer
}

func (p *passthroughServer) Flush() error { return nil }

// resortingServer is a flushableServer that resorts all series by their labels.
// This is required if replica labels are stored internally in a TSDB.
// Data is resorted and sent to an upstream server upon calling Flush.
Expand Down
3 changes: 2 additions & 1 deletion pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ func (p *PrometheusStore) putBuffer(b *[]byte) {

// Series returns all series for a requested time range and label matcher.
func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error {
s := newFlushableServer(seriesSrv)
s := newFlushableServer(seriesSrv, sortingStrategyStore)

extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset)
Expand Down
10 changes: 6 additions & 4 deletions pkg/store/proxy_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,9 +756,9 @@ func newEagerRespSet(

// This should be used only for stores that does not support doing this on server side.
// See docs/proposals-accepted/20221129-avoid-global-sort.md for details.
if len(l.removeLabels) > 0 {
sortWithoutLabels(l.bufferedResponses, l.removeLabels)
}
// NOTE. Client is not guaranteed to give a sorted response when extLset is added
// Generally we need to resort here.
sortWithoutLabels(l.bufferedResponses, l.removeLabels)

}(ret)

Expand All @@ -785,7 +785,9 @@ func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string]
continue
}

ser.Labels = labelpb.ZLabelsFromPromLabels(rmLabels(labelpb.ZLabelsToPromLabels(ser.Labels), labelsToRemove))
if len(labelsToRemove) > 0 {
ser.Labels = labelpb.ZLabelsFromPromLabels(rmLabels(labelpb.ZLabelsToPromLabels(ser.Labels), labelsToRemove))
}
}

// With the re-ordered label sets, re-sorting all series aligns the same series
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ type CloseDelegator interface {
// Series returns all series for a requested time range and label matcher. The returned data may
// exceed the requested time bounds.
func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error {
srv := newFlushableServer(seriesSrv)
srv := newFlushableServer(seriesSrv, sortingStrategyStore)

match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset())
if err != nil {
Expand Down

0 comments on commit b1bba54

Please sign in to comment.