From b7439f618db163a1f6bf43d5f4cd96cf9a196852 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Tue, 12 May 2020 19:21:24 +0100 Subject: [PATCH] Added receiver multidb unit tests for basic cases. Unfortunately, all passes. ): Signed-off-by: Bartlomiej Plotka --- pkg/receive/handler_test.go | 4 + pkg/receive/multitsdb.go | 70 ++++----- pkg/receive/multitsdb_test.go | 257 ++++++++++++++++++++++++++++++++++ pkg/receive/tsdb_test.go | 3 + 4 files changed, 299 insertions(+), 35 deletions(-) create mode 100644 pkg/receive/multitsdb_test.go diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 80eb005bd28..8be28333b63 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -13,7 +13,9 @@ import ( "strconv" "sync" "testing" + "time" + "github.com/fortytw2/leaktest" "github.com/go-kit/kit/log" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" @@ -180,6 +182,8 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) } func TestReceive(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second) + appenderErrFn := func() error { return errors.New("failed to get appender") } conflictErrFn := func() error { return storage.ErrOutOfBounds } commitErrFn := func() error { return errors.New("failed to commit") } diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 1074f059be1..b907da13159 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -31,7 +31,7 @@ type MultiTSDB struct { dataDir string logger log.Logger reg prometheus.Registerer - tsdbCfg *tsdb.Options + tsdbOpts *tsdb.Options tenantLabelName string labels labels.Labels bucket objstore.Bucket @@ -40,8 +40,34 @@ type MultiTSDB struct { tenants map[string]*tenant } +func NewMultiTSDB( + dataDir string, + l log.Logger, + reg prometheus.Registerer, + tsdbOpts *tsdb.Options, + labels labels.Labels, + tenantLabelName string, + bucket objstore.Bucket, +) *MultiTSDB { + if l == nil { + l = log.NewNopLogger() + } + + return &MultiTSDB{ + dataDir: dataDir, + logger: l, + reg: reg, + tsdbOpts: tsdbOpts, + mtx: &sync.RWMutex{}, + tenants: map[string]*tenant{}, + labels: labels, + tenantLabelName: tenantLabelName, + bucket: bucket, + } +} + type tenant struct { - tsdbCfg *tsdb.Options + tsdbOpts *tsdb.Options readyS *tsdb.ReadyStorage fs *FlushableStorage @@ -51,11 +77,11 @@ type tenant struct { mtx *sync.RWMutex } -func newTenant(tsdbCfg *tsdb.Options) *tenant { +func newTenant(tsdbOpts *tsdb.Options) *tenant { return &tenant{ - tsdbCfg: tsdbCfg, - readyS: &tsdb.ReadyStorage{}, - mtx: &sync.RWMutex{}, + tsdbOpts: tsdbOpts, + readyS: &tsdb.ReadyStorage{}, + mtx: &sync.RWMutex{}, } } @@ -82,7 +108,7 @@ func (t *tenant) flushableStorage() *FlushableStorage { } func (t *tenant) set(tstore *store.TSDBStore, fs *FlushableStorage, ship *shipper.Shipper) { - t.readyS.Set(fs.Get(), int64(2*time.Duration(t.tsdbCfg.MinBlockDuration).Seconds()*1000)) + t.readyS.Set(fs.Get(), int64(2*time.Duration(t.tsdbOpts.MinBlockDuration).Seconds()*1000)) t.mtx.Lock() t.fs = fs t.s = tstore @@ -90,32 +116,6 @@ func (t *tenant) set(tstore *store.TSDBStore, fs *FlushableStorage, ship *shippe t.mtx.Unlock() } -func NewMultiTSDB( - dataDir string, - l log.Logger, - reg prometheus.Registerer, - tsdbCfg *tsdb.Options, - labels labels.Labels, - tenantLabelName string, - bucket objstore.Bucket, -) *MultiTSDB { - if l == nil { - l = log.NewNopLogger() - } - - return &MultiTSDB{ - dataDir: dataDir, - logger: l, - reg: reg, - tsdbCfg: tsdbCfg, - mtx: &sync.RWMutex{}, - tenants: map[string]*tenant{}, - labels: labels, - tenantLabelName: tenantLabelName, - bucket: bucket, - } -} - func (t *MultiTSDB) Open() error { if err := os.MkdirAll(t.dataDir, 0777); err != nil { return err @@ -232,7 +232,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan return tenant, nil } - tenant = newTenant(t.tsdbCfg) + tenant = newTenant(t.tsdbOpts) t.tenants[tenantID] = tenant t.mtx.Unlock() @@ -261,7 +261,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan dataDir, logger, reg, - t.tsdbCfg, + t.tsdbOpts, ) // Assign to outer error to report in blocking case. diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go new file mode 100644 index 00000000000..b9c47b1c184 --- /dev/null +++ b/pkg/receive/multitsdb_test.go @@ -0,0 +1,257 @@ +package receive + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "testing" + "time" + + "github.com/fortytw2/leaktest" + "github.com/go-kit/kit/log" + "github.com/gogo/protobuf/types" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/tsdb" + "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/testutil" + "golang.org/x/sync/errgroup" +) + +func TestMultiTSDB(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second) + + dir, err := ioutil.TempDir("", "test") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + logger := log.NewNopLogger() + t.Run("run fresh", func(t *testing.T) { + m := NewMultiTSDB( + dir, logger, prometheus.NewRegistry(), &tsdb.Options{ + MinBlockDuration: model.Duration(2 * time.Hour), + MaxBlockDuration: model.Duration(2 * time.Hour), + RetentionDuration: model.Duration(6 * time.Hour), + NoLockfile: true, + }, + labels.FromStrings("replica", "01"), + "tenant_id", + nil, + ) + defer testutil.Ok(t, m.Flush()) + + testutil.Ok(t, m.Flush()) + testutil.Ok(t, m.Open()) + + app, err := m.TenantAppendable("foo") + testutil.Ok(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + var a storage.Appender + testutil.Ok(t, runutil.Retry(1*time.Second, ctx.Done(), func() error { + a, err = app.Appender() + return err + })) + + _, err = a.Add(labels.FromStrings("a", "1", "b", "2"), 1, 2.41241) + testutil.Ok(t, err) + _, err = a.Add(labels.FromStrings("a", "1", "b", "2"), 2, 3.41241) + testutil.Ok(t, err) + _, err = a.Add(labels.FromStrings("a", "1", "b", "2"), 3, 4.41241) + testutil.Ok(t, err) + testutil.Ok(t, a.Commit()) + + // Check if not leaking. + _, err = m.TenantAppendable("foo") + testutil.Ok(t, err) + _, err = m.TenantAppendable("foo") + testutil.Ok(t, err) + _, err = m.TenantAppendable("foo") + testutil.Ok(t, err) + + ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + app, err = m.TenantAppendable("bar") + testutil.Ok(t, err) + + testutil.Ok(t, runutil.Retry(1*time.Second, ctx.Done(), func() error { + a, err = app.Appender() + return err + })) + + _, err = a.Add(labels.FromStrings("a", "1", "b", "2"), 1, 20.41241) + testutil.Ok(t, err) + _, err = a.Add(labels.FromStrings("a", "1", "b", "2"), 2, 30.41241) + testutil.Ok(t, err) + _, err = a.Add(labels.FromStrings("a", "1", "b", "2"), 3, 40.41241) + testutil.Ok(t, err) + testutil.Ok(t, a.Commit()) + + testMulitTSDBSeries(t, m) + }) + t.Run("run on existing storage", func(t *testing.T) { + m := NewMultiTSDB( + dir, logger, prometheus.NewRegistry(), &tsdb.Options{ + MinBlockDuration: model.Duration(2 * time.Hour), + MaxBlockDuration: model.Duration(2 * time.Hour), + RetentionDuration: model.Duration(6 * time.Hour), + NoLockfile: true, + }, + labels.FromStrings("replica", "01"), + "tenant_id", + nil, + ) + defer testutil.Ok(t, m.Flush()) + + testutil.Ok(t, m.Flush()) + testutil.Ok(t, m.Open()) + + // Get appender just for test. + app, err := m.TenantAppendable("foo") + testutil.Ok(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + testutil.Ok(t, runutil.Retry(1*time.Second, ctx.Done(), func() error { + _, err := app.Appender() + return err + })) + + // Check if not leaking. + _, err = m.TenantAppendable("foo") + testutil.Ok(t, err) + _, err = m.TenantAppendable("foo") + testutil.Ok(t, err) + _, err = m.TenantAppendable("foo") + testutil.Ok(t, err) + + testMulitTSDBSeries(t, m) + }) +} + +var ( + expectedFooResp = []storepb.Series{ + { + Labels: []storepb.Label{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "replica", Value: "01"}, {Name: "tenant_id", Value: "foo"}}, + Chunks: []storepb.AggrChunk{{MinTime: 1, MaxTime: 3, Raw: &storepb.Chunk{Data: []byte("\000\003\002@\003L\235\2354X\315\001\330\r\257Mui\251\327:U")}}}, + }, + } + expectedBarResp = []storepb.Series{ + { + Labels: []storepb.Label{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "replica", Value: "01"}, {Name: "tenant_id", Value: "bar"}}, + Chunks: []storepb.AggrChunk{{MinTime: 1, MaxTime: 3, Raw: &storepb.Chunk{Data: []byte("\000\003\002@4i\223\263\246\213\032\001\330\035i\337\322\352\323S\256t\270")}}}, + }, + } +) + +func testMulitTSDBSeries(t *testing.T, m *MultiTSDB) { + g := &errgroup.Group{} + respFoo := make(chan []storepb.Series) + respBar := make(chan []storepb.Series) + for i := 0; i < 100; i++ { + s := m.TSDBStores() + testutil.Assert(t, len(s) == 2) + + g.Go(func() error { + srv := newStoreSeriesServer(context.Background()) + if err := s["foo"].Series(&storepb.SeriesRequest{ + MinTime: 0, + MaxTime: 10, + Matchers: []storepb.LabelMatcher{{Name: "a", Value: ".*", Type: storepb.LabelMatcher_RE}}, + }, srv); err != nil { + return err + } + respFoo <- srv.SeriesSet + return nil + }) + g.Go(func() error { + srv := newStoreSeriesServer(context.Background()) + if err := s["bar"].Series(&storepb.SeriesRequest{ + MinTime: 0, + MaxTime: 10, + Matchers: []storepb.LabelMatcher{{Name: "a", Value: ".*", Type: storepb.LabelMatcher_RE}}, + }, srv); err != nil { + return err + } + respBar <- srv.SeriesSet + return nil + }) + } + var err error + go func() { + err = g.Wait() + close(respFoo) + close(respBar) + }() +Outer: + for { + select { + case r, ok := <-respFoo: + if !ok { + break Outer + } + fmt.Println(r[0].String()) + testutil.Equals(t, expectedFooResp, r) + case r, ok := <-respBar: + if !ok { + break Outer + } + fmt.Println(r[0].String()) + testutil.Equals(t, expectedBarResp, r) + } + } + testutil.Ok(t, err) +} + +// storeSeriesServer is test gRPC storeAPI series server. +// TODO(bwplotka): Make this part of some common library. We copy and paste this also in pkg/store. +type storeSeriesServer struct { + // This field just exist to pseudo-implement the unused methods of the interface. + storepb.Store_SeriesServer + + ctx context.Context + + SeriesSet []storepb.Series + Warnings []string + HintsSet []*types.Any + + Size int64 +} + +func newStoreSeriesServer(ctx context.Context) *storeSeriesServer { + return &storeSeriesServer{ctx: ctx} +} + +func (s *storeSeriesServer) Send(r *storepb.SeriesResponse) error { + s.Size += int64(r.Size()) + + if r.GetWarning() != "" { + s.Warnings = append(s.Warnings, r.GetWarning()) + return nil + } + + if r.GetSeries() != nil { + s.SeriesSet = append(s.SeriesSet, *r.GetSeries()) + return nil + } + + if r.GetHints() != nil { + s.HintsSet = append(s.HintsSet, r.GetHints()) + return nil + } + + // Unsupported field, skip. + return nil +} + +func (s *storeSeriesServer) Context() context.Context { + return s.ctx +} diff --git a/pkg/receive/tsdb_test.go b/pkg/receive/tsdb_test.go index 5a527bef0f9..ec5c7fd37f5 100644 --- a/pkg/receive/tsdb_test.go +++ b/pkg/receive/tsdb_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/fortytw2/leaktest" "github.com/go-kit/kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -19,6 +20,8 @@ import ( ) func TestFlushableStorage(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second) + { // Ensure that flushing storage does not cause data loss. // This test: