Prepare for context argument changes in Prometheus
Prepare for upcoming context argument changes in Prometheus (some functions
gain a context argument, while others lose it) by adding a context argument
to the functions that will need it.

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
aknuds1 committed Sep 22, 2023
1 parent 2f436d9 commit 2a64b04
Showing 22 changed files with 97 additions and 81 deletions.
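Most of the diff below follows a single transitional pattern: call sites start passing a context.Context now, while the functions that receive it deliberately ignore it (using the blank identifier) until the corresponding Prometheus signatures actually change. The following minimal sketch only illustrates that pattern; the userDB type and the values below are invented, loosely modeled on the userTSDB.compactHead change further down.

package main

import "context"

// userDB stands in for a wrapper type such as Mimir's userTSDB; the names in
// this sketch are hypothetical and exist only to illustrate the pattern.
type userDB struct{}

// The context parameter is added now but deliberately ignored (blank identifier),
// so callers can already thread ctx through before the underlying API needs it.
func (db *userDB) compactHead(_ context.Context, blockDuration int64) error {
	// Once the upstream Prometheus API gains a context argument, the blank
	// identifier becomes a real ctx and is forwarded here.
	return nil
}

func main() {
	db := &userDB{}
	// Call sites pass a context immediately, even though it is unused for now.
	_ = db.compactHead(context.Background(), 7200000) // 2h block range, in milliseconds
}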
2 changes: 1 addition & 1 deletion pkg/compactor/block_upload.go
@@ -577,7 +577,7 @@ func (c *MultitenantCompactor) validateBlock(ctx context.Context, logger log.Log

// validate block
checkChunks := c.cfgProvider.CompactorBlockUploadVerifyChunks(userID)
err = block.VerifyBlock(c.logger, blockDir, blockMetadata.MinTime, blockMetadata.MaxTime, checkChunks)
err = block.VerifyBlock(ctx, c.logger, blockDir, blockMetadata.MinTime, blockMetadata.MaxTime, checkChunks)
if err != nil {
return errors.Wrap(err, "error validating block")
}
8 changes: 4 additions & 4 deletions pkg/compactor/bucket_compactor.go
@@ -296,7 +296,7 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
}

// Ensure all source blocks are valid.
stats, err := block.GatherBlockHealthStats(jobLogger, bdir, meta.MinTime, meta.MaxTime, false)
stats, err := block.GatherBlockHealthStats(ctx, jobLogger, bdir, meta.MinTime, meta.MaxTime, false)
if err != nil {
return errors.Wrapf(err, "gather index issues for block %s", bdir)
}
@@ -397,7 +397,7 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
}

// Ensure the compacted block is valid.
if err := block.VerifyBlock(jobLogger, bdir, newMeta.MinTime, newMeta.MaxTime, false); err != nil {
if err := block.VerifyBlock(ctx, jobLogger, bdir, newMeta.MinTime, newMeta.MaxTime, false); err != nil {
return errors.Wrapf(err, "invalid result block %s", bdir)
}

@@ -571,13 +571,13 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
return errors.Wrapf(err, "read meta from %s", bdir)
}

resid, err := block.Repair(logger, tmpdir, ie.id, block.CompactorRepairSource, block.IgnoreIssue347OutsideChunk)
resid, err := block.Repair(ctx, logger, tmpdir, ie.id, block.CompactorRepairSource, block.IgnoreIssue347OutsideChunk)
if err != nil {
return errors.Wrapf(err, "repair failed for block %s", ie.id)
}

// Verify repaired id before uploading it.
if err := block.VerifyBlock(logger, filepath.Join(tmpdir, resid.String()), meta.MinTime, meta.MaxTime, false); err != nil {
if err := block.VerifyBlock(ctx, logger, filepath.Join(tmpdir, resid.String()), meta.MinTime, meta.MaxTime, false); err != nil {
return errors.Wrapf(err, "repaired block is invalid %s", resid)
}

@@ -273,7 +273,7 @@ func TestNewMockShardedQueryable(t *testing.T) {
}.LabelValue(),
})

require.Nil(t, set.Err())
require.NoError(t, set.Err())

var iter chunkenc.Iterator
for set.Next() {
6 changes: 3 additions & 3 deletions pkg/frontend/querymiddleware/sharded_queryable.go
@@ -100,16 +100,16 @@ func (q *shardedQuerier) Select(_ bool, hints *storage.SelectHints, matchers ...
return storage.ErrSeriesSet(err)
}

return q.handleEmbeddedQueries(queries, hints)
return q.handleEmbeddedQueries(q.ctx, queries, hints)
}

// handleEmbeddedQueries concurrently executes the provided queries through the downstream handler.
// The returned storage.SeriesSet contains sorted series.
func (q *shardedQuerier) handleEmbeddedQueries(queries []string, hints *storage.SelectHints) storage.SeriesSet {
func (q *shardedQuerier) handleEmbeddedQueries(ctx context.Context, queries []string, hints *storage.SelectHints) storage.SeriesSet {
streams := make([][]SampleStream, len(queries))

// Concurrently run each query. It breaks and cancels each worker context on first error.
err := concurrency.ForEachJob(q.ctx, len(queries), len(queries), func(ctx context.Context, idx int) error {
err := concurrency.ForEachJob(ctx, len(queries), len(queries), func(ctx context.Context, idx int) error {
resp, err := q.handler.Do(ctx, q.req.WithQuery(queries[idx]))
if err != nil {
return err
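In the sharded queryable change above, handleEmbeddedQueries now receives the context as an explicit parameter instead of reading the context stored on the querier struct, presumably so that only Select has to change once the calling interface supplies a context directly. A rough, self-contained sketch of that step, using invented names rather than the real shardedQuerier type, could look like this:

package main

import "context"

// querier is a stand-in for a type like shardedQuerier; the fields and
// method names here are invented for illustration.
type querier struct {
	ctx context.Context // context captured at construction time (legacy pattern)
}

// The helper now takes ctx explicitly rather than reading q.ctx itself.
func (q *querier) handleQueries(ctx context.Context, queries []string) error {
	for range queries {
		// Respect cancellation of whichever context the caller passed in.
		if err := ctx.Err(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	q := &querier{ctx: context.Background()}
	// For now the caller still forwards the stored context; once the calling
	// interface provides its own context parameter, q.ctx can go away.
	_ = q.handleQueries(q.ctx, []string{"sum(rate(foo[5m]))"})
}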
28 changes: 15 additions & 13 deletions pkg/ingester/ingester.go
@@ -830,7 +830,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (
return &mimirpb.WriteResponse{}, nil
}

db, err := i.getOrCreateTSDB(userID, false)
db, err := i.getOrCreateTSDB(ctx, userID, false)
if err != nil {
return nil, wrapOrAnnotateWithUser(err, userID)
}
@@ -1552,12 +1552,14 @@ func (i *Ingester) LabelValuesCardinality(req *client.LabelValuesCardinalityRequ
return err
}

var postingsForMatchersFn func(ix tsdb.IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error)
var postingsForMatchersFn func(context.Context, tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error)
switch req.GetCountMethod() {
case client.IN_MEMORY:
postingsForMatchersFn = tsdb.PostingsForMatchers
postingsForMatchersFn = func(_ context.Context, ix tsdb.IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
return tsdb.PostingsForMatchers(ix, ms...)
}
case client.ACTIVE:
postingsForMatchersFn = func(ix tsdb.IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
postingsForMatchersFn = func(_ context.Context, ix tsdb.IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
postings, err := tsdb.PostingsForMatchers(ix, ms...)
if err != nil {
return nil, err
@@ -1876,7 +1878,7 @@ func (i *Ingester) executeStreamingQuery(ctx context.Context, db *userTSDB, from
// The querier must remain open until we've finished streaming chunks.
defer q.Close()

allSeries, numSeries, err := i.sendStreamingQuerySeries(q, from, through, matchers, shard, stream)
allSeries, numSeries, err := i.sendStreamingQuerySeries(ctx, q, from, through, matchers, shard, stream)
if err != nil {
return 0, 0, err
}
@@ -1926,7 +1928,7 @@ func putChunkSeriesNode(sn *chunkSeriesNode) {
chunkSeriesNodePool.Put(sn)
}

func (i *Ingester) sendStreamingQuerySeries(q storage.ChunkQuerier, from, through int64, matchers []*labels.Matcher, shard *sharding.ShardSelector, stream client.Ingester_QueryStreamServer) (*chunkSeriesNode, int, error) {
func (i *Ingester) sendStreamingQuerySeries(_ context.Context, q storage.ChunkQuerier, from, through int64, matchers []*labels.Matcher, shard *sharding.ShardSelector, stream client.Ingester_QueryStreamServer) (*chunkSeriesNode, int, error) {
// Disable chunks trimming, so that we don't have to rewrite chunks which have samples outside
// the requested from/through range. PromQL engine can handle it.
hints := initSelectHints(from, through)
@@ -2102,7 +2104,7 @@ func (i *Ingester) getTSDBUsers() []string {
return ids
}

func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) {
func (i *Ingester) getOrCreateTSDB(ctx context.Context, userID string, force bool) (*userTSDB, error) {
db := i.getTSDB(userID)
if db != nil {
return db, nil
@@ -2137,7 +2139,7 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error)
}

// Create the database and a shipper for a user
db, err := i.createTSDB(userID, 0)
db, err := i.createTSDB(ctx, userID, 0)
if err != nil {
return nil, err
}
@@ -2150,7 +2152,7 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error)
}

// createTSDB creates a TSDB for a given userID, and returns the created db.
func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSDB, error) {
func (i *Ingester) createTSDB(_ context.Context, userID string, walReplayConcurrency int) (*userTSDB, error) {
tsdbPromReg := prometheus.NewRegistry()
udir := i.cfg.BlocksStorageConfig.TSDB.BlocksDir(userID)
userLogger := util_log.WithUserID(userID, i.logger)
@@ -2320,7 +2322,7 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
for n := 0; n < tsdbOpenConcurrency; n++ {
group.Go(func() error {
for userID := range queue {
db, err := i.createTSDB(userID, tsdbWALReplayConcurrency)
db, err := i.createTSDB(ctx, userID, tsdbWALReplayConcurrency)
if err != nil {
level.Error(i.logger).Log("msg", "unable to open TSDB", "err", err, "user", userID)
return errors.Wrapf(err, "unable to open TSDB for user %s", userID)
@@ -2660,18 +2662,18 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool, forcedCompacti
switch {
case force:
reason = "forced"
err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds(), forcedCompactionMaxTime)
err = userDB.compactHead(ctx, i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds(), forcedCompactionMaxTime)

case i.compactionIdleTimeout > 0 && userDB.isIdle(time.Now(), i.compactionIdleTimeout):
reason = "idle"
level.Info(i.logger).Log("msg", "TSDB is idle, forcing compaction", "user", userID)

// Always pass math.MaxInt64 as forcedCompactionMaxTime because we want to compact the whole TSDB head.
err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds(), math.MaxInt64)
err = userDB.compactHead(ctx, i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds(), math.MaxInt64)

default:
reason = "regular"
err = userDB.Compact()
err = userDB.Compact(ctx)
}

if err != nil {
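The LabelValuesCardinality hunk above shows the other half of the preparation: a callback type gains a context parameter before the underlying Prometheus function (tsdb.PostingsForMatchers) accepts one, so the existing function is wrapped in a closure that simply drops the context for now. Below is a minimal sketch of that adapter, assuming placeholder types; none of these are the real Mimir or Prometheus types.

package main

import (
	"context"
	"fmt"
)

// Placeholder types standing in for tsdb.IndexPostingsReader, index.Postings
// and labels.Matcher; they exist only to make the sketch self-contained.
type indexReader interface{}
type postings interface{}
type matcher struct{ name, value string }

// Context-free function, playing the role of today's tsdb.PostingsForMatchers.
func postingsForMatchers(ix indexReader, ms ...*matcher) (postings, error) {
	return nil, nil
}

func main() {
	// The callback type is already context-aware.
	var fn func(context.Context, indexReader, ...*matcher) (postings, error)

	// Adapter closure: ignore the context for now. Once the upstream function
	// accepts a context, the closure disappears and the function is called
	// directly with ctx.
	fn = func(_ context.Context, ix indexReader, ms ...*matcher) (postings, error) {
		return postingsForMatchers(ix, ms...)
	}

	p, err := fn(context.Background(), nil, &matcher{name: "job", value: "api"})
	fmt.Println(p, err)
}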
36 changes: 20 additions & 16 deletions pkg/ingester/ingester_test.go
@@ -2612,6 +2612,8 @@ func TestIngester_Push_ShouldNotCreateTSDBIfNotInActiveState(t *testing.T) {
}

func TestIngester_getOrCreateTSDB_ShouldNotAllowToCreateTSDBIfIngesterStateIsNotActive(t *testing.T) {
ctx := context.Background()

tests := map[string]struct {
state ring.InstanceState
expectedErr error
@@ -2660,7 +2662,7 @@ func TestIngester_getOrCreateTSDB_ShouldNotAllowToCreateTSDBIfIngesterStateIsNot
}
}

db, err := i.getOrCreateTSDB("test", false)
db, err := i.getOrCreateTSDB(ctx, "test", false)
assert.Equal(t, testData.expectedErr, err)

if testData.expectedErr != nil {
@@ -4219,6 +4221,8 @@ func getWALReplayConcurrencyFromTSDBHeadOptions(userTSDB *userTSDB) int {
}

func TestIngester_shipBlocks(t *testing.T) {
ctx := context.Background()

cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.ShipConcurrency = 2

@@ -4236,7 +4240,7 @@
// Create the TSDB for 3 users and then replace the shipper with the mocked one
mocks := []*uploaderMock{}
for _, userID := range []string{"user-1", "user-2", "user-3"} {
userDB, err := i.getOrCreateTSDB(userID, false)
userDB, err := i.getOrCreateTSDB(ctx, userID, false)
require.NoError(t, err)
require.NotNil(t, userDB)

@@ -4407,7 +4411,7 @@ func TestIngester_closingAndOpeningTsdbConcurrently(t *testing.T) {
return i.lifecycler.HealthyInstancesCount()
})

_, err = i.getOrCreateTSDB(userID, false)
_, err = i.getOrCreateTSDB(ctx, userID, false)
require.NoError(t, err)

iterations := 5000
@@ -4420,7 +4424,7 @@ func TestIngester_closingAndOpeningTsdbConcurrently(t *testing.T) {
case <-quit:
return
default:
_, err = i.getOrCreateTSDB(userID, false)
_, err = i.getOrCreateTSDB(ctx, userID, false)
if err != nil {
chanErr <- err
}
@@ -4460,7 +4464,7 @@ func TestIngester_idleCloseEmptyTSDB(t *testing.T) {
return i.lifecycler.HealthyInstancesCount()
})

db, err := i.getOrCreateTSDB(userID, true)
db, err := i.getOrCreateTSDB(ctx, userID, true)
require.NoError(t, err)
require.NotNil(t, db)

@@ -4476,7 +4480,7 @@ func TestIngester_idleCloseEmptyTSDB(t *testing.T) {
require.Nil(t, db)

// And we can recreate it again, if needed.
db, err = i.getOrCreateTSDB(userID, true)
db, err = i.getOrCreateTSDB(ctx, userID, true)
require.NoError(t, err)
require.NotNil(t, db)
}
@@ -4915,7 +4919,7 @@ func TestIngester_ForFlush(t *testing.T) {

func mockUserShipper(t *testing.T, i *Ingester) *uploaderMock {
m := &uploaderMock{}
userDB, err := i.getOrCreateTSDB(userID, false)
userDB, err := i.getOrCreateTSDB(context.Background(), userID, false)
require.NoError(t, err)
require.NotNil(t, userDB)

@@ -5411,7 +5415,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {

db := i.getTSDB(userID)
require.NotNil(t, db)
require.Nil(t, db.Compact())
require.NoError(t, db.Compact(ctx))

oldBlocks := db.Blocks()
require.Equal(t, 3, len(oldBlocks))
@@ -5435,7 +5439,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
_, err := i.Push(ctx, req)
require.NoError(t, err)
}
require.Nil(t, db.Compact())
require.NoError(t, db.Compact(ctx))

// Only the second block should be gone along with a new block.
newBlocks := db.Blocks()
@@ -5467,7 +5471,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
_, err := i.Push(ctx, req)
require.NoError(t, err)
}
require.Nil(t, db.Compact())
require.NoError(t, db.Compact(ctx))

// All blocks from the old blocks should be gone now.
newBlocks2 := db.Blocks()
@@ -5524,7 +5528,7 @@ func TestIngesterNotDeleteShippedBlocksUntilRetentionExpires(t *testing.T) {

db := i.getTSDB(userID)
require.NotNil(t, db)
require.Nil(t, db.Compact())
require.NoError(t, db.Compact(ctx))

oldBlocks := db.Blocks()
require.Equal(t, 3, len(oldBlocks))
@@ -5551,7 +5555,7 @@ func TestIngesterNotDeleteShippedBlocksUntilRetentionExpires(t *testing.T) {
_, err := i.Push(ctx, req)
require.NoError(t, err)
}
require.Nil(t, db.Compact())
require.NoError(t, db.Compact(ctx))

// Only the last two old blocks plus the one containing the newly added samples should remain.
newBlocks := db.Blocks()
@@ -5605,7 +5609,7 @@ func TestIngesterWithShippingDisabledDeletesBlocksOnlyAfterRetentionExpires(t *t

db := i.getTSDB(userID)
require.NotNil(t, db)
require.Nil(t, db.Compact())
require.NoError(t, db.Compact(ctx))

oldBlocks := db.Blocks()
require.Len(t, oldBlocks, 3)
@@ -5617,7 +5621,7 @@ func TestIngesterWithShippingDisabledDeletesBlocksOnlyAfterRetentionExpires(t *t
req, _, _, _ := mockWriteRequest(t, labels.FromStrings(labels.MetricName, "test"), 0, 5*chunkRangeMilliSec)
_, err = i.Push(ctx, req)
require.NoError(t, err)
require.Nil(t, db.Compact())
require.NoError(t, db.Compact(ctx))

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingester_tsdb_compactions_total Total number of TSDB compactions that were executed.
@@ -8938,7 +8942,7 @@ func TestIngester_lastUpdatedTimeIsNotInTheFuture(t *testing.T) {
return i.lifecycler.HealthyInstancesCount()
})

db, err := i.getOrCreateTSDB(userID, true)
db, err := i.getOrCreateTSDB(ctx, userID, true)
require.NoError(t, err)
require.NotNil(t, db)
require.InDelta(t, time.Now().Unix(), db.getLastUpdate().Unix(), 5) // within 5 seconds of "now"
@@ -8951,7 +8955,7 @@ func TestIngester_lastUpdatedTimeIsNotInTheFuture(t *testing.T) {
i.closeAllTSDB()

// and open it again (it must exist)
db, err = i.getOrCreateTSDB(userID, false)
db, err = i.getOrCreateTSDB(ctx, userID, false)
require.NoError(t, err)
require.NotNil(t, db)

8 changes: 4 additions & 4 deletions pkg/ingester/label_names_and_values.go
@@ -105,7 +105,7 @@ func labelValuesCardinality(
lbNames []string,
matchers []*labels.Matcher,
idxReader tsdb.IndexReader,
postingsForMatchersFn func(tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error),
postingsForMatchersFn func(context.Context, tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error),
msgSizeThreshold int,
srv client.Ingester_LabelValuesCardinalityServer,
) error {
@@ -172,7 +172,7 @@ func computeLabelValuesSeriesCount(
lblValues []string,
matchers []*labels.Matcher,
idxReader tsdb.IndexReader,
postingsForMatchersFn func(tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error),
postingsForMatchersFn func(context.Context, tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error),
) <-chan labelValueCountResult {
maxConcurrency := 16
if len(lblValues) < maxConcurrency {
@@ -216,7 +216,7 @@ func countLabelValueSeries(
lblValue string,
matchers []*labels.Matcher,
idxReader tsdb.IndexReader,
postingsForMatchersFn func(tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error),
postingsForMatchersFn func(context.Context, tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error),
) (uint64, error) {
if err := ctx.Err(); err != nil {
return 0, err
@@ -229,7 +229,7 @@

lblValMatchers[len(lblValMatchers)-1] = labels.MustNewMatcher(labels.MatchEqual, lblName, lblValue)

p, err := postingsForMatchersFn(idxReader, lblValMatchers...)
p, err := postingsForMatchersFn(ctx, idxReader, lblValMatchers...)
if err != nil {
return 0, err
}
2 changes: 1 addition & 1 deletion pkg/ingester/label_names_and_values_test.go
@@ -290,7 +290,7 @@ func TestCountLabelValueSeries_ContextCancellation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()

_, err := countLabelValueSeries(ctx, "lblName", "lblVal", nil, nil, func(reader tsdb.IndexPostingsReader, matcher ...*labels.Matcher) (index.Postings, error) {
_, err := countLabelValueSeries(ctx, "lblName", "lblVal", nil, nil, func(context.Context, tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error) {
return infinitePostings{}, nil
})

4 changes: 2 additions & 2 deletions pkg/ingester/user_tsdb.go
@@ -156,7 +156,7 @@ func (u *userTSDB) Close() error {
return u.db.Close()
}

func (u *userTSDB) Compact() error {
func (u *userTSDB) Compact(_ context.Context) error {
return u.db.Compact()
}

@@ -196,7 +196,7 @@ func (u *userTSDB) changeStateToForcedCompaction(from tsdbState, forcedCompactio
//
// The input forcedMaxTime allows to specify the maximum timestamp of samples compacted from the
// in-order Head. You can pass math.MaxInt64 to compact the entire in-order Head.
func (u *userTSDB) compactHead(blockDuration, forcedCompactionMaxTime int64) error {
func (u *userTSDB) compactHead(_ context.Context, blockDuration, forcedCompactionMaxTime int64) error {
if ok, s := u.changeStateToForcedCompaction(active, forcedCompactionMaxTime); !ok {
return fmt.Errorf("TSDB head cannot be compacted because it is not in active state (possibly being closed or blocks shipping in progress): %s", s.String())
}