Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: add ability to limit max num of samples / concurrent queries #798

Merged
merged 57 commits into from
Mar 23, 2019
Merged
Show file tree
Hide file tree
Changes from 50 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
1bc1f59
store: add ability to limit max samples / conc. queries
Feb 1, 2019
e87f763
store/bucket: account for the RawChunk case
Feb 5, 2019
1ab1dc6
store/bucket_e2e_test: adjust sample limit size
Feb 5, 2019
d7c3ade
store/bucket: add metric thanos_bucket_store_queries_limited_total
Feb 5, 2019
12db24a
store/bucket: register queriesLimited metric
Feb 5, 2019
9d0b8a7
store: make changes according to the review comments
Feb 8, 2019
9727072
docs/store: update
Feb 8, 2019
d9c733a
store: gating naming changes, add span/extra metric
Feb 8, 2019
c4ce735
store: improve error messages
Feb 8, 2019
30eef19
store/limiter: improve error messages
Feb 8, 2019
194394d
store/gate: time -> seconds
Feb 8, 2019
2e51c2e
store/bucket_e2e_test: narrow down the first query
Feb 8, 2019
58a14fa
store/bucket: check for negative maxConcurrent
Feb 8, 2019
4d1b7ed
cmd/store: clarify help message
Feb 8, 2019
3ae3733
Merge remote-tracking branch 'origin/master' into smpl
Feb 8, 2019
b149f74
pkg/store: hook thanos_bucket_store_queries_limited into Limiter
Feb 8, 2019
e79c56d
store/bucket_test: fix NewBucketStore call
Feb 8, 2019
1d07515
docs: update again
Feb 8, 2019
38a093b
store/gate: spelling fix
Feb 9, 2019
7b13f7e
store/gate: spelling fix #2
Feb 9, 2019
4540394
Merge remote-tracking branch 'origin/master' into feature/store_sampl…
GiedriusS Feb 15, 2019
3e532fe
store/bucket: remove pointless newline
GiedriusS Feb 15, 2019
cff979c
store/gate: generalize gate timing
Mar 14, 2019
e7ea64b
store/gate: convert the g.gateTiming metric into a histogram
Mar 14, 2019
23c7368
store/bucket: change comment wording
Mar 14, 2019
da575e4
store/bucket: remove type from maxSamplesPerChunk
Mar 14, 2019
e390846
store/bucket: rename metric into thanos_bucket_store_queries_dropped
Mar 14, 2019
24e8e1f
thanos/store: clarify help message
Mar 14, 2019
4012eca
store/gate: rename metric to thanos_bucket_store_queries_in_flight
Mar 14, 2019
e7be55d
store/gate: fix MustRegister() call
Mar 14, 2019
3e8150d
docs: update
Mar 14, 2019
5ec5ce9
store/bucket: clarify the name of the span
Mar 14, 2019
810a131
store/bucket: inline calculation into the function call
Mar 14, 2019
541f180
Merge remote-tracking branch 'origin' into fork_store_sample_limit
Mar 14, 2019
ae8e425
CHANGELOG: add item about this
Mar 14, 2019
de8a234
store/gate: reduce number of buckets
Mar 15, 2019
07b4658
store/bucket: rename metric to thanos_bucket_store_queries_dropped_total
Mar 15, 2019
61d6ecd
store/bucket: move defer out of code block
Mar 15, 2019
70b115d
store/gate: generalize gate for different kinds of subsystems
Mar 15, 2019
36f1153
store/limiter: remove non-nil check
Mar 15, 2019
9b74bbe
CHANGELOG: fixes
Mar 15, 2019
82bdb3c
store/limiter: convert failedCounter to non-ptr
Mar 15, 2019
4d8420f
store/limiter: remove invalid comment
Mar 15, 2019
590b9a6
*: update according to review comments
Mar 18, 2019
3f40bac
CHANGELOG: update
Mar 18, 2019
f4734e5
*: fix according to review
Mar 18, 2019
d6c1534
*: fix according to review
Mar 18, 2019
1147acd
*: make docs
Mar 18, 2019
1d0fad3
CHANGELOG: clean up
Mar 18, 2019
ef4a51e
CHANGELOG: update
GiedriusS Mar 18, 2019
48141fd
Merge remote-tracking branch 'origin' into feature/store_sample_limit
Mar 20, 2019
c9a7d83
*: queries_in_flight_total -> queries_in_flight
GiedriusS Mar 21, 2019
d71f1d8
Merge branch 'master' into smpl_limit
Mar 22, 2019
280a8ca
store/bucket: do not wraper samplesLimiter error
Mar 22, 2019
11c4b18
store/bucket: err -> errors.Wrap
Mar 22, 2019
31a8346
store: make store.grpc.series-max-concurrency 20 by default
GiedriusS Mar 23, 2019
6e98dfd
CHANGELOG: add warning about new limit
GiedriusS Mar 23, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,21 @@ We use *breaking* word for marking changes that are not backward compatible (rel

### Added
- [#811](https://github.com/improbable-eng/thanos/pull/811) Remote write receiver
- [#798](https://github.com/improbable-eng/thanos/pull/798) Ability to limit the maximum concurrent about of Series() calls in Thanos Store and the maximum amount of samples.

New options:

* `--store.grpc.series-sample-limit` limits the amount of samples that might be retrieved on a single Series() call. By default it is 0. Consider enabling it by setting it to more than 0 if you are running on limited resources.
* `--store.grpc.series-max-concurrency` limits the number of concurrent Series() calls in Thanos Store. By default it is 0. Consider enabling it by setting it to more than 0 if you want to limit the maximum of concurrent Series() calls.

New metrics:
* `thanos_bucket_store_queries_dropped_total` shows how many queries were dropped due to the samples limit;
* `thanos_bucket_store_queries_concurrent_max` is a constant metric which shows how many Series() calls can concurrently be executed by Thanos Store;
* `thanos_bucket_store_queries_in_flight_total` shows how many queries are currently "in flight" i.e. they are being executed;
* `thanos_bucket_store_gate_duration_seconds` shows how many seconds it took for queries to pass through the gate in both cases - when that fails and when it does not.

New tracing span:
* `store_query_gate_ismyturn` shows how long it took for a query to pass (or not) through the gate.

### Fixed
- [#921](https://github.com/improbable-eng/thanos/pull/921) `thanos_objstore_bucket_last_successful_upload_time` now does not appear when no blocks have been uploaded so far
Expand Down
12 changes: 12 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes for chunks.").
Default("2GB").Bytes()

maxSampleCount := cmd.Flag("store.grpc.series-sample-limit",
"Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: for efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit.").
Default("0").Uint()

maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls. 0 means no limit.").Default("0").Int()

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

syncInterval := cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view.").
Expand Down Expand Up @@ -63,6 +69,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
peer,
uint64(*indexCacheSize),
uint64(*chunkPoolSize),
uint64(*maxSampleCount),
int(*maxConcurrent),
name,
debugLogging,
*syncInterval,
Expand All @@ -87,6 +95,8 @@ func runStore(
peer cluster.Peer,
indexCacheSizeBytes uint64,
chunkPoolSizeBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
component string,
verbose bool,
syncInterval time.Duration,
Expand Down Expand Up @@ -117,6 +127,8 @@ func runStore(
dataDir,
indexCacheSizeBytes,
chunkPoolSizeBytes,
maxSampleCount,
maxConcurrent,
verbose,
blockSyncConcurrency,
)
Expand Down
10 changes: 10 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,16 @@ Flags:
--index-cache-size=250MB Maximum size of items held in the index cache.
--chunk-pool-size=2GB Maximum size of concurrently allocatable bytes
for chunks.
--store.grpc.series-sample-limit=0
Maximum amount of samples returned via a single
Series call. 0 means no limit. NOTE: for
efficiency we take 120 as the number of samples
in chunk (it cannot be bigger than that), so
the actual number of samples might be lower,
even though the maximum could be hit.
--store.grpc.series-max-concurrency=0
Maximum number of concurrent Series calls. 0
means no limit.
--objstore.config-file=<bucket.config-yaml-path>
Path to YAML file that contains object store
configuration.
Expand Down
76 changes: 67 additions & 9 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/compact/downsample"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/extprom"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/pool"
"github.com/improbable-eng/thanos/pkg/runutil"
Expand All @@ -42,6 +43,14 @@ import (
"google.golang.org/grpc/status"
)

// maxSamplesPerChunk is approximately the max number of samples that we may have in any given chunk. This is needed
// for precalculating the number of samples that we may have to retrieve and decode for any given query
// without downloading them. Please take a look at https://github.com/prometheus/tsdb/pull/397 to know
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Someone could tell that's too verbose, but I actually like this 👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 someone will definitely start wondering why 120 is here just like I have at the beginning

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think this is fine 👍

// where this number comes from. Long story short: TSDB is made in such a way, and it is made in such a way
// because you barely get any improvements in compression when the number of samples is beyond this.
// Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf.
const maxSamplesPerChunk = 120

type bucketStoreMetrics struct {
blocksLoaded prometheus.Gauge
blockLoads prometheus.Counter
Expand All @@ -57,6 +66,8 @@ type bucketStoreMetrics struct {
seriesMergeDuration prometheus.Histogram
resultSeriesCount prometheus.Summary
chunkSizeBytes prometheus.Histogram
queriesDropped prometheus.Counter
queriesLimit prometheus.Gauge
}

func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
Expand Down Expand Up @@ -132,6 +143,15 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
},
})

m.queriesDropped = prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_bucket_store_queries_dropped_total",
Help: "Number of queries that were dropped due to the sample limit.",
})
m.queriesLimit = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_bucket_store_queries_concurrent_max",
Help: "Number of maximum concurrent queries.",
})

if reg != nil {
reg.MustRegister(
m.blockLoads,
Expand All @@ -148,6 +168,8 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
m.seriesMergeDuration,
m.resultSeriesCount,
m.chunkSizeBytes,
m.queriesDropped,
m.queriesLimit,
)
}
return &m
Expand All @@ -173,7 +195,12 @@ type BucketStore struct {
// Number of goroutines to use when syncing blocks from object storage.
blockSyncConcurrency int

partitioner partitioner
// Query gate which limits the maximum amount of concurrent queries.
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
queryGate *Gate

// samplesLimiter limits the number of samples per each Series() call.
samplesLimiter *Limiter
partitioner partitioner
}

// NewBucketStore creates a new bucket backed store that implements the store API against
Expand All @@ -185,12 +212,19 @@ func NewBucketStore(
dir string,
indexCacheSizeBytes uint64,
maxChunkPoolBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
debugLogging bool,
blockSyncConcurrency int,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
}

if maxConcurrent < 0 {
return nil, errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrent)
}

indexCache, err := newIndexCache(reg, indexCacheSizeBytes)
if err != nil {
return nil, errors.Wrap(err, "create index cache")
Expand All @@ -202,6 +236,7 @@ func NewBucketStore(

const maxGapSize = 512 * 1024

metrics := newBucketStoreMetrics(reg)
s := &BucketStore{
logger: logger,
bucket: bucket,
Expand All @@ -212,14 +247,18 @@ func NewBucketStore(
blockSets: map[uint64]*bucketBlockSet{},
debugLogging: debugLogging,
blockSyncConcurrency: blockSyncConcurrency,
queryGate: NewGate(maxConcurrent, extprom.NewSubsystem(reg, "thanos_bucket_store")),
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
partitioner: gapBasedPartitioner{maxGapSize: maxGapSize},
}
s.metrics = newBucketStoreMetrics(reg)
s.metrics = metrics

if err := os.MkdirAll(dir, 0777); err != nil {
return nil, errors.Wrap(err, "create dir")
}

s.metrics.queriesLimit.Set(float64(maxConcurrent))

return s, nil
}

Expand Down Expand Up @@ -472,14 +511,15 @@ func (s *bucketSeriesSet) Err() error {
return s.err
}

func (s *BucketStore) blockSeries(
func blockSeries(
ctx context.Context,
ulid ulid.ULID,
extLset map[string]string,
indexr *bucketIndexReader,
chunkr *bucketChunkReader,
matchers []labels.Matcher,
req *storepb.SeriesRequest,
samplesLimiter *Limiter,
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(matchers)
if err != nil {
Expand Down Expand Up @@ -557,7 +597,7 @@ func (s *BucketStore) blockSeries(
}

// Preload all chunks that were marked in the previous stage.
if err := chunkr.preload(); err != nil {
if err := chunkr.preload(samplesLimiter); err != nil {
return nil, nil, errors.Wrap(err, "preload chunks")
}

Expand Down Expand Up @@ -661,10 +701,17 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels
}

// Series implements the storepb.StoreServer interface.
// TODO(bwplotka): It buffers all chunks in memory and only then streams to client.
// 1. Either count chunk sizes and error out too big query.
// 2. Stream posting -> series -> chunk all together.
func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
{
span, _ := tracing.StartSpan(srv.Context(), "store_query_gate_ismyturn")
err := s.queryGate.IsMyTurn(srv.Context())
span.Finish()
if err != nil {
return errors.Wrapf(err, "failed to wait for turn")
}
}
defer s.queryGate.Done()

matchers, err := translateMatchers(req.Matchers)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
Expand Down Expand Up @@ -703,13 +750,14 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
defer runutil.CloseWithLogOnErr(s.logger, chunkr, "series block")

g.Add(func() error {
part, pstats, err := s.blockSeries(ctx,
part, pstats, err := blockSeries(ctx,
b.meta.ULID,
b.meta.Thanos.Labels,
indexr,
chunkr,
blockMatchers,
req,
s.samplesLimiter,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
Expand Down Expand Up @@ -1580,11 +1628,21 @@ func (r *bucketChunkReader) addPreload(id uint64) error {
}

// preload all added chunk IDs. Must be called before the first call to Chunk is made.
func (r *bucketChunkReader) preload() error {
func (r *bucketChunkReader) preload(samplesLimiter *Limiter) error {
const maxChunkSize = 16000

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could perhaps move this to the top along with maxSamplesPerChunk?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No necessarily as it only is used here


var g run.Group

numChunks := uint64(0)
for _, offsets := range r.preloads {
for range offsets {
numChunks++
}
}
if err := samplesLimiter.Check(numChunks * maxSamplesPerChunk); err != nil {
return errors.Wrapf(err, "exceeded samples limit")
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
}

for seq, offsets := range r.preloads {
sort.Slice(offsets, func(i, j int) bool {
return offsets[i] < offsets[j]
Expand Down
8 changes: 4 additions & 4 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *storeSuite) Close() {
s.wg.Wait()
}

func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool) *storeSuite {
func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxSampleCount uint64) *storeSuite {
series := []labels.Labels{
labels.FromStrings("a", "1", "b", "1"),
labels.FromStrings("a", "1", "b", "2"),
Expand Down Expand Up @@ -87,7 +87,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
testutil.Ok(t, os.RemoveAll(dir2))
}

store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, false, 20)
store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, maxSampleCount, 20, false, 20)
testutil.Ok(t, err)

s.store = store
Expand Down Expand Up @@ -334,7 +334,7 @@ func TestBucketStore_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0)
defer s.Close()

testBucketStore_e2e(t, ctx, s)
Expand Down Expand Up @@ -363,7 +363,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, true)
s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0)
defer s.Close()

GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
testBucketStore_e2e(t, ctx, s)
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func TestBucketStore_Info(t *testing.T) {
dir, err := ioutil.TempDir("", "prometheus-test")
testutil.Ok(t, err)

bucketStore, err := NewBucketStore(nil, nil, nil, dir, 2e5, 2e5, false, 20)
bucketStore, err := NewBucketStore(nil, nil, nil, dir, 2e5, 2e5, 0, 0, false, 20)
testutil.Ok(t, err)

resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{})
Expand Down
64 changes: 64 additions & 0 deletions pkg/store/gate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package store

import (
"context"
"time"

"github.com/improbable-eng/thanos/pkg/extprom"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/gate"
)

// Gate wraps the Prometheus gate with extra metrics.
type Gate struct {
g *gate.Gate
inflightQueries prometheus.Gauge
gateTiming prometheus.Histogram
}

// NewGate returns a new query gate.
func NewGate(maxConcurrent int, reg *extprom.SubsystemRegisterer) *Gate {
g := &Gate{
g: gate.New(maxConcurrent),
}
g.inflightQueries = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "queries_in_flight_total",
Help: "Total number of queries that are currently in flight.",
Subsystem: reg.Subsystem(),
})
g.gateTiming = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "gate_duration_seconds",
Help: "How many seconds it took for queries to wait at the gate.",
Buckets: []float64{
0.01, 0.05, 0.1, 0.25, 0.6, 1, 2, 3.5, 5, 10,
},
Subsystem: reg.Subsystem(),
})

if r := reg.Registerer(); r != nil {
r.MustRegister(g.inflightQueries, g.gateTiming)
}

return g
}

// IsMyTurn iniates a new query and waits until it's our turn to fulfill a query request.
func (g *Gate) IsMyTurn(ctx context.Context) error {
start := time.Now()
defer func() {
g.gateTiming.Observe(float64(time.Now().Sub(start)))
}()

if err := g.g.Start(ctx); err != nil {
return err
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
}

g.inflightQueries.Inc()
return nil
}

// Done finishes a query.
func (g *Gate) Done() {
g.inflightQueries.Dec()
g.g.Done()
}
Loading