Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare for context argument changes in Prometheus #6097

Merged
merged 3 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/compactor/block_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func (c *MultitenantCompactor) validateBlock(ctx context.Context, logger log.Log

// validate block
checkChunks := c.cfgProvider.CompactorBlockUploadVerifyChunks(userID)
err = block.VerifyBlock(c.logger, blockDir, blockMetadata.MinTime, blockMetadata.MaxTime, checkChunks)
err = block.VerifyBlock(ctx, c.logger, blockDir, blockMetadata.MinTime, blockMetadata.MaxTime, checkChunks)
if err != nil {
return errors.Wrap(err, "error validating block")
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/compactor/bucket_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
}

// Ensure all source blocks are valid.
stats, err := block.GatherBlockHealthStats(jobLogger, bdir, meta.MinTime, meta.MaxTime, false)
stats, err := block.GatherBlockHealthStats(ctx, jobLogger, bdir, meta.MinTime, meta.MaxTime, false)
if err != nil {
return errors.Wrapf(err, "gather index issues for block %s", bdir)
}
Expand Down Expand Up @@ -397,7 +397,7 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
}

// Ensure the compacted block is valid.
if err := block.VerifyBlock(jobLogger, bdir, newMeta.MinTime, newMeta.MaxTime, false); err != nil {
if err := block.VerifyBlock(ctx, jobLogger, bdir, newMeta.MinTime, newMeta.MaxTime, false); err != nil {
return errors.Wrapf(err, "invalid result block %s", bdir)
}

Expand Down Expand Up @@ -571,13 +571,13 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
return errors.Wrapf(err, "read meta from %s", bdir)
}

resid, err := block.Repair(logger, tmpdir, ie.id, block.CompactorRepairSource, block.IgnoreIssue347OutsideChunk)
resid, err := block.Repair(ctx, logger, tmpdir, ie.id, block.CompactorRepairSource, block.IgnoreIssue347OutsideChunk)
if err != nil {
return errors.Wrapf(err, "repair failed for block %s", ie.id)
}

// Verify repaired id before uploading it.
if err := block.VerifyBlock(logger, filepath.Join(tmpdir, resid.String()), meta.MinTime, meta.MaxTime, false); err != nil {
if err := block.VerifyBlock(ctx, logger, filepath.Join(tmpdir, resid.String()), meta.MinTime, meta.MaxTime, false); err != nil {
return errors.Wrapf(err, "repaired block is invalid %s", resid)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func TestNewMockShardedQueryable(t *testing.T) {
}.LabelValue(),
})

require.Nil(t, set.Err())
require.NoError(t, set.Err())

var iter chunkenc.Iterator
for set.Next() {
Expand Down
6 changes: 3 additions & 3 deletions pkg/frontend/querymiddleware/sharded_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,16 @@ func (q *shardedQuerier) Select(_ bool, hints *storage.SelectHints, matchers ...
return storage.ErrSeriesSet(err)
}

return q.handleEmbeddedQueries(queries, hints)
return q.handleEmbeddedQueries(q.ctx, queries, hints)
}

// handleEmbeddedQueries concurrently executes the provided queries through the downstream handler.
// The returned storage.SeriesSet contains sorted series.
func (q *shardedQuerier) handleEmbeddedQueries(queries []string, hints *storage.SelectHints) storage.SeriesSet {
func (q *shardedQuerier) handleEmbeddedQueries(ctx context.Context, queries []string, hints *storage.SelectHints) storage.SeriesSet {
streams := make([][]SampleStream, len(queries))

// Concurrently run each query. It breaks and cancels each worker context on first error.
err := concurrency.ForEachJob(q.ctx, len(queries), len(queries), func(ctx context.Context, idx int) error {
err := concurrency.ForEachJob(ctx, len(queries), len(queries), func(ctx context.Context, idx int) error {
resp, err := q.handler.Do(ctx, q.req.WithQuery(queries[idx]))
if err != nil {
return err
Expand Down
28 changes: 15 additions & 13 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (
return &mimirpb.WriteResponse{}, nil
}

db, err := i.getOrCreateTSDB(userID, false)
db, err := i.getOrCreateTSDB(ctx, userID, false)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're passing the request context to getOrCreateTSDB(). I don't know what it will be used for, but if the context will be retained by TSDB then I think this is a mistake. We shouldn't pass any request-specific context to any place where the context will be retained.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback @pracucci. The context shouldn't be retained, but I'm looking into whether it's a bad idea to pass it down into TSDB.

if err != nil {
return nil, wrapOrAnnotateWithUser(err, userID)
}
Expand Down Expand Up @@ -1552,12 +1552,14 @@ func (i *Ingester) LabelValuesCardinality(req *client.LabelValuesCardinalityRequ
return err
}

var postingsForMatchersFn func(ix tsdb.IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error)
var postingsForMatchersFn func(context.Context, tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error)
switch req.GetCountMethod() {
case client.IN_MEMORY:
postingsForMatchersFn = tsdb.PostingsForMatchers
postingsForMatchersFn = func(_ context.Context, ix tsdb.IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
return tsdb.PostingsForMatchers(ix, ms...)
}
case client.ACTIVE:
postingsForMatchersFn = func(ix tsdb.IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
postingsForMatchersFn = func(_ context.Context, ix tsdb.IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
postings, err := tsdb.PostingsForMatchers(ix, ms...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1876,7 +1878,7 @@ func (i *Ingester) executeStreamingQuery(ctx context.Context, db *userTSDB, from
// The querier must remain open until we've finished streaming chunks.
defer q.Close()

allSeries, numSeries, err := i.sendStreamingQuerySeries(q, from, through, matchers, shard, stream)
allSeries, numSeries, err := i.sendStreamingQuerySeries(ctx, q, from, through, matchers, shard, stream)
if err != nil {
return 0, 0, err
}
Expand Down Expand Up @@ -1926,7 +1928,7 @@ func putChunkSeriesNode(sn *chunkSeriesNode) {
chunkSeriesNodePool.Put(sn)
}

func (i *Ingester) sendStreamingQuerySeries(q storage.ChunkQuerier, from, through int64, matchers []*labels.Matcher, shard *sharding.ShardSelector, stream client.Ingester_QueryStreamServer) (*chunkSeriesNode, int, error) {
func (i *Ingester) sendStreamingQuerySeries(_ context.Context, q storage.ChunkQuerier, from, through int64, matchers []*labels.Matcher, shard *sharding.ShardSelector, stream client.Ingester_QueryStreamServer) (*chunkSeriesNode, int, error) {
// Disable chunks trimming, so that we don't have to rewrite chunks which have samples outside
// the requested from/through range. PromQL engine can handle it.
hints := initSelectHints(from, through)
Expand Down Expand Up @@ -2102,7 +2104,7 @@ func (i *Ingester) getTSDBUsers() []string {
return ids
}

func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) {
func (i *Ingester) getOrCreateTSDB(ctx context.Context, userID string, force bool) (*userTSDB, error) {
db := i.getTSDB(userID)
if db != nil {
return db, nil
Expand Down Expand Up @@ -2137,7 +2139,7 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error)
}

// Create the database and a shipper for a user
db, err := i.createTSDB(userID, 0)
db, err := i.createTSDB(ctx, userID, 0)
if err != nil {
return nil, err
}
Expand All @@ -2150,7 +2152,7 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error)
}

// createTSDB creates a TSDB for a given userID, and returns the created db.
func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSDB, error) {
func (i *Ingester) createTSDB(_ context.Context, userID string, walReplayConcurrency int) (*userTSDB, error) {
tsdbPromReg := prometheus.NewRegistry()
udir := i.cfg.BlocksStorageConfig.TSDB.BlocksDir(userID)
userLogger := util_log.WithUserID(userID, i.logger)
Expand Down Expand Up @@ -2320,7 +2322,7 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
for n := 0; n < tsdbOpenConcurrency; n++ {
group.Go(func() error {
for userID := range queue {
db, err := i.createTSDB(userID, tsdbWALReplayConcurrency)
db, err := i.createTSDB(ctx, userID, tsdbWALReplayConcurrency)
if err != nil {
level.Error(i.logger).Log("msg", "unable to open TSDB", "err", err, "user", userID)
return errors.Wrapf(err, "unable to open TSDB for user %s", userID)
Expand Down Expand Up @@ -2660,18 +2662,18 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool, forcedCompacti
switch {
case force:
reason = "forced"
err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds(), forcedCompactionMaxTime)
err = userDB.compactHead(ctx, i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds(), forcedCompactionMaxTime)

case i.compactionIdleTimeout > 0 && userDB.isIdle(time.Now(), i.compactionIdleTimeout):
reason = "idle"
level.Info(i.logger).Log("msg", "TSDB is idle, forcing compaction", "user", userID)

// Always pass math.MaxInt64 as forcedCompactionMaxTime because we want to compact the whole TSDB head.
err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds(), math.MaxInt64)
err = userDB.compactHead(ctx, i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds(), math.MaxInt64)

default:
reason = "regular"
err = userDB.Compact()
err = userDB.Compact(ctx)
}

if err != nil {
Expand Down
46 changes: 25 additions & 21 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2612,6 +2612,8 @@ func TestIngester_Push_ShouldNotCreateTSDBIfNotInActiveState(t *testing.T) {
}

func TestIngester_getOrCreateTSDB_ShouldNotAllowToCreateTSDBIfIngesterStateIsNotActive(t *testing.T) {
ctx := context.Background()

tests := map[string]struct {
state ring.InstanceState
expectedErr error
Expand Down Expand Up @@ -2660,7 +2662,7 @@ func TestIngester_getOrCreateTSDB_ShouldNotAllowToCreateTSDBIfIngesterStateIsNot
}
}

db, err := i.getOrCreateTSDB("test", false)
db, err := i.getOrCreateTSDB(ctx, "test", false)
assert.Equal(t, testData.expectedErr, err)

if testData.expectedErr != nil {
Expand Down Expand Up @@ -4219,14 +4221,16 @@ func getWALReplayConcurrencyFromTSDBHeadOptions(userTSDB *userTSDB) int {
}

func TestIngester_shipBlocks(t *testing.T) {
ctx := context.Background()
aknuds1 marked this conversation as resolved.
Show resolved Hide resolved

cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.ShipConcurrency = 2

// Create ingester
i, err := prepareIngesterWithBlocksStorage(t, cfg, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.NoError(t, services.StartAndAwaitRunning(ctx, i))
defer services.StopAndAwaitTerminated(ctx, i) //nolint:errcheck

// Wait until it's healthy
test.Poll(t, 1*time.Second, 1, func() interface{} {
Expand All @@ -4236,7 +4240,7 @@ func TestIngester_shipBlocks(t *testing.T) {
// Create the TSDB for 3 users and then replace the shipper with the mocked one
mocks := []*uploaderMock{}
for _, userID := range []string{"user-1", "user-2", "user-3"} {
userDB, err := i.getOrCreateTSDB(userID, false)
userDB, err := i.getOrCreateTSDB(ctx, userID, false)
require.NoError(t, err)
require.NotNil(t, userDB)

Expand All @@ -4248,7 +4252,7 @@ func TestIngester_shipBlocks(t *testing.T) {
}

// Ship blocks and assert on the mocked shipper
i.shipBlocks(context.Background(), nil)
i.shipBlocks(ctx, nil)

for _, m := range mocks {
m.AssertNumberOfCalls(t, "Sync", 1)
Expand Down Expand Up @@ -4407,7 +4411,7 @@ func TestIngester_closingAndOpeningTsdbConcurrently(t *testing.T) {
return i.lifecycler.HealthyInstancesCount()
})

_, err = i.getOrCreateTSDB(userID, false)
_, err = i.getOrCreateTSDB(ctx, userID, false)
require.NoError(t, err)

iterations := 5000
Expand All @@ -4420,7 +4424,7 @@ func TestIngester_closingAndOpeningTsdbConcurrently(t *testing.T) {
case <-quit:
return
default:
_, err = i.getOrCreateTSDB(userID, false)
_, err = i.getOrCreateTSDB(ctx, userID, false)
if err != nil {
chanErr <- err
}
Expand Down Expand Up @@ -4460,13 +4464,13 @@ func TestIngester_idleCloseEmptyTSDB(t *testing.T) {
return i.lifecycler.HealthyInstancesCount()
})

db, err := i.getOrCreateTSDB(userID, true)
db, err := i.getOrCreateTSDB(ctx, userID, true)
aknuds1 marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
require.NotNil(t, db)

// Run compaction and shipping.
i.compactBlocks(context.Background(), true, math.MaxInt64, nil)
i.shipBlocks(context.Background(), nil)
i.compactBlocks(ctx, true, math.MaxInt64, nil)
i.shipBlocks(ctx, nil)

// Make sure we can close completely empty TSDB without problems.
require.Equal(t, tsdbIdleClosed, i.closeAndDeleteUserTSDBIfIdle(userID))
Expand All @@ -4476,7 +4480,7 @@ func TestIngester_idleCloseEmptyTSDB(t *testing.T) {
require.Nil(t, db)

// And we can recreate it again, if needed.
db, err = i.getOrCreateTSDB(userID, true)
db, err = i.getOrCreateTSDB(ctx, userID, true)
require.NoError(t, err)
require.NotNil(t, db)
}
Expand Down Expand Up @@ -4915,7 +4919,7 @@ func TestIngester_ForFlush(t *testing.T) {

func mockUserShipper(t *testing.T, i *Ingester) *uploaderMock {
m := &uploaderMock{}
userDB, err := i.getOrCreateTSDB(userID, false)
userDB, err := i.getOrCreateTSDB(context.Background(), userID, false)
require.NoError(t, err)
require.NotNil(t, userDB)

Expand Down Expand Up @@ -5411,7 +5415,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {

db := i.getTSDB(userID)
require.NotNil(t, db)
require.Nil(t, db.Compact())
require.NoError(t, db.Compact(ctx))

oldBlocks := db.Blocks()
require.Equal(t, 3, len(oldBlocks))
Expand All @@ -5435,7 +5439,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
_, err := i.Push(ctx, req)
require.NoError(t, err)
}
require.Nil(t, db.Compact())
require.NoError(t, db.Compact(ctx))

// Only the second block should be gone along with a new block.
newBlocks := db.Blocks()
Expand Down Expand Up @@ -5467,7 +5471,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
_, err := i.Push(ctx, req)
require.NoError(t, err)
}
require.Nil(t, db.Compact())
require.NoError(t, db.Compact(ctx))

// All blocks from the old blocks should be gone now.
newBlocks2 := db.Blocks()
Expand Down Expand Up @@ -5524,7 +5528,7 @@ func TestIngesterNotDeleteShippedBlocksUntilRetentionExpires(t *testing.T) {

db := i.getTSDB(userID)
require.NotNil(t, db)
require.Nil(t, db.Compact())
require.NoError(t, db.Compact(ctx))

oldBlocks := db.Blocks()
require.Equal(t, 3, len(oldBlocks))
Expand All @@ -5551,7 +5555,7 @@ func TestIngesterNotDeleteShippedBlocksUntilRetentionExpires(t *testing.T) {
_, err := i.Push(ctx, req)
require.NoError(t, err)
}
require.Nil(t, db.Compact())
require.NoError(t, db.Compact(ctx))

// Only the last two old blocks plus the one containing the newly added samples should remain.
newBlocks := db.Blocks()
Expand Down Expand Up @@ -5605,7 +5609,7 @@ func TestIngesterWithShippingDisabledDeletesBlocksOnlyAfterRetentionExpires(t *t

db := i.getTSDB(userID)
require.NotNil(t, db)
require.Nil(t, db.Compact())
require.NoError(t, db.Compact(ctx))

oldBlocks := db.Blocks()
require.Len(t, oldBlocks, 3)
Expand All @@ -5617,7 +5621,7 @@ func TestIngesterWithShippingDisabledDeletesBlocksOnlyAfterRetentionExpires(t *t
req, _, _, _ := mockWriteRequest(t, labels.FromStrings(labels.MetricName, "test"), 0, 5*chunkRangeMilliSec)
_, err = i.Push(ctx, req)
require.NoError(t, err)
require.Nil(t, db.Compact())
require.NoError(t, db.Compact(ctx))

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingester_tsdb_compactions_total Total number of TSDB compactions that were executed.
Expand Down Expand Up @@ -8938,7 +8942,7 @@ func TestIngester_lastUpdatedTimeIsNotInTheFuture(t *testing.T) {
return i.lifecycler.HealthyInstancesCount()
})

db, err := i.getOrCreateTSDB(userID, true)
db, err := i.getOrCreateTSDB(ctx, userID, true)
require.NoError(t, err)
require.NotNil(t, db)
require.InDelta(t, time.Now().Unix(), db.getLastUpdate().Unix(), 5) // within 5 seconds of "now"
Expand All @@ -8951,7 +8955,7 @@ func TestIngester_lastUpdatedTimeIsNotInTheFuture(t *testing.T) {
i.closeAllTSDB()

// and open it again (it must exist)
db, err = i.getOrCreateTSDB(userID, false)
db, err = i.getOrCreateTSDB(ctx, userID, false)
require.NoError(t, err)
require.NotNil(t, db)

Expand Down
8 changes: 4 additions & 4 deletions pkg/ingester/label_names_and_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func labelValuesCardinality(
lbNames []string,
matchers []*labels.Matcher,
idxReader tsdb.IndexReader,
postingsForMatchersFn func(tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error),
postingsForMatchersFn func(context.Context, tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error),
msgSizeThreshold int,
srv client.Ingester_LabelValuesCardinalityServer,
) error {
Expand Down Expand Up @@ -172,7 +172,7 @@ func computeLabelValuesSeriesCount(
lblValues []string,
matchers []*labels.Matcher,
idxReader tsdb.IndexReader,
postingsForMatchersFn func(tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error),
postingsForMatchersFn func(context.Context, tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error),
) <-chan labelValueCountResult {
maxConcurrency := 16
if len(lblValues) < maxConcurrency {
Expand Down Expand Up @@ -216,7 +216,7 @@ func countLabelValueSeries(
lblValue string,
matchers []*labels.Matcher,
idxReader tsdb.IndexReader,
postingsForMatchersFn func(tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error),
postingsForMatchersFn func(context.Context, tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error),
) (uint64, error) {
if err := ctx.Err(); err != nil {
return 0, err
Expand All @@ -229,7 +229,7 @@ func countLabelValueSeries(

lblValMatchers[len(lblValMatchers)-1] = labels.MustNewMatcher(labels.MatchEqual, lblName, lblValue)

p, err := postingsForMatchersFn(idxReader, lblValMatchers...)
p, err := postingsForMatchersFn(ctx, idxReader, lblValMatchers...)
if err != nil {
return 0, err
}
Expand Down
Loading
Loading