Skip to content

Commit

Permalink
Improve binary reader metrics
Browse files Browse the repository at this point in the history
Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
  • Loading branch information
harry671003 committed Oct 25, 2023
1 parent 0af4c29 commit 1f4a2aa
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 32 deletions.
46 changes: 40 additions & 6 deletions pkg/block/indexheader/binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"
Expand Down Expand Up @@ -64,6 +66,28 @@ func newCRC32() hash.Hash32 {
return crc32.New(castagnoliTable)
}

// BinaryReaderMetrics holds metrics tracked by BinaryReader.
type BinaryReaderMetrics struct {
// downloadDuration observes the duration, in seconds, of the index-header download from objstore.
downloadDuration prometheus.Histogram
// loadDuration observes the duration, in seconds, of the index-header loading.
loadDuration prometheus.Histogram
}

// NewBinaryReaderMetrics makes new BinaryReaderMetrics.
//
// Both histograms are created through promauto.With(reg), download first and
// load second, and share the same bucket layout (10ms up to 300s).
func NewBinaryReaderMetrics(reg prometheus.Registerer) *BinaryReaderMetrics {
	factory := promauto.With(reg)

	// newDurationHistogram builds one duration histogram. The bucket literal
	// lives inside the closure so every histogram gets its own slice.
	newDurationHistogram := func(name, help string) prometheus.Histogram {
		opts := prometheus.HistogramOpts{
			Name:    name,
			Help:    help,
			Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 15, 30, 60, 90, 120, 300},
		}
		return factory.NewHistogram(opts)
	}

	m := new(BinaryReaderMetrics)
	m.downloadDuration = newDurationHistogram(
		"indexheader_download_duration_seconds",
		"Duration of the index-header download from objstore in seconds.",
	)
	m.loadDuration = newDurationHistogram(
		"indexheader_load_duration_seconds",
		"Duration of the index-header loading in seconds.",
	)
	return m
}

// BinaryTOC is a table of content for index-header file.
type BinaryTOC struct {
// Symbols holds start to the same symbols section as index related to this index header.
Expand Down Expand Up @@ -529,13 +553,15 @@ type BinaryReader struct {
indexLastPostingEnd int64

postingOffsetsInMemSampling int

metrics *BinaryReaderMetrics
}

// NewBinaryReader loads or builds new index-header if not present on disk.
func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int) (*BinaryReader, error) {
func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, metrics *BinaryReaderMetrics) (*BinaryReader, error) {
if dir != "" {
binfn := filepath.Join(dir, id.String(), block.IndexHeaderFilename)
br, err := newFileBinaryReader(binfn, postingOffsetsInMemSampling)
br, err := newFileBinaryReader(binfn, postingOffsetsInMemSampling, metrics)
if err == nil {
return br, nil
}
Expand All @@ -546,25 +572,27 @@ func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.Bucket
if _, err := WriteBinary(ctx, bkt, id, binfn); err != nil {
return nil, errors.Wrap(err, "write index header")
}
metrics.loadDuration.Observe(time.Since(start).Seconds())

level.Debug(logger).Log("msg", "built index-header file", "path", binfn, "elapsed", time.Since(start))
return newFileBinaryReader(binfn, postingOffsetsInMemSampling)
return newFileBinaryReader(binfn, postingOffsetsInMemSampling, metrics)
} else {
buf, err := WriteBinary(ctx, bkt, id, "")
if err != nil {
return nil, errors.Wrap(err, "generate index header")
}

return newMemoryBinaryReader(buf, postingOffsetsInMemSampling)
return newMemoryBinaryReader(buf, postingOffsetsInMemSampling, metrics)
}
}

func newMemoryBinaryReader(buf []byte, postingOffsetsInMemSampling int) (bw *BinaryReader, err error) {
func newMemoryBinaryReader(buf []byte, postingOffsetsInMemSampling int, metrics *BinaryReaderMetrics) (bw *BinaryReader, err error) {
r := &BinaryReader{
b: realByteSlice(buf),
c: nil,
postings: map[string]*postingValueOffsets{},
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
metrics: metrics,
}

if err := r.init(); err != nil {
Expand All @@ -574,7 +602,7 @@ func newMemoryBinaryReader(buf []byte, postingOffsetsInMemSampling int) (bw *Bin
return r, nil
}

func newFileBinaryReader(path string, postingOffsetsInMemSampling int) (bw *BinaryReader, err error) {
func newFileBinaryReader(path string, postingOffsetsInMemSampling int, metrics *BinaryReaderMetrics) (bw *BinaryReader, err error) {
f, err := fileutil.OpenMmapFile(path)
if err != nil {
return nil, err
Expand All @@ -590,6 +618,7 @@ func newFileBinaryReader(path string, postingOffsetsInMemSampling int) (bw *Bina
c: f,
postings: map[string]*postingValueOffsets{},
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
metrics: metrics,
}

if err := r.init(); err != nil {
Expand Down Expand Up @@ -624,6 +653,11 @@ func newBinaryTOCFromByteSlice(bs index.ByteSlice) (*BinaryTOC, error) {
}

func (r *BinaryReader) init() (err error) {
start := time.Now()

defer func() {
r.metrics.loadDuration.Observe(time.Since(start).Seconds())
}()
// Verify header.
if r.b.Len() < headerLen {
return errors.Wrap(encoding.ErrInvalidSize, "index header's header")
Expand Down
8 changes: 4 additions & 4 deletions pkg/block/indexheader/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestReaders(t *testing.T) {
_, err := WriteBinary(ctx, bkt, id, fn)
testutil.Ok(t, err)

br, err := NewBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3)
br, err := NewBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewBinaryReaderMetrics(nil))
testutil.Ok(t, err)

defer func() { testutil.Ok(t, br.Close()) }()
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestReaders(t *testing.T) {
_, err := WriteBinary(ctx, bkt, id, fn)
testutil.Ok(t, err)

br, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewLazyBinaryReaderMetrics(nil), nil)
br, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewLazyBinaryReaderMetrics(nil), NewBinaryReaderMetrics(nil), nil)
testutil.Ok(t, err)

defer func() { testutil.Ok(t, br.Close()) }()
Expand Down Expand Up @@ -399,7 +399,7 @@ func BenchmarkBinaryReader(t *testing.B) {

t.ResetTimer()
for i := 0; i < t.N; i++ {
br, err := newFileBinaryReader(fn, 32)
br, err := newFileBinaryReader(fn, 32, NewBinaryReaderMetrics(nil))
testutil.Ok(t, err)
testutil.Ok(t, br.Close())
}
Expand Down Expand Up @@ -437,7 +437,7 @@ func benchmarkBinaryReaderLookupSymbol(b *testing.B, numSeries int) {
testutil.Ok(b, block.Upload(ctx, logger, bkt, filepath.Join(tmpDir, id1.String()), metadata.NoneFunc))

// Create an index reader.
reader, err := NewBinaryReader(ctx, logger, bkt, tmpDir, id1, postingOffsetsInMemSampling)
reader, err := NewBinaryReader(ctx, logger, bkt, tmpDir, id1, postingOffsetsInMemSampling, NewBinaryReaderMetrics(nil))
testutil.Ok(b, err)

// Get the offset of each label value symbol.
Expand Down
8 changes: 6 additions & 2 deletions pkg/block/indexheader/lazy_binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewLazyBinaryReaderMetrics(reg prometheus.Registerer) *LazyBinaryReaderMetr
loadDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "indexheader_lazy_load_duration_seconds",
Help: "Duration of the index-header lazy loading in seconds.",
Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 15, 30, 60, 120, 300},
Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 15, 30, 60, 90, 120, 300},
}),
}
}
Expand All @@ -74,6 +74,7 @@ type LazyBinaryReader struct {
id ulid.ULID
postingOffsetsInMemSampling int
metrics *LazyBinaryReaderMetrics
binaryReaderMetrics *BinaryReaderMetrics
onClosed func(*LazyBinaryReader)

readerMx sync.RWMutex
Expand All @@ -96,6 +97,7 @@ func NewLazyBinaryReader(
id ulid.ULID,
postingOffsetsInMemSampling int,
metrics *LazyBinaryReaderMetrics,
binaryReaderMetrics *BinaryReaderMetrics,
onClosed func(*LazyBinaryReader),
) (*LazyBinaryReader, error) {
if dir != "" {
Expand All @@ -114,6 +116,7 @@ func NewLazyBinaryReader(
}

level.Debug(logger).Log("msg", "built index-header file", "path", indexHeaderFile, "elapsed", time.Since(start))
binaryReaderMetrics.downloadDuration.Observe(time.Since(start).Seconds())
}
}

Expand All @@ -125,6 +128,7 @@ func NewLazyBinaryReader(
id: id,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
metrics: metrics,
binaryReaderMetrics: binaryReaderMetrics,
usedAt: atomic.NewInt64(time.Now().UnixNano()),
onClosed: onClosed,
}, nil
Expand Down Expand Up @@ -257,7 +261,7 @@ func (r *LazyBinaryReader) load() (returnErr error) {
r.metrics.loadCount.Inc()
startTime := time.Now()

reader, err := NewBinaryReader(r.ctx, r.logger, r.bkt, r.dir, r.id, r.postingOffsetsInMemSampling)
reader, err := NewBinaryReader(r.ctx, r.logger, r.bkt, r.dir, r.id, r.postingOffsetsInMemSampling, r.binaryReaderMetrics)
if err != nil {
r.metrics.loadFailedCount.Inc()
r.readerErr = err
Expand Down
18 changes: 11 additions & 7 deletions pkg/block/indexheader/lazy_binary_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ func TestNewLazyBinaryReader_ShouldFailIfUnableToBuildIndexHeader(t *testing.T)
bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
testutil.Ok(t, err)
defer func() { testutil.Ok(t, bkt.Close()) }()

_, err = NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, ulid.MustNew(0, nil), 3, NewLazyBinaryReaderMetrics(nil), nil)
_, err = NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, ulid.MustNew(0, nil), 3, NewLazyBinaryReaderMetrics(nil), NewBinaryReaderMetrics(nil), nil)
testutil.NotOk(t, err)
}

Expand All @@ -54,7 +53,8 @@ func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) {
testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc))

m := NewLazyBinaryReaderMetrics(nil)
r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil)
bm := NewBinaryReaderMetrics(nil)
r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil)
testutil.Ok(t, err)
testutil.Assert(t, r.reader == nil)
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount))
Expand Down Expand Up @@ -97,7 +97,8 @@ func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) {
testutil.Ok(t, os.WriteFile(headerFilename, []byte("xxx"), os.ModePerm))

m := NewLazyBinaryReaderMetrics(nil)
r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil)
bm := NewBinaryReaderMetrics(nil)
r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil)
testutil.Ok(t, err)
testutil.Assert(t, r.reader == nil)
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount))
Expand Down Expand Up @@ -131,7 +132,8 @@ func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) {
testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc))

m := NewLazyBinaryReaderMetrics(nil)
r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil)
bm := NewBinaryReaderMetrics(nil)
r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil)
testutil.Ok(t, err)
testutil.Assert(t, r.reader == nil)

Expand Down Expand Up @@ -181,7 +183,8 @@ func TestLazyBinaryReader_unload_ShouldReturnErrorIfNotIdle(t *testing.T) {
testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc))

m := NewLazyBinaryReaderMetrics(nil)
r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil)
bm := NewBinaryReaderMetrics(nil)
r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil)
testutil.Ok(t, err)
testutil.Assert(t, r.reader == nil)

Expand Down Expand Up @@ -230,7 +233,8 @@ func TestLazyBinaryReader_LoadUnloadRaceCondition(t *testing.T) {
testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc))

m := NewLazyBinaryReaderMetrics(nil)
r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil)
bm := NewBinaryReaderMetrics(nil)
r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil)
testutil.Ok(t, err)
testutil.Assert(t, r.reader == nil)
t.Cleanup(func() {
Expand Down
10 changes: 6 additions & 4 deletions pkg/block/indexheader/reader_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (

// ReaderPoolMetrics holds metrics tracked by ReaderPool.
type ReaderPoolMetrics struct {
lazyReader *LazyBinaryReaderMetrics
lazyReader *LazyBinaryReaderMetrics
binaryReader *BinaryReaderMetrics
}

// NewReaderPoolMetrics makes new ReaderPoolMetrics.
func NewReaderPoolMetrics(reg prometheus.Registerer) *ReaderPoolMetrics {
return &ReaderPoolMetrics{
lazyReader: NewLazyBinaryReaderMetrics(reg),
lazyReader: NewLazyBinaryReaderMetrics(reg),
binaryReader: NewBinaryReaderMetrics(reg),
}
}

Expand Down Expand Up @@ -84,9 +86,9 @@ func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt
var err error

if p.lazyReaderEnabled {
reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.onLazyReaderClosed)
reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.metrics.binaryReader, p.onLazyReaderClosed)
} else {
reader, err = NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling)
reader, err = NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.binaryReader)
}

if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,8 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
m.blockLoadDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "thanos_bucket_store_block_load_duration_seconds",
Help: "The total time taken to load a block in seconds.",
Buckets: []float64{0.1, 0.5, 1, 10, 20, 30, 60, 120},
Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 15, 30, 60, 90, 120, 300},
})

m.seriesDataTouched = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "thanos_bucket_store_series_data_touched",
Help: "Number of items of a data type touched to fulfill a single Store API series request.",
Expand Down
14 changes: 7 additions & 7 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1067,7 +1067,7 @@ func TestBucketIndexReader_ExpandedPostings(t *testing.T) {

id := uploadTestBlock(tb, tmpDir, bkt, 500)

r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, id, DefaultPostingOffsetInMemorySampling)
r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, id, DefaultPostingOffsetInMemorySampling, indexheader.NewBinaryReaderMetrics(nil))
testutil.Ok(tb, err)

benchmarkExpandedPostings(tb, bkt, id, r, 500)
Expand All @@ -1083,7 +1083,7 @@ func BenchmarkBucketIndexReader_ExpandedPostings(b *testing.B) {
defer func() { testutil.Ok(tb, bkt.Close()) }()

id := uploadTestBlock(tb, tmpDir, bkt, 50e5)
r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, id, DefaultPostingOffsetInMemorySampling)
r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, id, DefaultPostingOffsetInMemorySampling, indexheader.NewBinaryReaderMetrics(nil))
testutil.Ok(tb, err)

benchmarkExpandedPostings(tb, bkt, id, r, 50e5)
Expand Down Expand Up @@ -1255,7 +1255,7 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) {

id := uploadTestBlock(t, tmpDir, bkt, 100)

r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, id, DefaultPostingOffsetInMemorySampling)
r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, id, DefaultPostingOffsetInMemorySampling, indexheader.NewBinaryReaderMetrics(nil))
testutil.Ok(t, err)
b := &bucketBlock{
logger: log.NewNopLogger(),
Expand Down Expand Up @@ -1599,7 +1599,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
estimatedMaxSeriesSize: EstimatedMaxSeriesSize,
estimatedMaxChunkSize: EstimatedMaxChunkSize,
}
b1.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b1.meta.ULID, DefaultPostingOffsetInMemorySampling)
b1.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b1.meta.ULID, DefaultPostingOffsetInMemorySampling, indexheader.NewBinaryReaderMetrics(nil))
testutil.Ok(t, err)
}

Expand Down Expand Up @@ -1640,7 +1640,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
estimatedMaxSeriesSize: EstimatedMaxSeriesSize,
estimatedMaxChunkSize: EstimatedMaxChunkSize,
}
b2.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b2.meta.ULID, DefaultPostingOffsetInMemorySampling)
b2.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b2.meta.ULID, DefaultPostingOffsetInMemorySampling, indexheader.NewBinaryReaderMetrics(nil))
testutil.Ok(t, err)
}

Expand Down Expand Up @@ -2705,7 +2705,7 @@ func prepareBucket(b *testing.B, resolutionLevel compact.ResolutionLevel) (*buck
partitioner := NewGapBasedPartitioner(PartitionerMaxGapSize)

// Create an index header reader.
indexHeaderReader, err := indexheader.NewBinaryReader(ctx, logger, bkt, tmpDir, blockMeta.ULID, DefaultPostingOffsetInMemorySampling)
indexHeaderReader, err := indexheader.NewBinaryReader(ctx, logger, bkt, tmpDir, blockMeta.ULID, DefaultPostingOffsetInMemorySampling, indexheader.NewBinaryReaderMetrics(nil))
testutil.Ok(b, err)
indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, nil, storecache.DefaultInMemoryIndexCacheConfig)
testutil.Ok(b, err)
Expand Down Expand Up @@ -3340,7 +3340,7 @@ func TestExpandedPostingsRace(t *testing.T) {
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(context.Background(), log.NewLogfmtLogger(os.Stderr), bkt, filepath.Join(tmpDir, ul.String()), metadata.NoneFunc))

r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, ul, DefaultPostingOffsetInMemorySampling)
r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, ul, DefaultPostingOffsetInMemorySampling, indexheader.NewBinaryReaderMetrics(nil))
testutil.Ok(t, err)

blk, err := newBucketBlock(
Expand Down

0 comments on commit 1f4a2aa

Please sign in to comment.