Skip to content

Commit

Permalink
Added receiver multidb unit tests for basic cases. (#2593)
Browse files Browse the repository at this point in the history
Unfortunately, all passes. ):

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed May 14, 2020
1 parent 7f78174 commit 0c8b5e0
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 35 deletions.
4 changes: 4 additions & 0 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"strconv"
"sync"
"testing"
"time"

"github.com/fortytw2/leaktest"
"github.com/go-kit/kit/log"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
Expand Down Expand Up @@ -180,6 +182,8 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64)
}

func TestReceive(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)

appenderErrFn := func() error { return errors.New("failed to get appender") }
conflictErrFn := func() error { return storage.ErrOutOfBounds }
commitErrFn := func() error { return errors.New("failed to commit") }
Expand Down
70 changes: 35 additions & 35 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type MultiTSDB struct {
dataDir string
logger log.Logger
reg prometheus.Registerer
tsdbCfg *tsdb.Options
tsdbOpts *tsdb.Options
tenantLabelName string
labels labels.Labels
bucket objstore.Bucket
Expand All @@ -40,8 +40,34 @@ type MultiTSDB struct {
tenants map[string]*tenant
}

func NewMultiTSDB(
dataDir string,
l log.Logger,
reg prometheus.Registerer,
tsdbOpts *tsdb.Options,
labels labels.Labels,
tenantLabelName string,
bucket objstore.Bucket,
) *MultiTSDB {
if l == nil {
l = log.NewNopLogger()
}

return &MultiTSDB{
dataDir: dataDir,
logger: l,
reg: reg,
tsdbOpts: tsdbOpts,
mtx: &sync.RWMutex{},
tenants: map[string]*tenant{},
labels: labels,
tenantLabelName: tenantLabelName,
bucket: bucket,
}
}

type tenant struct {
tsdbCfg *tsdb.Options
tsdbOpts *tsdb.Options

readyS *tsdb.ReadyStorage
fs *FlushableStorage
Expand All @@ -51,11 +77,11 @@ type tenant struct {
mtx *sync.RWMutex
}

func newTenant(tsdbCfg *tsdb.Options) *tenant {
func newTenant(tsdbOpts *tsdb.Options) *tenant {
return &tenant{
tsdbCfg: tsdbCfg,
readyS: &tsdb.ReadyStorage{},
mtx: &sync.RWMutex{},
tsdbOpts: tsdbOpts,
readyS: &tsdb.ReadyStorage{},
mtx: &sync.RWMutex{},
}
}

Expand All @@ -82,40 +108,14 @@ func (t *tenant) flushableStorage() *FlushableStorage {
}

func (t *tenant) set(tstore *store.TSDBStore, fs *FlushableStorage, ship *shipper.Shipper) {
t.readyS.Set(fs.Get(), int64(2*time.Duration(t.tsdbCfg.MinBlockDuration).Seconds()*1000))
t.readyS.Set(fs.Get(), int64(2*time.Duration(t.tsdbOpts.MinBlockDuration).Seconds()*1000))
t.mtx.Lock()
t.fs = fs
t.s = tstore
t.ship = ship
t.mtx.Unlock()
}

func NewMultiTSDB(
dataDir string,
l log.Logger,
reg prometheus.Registerer,
tsdbCfg *tsdb.Options,
labels labels.Labels,
tenantLabelName string,
bucket objstore.Bucket,
) *MultiTSDB {
if l == nil {
l = log.NewNopLogger()
}

return &MultiTSDB{
dataDir: dataDir,
logger: l,
reg: reg,
tsdbCfg: tsdbCfg,
mtx: &sync.RWMutex{},
tenants: map[string]*tenant{},
labels: labels,
tenantLabelName: tenantLabelName,
bucket: bucket,
}
}

func (t *MultiTSDB) Open() error {
if err := os.MkdirAll(t.dataDir, 0777); err != nil {
return err
Expand Down Expand Up @@ -232,7 +232,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan
return tenant, nil
}

tenant = newTenant(t.tsdbCfg)
tenant = newTenant(t.tsdbOpts)
t.tenants[tenantID] = tenant
t.mtx.Unlock()

Expand Down Expand Up @@ -261,7 +261,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan
dataDir,
logger,
reg,
t.tsdbCfg,
t.tsdbOpts,
)

// Assign to outer error to report in blocking case.
Expand Down
Loading

0 comments on commit 0c8b5e0

Please sign in to comment.