Skip to content

Commit

Permalink
store: valide block sync concurrency parameter
Browse files Browse the repository at this point in the history
Must be equal or greater than 1 to avoid blocked program.

Signed-off-by: Aymeric <aymeric.daurelle@cdiscount.com>
  • Loading branch information
Aymeric committed Oct 8, 2021
1 parent 3040829 commit ba1d016
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Fixed

- [#4663](https://github.com/thanos-io/thanos/pull/4663) Fetcher: Fix discovered data races
- [#4753](https://github.com/thanos-io/thanos/pull/4753) Store: valide block sync concurrency parameter

### Added

Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view.").
Default("3m").DurationVar(&sc.syncInterval)

cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage.").
cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage. Must be equal or greater than 1.").
Default("20").IntVar(&sc.blockSyncConcurrency)

cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
Expand Down
17 changes: 17 additions & 0 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ const (
// Labels for metrics.
labelEncode = "encode"
labelDecode = "decode"

minBlockSyncConcurrency = 1
)

var (
errBlockSyncConcurrencyNotValid = errors.New("the block sync concurrency must be equal or greater than 1.")
)

type bucketStoreMetrics struct {
Expand Down Expand Up @@ -298,6 +304,13 @@ type BucketStore struct {
enableSeriesResponseHints bool
}

func (b *BucketStore) validate() error {
if b.blockSyncConcurrency < minBlockSyncConcurrency {
return errBlockSyncConcurrencyNotValid
}
return nil
}

type noopCache struct{}

func (noopCache) StorePostings(context.Context, ulid.ULID, labels.Label, []byte) {}
Expand Down Expand Up @@ -407,6 +420,10 @@ func NewBucketStore(
s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, indexReaderPoolMetrics)
s.metrics = newBucketStoreMetrics(s.reg) // TODO(metalmatze): Might be possible via Option too

if err := s.validate(); err != nil {
return nil, errors.Wrap(err, "validate config")
}

if err := os.MkdirAll(dir, 0750); err != nil {
return nil, errors.Wrap(err, "create dir")
}
Expand Down
26 changes: 26 additions & 0 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,32 @@ func TestGapBasedPartitioner_Partition(t *testing.T) {
}
}

func TestBucketStoreConfig_validate(t *testing.T) {
tests := map[string]struct {
config *BucketStore
expected error
}{
"should pass on valid config": {
config: &BucketStore{
blockSyncConcurrency: 1,
},
expected: nil,
},
"should fail on blockSyncConcurrency < 1": {
config: &BucketStore{
blockSyncConcurrency: 0,
},
expected: errBlockSyncConcurrencyNotValid,
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
testutil.Equals(t, testData.expected, testData.config.validate())
})
}
}

func TestBucketStore_Info(t *testing.T) {
defer testutil.TolerantVerifyLeak(t)

Expand Down

0 comments on commit ba1d016

Please sign in to comment.