Skip to content

Commit

Permalink
Refactoring: allow to pass BytesPool to store.NewBucketStore() (#3801)
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci committed Feb 15, 2021
1 parent 2c33dd7 commit f318232
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 15 deletions.
7 changes: 6 additions & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,11 @@ func runStore(

queriesGate := gate.New(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), maxConcurrency)

chunkPool, err := store.NewDefaultChunkBytesPool(chunkPoolSizeBytes)
if err != nil {
return errors.Wrap(err, "create chunk pool")
}

bs, err := store.NewBucketStore(
logger,
reg,
Expand All @@ -308,7 +313,7 @@ func runStore(
dataDir,
indexCache,
queriesGate,
chunkPoolSizeBytes,
chunkPool,
store.NewChunksLimiterFactory(maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.
store.NewSeriesLimiterFactory(maxSeriesCount),
verbose,
Expand Down
2 changes: 1 addition & 1 deletion pkg/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewBucketedBytesPool(minSize, maxSize int, factor float64, maxTotal uint64)
// ErrPoolExhausted is returned if a pool cannot provide the request bytes.
var ErrPoolExhausted = errors.New("pool exhausted")

// Get returns a new byte slices that fits the given size.
// Get returns a new byte slice that fits the given size.
func (p *BucketedBytesPool) Get(sz int) (*[]byte, error) {
p.mtx.Lock()
defer p.mtx.Unlock()
Expand Down
12 changes: 6 additions & 6 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func NewBucketStore(
dir string,
indexCache storecache.IndexCache,
queryGate gate.Gate,
maxChunkPoolBytes uint64,
chunkPool pool.BytesPool,
chunksLimiterFactory ChunksLimiterFactory,
seriesLimiterFactory SeriesLimiterFactory,
debugLogging bool,
Expand All @@ -316,11 +316,6 @@ func NewBucketStore(
logger = log.NewNopLogger()
}

chunkPool, err := pool.NewBucketedBytesPool(maxChunkSize, 50e6, 2, maxChunkPoolBytes)
if err != nil {
return nil, errors.Wrap(err, "create chunk pool")
}

s := &BucketStore{
logger: logger,
bkt: bkt,
Expand Down Expand Up @@ -2449,3 +2444,8 @@ func (s queryStats) merge(o *queryStats) *queryStats {

return &s
}

// NewDefaultChunkBytesPool returns a chunk bytes pool with default settings.
func NewDefaultChunkBytesPool(maxChunkPoolBytes uint64) (pool.BytesPool, error) {
return pool.NewBucketedBytesPool(maxChunkSize, 50e6, 2, maxChunkPoolBytes)
}
5 changes: 4 additions & 1 deletion pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
}, nil)
testutil.Ok(t, err)

chunkPool, err := NewDefaultChunkBytesPool(0)
testutil.Ok(t, err)

store, err := NewBucketStore(
s.logger,
nil,
Expand All @@ -162,7 +165,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
dir,
s.cache,
nil,
0,
chunkPool,
NewChunksLimiterFactory(maxChunksLimit),
NewSeriesLimiterFactory(0),
false,
Expand Down
30 changes: 24 additions & 6 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,9 @@ func TestBucketStore_Info(t *testing.T) {

defer testutil.Ok(t, os.RemoveAll(dir))

chunkPool, err := NewDefaultChunkBytesPool(2e5)
testutil.Ok(t, err)

bucketStore, err := NewBucketStore(
nil,
nil,
Expand All @@ -573,7 +576,7 @@ func TestBucketStore_Info(t *testing.T) {
dir,
noopCache{},
nil,
2e5,
chunkPool,
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
false,
Expand Down Expand Up @@ -817,6 +820,9 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
}, nil)
testutil.Ok(t, err)

chunkPool, err := NewDefaultChunkBytesPool(0)
testutil.Ok(t, err)

bucketStore, err := NewBucketStore(
logger,
nil,
Expand All @@ -825,7 +831,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
dir,
noopCache{},
nil,
0,
chunkPool,
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
false,
Expand Down Expand Up @@ -1636,6 +1642,9 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{})
testutil.Ok(tb, err)

chunkPool, err := NewDefaultChunkBytesPool(1000000)
testutil.Ok(t, err)

store, err := NewBucketStore(
logger,
nil,
Expand All @@ -1644,7 +1653,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
tmpDir,
indexCache,
nil,
1000000,
chunkPool,
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
false,
Expand Down Expand Up @@ -1730,6 +1739,9 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{})
testutil.Ok(tb, err)

chunkPool, err := NewDefaultChunkBytesPool(1000000)
testutil.Ok(t, err)

store, err := NewBucketStore(
logger,
nil,
Expand All @@ -1738,7 +1750,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
tmpDir,
indexCache,
nil,
1000000,
chunkPool,
NewChunksLimiterFactory(100000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
false,
Expand Down Expand Up @@ -1875,6 +1887,9 @@ func TestBlockWithLargeChunks(t *testing.T) {
indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{})
testutil.Ok(t, err)

chunkPool, err := NewDefaultChunkBytesPool(1000000)
testutil.Ok(t, err)

store, err := NewBucketStore(
logger,
nil,
Expand All @@ -1883,7 +1898,7 @@ func TestBlockWithLargeChunks(t *testing.T) {
tmpDir,
indexCache,
nil,
1000000,
chunkPool,
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
false,
Expand Down Expand Up @@ -2036,6 +2051,9 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb
indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{})
testutil.Ok(tb, err)

chunkPool, err := NewDefaultChunkBytesPool(1000000)
testutil.Ok(t, err)

store, err := NewBucketStore(
logger,
nil,
Expand All @@ -2044,7 +2062,7 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb
tmpDir,
indexCache,
nil,
1000000,
chunkPool,
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
false,
Expand Down

0 comments on commit f318232

Please sign in to comment.