diff --git a/Gopkg.lock b/Gopkg.lock
index 907ab5fca0..1461563fde 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -537,6 +537,7 @@
     "fileutil",
     "index",
     "labels",
+    "testutil",
     "wal",
   ]
   pruneopts = ""
@@ -853,6 +854,7 @@
     "github.com/prometheus/common/version",
     "github.com/prometheus/prometheus/discovery/file",
     "github.com/prometheus/prometheus/discovery/targetgroup",
+    "github.com/prometheus/prometheus/pkg/gate",
     "github.com/prometheus/prometheus/pkg/labels",
     "github.com/prometheus/prometheus/pkg/timestamp",
     "github.com/prometheus/prometheus/pkg/value",
@@ -867,6 +869,7 @@
     "github.com/prometheus/tsdb/fileutil",
     "github.com/prometheus/tsdb/index",
     "github.com/prometheus/tsdb/labels",
+    "github.com/prometheus/tsdb/testutil",
     "golang.org/x/net/context",
     "golang.org/x/sync/errgroup",
     "golang.org/x/text/language",
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index ddf9893061..25f8512abf 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -36,6 +36,11 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
 	chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes for chunks.").
 		Default("2GB").Bytes()
 
+	maxSampleCount := cmd.Flag("grpc-sample-limit", "Maximum number of samples returned via a single Series call. 0 means no limit.").
+		Default("50000000").Uint()
+
+	maxConcurrent := cmd.Flag("grpc-concurrent-limit", "Maximum number of concurrent Series calls. 0 means no limit.").Default("20").Int()
+
 	objStoreConfig := regCommonObjStoreFlags(cmd, "", true)
 
 	syncInterval := cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view.").
@@ -63,6 +68,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
 			peer,
 			uint64(*indexCacheSize),
 			uint64(*chunkPoolSize),
+			uint64(*maxSampleCount),
+			int(*maxConcurrent),
 			name,
 			debugLogging,
 			*syncInterval,
@@ -87,6 +94,8 @@ func runStore(
 	peer cluster.Peer,
 	indexCacheSizeBytes uint64,
 	chunkPoolSizeBytes uint64,
+	maxSampleCount uint64,
+	maxConcurrent int,
 	component string,
 	verbose bool,
 	syncInterval time.Duration,
@@ -117,6 +126,8 @@ func runStore(
 		dataDir,
 		indexCacheSizeBytes,
 		chunkPoolSizeBytes,
+		maxSampleCount,
+		maxConcurrent,
 		verbose,
 		blockSyncConcurrency,
 	)
diff --git a/docs/components/store.md b/docs/components/store.md
index e32aa11d16..866cfe2642 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -104,6 +104,12 @@ Flags:
       --index-cache-size=250MB  Maximum size of items held in the index cache.
      --chunk-pool-size=2GB     Maximum size of concurrently allocatable bytes
                                for chunks.
+      --grpc-sample-limit=50000000
+                                Maximum number of samples returned via a single
+                                Series call. 0 means no limit.
+      --grpc-concurrent-limit=20
+                                Maximum number of concurrent Series calls. 0
+                                means no limit.
       --objstore.config-file=
                                 Path to YAML file that contains object store
                                 configuration.
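For illustration only, outside the diff above: the two new flag values flow from runStore straight into NewBucketStore, and the long positional argument list is easy to misread, so here is a minimal sketch of the post-change call. The helper name newLimitedStore, the surrounding package, and the size constants are hypothetical; only the NewBucketStore argument order comes from this change.

package example

import (
	"github.com/go-kit/kit/log"
	"github.com/improbable-eng/thanos/pkg/objstore"
	"github.com/improbable-eng/thanos/pkg/store"
	"github.com/prometheus/client_golang/prometheus"
)

// newLimitedStore mirrors the wiring in runStore: the two new arguments sit
// between the chunk-pool size and the debug-logging flag.
func newLimitedStore(logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, dir string) (*store.BucketStore, error) {
	return store.NewBucketStore(
		logger,
		reg,
		bkt,
		dir,
		250*1024*1024,    // index cache size in bytes
		2*1024*1024*1024, // chunk pool size in bytes
		50000000,         // grpc-sample-limit; 0 disables the per-call sample limit
		20,               // grpc-concurrent-limit; size of the Series query gate
		false,            // debug logging
		20,               // block sync concurrency
	)
}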
diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go
index bb2c38b17a..2a56426c96 100644
--- a/pkg/compact/downsample/downsample_test.go
+++ b/pkg/compact/downsample/downsample_test.go
@@ -1,13 +1,14 @@
 package downsample
 
 import (
-	"github.com/prometheus/tsdb"
 	"io/ioutil"
 	"math"
 	"os"
 	"path/filepath"
 	"testing"
 
+	"github.com/prometheus/tsdb"
+
 	"github.com/improbable-eng/thanos/pkg/block/metadata"
 
 	"github.com/prometheus/prometheus/pkg/value"
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index e006f5c70c..090d4367c5 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -171,6 +171,13 @@ type BucketStore struct {
 	debugLogging bool
 	// Number of goroutines to use when syncing blocks from object storage.
 	blockSyncConcurrency int
+
+	// The maximum number of samples Thanos Store can return in a single Series() call.
+	// Set to 0 to remove this limit (not recommended).
+	maxSampleCount uint64
+
+	// Query gate which limits the maximum number of concurrent queries.
+	queryGate *Gate
 }
 
 // NewBucketStore creates a new bucket backed store that implements the store API against
@@ -182,6 +189,8 @@ func NewBucketStore(
 	dir string,
 	indexCacheSizeBytes uint64,
 	maxChunkPoolBytes uint64,
+	maxSampleCount uint64,
+	maxConcurrent int,
 	debugLogging bool,
 	blockSyncConcurrency int,
 ) (*BucketStore, error) {
@@ -204,8 +213,10 @@ func NewBucketStore(
 		chunkPool:            chunkPool,
 		blocks:               map[ulid.ULID]*bucketBlock{},
 		blockSets:            map[uint64]*bucketBlockSet{},
+		maxSampleCount:       maxSampleCount,
 		debugLogging:         debugLogging,
 		blockSyncConcurrency: blockSyncConcurrency,
+		queryGate:            NewGate(maxConcurrent, reg),
 	}
 	s.metrics = newBucketStoreMetrics(reg)
 
@@ -463,7 +474,7 @@ func (s *bucketSeriesSet) Err() error {
 	return s.err
 }
 
-func (s *BucketStore) blockSeries(
+func (bs *BucketStore) blockSeries(
 	ctx context.Context,
 	ulid ulid.ULID,
 	extLset map[string]string,
@@ -471,6 +482,8 @@ func (s *BucketStore) blockSeries(
 	chunkr *bucketChunkReader,
 	matchers []labels.Matcher,
 	req *storepb.SeriesRequest,
+	samples *uint64,
+	samplesLock *sync.Mutex,
 ) (storepb.SeriesSet, *queryStats, error) {
 	ps, err := indexr.ExpandedPostings(matchers)
 	if err != nil {
@@ -559,7 +572,7 @@
 			if err != nil {
 				return nil, nil, errors.Wrap(err, "get chunk")
 			}
-			if err := populateChunk(&s.chks[i], chk, req.Aggregates); err != nil {
+			if err := bs.populateChunk(&s.chks[i], chk, req.Aggregates, samples, samplesLock); err != nil {
 				return nil, nil, errors.Wrap(err, "populate chunk")
 			}
 		}
@@ -568,7 +581,9 @@
 	return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil
 }
 
-func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr) error {
+func (bs *BucketStore) populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr,
+	samples *uint64, samplesLock *sync.Mutex) error {
+
 	if in.Encoding() == chunkenc.EncXOR {
 		out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: in.Bytes()}
 		return nil
@@ -579,6 +594,14 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
 
 	ac := downsample.AggrChunk(in.Bytes())
 
+	samplesLock.Lock()
+	*samples += uint64(ac.NumSamples())
+	if bs.maxSampleCount > 0 && *samples > bs.maxSampleCount {
+		samplesLock.Unlock()
+		return errors.Errorf("sample limit violated (got %v, limit %v)", *samples, bs.maxSampleCount)
+	}
+	samplesLock.Unlock()
+
 	for _, at := range aggrs {
 		switch at {
 		case storepb.Aggr_COUNT:
@@ -652,19 +675,24 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels
 }
 
 // Series implements the storepb.StoreServer interface.
-// TODO(bwplotka): It buffers all chunks in memory and only then streams to client.
-// 1. Either count chunk sizes and error out too big query.
-// 2. Stream posting -> series -> chunk all together.
 func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
+	err := s.queryGate.Start(srv.Context())
+	if err != nil {
+		return errors.Wrapf(err, "gate Start failed")
+	}
+	defer s.queryGate.Done()
+
 	matchers, err := translateMatchers(req.Matchers)
 	if err != nil {
 		return status.Error(codes.InvalidArgument, err.Error())
 	}
 	var (
-		stats = &queryStats{}
-		g     run.Group
-		res   []storepb.SeriesSet
-		mtx   sync.Mutex
+		stats       = &queryStats{}
+		g           run.Group
+		res         []storepb.SeriesSet
+		mtx         sync.Mutex
+		samples     uint64
+		samplesLock sync.Mutex
 	)
 
 	s.mtx.RLock()
@@ -701,6 +729,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 				chunkr,
 				blockMatchers,
 				req,
+				&samples,
+				&samplesLock,
 			)
 			if err != nil {
 				return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index 28122ac367..0f3850dc0d 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -35,7 +35,7 @@ func (s *storeSuite) Close() {
 	s.wg.Wait()
 }
 
-func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket) *storeSuite {
+func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, maxSampleCount uint64) *storeSuite {
 	series := []labels.Labels{
 		labels.FromStrings("a", "1", "b", "1"),
 		labels.FromStrings("a", "1", "b", "2"),
@@ -87,7 +87,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket) *
 		testutil.Ok(t, os.RemoveAll(dir2))
 	}
 
-	store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, false, 20)
+	store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, maxSampleCount, 20, false, 20)
 	testutil.Ok(t, err)
 	s.store = store
 
@@ -126,7 +126,7 @@ func TestBucketStore_e2e(t *testing.T) {
 	testutil.Ok(t, err)
 	defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
 
-	s := prepareStoreWithTestBlocks(t, dir, bkt)
+	s := prepareStoreWithTestBlocks(t, dir, bkt, 0)
 	defer s.Close()
 
 	mint, maxt := s.store.TimeRange()
@@ -215,6 +215,31 @@ func TestBucketStore_e2e(t *testing.T) {
 			MaxTime: maxt,
 		}, srv))
 		testutil.Equals(t, 0, len(srv.SeriesSet))
+
+		// Test the sample limit.
+		testutil.Ok(t, os.RemoveAll(dir))
+		s = prepareStoreWithTestBlocks(t, dir, bkt, 10)
+		mint, maxt = s.store.TimeRange()
+		defer s.Close()
+
+		srv = newStoreSeriesServer(ctx)
+
+		testutil.Ok(t, s.store.Series(&storepb.SeriesRequest{
+			Matchers: []storepb.LabelMatcher{
+				{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
+			},
+			MinTime: mint,
+			MaxTime: maxt,
+		}, srv))
+
+		testutil.NotOk(t, s.store.Series(&storepb.SeriesRequest{
+			Matchers: []storepb.LabelMatcher{
+				{Type: storepb.LabelMatcher_RE, Name: "a", Value: "1|2"},
+			},
+			MinTime: mint,
+			MaxTime: maxt,
+		}, srv))
+
 	})
 }
 
diff --git a/pkg/store/gate.go b/pkg/store/gate.go
new file mode 100644
index 0000000000..555aa1828f
--- /dev/null
+++ b/pkg/store/gate.go
@@ -0,0 +1,43 @@
+package store
+
+import (
+	"context"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/prometheus/pkg/gate"
+)
+
+// Gate wraps the Prometheus gate with extra metrics.
+type Gate struct {
+	g              *gate.Gate
+	currentQueries prometheus.Gauge
+}
+
+// NewGate returns a new gate.
+func NewGate(maxConcurrent int, reg prometheus.Registerer) *Gate {
+	g := &Gate{
+		g: gate.New(maxConcurrent),
+	}
+	g.currentQueries = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name: "thanos_bucket_store_queries_total",
+		Help: "Number of currently executing queries.",
+	})
+
+	if reg != nil {
+		reg.MustRegister(g.currentQueries)
+	}
+
+	return g
+}
+
+// Start initiates a new query.
+func (g *Gate) Start(ctx context.Context) error {
+	g.currentQueries.Inc()
+	return g.g.Start(ctx)
+}
+
+// Done finishes a query.
+func (g *Gate) Done() {
+	g.currentQueries.Dec()
+	g.g.Done()
+}
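Also for illustration only, not part of the diff: a minimal sketch of the call pattern the new Gate expects, the same Start/Done-with-defer shape that Series now uses above. The surrounding main function, the registry, and the choice of 2 as the concurrency limit are placeholders.

package main

import (
	"context"
	"fmt"

	"github.com/improbable-eng/thanos/pkg/store"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Allow at most two callers past the gate at once; the gauge registered by
	// NewGate reports how many are currently inside.
	reg := prometheus.NewRegistry()
	g := store.NewGate(2, reg)

	ctx := context.Background()
	if err := g.Start(ctx); err != nil {
		// Start fails if ctx is cancelled before a slot frees up.
		fmt.Println("could not enter gate:", err)
		return
	}
	defer g.Done()

	fmt.Println("inside the gate; do the gated work here")
}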