store: add ability to limit max samples / conc. queries
Giedrius Statkevičius authored and GiedriusS committed Feb 4, 2019
1 parent 0061958 commit 1bc1f59
Showing 7 changed files with 133 additions and 14 deletions.
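In short, the commit threads two new limits through the store: a cap on the number of samples a single Series() call may return, and a gate bounding how many Series() calls run concurrently. For orientation, a minimal sketch of the updated constructor call (argument order per the NewBucketStore diff below; `logger`, `reg`, `bkt`, and `dir` are assumed to be in scope, and the concrete values are illustrative):

```go
store, err := NewBucketStore(
	logger, reg, bkt, dir,
	uint64(250*1024*1024),    // indexCacheSizeBytes
	uint64(2*1024*1024*1024), // maxChunkPoolBytes
	uint64(50000000),         // maxSampleCount: samples per Series call, 0 = no limit
	20,                       // maxConcurrent: in-flight Series calls
	false,                    // debugLogging
	20,                       // blockSyncConcurrency
)
if err != nil {
	// handle construction error
}
```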
3 changes: 3 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default.

11 changes: 11 additions & 0 deletions cmd/thanos/store.go
@@ -36,6 +36,11 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes for chunks.").
Default("2GB").Bytes()

maxSampleCount := cmd.Flag("grpc-sample-limit", "Maximum number of samples returned via a single Series call. 0 means no limit.").
Default("50000000").Uint()

maxConcurrent := cmd.Flag("grpc-concurrent-limit", "Maximum number of concurrent Series calls. 0 means no limit.").Default("20").Int()

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

syncInterval := cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view.").
@@ -63,6 +68,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
peer,
uint64(*indexCacheSize),
uint64(*chunkPoolSize),
uint64(*maxSampleCount),
int(*maxConcurrent),
name,
debugLogging,
*syncInterval,
@@ -87,6 +94,8 @@ func runStore(
peer cluster.Peer,
indexCacheSizeBytes uint64,
chunkPoolSizeBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
component string,
verbose bool,
syncInterval time.Duration,
@@ -117,6 +126,8 @@ func runStore(
dataDir,
indexCacheSizeBytes,
chunkPoolSizeBytes,
maxSampleCount,
maxConcurrent,
verbose,
blockSyncConcurrency,
)
6 changes: 6 additions & 0 deletions docs/components/store.md
@@ -104,6 +104,12 @@ Flags:
--index-cache-size=250MB Maximum size of items held in the index cache.
--chunk-pool-size=2GB Maximum size of concurrently allocatable bytes
for chunks.
--grpc-sample-limit=50000000
Maximum number of samples returned via a single
Series call. 0 means no limit.
--grpc-concurrent-limit=20
Maximum number of concurrent Series calls. 0
means no limit.
--objstore.config-file=<bucket.config-yaml-path>
Path to YAML file that contains object store
configuration.
3 changes: 2 additions & 1 deletion pkg/compact/downsample/downsample_test.go
@@ -1,13 +1,14 @@
package downsample

import (
"github.com/prometheus/tsdb"
"io/ioutil"
"math"
"os"
"path/filepath"
"testing"

"github.com/prometheus/tsdb"

"github.com/improbable-eng/thanos/pkg/block/metadata"

"github.com/prometheus/prometheus/pkg/value"
50 changes: 40 additions & 10 deletions pkg/store/bucket.go
@@ -171,6 +171,13 @@ type BucketStore struct {
debugLogging bool
// Number of goroutines to use when syncing blocks from object storage.
blockSyncConcurrency int

// The maximum number of samples Thanos Store can return in one Series() call.
// Set to 0 to remove this limit (not recommended).
maxSampleCount uint64

// Query gate which limits the maximum number of concurrent queries.
queryGate *Gate
}

// NewBucketStore creates a new bucket backed store that implements the store API against
@@ -182,6 +189,8 @@ func NewBucketStore(
dir string,
indexCacheSizeBytes uint64,
maxChunkPoolBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
debugLogging bool,
blockSyncConcurrency int,
) (*BucketStore, error) {
@@ -204,8 +213,10 @@ func NewBucketStore(
chunkPool: chunkPool,
blocks: map[ulid.ULID]*bucketBlock{},
blockSets: map[uint64]*bucketBlockSet{},
maxSampleCount: maxSampleCount,
debugLogging: debugLogging,
blockSyncConcurrency: blockSyncConcurrency,
queryGate: NewGate(maxConcurrent, reg),
}
s.metrics = newBucketStoreMetrics(reg)

@@ -463,14 +474,16 @@ func (s *bucketSeriesSet) Err() error {
return s.err
}

func (s *BucketStore) blockSeries(
func (bs *BucketStore) blockSeries(
ctx context.Context,
ulid ulid.ULID,
extLset map[string]string,
indexr *bucketIndexReader,
chunkr *bucketChunkReader,
matchers []labels.Matcher,
req *storepb.SeriesRequest,
samples *uint64,
samplesLock *sync.Mutex,
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(matchers)
if err != nil {
@@ -559,7 +572,7 @@ func (s *BucketStore) blockSeries(
if err != nil {
return nil, nil, errors.Wrap(err, "get chunk")
}
if err := populateChunk(&s.chks[i], chk, req.Aggregates); err != nil {
if err := bs.populateChunk(&s.chks[i], chk, req.Aggregates, samples, samplesLock); err != nil {
return nil, nil, errors.Wrap(err, "populate chunk")
}
}
@@ -568,7 +581,9 @@ func (s *BucketStore) blockSeries(
return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil
}

func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr) error {
func (bs *BucketStore) populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr,
samples *uint64, samplesLock *sync.Mutex) error {

if in.Encoding() == chunkenc.EncXOR {
out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: in.Bytes()}
return nil
@@ -579,6 +594,14 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag

ac := downsample.AggrChunk(in.Bytes())

samplesLock.Lock()
*samples += uint64(ac.NumSamples())
if bs.maxSampleCount > 0 && *samples > bs.maxSampleCount {
samplesLock.Unlock()
return errors.Errorf("sample limit violated (got %v, limit %v)", *samples, bs.maxSampleCount)
}
samplesLock.Unlock()

for _, at := range aggrs {
switch at {
case storepb.Aggr_COUNT:
@@ -652,19 +675,24 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels
}

// Series implements the storepb.StoreServer interface.
// TODO(bwplotka): It buffers all chunks in memory and only then streams them to the client.
// 1. Either count chunk sizes and error out on queries that are too big.
// 2. Stream postings -> series -> chunks all together.
func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
err := s.queryGate.Start(srv.Context())
if err != nil {
return errors.Wrap(err, "gate Start failed")
}
defer s.queryGate.Done()

matchers, err := translateMatchers(req.Matchers)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
var (
stats = &queryStats{}
g run.Group
res []storepb.SeriesSet
mtx sync.Mutex
stats = &queryStats{}
g run.Group
res []storepb.SeriesSet
mtx sync.Mutex
samples uint64
samplesLock sync.Mutex
)
s.mtx.RLock()

@@ -701,6 +729,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
chunkr,
blockMatchers,
req,
&samples,
&samplesLock,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
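The populateChunk change above has every decoding goroutine account into one counter shared across the whole Series() call. A self-contained sketch of that pattern (stdlib fmt.Errorf stands in for pkg/errors here, and all names are illustrative, not from the commit):

```go
package main

import (
	"fmt"
	"sync"
)

// addSamples mimics the check in populateChunk: accumulate under a lock
// and fail fast once the configured limit is exceeded (0 means no limit).
func addSamples(samples *uint64, mu *sync.Mutex, n, limit uint64) error {
	mu.Lock()
	defer mu.Unlock()
	*samples += n
	if limit > 0 && *samples > limit {
		return fmt.Errorf("sample limit violated (got %v, limit %v)", *samples, limit)
	}
	return nil
}

func main() {
	var (
		samples uint64
		mu      sync.Mutex
	)
	// Three chunks of 40 samples against a limit of 100: the third one trips it.
	for _, n := range []uint64{40, 40, 40} {
		if err := addSamples(&samples, &mu, n, 100); err != nil {
			fmt.Println(err) // sample limit violated (got 120, limit 100)
			return
		}
	}
}
```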
31 changes: 28 additions & 3 deletions pkg/store/bucket_e2e_test.go
@@ -35,7 +35,7 @@ func (s *storeSuite) Close() {
s.wg.Wait()
}

func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket) *storeSuite {
func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, maxSampleCount uint64) *storeSuite {
series := []labels.Labels{
labels.FromStrings("a", "1", "b", "1"),
labels.FromStrings("a", "1", "b", "2"),
@@ -87,7 +87,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket) *
testutil.Ok(t, os.RemoveAll(dir2))
}

store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, false, 20)
store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, maxSampleCount, 20, false, 20)
testutil.Ok(t, err)

s.store = store
@@ -126,7 +126,7 @@ func TestBucketStore_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt)
s := prepareStoreWithTestBlocks(t, dir, bkt, 0)
defer s.Close()

mint, maxt := s.store.TimeRange()
@@ -215,6 +215,31 @@ func TestBucketStore_e2e(t *testing.T) {
MaxTime: maxt,
}, srv))
testutil.Equals(t, 0, len(srv.SeriesSet))

// Test the samples limit.
testutil.Ok(t, os.RemoveAll(dir))
s = prepareStoreWithTestBlocks(t, dir, bkt, 10)
mint, maxt = s.store.TimeRange()
defer s.Close()

srv = newStoreSeriesServer(ctx)

testutil.Ok(t, s.store.Series(&storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
},
MinTime: mint,
MaxTime: maxt,
}, srv))

testutil.NotOk(t, s.store.Series(&storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_RE, Name: "a", Value: "1|2"},
},
MinTime: mint,
MaxTime: maxt,
}, srv))

})

}
43 changes: 43 additions & 0 deletions pkg/store/gate.go
@@ -0,0 +1,43 @@
package store

import (
"context"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/gate"
)

// Gate wraps the Prometheus gate with extra metrics.
type Gate struct {
g *gate.Gate
currentQueries prometheus.Gauge
}

// NewGate returns a new gate.
func NewGate(maxConcurrent int, reg prometheus.Registerer) *Gate {
g := &Gate{
g: gate.New(maxConcurrent),
}
g.currentQueries = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_bucket_store_queries_total",
Help: "Total number of currently executing queries.",
})

if reg != nil {
reg.MustRegister(g.currentQueries)
}

return g
}

// Start initiates a new query.
func (g *Gate) Start(ctx context.Context) error {
g.currentQueries.Inc()
return g.g.Start(ctx)
}

// Done finishes a query.
func (g *Gate) Done() {
g.currentQueries.Dec()
g.g.Done()
}
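For reference, the call pattern the new Gate expects, mirroring the Series() change above: Start blocks until one of the maxConcurrent slots frees up or the caller's context ends, and Done must always be deferred to release the slot. A sketch (boundedSeries is an illustrative name, not part of the commit):

```go
package store

import (
	"context"

	"github.com/pkg/errors"
)

// boundedSeries shows how a request handler is meant to use the gate:
// acquire a slot before doing any work, and always release it afterwards.
func boundedSeries(ctx context.Context, g *Gate) error {
	if err := g.Start(ctx); err != nil {
		// Start only fails if ctx is cancelled or times out while queued.
		return errors.Wrap(err, "gate Start failed")
	}
	defer g.Done()

	// ... do the concurrency-bounded work ...
	return nil
}
```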
