Use concurrency.ForEachJob() from new dskit #727

Merged · 1 commit · Jan 12, 2022
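Every caller in this diff follows the same migration: the old concurrency.ForEach took a pre-built []interface{} of jobs (often produced with concurrency.CreateJobsFromStrings) and handed the callback an interface{} to type-assert, whereas concurrency.ForEachJob takes only the number of jobs and passes the callback an index into the caller's own, properly typed slice. Below is a minimal sketch of the pattern; the processUser helper and the userIDs slice are hypothetical stand-ins for the real call sites, not code from this PR.

```go
package main

import (
	"context"
	"fmt"

	"github.com/grafana/dskit/concurrency"
)

// processUser is a hypothetical stand-in for the per-item work done at the real
// call sites (fetching an alertmanager config, deleting a block, and so on).
func processUser(ctx context.Context, userID string) error {
	fmt.Println("processing", userID)
	return nil
}

func main() {
	ctx := context.Background()
	userIDs := []string{"user-1", "user-2", "user-3"}

	// Old pattern: build a []interface{} of jobs and type-assert inside the callback.
	//
	//   jobs := concurrency.CreateJobsFromStrings(userIDs)
	//   err := concurrency.ForEach(ctx, jobs, 2, func(ctx context.Context, job interface{}) error {
	//       return processUser(ctx, job.(string))
	//   })
	//
	// New pattern: pass the number of jobs and index into the original typed slice.
	err := concurrency.ForEachJob(ctx, len(userIDs), 2, func(ctx context.Context, idx int) error {
		return processUser(ctx, userIDs[idx])
	})
	if err != nil {
		fmt.Println("error:", err)
	}
}
```
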
2 changes: 1 addition & 1 deletion go.mod
@@ -28,7 +28,7 @@ require (
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.4
github.com/gorilla/mux v1.8.0
github.com/grafana/dskit v0.0.0-20220104154758-acbb88132134
github.com/grafana/dskit v0.0.0-20220112093026-95274ccc858d
github.com/hashicorp/golang-lru v0.5.4
github.com/json-iterator/go v1.1.12
github.com/leanovate/gopter v0.2.4
4 changes: 2 additions & 2 deletions go.sum
@@ -849,8 +849,8 @@ github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1/go.mod h1:uPG2nyK4CtgNDmWv7qyzYcdI+S90kHHRWvHnBtEMBXM=
github.com/grafana/dskit v0.0.0-20220104154758-acbb88132134 h1:WhDvHde5WYR/dHSFeTfTukgbvVMhO7o9zezACAKfwV0=
github.com/grafana/dskit v0.0.0-20220104154758-acbb88132134/go.mod h1:M0/dlftwBvH7+hdNNpjMa/CUXD7gsew67mbkCuDlFXE=
github.com/grafana/dskit v0.0.0-20220112093026-95274ccc858d h1:YwUtZIQFjlH6e2b5dFLfW1h/vTkTXNkZqv9qeU8b5h0=
github.com/grafana/dskit v0.0.0-20220112093026-95274ccc858d/go.mod h1:M0/dlftwBvH7+hdNNpjMa/CUXD7gsew67mbkCuDlFXE=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85/go.mod h1:crI9WX6p0IhrqB+DqIUHulRW853PaNFf7o4UprV//3I=
github.com/grafana/memberlist v0.2.5-0.20211201083710-c7bc8e9df94b/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
4 changes: 2 additions & 2 deletions pkg/alertmanager/alertstore/bucketclient/bucket_client.go
@@ -78,8 +78,8 @@ func (s *BucketAlertStore) GetAlertConfigs(ctx context.Context, userIDs []string
cfgs = make(map[string]alertspb.AlertConfigDesc, len(userIDs))
)

err := concurrency.ForEach(ctx, concurrency.CreateJobsFromStrings(userIDs), fetchConcurrency, func(ctx context.Context, job interface{}) error {
userID := job.(string)
err := concurrency.ForEachJob(ctx, len(userIDs), fetchConcurrency, func(ctx context.Context, idx int) error {
userID := userIDs[idx]

cfg, err := s.getAlertConfig(ctx, userID)
if s.alertsBucket.IsObjNotFoundErr(err) {
7 changes: 3 additions & 4 deletions pkg/alertmanager/multitenant.go
@@ -1141,9 +1141,8 @@ func (am *MultitenantAlertmanager) ReadFullStateForUser(ctx context.Context, use
)

// Note that the jobs swallow the errors - this is because we want to give each replica a chance to respond.
jobs := concurrency.CreateJobsFromStrings(addrs)
err = concurrency.ForEach(ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error {
addr := job.(string)
err = concurrency.ForEachJob(ctx, len(addrs), len(addrs), func(ctx context.Context, idx int) error {
addr := addrs[idx]
level.Debug(am.logger).Log("msg", "contacting replica for full state", "user", userID, "addr", addr)

c, err := am.alertmanagerClientsPool.GetClientFor(addr)
@@ -1180,7 +1179,7 @@ func (am *MultitenantAlertmanager) ReadFullStateForUser(ctx context.Context, use
}

// If all replicas do not know the user, propagate that outcome for the client to decide what to do.
if notFound == len(jobs) {
if notFound == len(addrs) {
return nil, errAllReplicasUserNotFound
}

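The comments in the hunk above ("the jobs swallow the errors - this is because we want to give each replica a chance to respond") describe a convention that carries over unchanged to ForEachJob: returning an error from the callback stops the remaining jobs early, so callers that want every job to run record failures instead of returning them. A minimal sketch of that convention, assuming a hypothetical contactReplica helper rather than the real alertmanager client:

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/grafana/dskit/concurrency"
)

// contactReplica is a hypothetical stand-in for the real per-replica RPC.
func contactReplica(ctx context.Context, addr string) error {
	fmt.Println("contacting", addr)
	return nil
}

func main() {
	ctx := context.Background()
	addrs := []string{"replica-1", "replica-2", "replica-3"}

	var (
		mu       sync.Mutex
		failures []error
	)

	// Record per-replica errors instead of returning them, so one failing
	// replica does not cancel the in-flight requests to the others.
	_ = concurrency.ForEachJob(ctx, len(addrs), len(addrs), func(ctx context.Context, idx int) error {
		if err := contactReplica(ctx, addrs[idx]); err != nil {
			mu.Lock()
			failures = append(failures, err)
			mu.Unlock()
		}
		// Returning the error here would stop the remaining jobs early.
		return nil
	})

	fmt.Printf("%d of %d replicas failed\n", len(failures), len(addrs))
}
```
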
12 changes: 6 additions & 6 deletions pkg/compactor/blocks_cleaner.go
@@ -364,7 +364,7 @@ const deleteBlocksConcurrency = 16

// Concurrently deletes blocks marked for deletion, and removes blocks from index.
func (c *BlocksCleaner) deleteBlocksMarkedForDeletion(ctx context.Context, idx *bucketindex.Index, userBucket objstore.Bucket, userLogger log.Logger) {
blocksToDelete := make([]interface{}, 0, len(idx.BlockDeletionMarks))
blocksToDelete := make([]ulid.ULID, 0, len(idx.BlockDeletionMarks))

// Collect blocks marked for deletion into buffered channel.
for _, mark := range idx.BlockDeletionMarks {
@@ -377,8 +377,8 @@ func (c *BlocksCleaner) deleteBlocksMarkedForDeletion(ctx context.Context, idx *
var mu sync.Mutex

// We don't want to return errors from our function, as that would stop ForEach loop early.
_ = concurrency.ForEach(ctx, blocksToDelete, deleteBlocksConcurrency, func(ctx context.Context, job interface{}) error {
blockID := job.(ulid.ULID)
_ = concurrency.ForEachJob(ctx, len(blocksToDelete), deleteBlocksConcurrency, func(ctx context.Context, jobIdx int) error {
blockID := blocksToDelete[jobIdx]

if err := block.Delete(ctx, userLogger, userBucket, blockID); err != nil {
c.blocksFailedTotal.Inc()
@@ -401,7 +401,7 @@ func (c *BlocksCleaner) deleteBlocksMarkedForDeletion(ctx context.Context, idx *
// and index are updated accordingly.
func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) {
// Collect all blocks with missing meta.json into buffered channel.
blocks := make([]interface{}, 0, len(partials))
blocks := make([]ulid.ULID, 0, len(partials))

for blockID, blockErr := range partials {
// We can safely delete only blocks which are partial because the meta.json is missing.
@@ -415,8 +415,8 @@ func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map
var mu sync.Mutex

// We don't want to return errors from our function, as that would stop ForEach loop early.
_ = concurrency.ForEach(ctx, blocks, deleteBlocksConcurrency, func(ctx context.Context, job interface{}) error {
blockID := job.(ulid.ULID)
_ = concurrency.ForEachJob(ctx, len(blocks), deleteBlocksConcurrency, func(ctx context.Context, jobIdx int) error {
blockID := blocks[jobIdx]

// We can safely delete only partial blocks with a deletion mark.
err := metadata.ReadMarker(ctx, userLogger, userBucket, blockID.String(), &metadata.DeletionMark{})
34 changes: 13 additions & 21 deletions pkg/compactor/bucket_compactor.go
@@ -364,8 +364,8 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
// Once we have a plan we need to download the actual data.
downloadBegin := time.Now()

err = concurrency.ForEach(ctx, convertBlocksToForEachJobs(toCompact), c.blockSyncConcurrency, func(ctx context.Context, job interface{}) error {
meta := job.(*metadata.Meta)
err = concurrency.ForEachJob(ctx, len(toCompact), c.blockSyncConcurrency, func(ctx context.Context, idx int) error {
meta := toCompact[idx]

// Must be the same as in blocksToCompactDirs.
bdir := filepath.Join(subDir, meta.ULID.String())
@@ -442,19 +442,19 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
uploadBegin := time.Now()
uploadedBlocks := atomic.NewInt64(0)

err = concurrency.ForEach(ctx, convertCompactionResultToForEachJobs(compIDs, job.UseSplitting(), jobLogger), c.blockSyncConcurrency, func(ctx context.Context, j interface{}) error {
shardID := j.(ulidWithShardIndex).shardIndex
compID := j.(ulidWithShardIndex).ulid
blocksToUpload := convertCompactionResultToForEachJobs(compIDs, job.UseSplitting(), jobLogger)
err = concurrency.ForEachJob(ctx, len(blocksToUpload), c.blockSyncConcurrency, func(ctx context.Context, idx int) error {
blockToUpload := blocksToUpload[idx]

uploadedBlocks.Inc()
[Review comment from the PR author on uploadedBlocks.Inc(): "I didn't want to extend the scope of this PR but this isn't needed, we can just use len(blocksToUpload) later."]

bdir := filepath.Join(subDir, compID.String())
bdir := filepath.Join(subDir, blockToUpload.ulid.String())
index := filepath.Join(bdir, block.IndexFilename)

// When splitting is enabled, we need to inject the shard ID as external label.
newLabels := job.Labels().Map()
if job.UseSplitting() {
newLabels[mimir_tsdb.CompactorShardIDExternalLabel] = sharding.FormatShardIDLabelValue(uint64(shardID), uint64(job.SplittingShards()))
newLabels[mimir_tsdb.CompactorShardIDExternalLabel] = sharding.FormatShardIDLabelValue(uint64(blockToUpload.shardIndex), uint64(job.SplittingShards()))
}

newMeta, err := metadata.InjectThanos(jobLogger, bdir, metadata.Thanos{
@@ -478,11 +478,11 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul

begin := time.Now()
if err := block.Upload(ctx, jobLogger, c.bkt, bdir, job.hashFunc); err != nil {
return errors.Wrapf(err, "upload of %s failed", compID)
return errors.Wrapf(err, "upload of %s failed", blockToUpload.ulid)
}

elapsed := time.Since(begin)
level.Info(jobLogger).Log("msg", "uploaded block", "result_block", compID, "duration", elapsed, "duration_ms", elapsed.Milliseconds(), "external_labels", labels.FromMap(newLabels))
level.Info(jobLogger).Log("msg", "uploaded block", "result_block", blockToUpload.ulid, "duration", elapsed, "duration_ms", elapsed.Milliseconds(), "external_labels", labels.FromMap(newLabels))
return nil
})
if err != nil {
@@ -505,18 +505,10 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
return true, compIDs, nil
}

func convertBlocksToForEachJobs(input []*metadata.Meta) []interface{} {
result := make([]interface{}, len(input))
for ix := range input {
result[ix] = input[ix]
}
return result
}

// Converts ULIDs from compaction to interface{}, and also filters out empty ULIDs. When handling result of split
// compactions, shard index is index in the slice returned by compaction.
func convertCompactionResultToForEachJobs(compactedBlocks []ulid.ULID, splitJob bool, jobLogger log.Logger) []interface{} {
result := make([]interface{}, 0, len(compactedBlocks))
// convertCompactionResultToForEachJobs filters out empty ULIDs.
// When handling result of split compactions, shard index is index in the slice returned by compaction.
func convertCompactionResultToForEachJobs(compactedBlocks []ulid.ULID, splitJob bool, jobLogger log.Logger) []ulidWithShardIndex {
result := make([]ulidWithShardIndex, 0, len(compactedBlocks))

for ix, id := range compactedBlocks {
// Skip if it's an empty block.
11 changes: 1 addition & 10 deletions pkg/querier/queryrange/sharded_queryable.go
@@ -110,8 +110,7 @@ func (q *shardedQuerier) handleEmbeddedQueries(queries []string, hints *storage.
streams := make([][]SampleStream, len(queries))

// Concurrently run each query. It breaks and cancels each worker context on first error.
err := concurrency.ForEach(q.ctx, createJobIndexes(len(queries)), len(queries), func(ctx context.Context, job interface{}) error {
idx := job.(int)
err := concurrency.ForEachJob(q.ctx, len(queries), len(queries), func(ctx context.Context, idx int) error {
resp, err := q.handler.Do(ctx, q.req.WithQuery(queries[idx]))
if err != nil {
return err
@@ -149,14 +148,6 @@ func (q *shardedQuerier) Close() error {
return nil
}

func createJobIndexes(l int) []interface{} {
jobs := make([]interface{}, l)
for j := 0; j < l; j++ {
jobs[j] = j
}
return jobs
}

type responseHeadersTracker struct {
headersMx sync.Mutex
headers map[string][]string
17 changes: 7 additions & 10 deletions pkg/querier/queryrange/split_and_cache_test.go
@@ -566,7 +566,7 @@ func TestSplitAndCacheMiddleware_ResultsCacheFuzzy(t *testing.T) {
}

// Generate some random requests.
reqs := make([]interface{}, 0, numQueries)
reqs := make([]Request, 0, numQueries)
for q := 0; q < numQueries; q++ {
// Generate a random time range within min/max time.
startTime := minTime.Add(time.Duration(rand.Int63n(maxTime.Sub(minTime).Milliseconds())) * time.Millisecond)
@@ -585,15 +585,14 @@ func TestSplitAndCacheMiddleware_ResultsCacheFuzzy(t *testing.T) {
// Run the query without the split and cache middleware and store it as expected result.
expectedResMx := sync.Mutex{}
expectedRes := make(map[int64]Response, len(reqs))
require.NoError(t, concurrency.ForEach(ctx, reqs, len(reqs), func(ctx context.Context, job interface{}) error {
req := job.(Request)
res, err := downstream.Do(ctx, req)
require.NoError(t, concurrency.ForEachJob(ctx, len(reqs), len(reqs), func(ctx context.Context, idx int) error {
res, err := downstream.Do(ctx, reqs[idx])
if err != nil {
return err
}

expectedResMx.Lock()
expectedRes[req.GetId()] = res
expectedRes[reqs[idx].GetId()] = res
expectedResMx.Unlock()

return nil
@@ -628,12 +627,10 @@ func TestSplitAndCacheMiddleware_ResultsCacheFuzzy(t *testing.T) {
).Wrap(downstream)

// Run requests honoring concurrency.
require.NoError(t, concurrency.ForEach(ctx, reqs, maxConcurrency, func(ctx context.Context, job interface{}) error {
req := job.(Request)

actual, err := mw.Do(ctx, req)
require.NoError(t, concurrency.ForEachJob(ctx, len(reqs), maxConcurrency, func(ctx context.Context, idx int) error {
actual, err := mw.Do(ctx, reqs[idx])
require.NoError(t, err)
require.Equal(t, expectedRes[req.GetId()], actual)
require.Equal(t, expectedRes[reqs[idx].GetId()], actual)

return nil
}))
33 changes: 9 additions & 24 deletions pkg/querier/tenantfederation/merge_queryable.go
@@ -7,7 +7,6 @@ package tenantfederation

import (
"context"
"fmt"
"sort"
"strings"

@@ -228,8 +227,7 @@ type stringSliceFuncJob struct {
// It doesn't require the output of the stringSliceFunc to be sorted, as results
// of LabelValues are not sorted.
func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(f stringSliceFunc, tenants map[string]struct{}) ([]string, storage.Warnings, error) {
var jobs []interface{}

jobs := make([]*stringSliceFuncJob, 0, len(m.ids))
for pos, id := range m.ids {
if tenants != nil {
if _, matched := tenants[id]; !matched {
@@ -243,13 +241,8 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(f stringSliceFunc, te
})
}

run := func(ctx context.Context, jobIntf interface{}) error {
job, ok := jobIntf.(*stringSliceFuncJob)
if !ok {
return fmt.Errorf("unexpected type %T", jobIntf)
}

var err error
run := func(ctx context.Context, idx int) (err error) {
job := jobs[idx]
job.result, job.warnings, err = f(ctx, job.querier)
if err != nil {
return errors.Wrapf(err, "error querying %s %s", rewriteLabelName(m.idLabelName), job.id)
Expand All @@ -258,20 +251,15 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(f stringSliceFunc, te
return nil
}

err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run)
err := concurrency.ForEachJob(m.ctx, len(jobs), maxConcurrency, run)
if err != nil {
return nil, nil, err
}

// aggregate warnings and deduplicate string results
var warnings storage.Warnings
resultMap := make(map[string]struct{})
for _, jobIntf := range jobs {
job, ok := jobIntf.(*stringSliceFuncJob)
if !ok {
return nil, nil, fmt.Errorf("unexpected type %T", jobIntf)
}

for _, job := range jobs {
for _, e := range job.result {
resultMap[e] = struct{}{}
}
@@ -312,7 +300,7 @@ func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, match
log, ctx := spanlogger.NewWithLogger(m.ctx, m.logger, "mergeQuerier.Select")
defer log.Span.Finish()
matchedValues, filteredMatchers := filterValuesByMatchers(m.idLabelName, m.ids, matchers...)
var jobs = make([]interface{}, len(matchedValues))
var jobs = make([]*selectJob, len(matchedValues))
var seriesSets = make([]storage.SeriesSet, len(matchedValues))
var jobPos int
for labelPos := range m.ids {
@@ -327,11 +315,8 @@ func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, match
jobPos++
}

run := func(ctx context.Context, jobIntf interface{}) error {
job, ok := jobIntf.(*selectJob)
if !ok {
return fmt.Errorf("unexpected type %T", jobIntf)
}
run := func(ctx context.Context, idx int) error {
job := jobs[idx]
seriesSets[job.pos] = &addLabelsSeriesSet{
upstream: job.querier.Select(sortSeries, hints, filteredMatchers...),
labels: labels.Labels{
Expand All @@ -344,7 +329,7 @@ func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, match
return nil
}

err := concurrency.ForEach(ctx, jobs, maxConcurrency, run)
err := concurrency.ForEachJob(ctx, len(jobs), maxConcurrency, run)
if err != nil {
return storage.ErrSeriesSet(err)
}
6 changes: 3 additions & 3 deletions pkg/ruler/ruler.go
@@ -772,9 +772,9 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string) ([]*GroupSta

// Concurrently fetch rules from all rulers. Since rules are not replicated,
// we need all requests to succeed.
jobs := concurrency.CreateJobsFromStrings(rulers.GetAddresses())
err = concurrency.ForEach(ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error {
addr := job.(string)
addrs := rulers.GetAddresses()
err = concurrency.ForEachJob(ctx, len(addrs), len(addrs), func(ctx context.Context, idx int) error {
addr := addrs[idx]

rulerClient, err := r.clientsPool.GetClientFor(addr)
if err != nil {
8 changes: 4 additions & 4 deletions tools/listblocks/main.go
@@ -149,8 +149,8 @@ func fetchDeletionTimes(ctx context.Context, bkt objstore.Bucket, deletionMarker
mu := sync.Mutex{}
times := map[ulid.ULID]time.Time{}

return times, concurrency.ForEach(ctx, concurrency.CreateJobsFromStrings(deletionMarkers), concurrencyLimit, func(ctx context.Context, job interface{}) error {
r, err := bkt.Get(ctx, job.(string))
return times, concurrency.ForEachJob(ctx, len(deletionMarkers), concurrencyLimit, func(ctx context.Context, idx int) error {
r, err := bkt.Get(ctx, deletionMarkers[idx])
if err != nil {
if bkt.IsObjNotFoundErr(err) {
return nil
@@ -179,8 +179,8 @@ func fetchMetas(ctx context.Context, bkt objstore.Bucket, metaFiles []string) (m
mu := sync.Mutex{}
metas := map[ulid.ULID]*metadata.Meta{}

return metas, concurrency.ForEach(ctx, concurrency.CreateJobsFromStrings(metaFiles), concurrencyLimit, func(ctx context.Context, job interface{}) error {
r, err := bkt.Get(ctx, job.(string))
return metas, concurrency.ForEachJob(ctx, len(metaFiles), concurrencyLimit, func(ctx context.Context, idx int) error {
r, err := bkt.Get(ctx, metaFiles[idx])
if err != nil {
if bkt.IsObjNotFoundErr(err) {
return nil