fix(das): limit amount of recent jobs (#2314)
## Overview

Resolves #2312
walldiss committed Jun 5, 2023
1 parent 59b9daf commit f05b71f
Showing 2 changed files with 16 additions and 9 deletions.
16 changes: 8 additions & 8 deletions das/checkpoint.go
```diff
@@ -23,15 +23,15 @@ type workerCheckpoint struct {
 func newCheckpoint(stats SamplingStats) checkpoint {
 	workers := make([]workerCheckpoint, 0, len(stats.Workers))
 	for _, w := range stats.Workers {
-		// no need to store retry jobs, since they will resume from failed heights map
-		if w.JobType == retryJob {
-			continue
+		// no need to resume recent jobs after restart. On the other hand, retry jobs will resume from
+		// failed heights map. it leaves only catchup jobs to be stored and resumed
+		if w.JobType == catchupJob {
+			workers = append(workers, workerCheckpoint{
+				From:    w.Curr,
+				To:      w.To,
+				JobType: w.JobType,
+			})
 		}
-		workers = append(workers, workerCheckpoint{
-			From:    w.Curr,
-			To:      w.To,
-			JobType: w.JobType,
-		})
 	}
 	return checkpoint{
 		SampleFrom: stats.CatchupHead + 1,
```
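To make the effect of this change concrete, here is a small, self-contained sketch of the filtering rule. The types below are simplified stand-ins for illustration only, not the node's actual `SamplingStats`/worker types: recent jobs are not resumed after a restart, retry jobs are rebuilt from the failed-heights map, so only catchup ranges need to be written out.

```go
// Standalone sketch with hypothetical, simplified types illustrating the new
// rule in newCheckpoint: only catchup workers survive into the checkpoint.
package main

import "fmt"

type jobType string

const (
	catchupJob jobType = "catchup"
	recentJob  jobType = "recent"
	retryJob   jobType = "retry"
)

type workerStat struct {
	JobType  jobType
	Curr, To uint64
}

type workerCheckpoint struct {
	From, To uint64
	JobType  jobType
}

// persistedWorkers mirrors the loop in newCheckpoint: recent jobs are dropped
// (no need to resume them after a restart), retry jobs are rebuilt from the
// failed-heights map elsewhere, so only catchup jobs are stored.
func persistedWorkers(workers []workerStat) []workerCheckpoint {
	out := make([]workerCheckpoint, 0, len(workers))
	for _, w := range workers {
		if w.JobType == catchupJob {
			out = append(out, workerCheckpoint{From: w.Curr, To: w.To, JobType: w.JobType})
		}
	}
	return out
}

func main() {
	stats := []workerStat{
		{JobType: catchupJob, Curr: 100, To: 200},
		{JobType: recentJob, Curr: 500, To: 500},
		{JobType: retryJob, Curr: 42, To: 42},
	}
	fmt.Println(persistedWorkers(stats)) // only the catchup range 100..200 is kept
}
```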
9 changes: 8 additions & 1 deletion das/coordinator.go
```diff
@@ -81,7 +81,9 @@ func (sc *samplingCoordinator) run(ctx context.Context, cp checkpoint) {
 		select {
 		case head := <-sc.updHeadCh:
 			if sc.state.isNewHead(head.Height()) {
-				sc.runWorker(ctx, sc.state.recentJob(head))
+				if !sc.recentJobsLimitReached() {
+					sc.runWorker(ctx, sc.state.recentJob(head))
+				}
 				sc.state.updateHead(head.Height())
 				// run worker without concurrency limit restrictions to reduced delay
 				sc.metrics.observeNewHead(ctx)
```
```diff
@@ -146,3 +148,8 @@ func (sc *samplingCoordinator) getCheckpoint(ctx context.Context) (checkpoint, error) {
 func (sc *samplingCoordinator) concurrencyLimitReached() bool {
 	return len(sc.state.inProgress) >= sc.concurrencyLimit
 }
+
+// recentJobsLimitReached indicates whether concurrency limit for recent jobs has been reached
+func (sc *samplingCoordinator) recentJobsLimitReached() bool {
+	return len(sc.state.inProgress) >= 2*sc.concurrencyLimit
+}
```
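The new helper gives recent-head jobs extra headroom rather than none at all: they still bypass the ordinary `concurrencyLimitReached` check (the existing comment notes they run without concurrency limit restrictions to reduce delay), but they are now refused once the number of in-flight workers reaches twice the configured limit. A minimal sketch, assuming a hypothetical simplified coordinator that carries only the counters needed here, shows how the two checks differ:

```go
// Minimal sketch (hypothetical, simplified coordinator state) of how the two
// limit checks from coordinator.go interact after this change.
package main

import "fmt"

type coordinator struct {
	inProgress       int // number of workers currently running
	concurrencyLimit int
}

// concurrencyLimitReached gates ordinary catchup/retry jobs.
func (c *coordinator) concurrencyLimitReached() bool {
	return c.inProgress >= c.concurrencyLimit
}

// recentJobsLimitReached gates recent-head jobs: they may exceed the normal
// limit to keep head sampling low-latency, but are capped at twice that limit
// so a node that falls behind does not spawn an unbounded number of them.
func (c *coordinator) recentJobsLimitReached() bool {
	return c.inProgress >= 2*c.concurrencyLimit
}

func main() {
	c := &coordinator{concurrencyLimit: 16}
	for _, running := range []int{10, 16, 20, 32} {
		c.inProgress = running
		fmt.Printf("in-flight=%d ordinary-blocked=%v recent-blocked=%v\n",
			running, c.concurrencyLimitReached(), c.recentJobsLimitReached())
	}
}
```

With a limit of 16 in this sketch, ordinary jobs stop being scheduled once 16 workers are in flight, while recent-head jobs keep being launched until 32, which is the cap this commit introduces.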
