diff --git a/example_test.go b/example_test.go index 42aa40df..9279a29d 100644 --- a/example_test.go +++ b/example_test.go @@ -623,6 +623,16 @@ func ExampleScheduler_NextRun() { // 10:30 } +func ExampleScheduler_PauseJobExecution() { + s := gocron.NewScheduler(time.UTC) + _, _ = s.Every("1s").Do(task) + s.StartAsync() + s.PauseJobExecution(true) + // jobs don't run + s.PauseJobExecution(false) + // jobs run again +} + func ExampleScheduler_RegisterEventListeners() { s := gocron.NewScheduler(time.UTC) diff --git a/executor.go b/executor.go index 977fe3c3..76d2352a 100644 --- a/executor.go +++ b/executor.go @@ -37,12 +37,13 @@ const ( ) type executor struct { - jobFunctions chan jobFunction // the chan upon which the jobFunctions are passed in from the scheduler - ctx context.Context // used to tell the executor to stop - cancel context.CancelFunc // used to tell the executor to stop - wg *sync.WaitGroup // used by the scheduler to wait for the executor to stop - jobsWg *sync.WaitGroup // used by the executor to wait for all jobs to finish - singletonWgs *sync.Map // used by the executor to wait for the singleton runners to complete + jobFunctions chan jobFunction // the chan upon which the jobFunctions are passed in from the scheduler + ctx context.Context // used to tell the executor to stop + cancel context.CancelFunc // used to tell the executor to stop + wg *sync.WaitGroup // used by the scheduler to wait for the executor to stop + jobsWg *sync.WaitGroup // used by the executor to wait for all jobs to finish + singletonWgs *sync.Map // used by the executor to wait for the singleton runners to complete + skipExecution *atomic.Bool // used to pause the execution of jobs limitMode limitMode // when SetMaxConcurrentJobs() is set upon the scheduler limitModeMaxRunningJobs int // stores the maximum number of concurrently running jobs @@ -134,6 +135,7 @@ func (e *executor) start() { e.jobsWg = &sync.WaitGroup{} e.stopped = atomic.NewBool(false) + e.skipExecution = atomic.NewBool(false) e.limitModeQueueMu.Lock() e.limitModeQueue = make(chan jobFunction, 1000) @@ -187,7 +189,7 @@ func (e *executor) run() { for { select { case f := <-e.jobFunctions: - if e.stopped.Load() { + if e.stopped.Load() || e.skipExecution.Load() { continue } diff --git a/scheduler.go b/scheduler.go index ad84a7b4..9d7f6147 100644 --- a/scheduler.go +++ b/scheduler.go @@ -1441,3 +1441,7 @@ func (s *Scheduler) RegisterEventListeners(eventListeners ...EventListener) { job.RegisterEventListeners(eventListeners...) } } + +func (s *Scheduler) PauseJobExecution(shouldPause bool) { + s.executor.skipExecution.Store(shouldPause) +} diff --git a/scheduler_test.go b/scheduler_test.go index d9698e75..9a2efa1b 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -2707,3 +2707,31 @@ func TestScheduler_WithDistributedLocker_With_Name(t *testing.T) { }) } } + +func TestScheduler_PauseJobExecution(t *testing.T) { + s := NewScheduler(time.UTC) + var counter int + var mu sync.Mutex + + _, err := s.Every("100ms").Do(func() { + mu.Lock() + counter++ + mu.Unlock() + }) + require.NoError(t, err) + + s.StartAsync() + time.Sleep(50 * time.Millisecond) + + s.PauseJobExecution(true) + time.Sleep(200 * time.Millisecond) + + s.PauseJobExecution(false) + time.Sleep(100 * time.Millisecond) + s.Stop() + + mu.Lock() + defer mu.Unlock() + assert.GreaterOrEqual(t, counter, 1) + assert.LessOrEqual(t, counter, 2) +}