diff --git a/pkg/compactor/block_upload.go b/pkg/compactor/block_upload.go index cf6c49ef16c..d5a6e351bc1 100644 --- a/pkg/compactor/block_upload.go +++ b/pkg/compactor/block_upload.go @@ -577,7 +577,7 @@ func (c *MultitenantCompactor) validateBlock(ctx context.Context, logger log.Log // validate block checkChunks := c.cfgProvider.CompactorBlockUploadVerifyChunks(userID) - err = block.VerifyBlock(c.logger, blockDir, blockMetadata.MinTime, blockMetadata.MaxTime, checkChunks) + err = block.VerifyBlock(ctx, c.logger, blockDir, blockMetadata.MinTime, blockMetadata.MaxTime, checkChunks) if err != nil { return errors.Wrap(err, "error validating block") } diff --git a/pkg/compactor/bucket_compactor.go b/pkg/compactor/bucket_compactor.go index 7ac41bea8f8..612a54553c8 100644 --- a/pkg/compactor/bucket_compactor.go +++ b/pkg/compactor/bucket_compactor.go @@ -296,7 +296,7 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul } // Ensure all source blocks are valid. - stats, err := block.GatherBlockHealthStats(jobLogger, bdir, meta.MinTime, meta.MaxTime, false) + stats, err := block.GatherBlockHealthStats(ctx, jobLogger, bdir, meta.MinTime, meta.MaxTime, false) if err != nil { return errors.Wrapf(err, "gather index issues for block %s", bdir) } @@ -397,7 +397,7 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul } // Ensure the compacted block is valid. - if err := block.VerifyBlock(jobLogger, bdir, newMeta.MinTime, newMeta.MaxTime, false); err != nil { + if err := block.VerifyBlock(ctx, jobLogger, bdir, newMeta.MinTime, newMeta.MaxTime, false); err != nil { return errors.Wrapf(err, "invalid result block %s", bdir) } @@ -571,13 +571,13 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, return errors.Wrapf(err, "read meta from %s", bdir) } - resid, err := block.Repair(logger, tmpdir, ie.id, block.CompactorRepairSource, block.IgnoreIssue347OutsideChunk) + resid, err := block.Repair(ctx, logger, tmpdir, ie.id, block.CompactorRepairSource, block.IgnoreIssue347OutsideChunk) if err != nil { return errors.Wrapf(err, "repair failed for block %s", ie.id) } // Verify repaired id before uploading it. - if err := block.VerifyBlock(logger, filepath.Join(tmpdir, resid.String()), meta.MinTime, meta.MaxTime, false); err != nil { + if err := block.VerifyBlock(ctx, logger, filepath.Join(tmpdir, resid.String()), meta.MinTime, meta.MaxTime, false); err != nil { return errors.Wrapf(err, "repaired block is invalid %s", resid) } diff --git a/pkg/frontend/querymiddleware/querysharding_test_utils_test.go b/pkg/frontend/querymiddleware/querysharding_test_utils_test.go index bc9163f5e94..93e9dda5de4 100644 --- a/pkg/frontend/querymiddleware/querysharding_test_utils_test.go +++ b/pkg/frontend/querymiddleware/querysharding_test_utils_test.go @@ -273,7 +273,7 @@ func TestNewMockShardedQueryable(t *testing.T) { }.LabelValue(), }) - require.Nil(t, set.Err()) + require.NoError(t, set.Err()) var iter chunkenc.Iterator for set.Next() { diff --git a/pkg/frontend/querymiddleware/sharded_queryable.go b/pkg/frontend/querymiddleware/sharded_queryable.go index eb3a0e2c1f8..023569fe89f 100644 --- a/pkg/frontend/querymiddleware/sharded_queryable.go +++ b/pkg/frontend/querymiddleware/sharded_queryable.go @@ -100,16 +100,16 @@ func (q *shardedQuerier) Select(_ bool, hints *storage.SelectHints, matchers ... 
return storage.ErrSeriesSet(err) } - return q.handleEmbeddedQueries(queries, hints) + return q.handleEmbeddedQueries(q.ctx, queries, hints) } // handleEmbeddedQueries concurrently executes the provided queries through the downstream handler. // The returned storage.SeriesSet contains sorted series. -func (q *shardedQuerier) handleEmbeddedQueries(queries []string, hints *storage.SelectHints) storage.SeriesSet { +func (q *shardedQuerier) handleEmbeddedQueries(ctx context.Context, queries []string, hints *storage.SelectHints) storage.SeriesSet { streams := make([][]SampleStream, len(queries)) // Concurrently run each query. It breaks and cancels each worker context on first error. - err := concurrency.ForEachJob(q.ctx, len(queries), len(queries), func(ctx context.Context, idx int) error { + err := concurrency.ForEachJob(ctx, len(queries), len(queries), func(ctx context.Context, idx int) error { resp, err := q.handler.Do(ctx, q.req.WithQuery(queries[idx])) if err != nil { return err diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 8171857d8c9..bd73ea403dc 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -830,7 +830,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) ( return &mimirpb.WriteResponse{}, nil } - db, err := i.getOrCreateTSDB(userID, false) + db, err := i.getOrCreateTSDB(ctx, userID, false) if err != nil { return nil, wrapOrAnnotateWithUser(err, userID) } @@ -1552,12 +1552,14 @@ func (i *Ingester) LabelValuesCardinality(req *client.LabelValuesCardinalityRequ return err } - var postingsForMatchersFn func(ix tsdb.IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) + var postingsForMatchersFn func(context.Context, tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error) switch req.GetCountMethod() { case client.IN_MEMORY: - postingsForMatchersFn = tsdb.PostingsForMatchers + postingsForMatchersFn = func(_ context.Context, ix tsdb.IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) { + return tsdb.PostingsForMatchers(ix, ms...) + } case client.ACTIVE: - postingsForMatchersFn = func(ix tsdb.IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) { + postingsForMatchersFn = func(_ context.Context, ix tsdb.IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) { postings, err := tsdb.PostingsForMatchers(ix, ms...) if err != nil { return nil, err @@ -1876,7 +1878,7 @@ func (i *Ingester) executeStreamingQuery(ctx context.Context, db *userTSDB, from // The querier must remain open until we've finished streaming chunks. 
defer q.Close() - allSeries, numSeries, err := i.sendStreamingQuerySeries(q, from, through, matchers, shard, stream) + allSeries, numSeries, err := i.sendStreamingQuerySeries(ctx, q, from, through, matchers, shard, stream) if err != nil { return 0, 0, err } @@ -1926,7 +1928,7 @@ func putChunkSeriesNode(sn *chunkSeriesNode) { chunkSeriesNodePool.Put(sn) } -func (i *Ingester) sendStreamingQuerySeries(q storage.ChunkQuerier, from, through int64, matchers []*labels.Matcher, shard *sharding.ShardSelector, stream client.Ingester_QueryStreamServer) (*chunkSeriesNode, int, error) { +func (i *Ingester) sendStreamingQuerySeries(_ context.Context, q storage.ChunkQuerier, from, through int64, matchers []*labels.Matcher, shard *sharding.ShardSelector, stream client.Ingester_QueryStreamServer) (*chunkSeriesNode, int, error) { // Disable chunks trimming, so that we don't have to rewrite chunks which have samples outside // the requested from/through range. PromQL engine can handle it. hints := initSelectHints(from, through) @@ -2102,7 +2104,7 @@ func (i *Ingester) getTSDBUsers() []string { return ids } -func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) { +func (i *Ingester) getOrCreateTSDB(ctx context.Context, userID string, force bool) (*userTSDB, error) { db := i.getTSDB(userID) if db != nil { return db, nil @@ -2137,7 +2139,7 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) } // Create the database and a shipper for a user - db, err := i.createTSDB(userID, 0) + db, err := i.createTSDB(ctx, userID, 0) if err != nil { return nil, err } @@ -2150,7 +2152,7 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) } // createTSDB creates a TSDB for a given userID, and returns the created db. -func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSDB, error) { +func (i *Ingester) createTSDB(_ context.Context, userID string, walReplayConcurrency int) (*userTSDB, error) { tsdbPromReg := prometheus.NewRegistry() udir := i.cfg.BlocksStorageConfig.TSDB.BlocksDir(userID) userLogger := util_log.WithUserID(userID, i.logger) @@ -2320,7 +2322,7 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error { for n := 0; n < tsdbOpenConcurrency; n++ { group.Go(func() error { for userID := range queue { - db, err := i.createTSDB(userID, tsdbWALReplayConcurrency) + db, err := i.createTSDB(ctx, userID, tsdbWALReplayConcurrency) if err != nil { level.Error(i.logger).Log("msg", "unable to open TSDB", "err", err, "user", userID) return errors.Wrapf(err, "unable to open TSDB for user %s", userID) @@ -2660,18 +2662,18 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool, forcedCompacti switch { case force: reason = "forced" - err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds(), forcedCompactionMaxTime) + err = userDB.compactHead(ctx, i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds(), forcedCompactionMaxTime) case i.compactionIdleTimeout > 0 && userDB.isIdle(time.Now(), i.compactionIdleTimeout): reason = "idle" level.Info(i.logger).Log("msg", "TSDB is idle, forcing compaction", "user", userID) // Always pass math.MaxInt64 as forcedCompactionMaxTime because we want to compact the whole TSDB head. 
- err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds(), math.MaxInt64) + err = userDB.compactHead(ctx, i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds(), math.MaxInt64) default: reason = "regular" - err = userDB.Compact() + err = userDB.Compact(ctx) } if err != nil { diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 55c11c1fe1d..32dc23fb259 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -2612,6 +2612,8 @@ func TestIngester_Push_ShouldNotCreateTSDBIfNotInActiveState(t *testing.T) { } func TestIngester_getOrCreateTSDB_ShouldNotAllowToCreateTSDBIfIngesterStateIsNotActive(t *testing.T) { + ctx := context.Background() + tests := map[string]struct { state ring.InstanceState expectedErr error @@ -2660,7 +2662,7 @@ func TestIngester_getOrCreateTSDB_ShouldNotAllowToCreateTSDBIfIngesterStateIsNot } } - db, err := i.getOrCreateTSDB("test", false) + db, err := i.getOrCreateTSDB(ctx, "test", false) assert.Equal(t, testData.expectedErr, err) if testData.expectedErr != nil { @@ -4219,6 +4221,8 @@ func getWALReplayConcurrencyFromTSDBHeadOptions(userTSDB *userTSDB) int { } func TestIngester_shipBlocks(t *testing.T) { + ctx := context.Background() + cfg := defaultIngesterTestConfig(t) cfg.BlocksStorageConfig.TSDB.ShipConcurrency = 2 @@ -4236,7 +4240,7 @@ func TestIngester_shipBlocks(t *testing.T) { // Create the TSDB for 3 users and then replace the shipper with the mocked one mocks := []*uploaderMock{} for _, userID := range []string{"user-1", "user-2", "user-3"} { - userDB, err := i.getOrCreateTSDB(userID, false) + userDB, err := i.getOrCreateTSDB(ctx, userID, false) require.NoError(t, err) require.NotNil(t, userDB) @@ -4407,7 +4411,7 @@ func TestIngester_closingAndOpeningTsdbConcurrently(t *testing.T) { return i.lifecycler.HealthyInstancesCount() }) - _, err = i.getOrCreateTSDB(userID, false) + _, err = i.getOrCreateTSDB(ctx, userID, false) require.NoError(t, err) iterations := 5000 @@ -4420,7 +4424,7 @@ func TestIngester_closingAndOpeningTsdbConcurrently(t *testing.T) { case <-quit: return default: - _, err = i.getOrCreateTSDB(userID, false) + _, err = i.getOrCreateTSDB(ctx, userID, false) if err != nil { chanErr <- err } @@ -4460,7 +4464,7 @@ func TestIngester_idleCloseEmptyTSDB(t *testing.T) { return i.lifecycler.HealthyInstancesCount() }) - db, err := i.getOrCreateTSDB(userID, true) + db, err := i.getOrCreateTSDB(ctx, userID, true) require.NoError(t, err) require.NotNil(t, db) @@ -4476,7 +4480,7 @@ func TestIngester_idleCloseEmptyTSDB(t *testing.T) { require.Nil(t, db) // And we can recreate it again, if needed. 
- db, err = i.getOrCreateTSDB(userID, true) + db, err = i.getOrCreateTSDB(ctx, userID, true) require.NoError(t, err) require.NotNil(t, db) } @@ -4915,7 +4919,7 @@ func TestIngester_ForFlush(t *testing.T) { func mockUserShipper(t *testing.T, i *Ingester) *uploaderMock { m := &uploaderMock{} - userDB, err := i.getOrCreateTSDB(userID, false) + userDB, err := i.getOrCreateTSDB(context.Background(), userID, false) require.NoError(t, err) require.NotNil(t, userDB) @@ -5411,7 +5415,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) { db := i.getTSDB(userID) require.NotNil(t, db) - require.Nil(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) oldBlocks := db.Blocks() require.Equal(t, 3, len(oldBlocks)) @@ -5435,7 +5439,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) { _, err := i.Push(ctx, req) require.NoError(t, err) } - require.Nil(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) // Only the second block should be gone along with a new block. newBlocks := db.Blocks() @@ -5467,7 +5471,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) { _, err := i.Push(ctx, req) require.NoError(t, err) } - require.Nil(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) // All blocks from the old blocks should be gone now. newBlocks2 := db.Blocks() @@ -5524,7 +5528,7 @@ func TestIngesterNotDeleteShippedBlocksUntilRetentionExpires(t *testing.T) { db := i.getTSDB(userID) require.NotNil(t, db) - require.Nil(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) oldBlocks := db.Blocks() require.Equal(t, 3, len(oldBlocks)) @@ -5551,7 +5555,7 @@ func TestIngesterNotDeleteShippedBlocksUntilRetentionExpires(t *testing.T) { _, err := i.Push(ctx, req) require.NoError(t, err) } - require.Nil(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) // Only the last two old blocks plus the one containing the newly added samples should remain. newBlocks := db.Blocks() @@ -5605,7 +5609,7 @@ func TestIngesterWithShippingDisabledDeletesBlocksOnlyAfterRetentionExpires(t *t db := i.getTSDB(userID) require.NotNil(t, db) - require.Nil(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) oldBlocks := db.Blocks() require.Len(t, oldBlocks, 3) @@ -5617,7 +5621,7 @@ func TestIngesterWithShippingDisabledDeletesBlocksOnlyAfterRetentionExpires(t *t req, _, _, _ := mockWriteRequest(t, labels.FromStrings(labels.MetricName, "test"), 0, 5*chunkRangeMilliSec) _, err = i.Push(ctx, req) require.NoError(t, err) - require.Nil(t, db.Compact()) + require.NoError(t, db.Compact(ctx)) require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_ingester_tsdb_compactions_total Total number of TSDB compactions that were executed. 
@@ -8938,7 +8942,7 @@ func TestIngester_lastUpdatedTimeIsNotInTheFuture(t *testing.T) { return i.lifecycler.HealthyInstancesCount() }) - db, err := i.getOrCreateTSDB(userID, true) + db, err := i.getOrCreateTSDB(ctx, userID, true) require.NoError(t, err) require.NotNil(t, db) require.InDelta(t, time.Now().Unix(), db.getLastUpdate().Unix(), 5) // within 5 seconds of "now" @@ -8951,7 +8955,7 @@ func TestIngester_lastUpdatedTimeIsNotInTheFuture(t *testing.T) { i.closeAllTSDB() // and open it again (it must exist) - db, err = i.getOrCreateTSDB(userID, false) + db, err = i.getOrCreateTSDB(ctx, userID, false) require.NoError(t, err) require.NotNil(t, db) diff --git a/pkg/ingester/label_names_and_values.go b/pkg/ingester/label_names_and_values.go index 0225486cfda..6cd5fc87e50 100644 --- a/pkg/ingester/label_names_and_values.go +++ b/pkg/ingester/label_names_and_values.go @@ -105,7 +105,7 @@ func labelValuesCardinality( lbNames []string, matchers []*labels.Matcher, idxReader tsdb.IndexReader, - postingsForMatchersFn func(tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error), + postingsForMatchersFn func(context.Context, tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error), msgSizeThreshold int, srv client.Ingester_LabelValuesCardinalityServer, ) error { @@ -172,7 +172,7 @@ func computeLabelValuesSeriesCount( lblValues []string, matchers []*labels.Matcher, idxReader tsdb.IndexReader, - postingsForMatchersFn func(tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error), + postingsForMatchersFn func(context.Context, tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error), ) <-chan labelValueCountResult { maxConcurrency := 16 if len(lblValues) < maxConcurrency { @@ -216,7 +216,7 @@ func countLabelValueSeries( lblValue string, matchers []*labels.Matcher, idxReader tsdb.IndexReader, - postingsForMatchersFn func(tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error), + postingsForMatchersFn func(context.Context, tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error), ) (uint64, error) { if err := ctx.Err(); err != nil { return 0, err @@ -229,7 +229,7 @@ func countLabelValueSeries( lblValMatchers[len(lblValMatchers)-1] = labels.MustNewMatcher(labels.MatchEqual, lblName, lblValue) - p, err := postingsForMatchersFn(idxReader, lblValMatchers...) + p, err := postingsForMatchersFn(ctx, idxReader, lblValMatchers...) 
if err != nil { return 0, err } diff --git a/pkg/ingester/label_names_and_values_test.go b/pkg/ingester/label_names_and_values_test.go index 4cd5738bb10..1a28aa0c997 100644 --- a/pkg/ingester/label_names_and_values_test.go +++ b/pkg/ingester/label_names_and_values_test.go @@ -290,7 +290,7 @@ func TestCountLabelValueSeries_ContextCancellation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - _, err := countLabelValueSeries(ctx, "lblName", "lblVal", nil, nil, func(reader tsdb.IndexPostingsReader, matcher ...*labels.Matcher) (index.Postings, error) { + _, err := countLabelValueSeries(ctx, "lblName", "lblVal", nil, nil, func(context.Context, tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error) { return infinitePostings{}, nil }) diff --git a/pkg/ingester/user_tsdb.go b/pkg/ingester/user_tsdb.go index 9de4559d88d..6cab841a6be 100644 --- a/pkg/ingester/user_tsdb.go +++ b/pkg/ingester/user_tsdb.go @@ -156,7 +156,7 @@ func (u *userTSDB) Close() error { return u.db.Close() } -func (u *userTSDB) Compact() error { +func (u *userTSDB) Compact(_ context.Context) error { return u.db.Compact() } @@ -196,7 +196,7 @@ func (u *userTSDB) changeStateToForcedCompaction(from tsdbState, forcedCompactio // // The input forcedMaxTime allows to specify the maximum timestamp of samples compacted from the // in-order Head. You can pass math.MaxInt64 to compact the entire in-order Head. -func (u *userTSDB) compactHead(blockDuration, forcedCompactionMaxTime int64) error { +func (u *userTSDB) compactHead(_ context.Context, blockDuration, forcedCompactionMaxTime int64) error { if ok, s := u.changeStateToForcedCompaction(active, forcedCompactionMaxTime); !ok { return fmt.Errorf("TSDB head cannot be compacted because it is not in active state (possibly being closed or blocks shipping in progress): %s", s.String()) } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 90f89888b10..3fdac129f42 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -342,7 +342,7 @@ type blocksStoreQuerier struct { // Select implements storage.Querier interface. // The bool passed is ignored because the series is always sorted. func (q *blocksStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { - return q.selectSorted(sp, matchers...) + return q.selectSorted(q.ctx, sp, matchers...) 
} func (q *blocksStoreQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { @@ -430,8 +430,8 @@ func (q *blocksStoreQuerier) Close() error { return nil } -func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { - spanLog, ctx := spanlogger.NewWithLogger(q.ctx, q.logger, "blocksStoreQuerier.selectSorted") +func (q *blocksStoreQuerier) selectSorted(ctx context.Context, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + spanLog, ctx := spanlogger.NewWithLogger(ctx, q.logger, "blocksStoreQuerier.selectSorted") defer spanLog.Span.Finish() minT, maxT := sp.Start, sp.End diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 5fc083e7f17..1a320b5c130 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -1613,6 +1613,7 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { func TestBlocksStoreQuerier_SelectSortedShouldHonorQueryStoreAfter(t *testing.T) { now := time.Now() + ctx := context.Background() tests := map[string]struct { queryStoreAfter time.Duration @@ -1657,7 +1658,7 @@ func TestBlocksStoreQuerier_SelectSortedShouldHonorQueryStoreAfter(t *testing.T) finder.On("GetBlocks", mock.Anything, "user-1", mock.Anything, mock.Anything).Return(bucketindex.Blocks(nil), map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), error(nil)) q := &blocksStoreQuerier{ - ctx: context.Background(), + ctx: ctx, minT: testData.queryMinT, maxT: testData.queryMaxT, userID: "user-1", @@ -1675,7 +1676,7 @@ func TestBlocksStoreQuerier_SelectSortedShouldHonorQueryStoreAfter(t *testing.T) End: testData.queryMaxT, } - set := q.selectSorted(sp) + set := q.selectSorted(ctx, sp) require.NoError(t, set.Err()) if testData.expectedMinT == 0 && testData.expectedMaxT == 0 { @@ -1731,8 +1732,9 @@ func TestBlocksStoreQuerier_MaxLabelsQueryRange(t *testing.T) { finder := &blocksFinderMock{} finder.On("GetBlocks", mock.Anything, "user-1", mock.Anything, mock.Anything).Return(bucketindex.Blocks(nil), map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), error(nil)) + ctx := user.InjectOrgID(context.Background(), "user-1") q := &blocksStoreQuerier{ - ctx: user.InjectOrgID(context.Background(), "user-1"), + ctx: ctx, minT: testData.queryMinT, maxT: testData.queryMaxT, userID: "user-1", diff --git a/pkg/querier/dummy.go b/pkg/querier/dummy.go index 26bc20b2bdb..8123fbd70cf 100644 --- a/pkg/querier/dummy.go +++ b/pkg/querier/dummy.go @@ -12,16 +12,16 @@ import ( "github.com/prometheus/prometheus/scrape" ) -// DummyTargetRetriever implements github.com/prometheus/prometheus/web/api/v1.targetRetriever. +// DummyTargetRetriever implements github.com/prometheus/prometheus/web/api/v1.TargetRetriever. // and v1.ScrapePoolsRetriever type DummyTargetRetriever struct{} -// TargetsActive implements targetRetriever. +// TargetsActive implements TargetRetriever. func (DummyTargetRetriever) TargetsActive() map[string][]*scrape.Target { return map[string][]*scrape.Target{} } -// TargetsDropped implements targetRetriever. +// TargetsDropped implements TargetRetriever. 
func (DummyTargetRetriever) TargetsDropped() map[string][]*scrape.Target { return map[string][]*scrape.Target{} } diff --git a/pkg/querier/tenantfederation/merge_queryable.go b/pkg/querier/tenantfederation/merge_queryable.go index b19c42619b0..929a50edba5 100644 --- a/pkg/querier/tenantfederation/merge_queryable.go +++ b/pkg/querier/tenantfederation/merge_queryable.go @@ -47,8 +47,8 @@ func tenantQuerierCallback(queryable storage.Queryable) MergeQuerierCallback { return nil, nil, err } - var queriers = make([]storage.Querier, len(tenantIDs)) - for pos, tenantID := range tenantIDs { + var queriers = make([]storage.Querier, 0, len(tenantIDs)) + for _, tenantID := range tenantIDs { q, err := queryable.Querier( user.InjectOrgID(ctx, tenantID), mint, @@ -57,7 +57,7 @@ func tenantQuerierCallback(queryable storage.Queryable) MergeQuerierCallback { if err != nil { return nil, nil, err } - queriers[pos] = q + queriers = append(queriers, q) } return tenantIDs, queriers, nil @@ -166,7 +166,7 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([] name = m.idLabelName } - return m.mergeDistinctStringSliceWithTenants(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) { + return m.mergeDistinctStringSliceWithTenants(m.ctx, func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) { return q.LabelValues(name, filteredMatchers...) }, matchedTenants) } @@ -180,7 +180,7 @@ func (m *mergeQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storag matchedTenants, filteredMatchers := filterValuesByMatchers(m.idLabelName, m.ids, matchers...) - labelNames, warnings, err := m.mergeDistinctStringSliceWithTenants(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) { + labelNames, warnings, err := m.mergeDistinctStringSliceWithTenants(m.ctx, func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) { return q.LabelNames(filteredMatchers...) }, matchedTenants) if err != nil { @@ -225,7 +225,7 @@ type stringSliceFuncJob struct { // provided, all queriers are used. It removes duplicates and sorts the result. // It doesn't require the output of the stringSliceFunc to be sorted, as results // of LabelValues are not sorted. -func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(f stringSliceFunc, tenants map[string]struct{}) ([]string, storage.Warnings, error) { +func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(ctx context.Context, f stringSliceFunc, tenants map[string]struct{}) ([]string, storage.Warnings, error) { jobs := make([]*stringSliceFuncJob, 0, len(m.ids)) for pos, id := range m.ids { if tenants != nil { @@ -250,7 +250,7 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(f stringSliceFunc, te return nil } - err := concurrency.ForEachJob(m.ctx, len(jobs), m.maxConcurrency, run) + err := concurrency.ForEachJob(ctx, len(jobs), m.maxConcurrency, run) if err != nil { return nil, nil, err } diff --git a/pkg/storage/tsdb/block/index.go b/pkg/storage/tsdb/block/index.go index 768bbc5b4b2..e8a80a17ed2 100644 --- a/pkg/storage/tsdb/block/index.go +++ b/pkg/storage/tsdb/block/index.go @@ -32,8 +32,8 @@ import ( ) // VerifyBlock does a full run over a block index and chunk data and verifies that they fulfill the order invariants. 
-func VerifyBlock(logger log.Logger, blockDir string, minTime, maxTime int64, checkChunks bool) error { - stats, err := GatherBlockHealthStats(logger, blockDir, minTime, maxTime, checkChunks) +func VerifyBlock(ctx context.Context, logger log.Logger, blockDir string, minTime, maxTime int64, checkChunks bool) error { + stats, err := GatherBlockHealthStats(ctx, logger, blockDir, minTime, maxTime, checkChunks) if err != nil { return err } @@ -212,7 +212,7 @@ func (n *minMaxSumInt64) Avg() int64 { // helps to assess index and optionally chunk health. // It considers https://github.com/prometheus/tsdb/issues/347 as something that Thanos can handle. // See HealthStats.Issue347OutsideChunks for details. -func GatherBlockHealthStats(logger log.Logger, blockDir string, minTime, maxTime int64, checkChunkData bool) (stats HealthStats, err error) { +func GatherBlockHealthStats(_ context.Context, logger log.Logger, blockDir string, minTime, maxTime int64, checkChunkData bool) (stats HealthStats, err error) { indexFn := filepath.Join(blockDir, IndexFilename) chunkDir := filepath.Join(blockDir, ChunksDirname) // index reader @@ -396,14 +396,14 @@ func GatherBlockHealthStats(logger log.Logger, blockDir string, minTime, maxTime type ignoreFnType func(mint, maxt int64, prev *chunks.Meta, curr *chunks.Meta) (bool, error) -// Repair open the block with given id in dir and creates a new one with fixed data. +// Repair opens the block with given id in dir and creates a new one with fixed data. // It: // - removes out of order duplicates // - all "complete" outsiders (they will not accessed anyway) // - removes all near "complete" outside chunks introduced by https://github.com/prometheus/tsdb/issues/347. // Fixable inconsistencies are resolved in the new block. // TODO(bplotka): https://github.com/thanos-io/thanos/issues/378. -func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, ignoreChkFns ...ignoreFnType) (resid ulid.ULID, err error) { +func Repair(ctx context.Context, logger log.Logger, dir string, id ulid.ULID, source SourceType, ignoreChkFns ...ignoreFnType) (resid ulid.ULID, err error) { if len(ignoreChkFns) == 0 { return resid, errors.New("no ignore chunk function specified") } @@ -459,7 +459,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, igno resmeta.Stats = tsdb.BlockStats{} // Reset stats. resmeta.Thanos.Source = source // Update source. - if err := rewrite(logger, indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil { + if err := rewrite(ctx, logger, indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil { return resid, errors.Wrap(err, "rewrite block") } resmeta.Thanos.SegmentFiles = GetSegmentFiles(resdir) @@ -635,6 +635,7 @@ type indexReader interface { // rewrite writes all data from the readers back into the writers while cleaning // up mis-ordered and duplicated chunks. 
func rewrite( + _ context.Context, logger log.Logger, indexr indexReader, chunkr tsdb.ChunkReader, indexw tsdb.IndexWriter, chunkw tsdb.ChunkWriter, diff --git a/pkg/storage/tsdb/block/index_test.go b/pkg/storage/tsdb/block/index_test.go index 90e6b1ad522..dc3c5a4d2dd 100644 --- a/pkg/storage/tsdb/block/index_test.go +++ b/pkg/storage/tsdb/block/index_test.go @@ -63,7 +63,7 @@ func TestRewrite(t *testing.T) { totalChunks := 0 ignoredChunks := 0 - require.NoError(t, rewrite(log.NewNopLogger(), ir, cr, iw, cw, m, []ignoreFnType{func(mint, maxt int64, prev *chunks.Meta, curr *chunks.Meta) (bool, error) { + require.NoError(t, rewrite(ctx, log.NewNopLogger(), ir, cr, iw, cw, m, []ignoreFnType{func(mint, maxt int64, prev *chunks.Meta, curr *chunks.Meta) (bool, error) { totalChunks++ if curr.OverlapsClosedInterval(excludeTime, excludeTime) { // Ignores all chunks that overlap with the excludeTime. excludeTime was randomly selected inside the block. diff --git a/pkg/storegateway/bucket_index_reader.go b/pkg/storegateway/bucket_index_reader.go index 628519d284c..99a02fffc67 100644 --- a/pkg/storegateway/bucket_index_reader.go +++ b/pkg/storegateway/bucket_index_reader.go @@ -718,7 +718,7 @@ func (r *bucketIndexReader) Close() error { } // LookupLabelsSymbols populates label set strings from symbolized label set. -func (r *bucketIndexReader) LookupLabelsSymbols(symbolized []symbolizedLabel, builder *labels.ScratchBuilder) (labels.Labels, error) { +func (r *bucketIndexReader) LookupLabelsSymbols(_ context.Context, symbolized []symbolizedLabel, builder *labels.ScratchBuilder) (labels.Labels, error) { builder.Reset() for _, s := range symbolized { ln, err := r.dec.LookupSymbol(s.name) diff --git a/pkg/storegateway/indexheader/header_test.go b/pkg/storegateway/indexheader/header_test.go index 3a83e2e191e..a818f00a986 100644 --- a/pkg/storegateway/indexheader/header_test.go +++ b/pkg/storegateway/indexheader/header_test.go @@ -152,7 +152,6 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe require.NoError(t, iter.Err()) _, err := headerReader.LookupSymbol(uint32(i)) require.Error(t, err) - } else { // For v1 symbols refs are actual offsets in the index. symbols, err := getSymbolTable(indexByteSlice) diff --git a/pkg/storegateway/series_refs.go b/pkg/storegateway/series_refs.go index 2175e434cc3..998b56609fe 100644 --- a/pkg/storegateway/series_refs.go +++ b/pkg/storegateway/series_refs.go @@ -955,7 +955,7 @@ func (s *loadingSeriesChunkRefsSetIterator) Next() bool { } defer symbolizedSet.release() // We only retain the slices of chunk ranges from this set. These are still not pooled, and it's ok to retain them. - nextSet, err := s.stringifiedSet(symbolizedSet) + nextSet, err := s.stringifiedSet(context.TODO(), symbolizedSet) if err != nil { s.err = err return false @@ -1019,14 +1019,14 @@ func (s *loadingSeriesChunkRefsSetIterator) symbolizedSet(ctx context.Context, p return symbolizedSet, nil } -func (s *loadingSeriesChunkRefsSetIterator) stringifiedSet(symbolizedSet symbolizedSeriesChunkRefsSet) (seriesChunkRefsSet, error) { +func (s *loadingSeriesChunkRefsSetIterator) stringifiedSet(ctx context.Context, symbolizedSet symbolizedSeriesChunkRefsSet) (seriesChunkRefsSet, error) { if len(symbolizedSet.series) > 256 { // This approach comes with some overhead in data structures. // It starts making more sense with more label values only. 
return s.singlePassStringify(symbolizedSet) } - return s.multiLookupStringify(symbolizedSet) + return s.multiLookupStringify(ctx, symbolizedSet) } // clampLastChunkLength checks the length of the last chunk in the last range of the last series. @@ -1266,13 +1266,13 @@ func (s *loadingSeriesChunkRefsSetIterator) singlePassStringify(symbolizedSet sy return set, nil } -func (s *loadingSeriesChunkRefsSetIterator) multiLookupStringify(symbolizedSet symbolizedSeriesChunkRefsSet) (seriesChunkRefsSet, error) { +func (s *loadingSeriesChunkRefsSetIterator) multiLookupStringify(ctx context.Context, symbolizedSet symbolizedSeriesChunkRefsSet) (seriesChunkRefsSet, error) { // This can be released by the caller because loadingSeriesChunkRefsSetIterator doesn't retain it after Next() is called again. set := newSeriesChunkRefsSet(len(symbolizedSet.series), true) labelsBuilder := labels.NewScratchBuilder(16) for _, series := range symbolizedSet.series { - lset, err := s.indexr.LookupLabelsSymbols(series.lset, &labelsBuilder) + lset, err := s.indexr.LookupLabelsSymbols(ctx, series.lset, &labelsBuilder) if err != nil { return seriesChunkRefsSet{}, err } diff --git a/tools/tsdb-index-health/main.go b/tools/tsdb-index-health/main.go index 3bbe09abf2f..ee6987e0e84 100644 --- a/tools/tsdb-index-health/main.go +++ b/tools/tsdb-index-health/main.go @@ -3,6 +3,7 @@ package main import ( + "context" "encoding/json" "flag" "fmt" @@ -39,6 +40,8 @@ func main() { return } + ctx := context.Background() + for _, b := range args { meta, err := block.ReadMetaFromDir(b) if err != nil { @@ -46,7 +49,7 @@ func main() { continue } - stats, err := block.GatherBlockHealthStats(logger, b, meta.MinTime, meta.MaxTime, *verifyChunks) + stats, err := block.GatherBlockHealthStats(ctx, logger, b, meta.MinTime, meta.MaxTime, *verifyChunks) if err != nil { fmt.Fprintln(os.Stderr, "Failed to gather health stats from block dir", b, "error:", err) continue diff --git a/tools/tsdb-index-health/main_test.go b/tools/tsdb-index-health/main_test.go index 22e45387ba7..c1e958e255c 100644 --- a/tools/tsdb-index-health/main_test.go +++ b/tools/tsdb-index-health/main_test.go @@ -3,6 +3,7 @@ package main import ( + "context" "path" "testing" @@ -52,7 +53,7 @@ func TestGatherIndexHealthStats(t *testing.T) { require.NoError(t, err) blockDir := path.Join(tmpDir, meta.ULID.String()) - stats, err := block.GatherBlockHealthStats(log.NewNopLogger(), blockDir, meta.MinTime, meta.MaxTime, true) + stats, err := block.GatherBlockHealthStats(context.Background(), log.NewNopLogger(), blockDir, meta.MinTime, meta.MaxTime, true) require.NoError(t, err) require.Equal(t, int64(2), stats.TotalSeries) diff --git a/tools/tsdb-index/main.go b/tools/tsdb-index/main.go index b47acf10ed9..73ffdace3d5 100644 --- a/tools/tsdb-index/main.go +++ b/tools/tsdb-index/main.go @@ -3,6 +3,7 @@ package main import ( + "context" "flag" "fmt" "os" @@ -58,12 +59,13 @@ func main() { level.Error(logger).Log(matchersStr...) 
} + ctx := context.Background() for _, blockDir := range args { - printBlockIndex(blockDir, *printChunks, matchers) + printBlockIndex(ctx, blockDir, *printChunks, matchers) } } -func printBlockIndex(blockDir string, printChunks bool, matchers []*labels.Matcher) { +func printBlockIndex(_ context.Context, blockDir string, printChunks bool, matchers []*labels.Matcher) { block, err := tsdb.OpenBlock(logger, blockDir, nil) if err != nil { level.Error(logger).Log("msg", "failed to open block", "dir", blockDir, "err", err) diff --git a/tools/tsdb-symbols/main.go b/tools/tsdb-symbols/main.go index c69a8947c34..b2d2f3174b8 100644 --- a/tools/tsdb-symbols/main.go +++ b/tools/tsdb-symbols/main.go @@ -3,6 +3,7 @@ package main import ( + "context" "encoding/binary" "flag" "fmt" @@ -53,8 +54,9 @@ func main() { } } + ctx := context.Background() for _, blockDir := range args { - err := analyseSymbols(blockDir, uniqueSymbols, uniqueSymbolsPerShard) + err := analyseSymbols(ctx, blockDir, uniqueSymbols, uniqueSymbolsPerShard) if err != nil { log.Println("failed to analyse symbols for", blockDir, "due to error:", err) } @@ -87,7 +89,7 @@ func main() { fmt.Println("Analysis complete in", time.Since(startTime)) } -func analyseSymbols(blockDir string, uniqueSymbols map[string]struct{}, uniqueSymbolsPerShard []map[string]struct{}) error { +func analyseSymbols(_ context.Context, blockDir string, uniqueSymbols map[string]struct{}, uniqueSymbolsPerShard []map[string]struct{}) error { block, err := tsdb.OpenBlock(gokitlog.NewLogfmtLogger(os.Stderr), blockDir, nil) if err != nil { return fmt.Errorf("failed to open block: %v", err)
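
For reference, a minimal sketch of the context-plumbing pattern applied in the hunks above: the context is accepted as the first parameter, ignored with a blank _ context.Context where the callee does not consume it yet, and created with context.Background() at entry points such as tests and CLI tools. The function names below are illustrative only, not the actual Mimir APIs.

package main

import (
	"context"
	"fmt"
)

// gatherStats does not use the context yet; accepting it as a blank parameter
// keeps the signature stable for when cancellation checks are added later.
func gatherStats(_ context.Context, blockDir string) (int, error) {
	// A real implementation would scan the block index here.
	return 0, nil
}

// verifyBlock threads the caller's context straight through to gatherStats.
func verifyBlock(ctx context.Context, blockDir string) error {
	if _, err := gatherStats(ctx, blockDir); err != nil {
		return fmt.Errorf("gather stats for %s: %w", blockDir, err)
	}
	return nil
}

func main() {
	// Entry points (CLI tools, tests) create the root context explicitly.
	ctx := context.Background()
	if err := verifyBlock(ctx, "./block"); err != nil {
		fmt.Println("verification failed:", err)
	}
}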