diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go
index ca1dff1d73..7ce31c580c 100644
--- a/pkg/cacheutil/memcached_client.go
+++ b/pkg/cacheutil/memcached_client.go
@@ -466,7 +466,7 @@ func (c *memcachedClient) resolveAddrs() error {
 
 	// If some of the dns resolution fails, log the error.
 	if err := c.dnsProvider.Resolve(ctx, c.config.Addresses); err != nil {
-		level.Error(c.logger).Log("msg", "failed to resolve addresses for storeAPIs", "addresses", strings.Join(c.config.Addresses, ","), "err", err)
+		level.Error(c.logger).Log("msg", "failed to resolve addresses for memcached", "addresses", strings.Join(c.config.Addresses, ","), "err", err)
 	}
 	// Fail in case no server address is resolved.
 	servers := c.dnsProvider.Addresses()
diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go
index f5a436cc2c..b70f20e1bc 100644
--- a/pkg/objstore/objstore.go
+++ b/pkg/objstore/objstore.go
@@ -17,6 +17,7 @@ import (
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
+
 	"github.com/thanos-io/thanos/pkg/runutil"
 )
 
@@ -257,6 +258,7 @@ func BucketWithMetrics(name string, b Bucket, reg prometheus.Registerer) *metric
 			ConstLabels: prometheus.Labels{"bucket": name},
 			Buckets:     []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
 		}, []string{"operation"}),
+
 		lastSuccessfulUploadTime: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
 			Name: "thanos_objstore_bucket_last_successful_upload_time",
 			Help: "Second timestamp of the last successful upload to the bucket.",
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index f511149b0b..b6c5cc2e00 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -111,6 +111,9 @@ type bucketStoreMetrics struct {
 	cachedPostingsCompressionTimeSeconds *prometheus.CounterVec
 	cachedPostingsOriginalSizeBytes      prometheus.Counter
 	cachedPostingsCompressedSizeBytes    prometheus.Counter
+
+	seriesFetchDuration   prometheus.Histogram
+	postingsFetchDuration prometheus.Histogram
 }
 
 func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
@@ -221,6 +224,18 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
 		Help: "Compressed size of postings stored into cache.",
 	})
 
+	m.seriesFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
+		Name:    "thanos_bucket_store_cached_series_fetch_duration_seconds",
+		Help:    "Time it takes to fetch series from a bucket to respond a query. It also includes the time it takes to cache fetch and store operations.",
+		Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
+	})
+
+	m.postingsFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
+		Name:    "thanos_bucket_store_cached_postings_fetch_duration_seconds",
+		Help:    "Time it takes to fetch postings from a bucket to respond a query. It also includes the time it takes to cache fetch and store operations.",
+		Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
+	})
+
 	return &m
 }
 
@@ -473,7 +488,14 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
 	lset := labels.FromMap(meta.Thanos.Labels)
 	h := lset.Hash()
 
-	indexHeaderReader, err := indexheader.NewBinaryReader(ctx, s.logger, s.bkt, s.dir, meta.ULID, s.postingOffsetsInMemSampling)
+	indexHeaderReader, err := indexheader.NewBinaryReader(
+		ctx,
+		s.logger,
+		s.bkt,
+		s.dir,
+		meta.ULID,
+		s.postingOffsetsInMemSampling,
+	)
 	if err != nil {
 		return errors.Wrap(err, "create index header reader")
 	}
@@ -486,6 +508,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
 	b, err := newBucketBlock(
 		ctx,
 		log.With(s.logger, "block", meta.ULID),
+		s.metrics,
 		meta,
 		s.bkt,
 		dir,
@@ -493,7 +516,6 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
 		s.chunkPool,
 		indexHeaderReader,
 		s.partitioner,
-		s.metrics.seriesRefetches,
 		s.enablePostingsCompression,
 	)
 	if err != nil {
@@ -1258,6 +1280,7 @@ func (s *bucketBlockSet) labelMatchers(matchers ...*labels.Matcher) ([]*labels.M
 // state for the block on local disk.
 type bucketBlock struct {
 	logger     log.Logger
+	metrics    *bucketStoreMetrics
 	bkt        objstore.BucketReader
 	meta       *metadata.Meta
 	dir        string
@@ -1272,8 +1295,6 @@ type bucketBlock struct {
 
 	partitioner partitioner
 
-	seriesRefetches prometheus.Counter
-
 	enablePostingsCompression bool
 
 	// Block's labels used by block-level matchers to filter blocks to query. These are used to select blocks using
@@ -1284,6 +1305,7 @@
 func newBucketBlock(
 	ctx context.Context,
 	logger log.Logger,
+	metrics *bucketStoreMetrics,
 	meta *metadata.Meta,
 	bkt objstore.BucketReader,
 	dir string,
@@ -1291,11 +1313,11 @@ func newBucketBlock(
 	chunkPool pool.BytesPool,
 	indexHeadReader indexheader.Reader,
 	p partitioner,
-	seriesRefetches prometheus.Counter,
 	enablePostingsCompression bool,
 ) (b *bucketBlock, err error) {
 	b = &bucketBlock{
 		logger:     logger,
+		metrics:    metrics,
 		bkt:        bkt,
 		indexCache: indexCache,
 		chunkPool:  chunkPool,
@@ -1303,7 +1325,6 @@ func newBucketBlock(
 		partitioner:               p,
 		meta:                      meta,
 		indexHeaderReader:         indexHeadReader,
-		seriesRefetches:           seriesRefetches,
 		enablePostingsCompression: enablePostingsCompression,
 	}
 
@@ -1610,6 +1631,9 @@ type postingPtr struct {
 // It returns one postings for each key, in the same order.
 // If postings for given key is not fetched, entry at given index will be nil.
 func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings, error) {
+	timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration)
+	defer timer.ObserveDuration()
+
 	var ptrs []postingPtr
 
 	output := make([]index.Postings, len(keys))
@@ -1827,6 +1851,9 @@ func (it *bigEndianPostings) length() int {
 }
 
 func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
+	timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration)
+	defer timer.ObserveDuration()
+
 	// Load series from cache, overwriting the list of ids to preload
 	// with the missing ones.
 	fromCache, ids := r.block.indexCache.FetchMultiSeries(r.ctx, r.block.meta.ULID, ids)
@@ -1877,7 +1904,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, refetc
 		}
 
 		// Inefficient, but should be rare.
-		r.block.seriesRefetches.Inc()
+		r.block.metrics.seriesRefetches.Inc()
 		level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", maxSeriesSize)
 
 		// Fetch plus to get the size of next one if exists.
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index a70cc99482..4d3a8dd59b 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -30,8 +30,6 @@ import (
 	"github.com/leanovate/gopter/gen"
 	"github.com/leanovate/gopter/prop"
 	"github.com/oklog/ulid"
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promauto"
 	promtest "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/pkg/relabel"
@@ -39,6 +37,8 @@ import (
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/encoding"
 
+	"go.uber.org/atomic"
+
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/indexheader"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
@@ -52,7 +52,6 @@ import (
 	storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil"
 	"github.com/thanos-io/thanos/pkg/testutil"
 	"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
-	"go.uber.org/atomic"
 )
 
 var emptyRelabelConfig = make([]*relabel.Config, 0)
@@ -209,7 +208,7 @@ func TestBucketBlock_matchLabels(t *testing.T) {
 		},
 	}
 
-	b, err := newBucketBlock(context.Background(), log.NewNopLogger(), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, true)
+	b, err := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, true)
 	testutil.Ok(t, err)
 
 	cases := []struct {
@@ -921,10 +920,10 @@ func TestReadIndexCache_LoadSeries(t *testing.T) {
 				ULID: ulid.MustNew(1, nil),
 			},
 		},
-		bkt:             bkt,
-		seriesRefetches: s.seriesRefetches,
-		logger:          log.NewNopLogger(),
-		indexCache:      noopCache{},
+		bkt:        bkt,
+		logger:     log.NewNopLogger(),
+		metrics:    s,
+		indexCache: noopCache{},
 	}
 
 	buf := encoding.Encbuf{}
@@ -1130,6 +1129,7 @@ func benchmarkExpandedPostings(
 		t.Run(c.name, func(t testutil.TB) {
 			b := &bucketBlock{
 				logger:            log.NewNopLogger(),
+				metrics:           newBucketStoreMetrics(nil),
 				indexHeaderReader: r,
 				indexCache:        noopCache{},
 				bkt:               bkt,
@@ -1228,15 +1228,16 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request
 		testutil.Ok(t, err)
 		testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String())))
 
+		m := newBucketStoreMetrics(nil)
 		b := &bucketBlock{
-			indexCache:      noopCache{},
-			logger:          logger,
-			bkt:             bkt,
-			meta:            meta,
-			partitioner:     gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
-			chunkObjs:       []string{filepath.Join(id.String(), "chunks", "000001")},
-			chunkPool:       chunkPool,
-			seriesRefetches: promauto.With(nil).NewCounter(prometheus.CounterOpts{}),
+			indexCache:  noopCache{},
+			logger:      logger,
+			metrics:     m,
+			bkt:         bkt,
+			meta:        meta,
+			partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
+			chunkObjs:   []string{filepath.Join(id.String(), "chunks", "000001")},
+			chunkPool:   chunkPool,
 		}
 		blocks = append(blocks, b)
 	}
@@ -1289,7 +1290,7 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request
 
 		for _, b := range blocks {
 			// NOTE(bwplotka): It is 4 x 1.0 for 100mln samples. Kind of make sense: long series.
-			testutil.Equals(t, 0.0, promtest.ToFloat64(b.seriesRefetches))
+			testutil.Equals(t, 0.0, promtest.ToFloat64(b.metrics.seriesRefetches))
 		}
 	}
 }
@@ -1393,6 +1394,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
 	b1 = &bucketBlock{
 		indexCache:  indexCache,
 		logger:      logger,
+		metrics:     newBucketStoreMetrics(nil),
 		bkt:         bkt,
 		meta:        meta,
 		partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
@@ -1431,6 +1433,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
 	b2 = &bucketBlock{
 		indexCache:  indexCache,
 		logger:      logger,
+		metrics:     newBucketStoreMetrics(nil),
 		bkt:         bkt,
 		meta:        meta,
 		partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
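
The instrumentation above follows the standard client_golang idiom: build the histogram once with promauto.With(reg), then wrap each fetch path in prometheus.NewTimer with a deferred ObserveDuration. Below is a minimal, self-contained sketch of that pattern; the package, the fetchMetrics struct, the example_* metric name, and the preloadSeries helper are illustrative stand-ins, not code from this patch.

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// fetchMetrics mirrors the shape of bucketStoreMetrics: histograms are
// created and registered once, then shared by everything that records
// fetch durations.
type fetchMetrics struct {
	seriesFetchDuration prometheus.Histogram
}

func newFetchMetrics(reg prometheus.Registerer) *fetchMetrics {
	return &fetchMetrics{
		seriesFetchDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
			// Hypothetical name for this sketch; the patch itself uses
			// thanos_bucket_store_cached_series_fetch_duration_seconds.
			Name:    "example_series_fetch_duration_seconds",
			Help:    "Time spent fetching series, including cache lookups and stores.",
			Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
		}),
	}
}

// preloadSeries stands in for PreloadSeries/fetchPostings: the timer starts
// on entry and the deferred ObserveDuration records the elapsed time on
// every return path, including early error returns.
func preloadSeries(m *fetchMetrics) error {
	timer := prometheus.NewTimer(m.seriesFetchDuration)
	defer timer.ObserveDuration()

	time.Sleep(10 * time.Millisecond) // stand-in for the cache + bucket I/O
	return nil
}

func main() {
	m := newFetchMetrics(prometheus.NewRegistry())
	_ = preloadSeries(m)
}

Threading the whole *bucketStoreMetrics into bucketBlock, as the patch does, also explains why the tests can construct blocks with newBucketStoreMetrics(nil): promauto.With(nil) returns a Factory that creates metrics without registering them, which is exactly what throwaway test blocks need.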