Skip to content

Commit

Permalink
Added receiver multidb unit tests for basic cases.
Browse files Browse the repository at this point in the history
Unfortunately, all passes. ):

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed May 12, 2020
1 parent 3a50383 commit b7439f6
Show file tree
Hide file tree
Showing 4 changed files with 299 additions and 35 deletions.
4 changes: 4 additions & 0 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"strconv"
"sync"
"testing"
"time"

"github.com/fortytw2/leaktest"
"github.com/go-kit/kit/log"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
Expand Down Expand Up @@ -180,6 +182,8 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64)
}

func TestReceive(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)

appenderErrFn := func() error { return errors.New("failed to get appender") }
conflictErrFn := func() error { return storage.ErrOutOfBounds }
commitErrFn := func() error { return errors.New("failed to commit") }
Expand Down
70 changes: 35 additions & 35 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type MultiTSDB struct {
dataDir string
logger log.Logger
reg prometheus.Registerer
tsdbCfg *tsdb.Options
tsdbOpts *tsdb.Options
tenantLabelName string
labels labels.Labels
bucket objstore.Bucket
Expand All @@ -40,8 +40,34 @@ type MultiTSDB struct {
tenants map[string]*tenant
}

func NewMultiTSDB(
dataDir string,
l log.Logger,
reg prometheus.Registerer,
tsdbOpts *tsdb.Options,
labels labels.Labels,
tenantLabelName string,
bucket objstore.Bucket,
) *MultiTSDB {
if l == nil {
l = log.NewNopLogger()
}

return &MultiTSDB{
dataDir: dataDir,
logger: l,
reg: reg,
tsdbOpts: tsdbOpts,
mtx: &sync.RWMutex{},
tenants: map[string]*tenant{},
labels: labels,
tenantLabelName: tenantLabelName,
bucket: bucket,
}
}

type tenant struct {
tsdbCfg *tsdb.Options
tsdbOpts *tsdb.Options

readyS *tsdb.ReadyStorage
fs *FlushableStorage
Expand All @@ -51,11 +77,11 @@ type tenant struct {
mtx *sync.RWMutex
}

func newTenant(tsdbCfg *tsdb.Options) *tenant {
func newTenant(tsdbOpts *tsdb.Options) *tenant {
return &tenant{
tsdbCfg: tsdbCfg,
readyS: &tsdb.ReadyStorage{},
mtx: &sync.RWMutex{},
tsdbOpts: tsdbOpts,
readyS: &tsdb.ReadyStorage{},
mtx: &sync.RWMutex{},
}
}

Expand All @@ -82,40 +108,14 @@ func (t *tenant) flushableStorage() *FlushableStorage {
}

func (t *tenant) set(tstore *store.TSDBStore, fs *FlushableStorage, ship *shipper.Shipper) {
t.readyS.Set(fs.Get(), int64(2*time.Duration(t.tsdbCfg.MinBlockDuration).Seconds()*1000))
t.readyS.Set(fs.Get(), int64(2*time.Duration(t.tsdbOpts.MinBlockDuration).Seconds()*1000))
t.mtx.Lock()
t.fs = fs
t.s = tstore
t.ship = ship
t.mtx.Unlock()
}

func NewMultiTSDB(
dataDir string,
l log.Logger,
reg prometheus.Registerer,
tsdbCfg *tsdb.Options,
labels labels.Labels,
tenantLabelName string,
bucket objstore.Bucket,
) *MultiTSDB {
if l == nil {
l = log.NewNopLogger()
}

return &MultiTSDB{
dataDir: dataDir,
logger: l,
reg: reg,
tsdbCfg: tsdbCfg,
mtx: &sync.RWMutex{},
tenants: map[string]*tenant{},
labels: labels,
tenantLabelName: tenantLabelName,
bucket: bucket,
}
}

func (t *MultiTSDB) Open() error {
if err := os.MkdirAll(t.dataDir, 0777); err != nil {
return err
Expand Down Expand Up @@ -232,7 +232,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan
return tenant, nil
}

tenant = newTenant(t.tsdbCfg)
tenant = newTenant(t.tsdbOpts)
t.tenants[tenantID] = tenant
t.mtx.Unlock()

Expand Down Expand Up @@ -261,7 +261,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan
dataDir,
logger,
reg,
t.tsdbCfg,
t.tsdbOpts,
)

// Assign to outer error to report in blocking case.
Expand Down
257 changes: 257 additions & 0 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
package receive

import (
"context"
"fmt"
"io/ioutil"
"os"
"testing"
"time"

"github.com/fortytw2/leaktest"
"github.com/go-kit/kit/log"
"github.com/gogo/protobuf/types"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/tsdb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
"golang.org/x/sync/errgroup"
)

func TestMultiTSDB(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)

dir, err := ioutil.TempDir("", "test")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

logger := log.NewNopLogger()
t.Run("run fresh", func(t *testing.T) {
m := NewMultiTSDB(
dir, logger, prometheus.NewRegistry(), &tsdb.Options{
MinBlockDuration: model.Duration(2 * time.Hour),
MaxBlockDuration: model.Duration(2 * time.Hour),
RetentionDuration: model.Duration(6 * time.Hour),
NoLockfile: true,
},
labels.FromStrings("replica", "01"),
"tenant_id",
nil,
)
defer testutil.Ok(t, m.Flush())

testutil.Ok(t, m.Flush())
testutil.Ok(t, m.Open())

app, err := m.TenantAppendable("foo")
testutil.Ok(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

var a storage.Appender
testutil.Ok(t, runutil.Retry(1*time.Second, ctx.Done(), func() error {
a, err = app.Appender()
return err
}))

_, err = a.Add(labels.FromStrings("a", "1", "b", "2"), 1, 2.41241)
testutil.Ok(t, err)
_, err = a.Add(labels.FromStrings("a", "1", "b", "2"), 2, 3.41241)
testutil.Ok(t, err)
_, err = a.Add(labels.FromStrings("a", "1", "b", "2"), 3, 4.41241)
testutil.Ok(t, err)
testutil.Ok(t, a.Commit())

// Check if not leaking.
_, err = m.TenantAppendable("foo")
testutil.Ok(t, err)
_, err = m.TenantAppendable("foo")
testutil.Ok(t, err)
_, err = m.TenantAppendable("foo")
testutil.Ok(t, err)

ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

app, err = m.TenantAppendable("bar")
testutil.Ok(t, err)

testutil.Ok(t, runutil.Retry(1*time.Second, ctx.Done(), func() error {
a, err = app.Appender()
return err
}))

_, err = a.Add(labels.FromStrings("a", "1", "b", "2"), 1, 20.41241)
testutil.Ok(t, err)
_, err = a.Add(labels.FromStrings("a", "1", "b", "2"), 2, 30.41241)
testutil.Ok(t, err)
_, err = a.Add(labels.FromStrings("a", "1", "b", "2"), 3, 40.41241)
testutil.Ok(t, err)
testutil.Ok(t, a.Commit())

testMulitTSDBSeries(t, m)
})
t.Run("run on existing storage", func(t *testing.T) {
m := NewMultiTSDB(
dir, logger, prometheus.NewRegistry(), &tsdb.Options{
MinBlockDuration: model.Duration(2 * time.Hour),
MaxBlockDuration: model.Duration(2 * time.Hour),
RetentionDuration: model.Duration(6 * time.Hour),
NoLockfile: true,
},
labels.FromStrings("replica", "01"),
"tenant_id",
nil,
)
defer testutil.Ok(t, m.Flush())

testutil.Ok(t, m.Flush())
testutil.Ok(t, m.Open())

// Get appender just for test.
app, err := m.TenantAppendable("foo")
testutil.Ok(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

testutil.Ok(t, runutil.Retry(1*time.Second, ctx.Done(), func() error {
_, err := app.Appender()
return err
}))

// Check if not leaking.
_, err = m.TenantAppendable("foo")
testutil.Ok(t, err)
_, err = m.TenantAppendable("foo")
testutil.Ok(t, err)
_, err = m.TenantAppendable("foo")
testutil.Ok(t, err)

testMulitTSDBSeries(t, m)
})
}

var (
expectedFooResp = []storepb.Series{
{
Labels: []storepb.Label{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "replica", Value: "01"}, {Name: "tenant_id", Value: "foo"}},
Chunks: []storepb.AggrChunk{{MinTime: 1, MaxTime: 3, Raw: &storepb.Chunk{Data: []byte("\000\003\002@\003L\235\2354X\315\001\330\r\257Mui\251\327:U")}}},
},
}
expectedBarResp = []storepb.Series{
{
Labels: []storepb.Label{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "replica", Value: "01"}, {Name: "tenant_id", Value: "bar"}},
Chunks: []storepb.AggrChunk{{MinTime: 1, MaxTime: 3, Raw: &storepb.Chunk{Data: []byte("\000\003\002@4i\223\263\246\213\032\001\330\035i\337\322\352\323S\256t\270")}}},
},
}
)

func testMulitTSDBSeries(t *testing.T, m *MultiTSDB) {
g := &errgroup.Group{}
respFoo := make(chan []storepb.Series)
respBar := make(chan []storepb.Series)
for i := 0; i < 100; i++ {
s := m.TSDBStores()
testutil.Assert(t, len(s) == 2)

g.Go(func() error {
srv := newStoreSeriesServer(context.Background())
if err := s["foo"].Series(&storepb.SeriesRequest{
MinTime: 0,
MaxTime: 10,
Matchers: []storepb.LabelMatcher{{Name: "a", Value: ".*", Type: storepb.LabelMatcher_RE}},
}, srv); err != nil {
return err
}
respFoo <- srv.SeriesSet
return nil
})
g.Go(func() error {
srv := newStoreSeriesServer(context.Background())
if err := s["bar"].Series(&storepb.SeriesRequest{
MinTime: 0,
MaxTime: 10,
Matchers: []storepb.LabelMatcher{{Name: "a", Value: ".*", Type: storepb.LabelMatcher_RE}},
}, srv); err != nil {
return err
}
respBar <- srv.SeriesSet
return nil
})
}
var err error
go func() {
err = g.Wait()
close(respFoo)
close(respBar)
}()
Outer:
for {
select {
case r, ok := <-respFoo:
if !ok {
break Outer
}
fmt.Println(r[0].String())
testutil.Equals(t, expectedFooResp, r)
case r, ok := <-respBar:
if !ok {
break Outer
}
fmt.Println(r[0].String())
testutil.Equals(t, expectedBarResp, r)
}
}
testutil.Ok(t, err)
}

// storeSeriesServer is test gRPC storeAPI series server.
// TODO(bwplotka): Make this part of some common library. We copy and paste this also in pkg/store.
type storeSeriesServer struct {
// This field just exist to pseudo-implement the unused methods of the interface.
storepb.Store_SeriesServer

ctx context.Context

SeriesSet []storepb.Series
Warnings []string
HintsSet []*types.Any

Size int64
}

func newStoreSeriesServer(ctx context.Context) *storeSeriesServer {
return &storeSeriesServer{ctx: ctx}
}

func (s *storeSeriesServer) Send(r *storepb.SeriesResponse) error {
s.Size += int64(r.Size())

if r.GetWarning() != "" {
s.Warnings = append(s.Warnings, r.GetWarning())
return nil
}

if r.GetSeries() != nil {
s.SeriesSet = append(s.SeriesSet, *r.GetSeries())
return nil
}

if r.GetHints() != nil {
s.HintsSet = append(s.HintsSet, r.GetHints())
return nil
}

// Unsupported field, skip.
return nil
}

func (s *storeSeriesServer) Context() context.Context {
return s.ctx
}
Loading

0 comments on commit b7439f6

Please sign in to comment.