From efe96dda78b133e69c4e7d29e7f0ba6635d2757c Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 4 Dec 2023 08:59:59 +0000 Subject: [PATCH 1/5] lazy downloaded index header Signed-off-by: Ben Ye --- pkg/block/indexheader/header_test.go | 2 +- pkg/block/indexheader/lazy_binary_reader.go | 6 +++++- .../indexheader/lazy_binary_reader_test.go | 12 ++++++------ pkg/block/indexheader/reader_pool.go | 17 ++++++++++++++--- pkg/block/indexheader/reader_pool_test.go | 13 +++++++++---- pkg/store/bucket.go | 12 +++++++++++- pkg/store/bucket_test.go | 2 +- 7 files changed, 47 insertions(+), 17 deletions(-) diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index 56dabc33f7..4130157a96 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -206,7 +206,7 @@ func TestReaders(t *testing.T) { _, err := WriteBinary(ctx, bkt, id, fn) testutil.Ok(t, err) - br, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewLazyBinaryReaderMetrics(nil), NewBinaryReaderMetrics(nil), nil) + br, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewLazyBinaryReaderMetrics(nil), NewBinaryReaderMetrics(nil), nil, false) testutil.Ok(t, err) defer func() { testutil.Ok(t, br.Close()) }() diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go index d7e589c724..515a8867a7 100644 --- a/pkg/block/indexheader/lazy_binary_reader.go +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -83,6 +83,8 @@ type LazyBinaryReader struct { // Keep track of the last time it was used. usedAt *atomic.Int64 + + lazyDownload bool } // NewLazyBinaryReader makes a new LazyBinaryReader. 
If the index-header does not exist @@ -99,8 +101,9 @@ func NewLazyBinaryReader( metrics *LazyBinaryReaderMetrics, binaryReaderMetrics *BinaryReaderMetrics, onClosed func(*LazyBinaryReader), + lazyDownload bool, ) (*LazyBinaryReader, error) { - if dir != "" { + if dir != "" && !lazyDownload { indexHeaderFile := filepath.Join(dir, id.String(), block.IndexHeaderFilename) // If the index-header doesn't exist we should download it. if _, err := os.Stat(indexHeaderFile); err != nil { @@ -131,6 +134,7 @@ func NewLazyBinaryReader( binaryReaderMetrics: binaryReaderMetrics, usedAt: atomic.NewInt64(time.Now().UnixNano()), onClosed: onClosed, + lazyDownload: lazyDownload, }, nil } diff --git a/pkg/block/indexheader/lazy_binary_reader_test.go b/pkg/block/indexheader/lazy_binary_reader_test.go index 150f2d649b..b2e87ac671 100644 --- a/pkg/block/indexheader/lazy_binary_reader_test.go +++ b/pkg/block/indexheader/lazy_binary_reader_test.go @@ -31,7 +31,7 @@ func TestNewLazyBinaryReader_ShouldFailIfUnableToBuildIndexHeader(t *testing.T) bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) testutil.Ok(t, err) defer func() { testutil.Ok(t, bkt.Close()) }() - _, err = NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, ulid.MustNew(0, nil), 3, NewLazyBinaryReaderMetrics(nil), NewBinaryReaderMetrics(nil), nil) + _, err = NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, ulid.MustNew(0, nil), 3, NewLazyBinaryReaderMetrics(nil), NewBinaryReaderMetrics(nil), nil, false) testutil.NotOk(t, err) } @@ -54,7 +54,7 @@ func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { m := NewLazyBinaryReaderMetrics(nil) bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, false) testutil.Ok(t, err) testutil.Assert(t, r.reader == nil) testutil.Equals(t, float64(0), 
promtestutil.ToFloat64(m.loadCount)) @@ -98,7 +98,7 @@ func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) { m := NewLazyBinaryReaderMetrics(nil) bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, false) testutil.Ok(t, err) testutil.Assert(t, r.reader == nil) testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) @@ -133,7 +133,7 @@ func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { m := NewLazyBinaryReaderMetrics(nil) bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, false) testutil.Ok(t, err) testutil.Assert(t, r.reader == nil) @@ -184,7 +184,7 @@ func TestLazyBinaryReader_unload_ShouldReturnErrorIfNotIdle(t *testing.T) { m := NewLazyBinaryReaderMetrics(nil) bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, false) testutil.Ok(t, err) testutil.Assert(t, r.reader == nil) @@ -234,7 +234,7 @@ func TestLazyBinaryReader_LoadUnloadRaceCondition(t *testing.T) { m := NewLazyBinaryReaderMetrics(nil) bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, false) testutil.Ok(t, err) testutil.Assert(t, r.reader == nil) t.Cleanup(func() { diff --git a/pkg/block/indexheader/reader_pool.go b/pkg/block/indexheader/reader_pool.go index fc8cb26813..4d9bc26846 100644 --- a/pkg/block/indexheader/reader_pool.go +++ b/pkg/block/indexheader/reader_pool.go @@ 
-14,6 +14,8 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" + + "github.com/thanos-io/thanos/pkg/block/metadata" ) // ReaderPoolMetrics holds metrics tracked by ReaderPool. @@ -46,10 +48,18 @@ type ReaderPool struct { // Keep track of all readers managed by the pool. lazyReadersMx sync.Mutex lazyReaders map[*LazyBinaryReader]struct{} + + lazyDownloadFunc LazyDownloadIndexHeaderFunc +} + +type LazyDownloadIndexHeaderFunc func(meta *metadata.Meta) bool + +func DisableLazyDownloadIndexHeader(meta *metadata.Meta) bool { + return false } // NewReaderPool makes a new ReaderPool. -func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, metrics *ReaderPoolMetrics) *ReaderPool { +func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, metrics *ReaderPoolMetrics, lazyDownloadFunc LazyDownloadIndexHeaderFunc) *ReaderPool { p := &ReaderPool{ logger: logger, metrics: metrics, @@ -57,6 +67,7 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime lazyReaderIdleTimeout: lazyReaderIdleTimeout, lazyReaders: make(map[*LazyBinaryReader]struct{}), close: make(chan struct{}), + lazyDownloadFunc: lazyDownloadFunc, } // Start a goroutine to close idle readers (only if required). @@ -81,12 +92,12 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime // NewBinaryReader creates and returns a new binary reader. If the pool has been configured // with lazy reader enabled, this function will return a lazy reader. The returned lazy reader // is tracked by the pool and automatically closed once the idle timeout expires. 
-func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int) (Reader, error) { +func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, meta *metadata.Meta) (Reader, error) { var reader Reader var err error if p.lazyReaderEnabled { - reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.metrics.binaryReader, p.onLazyReaderClosed) + reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.metrics.binaryReader, p.onLazyReaderClosed, p.lazyDownloadFunc(meta)) } else { reader, err = NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.binaryReader) } diff --git a/pkg/block/indexheader/reader_pool_test.go b/pkg/block/indexheader/reader_pool_test.go index 4ed60ea8fb..c5f9f6dfad 100644 --- a/pkg/block/indexheader/reader_pool_test.go +++ b/pkg/block/indexheader/reader_pool_test.go @@ -54,12 +54,15 @@ func TestReaderPool_NewBinaryReader(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) + meta, err := metadata.ReadFromDir(filepath.Join(tmpDir, blockID.String())) + testutil.Ok(t, err) + for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, NewReaderPoolMetrics(nil)) + pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, NewReaderPoolMetrics(nil), DisableLazyDownloadIndexHeader) defer pool.Close() - r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, 
blockID, 3, meta) testutil.Ok(t, err) defer func() { testutil.Ok(t, r.Close()) }() @@ -89,12 +92,14 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) + meta, err := metadata.ReadFromDir(filepath.Join(tmpDir, blockID.String())) + testutil.Ok(t, err) metrics := NewReaderPoolMetrics(nil) - pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout, metrics) + pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout, metrics, DisableLazyDownloadIndexHeader) defer pool.Close() - r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, meta) testutil.Ok(t, err) defer func() { testutil.Ok(t, r.Close()) }() diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 3ebd6f06a4..f3a3be8246 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -413,6 +413,8 @@ type BucketStore struct { blockEstimatedMaxSeriesFunc BlockEstimator blockEstimatedMaxChunkFunc BlockEstimator + + lazyDownloadIndexHeaderFunc indexheader.LazyDownloadIndexHeaderFunc } func (s *BucketStore) validate() error { @@ -531,6 +533,12 @@ func WithDontResort(true bool) BucketStoreOption { } } +func WithLazyDownloadIndexHeaderFunc(f indexheader.LazyDownloadIndexHeaderFunc) BucketStoreOption { + return func(s *BucketStore) { + s.lazyDownloadIndexHeaderFunc = f + } +} + // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. 
func NewBucketStore( @@ -574,6 +582,7 @@ func NewBucketStore( enableChunkHashCalculation: enableChunkHashCalculation, seriesBatchSize: SeriesBatchSize, sortingStrategy: sortingStrategyStore, + lazyDownloadIndexHeaderFunc: indexheader.DisableLazyDownloadIndexHeader, } for _, option := range options { @@ -582,7 +591,7 @@ func NewBucketStore( // Depend on the options indexReaderPoolMetrics := indexheader.NewReaderPoolMetrics(extprom.WrapRegistererWithPrefix("thanos_bucket_store_", s.reg)) - s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, indexReaderPoolMetrics) + s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, indexReaderPoolMetrics, s.lazyDownloadIndexHeaderFunc) s.metrics = newBucketStoreMetrics(s.reg) // TODO(metalmatze): Might be possible via Option too if err := s.validate(); err != nil { @@ -759,6 +768,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er s.dir, meta.ULID, s.postingOffsetsInMemSampling, + meta, ) if err != nil { return errors.Wrap(err, "create index header reader") diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 87659f5450..c5a9247054 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1658,7 +1658,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { bkt: objstore.WithNoopInstr(bkt), logger: logger, indexCache: indexCache, - indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, indexheader.NewReaderPoolMetrics(nil)), + indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, indexheader.NewReaderPoolMetrics(nil), indexheader.DisableLazyDownloadIndexHeader), metrics: newBucketStoreMetrics(nil), blockSets: map[uint64]*bucketBlockSet{ labels.Labels{{Name: "ext1", Value: "1"}}.Hash(): {blocks: [][]*bucketBlock{{b1, b2}}}, From 220293785579436479313a4c76a47e900c1f1ecc Mon Sep 17 00:00:00 2001 From: 
Ben Ye Date: Fri, 15 Dec 2023 23:37:21 +0000 Subject: [PATCH 2/5] update tests Signed-off-by: Ben Ye --- pkg/block/indexheader/lazy_binary_reader.go | 1 + .../indexheader/lazy_binary_reader_test.go | 301 ++++++++++-------- pkg/block/indexheader/reader_pool.go | 3 + pkg/store/bucket.go | 2 + 4 files changed, 172 insertions(+), 135 deletions(-) diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go index 515a8867a7..2b36bf8025 100644 --- a/pkg/block/indexheader/lazy_binary_reader.go +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -84,6 +84,7 @@ type LazyBinaryReader struct { // Keep track of the last time it was used. usedAt *atomic.Int64 + // If true, index header will be downloaded at query time rather than initialization time. lazyDownload bool } diff --git a/pkg/block/indexheader/lazy_binary_reader_test.go b/pkg/block/indexheader/lazy_binary_reader_test.go index b2e87ac671..5ba89c2934 100644 --- a/pkg/block/indexheader/lazy_binary_reader_test.go +++ b/pkg/block/indexheader/lazy_binary_reader_test.go @@ -35,6 +35,18 @@ func TestNewLazyBinaryReader_ShouldFailIfUnableToBuildIndexHeader(t *testing.T) testutil.NotOk(t, err) } +func TestNewLazyBinaryReader_ShouldNotFailIfUnableToBuildIndexHeaderWhenLazyDownload(t *testing.T) { + ctx := context.Background() + + tmpDir := t.TempDir() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + _, err = NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, ulid.MustNew(0, nil), 3, NewLazyBinaryReaderMetrics(nil), NewBinaryReaderMetrics(nil), nil, true) + testutil.Ok(t, err) +} + func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { ctx := context.Background() @@ -52,27 +64,38 @@ func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, 
blockID.String()), metadata.NoneFunc)) - m := NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, false) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) - - // Should lazy load the index upon first usage. - v, err := r.IndexVersion() - testutil.Ok(t, err) - testutil.Equals(t, 2, v) - testutil.Assert(t, r.reader != nil) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) - - labelNames, err := r.LabelNames() - testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + for _, lazyDownload := range []bool{true} { + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + + _, err = os.Stat(filepath.Join(r.dir, blockID.String(), block.IndexHeaderFilename)) + // Index file shouldn't exist. + if lazyDownload { + testutil.Equals(t, true, os.IsNotExist(err)) + } + // Should lazy load the index upon first usage. 
+ v, err := r.IndexVersion() + testutil.Ok(t, err) + if lazyDownload { + _, err = os.Stat(filepath.Join(r.dir, blockID.String(), block.IndexHeaderFilename)) + testutil.Ok(t, err) + } + testutil.Equals(t, 2, v) + testutil.Assert(t, r.reader != nil) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + } } func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) { @@ -96,22 +119,24 @@ func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) { headerFilename := filepath.Join(tmpDir, blockID.String(), block.IndexHeaderFilename) testutil.Ok(t, os.WriteFile(headerFilename, []byte("xxx"), os.ModePerm)) - m := NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, false) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) - - // Ensure it can read data. 
- labelNames, err := r.LabelNames() - testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + for _, lazyDownload := range []bool{false, true} { + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + + // Ensure it can read data. + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + } } func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { @@ -131,37 +156,39 @@ func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) - m := NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, false) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) - - // Should lazy load the index upon first usage. 
- labelNames, err := r.LabelNames() - testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - - // Close it. - testutil.Ok(t, r.Close()) - testutil.Assert(t, r.reader == nil) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) - - // Should lazy load again upon next usage. - labelNames, err = r.LabelNames() - testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - - // Closing an already closed lazy reader should be a no-op. - for i := 0; i < 2; i++ { + for _, lazyDownload := range []bool{false, true} { + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + + // Should lazy load the index upon first usage. + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + + // Close it. testutil.Ok(t, r.Close()) - testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.unloadCount)) + testutil.Assert(t, r.reader == nil) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + + // Should lazy load again upon next usage. 
+ labelNames, err = r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + + // Closing an already closed lazy reader should be a no-op. + for i := 0; i < 2; i++ { + testutil.Ok(t, r.Close()) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + } } } @@ -182,34 +209,36 @@ func TestLazyBinaryReader_unload_ShouldReturnErrorIfNotIdle(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) - m := NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, false) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) + for _, lazyDownload := range []bool{false, true} { + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + + // Should lazy load the index upon first usage. + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) - // Should lazy load the index upon first usage. 
- labelNames, err := r.LabelNames() - testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) - - // Try to unload but not idle since enough time. - testutil.Equals(t, errNotIdle, r.unloadIfIdleSince(time.Now().Add(-time.Minute).UnixNano())) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) - - // Try to unload and idle since enough time. - testutil.Ok(t, r.unloadIfIdleSince(time.Now().UnixNano())) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + // Try to unload but not idle since enough time. + testutil.Equals(t, errNotIdle, r.unloadIfIdleSince(time.Now().Add(-time.Minute).UnixNano())) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + + // Try to unload and idle since enough time. 
+ testutil.Ok(t, r.unloadIfIdleSince(time.Now().UnixNano())) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + } } func TestLazyBinaryReader_LoadUnloadRaceCondition(t *testing.T) { @@ -232,49 +261,51 @@ func TestLazyBinaryReader_LoadUnloadRaceCondition(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) - m := NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, false) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) - t.Cleanup(func() { - testutil.Ok(t, r.Close()) - }) - - done := make(chan struct{}) - time.AfterFunc(runDuration, func() { close(done) }) - wg := sync.WaitGroup{} - wg.Add(2) - - // Start a goroutine which continuously try to unload the reader. - go func() { - defer wg.Done() - - for { - select { - case <-done: - return - default: - testutil.Ok(t, r.unloadIfIdleSince(0)) + for _, lazyDownload := range []bool{false, true} { + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + t.Cleanup(func() { + testutil.Ok(t, r.Close()) + }) + + done := make(chan struct{}) + time.AfterFunc(runDuration, func() { close(done) }) + wg := sync.WaitGroup{} + wg.Add(2) + + // Start a goroutine which continuously try to unload the reader. 
+ go func() { + defer wg.Done() + + for { + select { + case <-done: + return + default: + testutil.Ok(t, r.unloadIfIdleSince(0)) + } } - } - }() - - // Try to read multiple times, while the other goroutine continuously try to unload it. - go func() { - defer wg.Done() - - for { - select { - case <-done: - return - default: - _, err := r.PostingsOffset("a", "1") - testutil.Assert(t, err == nil || err == errUnloadedWhileLoading) + }() + + // Try to read multiple times, while the other goroutine continuously try to unload it. + go func() { + defer wg.Done() + + for { + select { + case <-done: + return + default: + _, err := r.PostingsOffset("a", "1") + testutil.Assert(t, err == nil || err == errUnloadedWhileLoading) + } } - } - }() + }() - // Wait until both goroutines have done. - wg.Wait() + // Wait until both goroutines have done. + wg.Wait() + } } diff --git a/pkg/block/indexheader/reader_pool.go b/pkg/block/indexheader/reader_pool.go index 4d9bc26846..f5b905ce97 100644 --- a/pkg/block/indexheader/reader_pool.go +++ b/pkg/block/indexheader/reader_pool.go @@ -52,8 +52,11 @@ type ReaderPool struct { lazyDownloadFunc LazyDownloadIndexHeaderFunc } +// LazyDownloadIndexHeaderFunc is used to determine whether to download the index header lazily +// or not by checking its block metadata. Use cases include deciding by time or by index file size. type LazyDownloadIndexHeaderFunc func(meta *metadata.Meta) bool +// DisableLazyDownloadIndexHeader disables lazy download of the index header. func DisableLazyDownloadIndexHeader(meta *metadata.Meta) bool { return false } diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index f3a3be8246..3a68dceedf 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -533,6 +533,8 @@ func WithDontResort(true bool) BucketStoreOption { } } +// WithLazyDownloadIndexHeaderFunc specifies which blocks should have their index header downloaded lazily. +// Only used when lazy mmap is enabled at the same time. 
func WithLazyDownloadIndexHeaderFunc(f indexheader.LazyDownloadIndexHeaderFunc) BucketStoreOption { return func(s *BucketStore) { s.lazyDownloadIndexHeaderFunc = f From 0b62099a5a6fb68493db567a521a17c1dd35cbc0 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sat, 16 Dec 2023 19:30:55 +0000 Subject: [PATCH 3/5] address comments Signed-off-by: Ben Ye --- .../indexheader/lazy_binary_reader_test.go | 315 +++++++++--------- 1 file changed, 163 insertions(+), 152 deletions(-) diff --git a/pkg/block/indexheader/lazy_binary_reader_test.go b/pkg/block/indexheader/lazy_binary_reader_test.go index 5ba89c2934..d740da99ab 100644 --- a/pkg/block/indexheader/lazy_binary_reader_test.go +++ b/pkg/block/indexheader/lazy_binary_reader_test.go @@ -5,6 +5,7 @@ package indexheader import ( "context" + "fmt" "os" "path/filepath" "sync" @@ -56,45 +57,47 @@ func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { testutil.Ok(t, err) defer func() { testutil.Ok(t, bkt.Close()) }() - // Create block. - blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ - {{Name: "a", Value: "1"}}, - {{Name: "a", Value: "2"}}, - }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) - testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) + for _, lazyDownload := range []bool{false, true} { + t.Run(fmt.Sprintf("lazyDownload=%v", lazyDownload), func(t *testing.T) { + // Create block. 
+ blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) + + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) - for _, lazyDownload := range []bool{true} { - m := NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) - - _, err = os.Stat(filepath.Join(r.dir, blockID.String(), block.IndexHeaderFilename)) - // Index file shouldn't exist. - if lazyDownload { - testutil.Equals(t, true, os.IsNotExist(err)) - } - // Should lazy load the index upon first usage. - v, err := r.IndexVersion() - testutil.Ok(t, err) - if lazyDownload { _, err = os.Stat(filepath.Join(r.dir, blockID.String(), block.IndexHeaderFilename)) + // Index file shouldn't exist. + if lazyDownload { + testutil.Equals(t, true, os.IsNotExist(err)) + } + // Should lazy load the index upon first usage. 
+ v, err := r.IndexVersion() + testutil.Ok(t, err) + if lazyDownload { + _, err = os.Stat(filepath.Join(r.dir, blockID.String(), block.IndexHeaderFilename)) + testutil.Ok(t, err) + } + testutil.Equals(t, 2, v) + testutil.Assert(t, r.reader != nil) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + + labelNames, err := r.LabelNames() testutil.Ok(t, err) - } - testutil.Equals(t, 2, v) - testutil.Assert(t, r.reader != nil) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) - - labelNames, err := r.LabelNames() - testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + }) } } @@ -120,22 +123,24 @@ func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) { testutil.Ok(t, os.WriteFile(headerFilename, []byte("xxx"), os.ModePerm)) for _, lazyDownload := range []bool{false, true} { - m := NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) - - // Ensure it can read data. 
- labelNames, err := r.LabelNames() - testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + t.Run(fmt.Sprintf("lazyDownload=%v", lazyDownload), func(t *testing.T) { + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + + // Ensure it can read data. + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + }) } } @@ -157,38 +162,40 @@ func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) for _, lazyDownload := range []bool{false, true} { - m := NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) - - // Should lazy load the index upon first usage. 
- labelNames, err := r.LabelNames() - testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - - // Close it. - testutil.Ok(t, r.Close()) - testutil.Assert(t, r.reader == nil) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) - - // Should lazy load again upon next usage. - labelNames, err = r.LabelNames() - testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - - // Closing an already closed lazy reader should be a no-op. - for i := 0; i < 2; i++ { + t.Run(fmt.Sprintf("lazyDownload=%v", lazyDownload), func(t *testing.T) { + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + + // Should lazy load the index upon first usage. + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + + // Close it. testutil.Ok(t, r.Close()) - testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.unloadCount)) + testutil.Assert(t, r.reader == nil) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) - } + + // Should lazy load again upon next usage. 
+ labelNames, err = r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + + // Closing an already closed lazy reader should be a no-op. + for i := 0; i < 2; i++ { + testutil.Ok(t, r.Close()) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + } + }) } } @@ -210,34 +217,36 @@ func TestLazyBinaryReader_unload_ShouldReturnErrorIfNotIdle(t *testing.T) { testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) for _, lazyDownload := range []bool{false, true} { - m := NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) - - // Should lazy load the index upon first usage. - labelNames, err := r.LabelNames() - testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) - - // Try to unload but not idle since enough time. - testutil.Equals(t, errNotIdle, r.unloadIfIdleSince(time.Now().Add(-time.Minute).UnixNano())) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) - - // Try to unload and idle since enough time. 
- testutil.Ok(t, r.unloadIfIdleSince(time.Now().UnixNano())) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + t.Run(fmt.Sprintf("lazyDownload=%v", lazyDownload), func(t *testing.T) { + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + + // Should lazy load the index upon first usage. + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + + // Try to unload but not idle since enough time. + testutil.Equals(t, errNotIdle, r.unloadIfIdleSince(time.Now().Add(-time.Minute).UnixNano())) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + + // Try to unload and idle since enough time. 
+ testutil.Ok(t, r.unloadIfIdleSince(time.Now().UnixNano())) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + }) } } @@ -262,50 +271,52 @@ func TestLazyBinaryReader_LoadUnloadRaceCondition(t *testing.T) { testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) for _, lazyDownload := range []bool{false, true} { - m := NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) - t.Cleanup(func() { - testutil.Ok(t, r.Close()) - }) - - done := make(chan struct{}) - time.AfterFunc(runDuration, func() { close(done) }) - wg := sync.WaitGroup{} - wg.Add(2) - - // Start a goroutine which continuously try to unload the reader. - go func() { - defer wg.Done() - - for { - select { - case <-done: - return - default: - testutil.Ok(t, r.unloadIfIdleSince(0)) + t.Run(fmt.Sprintf("lazyDownload=%v", lazyDownload), func(t *testing.T) { + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + t.Cleanup(func() { + testutil.Ok(t, r.Close()) + }) + + done := make(chan struct{}) + time.AfterFunc(runDuration, func() { close(done) }) + wg := sync.WaitGroup{} + wg.Add(2) + + // Start a goroutine which continuously try to unload the reader. 
+ go func() { + defer wg.Done() + + for { + select { + case <-done: + return + default: + testutil.Ok(t, r.unloadIfIdleSince(0)) + } } - } - }() - - // Try to read multiple times, while the other goroutine continuously try to unload it. - go func() { - defer wg.Done() - - for { - select { - case <-done: - return - default: - _, err := r.PostingsOffset("a", "1") - testutil.Assert(t, err == nil || err == errUnloadedWhileLoading) + }() + + // Try to read multiple times, while the other goroutine continuously try to unload it. + go func() { + defer wg.Done() + + for { + select { + case <-done: + return + default: + _, err := r.PostingsOffset("a", "1") + testutil.Assert(t, err == nil || err == errUnloadedWhileLoading) + } } - } - }() + }() - // Wait until both goroutines have done. - wg.Wait() + // Wait until both goroutines have done. + wg.Wait() + }) } } From b733ab32bef10b18d038e6109955d75b23a38020 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 20 Dec 2023 07:54:00 +0000 Subject: [PATCH 4/5] address comments Signed-off-by: Ben Ye --- cmd/thanos/store.go | 10 ++++++ docs/components/store.md | 6 ++++ pkg/block/indexheader/reader_pool.go | 30 ++++++++++++++-- pkg/block/indexheader/reader_pool_test.go | 4 +-- pkg/store/bucket.go | 42 +++++++++++------------ pkg/store/bucket_test.go | 2 +- 6 files changed, 68 insertions(+), 26 deletions(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 9191b77ce8..7d80687ec3 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -28,6 +28,7 @@ import ( blocksAPI "github.com/thanos-io/thanos/pkg/api/blocks" "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/indexheader" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" hidden "github.com/thanos-io/thanos/pkg/extflag" @@ -89,6 +90,8 @@ type storeConfig struct { lazyIndexReaderEnabled bool lazyIndexReaderIdleTimeout time.Duration lazyExpandedPostingsEnabled bool + + indexHeaderLazyDownloadStrategy 
string } func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -186,6 +189,10 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings."). Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled) + cmd.Flag("store.index-header-lazy-download-strategy", "Strategy of how to download index headers lazily. Supported values: eager, lazy. If eager, always download index header during initial load. If lazy, download index header during query time."). + Default(string(indexheader.EagerDownloadStrategy)). + EnumVar(&sc.indexHeaderLazyDownloadStrategy, string(indexheader.EagerDownloadStrategy), string(indexheader.LazyDownloadStrategy)) + cmd.Flag("web.disable", "Disable Block Viewer UI.").Default("false").BoolVar(&sc.disableWeb) cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path."). @@ -388,6 +395,9 @@ func runStore( return conf.estimatedMaxChunkSize }), store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled), + store.WithIndexHeaderLazyDownloadStrategy( + indexheader.IndexHeaderLazyDownloadStrategy(conf.indexHeaderLazyDownloadStrategy).StrategyToDownloadFunc(), + ), } if conf.debugLogging { diff --git a/docs/components/store.md b/docs/components/store.md index 3ba59a2d9e..85fd4ce688 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -193,6 +193,12 @@ Flags: DEPRECATED: use store.limits.request-samples. --store.grpc.touched-series-limit=0 DEPRECATED: use store.limits.request-series. + --store.index-header-lazy-download-strategy=eager + Strategy of how to download index headers + lazily. 
Supported values: eager, lazy. + If eager, always download index header during + initial load. If lazy, download index header + during query time. --store.limits.request-samples=0 The maximum samples allowed for a single Series request, The Series call fails if diff --git a/pkg/block/indexheader/reader_pool.go b/pkg/block/indexheader/reader_pool.go index f5b905ce97..e9fe5eb7dc 100644 --- a/pkg/block/indexheader/reader_pool.go +++ b/pkg/block/indexheader/reader_pool.go @@ -52,15 +52,41 @@ type ReaderPool struct { lazyDownloadFunc LazyDownloadIndexHeaderFunc } +// IndexHeaderLazyDownloadStrategy specifies how to download index headers +// lazily. Only used when lazy mmap is enabled. +type IndexHeaderLazyDownloadStrategy string + +const ( + // EagerDownloadStrategy always disables lazy downloading index headers. + EagerDownloadStrategy IndexHeaderLazyDownloadStrategy = "eager" + // LazyDownloadStrategy always lazily download index headers. + LazyDownloadStrategy IndexHeaderLazyDownloadStrategy = "lazy" +) + +func (s IndexHeaderLazyDownloadStrategy) StrategyToDownloadFunc() LazyDownloadIndexHeaderFunc { + switch s { + case LazyDownloadStrategy: + return AlwaysLazyDownloadIndexHeader + default: + // Always fallback to eager download index header. + return AlwaysEagerDownloadIndexHeader + } +} + // LazyDownloadIndexHeaderFunc is used to determinte whether to download the index header lazily // or not by checking its block metadata. Usecase can be by time or by index file size. type LazyDownloadIndexHeaderFunc func(meta *metadata.Meta) bool -// DisableLazyDownloadIndexHeader disables lazy downloaded index header. -func DisableLazyDownloadIndexHeader(meta *metadata.Meta) bool { +// AlwaysEagerDownloadIndexHeader always eagerly download index header. +func AlwaysEagerDownloadIndexHeader(meta *metadata.Meta) bool { return false } +// AlwaysLazyDownloadIndexHeader always lazily download index header. 
+func AlwaysLazyDownloadIndexHeader(meta *metadata.Meta) bool { + return true +} + // NewReaderPool makes a new ReaderPool. func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, metrics *ReaderPoolMetrics, lazyDownloadFunc LazyDownloadIndexHeaderFunc) *ReaderPool { p := &ReaderPool{ diff --git a/pkg/block/indexheader/reader_pool_test.go b/pkg/block/indexheader/reader_pool_test.go index c5f9f6dfad..a7445f0fed 100644 --- a/pkg/block/indexheader/reader_pool_test.go +++ b/pkg/block/indexheader/reader_pool_test.go @@ -59,7 +59,7 @@ func TestReaderPool_NewBinaryReader(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, NewReaderPoolMetrics(nil), DisableLazyDownloadIndexHeader) + pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, NewReaderPoolMetrics(nil), AlwaysEagerDownloadIndexHeader) defer pool.Close() r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, meta) @@ -96,7 +96,7 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { testutil.Ok(t, err) metrics := NewReaderPoolMetrics(nil) - pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout, metrics, DisableLazyDownloadIndexHeader) + pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout, metrics, AlwaysEagerDownloadIndexHeader) defer pool.Close() r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, meta) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 3a68dceedf..fd4fb7392c 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -414,7 +414,7 @@ type BucketStore struct { blockEstimatedMaxSeriesFunc BlockEstimator blockEstimatedMaxChunkFunc BlockEstimator - lazyDownloadIndexHeaderFunc indexheader.LazyDownloadIndexHeaderFunc + indexHeaderLazyDownloadStrategy 
indexheader.LazyDownloadIndexHeaderFunc } func (s *BucketStore) validate() error { @@ -533,11 +533,11 @@ func WithDontResort(true bool) BucketStoreOption { } } -// WithLazyDownloadIndexHeaderFunc specifies what block to lazy download its index header. +// WithIndexHeaderLazyDownloadStrategy specifies what block to lazy download its index header. // Only used when lazy mmap is enabled at the same time. -func WithLazyDownloadIndexHeaderFunc(f indexheader.LazyDownloadIndexHeaderFunc) BucketStoreOption { +func WithIndexHeaderLazyDownloadStrategy(strategy indexheader.LazyDownloadIndexHeaderFunc) BucketStoreOption { return func(s *BucketStore) { - s.lazyDownloadIndexHeaderFunc = f + s.indexHeaderLazyDownloadStrategy = strategy } } @@ -569,22 +569,22 @@ func NewBucketStore( b := make([]byte, 0, initialBufSize) return &b }}, - chunkPool: pool.NoopBytes{}, - blocks: map[ulid.ULID]*bucketBlock{}, - blockSets: map[uint64]*bucketBlockSet{}, - blockSyncConcurrency: blockSyncConcurrency, - queryGate: gate.NewNoop(), - chunksLimiterFactory: chunksLimiterFactory, - seriesLimiterFactory: seriesLimiterFactory, - bytesLimiterFactory: bytesLimiterFactory, - partitioner: partitioner, - enableCompatibilityLabel: enableCompatibilityLabel, - postingOffsetsInMemSampling: postingOffsetsInMemSampling, - enableSeriesResponseHints: enableSeriesResponseHints, - enableChunkHashCalculation: enableChunkHashCalculation, - seriesBatchSize: SeriesBatchSize, - sortingStrategy: sortingStrategyStore, - lazyDownloadIndexHeaderFunc: indexheader.DisableLazyDownloadIndexHeader, + chunkPool: pool.NoopBytes{}, + blocks: map[ulid.ULID]*bucketBlock{}, + blockSets: map[uint64]*bucketBlockSet{}, + blockSyncConcurrency: blockSyncConcurrency, + queryGate: gate.NewNoop(), + chunksLimiterFactory: chunksLimiterFactory, + seriesLimiterFactory: seriesLimiterFactory, + bytesLimiterFactory: bytesLimiterFactory, + partitioner: partitioner, + enableCompatibilityLabel: enableCompatibilityLabel, + postingOffsetsInMemSampling: 
postingOffsetsInMemSampling, + enableSeriesResponseHints: enableSeriesResponseHints, + enableChunkHashCalculation: enableChunkHashCalculation, + seriesBatchSize: SeriesBatchSize, + sortingStrategy: sortingStrategyStore, + indexHeaderLazyDownloadStrategy: indexheader.AlwaysEagerDownloadIndexHeader, } for _, option := range options { @@ -593,7 +593,7 @@ func NewBucketStore( // Depend on the options indexReaderPoolMetrics := indexheader.NewReaderPoolMetrics(extprom.WrapRegistererWithPrefix("thanos_bucket_store_", s.reg)) - s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, indexReaderPoolMetrics, s.lazyDownloadIndexHeaderFunc) + s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, indexReaderPoolMetrics, s.indexHeaderLazyDownloadStrategy) s.metrics = newBucketStoreMetrics(s.reg) // TODO(metalmatze): Might be possible via Option too if err := s.validate(); err != nil { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index c5a9247054..67223a9467 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1658,7 +1658,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { bkt: objstore.WithNoopInstr(bkt), logger: logger, indexCache: indexCache, - indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, indexheader.NewReaderPoolMetrics(nil), indexheader.DisableLazyDownloadIndexHeader), + indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, indexheader.NewReaderPoolMetrics(nil), indexheader.AlwaysEagerDownloadIndexHeader), metrics: newBucketStoreMetrics(nil), blockSets: map[uint64]*bucketBlockSet{ labels.Labels{{Name: "ext1", Value: "1"}}.Hash(): {blocks: [][]*bucketBlock{{b1, b2}}}, From d3c0223602611d2380fc1932c3ee023d241c6db5 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 20 Dec 2023 08:01:44 +0000 Subject: [PATCH 5/5] changelog Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + 
1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f6a09133a..0fc8fd93ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6954](https://github.com/thanos-io/thanos/pull/6954) Index Cache: Support tracing for fetch APIs. - [#6943](https://github.com/thanos-io/thanos/pull/6943) Ruler: Added `keep_firing_for` field in alerting rule. - [#6972](https://github.com/thanos-io/thanos/pull/6972) Store Gateway: Apply series limit when streaming series for series actually matched if lazy postings is enabled. +- [#6984](https://github.com/thanos-io/thanos/pull/6984) Store Gateway: Added `--store.index-header-lazy-download-strategy` to specify how to lazily download index headers when lazy mmap is enabled. ### Changed