From 0aa995458671b40b5ce7a10a9741f45230324a60 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Wed, 1 Apr 2020 12:16:16 +0100 Subject: [PATCH] fetcher: Made metaFetcher go routine safe; Fixed multiple bucket UI + fetcher issues. (#2354) Fixed https://github.com/thanos-io/thanos/issues/2349 Fixed races (we were reusing fetcher by both bucket UI and compaction syncs... Fixed logging Added singleflight to ensure we don't synchronize too often. Signed-off-by: Bartlomiej Plotka --- cmd/thanos/bucket.go | 53 +---- cmd/thanos/compact.go | 31 ++- cmd/thanos/downsample.go | 4 +- cmd/thanos/main_test.go | 4 +- cmd/thanos/store.go | 16 +- go.mod | 1 + pkg/block/fetcher.go | 302 ++++++++++++++++---------- pkg/block/fetcher_test.go | 31 +-- pkg/compact/clean_test.go | 2 +- pkg/compact/compact_e2e_test.go | 4 +- pkg/compact/retention_test.go | 2 +- pkg/replicate/replicator.go | 2 +- pkg/replicate/scheme_test.go | 2 +- pkg/store/bucket_e2e_test.go | 2 +- pkg/store/bucket_test.go | 2 +- pkg/ui/bucket.go | 42 +++- pkg/verifier/duplicated_compaction.go | 4 +- pkg/verifier/index_issue.go | 2 +- pkg/verifier/overlapped_blocks.go | 6 +- pkg/verifier/verify.go | 8 +- 20 files changed, 297 insertions(+), 223 deletions(-) diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index 124586c5cb..2a46098c7c 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -140,7 +140,7 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name issues = append(issues, issueFn) } - fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil) + fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil) if err != nil { return err } @@ -189,7 +189,7 @@ func registerBucketLs(m map[string]setupFunc, root *kingpin.CmdClause, name stri return err } - fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil) + fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil) if err != nil { return err } @@ -289,7 +289,7 @@ func registerBucketInspect(m map[string]setupFunc, root *kingpin.CmdClause, name return err } - fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil) + fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil) if err != nil { return err } @@ -373,7 +373,8 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str return errors.Wrap(err, "bucket client") } - fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil) + // TODO(bwplotka): Allow Bucket UI to visualisate the state of block as well. 
+ fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil) if err != nil { return err } @@ -382,7 +383,7 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str g.Add(func() error { statusProber.Ready() defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") - return refresh(ctx, logger, bucketUI, *interval, *timeout, fetcher) + return bucketUI.RunRefreshLoop(ctx, fetcher, *interval, *timeout) }, func(error) { cancel() }) @@ -459,48 +460,6 @@ func registerBucketDownsample(m map[string]setupFunc, root *kingpin.CmdClause, n } } -// refresh metadata from remote storage periodically and update the UI. -func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, fetcher *block.MetaFetcher) error { - return runutil.Repeat(duration, ctx.Done(), func() error { - return runutil.RetryWithLog(logger, time.Minute, ctx.Done(), func() error { - iterCtx, iterCancel := context.WithTimeout(ctx, timeout) - defer iterCancel() - - blocks, err := download(iterCtx, logger, fetcher) - if err != nil { - bucketUI.Set("[]", err) - return err - } - - data, err := json.Marshal(blocks) - if err != nil { - bucketUI.Set("[]", err) - return err - } - bucketUI.Set(string(data), nil) - return nil - }) - }) -} - -func download(ctx context.Context, logger log.Logger, fetcher *block.MetaFetcher) ([]metadata.Meta, error) { - level.Info(logger).Log("msg", "synchronizing block metadata") - - metas, _, err := fetcher.Fetch(ctx) - if err != nil { - return nil, err - } - - blocks := []metadata.Meta{} - - for _, meta := range metas { - blocks = append(blocks, *meta) - } - - level.Info(logger).Log("msg", "downloaded blocks meta.json", "num", len(blocks)) - return blocks, nil -} - func printTable(blockMetas []*metadata.Meta, selectorLabels labels.Labels, sortBy []string) error { header := inspectColumns diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index eb3a6ae012..409cbe8d2f 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -301,27 +301,25 @@ func runCompact( // The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet. 
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, time.Duration(deleteDelay.Seconds()/2)*time.Second) duplicateBlocksFilter := block.NewDeduplicateFilter() - prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg) - metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer, []block.MetadataFilter{ + baseMetaFetcher, err := block.NewBaseFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg)) + if err != nil { + return errors.Wrap(err, "create meta fetcher") + } + metaFetcherFilters := []block.MetadataFilter{ block.NewLabelShardedMetaFilter(relabelConfig), - block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer), + block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)), ignoreDeletionMarkFilter, duplicateBlocksFilter, - }, - block.NewReplicaLabelRemover(logger, dedupReplicaLabels), - ) - if err != nil { - return errors.Wrap(err, "create meta fetcher") } - + compactFetcher := baseMetaFetcher.WithFilters(extprom.WrapRegistererWithPrefix("thanos_", reg), metaFetcherFilters, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, dedupReplicaLabels)}) enableVerticalCompaction := false if len(dedupReplicaLabels) > 0 { enableVerticalCompaction = true level.Info(logger).Log("msg", "deduplication.replica-label specified, vertical compaction is enabled", "dedupReplicaLabels", strings.Join(dedupReplicaLabels, ",")) } - sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, enableVerticalCompaction) + sy, err := compact.NewSyncer(logger, reg, bkt, compactFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, enableVerticalCompaction) if err != nil { return errors.Wrap(err, "create syncer") } @@ -383,13 +381,13 @@ func runCompact( // for 5m downsamplings created in the first run. 
level.Info(logger).Log("msg", "start first pass of downsampling") - if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, metaFetcher, downsamplingDir); err != nil { + if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, compactFetcher, downsamplingDir); err != nil { return errors.Wrap(err, "first pass of downsampling failed") } level.Info(logger).Log("msg", "start second pass of downsampling") - if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, metaFetcher, downsamplingDir); err != nil { + if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, compactFetcher, downsamplingDir); err != nil { return errors.Wrap(err, "second pass of downsampling failed") } level.Info(logger).Log("msg", "downsampling iterations done") @@ -397,7 +395,7 @@ func runCompact( level.Warn(logger).Log("msg", "downsampling was explicitly disabled") } - if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metaFetcher, retentionByResolution, blocksMarkedForDeletion); err != nil { + if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, compactFetcher, retentionByResolution, blocksMarkedForDeletion); err != nil { return errors.Wrap(err, fmt.Sprintf("retention failed")) } @@ -405,7 +403,7 @@ func runCompact( return errors.Wrap(err, "error cleaning blocks") } - compact.BestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion) + compact.BestEffortCleanAbortedPartialUploads(ctx, logger, compactFetcher, bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion) return nil } @@ -414,7 +412,7 @@ func runCompact( // Generate index file. if generateMissingIndexCacheFiles { - if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, indexCacheDir); err != nil { + if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, compactFetcher, indexCacheDir); err != nil { return err } } @@ -465,7 +463,8 @@ func runCompact( srv.Handle("/", router) g.Add(func() error { - return refresh(ctx, logger, bucketUI, waitInterval, time.Minute, metaFetcher) + // TODO(bwplotka): Allow Bucket UI to visualisate the state of the block as well. 
+ return bucketUI.RunRefreshLoop(ctx, baseMetaFetcher.WithFilters(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), metaFetcherFilters, nil), waitInterval, time.Minute) }, func(error) { cancel() }) diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index f2dd7c82dd..c98335fd0c 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -72,7 +72,9 @@ func RunDownsample( return err } - metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), nil) + metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{ + block.NewDeduplicateFilter(), + }, nil) if err != nil { return errors.Wrap(err, "create meta fetcher") } diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go index 58e340db3d..a993013995 100644 --- a/cmd/thanos/main_test.go +++ b/cmd/thanos/main_test.go @@ -76,7 +76,7 @@ func TestCleanupIndexCacheFolder(t *testing.T) { Name: metricIndexGenerateName, Help: metricIndexGenerateHelp, }) - metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil) + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil) testutil.Ok(t, err) testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, dir)) @@ -116,7 +116,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) { metrics := newDownsampleMetrics(prometheus.NewRegistry()) testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos)))) - metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil) + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil) testutil.Ok(t, err) testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dir)) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index d6d865533c..446077530c 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -241,14 +241,14 @@ func runStore( } ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, ignoreDeletionMarksDelay) - prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg) - metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer, []block.MetadataFilter{ - block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime), - block.NewLabelShardedMetaFilter(relabelConfig), - block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer), - ignoreDeletionMarkFilter, - block.NewDeduplicateFilter(), - }) + metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg), + []block.MetadataFilter{ + block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime), + block.NewLabelShardedMetaFilter(relabelConfig), + block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)), + ignoreDeletionMarkFilter, + block.NewDeduplicateFilter(), + }, nil) if err != nil { return errors.Wrap(err, "meta fetcher") } diff --git a/go.mod b/go.mod index fd4b6e3d84..620eda6f71 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/go-kit/kit v0.9.0 github.com/go-openapi/strfmt v0.19.2 github.com/gogo/protobuf v1.3.1 + github.com/golang/groupcache v0.0.0-20191027212112-611e8accdfc9 github.com/golang/snappy v0.0.1 github.com/googleapis/gax-go v2.0.2+incompatible github.com/gophercloud/gophercloud v0.6.0 diff --git a/pkg/block/fetcher.go 
b/pkg/block/fetcher.go index bda9998e45..10ca88e17b 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -16,6 +16,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/golang/groupcache/singleflight" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -92,11 +93,13 @@ func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics { Help: "Duration of the blocks metadata synchronization in seconds", Buckets: []float64{0.01, 1, 10, 100, 1000}, }) - m.synced = extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{ - Subsystem: fetcherSubSys, - Name: "synced", - Help: "Number of block metadata synced", - }, + m.synced = extprom.NewTxGaugeVec( + reg, + prometheus.GaugeOpts{ + Subsystem: fetcherSubSys, + Name: "synced", + Help: "Number of block metadata synced", + }, []string{"state"}, []string{corruptedMeta}, []string{noMeta}, @@ -108,11 +111,13 @@ func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics { []string{duplicateMeta}, []string{markedForDeletionMeta}, ) - m.modified = extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{ - Subsystem: fetcherSubSys, - Name: "modified", - Help: "Number of blocks that their metadata modified", - }, + m.modified = extprom.NewTxGaugeVec( + reg, + prometheus.GaugeOpts{ + Subsystem: fetcherSubSys, + Name: "modified", + Help: "Number of blocks that their metadata modified", + }, []string{"modified"}, []string{replicaRemovedMeta}, ) @@ -131,25 +136,22 @@ type MetadataModifier interface { Modify(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, modified *extprom.TxGaugeVec, incompleteView bool) error } -// MetaFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state. -// Not go-routine safe. -type MetaFetcher struct { +// BaseFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state. +// Go-routine safe. +type BaseFetcher struct { logger log.Logger concurrency int bkt objstore.BucketReader // Optional local directory to cache meta.json files. cacheDir string - metrics *fetcherMetrics - - filters []MetadataFilter - modifiers []MetadataModifier - - cached map[ulid.ULID]*metadata.Meta + cached map[ulid.ULID]*metadata.Meta + syncs prometheus.Counter + g singleflight.Group } -// NewMetaFetcher constructs MetaFetcher. -func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReader, dir string, r prometheus.Registerer, filters []MetadataFilter, modifiers ...MetadataModifier) (*MetaFetcher, error) { +// NewBaseFetcher constructs BaseFetcher. +func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReader, dir string, reg prometheus.Registerer) (*BaseFetcher, error) { if logger == nil { logger = log.NewNopLogger() } @@ -162,18 +164,34 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReade } } - return &MetaFetcher{ - logger: log.With(logger, "component", "block.MetaFetcher"), + return &BaseFetcher{ + logger: log.With(logger, "component", "block.BaseFetcher"), concurrency: concurrency, bkt: bkt, cacheDir: cacheDir, - metrics: newFetcherMetrics(r), - filters: filters, - modifiers: modifiers, cached: map[ulid.ULID]*metadata.Meta{}, + syncs: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Subsystem: fetcherSubSys, + Name: "base_syncs_total", + Help: "Total blocks metadata synchronization attempts by base Fetcher", + }), }, nil } +// NewMetaFetcher returns meta fetcher. 
+func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReader, dir string, reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier) (*MetaFetcher, error) { + b, err := NewBaseFetcher(logger, concurrency, bkt, dir, reg) + if err != nil { + return nil, err + } + return b.WithFilters(reg, filters, modifiers), nil +} + +// WithFilters transforms BaseFetcher into actually usable MetadataFetcher. +func (f *BaseFetcher) WithFilters(reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier) *MetaFetcher { + return &MetaFetcher{metrics: newFetcherMetrics(reg), wrapped: f, filters: filters, modifiers: modifiers} +} + var ( ErrorSyncMetaNotFound = errors.New("meta.json not found") ErrorSyncMetaCorrupted = errors.New("meta.json corrupted") @@ -181,16 +199,16 @@ var ( // loadMeta returns metadata from object storage or error. // It returns `ErrorSyncMetaNotFound` and `ErrorSyncMetaCorrupted` sentinel errors in those cases. -func (s *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta, error) { +func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta, error) { var ( metaFile = path.Join(id.String(), MetaFilename) - cachedBlockDir = filepath.Join(s.cacheDir, id.String()) + cachedBlockDir = filepath.Join(f.cacheDir, id.String()) ) // TODO(bwplotka): If that causes problems (obj store rate limits), add longer ttl to cached items. // For 1y and 100 block sources this generates ~1.5-3k HEAD RPM. AWS handles 330k RPM per prefix. // TODO(bwplotka): Consider filtering by consistency delay here (can't do until compactor healthyOverride work). - ok, err := s.bkt.Exists(ctx, metaFile) + ok, err := f.bkt.Exists(ctx, metaFile) if err != nil { return nil, errors.Wrapf(err, "meta.json file exists: %v", metaFile) } @@ -198,27 +216,27 @@ func (s *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met return nil, ErrorSyncMetaNotFound } - if m, seen := s.cached[id]; seen { + if m, seen := f.cached[id]; seen { return m, nil } // Best effort load from local dir. - if s.cacheDir != "" { + if f.cacheDir != "" { m, err := metadata.Read(cachedBlockDir) if err == nil { return m, nil } if !errors.Is(err, os.ErrNotExist) { - level.Warn(s.logger).Log("msg", "best effort read of the local meta.json failed; removing cached block dir", "dir", cachedBlockDir, "err", err) + level.Warn(f.logger).Log("msg", "best effort read of the local meta.json failed; removing cached block dir", "dir", cachedBlockDir, "err", err) if err := os.RemoveAll(cachedBlockDir); err != nil { - level.Warn(s.logger).Log("msg", "best effort remove of cached dir failed; ignoring", "dir", cachedBlockDir, "err", err) + level.Warn(f.logger).Log("msg", "best effort remove of cached dir failed; ignoring", "dir", cachedBlockDir, "err", err) } } } - r, err := s.bkt.Get(ctx, metaFile) - if s.bkt.IsObjNotFoundErr(err) { + r, err := f.bkt.Get(ctx, metaFile) + if f.bkt.IsObjNotFoundErr(err) { // Meta.json was deleted between bkt.Exists and here. 
return nil, errors.Wrapf(ErrorSyncMetaNotFound, "%v", err) } @@ -226,7 +244,7 @@ func (s *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met return nil, errors.Wrapf(err, "get meta file: %v", metaFile) } - defer runutil.CloseWithLogOnErr(s.logger, r, "close bkt meta get") + defer runutil.CloseWithLogOnErr(f.logger, r, "close bkt meta get") metaContent, err := ioutil.ReadAll(r) if err != nil { @@ -243,71 +261,71 @@ func (s *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met } // Best effort cache in local dir. - if s.cacheDir != "" { + if f.cacheDir != "" { if err := os.MkdirAll(cachedBlockDir, os.ModePerm); err != nil { - level.Warn(s.logger).Log("msg", "best effort mkdir of the meta.json block dir failed; ignoring", "dir", cachedBlockDir, "err", err) + level.Warn(f.logger).Log("msg", "best effort mkdir of the meta.json block dir failed; ignoring", "dir", cachedBlockDir, "err", err) } - if err := metadata.Write(s.logger, cachedBlockDir, m); err != nil { - level.Warn(s.logger).Log("msg", "best effort save of the meta.json to local dir failed; ignoring", "dir", cachedBlockDir, "err", err) + if err := metadata.Write(f.logger, cachedBlockDir, m); err != nil { + level.Warn(f.logger).Log("msg", "best effort save of the meta.json to local dir failed; ignoring", "dir", cachedBlockDir, "err", err) } } return m, nil } -// Fetch returns all block metas as well as partial blocks (blocks without or with corrupted meta file) from the bucket. -// It's caller responsibility to not change the returned metadata files. Maps can be modified. -// -// Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing. -func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) { - start := time.Now() - defer func() { - s.metrics.syncDuration.Observe(time.Since(start).Seconds()) - if err != nil { - s.metrics.syncFailures.Inc() - } - }() - s.metrics.syncs.Inc() +type response struct { + metas map[ulid.ULID]*metadata.Meta + partial map[ulid.ULID]error + // If metaErr > 0 it means incomplete view, so some metas, failed to be loaded. 
+ metaErrs tsdberrors.MultiError - metas = make(map[ulid.ULID]*metadata.Meta) - partial = make(map[ulid.ULID]error) + noMetas float64 + corruptedMetas float64 + + incompleteView bool +} + +func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { + f.syncs.Inc() var ( + resp = response{ + metas: make(map[ulid.ULID]*metadata.Meta), + partial: make(map[ulid.ULID]error), + } eg errgroup.Group - ch = make(chan ulid.ULID, s.concurrency) + ch = make(chan ulid.ULID, f.concurrency) mtx sync.Mutex - - metaErrs tsdberrors.MultiError ) - - s.metrics.resetTx() - - for i := 0; i < s.concurrency; i++ { + for i := 0; i < f.concurrency; i++ { eg.Go(func() error { for id := range ch { - meta, err := s.loadMeta(ctx, id) + meta, err := f.loadMeta(ctx, id) if err == nil { mtx.Lock() - metas[id] = meta + resp.metas[id] = meta mtx.Unlock() continue } switch errors.Cause(err) { default: - s.metrics.synced.WithLabelValues(failedMeta).Inc() mtx.Lock() - metaErrs.Add(err) + resp.metaErrs.Add(err) mtx.Unlock() continue case ErrorSyncMetaNotFound: - s.metrics.synced.WithLabelValues(noMeta).Inc() + mtx.Lock() + resp.noMetas++ + mtx.Unlock() case ErrorSyncMetaCorrupted: - s.metrics.synced.WithLabelValues(corruptedMeta).Inc() + mtx.Lock() + resp.corruptedMetas++ + mtx.Unlock() } mtx.Lock() - partial[id] = err + resp.partial[id] = err mtx.Unlock() } return nil @@ -317,7 +335,7 @@ func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata. // Workers scheduled, distribute blocks. eg.Go(func() error { defer close(ch) - return s.bkt.Iter(ctx, "", func(name string) error { + return f.bkt.Iter(ctx, "", func(name string) error { id, ok := IsBlockDir(name) if !ok { return nil @@ -334,74 +352,124 @@ func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata. }) if err := eg.Wait(); err != nil { - return nil, nil, errors.Wrap(err, "MetaFetcher: iter bucket") + return nil, errors.Wrap(err, "BaseFetcher: iter bucket") } - incompleteView := len(metaErrs) > 0 + if len(resp.metaErrs) > 0 { + return resp, nil + } // Only for complete view of blocks update the cache. - if !incompleteView { - cached := make(map[ulid.ULID]*metadata.Meta, len(metas)) - for id, m := range metas { - cached[id] = m - } - s.cached = cached - - // Best effort cleanup of disk-cached metas. - if s.cacheDir != "" { - names, err := fileutil.ReadDir(s.cacheDir) - if err != nil { - level.Warn(s.logger).Log("msg", "best effort remove of not needed cached dirs failed; ignoring", "err", err) - } else { - for _, n := range names { - id, ok := IsBlockDir(n) - if !ok { - continue - } - - if _, ok := metas[id]; ok { - continue - } - - cachedBlockDir := filepath.Join(s.cacheDir, id.String()) - - // No such block loaded, remove the local dir. - if err := os.RemoveAll(cachedBlockDir); err != nil { - level.Warn(s.logger).Log("msg", "best effort remove of not needed cached dir failed; ignoring", "dir", cachedBlockDir, "err", err) - } + cached := make(map[ulid.ULID]*metadata.Meta, len(resp.metas)) + for id, m := range resp.metas { + cached[id] = m + } + f.cached = cached + + // Best effort cleanup of disk-cached metas. 
+ if f.cacheDir != "" { + names, err := fileutil.ReadDir(f.cacheDir) + if err != nil { + level.Warn(f.logger).Log("msg", "best effort remove of not needed cached dirs failed; ignoring", "err", err) + } else { + for _, n := range names { + id, ok := IsBlockDir(n) + if !ok { + continue + } + + if _, ok := resp.metas[id]; ok { + continue + } + + cachedBlockDir := filepath.Join(f.cacheDir, id.String()) + + // No such block loaded, remove the local dir. + if err := os.RemoveAll(cachedBlockDir); err != nil { + level.Warn(f.logger).Log("msg", "best effort remove of not needed cached dir failed; ignoring", "dir", cachedBlockDir, "err", err) } } } } + return resp, nil +} + +func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filters []MetadataFilter, modifiers []MetadataModifier) (_ map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]error, err error) { + start := time.Now() + defer func() { + metrics.syncDuration.Observe(time.Since(start).Seconds()) + if err != nil { + metrics.syncFailures.Inc() + } + }() + metrics.syncs.Inc() + metrics.resetTx() + + // Run this in thread safe run group. + // TODO(bwplotka): Consider custom singleflight with ttl. + v, err := f.g.Do("", func() (i interface{}, err error) { + // NOTE: First go routine context will go through. + return f.fetchMetadata(ctx) + }) + if err != nil { + return nil, nil, err + } + resp := v.(response) + + // Copy as same response might be reused by different goroutines. + metas := make(map[ulid.ULID]*metadata.Meta, len(resp.metas)) + for id, m := range resp.metas { + metas[id] = m + } - for _, f := range s.filters { + metrics.synced.WithLabelValues(failedMeta).Set(float64(len(resp.metaErrs))) + metrics.synced.WithLabelValues(noMeta).Set(resp.noMetas) + metrics.synced.WithLabelValues(corruptedMeta).Set(resp.corruptedMetas) + + for _, filter := range filters { // NOTE: filter can update synced metric accordingly to the reason of the exclude. - if err := f.Filter(ctx, metas, s.metrics.synced, incompleteView); err != nil { + if err := filter.Filter(ctx, metas, metrics.synced, resp.incompleteView); err != nil { return nil, nil, errors.Wrap(err, "filter metas") } } - for _, m := range s.modifiers { + for _, m := range modifiers { // NOTE: modifier can update modified metric accordingly to the reason of the modification. 
- if err := m.Modify(ctx, metas, s.metrics.modified, incompleteView); err != nil { + if err := m.Modify(ctx, metas, metrics.modified, resp.incompleteView); err != nil { return nil, nil, errors.Wrap(err, "modify metas") } } - s.metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas))) - s.metrics.submit() + metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas))) + metrics.submit() - if incompleteView { - return metas, partial, errors.Wrap(metaErrs, "incomplete view") + if len(resp.metaErrs) > 0 { + return metas, resp.partial, errors.Wrap(resp.metaErrs, "incomplete view") } - level.Debug(s.logger).Log("msg", "successfully fetched block metadata", "duration", time.Since(start).String(), "cached", len(s.cached), "returned", len(metas), "partial", len(partial)) - return metas, partial, nil + level.Debug(f.logger).Log("msg", "successfully fetched block metadata", "duration", time.Since(start).String(), "cached", len(f.cached), "returned", len(metas), "partial", len(resp.partial)) + return metas, resp.partial, nil +} + +type MetaFetcher struct { + wrapped *BaseFetcher + metrics *fetcherMetrics + + filters []MetadataFilter + modifiers []MetadataModifier +} + +// Fetch returns all block metas as well as partial blocks (blocks without or with corrupted meta file) from the bucket. +// It's caller responsibility to not change the returned metadata files. Maps can be modified. +// +// Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing. +func (f *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) { + return f.wrapped.fetch(ctx, f.metrics, f.filters, f.modifiers) } var _ MetadataFilter = &TimePartitionMetaFilter{} -// TimePartitionMetaFilter is a MetaFetcher filter that filters out blocks that are outside of specified time range. +// TimePartitionMetaFilter is a BaseFetcher filter that filters out blocks that are outside of specified time range. // Not go-routine safe. type TimePartitionMetaFilter struct { minTime, maxTime model.TimeOrDurationValue @@ -458,7 +526,9 @@ func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]* return nil } -// DeduplicateFilter is a MetaFetcher filter that filters out older blocks that have exactly the same data. +var _ MetadataFilter = &DeduplicateFilter{} + +// DeduplicateFilter is a BaseFetcher filter that filters out older blocks that have exactly the same data. // Not go-routine safe. type DeduplicateFilter struct { duplicateIDs []ulid.ULID @@ -572,7 +642,9 @@ func contains(s1 []ulid.ULID, s2 []ulid.ULID) bool { return true } -// ReplicaLabelRemover is a MetaFetcher modifier modifies external labels of existing blocks, it removes given replica labels from the metadata of blocks that have it. +var _ MetadataModifier = &ReplicaLabelRemover{} + +// ReplicaLabelRemover is a BaseFetcher modifier modifies external labels of existing blocks, it removes given replica labels from the metadata of blocks that have it. type ReplicaLabelRemover struct { logger log.Logger @@ -600,7 +672,7 @@ func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*met return nil } -// ConsistencyDelayMetaFilter is a MetaFetcher filter that filters out blocks that are created before a specified consistency delay. +// ConsistencyDelayMetaFilter is a BaseFetcher filter that filters out blocks that are created before a specified consistency delay. // Not go-routine safe. 
type ConsistencyDelayMetaFilter struct { logger log.Logger diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index 99920b8e5f..cd7b3ffade 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -75,11 +75,13 @@ func TestMetaFetcher_Fetch(t *testing.T) { var ulidToDelete ulid.ULID r := prometheus.NewRegistry() - f, err := NewMetaFetcher(log.NewNopLogger(), 20, bkt, dir, r, []MetadataFilter{ - &ulidFilter{ulidToDelete: &ulidToDelete}, - }) + baseFetcher, err := NewBaseFetcher(log.NewNopLogger(), 20, bkt, dir, r) testutil.Ok(t, err) + fetcher := baseFetcher.WithFilters(r, []MetadataFilter{ + &ulidFilter{ulidToDelete: &ulidToDelete}, + }, nil) + for i, tcase := range []struct { name string do func() @@ -133,7 +135,7 @@ func TestMetaFetcher_Fetch(t *testing.T) { { name: "fresh cache", do: func() { - f.cached = map[ulid.ULID]*metadata.Meta{} + baseFetcher.cached = map[ulid.ULID]*metadata.Meta{} }, expectedMetas: ULIDs(1, 2, 3), @@ -143,7 +145,7 @@ func TestMetaFetcher_Fetch(t *testing.T) { { name: "fresh cache: meta 2 and 3 have corrupted data on disk ", do: func() { - f.cached = map[ulid.ULID]*metadata.Meta{} + baseFetcher.cached = map[ulid.ULID]*metadata.Meta{} testutil.Ok(t, os.Remove(filepath.Join(dir, "meta-syncer", ULID(2).String(), MetaFilename))) @@ -238,7 +240,7 @@ func TestMetaFetcher_Fetch(t *testing.T) { tcase.do() ulidToDelete = tcase.filterULID - metas, partial, err := f.Fetch(ctx) + metas, partial, err := fetcher.Fetch(ctx) if tcase.expectedMetaErr != nil { testutil.NotOk(t, err) testutil.Equals(t, tcase.expectedMetaErr.Error(), err.Error()) @@ -279,14 +281,15 @@ func TestMetaFetcher_Fetch(t *testing.T) { if tcase.expectedMetaErr != nil { expectedFailures = 1 } - testutil.Equals(t, float64(i+1), promtest.ToFloat64(f.metrics.syncs)) - testutil.Equals(t, float64(len(tcase.expectedMetas)), promtest.ToFloat64(f.metrics.synced.WithLabelValues(loadedMeta))) - testutil.Equals(t, float64(len(tcase.expectedNoMeta)), promtest.ToFloat64(f.metrics.synced.WithLabelValues(noMeta))) - testutil.Equals(t, float64(tcase.expectedFiltered), promtest.ToFloat64(f.metrics.synced.WithLabelValues("filtered"))) - testutil.Equals(t, 0.0, promtest.ToFloat64(f.metrics.synced.WithLabelValues(labelExcludedMeta))) - testutil.Equals(t, 0.0, promtest.ToFloat64(f.metrics.synced.WithLabelValues(timeExcludedMeta))) - testutil.Equals(t, float64(expectedFailures), promtest.ToFloat64(f.metrics.synced.WithLabelValues(failedMeta))) - testutil.Equals(t, 0.0, promtest.ToFloat64(f.metrics.synced.WithLabelValues(tooFreshMeta))) + testutil.Equals(t, float64(i+1), promtest.ToFloat64(baseFetcher.syncs)) + testutil.Equals(t, float64(i+1), promtest.ToFloat64(fetcher.metrics.syncs)) + testutil.Equals(t, float64(len(tcase.expectedMetas)), promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues(loadedMeta))) + testutil.Equals(t, float64(len(tcase.expectedNoMeta)), promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues(noMeta))) + testutil.Equals(t, float64(tcase.expectedFiltered), promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues("filtered"))) + testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues(labelExcludedMeta))) + testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues(timeExcludedMeta))) + testutil.Equals(t, float64(expectedFailures), promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues(failedMeta))) + testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues(tooFreshMeta))) }); !ok { return } 
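
For orientation, the pattern introduced in pkg/block/fetcher.go above can be wired up roughly as follows: one goroutine-safe BaseFetcher owns the bucket scan and the meta.json cache, and each consumer derives a lightweight MetaFetcher with its own metrics registerer, filters and modifiers. This is a minimal sketch mirroring the cmd/thanos/compact.go changes, not code from the patch; the in-memory bucket, nop logger and fresh registry are stand-ins.

// Hypothetical wiring (illustrative only) of a shared BaseFetcher.
package main

import (
	"context"
	"fmt"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/extprom"
	"github.com/thanos-io/thanos/pkg/objstore/inmem"
)

func main() {
	logger := log.NewNopLogger()
	reg := prometheus.NewRegistry()
	bkt := inmem.NewBucket() // stand-in for the real object store client

	// One BaseFetcher owns the bucket scan, the meta.json cache and the singleflight group.
	base, err := block.NewBaseFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg))
	if err != nil {
		panic(err)
	}

	// Compactor-style view: its own metrics plus a filter chain.
	compactFetcher := base.WithFilters(
		extprom.WrapRegistererWithPrefix("thanos_", reg),
		[]block.MetadataFilter{block.NewDeduplicateFilter()},
		nil,
	)

	// Bucket UI view: same underlying scan, separate metrics, no filters.
	uiFetcher := base.WithFilters(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), nil, nil)

	// Both fetchers are safe to use from different goroutines; overlapping Fetch
	// calls are collapsed into a single bucket scan by the BaseFetcher.
	metas, partial, err := compactFetcher.Fetch(context.Background())
	fmt.Println("compactor view:", len(metas), "metas,", len(partial), "partial,", err)

	metas, partial, err = uiFetcher.Fetch(context.Background())
	fmt.Println("ui view:", len(metas), "metas,", len(partial), "partial,", err)
}
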
diff --git a/pkg/compact/clean_test.go b/pkg/compact/clean_test.go index 4507b3f5a5..9a7889ead8 100644 --- a/pkg/compact/clean_test.go +++ b/pkg/compact/clean_test.go @@ -29,7 +29,7 @@ func TestBestEffortCleanAbortedPartialUploads(t *testing.T) { bkt := inmem.NewBucket() logger := log.NewNopLogger() - metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil) + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil) testutil.Ok(t, err) // 1. No meta, old block, should be removed. diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 52e4147623..21de5cfca7 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -92,7 +92,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { duplicateBlocksFilter := block.NewDeduplicateFilter() metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, []block.MetadataFilter{ duplicateBlocksFilter, - }) + }, nil) testutil.Ok(t, err) blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) @@ -181,7 +181,7 @@ func TestGroup_Compact_e2e(t *testing.T) { metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, []block.MetadataFilter{ ignoreDeletionMarkFilter, duplicateBlocksFilter, - }) + }, nil) testutil.Ok(t, err) blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index 980b883839..0a1431996e 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -245,7 +245,7 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) { uploadMockBlock(t, bkt, b.id, b.minTime, b.maxTime, int64(b.resolution)) } - metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", nil, nil) + metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", nil, nil, nil) testutil.Ok(t, err) blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) diff --git a/pkg/replicate/replicator.go b/pkg/replicate/replicator.go index 198faf8162..82455be46d 100644 --- a/pkg/replicate/replicator.go +++ b/pkg/replicate/replicator.go @@ -151,7 +151,7 @@ func RunReplicate( Help: "The Duration of replication runs split by success and error.", }, []string{"result"}) - fetcher, err := thanosblock.NewMetaFetcher(logger, 32, fromBkt, "", reg, nil) + fetcher, err := thanosblock.NewMetaFetcher(logger, 32, fromBkt, "", reg, nil, nil) if err != nil { return errors.Wrapf(err, "create meta fetcher with bucket %v", fromBkt) } diff --git a/pkg/replicate/scheme_test.go b/pkg/replicate/scheme_test.go index 6b441a572b..26c83aafb6 100644 --- a/pkg/replicate/scheme_test.go +++ b/pkg/replicate/scheme_test.go @@ -313,7 +313,7 @@ func TestReplicationSchemeAll(t *testing.T) { } filter := NewBlockFilter(logger, selector, compact.ResolutionLevelRaw, 1).Filter - fetcher, err := block.NewMetaFetcher(logger, 32, originBucket, "", nil, nil) + fetcher, err := block.NewMetaFetcher(logger, 32, originBucket, "", nil, nil, nil) testutil.Ok(t, err) r := newReplicationScheme( diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 2721c7544e..8bcf3216c0 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -150,7 +150,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m metaFetcher, err := block.NewMetaFetcher(s.logger, 20, bkt, dir, nil, []block.MetadataFilter{ block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime), 
block.NewLabelShardedMetaFilter(relabelConfig), - }) + }, nil) testutil.Ok(t, err) store, err := NewBucketStore( diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index a7829bf052..516bf684c3 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -714,7 +714,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul metaFetcher, err := block.NewMetaFetcher(logger, 20, bkt, dir, nil, []block.MetadataFilter{ block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime), block.NewLabelShardedMetaFilter(relabelConf), - }) + }, nil) testutil.Ok(t, err) bucketStore, err := NewBucketStore( diff --git a/pkg/ui/bucket.go b/pkg/ui/bucket.go index 75e8c7b8b6..fdd1aeba43 100644 --- a/pkg/ui/bucket.go +++ b/pkg/ui/bucket.go @@ -4,13 +4,19 @@ package ui import ( + "context" + "encoding/json" "html/template" "net/http" "time" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/prometheus/common/route" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" + "github.com/thanos-io/thanos/pkg/runutil" ) // Bucket is a web UI representing state of buckets as a timeline. @@ -27,7 +33,7 @@ type Bucket struct { func NewBucketUI(logger log.Logger, label string, flagsMap map[string]string) *Bucket { return &Bucket{ - BaseUI: NewBaseUI(logger, "bucket_menu.html", queryTmplFuncs()), + BaseUI: NewBaseUI(log.With(logger, "component", "bucketUI"), "bucket_menu.html", queryTmplFuncs()), Blocks: "[]", Label: label, flagsMap: flagsMap, @@ -52,6 +58,38 @@ func (b *Bucket) root(w http.ResponseWriter, r *http.Request) { func (b *Bucket) Set(data string, err error) { b.RefreshedAt = time.Now() - b.Blocks = template.JS(string(data)) + b.Blocks = template.JS(data) b.Err = err } + +// RunRefreshLoop refreshes periodically metadata from remote storage periodically and update the UI. +func (b *Bucket) RunRefreshLoop(ctx context.Context, fetcher block.MetadataFetcher, interval time.Duration, loopTimeout time.Duration) error { + return runutil.Repeat(interval, ctx.Done(), func() error { + return runutil.RetryWithLog(b.logger, time.Minute, ctx.Done(), func() error { + iterCtx, iterCancel := context.WithTimeout(ctx, loopTimeout) + defer iterCancel() + + level.Debug(b.logger).Log("msg", "synchronizing block metadata") + metas, _, err := fetcher.Fetch(iterCtx) + if err != nil { + level.Error(b.logger).Log("msg", "failed to sync metas", "err", err) + b.Set("[]", err) + return err + } + blocks := make([]metadata.Meta, 0, len(metas)) + for _, meta := range metas { + blocks = append(blocks, *meta) + } + level.Debug(b.logger).Log("msg", "downloaded blocks meta.json", "num", len(blocks)) + + data, err := json.Marshal(blocks) + if err != nil { + b.Set("[]", err) + return err + } + // TODO(bwplotka): Allow setting info about partial blocks as well. + b.Set(string(data), nil) + return nil + }) + }) +} diff --git a/pkg/verifier/duplicated_compaction.go b/pkg/verifier/duplicated_compaction.go index 029aa93caf..d5ccdb73c2 100644 --- a/pkg/verifier/duplicated_compaction.go +++ b/pkg/verifier/duplicated_compaction.go @@ -27,14 +27,14 @@ const DuplicatedCompactionIssueID = "duplicated_compaction" // until sync-delay passes. // The expected print of this are same overlapped blocks with exactly the same sources, time ranges and stats. // If repair is enabled, all but one duplicates are safely deleted. 
-func DuplicatedCompactionIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool, fetcher *block.MetaFetcher, deleteDelay time.Duration, metrics *verifierMetrics) error { +func DuplicatedCompactionIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool, fetcher block.MetadataFetcher, deleteDelay time.Duration, metrics *verifierMetrics) error { if idMatcher != nil { return errors.Errorf("id matching is not supported by issue %s verifier", DuplicatedCompactionIssueID) } level.Info(logger).Log("msg", "started verifying issue", "with-repair", repair, "issue", DuplicatedCompactionIssueID) - overlaps, err := fetchOverlaps(ctx, logger, bkt, fetcher) + overlaps, err := fetchOverlaps(ctx, fetcher) if err != nil { return errors.Wrap(err, DuplicatedCompactionIssueID) } diff --git a/pkg/verifier/index_issue.go b/pkg/verifier/index_issue.go index 0982ec8ba8..a6a5035ebe 100644 --- a/pkg/verifier/index_issue.go +++ b/pkg/verifier/index_issue.go @@ -29,7 +29,7 @@ const IndexIssueID = "index_issue" // If the replacement was created successfully it is uploaded to the bucket and the input // block is deleted. // NOTE: This also verifies all indexes against chunks mismatches and duplicates. -func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool, fetcher *block.MetaFetcher, deleteDelay time.Duration, metrics *verifierMetrics) error { +func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool, fetcher block.MetadataFetcher, deleteDelay time.Duration, metrics *verifierMetrics) error { level.Info(logger).Log("msg", "started verifying issue", "with-repair", repair, "issue", IndexIssueID) metas, _, err := fetcher.Fetch(ctx) diff --git a/pkg/verifier/overlapped_blocks.go b/pkg/verifier/overlapped_blocks.go index 3fcce7a151..ac76734878 100644 --- a/pkg/verifier/overlapped_blocks.go +++ b/pkg/verifier/overlapped_blocks.go @@ -22,14 +22,14 @@ const OverlappedBlocksIssueID = "overlapped_blocks" // OverlappedBlocksIssue checks bucket for blocks with overlapped time ranges. // No repair is available for this issue. -func OverlappedBlocksIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, _ objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool, fetcher *block.MetaFetcher, deleteDelay time.Duration, metrics *verifierMetrics) error { +func OverlappedBlocksIssue(ctx context.Context, logger log.Logger, _ objstore.Bucket, _ objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool, fetcher block.MetadataFetcher, _ time.Duration, _ *verifierMetrics) error { if idMatcher != nil { return errors.Errorf("id matching is not supported by issue %s verifier", OverlappedBlocksIssueID) } level.Info(logger).Log("msg", "started verifying issue", "with-repair", repair, "issue", OverlappedBlocksIssueID) - overlaps, err := fetchOverlaps(ctx, logger, bkt, fetcher) + overlaps, err := fetchOverlaps(ctx, fetcher) if err != nil { return errors.Wrap(err, OverlappedBlocksIssueID) } @@ -49,7 +49,7 @@ func OverlappedBlocksIssue(ctx context.Context, logger log.Logger, bkt objstore. 
return nil } -func fetchOverlaps(ctx context.Context, logger log.Logger, bkt objstore.Bucket, fetcher *block.MetaFetcher) (map[string]tsdb.Overlaps, error) { +func fetchOverlaps(ctx context.Context, fetcher block.MetadataFetcher) (map[string]tsdb.Overlaps, error) { metas, _, err := fetcher.Fetch(ctx) if err != nil { return nil, err diff --git a/pkg/verifier/verify.go b/pkg/verifier/verify.go index 77cd646b1b..6149a6450b 100644 --- a/pkg/verifier/verify.go +++ b/pkg/verifier/verify.go @@ -35,7 +35,7 @@ func newVerifierMetrics(reg prometheus.Registerer) *verifierMetrics { // Issue is an function that does verification and repair only if repair arg is true. // It should log affected blocks using warn level logs. It should be safe for issue to run on healthy bucket. -type Issue func(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool, fetcher *block.MetaFetcher, deleteDelay time.Duration, metrics *verifierMetrics) error +type Issue func(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool, fetcher block.MetadataFetcher, deleteDelay time.Duration, metrics *verifierMetrics) error // Verifier runs given issues to verify if bucket is healthy. type Verifier struct { @@ -44,13 +44,13 @@ type Verifier struct { backupBkt objstore.Bucket issues []Issue repair bool - fetcher *block.MetaFetcher + fetcher block.MetadataFetcher deleteDelay time.Duration metrics *verifierMetrics } // New returns verifier that only logs affected blocks. -func New(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher *block.MetaFetcher, deleteDelay time.Duration, issues []Issue) *Verifier { +func New(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, deleteDelay time.Duration, issues []Issue) *Verifier { return &Verifier{ logger: logger, bkt: bkt, @@ -63,7 +63,7 @@ func New(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetc } // NewWithRepair returns verifier that logs affected blocks and attempts to repair them. -func NewWithRepair(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, backupBkt objstore.Bucket, fetcher *block.MetaFetcher, deleteDelay time.Duration, issues []Issue) *Verifier { +func NewWithRepair(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, backupBkt objstore.Bucket, fetcher block.MetadataFetcher, deleteDelay time.Duration, issues []Issue) *Verifier { return &Verifier{ logger: logger, bkt: bkt,
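
The concurrency guarantee the commit message refers to ("ensure we don't synchronize too often") comes from the groupcache singleflight.Group added in go.mod: overlapping Fetch calls from different consumers collapse into a single fetchMetadata scan, keyed on the empty string. Below is a minimal, self-contained sketch of that pattern, not the Thanos code itself; fetcherSketch and scanBucket are illustrative names.

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/golang/groupcache/singleflight"
)

// fetcherSketch mirrors the shape of the patch's BaseFetcher: many callers may
// ask for a sync concurrently, but only one expensive bucket scan runs for them.
type fetcherSketch struct {
	g     singleflight.Group
	scans int // counts how many real scans actually happened
}

// scanBucket stands in for BaseFetcher.fetchMetadata: the slow, shared work.
func (f *fetcherSketch) scanBucket() (interface{}, error) {
	f.scans++
	time.Sleep(100 * time.Millisecond) // simulate listing the bucket and reading meta.json files
	return map[string]string{"01ABC": "meta"}, nil
}

// Fetch is what each consumer (compactor sync, bucket UI refresh) would call.
// Group.Do collapses concurrent calls with the same key into one execution of
// scanBucket; every waiter receives the same result.
func (f *fetcherSketch) Fetch() (map[string]string, error) {
	v, err := f.g.Do("", f.scanBucket)
	if err != nil {
		return nil, err
	}
	return v.(map[string]string), nil
}

func main() {
	f := &fetcherSketch{}

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ { // five "consumers" fetching at once
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := f.Fetch(); err != nil {
				fmt.Println("fetch failed:", err)
			}
		}()
	}
	wg.Wait()

	fmt.Println("real scans:", f.scans) // typically 1, despite 5 concurrent Fetch calls
}

In the actual BaseFetcher the shared response is copied before per-consumer filters and modifiers run, so one consumer's filtering cannot leak into another's view; the patch also leaves a TODO to replace this with a singleflight variant that has a TTL, since plain Group.Do only deduplicates calls that overlap in time.
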