From bed577169e233201f5621e485cb3f16a67f666ce Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Tue, 13 Aug 2019 18:32:02 +0300
Subject: [PATCH] Store: Add Time & duration based partitioning
Signed-off-by: Povilas Versockas
---
cmd/thanos/store.go | 19 ++++
docs/components/store.md | 32 +++++++
pkg/model/timeduration.go | 75 +++++++++++++++
pkg/model/timeduration_test.go | 36 +++++++
pkg/store/bucket.go | 74 +++++++++++++++
pkg/store/bucket_e2e_test.go | 166 ++++++++++++++++++++++++++-------
pkg/store/bucket_test.go | 71 +++++++++++++-
7 files changed, 437 insertions(+), 36 deletions(-)
create mode 100644 pkg/model/timeduration.go
create mode 100644 pkg/model/timeduration_test.go
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index d87ff2c89f..d2b277c151 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -11,6 +11,7 @@ import (
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store"
@@ -49,7 +50,19 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
Default("20").Int()
+ minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store serves only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
+ Default("0000-01-01T00:00:00Z"))
+
+ maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
+ Default("9999-12-31T23:59:59Z"))
+
m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
+ // Sanity check Time filters
+ if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
+ return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
+ minTime, maxTime)
+ }
+
return runStore(g,
logger,
reg,
@@ -69,6 +82,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
debugLogging,
*syncInterval,
*blockSyncConcurrency,
+ &store.FilterConfig{
+ MinTime: *minTime,
+ MaxTime: *maxTime,
+ },
)
}
}
@@ -94,6 +111,7 @@ func runStore(
verbose bool,
syncInterval time.Duration,
blockSyncConcurrency int,
+ filterConf *store.FilterConfig,
) error {
{
confContentYaml, err := objStoreConfig.Content()
@@ -135,6 +153,7 @@ func runStore(
maxConcurrent,
verbose,
blockSyncConcurrency,
+ filterConf,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
diff --git a/docs/components/store.md b/docs/components/store.md
index 3bc0c1e4ef..69c9db8de8 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -84,5 +84,37 @@ Flags:
--block-sync-concurrency=20
Number of goroutines to use when syncing blocks
from object storage.
+ --min-time=0000-01-01T00:00:00Z
+ Start of time range limit to serve. Thanos
+ Store serves only metrics, which happened later
+ than this value. Option can be a constant time
+ in RFC3339 format or time duration relative to
+ current time, such as -1d or 2h45m. Valid
+ duration units are ms, s, m, h, d, w, y.
+ --max-time=9999-12-31T23:59:59Z
+ End of time range limit to serve. Thanos Store
+ serves only blocks, which happened eariler than
+ this value. Option can be a constant time in
+ RFC3339 format or time duration relative to
+ current time, such as -1d or 2h45m. Valid
+ duration units are ms, s, m, h, d, w, y.
```
+
+## Time based partioning
+
+By default Thanos Store Gateway looks at all the data in Object Store and returns it based on query's time range.
+
+Thanos Store `--min-time`, `--max-time` flags allows you to shard Thanos Store based on constant time or time duration relative to current time.
+
+For example setting: `--min-time=-6w` & `--max-time==-2w` will make Thanos Store Gateway return metrics that fall within `now - 6 weeks` up to `now - 2 weeks` time range.
+
+Constant time needs to be set in RFC3339 format. For example `--min-time=2018-01-01T00:00:00Z`, `--max-time=2019-01-01T23:59:59Z`.
+
+Thanos Store Gateway might not get new blocks immediately, as Time partitioning is partly done in asynchronous block synchronization job, which is by default done every 3 minutes. Additionally some of the Object Store implementations provide eventual read-after-write consistency, which means that Thanos Store might not immediately get newly created & uploaded blocks anyway.
+
+We recommend having overlapping time ranges with Thanos Sidecar and other Thanos Store gateways as this will improve your resiliency to failures.
+
+Thanos Querier deals with overlapping time series by merging them together.
+
+Filtering is done on a Chunk level, so Thanos Store might still return Samples which are outside of `--min-time` & `--max-time`.
diff --git a/pkg/model/timeduration.go b/pkg/model/timeduration.go
new file mode 100644
index 0000000000..bbe766043f
--- /dev/null
+++ b/pkg/model/timeduration.go
@@ -0,0 +1,75 @@
+package model
+
+import (
+ "time"
+
+ "github.com/prometheus/common/model"
+ "github.com/prometheus/prometheus/pkg/timestamp"
+ "gopkg.in/alecthomas/kingpin.v2"
+)
+
+// TimeOrDurationValue is a custom kingping parser for time in RFC3339
+// or duration in Go's duration format, such as "300ms", "-1.5h" or "2h45m".
+// Only one will be set.
+type TimeOrDurationValue struct {
+ Time *time.Time
+ Dur *model.Duration
+}
+
+// Set converts string to TimeOrDurationValue.
+func (tdv *TimeOrDurationValue) Set(s string) error {
+ t, err := time.Parse(time.RFC3339, s)
+ if err == nil {
+ tdv.Time = &t
+ return nil
+ }
+
+ // error parsing time, let's try duration.
+ var minus bool
+ if s[0] == '-' {
+ minus = true
+ s = s[1:]
+ }
+ dur, err := model.ParseDuration(s)
+ if err != nil {
+ return err
+ }
+
+ if minus {
+ dur = dur * -1
+ }
+ tdv.Dur = &dur
+ return nil
+}
+
+// String returns either tume or duration.
+func (tdv *TimeOrDurationValue) String() string {
+ switch {
+ case tdv.Time != nil:
+ return tdv.Time.String()
+ case tdv.Dur != nil:
+ return tdv.Dur.String()
+ }
+
+ return "nil"
+}
+
+// PrometheusTimestamp returns TimeOrDurationValue converted to PrometheusTimestamp
+// if duration is set now+duration is converted to Timestamp.
+func (tdv *TimeOrDurationValue) PrometheusTimestamp() int64 {
+ switch {
+ case tdv.Time != nil:
+ return timestamp.FromTime(*tdv.Time)
+ case tdv.Dur != nil:
+ return timestamp.FromTime(time.Now().Add(time.Duration(*tdv.Dur)))
+ }
+
+ return 0
+}
+
+// TimeOrDuration helper for parsing TimeOrDuration with kingpin.
+func TimeOrDuration(flags *kingpin.FlagClause) *TimeOrDurationValue {
+ value := new(TimeOrDurationValue)
+ flags.SetValue(value)
+ return value
+}
diff --git a/pkg/model/timeduration_test.go b/pkg/model/timeduration_test.go
new file mode 100644
index 0000000000..d3d3eba271
--- /dev/null
+++ b/pkg/model/timeduration_test.go
@@ -0,0 +1,36 @@
+package model_test
+
+import (
+ "testing"
+ "time"
+
+ "github.com/prometheus/prometheus/pkg/timestamp"
+ "github.com/thanos-io/thanos/pkg/model"
+ "github.com/thanos-io/thanos/pkg/testutil"
+ "gopkg.in/alecthomas/kingpin.v2"
+)
+
+func TestTimeOrDurationValue(t *testing.T) {
+ cmd := kingpin.New("test", "test")
+
+ minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve"))
+
+ maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve").
+ Default("9999-12-31T23:59:59Z"))
+
+ _, err := cmd.Parse([]string{"--min-time", "10s"})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ testutil.Equals(t, "10s", minTime.String())
+ testutil.Equals(t, "9999-12-31 23:59:59 +0000 UTC", maxTime.String())
+
+ prevTime := timestamp.FromTime(time.Now())
+ afterTime := timestamp.FromTime(time.Now().Add(15 * time.Second))
+
+ testutil.Assert(t, minTime.PrometheusTimestamp() > prevTime, "minTime prometheus timestamp is less than time now.")
+ testutil.Assert(t, minTime.PrometheusTimestamp() < afterTime, "minTime prometheus timestamp is more than time now + 15s")
+
+ testutil.Assert(t, 253402300799000 == maxTime.PrometheusTimestamp(), "maxTime is not equal to 253402300799000")
+}
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 47965cd21a..c057f7684a 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -32,6 +32,7 @@ import (
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extprom"
+ "github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/pool"
"github.com/thanos-io/thanos/pkg/runutil"
@@ -182,6 +183,11 @@ type indexCache interface {
Series(b ulid.ULID, id uint64) ([]byte, bool)
}
+// FilterConfig is a configuration, which Store uses for filtering metrics.
+type FilterConfig struct {
+ MinTime, MaxTime model.TimeOrDurationValue
+}
+
// BucketStore implements the store API backed by a bucket. It loads all index
// files to local disk.
type BucketStore struct {
@@ -208,6 +214,8 @@ type BucketStore struct {
// samplesLimiter limits the number of samples per each Series() call.
samplesLimiter *Limiter
partitioner partitioner
+
+ filterConfig *FilterConfig
}
// NewBucketStore creates a new bucket backed store that implements the store API against
@@ -223,6 +231,7 @@ func NewBucketStore(
maxConcurrent int,
debugLogging bool,
blockSyncConcurrency int,
+ filterConf *FilterConfig,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
@@ -256,6 +265,7 @@ func NewBucketStore(
),
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
partitioner: gapBasedPartitioner{maxGapSize: maxGapSize},
+ filterConfig: filterConf,
}
s.metrics = metrics
@@ -309,6 +319,17 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
if err != nil {
return nil
}
+
+ inRange, err := s.isBlockInMinMaxRange(ctx, id)
+ if err != nil {
+ level.Warn(s.logger).Log("msg", "error parsing block range", "block", id, "err", err)
+ return nil
+ }
+
+ if !inRange {
+ return nil
+ }
+
allIDs[id] = struct{}{}
if b := s.getBlock(id); b != nil {
@@ -377,6 +398,31 @@ func (s *BucketStore) numBlocks() int {
return len(s.blocks)
}
+func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, id ulid.ULID) (bool, error) {
+ dir := filepath.Join(s.dir, id.String())
+
+ b := &bucketBlock{
+ logger: s.logger,
+ bucket: s.bucket,
+ id: id,
+ dir: dir,
+ }
+ if err := b.loadMeta(ctx, id); err != nil {
+ return false, err
+ }
+
+ // We check for blocks in configured minTime, maxTime range
+ switch {
+ case b.meta.MaxTime <= s.filterConfig.MinTime.PrometheusTimestamp():
+ return false, nil
+
+ case b.meta.MinTime >= s.filterConfig.MaxTime.PrometheusTimestamp():
+ return false, nil
+ }
+
+ return true, nil
+}
+
func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock {
s.mtx.RLock()
defer s.mtx.RUnlock()
@@ -468,6 +514,10 @@ func (s *BucketStore) TimeRange() (mint, maxt int64) {
maxt = b.meta.MaxTime
}
}
+
+ mint = s.normalizeMinTime(mint)
+ maxt = s.normalizeMaxTime(maxt)
+
return mint, maxt
}
@@ -482,6 +532,26 @@ func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.Info
}, nil
}
+func (s *BucketStore) normalizeMinTime(mint int64) int64 {
+ filterMinTime := s.filterConfig.MinTime.PrometheusTimestamp()
+
+ if mint < filterMinTime {
+ return filterMinTime
+ }
+
+ return mint
+}
+
+func (s *BucketStore) normalizeMaxTime(maxt int64) int64 {
+ filterMaxTime := s.filterConfig.MaxTime.PrometheusTimestamp()
+
+ if maxt > filterMaxTime {
+ maxt = filterMaxTime
+ }
+
+ return maxt
+}
+
type seriesEntry struct {
lset []storepb.Label
refs []uint64
@@ -722,6 +792,10 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
+ // Adjust Request MinTime based on filters.
+ req.MinTime = s.normalizeMinTime(req.MinTime)
+ req.MaxTime = s.normalizeMaxTime(req.MaxTime)
+
var (
stats = &queryStats{}
g run.Group
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index 2185e0d161..272b8a45b2 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -9,15 +9,16 @@ import (
"testing"
"time"
- "github.com/oklog/ulid"
-
"github.com/go-kit/kit/log"
+ "github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
+ "github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
+ "github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/objstore/objtesting"
"github.com/thanos-io/thanos/pkg/runutil"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
@@ -25,6 +26,17 @@ import (
"github.com/thanos-io/thanos/pkg/testutil"
)
+var (
+ minTime = time.Unix(0, 0)
+ maxTime, _ = time.Parse(time.RFC3339, "9999-12-31T23:59:59Z")
+ minTimeDuration = model.TimeOrDurationValue{Time: &minTime}
+ maxTimeDuration = model.TimeOrDurationValue{Time: &maxTime}
+ filterConf = &FilterConfig{
+ MinTime: minTimeDuration,
+ MaxTime: maxTimeDuration,
+ }
+)
+
type noopCache struct{}
func (noopCache) SetPostings(b ulid.ULID, l labels.Label, v []byte) {}
@@ -72,38 +84,20 @@ func (s *storeSuite) Close() {
s.wg.Wait()
}
-func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxSampleCount uint64) *storeSuite {
- series := []labels.Labels{
- labels.FromStrings("a", "1", "b", "1"),
- labels.FromStrings("a", "1", "b", "2"),
- labels.FromStrings("a", "2", "b", "1"),
- labels.FromStrings("a", "2", "b", "2"),
- labels.FromStrings("a", "1", "c", "1"),
- labels.FromStrings("a", "1", "c", "2"),
- labels.FromStrings("a", "2", "c", "1"),
- labels.FromStrings("a", "2", "c", "2"),
- }
- extLset := labels.FromStrings("ext1", "value1")
+func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt objstore.Bucket,
+ series []labels.Labels, extLset labels.Labels) (blocks int, minTime, maxTime int64) {
+ ctx := context.Background()
+ logger := log.NewNopLogger()
- start := time.Now()
- now := start
-
- ctx, cancel := context.WithCancel(context.Background())
- s := &storeSuite{
- cancel: cancel,
- logger: log.NewLogfmtLogger(os.Stderr),
- cache: &swappableCache{},
- }
- blocks := 0
- for i := 0; i < 3; i++ {
+ for i := 0; i < count; i++ {
mint := timestamp.FromTime(now)
now = now.Add(2 * time.Hour)
maxt := timestamp.FromTime(now)
- if s.minTime == 0 {
- s.minTime = mint
+ if minTime == 0 {
+ minTime = mint
}
- s.maxTime = maxt
+ maxTime = maxt
// Create two blocks per time slot. Only add 10 samples each so only one chunk
// gets created each. This way we can easily verify we got 10 chunks per series below.
@@ -118,19 +112,46 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
meta, err := metadata.Read(dir2)
testutil.Ok(t, err)
meta.Thanos.Labels = map[string]string{"ext2": "value2"}
- testutil.Ok(t, metadata.Write(s.logger, dir2, meta))
+ testutil.Ok(t, metadata.Write(logger, dir2, meta))
- testutil.Ok(t, block.Upload(ctx, s.logger, bkt, dir1))
- testutil.Ok(t, block.Upload(ctx, s.logger, bkt, dir2))
+ testutil.Ok(t, block.Upload(ctx, logger, bkt, dir1))
+ testutil.Ok(t, block.Upload(ctx, logger, bkt, dir2))
blocks += 2
testutil.Ok(t, os.RemoveAll(dir1))
testutil.Ok(t, os.RemoveAll(dir2))
}
- store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, maxSampleCount, 20, false, 20)
- testutil.Ok(t, err)
+ return
+}
+
+func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxSampleCount uint64) *storeSuite {
+ series := []labels.Labels{
+ labels.FromStrings("a", "1", "b", "1"),
+ labels.FromStrings("a", "1", "b", "2"),
+ labels.FromStrings("a", "2", "b", "1"),
+ labels.FromStrings("a", "2", "b", "2"),
+ labels.FromStrings("a", "1", "c", "1"),
+ labels.FromStrings("a", "1", "c", "2"),
+ labels.FromStrings("a", "2", "c", "1"),
+ labels.FromStrings("a", "2", "c", "2"),
+ }
+ extLset := labels.FromStrings("ext1", "value1")
+ blocks, minTime, maxTime := prepareTestBlocks(t, time.Now(), 3, dir, bkt,
+ series, extLset)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ s := &storeSuite{
+ cancel: cancel,
+ logger: log.NewLogfmtLogger(os.Stderr),
+ cache: &swappableCache{},
+ minTime: minTime,
+ maxTime: maxTime,
+ }
+
+ store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, maxSampleCount, 20, false, 20, filterConf)
+ testutil.Ok(t, err)
s.store = store
if manyParts {
@@ -144,8 +165,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
if err := runutil.Repeat(100*time.Millisecond, ctx.Done(), func() error {
return store.SyncBlocks(ctx)
}); err != nil && errors.Cause(err) != context.Canceled {
- t.Error(err)
- t.FailNow()
+ t.Fatal(err)
}
}()
@@ -433,3 +453,79 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
testBucketStore_e2e(t, ctx, s)
})
}
+
+func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ bkt := inmem.NewBucket()
+
+ dir, err := ioutil.TempDir("", "test_bucket_time_part_e2e")
+ testutil.Ok(t, err)
+ defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
+
+ series := []labels.Labels{
+ labels.FromStrings("a", "1", "b", "1"),
+ labels.FromStrings("a", "1", "b", "1"),
+ labels.FromStrings("a", "1", "b", "1"),
+ labels.FromStrings("a", "1", "b", "1"),
+ labels.FromStrings("a", "1", "b", "2"),
+ labels.FromStrings("a", "1", "b", "2"),
+ labels.FromStrings("a", "1", "b", "2"),
+ labels.FromStrings("a", "1", "b", "2"),
+ }
+ extLset := labels.FromStrings("ext1", "value1")
+
+ _, minTime, _ := prepareTestBlocks(t, time.Now(), 3, dir, bkt, series, extLset)
+
+ hourAfter := time.Now().Add(1 * time.Hour)
+ filterMaxTime := model.TimeOrDurationValue{Time: &hourAfter}
+
+ store, err := NewBucketStore(nil, nil, bkt, dir, noopCache{}, 0, 0, 20, false, 20,
+ &FilterConfig{
+ MinTime: minTimeDuration,
+ MaxTime: filterMaxTime,
+ })
+ testutil.Ok(t, err)
+
+ err = store.SyncBlocks(ctx)
+ testutil.Ok(t, err)
+
+ mint, maxt := store.TimeRange()
+ testutil.Equals(t, minTime, mint)
+ testutil.Equals(t, filterMaxTime.PrometheusTimestamp(), maxt)
+
+ for i, tcase := range []struct {
+ req *storepb.SeriesRequest
+ expectedLabels [][]storepb.Label
+ expectedChunks int
+ }{
+ {
+ req: &storepb.SeriesRequest{
+ Matchers: []storepb.LabelMatcher{
+ {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
+ },
+ MinTime: mint,
+ MaxTime: timestamp.FromTime(time.Now().AddDate(0, 0, 1)),
+ },
+ expectedLabels: [][]storepb.Label{
+ {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
+ {{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext2", Value: "value2"}},
+ },
+ // prepareTestBlocks makes 3 chunks containing 2 hour data,
+ // we should only get 1, as we are filtering.
+ expectedChunks: 1,
+ },
+ } {
+ t.Log("Run", i)
+
+ srv := newStoreSeriesServer(ctx)
+
+ testutil.Ok(t, store.Series(tcase.req, srv))
+ testutil.Equals(t, len(tcase.expectedLabels), len(srv.SeriesSet))
+
+ for i, s := range srv.SeriesSet {
+ testutil.Equals(t, tcase.expectedLabels[i], s.Labels)
+ testutil.Equals(t, tcase.expectedChunks, len(s.Chunks))
+ }
+ }
+}
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 0a85f7cc49..07432340c9 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -4,17 +4,23 @@ import (
"context"
"io/ioutil"
"math"
+ "path/filepath"
"testing"
"time"
"github.com/fortytw2/leaktest"
+ "github.com/go-kit/kit/log"
"github.com/leanovate/gopter"
"github.com/leanovate/gopter/gen"
"github.com/leanovate/gopter/prop"
"github.com/oklog/ulid"
+ prommodel "github.com/prometheus/common/model"
+ "github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
+ "github.com/thanos-io/thanos/pkg/model"
+ "github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
)
@@ -416,7 +422,7 @@ func TestBucketStore_Info(t *testing.T) {
dir, err := ioutil.TempDir("", "prometheus-test")
testutil.Ok(t, err)
- bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20)
+ bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20, filterConf)
testutil.Ok(t, err)
resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{})
@@ -426,3 +432,66 @@ func TestBucketStore_Info(t *testing.T) {
testutil.Equals(t, int64(math.MaxInt64), resp.MinTime)
testutil.Equals(t, int64(math.MinInt64), resp.MaxTime)
}
+
+func TestBucketStore_isBlockInMinMaxRange(t *testing.T) {
+ ctx := context.TODO()
+ dir, err := ioutil.TempDir("", "block-min-max-test")
+ testutil.Ok(t, err)
+
+ series := []labels.Labels{labels.FromStrings("a", "1", "b", "1")}
+ extLset := labels.FromStrings("ext1", "value1")
+
+ // Create a block in range [-2w, -1w]
+ id1, err := testutil.CreateBlock(ctx, dir, series, 10,
+ timestamp.FromTime(time.Now().Add(-14*24*time.Hour)),
+ timestamp.FromTime(time.Now().Add(-7*24*time.Hour)),
+ extLset, 0)
+ testutil.Ok(t, err)
+
+ // Create a block in range [-1w, 0w]
+ id2, err := testutil.CreateBlock(ctx, dir, series, 10,
+ timestamp.FromTime(time.Now().Add(-7*24*time.Hour)),
+ timestamp.FromTime(time.Now().Add(-0*24*time.Hour)),
+ extLset, 0)
+ testutil.Ok(t, err)
+
+ // Create a block in range [+1w, +2w]
+ id3, err := testutil.CreateBlock(ctx, dir, series, 10,
+ timestamp.FromTime(time.Now().Add(7*24*time.Hour)),
+ timestamp.FromTime(time.Now().Add(14*24*time.Hour)),
+ extLset, 0)
+ testutil.Ok(t, err)
+
+ dir1, dir2 := filepath.Join(dir, id1.String()), filepath.Join(dir, id2.String())
+ meta1, err := metadata.Read(dir1)
+ testutil.Ok(t, err)
+ testutil.Ok(t, metadata.Write(log.NewNopLogger(), dir1, meta1))
+
+ meta2, err := metadata.Read(dir2)
+ testutil.Ok(t, err)
+ testutil.Ok(t, metadata.Write(log.NewNopLogger(), dir2, meta2))
+
+ // Run actual test
+ hourBeforeDur := prommodel.Duration(-1 * time.Hour)
+ hourBefore := model.TimeOrDurationValue{Dur: &hourBeforeDur}
+
+ // bucketStore accepts blocks in range [0, now-1h]
+ bucketStore, err := NewBucketStore(nil, nil, inmem.NewBucket(), dir, noopCache{}, 0, 0, 20, false, 20,
+ &FilterConfig{
+ MinTime: minTimeDuration,
+ MaxTime: hourBefore,
+ })
+ testutil.Ok(t, err)
+
+ inRange, err := bucketStore.isBlockInMinMaxRange(context.TODO(), id1)
+ testutil.Ok(t, err)
+ testutil.Equals(t, true, inRange)
+
+ inRange, err = bucketStore.isBlockInMinMaxRange(context.TODO(), id2)
+ testutil.Ok(t, err)
+ testutil.Equals(t, true, inRange)
+
+ inRange, err = bucketStore.isBlockInMinMaxRange(context.TODO(), id3)
+ testutil.Ok(t, err)
+ testutil.Equals(t, false, inRange)
+}