Skip to content

Commit

Permalink
store: add ability to limit max num of samples / concurrent queries (#…
Browse files Browse the repository at this point in the history
…798)

* store: add ability to limit max samples / conc. queries

* store/bucket: account for the RawChunk case

Convert raw chunks into XOR encoded chunks and call the NumSamples()
method on them to calculate the number of samples. Rip out the samples
calculation into a different function because it is used in two
different places.

* store/bucket_e2e_test: adjust sample limit size

It should be actually 30 - I miscalculated this.

* store/bucket: add metric thanos_bucket_store_queries_limited_total

* store/bucket: register queriesLimited metric

* store: make changes according to the review comments

* docs/store: update

* store: gating naming changes, add span/extra metric

* store: improve error messages

* store/limiter: improve error messages

* store/gate: time -> seconds

* store/bucket_e2e_test: narrow down the first query

* store/bucket: check for negative maxConcurrent

* cmd/store: clarify help message

* pkg/store: hook thanos_bucket_store_queries_limited into Limiter

* store/bucket_test: fix NewBucketStore call

* docs: update again

* store/gate: spelling fix

* store/gate: spelling fix #2

* store/bucket: remove pointless newline

* store/gate: generalize gate timing

Make the metric show in general how much time it takes for queries to
wait at the gate.

* store/gate: convert the g.gateTiming metric into a histogram

* store/bucket: change comment wording

* store/bucket: remove type from maxSamplesPerChunk

Let Go decide by itself what kind of type it needs.

* store/bucket: rename metric into thanos_bucket_store_queries_dropped

* thanos/store: clarify help message

Literally explain what it means in the help message so that it would be
clearer.

* store/gate: rename metric to thanos_bucket_store_queries_in_flight

More fitting as decided by everyone.

* store/gate: fix MustRegister() call

* docs: update

* store/bucket: clarify the name of the span

Make it more clearer about what it is for.

* store/bucket: inline calculation into the function call

No need to create an extra variable in a hot path in the code if we can
inline it and it will be just as clear.

* CHANGELOG: add item about this

* store/gate: reduce number of buckets

* store/bucket: rename metric to thanos_bucket_store_queries_dropped_total

* store/bucket: move defer out of code block

* store/gate: generalize gate for different kinds of subsystems

* store/limiter: remove non-nil check

* CHANGELOG: fixes

* store/limiter: convert failedCounter to non-ptr

* store/limiter: remove invalid comment

* *: update according to review comments

* CHANGELOG: update

* *: fix according to review

* *: fix according to review

* *: make docs

* CHANGELOG: clean up

* CHANGELOG: update

* *: queries_in_flight_total -> queries_in_flight

* store/bucket: do not wraper samplesLimiter error

The original error already informs us about what is going wrong.

* store/bucket: err -> errors.Wrap

It's still useful to know that we are talking about samples here
exactly.

* store: make store.grpc.series-max-concurrency 20 by default

Setting it to 0 by default doesn't make sense since the Go channel
becomes unbuffered and all queries will timeout. Set it to 20 by default
since that's the limit on Thanos Query and naturally there won't be more
than 20 by default so it's good.

* CHANGELOG: add warning about new limit
  • Loading branch information
GiedriusS committed Mar 23, 2019
1 parent e756fe1 commit f24d555
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 15 deletions.
19 changes: 18 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,23 @@ We use *breaking* word for marking changes that are not backward compatible (rel

### Added
- [#811](https://github.com/improbable-eng/thanos/pull/811) Remote write receiver
- [#798](https://github.com/improbable-eng/thanos/pull/798) Ability to limit the maximum concurrent about of Series() calls in Thanos Store and the maximum amount of samples.

New options:

* `--store.grpc.series-sample-limit` limits the amount of samples that might be retrieved on a single Series() call. By default it is 0. Consider enabling it by setting it to more than 0 if you are running on limited resources.
* `--store.grpc.series-max-concurrency` limits the number of concurrent Series() calls in Thanos Store. By default it is 20. Considering making it lower or bigger depending on the scale of your deployment.

New metrics:
* `thanos_bucket_store_queries_dropped_total` shows how many queries were dropped due to the samples limit;
* `thanos_bucket_store_queries_concurrent_max` is a constant metric which shows how many Series() calls can concurrently be executed by Thanos Store;
* `thanos_bucket_store_queries_in_flight` shows how many queries are currently "in flight" i.e. they are being executed;
* `thanos_bucket_store_gate_duration_seconds` shows how many seconds it took for queries to pass through the gate in both cases - when that fails and when it does not.

New tracing span:
* `store_query_gate_ismyturn` shows how long it took for a query to pass (or not) through the gate.

:warning: **WARNING** :warning: #798 adds a new default limit to Thanos Store: `--store.grpc.series-max-concurrency`. Most likely you will want to make it the same as `--query.max-concurrent` on Thanos Query.

### Fixed
- [#921](https://github.com/improbable-eng/thanos/pull/921) `thanos_objstore_bucket_last_successful_upload_time` now does not appear when no blocks have been uploaded so far
Expand All @@ -23,7 +40,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#851](https://github.com/improbable-eng/thanos/pull/851) New read API endpoint for api/v1/rules and api/v1/alerts.
- [#873](https://github.com/improbable-eng/thanos/pull/873) Store: fix set index cache LRU

:warning: **WARING** :warning: #873 fix fixes actual handling of `index-cache-size`. Handling of limit for this cache was
:warning: **WARNING** :warning: #873 fix fixes actual handling of `index-cache-size`. Handling of limit for this cache was
broken so it was unbounded all the time. From this release actual value matters and is extremely low by default. To "revert"
the old behaviour (no boundary), use a large enough value.

Expand Down
12 changes: 12 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes for chunks.").
Default("2GB").Bytes()

maxSampleCount := cmd.Flag("store.grpc.series-sample-limit",
"Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: for efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit.").
Default("0").Uint()

maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int()

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

syncInterval := cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view.").
Expand Down Expand Up @@ -63,6 +69,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
peer,
uint64(*indexCacheSize),
uint64(*chunkPoolSize),
uint64(*maxSampleCount),
int(*maxConcurrent),
name,
debugLogging,
*syncInterval,
Expand All @@ -87,6 +95,8 @@ func runStore(
peer cluster.Peer,
indexCacheSizeBytes uint64,
chunkPoolSizeBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
component string,
verbose bool,
syncInterval time.Duration,
Expand Down Expand Up @@ -117,6 +127,8 @@ func runStore(
dataDir,
indexCacheSizeBytes,
chunkPoolSizeBytes,
maxSampleCount,
maxConcurrent,
verbose,
blockSyncConcurrency,
)
Expand Down
9 changes: 9 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ Flags:
--index-cache-size=250MB Maximum size of items held in the index cache.
--chunk-pool-size=2GB Maximum size of concurrently allocatable bytes
for chunks.
--store.grpc.series-sample-limit=0
Maximum amount of samples returned via a single
Series call. 0 means no limit. NOTE: for
efficiency we take 120 as the number of samples
in chunk (it cannot be bigger than that), so
the actual number of samples might be lower,
even though the maximum could be hit.
--store.grpc.series-max-concurrency=20
Maximum number of concurrent Series calls.
--objstore.config-file=<bucket.config-yaml-path>
Path to YAML file that contains object store
configuration.
Expand Down
76 changes: 67 additions & 9 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/compact/downsample"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/extprom"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/pool"
"github.com/improbable-eng/thanos/pkg/runutil"
Expand All @@ -42,6 +43,14 @@ import (
"google.golang.org/grpc/status"
)

// maxSamplesPerChunk is approximately the max number of samples that we may have in any given chunk. This is needed
// for precalculating the number of samples that we may have to retrieve and decode for any given query
// without downloading them. Please take a look at https://github.com/prometheus/tsdb/pull/397 to know
// where this number comes from. Long story short: TSDB is made in such a way, and it is made in such a way
// because you barely get any improvements in compression when the number of samples is beyond this.
// Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf.
const maxSamplesPerChunk = 120

type bucketStoreMetrics struct {
blocksLoaded prometheus.Gauge
blockLoads prometheus.Counter
Expand All @@ -57,6 +66,8 @@ type bucketStoreMetrics struct {
seriesMergeDuration prometheus.Histogram
resultSeriesCount prometheus.Summary
chunkSizeBytes prometheus.Histogram
queriesDropped prometheus.Counter
queriesLimit prometheus.Gauge
}

func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
Expand Down Expand Up @@ -132,6 +143,15 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
},
})

m.queriesDropped = prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_bucket_store_queries_dropped_total",
Help: "Number of queries that were dropped due to the sample limit.",
})
m.queriesLimit = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_bucket_store_queries_concurrent_max",
Help: "Number of maximum concurrent queries.",
})

if reg != nil {
reg.MustRegister(
m.blockLoads,
Expand All @@ -148,6 +168,8 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
m.seriesMergeDuration,
m.resultSeriesCount,
m.chunkSizeBytes,
m.queriesDropped,
m.queriesLimit,
)
}
return &m
Expand All @@ -173,7 +195,12 @@ type BucketStore struct {
// Number of goroutines to use when syncing blocks from object storage.
blockSyncConcurrency int

partitioner partitioner
// Query gate which limits the maximum amount of concurrent queries.
queryGate *Gate

// samplesLimiter limits the number of samples per each Series() call.
samplesLimiter *Limiter
partitioner partitioner
}

// NewBucketStore creates a new bucket backed store that implements the store API against
Expand All @@ -185,12 +212,19 @@ func NewBucketStore(
dir string,
indexCacheSizeBytes uint64,
maxChunkPoolBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
debugLogging bool,
blockSyncConcurrency int,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
}

if maxConcurrent < 0 {
return nil, errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrent)
}

indexCache, err := newIndexCache(reg, indexCacheSizeBytes)
if err != nil {
return nil, errors.Wrap(err, "create index cache")
Expand All @@ -202,6 +236,7 @@ func NewBucketStore(

const maxGapSize = 512 * 1024

metrics := newBucketStoreMetrics(reg)
s := &BucketStore{
logger: logger,
bucket: bucket,
Expand All @@ -212,14 +247,18 @@ func NewBucketStore(
blockSets: map[uint64]*bucketBlockSet{},
debugLogging: debugLogging,
blockSyncConcurrency: blockSyncConcurrency,
queryGate: NewGate(maxConcurrent, extprom.NewSubsystem(reg, "thanos_bucket_store")),
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
partitioner: gapBasedPartitioner{maxGapSize: maxGapSize},
}
s.metrics = newBucketStoreMetrics(reg)
s.metrics = metrics

if err := os.MkdirAll(dir, 0777); err != nil {
return nil, errors.Wrap(err, "create dir")
}

s.metrics.queriesLimit.Set(float64(maxConcurrent))

return s, nil
}

Expand Down Expand Up @@ -472,14 +511,15 @@ func (s *bucketSeriesSet) Err() error {
return s.err
}

func (s *BucketStore) blockSeries(
func blockSeries(
ctx context.Context,
ulid ulid.ULID,
extLset map[string]string,
indexr *bucketIndexReader,
chunkr *bucketChunkReader,
matchers []labels.Matcher,
req *storepb.SeriesRequest,
samplesLimiter *Limiter,
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(matchers)
if err != nil {
Expand Down Expand Up @@ -557,7 +597,7 @@ func (s *BucketStore) blockSeries(
}

// Preload all chunks that were marked in the previous stage.
if err := chunkr.preload(); err != nil {
if err := chunkr.preload(samplesLimiter); err != nil {
return nil, nil, errors.Wrap(err, "preload chunks")
}

Expand Down Expand Up @@ -661,10 +701,17 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels
}

// Series implements the storepb.StoreServer interface.
// TODO(bwplotka): It buffers all chunks in memory and only then streams to client.
// 1. Either count chunk sizes and error out too big query.
// 2. Stream posting -> series -> chunk all together.
func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) {
{
span, _ := tracing.StartSpan(srv.Context(), "store_query_gate_ismyturn")
err := s.queryGate.IsMyTurn(srv.Context())
span.Finish()
if err != nil {
return errors.Wrapf(err, "failed to wait for turn")
}
}
defer s.queryGate.Done()

matchers, err := translateMatchers(req.Matchers)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
Expand Down Expand Up @@ -703,13 +750,14 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
defer runutil.CloseWithLogOnErr(s.logger, chunkr, "series block")

g.Add(func() error {
part, pstats, err := s.blockSeries(ctx,
part, pstats, err := blockSeries(ctx,
b.meta.ULID,
b.meta.Thanos.Labels,
indexr,
chunkr,
blockMatchers,
req,
s.samplesLimiter,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
Expand Down Expand Up @@ -1589,11 +1637,21 @@ func (r *bucketChunkReader) addPreload(id uint64) error {
}

// preload all added chunk IDs. Must be called before the first call to Chunk is made.
func (r *bucketChunkReader) preload() error {
func (r *bucketChunkReader) preload(samplesLimiter *Limiter) error {
const maxChunkSize = 16000

var g run.Group

numChunks := uint64(0)
for _, offsets := range r.preloads {
for range offsets {
numChunks++
}
}
if err := samplesLimiter.Check(numChunks * maxSamplesPerChunk); err != nil {
return errors.Wrap(err, "exceeded samples limit")
}

for seq, offsets := range r.preloads {
sort.Slice(offsets, func(i, j int) bool {
return offsets[i] < offsets[j]
Expand Down
8 changes: 4 additions & 4 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *storeSuite) Close() {
s.wg.Wait()
}

func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool) *storeSuite {
func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxSampleCount uint64) *storeSuite {
series := []labels.Labels{
labels.FromStrings("a", "1", "b", "1"),
labels.FromStrings("a", "1", "b", "2"),
Expand Down Expand Up @@ -87,7 +87,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
testutil.Ok(t, os.RemoveAll(dir2))
}

store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, false, 20)
store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, maxSampleCount, 20, false, 20)
testutil.Ok(t, err)

s.store = store
Expand Down Expand Up @@ -334,7 +334,7 @@ func TestBucketStore_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0)
defer s.Close()

testBucketStore_e2e(t, ctx, s)
Expand Down Expand Up @@ -363,7 +363,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, true)
s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0)
defer s.Close()

testBucketStore_e2e(t, ctx, s)
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func TestBucketStore_Info(t *testing.T) {
dir, err := ioutil.TempDir("", "prometheus-test")
testutil.Ok(t, err)

bucketStore, err := NewBucketStore(nil, nil, nil, dir, 2e5, 2e5, false, 20)
bucketStore, err := NewBucketStore(nil, nil, nil, dir, 2e5, 2e5, 0, 0, false, 20)
testutil.Ok(t, err)

resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{})
Expand Down
Loading

0 comments on commit f24d555

Please sign in to comment.