Skip to content

Commit

Permalink
add pause job execution on the scheduler (#518)
Browse files Browse the repository at this point in the history
* add pause job execution on the scheduler

* Update scheduler_test.go

* fix formatting
  • Loading branch information
JohnRoesler committed Jun 25, 2023
1 parent ffa4c91 commit 61f60f6
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 7 deletions.
10 changes: 10 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,16 @@ func ExampleScheduler_NextRun() {
// 10:30
}

func ExampleScheduler_PauseJobExecution() {
s := gocron.NewScheduler(time.UTC)
_, _ = s.Every("1s").Do(task)
s.StartAsync()
s.PauseJobExecution(true)
// jobs don't run
s.PauseJobExecution(false)
// jobs run again
}

func ExampleScheduler_RegisterEventListeners() {
s := gocron.NewScheduler(time.UTC)

Expand Down
16 changes: 9 additions & 7 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ const (
)

type executor struct {
jobFunctions chan jobFunction // the chan upon which the jobFunctions are passed in from the scheduler
ctx context.Context // used to tell the executor to stop
cancel context.CancelFunc // used to tell the executor to stop
wg *sync.WaitGroup // used by the scheduler to wait for the executor to stop
jobsWg *sync.WaitGroup // used by the executor to wait for all jobs to finish
singletonWgs *sync.Map // used by the executor to wait for the singleton runners to complete
jobFunctions chan jobFunction // the chan upon which the jobFunctions are passed in from the scheduler
ctx context.Context // used to tell the executor to stop
cancel context.CancelFunc // used to tell the executor to stop
wg *sync.WaitGroup // used by the scheduler to wait for the executor to stop
jobsWg *sync.WaitGroup // used by the executor to wait for all jobs to finish
singletonWgs *sync.Map // used by the executor to wait for the singleton runners to complete
skipExecution *atomic.Bool // used to pause the execution of jobs

limitMode limitMode // when SetMaxConcurrentJobs() is set upon the scheduler
limitModeMaxRunningJobs int // stores the maximum number of concurrently running jobs
Expand Down Expand Up @@ -134,6 +135,7 @@ func (e *executor) start() {
e.jobsWg = &sync.WaitGroup{}

e.stopped = atomic.NewBool(false)
e.skipExecution = atomic.NewBool(false)

e.limitModeQueueMu.Lock()
e.limitModeQueue = make(chan jobFunction, 1000)
Expand Down Expand Up @@ -187,7 +189,7 @@ func (e *executor) run() {
for {
select {
case f := <-e.jobFunctions:
if e.stopped.Load() {
if e.stopped.Load() || e.skipExecution.Load() {
continue
}

Expand Down
4 changes: 4 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1441,3 +1441,7 @@ func (s *Scheduler) RegisterEventListeners(eventListeners ...EventListener) {
job.RegisterEventListeners(eventListeners...)
}
}

func (s *Scheduler) PauseJobExecution(shouldPause bool) {
s.executor.skipExecution.Store(shouldPause)
}
28 changes: 28 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2707,3 +2707,31 @@ func TestScheduler_WithDistributedLocker_With_Name(t *testing.T) {
})
}
}

func TestScheduler_PauseJobExecution(t *testing.T) {
s := NewScheduler(time.UTC)
var counter int
var mu sync.Mutex

_, err := s.Every("100ms").Do(func() {
mu.Lock()
counter++
mu.Unlock()
})
require.NoError(t, err)

s.StartAsync()
time.Sleep(50 * time.Millisecond)

s.PauseJobExecution(true)
time.Sleep(200 * time.Millisecond)

s.PauseJobExecution(false)
time.Sleep(100 * time.Millisecond)
s.Stop()

mu.Lock()
defer mu.Unlock()
assert.GreaterOrEqual(t, counter, 1)
assert.LessOrEqual(t, counter, 2)
}

0 comments on commit 61f60f6

Please sign in to comment.