Skip to content

Commit

Permalink
store: add tests for blockStoreSet
Browse files Browse the repository at this point in the history
  • Loading branch information
fabxc committed Jan 29, 2018
1 parent b89b1c2 commit 134d1cc
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 75 deletions.
11 changes: 8 additions & 3 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,12 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
return nil
}

func (s *BucketStore) numBlocks() int {
s.mtx.RLock()
defer s.mtx.RUnlock()
return len(s.blocks)
}

func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock {
s.mtx.RLock()
defer s.mtx.RUnlock()
Expand All @@ -301,6 +307,7 @@ func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) {

defer func() {
if err != nil {
s.metrics.blockLoadFailures.Inc()
os.RemoveAll(dir)
}
}()
Expand All @@ -309,7 +316,6 @@ func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) {
b, err := newBucketBlock(ctx, log.With(s.logger, "block", id),
s.bucket, id, dir, s.indexCache, s.chunkPool)
if err != nil {
s.metrics.blockLoadFailures.Inc()
return err
}
s.mtx.Lock()
Expand Down Expand Up @@ -393,7 +399,6 @@ type seriesEntry struct {
type bucketSeriesSet struct {
set []seriesEntry
i int
// chks []storepb.AggrChunk
err error
}

Expand Down Expand Up @@ -564,7 +569,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return errors.Errorf("aggregate %s does not exist", downsample.AggrSum)
}
out.Count = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: x.Bytes()}
out.Sum = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: x.Bytes()}
case storepb.Aggr_MIN:
x, err := ac.Get(downsample.AggrMin)
if err != nil {
Expand Down
224 changes: 152 additions & 72 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@ package store

import (
"context"
"errors"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"

"github.com/oklog/ulid"

"github.com/improbable-eng/thanos/pkg/compact/downsample"

"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/labels"
)

Expand Down Expand Up @@ -190,97 +193,174 @@ func TestBucketStore_e2e(t *testing.T) {
testutil.Equals(t, 0, len(srv.SeriesSet))
}

func TestBucketBlock_matches(t *testing.T) {
makeMeta := func(mint, maxt int64, lset map[string]string) *block.Meta {
return &block.Meta{
BlockMeta: tsdb.BlockMeta{
MinTime: mint,
MaxTime: maxt,
},
Thanos: block.ThanosMeta{Labels: lset},
}
func TestBucketBlockSet_addGet(t *testing.T) {
set := newBucketBlockSet(labels.Labels{})

type resBlock struct {
mint, maxt int64
window int64
}
input := []resBlock{
// Blocks from 0 to 100 with raw resolution.
{window: downsample.ResLevel0, mint: 0, maxt: 100},
{window: downsample.ResLevel0, mint: 100, maxt: 200},
{window: downsample.ResLevel0, mint: 200, maxt: 300},
{window: downsample.ResLevel0, mint: 300, maxt: 400},
{window: downsample.ResLevel0, mint: 400, maxt: 500},
// Lower resolution data not covering last block.
{window: downsample.ResLevel1, mint: 0, maxt: 100},
{window: downsample.ResLevel1, mint: 100, maxt: 200},
{window: downsample.ResLevel1, mint: 200, maxt: 300},
{window: downsample.ResLevel1, mint: 300, maxt: 400},
// Lower resolution data only covering middle blocks.
{window: downsample.ResLevel2, mint: 100, maxt: 200},
{window: downsample.ResLevel2, mint: 200, maxt: 300},
{window: downsample.ResLevel2, mint: 300, maxt: 400},
}

for _, in := range input {
var m block.Meta
m.Thanos.Downsample.Resolution = in.window
m.MinTime = in.mint
m.MaxTime = in.maxt
set.add(&bucketBlock{meta: &m})
}

cases := []struct {
meta *block.Meta
mint, maxt int64
matchers []labels.Matcher
expBlockMatchers []labels.Matcher
ok bool
mint, maxt int64
minResolution int64
res []resBlock
}{
{
meta: makeMeta(100, 200, nil),
mint: 0,
maxt: 99,
ok: false,
},
{
meta: makeMeta(100, 200, nil),
mint: 0,
maxt: 100,
ok: true,
},
{
meta: makeMeta(100, 200, nil),
mint: 201,
maxt: 250,
ok: false,
mint: -100,
maxt: 1000,
minResolution: 0,
res: []resBlock{
{window: downsample.ResLevel0, mint: 0, maxt: 100},
{window: downsample.ResLevel0, mint: 100, maxt: 200},
{window: downsample.ResLevel0, mint: 200, maxt: 300},
{window: downsample.ResLevel0, mint: 300, maxt: 400},
{window: downsample.ResLevel0, mint: 400, maxt: 500},
},
}, {
mint: 100,
maxt: 400,
minResolution: downsample.ResLevel1 - 1,
res: []resBlock{
{window: downsample.ResLevel0, mint: 100, maxt: 200},
{window: downsample.ResLevel0, mint: 200, maxt: 300},
{window: downsample.ResLevel0, mint: 300, maxt: 400},
},
}, {
mint: 100,
maxt: 500,
minResolution: downsample.ResLevel1,
res: []resBlock{
{window: downsample.ResLevel1, mint: 100, maxt: 200},
{window: downsample.ResLevel1, mint: 200, maxt: 300},
{window: downsample.ResLevel1, mint: 300, maxt: 400},
{window: downsample.ResLevel0, mint: 400, maxt: 500},
},
}, {
mint: 0,
maxt: 500,
minResolution: downsample.ResLevel2,
res: []resBlock{
{window: downsample.ResLevel1, mint: 0, maxt: 100},
{window: downsample.ResLevel2, mint: 100, maxt: 200},
{window: downsample.ResLevel2, mint: 200, maxt: 300},
{window: downsample.ResLevel1, mint: 300, maxt: 400},
{window: downsample.ResLevel0, mint: 400, maxt: 500},
},
},
}
for i, c := range cases {
t.Logf("case %d", i)

var exp []*bucketBlock
for _, b := range c.res {
var m block.Meta
m.Thanos.Downsample.Resolution = b.window
m.MinTime = b.mint
m.MaxTime = b.maxt
exp = append(exp, &bucketBlock{meta: &m})
}
res := set.getFor(c.mint, c.maxt, c.minResolution)
testutil.Equals(t, exp, res)
}
}

func TestBucketBlockSet_remove(t *testing.T) {
set := newBucketBlockSet(labels.Labels{})

type resBlock struct {
id ulid.ULID
mint, maxt int64
}
input := []resBlock{
{id: ulid.MustNew(1, nil), mint: 0, maxt: 100},
{id: ulid.MustNew(2, nil), mint: 100, maxt: 200},
{id: ulid.MustNew(3, nil), mint: 200, maxt: 300},
}

for _, in := range input {
var m block.Meta
m.ULID = in.id
m.MinTime = in.mint
m.MaxTime = in.maxt
set.add(&bucketBlock{meta: &m})
}
set.remove(input[1].id)
res := set.getFor(0, 300, 0)

testutil.Equals(t, 2, len(res))
testutil.Equals(t, input[0].id, res[0].meta.ULID)
testutil.Equals(t, input[2].id, res[1].meta.ULID)
}

func TestBucketBlockSet_labelMatchers(t *testing.T) {
set := newBucketBlockSet(labels.FromStrings("a", "b", "c", "d"))

cases := []struct {
in []labels.Matcher
res []labels.Matcher
match bool
}{
{
meta: makeMeta(100, 200, nil),
mint: 200,
maxt: 250,
ok: true,
in: []labels.Matcher{},
res: []labels.Matcher{},
match: true,
},
{
meta: makeMeta(100, 200, nil),
mint: 150,
maxt: 160,
matchers: []labels.Matcher{
labels.NewEqualMatcher("a", "b"),
},
expBlockMatchers: []labels.Matcher{
in: []labels.Matcher{
labels.NewEqualMatcher("a", "b"),
labels.NewEqualMatcher("c", "d"),
},
ok: true,
res: []labels.Matcher{},
match: true,
},
{
meta: makeMeta(100, 200, map[string]string{"a": "b"}),
mint: 150,
maxt: 160,
matchers: []labels.Matcher{
in: []labels.Matcher{
labels.NewEqualMatcher("a", "b"),
labels.NewEqualMatcher("c", "b"),
},
ok: true,
match: false,
},
{
meta: makeMeta(100, 200, map[string]string{"a": "b"}),
mint: 150,
maxt: 160,
matchers: []labels.Matcher{
labels.NewEqualMatcher("a", "c"),
},
ok: false,
},
{
meta: makeMeta(100, 200, map[string]string{"a": "b"}),
mint: 150,
maxt: 160,
matchers: []labels.Matcher{
in: []labels.Matcher{
labels.NewEqualMatcher("a", "b"),
labels.NewEqualMatcher("d", "e"),
labels.NewEqualMatcher("e", "f"),
},
expBlockMatchers: []labels.Matcher{
labels.NewEqualMatcher("d", "e"),
res: []labels.Matcher{
labels.NewEqualMatcher("e", "f"),
},
ok: true,
match: true,
},
}

for i, c := range cases {
b := &bucketBlock{meta: c.meta}
blockMatchers, ok := b.blockMatchers(c.mint, c.maxt, c.matchers...)
testutil.Assert(t, c.ok == ok, "test case %d failed", i)
testutil.Equals(t, c.expBlockMatchers, blockMatchers)
for _, c := range cases {
res, ok := set.labelMatchers(c.in...)
testutil.Equals(t, c.match, ok)
testutil.Equals(t, c.res, res)
}
}

Expand Down

0 comments on commit 134d1cc

Please sign in to comment.