diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 779542f4fe2..c05afa4765f 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -290,6 +290,12 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { return nil } +func (s *BucketStore) numBlocks() int { + s.mtx.RLock() + defer s.mtx.RUnlock() + return len(s.blocks) +} + func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock { s.mtx.RLock() defer s.mtx.RUnlock() @@ -301,6 +307,7 @@ func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) { defer func() { if err != nil { + s.metrics.blockLoadFailures.Inc() os.RemoveAll(dir) } }() @@ -309,7 +316,6 @@ func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) { b, err := newBucketBlock(ctx, log.With(s.logger, "block", id), s.bucket, id, dir, s.indexCache, s.chunkPool) if err != nil { - s.metrics.blockLoadFailures.Inc() return err } s.mtx.Lock() @@ -393,7 +399,6 @@ type seriesEntry struct { type bucketSeriesSet struct { set []seriesEntry i int - // chks []storepb.AggrChunk err error } @@ -564,7 +569,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrSum) } - out.Count = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: x.Bytes()} + out.Sum = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: x.Bytes()} case storepb.Aggr_MIN: x, err := ac.Get(downsample.AggrMin) if err != nil { @@ -1443,7 +1448,7 @@ func (r *bucketChunkReader) Chunk(id uint64) (chunkenc.Chunk, error) { // rawChunk is a helper type that wraps a chunk's raw bytes and implements the chunkenc.Chunk // interface over it. -// It is used to Store API responses which don't need to introspect and validate the chunk's contents.s +// It is used to Store API responses which don't need to introspect and validate the chunk's contents. type rawChunk []byte func (b rawChunk) Encoding() chunkenc.Encoding { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 10c98593a97..79129eb89d0 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -2,20 +2,23 @@ package store import ( "context" + "errors" "io/ioutil" "os" "path/filepath" "testing" "time" + "github.com/oklog/ulid" + + "github.com/improbable-eng/thanos/pkg/compact/downsample" + "github.com/improbable-eng/thanos/pkg/block" "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/store/storepb" "github.com/improbable-eng/thanos/pkg/testutil" - "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/timestamp" - "github.com/prometheus/tsdb" "github.com/prometheus/tsdb/labels" ) @@ -190,97 +193,173 @@ func TestBucketStore_e2e(t *testing.T) { testutil.Equals(t, 0, len(srv.SeriesSet)) } -func TestBucketBlock_matches(t *testing.T) { - makeMeta := func(mint, maxt int64, lset map[string]string) *block.Meta { - return &block.Meta{ - BlockMeta: tsdb.BlockMeta{ - MinTime: mint, - MaxTime: maxt, - }, - Thanos: block.ThanosMeta{Labels: lset}, - } +func TestBucketBlockSet_addGet(t *testing.T) { + set := newBucketBlockSet(labels.Labels{}) + + type resBlock struct { + mint, maxt int64 + window int64 + } + input := []resBlock{ + // Blocks from 0 to 100 with raw resolution. + {window: downsample.ResLevel0, mint: 0, maxt: 100}, + {window: downsample.ResLevel0, mint: 100, maxt: 200}, + {window: downsample.ResLevel0, mint: 200, maxt: 300}, + {window: downsample.ResLevel0, mint: 300, maxt: 400}, + {window: downsample.ResLevel0, mint: 400, maxt: 500}, + // Lower resolution data not covering last block. + {window: downsample.ResLevel1, mint: 0, maxt: 100}, + {window: downsample.ResLevel1, mint: 100, maxt: 200}, + {window: downsample.ResLevel1, mint: 200, maxt: 300}, + {window: downsample.ResLevel1, mint: 300, maxt: 400}, + // Lower resolution data only covering middle blocks. + {window: downsample.ResLevel2, mint: 100, maxt: 200}, + {window: downsample.ResLevel2, mint: 200, maxt: 300}, } + + for _, in := range input { + var m block.Meta + m.Thanos.Downsample.Resolution = in.window + m.MinTime = in.mint + m.MaxTime = in.maxt + set.add(&bucketBlock{meta: &m}) + } + cases := []struct { - meta *block.Meta - mint, maxt int64 - matchers []labels.Matcher - expBlockMatchers []labels.Matcher - ok bool + mint, maxt int64 + minResolution int64 + res []resBlock }{ { - meta: makeMeta(100, 200, nil), - mint: 0, - maxt: 99, - ok: false, - }, - { - meta: makeMeta(100, 200, nil), - mint: 0, - maxt: 100, - ok: true, - }, - { - meta: makeMeta(100, 200, nil), - mint: 201, - maxt: 250, - ok: false, + mint: -100, + maxt: 1000, + minResolution: 0, + res: []resBlock{ + {window: downsample.ResLevel0, mint: 0, maxt: 100}, + {window: downsample.ResLevel0, mint: 100, maxt: 200}, + {window: downsample.ResLevel0, mint: 200, maxt: 300}, + {window: downsample.ResLevel0, mint: 300, maxt: 400}, + {window: downsample.ResLevel0, mint: 400, maxt: 500}, + }, + }, { + mint: 100, + maxt: 400, + minResolution: downsample.ResLevel1 - 1, + res: []resBlock{ + {window: downsample.ResLevel0, mint: 100, maxt: 200}, + {window: downsample.ResLevel0, mint: 200, maxt: 300}, + {window: downsample.ResLevel0, mint: 300, maxt: 400}, + }, + }, { + mint: 100, + maxt: 500, + minResolution: downsample.ResLevel1, + res: []resBlock{ + {window: downsample.ResLevel1, mint: 100, maxt: 200}, + {window: downsample.ResLevel1, mint: 200, maxt: 300}, + {window: downsample.ResLevel1, mint: 300, maxt: 400}, + {window: downsample.ResLevel0, mint: 400, maxt: 500}, + }, + }, { + mint: 0, + maxt: 500, + minResolution: downsample.ResLevel2, + res: []resBlock{ + {window: downsample.ResLevel1, mint: 0, maxt: 100}, + {window: downsample.ResLevel2, mint: 100, maxt: 200}, + {window: downsample.ResLevel2, mint: 200, maxt: 300}, + {window: downsample.ResLevel1, mint: 300, maxt: 400}, + {window: downsample.ResLevel0, mint: 400, maxt: 500}, + }, }, + } + for i, c := range cases { + t.Logf("case %d", i) + + var exp []*bucketBlock + for _, b := range c.res { + var m block.Meta + m.Thanos.Downsample.Resolution = b.window + m.MinTime = b.mint + m.MaxTime = b.maxt + exp = append(exp, &bucketBlock{meta: &m}) + } + res := set.getFor(c.mint, c.maxt, c.minResolution) + testutil.Equals(t, exp, res) + } +} + +func TestBucketBlockSet_remove(t *testing.T) { + set := newBucketBlockSet(labels.Labels{}) + + type resBlock struct { + id ulid.ULID + mint, maxt int64 + } + input := []resBlock{ + {id: ulid.MustNew(1, nil), mint: 0, maxt: 100}, + {id: ulid.MustNew(2, nil), mint: 100, maxt: 200}, + {id: ulid.MustNew(3, nil), mint: 200, maxt: 300}, + } + + for _, in := range input { + var m block.Meta + m.ULID = in.id + m.MinTime = in.mint + m.MaxTime = in.maxt + set.add(&bucketBlock{meta: &m}) + } + set.remove(input[1].id) + res := set.getFor(0, 300, 0) + + testutil.Equals(t, 2, len(res)) + testutil.Equals(t, input[0].id, res[0].meta.ULID) + testutil.Equals(t, input[2].id, res[1].meta.ULID) +} + +func TestBucketBlockSet_labelMatchers(t *testing.T) { + set := newBucketBlockSet(labels.FromStrings("a", "b", "c", "d")) + + cases := []struct { + in []labels.Matcher + res []labels.Matcher + match bool + }{ { - meta: makeMeta(100, 200, nil), - mint: 200, - maxt: 250, - ok: true, + in: []labels.Matcher{}, + res: []labels.Matcher{}, + match: true, }, { - meta: makeMeta(100, 200, nil), - mint: 150, - maxt: 160, - matchers: []labels.Matcher{ - labels.NewEqualMatcher("a", "b"), - }, - expBlockMatchers: []labels.Matcher{ + in: []labels.Matcher{ labels.NewEqualMatcher("a", "b"), + labels.NewEqualMatcher("c", "d"), }, - ok: true, + res: []labels.Matcher{}, + match: true, }, { - meta: makeMeta(100, 200, map[string]string{"a": "b"}), - mint: 150, - maxt: 160, - matchers: []labels.Matcher{ + in: []labels.Matcher{ labels.NewEqualMatcher("a", "b"), + labels.NewEqualMatcher("c", "b"), }, - ok: true, + match: false, }, { - meta: makeMeta(100, 200, map[string]string{"a": "b"}), - mint: 150, - maxt: 160, - matchers: []labels.Matcher{ - labels.NewEqualMatcher("a", "c"), - }, - ok: false, - }, - { - meta: makeMeta(100, 200, map[string]string{"a": "b"}), - mint: 150, - maxt: 160, - matchers: []labels.Matcher{ + in: []labels.Matcher{ labels.NewEqualMatcher("a", "b"), - labels.NewEqualMatcher("d", "e"), + labels.NewEqualMatcher("e", "f"), }, - expBlockMatchers: []labels.Matcher{ - labels.NewEqualMatcher("d", "e"), + res: []labels.Matcher{ + labels.NewEqualMatcher("e", "f"), }, - ok: true, + match: true, }, } - - for i, c := range cases { - b := &bucketBlock{meta: c.meta} - blockMatchers, ok := b.blockMatchers(c.mint, c.maxt, c.matchers...) - testutil.Assert(t, c.ok == ok, "test case %d failed", i) - testutil.Equals(t, c.expBlockMatchers, blockMatchers) + for _, c := range cases { + res, ok := set.labelMatchers(c.in...) + testutil.Equals(t, c.match, ok) + testutil.Equals(t, c.res, res) } }