From 0c4c32de9bdf0b323b3000dc935c0e4e0249cfc2 Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Wed, 12 Jan 2022 10:41:03 +0100 Subject: [PATCH] Use concurrency.ForEachJob() from new dskit Ref: https://github.com/grafana/dskit/pull/113 Updated all usages of concurrency.ForEach() to use ForEachJob() so no type assertion is needed anymore. Signed-off-by: Oleg Zaytsev --- go.mod | 2 +- go.sum | 4 +- .../alertstore/bucketclient/bucket_client.go | 4 +- pkg/alertmanager/multitenant.go | 7 +-- pkg/compactor/blocks_cleaner.go | 12 ++-- pkg/compactor/bucket_compactor.go | 34 +++++------- pkg/querier/queryrange/sharded_queryable.go | 11 +--- .../queryrange/split_and_cache_test.go | 17 +++--- .../tenantfederation/merge_queryable.go | 33 +++-------- pkg/ruler/ruler.go | 6 +- tools/listblocks/main.go | 8 +-- .../grafana/dskit/concurrency/runner.go | 55 +++++++++++-------- vendor/github.com/grafana/dskit/ring/ring.go | 12 ---- vendor/modules.txt | 2 +- 14 files changed, 84 insertions(+), 123 deletions(-) diff --git a/go.mod b/go.mod index 64856467db6..c2b428292e4 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/golang/protobuf v1.5.2 github.com/golang/snappy v0.0.4 github.com/gorilla/mux v1.8.0 - github.com/grafana/dskit v0.0.0-20220104154758-acbb88132134 + github.com/grafana/dskit v0.0.0-20220112093026-95274ccc858d github.com/hashicorp/golang-lru v0.5.4 github.com/json-iterator/go v1.1.12 github.com/leanovate/gopter v0.2.4 diff --git a/go.sum b/go.sum index 83de4a49935..8f34efb8039 100644 --- a/go.sum +++ b/go.sum @@ -849,8 +849,8 @@ github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1/go.mod h1:uPG2nyK4CtgNDmWv7qyzYcdI+S90kHHRWvHnBtEMBXM= -github.com/grafana/dskit v0.0.0-20220104154758-acbb88132134 h1:WhDvHde5WYR/dHSFeTfTukgbvVMhO7o9zezACAKfwV0= -github.com/grafana/dskit v0.0.0-20220104154758-acbb88132134/go.mod h1:M0/dlftwBvH7+hdNNpjMa/CUXD7gsew67mbkCuDlFXE= +github.com/grafana/dskit v0.0.0-20220112093026-95274ccc858d h1:YwUtZIQFjlH6e2b5dFLfW1h/vTkTXNkZqv9qeU8b5h0= +github.com/grafana/dskit v0.0.0-20220112093026-95274ccc858d/go.mod h1:M0/dlftwBvH7+hdNNpjMa/CUXD7gsew67mbkCuDlFXE= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85/go.mod h1:crI9WX6p0IhrqB+DqIUHulRW853PaNFf7o4UprV//3I= github.com/grafana/memberlist v0.2.5-0.20211201083710-c7bc8e9df94b/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= diff --git a/pkg/alertmanager/alertstore/bucketclient/bucket_client.go b/pkg/alertmanager/alertstore/bucketclient/bucket_client.go index fe025775f94..c00c1965113 100644 --- a/pkg/alertmanager/alertstore/bucketclient/bucket_client.go +++ b/pkg/alertmanager/alertstore/bucketclient/bucket_client.go @@ -78,8 +78,8 @@ func (s *BucketAlertStore) GetAlertConfigs(ctx context.Context, userIDs []string cfgs = make(map[string]alertspb.AlertConfigDesc, len(userIDs)) ) - err := concurrency.ForEach(ctx, concurrency.CreateJobsFromStrings(userIDs), fetchConcurrency, func(ctx context.Context, job interface{}) error { - userID := job.(string) + err := concurrency.ForEachJob(ctx, len(userIDs), fetchConcurrency, func(ctx context.Context, idx int) error { + userID := userIDs[idx] cfg, err := s.getAlertConfig(ctx, userID) 
if s.alertsBucket.IsObjNotFoundErr(err) { diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index a3c0a37db12..1b30c8a2b42 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -1141,9 +1141,8 @@ func (am *MultitenantAlertmanager) ReadFullStateForUser(ctx context.Context, use ) // Note that the jobs swallow the errors - this is because we want to give each replica a chance to respond. - jobs := concurrency.CreateJobsFromStrings(addrs) - err = concurrency.ForEach(ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error { - addr := job.(string) + err = concurrency.ForEachJob(ctx, len(addrs), len(addrs), func(ctx context.Context, idx int) error { + addr := addrs[idx] level.Debug(am.logger).Log("msg", "contacting replica for full state", "user", userID, "addr", addr) c, err := am.alertmanagerClientsPool.GetClientFor(addr) @@ -1180,7 +1179,7 @@ func (am *MultitenantAlertmanager) ReadFullStateForUser(ctx context.Context, use } // If all replicas do not know the user, propagate that outcome for the client to decide what to do. - if notFound == len(jobs) { + if notFound == len(addrs) { return nil, errAllReplicasUserNotFound } diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index e7258ac6c97..77280439c02 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -364,7 +364,7 @@ const deleteBlocksConcurrency = 16 // Concurrently deletes blocks marked for deletion, and removes blocks from index. func (c *BlocksCleaner) deleteBlocksMarkedForDeletion(ctx context.Context, idx *bucketindex.Index, userBucket objstore.Bucket, userLogger log.Logger) { - blocksToDelete := make([]interface{}, 0, len(idx.BlockDeletionMarks)) + blocksToDelete := make([]ulid.ULID, 0, len(idx.BlockDeletionMarks)) // Collect blocks marked for deletion into buffered channel. for _, mark := range idx.BlockDeletionMarks { @@ -377,8 +377,8 @@ func (c *BlocksCleaner) deleteBlocksMarkedForDeletion(ctx context.Context, idx * var mu sync.Mutex // We don't want to return errors from our function, as that would stop ForEach loop early. - _ = concurrency.ForEach(ctx, blocksToDelete, deleteBlocksConcurrency, func(ctx context.Context, job interface{}) error { - blockID := job.(ulid.ULID) + _ = concurrency.ForEachJob(ctx, len(blocksToDelete), deleteBlocksConcurrency, func(ctx context.Context, jobIdx int) error { + blockID := blocksToDelete[jobIdx] if err := block.Delete(ctx, userLogger, userBucket, blockID); err != nil { c.blocksFailedTotal.Inc() @@ -401,7 +401,7 @@ func (c *BlocksCleaner) deleteBlocksMarkedForDeletion(ctx context.Context, idx * // and index are updated accordingly. func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) { // Collect all blocks with missing meta.json into buffered channel. - blocks := make([]interface{}, 0, len(partials)) + blocks := make([]ulid.ULID, 0, len(partials)) for blockID, blockErr := range partials { // We can safely delete only blocks which are partial because the meta.json is missing. @@ -415,8 +415,8 @@ func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map var mu sync.Mutex // We don't want to return errors from our function, as that would stop ForEach loop early. 
- _ = concurrency.ForEach(ctx, blocks, deleteBlocksConcurrency, func(ctx context.Context, job interface{}) error { - blockID := job.(ulid.ULID) + _ = concurrency.ForEachJob(ctx, len(blocks), deleteBlocksConcurrency, func(ctx context.Context, jobIdx int) error { + blockID := blocks[jobIdx] // We can safely delete only partial blocks with a deletion mark. err := metadata.ReadMarker(ctx, userLogger, userBucket, blockID.String(), &metadata.DeletionMark{}) diff --git a/pkg/compactor/bucket_compactor.go b/pkg/compactor/bucket_compactor.go index c94b89fe0a8..76ac806061d 100644 --- a/pkg/compactor/bucket_compactor.go +++ b/pkg/compactor/bucket_compactor.go @@ -364,8 +364,8 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul // Once we have a plan we need to download the actual data. downloadBegin := time.Now() - err = concurrency.ForEach(ctx, convertBlocksToForEachJobs(toCompact), c.blockSyncConcurrency, func(ctx context.Context, job interface{}) error { - meta := job.(*metadata.Meta) + err = concurrency.ForEachJob(ctx, len(toCompact), c.blockSyncConcurrency, func(ctx context.Context, idx int) error { + meta := toCompact[idx] // Must be the same as in blocksToCompactDirs. bdir := filepath.Join(subDir, meta.ULID.String()) @@ -442,19 +442,19 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul uploadBegin := time.Now() uploadedBlocks := atomic.NewInt64(0) - err = concurrency.ForEach(ctx, convertCompactionResultToForEachJobs(compIDs, job.UseSplitting(), jobLogger), c.blockSyncConcurrency, func(ctx context.Context, j interface{}) error { - shardID := j.(ulidWithShardIndex).shardIndex - compID := j.(ulidWithShardIndex).ulid + blocksToUpload := convertCompactionResultToForEachJobs(compIDs, job.UseSplitting(), jobLogger) + err = concurrency.ForEachJob(ctx, len(blocksToUpload), c.blockSyncConcurrency, func(ctx context.Context, idx int) error { + blockToUpload := blocksToUpload[idx] uploadedBlocks.Inc() - bdir := filepath.Join(subDir, compID.String()) + bdir := filepath.Join(subDir, blockToUpload.ulid.String()) index := filepath.Join(bdir, block.IndexFilename) // When splitting is enabled, we need to inject the shard ID as external label. 
 		newLabels := job.Labels().Map()
 		if job.UseSplitting() {
-			newLabels[mimir_tsdb.CompactorShardIDExternalLabel] = sharding.FormatShardIDLabelValue(uint64(shardID), uint64(job.SplittingShards()))
+			newLabels[mimir_tsdb.CompactorShardIDExternalLabel] = sharding.FormatShardIDLabelValue(uint64(blockToUpload.shardIndex), uint64(job.SplittingShards()))
 		}
 		newMeta, err := metadata.InjectThanos(jobLogger, bdir, metadata.Thanos{
@@ -478,11 +478,11 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
 		begin := time.Now()
 		if err := block.Upload(ctx, jobLogger, c.bkt, bdir, job.hashFunc); err != nil {
-			return errors.Wrapf(err, "upload of %s failed", compID)
+			return errors.Wrapf(err, "upload of %s failed", blockToUpload.ulid)
 		}

 		elapsed := time.Since(begin)
-		level.Info(jobLogger).Log("msg", "uploaded block", "result_block", compID, "duration", elapsed, "duration_ms", elapsed.Milliseconds(), "external_labels", labels.FromMap(newLabels))
+		level.Info(jobLogger).Log("msg", "uploaded block", "result_block", blockToUpload.ulid, "duration", elapsed, "duration_ms", elapsed.Milliseconds(), "external_labels", labels.FromMap(newLabels))
 		return nil
 	})
 	if err != nil {
@@ -505,18 +505,10 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
 	return true, compIDs, nil
 }

-func convertBlocksToForEachJobs(input []*metadata.Meta) []interface{} {
-	result := make([]interface{}, len(input))
-	for ix := range input {
-		result[ix] = input[ix]
-	}
-	return result
-}
-
-// Converts ULIDs from compaction to interface{}, and also filters out empty ULIDs. When handling result of split
-// compactions, shard index is index in the slice returned by compaction.
-func convertCompactionResultToForEachJobs(compactedBlocks []ulid.ULID, splitJob bool, jobLogger log.Logger) []interface{} {
-	result := make([]interface{}, 0, len(compactedBlocks))
+// convertCompactionResultToForEachJobs filters out empty ULIDs.
+// When handling result of split compactions, shard index is index in the slice returned by compaction.
+func convertCompactionResultToForEachJobs(compactedBlocks []ulid.ULID, splitJob bool, jobLogger log.Logger) []ulidWithShardIndex {
+	result := make([]ulidWithShardIndex, 0, len(compactedBlocks))
 	for ix, id := range compactedBlocks {
 		// Skip if it's an empty block.
diff --git a/pkg/querier/queryrange/sharded_queryable.go b/pkg/querier/queryrange/sharded_queryable.go
index 6a4de44e548..f679980343c 100644
--- a/pkg/querier/queryrange/sharded_queryable.go
+++ b/pkg/querier/queryrange/sharded_queryable.go
@@ -110,8 +110,7 @@ func (q *shardedQuerier) handleEmbeddedQueries(queries []string, hints *storage.
 	streams := make([][]SampleStream, len(queries))

 	// Concurrently run each query. It breaks and cancels each worker context on first error.
- err := concurrency.ForEach(q.ctx, createJobIndexes(len(queries)), len(queries), func(ctx context.Context, job interface{}) error { - idx := job.(int) + err := concurrency.ForEachJob(q.ctx, len(queries), len(queries), func(ctx context.Context, idx int) error { resp, err := q.handler.Do(ctx, q.req.WithQuery(queries[idx])) if err != nil { return err @@ -149,14 +148,6 @@ func (q *shardedQuerier) Close() error { return nil } -func createJobIndexes(l int) []interface{} { - jobs := make([]interface{}, l) - for j := 0; j < l; j++ { - jobs[j] = j - } - return jobs -} - type responseHeadersTracker struct { headersMx sync.Mutex headers map[string][]string diff --git a/pkg/querier/queryrange/split_and_cache_test.go b/pkg/querier/queryrange/split_and_cache_test.go index 5488ffb606f..d7386325d61 100644 --- a/pkg/querier/queryrange/split_and_cache_test.go +++ b/pkg/querier/queryrange/split_and_cache_test.go @@ -566,7 +566,7 @@ func TestSplitAndCacheMiddleware_ResultsCacheFuzzy(t *testing.T) { } // Generate some random requests. - reqs := make([]interface{}, 0, numQueries) + reqs := make([]Request, 0, numQueries) for q := 0; q < numQueries; q++ { // Generate a random time range within min/max time. startTime := minTime.Add(time.Duration(rand.Int63n(maxTime.Sub(minTime).Milliseconds())) * time.Millisecond) @@ -585,15 +585,14 @@ func TestSplitAndCacheMiddleware_ResultsCacheFuzzy(t *testing.T) { // Run the query without the split and cache middleware and store it as expected result. expectedResMx := sync.Mutex{} expectedRes := make(map[int64]Response, len(reqs)) - require.NoError(t, concurrency.ForEach(ctx, reqs, len(reqs), func(ctx context.Context, job interface{}) error { - req := job.(Request) - res, err := downstream.Do(ctx, req) + require.NoError(t, concurrency.ForEachJob(ctx, len(reqs), len(reqs), func(ctx context.Context, idx int) error { + res, err := downstream.Do(ctx, reqs[idx]) if err != nil { return err } expectedResMx.Lock() - expectedRes[req.GetId()] = res + expectedRes[reqs[idx].GetId()] = res expectedResMx.Unlock() return nil @@ -628,12 +627,10 @@ func TestSplitAndCacheMiddleware_ResultsCacheFuzzy(t *testing.T) { ).Wrap(downstream) // Run requests honoring concurrency. - require.NoError(t, concurrency.ForEach(ctx, reqs, maxConcurrency, func(ctx context.Context, job interface{}) error { - req := job.(Request) - - actual, err := mw.Do(ctx, req) + require.NoError(t, concurrency.ForEachJob(ctx, len(reqs), maxConcurrency, func(ctx context.Context, idx int) error { + actual, err := mw.Do(ctx, reqs[idx]) require.NoError(t, err) - require.Equal(t, expectedRes[req.GetId()], actual) + require.Equal(t, expectedRes[reqs[idx].GetId()], actual) return nil })) diff --git a/pkg/querier/tenantfederation/merge_queryable.go b/pkg/querier/tenantfederation/merge_queryable.go index 326a5aaa8c4..7511b121368 100644 --- a/pkg/querier/tenantfederation/merge_queryable.go +++ b/pkg/querier/tenantfederation/merge_queryable.go @@ -7,7 +7,6 @@ package tenantfederation import ( "context" - "fmt" "sort" "strings" @@ -228,8 +227,7 @@ type stringSliceFuncJob struct { // It doesn't require the output of the stringSliceFunc to be sorted, as results // of LabelValues are not sorted. 
func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(f stringSliceFunc, tenants map[string]struct{}) ([]string, storage.Warnings, error) { - var jobs []interface{} - + jobs := make([]*stringSliceFuncJob, 0, len(m.ids)) for pos, id := range m.ids { if tenants != nil { if _, matched := tenants[id]; !matched { @@ -243,13 +241,8 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(f stringSliceFunc, te }) } - run := func(ctx context.Context, jobIntf interface{}) error { - job, ok := jobIntf.(*stringSliceFuncJob) - if !ok { - return fmt.Errorf("unexpected type %T", jobIntf) - } - - var err error + run := func(ctx context.Context, idx int) (err error) { + job := jobs[idx] job.result, job.warnings, err = f(ctx, job.querier) if err != nil { return errors.Wrapf(err, "error querying %s %s", rewriteLabelName(m.idLabelName), job.id) @@ -258,7 +251,7 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(f stringSliceFunc, te return nil } - err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run) + err := concurrency.ForEachJob(m.ctx, len(jobs), maxConcurrency, run) if err != nil { return nil, nil, err } @@ -266,12 +259,7 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(f stringSliceFunc, te // aggregate warnings and deduplicate string results var warnings storage.Warnings resultMap := make(map[string]struct{}) - for _, jobIntf := range jobs { - job, ok := jobIntf.(*stringSliceFuncJob) - if !ok { - return nil, nil, fmt.Errorf("unexpected type %T", jobIntf) - } - + for _, job := range jobs { for _, e := range job.result { resultMap[e] = struct{}{} } @@ -312,7 +300,7 @@ func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, match log, ctx := spanlogger.NewWithLogger(m.ctx, m.logger, "mergeQuerier.Select") defer log.Span.Finish() matchedValues, filteredMatchers := filterValuesByMatchers(m.idLabelName, m.ids, matchers...) - var jobs = make([]interface{}, len(matchedValues)) + var jobs = make([]*selectJob, len(matchedValues)) var seriesSets = make([]storage.SeriesSet, len(matchedValues)) var jobPos int for labelPos := range m.ids { @@ -327,11 +315,8 @@ func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, match jobPos++ } - run := func(ctx context.Context, jobIntf interface{}) error { - job, ok := jobIntf.(*selectJob) - if !ok { - return fmt.Errorf("unexpected type %T", jobIntf) - } + run := func(ctx context.Context, idx int) error { + job := jobs[idx] seriesSets[job.pos] = &addLabelsSeriesSet{ upstream: job.querier.Select(sortSeries, hints, filteredMatchers...), labels: labels.Labels{ @@ -344,7 +329,7 @@ func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, match return nil } - err := concurrency.ForEach(ctx, jobs, maxConcurrency, run) + err := concurrency.ForEachJob(ctx, len(jobs), maxConcurrency, run) if err != nil { return storage.ErrSeriesSet(err) } diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 516740c3de8..6250f3a5bee 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -772,9 +772,9 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string) ([]*GroupSta // Concurrently fetch rules from all rulers. Since rules are not replicated, // we need all requests to succeed. 
- jobs := concurrency.CreateJobsFromStrings(rulers.GetAddresses()) - err = concurrency.ForEach(ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error { - addr := job.(string) + addrs := rulers.GetAddresses() + err = concurrency.ForEachJob(ctx, len(addrs), len(addrs), func(ctx context.Context, idx int) error { + addr := addrs[idx] rulerClient, err := r.clientsPool.GetClientFor(addr) if err != nil { diff --git a/tools/listblocks/main.go b/tools/listblocks/main.go index 70ea87f31d6..6a538364387 100644 --- a/tools/listblocks/main.go +++ b/tools/listblocks/main.go @@ -149,8 +149,8 @@ func fetchDeletionTimes(ctx context.Context, bkt objstore.Bucket, deletionMarker mu := sync.Mutex{} times := map[ulid.ULID]time.Time{} - return times, concurrency.ForEach(ctx, concurrency.CreateJobsFromStrings(deletionMarkers), concurrencyLimit, func(ctx context.Context, job interface{}) error { - r, err := bkt.Get(ctx, job.(string)) + return times, concurrency.ForEachJob(ctx, len(deletionMarkers), concurrencyLimit, func(ctx context.Context, idx int) error { + r, err := bkt.Get(ctx, deletionMarkers[idx]) if err != nil { if bkt.IsObjNotFoundErr(err) { return nil @@ -179,8 +179,8 @@ func fetchMetas(ctx context.Context, bkt objstore.Bucket, metaFiles []string) (m mu := sync.Mutex{} metas := map[ulid.ULID]*metadata.Meta{} - return metas, concurrency.ForEach(ctx, concurrency.CreateJobsFromStrings(metaFiles), concurrencyLimit, func(ctx context.Context, job interface{}) error { - r, err := bkt.Get(ctx, job.(string)) + return metas, concurrency.ForEachJob(ctx, len(metaFiles), concurrencyLimit, func(ctx context.Context, idx int) error { + r, err := bkt.Get(ctx, metaFiles[idx]) if err != nil { if bkt.IsObjNotFoundErr(err) { return nil diff --git a/vendor/github.com/grafana/dskit/concurrency/runner.go b/vendor/github.com/grafana/dskit/concurrency/runner.go index a6740f3ac9c..023be10d7a0 100644 --- a/vendor/github.com/grafana/dskit/concurrency/runner.go +++ b/vendor/github.com/grafana/dskit/concurrency/runner.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "go.uber.org/atomic" "golang.org/x/sync/errgroup" "github.com/grafana/dskit/internal/math" @@ -62,45 +63,53 @@ func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFun // ForEach runs the provided jobFunc for each job up to concurrency concurrent workers. // The execution breaks on first error encountered. +// +// Deprecated: use ForEachJob instead. func ForEach(ctx context.Context, jobs []interface{}, concurrency int, jobFunc func(ctx context.Context, job interface{}) error) error { - if len(jobs) == 0 { - return nil + return ForEachJob(ctx, len(jobs), concurrency, func(ctx context.Context, idx int) error { + return jobFunc(ctx, jobs[idx]) + }) +} + +// CreateJobsFromStrings is an utility to create jobs from an slice of strings. +// +// Deprecated: will be removed as it's not needed when using ForEachJob. +func CreateJobsFromStrings(values []string) []interface{} { + jobs := make([]interface{}, len(values)) + for i := 0; i < len(values); i++ { + jobs[i] = values[i] } + return jobs +} - // Push all jobs to a channel. - ch := make(chan interface{}, len(jobs)) - for _, job := range jobs { - ch <- job +// ForEachJob runs the provided jobFunc for each job index in [0, jobs) up to concurrency concurrent workers. +// The execution breaks on first error encountered. 
+func ForEachJob(ctx context.Context, jobs int, concurrency int, jobFunc func(ctx context.Context, idx int) error) error { + if jobs == 0 { + return nil } - close(ch) + + // Initialise indexes with -1 so first Inc() returns index 0. + indexes := atomic.NewInt64(-1) // Start workers to process jobs. g, ctx := errgroup.WithContext(ctx) - for ix := 0; ix < math.Min(concurrency, len(jobs)); ix++ { + for ix := 0; ix < math.Min(concurrency, jobs); ix++ { g.Go(func() error { - for job := range ch { - if err := ctx.Err(); err != nil { - return err + for ctx.Err() == nil { + idx := int(indexes.Inc()) + if idx >= jobs { + return nil } - if err := jobFunc(ctx, job); err != nil { + if err := jobFunc(ctx, idx); err != nil { return err } } - - return nil + return ctx.Err() }) } // Wait until done (or context has canceled). return g.Wait() } - -// CreateJobsFromStrings is an utility to create jobs from an slice of strings. -func CreateJobsFromStrings(values []string) []interface{} { - jobs := make([]interface{}, len(values)) - for i := 0; i < len(values); i++ { - jobs[i] = values[i] - } - return jobs -} diff --git a/vendor/github.com/grafana/dskit/ring/ring.go b/vendor/github.com/grafana/dskit/ring/ring.go index 8d6da6be61b..6aaf165bf97 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.go +++ b/vendor/github.com/grafana/dskit/ring/ring.go @@ -29,18 +29,6 @@ import ( const ( unhealthy = "Unhealthy" - // IngesterRingKey is the key under which we store the ingesters ring in the KVStore. - IngesterRingKey = "ring" - - // RulerRingKey is the key under which we store the rulers ring in the KVStore. - RulerRingKey = "ring" - - // DistributorRingKey is the key under which we store the distributors ring in the KVStore. - DistributorRingKey = "distributor" - - // CompactorRingKey is the key under which we store the compactors ring in the KVStore. - CompactorRingKey = "compactor" - // GetBufferSize is the suggested size of buffers passed to Ring.Get(). It's based on // a typical replication factor 3, plus extra room for a JOINING + LEAVING instance. GetBufferSize = 5 diff --git a/vendor/modules.txt b/vendor/modules.txt index 083371df87c..10d8743bf54 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -394,7 +394,7 @@ github.com/googleapis/gax-go/v2/apierror/internal/proto # github.com/gorilla/mux v1.8.0 ## explicit; go 1.12 github.com/gorilla/mux -# github.com/grafana/dskit v0.0.0-20220104154758-acbb88132134 +# github.com/grafana/dskit v0.0.0-20220112093026-95274ccc858d ## explicit; go 1.16 github.com/grafana/dskit/backoff github.com/grafana/dskit/concurrency
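
For reference, the minimal standalone sketch below illustrates the calling-convention change this patch applies across the codebase: concurrency.ForEach() takes a []interface{} and forces a type assertion inside the callback, while concurrency.ForEachJob() passes an index into the caller's own typed slice, so no assertion is needed. The userIDs slice and the processUser helper are hypothetical placeholders, not code from this repository.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/grafana/dskit/concurrency"
)

// processUser stands in for whatever per-item work the callback performs.
func processUser(ctx context.Context, userID string) error {
	fmt.Println("processing", userID)
	return nil
}

func main() {
	ctx := context.Background()
	userIDs := []string{"user-1", "user-2", "user-3"}

	// Before: jobs are converted to []interface{} and asserted back to string in the callback.
	err := concurrency.ForEach(ctx, concurrency.CreateJobsFromStrings(userIDs), 2, func(ctx context.Context, job interface{}) error {
		return processUser(ctx, job.(string))
	})
	if err != nil {
		log.Fatal(err)
	}

	// After: the callback receives an index into the original, typed slice.
	err = concurrency.ForEachJob(ctx, len(userIDs), 2, func(ctx context.Context, idx int) error {
		return processUser(ctx, userIDs[idx])
	})
	if err != nil {
		log.Fatal(err)
	}
}

As the vendored runner.go above shows, ForEachJob also replaces the buffered job channel with a shared atomic index counter: each worker pulls the next index with Inc(), no channel sized to the number of jobs is allocated, and the first error still cancels the errgroup context so the remaining workers stop early.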