Use another approach to track index operations
Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
kakkoyun committed Aug 6, 2020
1 parent 2412be2 commit 8a46f33
Showing 3 changed files with 53 additions and 33 deletions.
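In short, the commit replaces the per-data-type fetchDuration observations made at the end of Series() with a single indexOperationDuration histogram labeled by operation (fetch-postings, load-series), observed via defer at the start of each index operation. Below is a minimal sketch of that pattern, assuming a standalone package main with a stubbed fetchPostings; the stub body and registration against the default registerer are illustrative, not part of this commit:

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// A single histogram labeled by operation, mirroring
// thanos_bucket_store_index_operation_duration_seconds from this commit.
var indexOperationDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
	Name:    "thanos_bucket_store_index_operation_duration_seconds",
	Help:    "Time it takes to process an index operation from a bucket to respond to a query.",
	Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
}, []string{"operation"})

// fetchPostings is a hypothetical stand-in for bucketIndexReader.fetchPostings.
// The closure is deferred so the duration is measured when the function
// returns, not when the defer statement is evaluated.
func fetchPostings() {
	start := time.Now()
	defer func() {
		indexOperationDuration.WithLabelValues("fetch-postings").Observe(time.Since(start).Seconds())
	}()

	time.Sleep(10 * time.Millisecond) // placeholder for the actual postings fetch
}

func main() {
	fetchPostings()
}
```

The same label values correspond to the opFetchPostings and opLoadSeries constants below, so one metric family covers both index operations instead of one histogram per data type.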
2 changes: 2 additions & 0 deletions pkg/objstore/objstore.go
@@ -17,6 +17,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/thanos-io/thanos/pkg/runutil"
)

@@ -257,6 +258,7 @@ func BucketWithMetrics(name string, b Bucket, reg prometheus.Registerer) *metric
ConstLabels: prometheus.Labels{"bucket": name},
Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
}, []string{"operation"}),

lastSuccessfulUploadTime: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "thanos_objstore_bucket_last_successful_upload_time",
Help: "Second timestamp of the last successful upload to the bucket.",
47 changes: 31 additions & 16 deletions pkg/store/bucket.go
@@ -86,6 +86,9 @@ const (
// Labels for metrics.
labelEncode = "encode"
labelDecode = "decode"
// Label values for metrics.
opFetchPostings = "fetch-postings"
opLoadSeries = "load-series"
)

type bucketStoreMetrics struct {
@@ -105,13 +108,14 @@ type bucketStoreMetrics struct {
chunkSizeBytes prometheus.Histogram
queriesDropped prometheus.Counter
seriesRefetches prometheus.Counter
fetchDuration *prometheus.HistogramVec

cachedPostingsCompressions *prometheus.CounterVec
cachedPostingsCompressionErrors *prometheus.CounterVec
cachedPostingsCompressionTimeSeconds *prometheus.CounterVec
cachedPostingsOriginalSizeBytes prometheus.Counter
cachedPostingsCompressedSizeBytes prometheus.Counter

indexOperationDuration *prometheus.HistogramVec
}

func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
@@ -222,11 +226,13 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
Help: "Compressed size of postings stored into cache.",
})

m.fetchDuration = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "thanos_bucket_store_fetch_duration_seconds",
Help: "Time it takes to fetch an element from a bucket to respond a query (Any BucketReader can be used as source: objstore or caching bucket).",
Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
}, []string{"data_type"})
m.indexOperationDuration = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "thanos_bucket_store_index_operation_duration_seconds",
Help: "Time it takes to process index operation from a bucket to respond a query.",
Buckets: []float64{
0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120,
},
}, []string{"operation"})

return &m
}
@@ -480,7 +486,14 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
lset := labels.FromMap(meta.Thanos.Labels)
h := lset.Hash()

indexHeaderReader, err := indexheader.NewBinaryReader(ctx, s.logger, s.bkt, s.dir, meta.ULID, s.postingOffsetsInMemSampling)
indexHeaderReader, err := indexheader.NewBinaryReader(
ctx,
s.logger,
s.bkt,
s.dir,
meta.ULID,
s.postingOffsetsInMemSampling,
)
if err != nil {
return errors.Wrap(err, "create index header reader")
}
@@ -493,14 +506,14 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
b, err := newBucketBlock(
ctx,
log.With(s.logger, "block", meta.ULID),
s.metrics,
meta,
s.bkt,
dir,
s.indexCache,
s.chunkPool,
indexHeaderReader,
s.partitioner,
s.metrics.seriesRefetches,
s.enablePostingsCompression,
)
if err != nil {
@@ -973,9 +986,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode).Add(stats.cachedPostingsDecompressionTimeSum.Seconds())
s.metrics.cachedPostingsOriginalSizeBytes.Add(float64(stats.cachedPostingsOriginalSizeSum))
s.metrics.cachedPostingsCompressedSizeBytes.Add(float64(stats.cachedPostingsCompressedSizeSum))
s.metrics.fetchDuration.WithLabelValues("postings").Observe(stats.postingsFetchDurationSum.Seconds())
s.metrics.fetchDuration.WithLabelValues("series").Observe(stats.seriesFetchDurationSum.Seconds())
s.metrics.fetchDuration.WithLabelValues("chunks").Observe(stats.chunksFetchDurationSum.Seconds())

level.Debug(s.logger).Log("msg", "stats query processed",
"stats", fmt.Sprintf("%+v", stats), "err", err)
@@ -1268,6 +1278,7 @@ func (s *bucketBlockSet) labelMatchers(matchers ...*labels.Matcher) ([]*labels.M
// state for the block on local disk.
type bucketBlock struct {
logger log.Logger
metrics *bucketStoreMetrics
bkt objstore.BucketReader
meta *metadata.Meta
dir string
@@ -1282,8 +1293,6 @@ type bucketBlock struct {

partitioner partitioner

seriesRefetches prometheus.Counter

enablePostingsCompression bool

// Block's labels used by block-level matchers to filter blocks to query. These are used to select blocks using
@@ -1294,26 +1303,26 @@ func newBucketBlock(
func newBucketBlock(
ctx context.Context,
logger log.Logger,
metrics *bucketStoreMetrics,
meta *metadata.Meta,
bkt objstore.BucketReader,
dir string,
indexCache storecache.IndexCache,
chunkPool pool.BytesPool,
indexHeadReader indexheader.Reader,
p partitioner,
seriesRefetches prometheus.Counter,
enablePostingsCompression bool,
) (b *bucketBlock, err error) {
b = &bucketBlock{
logger: logger,
metrics: metrics,
bkt: bkt,
indexCache: indexCache,
chunkPool: chunkPool,
dir: dir,
partitioner: p,
meta: meta,
indexHeaderReader: indexHeadReader,
seriesRefetches: seriesRefetches,
enablePostingsCompression: enablePostingsCompression,
}

@@ -1620,6 +1629,9 @@ type postingPtr struct {
// It returns one postings for each key, in the same order.
// If postings for given key is not fetched, entry at given index will be nil.
func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings, error) {
start := time.Now()
defer func() {
	r.block.metrics.indexOperationDuration.WithLabelValues(opFetchPostings).Observe(time.Since(start).Seconds())
}()

var ptrs []postingPtr

output := make([]index.Postings, len(keys))
@@ -1837,6 +1849,9 @@ func (it *bigEndianPostings) length() int {
}

func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
start := time.Now()
defer func() {
	r.block.metrics.indexOperationDuration.WithLabelValues(opLoadSeries).Observe(time.Since(start).Seconds())
}()

// Load series from cache, overwriting the list of ids to preload
// with the missing ones.
fromCache, ids := r.block.indexCache.FetchMultiSeries(r.ctx, r.block.meta.ULID, ids)
@@ -1887,7 +1902,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, refetc
}

// Inefficient, but should be rare.
r.block.seriesRefetches.Inc()
r.block.metrics.seriesRefetches.Inc()
level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", maxSeriesSize)

// Fetch plus to get the size of next one if exists.
37 changes: 20 additions & 17 deletions pkg/store/bucket_test.go
@@ -30,15 +30,15 @@ import (
"github.com/leanovate/gopter/gen"
"github.com/leanovate/gopter/prop"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/encoding"

"go.uber.org/atomic"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
@@ -52,7 +52,6 @@ import (
storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
"go.uber.org/atomic"
)

var emptyRelabelConfig = make([]*relabel.Config, 0)
@@ -209,7 +208,7 @@ func TestBucketBlock_matchLabels(t *testing.T) {
},
}

b, err := newBucketBlock(context.Background(), log.NewNopLogger(), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, true)
b, err := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, true)
testutil.Ok(t, err)

cases := []struct {
@@ -921,10 +920,10 @@ func TestReadIndexCache_LoadSeries(t *testing.T) {
ULID: ulid.MustNew(1, nil),
},
},
bkt: bkt,
seriesRefetches: s.seriesRefetches,
logger: log.NewNopLogger(),
indexCache: noopCache{},
bkt: bkt,
logger: log.NewNopLogger(),
metrics: s,
indexCache: noopCache{},
}

buf := encoding.Encbuf{}
@@ -1130,6 +1129,7 @@ func benchmarkExpandedPostings(
t.Run(c.name, func(t testutil.TB) {
b := &bucketBlock{
logger: log.NewNopLogger(),
metrics: newBucketStoreMetrics(nil),
indexHeaderReader: r,
indexCache: noopCache{},
bkt: bkt,
@@ -1228,15 +1228,16 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String())))

m := newBucketStoreMetrics(nil)
b := &bucketBlock{
indexCache: noopCache{},
logger: logger,
bkt: bkt,
meta: meta,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
seriesRefetches: promauto.With(nil).NewCounter(prometheus.CounterOpts{}),
indexCache: noopCache{},
logger: logger,
metrics: m,
bkt: bkt,
meta: meta,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
}
blocks = append(blocks, b)
}
@@ -1289,7 +1290,7 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request

for _, b := range blocks {
// NOTE(bwplotka): It is 4 x 1.0 for 100mln samples. Kind of makes sense: long series.
testutil.Equals(t, 0.0, promtest.ToFloat64(b.seriesRefetches))
testutil.Equals(t, 0.0, promtest.ToFloat64(b.metrics.seriesRefetches))
}
}
}
@@ -1393,6 +1394,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
b1 = &bucketBlock{
indexCache: indexCache,
logger: logger,
metrics: newBucketStoreMetrics(nil),
bkt: bkt,
meta: meta,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
@@ -1431,6 +1433,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
b2 = &bucketBlock{
indexCache: indexCache,
logger: logger,
metrics: newBucketStoreMetrics(nil),
bkt: bkt,
meta: meta,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
