diff --git a/go.mod b/go.mod index 99f934f1d..536eead9e 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 ) -require github.com/go-co-op/gocron/v2 v2.2.5 +require github.com/go-co-op/gocron/v2 v2.2.9 require ( github.com/emicklei/go-restful/v3 v3.11.0 // indirect diff --git a/go.sum b/go.sum index b3f082c21..9964f8153 100644 --- a/go.sum +++ b/go.sum @@ -45,8 +45,8 @@ github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4 github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= -github.com/go-co-op/gocron/v2 v2.2.5 h1:AGyUDXmSmqnclltaMVrLCtl3viJMY3TcpWdU4dbi/mE= -github.com/go-co-op/gocron/v2 v2.2.5/go.mod h1:igssOwzZkfcnu3m2kwnCf/mYj4SmhP9ecSgmYjCOHkk= +github.com/go-co-op/gocron/v2 v2.2.9 h1:aoKosYWSSdXFLecjFWX1i8+R6V7XdZb8sB2ZKAY5Yis= +github.com/go-co-op/gocron/v2 v2.2.9/go.mod h1:mZx3gMSlFnb97k3hRqX3+GdlG3+DUwTh6B8fnsTScXg= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= @@ -214,8 +214,8 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= diff --git a/vendor/github.com/go-co-op/gocron/v2/README.md b/vendor/github.com/go-co-op/gocron/v2/README.md index 2e473bf5d..7fc195753 100644 --- a/vendor/github.com/go-co-op/gocron/v2/README.md +++ b/vendor/github.com/go-co-op/gocron/v2/README.md @@ -66,6 +66,11 @@ func main() { } ``` +## Examples + +- [Go doc examples](https://pkg.go.dev/github.com/go-co-op/gocron/v2#pkg-examples) +- [Examples directory](examples) + ## Concepts - **Job**: The job encapsulates a "task", which is made up of a go function and any function parameters. The Job then diff --git a/vendor/github.com/go-co-op/gocron/v2/errors.go b/vendor/github.com/go-co-op/gocron/v2/errors.go index 12d85eb51..e28012447 100644 --- a/vendor/github.com/go-co-op/gocron/v2/errors.go +++ b/vendor/github.com/go-co-op/gocron/v2/errors.go @@ -9,6 +9,7 @@ var ( ErrDailyJobAtTimesNil = fmt.Errorf("gocron: DailyJob: atTimes must not be nil") ErrDailyJobHours = fmt.Errorf("gocron: DailyJob: atTimes hours must be between 0 and 23 inclusive") ErrDailyJobMinutesSeconds = fmt.Errorf("gocron: DailyJob: atTimes minutes and seconds must be between 0 and 59 inclusive") + ErrDurationJobIntervalZero = fmt.Errorf("gocron: DurationJob: time interval is 0") ErrDurationRandomJobMinMax = fmt.Errorf("gocron: DurationRandomJob: minimum duration must be less than maximum duration") ErrEventListenerFuncNil = fmt.Errorf("gocron: eventListenerFunc must not be nil") ErrJobNotFound = fmt.Errorf("gocron: job not found") diff --git a/vendor/github.com/go-co-op/gocron/v2/executor.go b/vendor/github.com/go-co-op/gocron/v2/executor.go index eae65e631..1b51e57ba 100644 --- a/vendor/github.com/go-co-op/gocron/v2/executor.go +++ b/vendor/github.com/go-co-op/gocron/v2/executor.go @@ -10,20 +10,21 @@ import ( ) type executor struct { - ctx context.Context - cancel context.CancelFunc - logger Logger - stopCh chan struct{} - jobsIn chan jobIn - jobIDsOut chan uuid.UUID - jobOutRequest chan jobOutRequest - stopTimeout time.Duration - done chan error - singletonRunners *sync.Map // map[uuid.UUID]singletonRunner - limitMode *limitModeConfig - elector Elector - locker Locker - monitor Monitor + ctx context.Context + cancel context.CancelFunc + logger Logger + stopCh chan struct{} + jobsIn chan jobIn + jobsOutForRescheduling chan uuid.UUID + jobsOutCompleted chan uuid.UUID + jobOutRequest chan jobOutRequest + stopTimeout time.Duration + done chan error + singletonRunners *sync.Map // map[uuid.UUID]singletonRunner + limitMode *limitModeConfig + elector Elector + locker Locker + monitor Monitor } type jobIn struct { @@ -122,7 +123,7 @@ func (e *executor) start() { // all runners are busy, reschedule the work for later // which means we just skip it here and do nothing // TODO when metrics are added, this should increment a rescheduled metric - e.sendOutToScheduler(&jIn) + e.sendOutForRescheduling(&jIn) } } else { // since we're not using LimitModeReschedule, but instead using LimitModeWait @@ -131,7 +132,7 @@ func (e *executor) start() { // at which point this call would block. // TODO when metrics are added, this should increment a wait metric e.limitMode.in <- jIn - e.sendOutToScheduler(&jIn) + e.sendOutForRescheduling(&jIn) } } else { // no limit mode, so we're either running a regular job or @@ -167,17 +168,17 @@ func (e *executor) start() { select { case runner.rescheduleLimiter <- struct{}{}: runner.in <- jIn - e.sendOutToScheduler(&jIn) + e.sendOutForRescheduling(&jIn) default: // runner is busy, reschedule the work for later // which means we just skip it here and do nothing // TODO when metrics are added, this should increment a rescheduled metric - e.sendOutToScheduler(&jIn) + e.sendOutForRescheduling(&jIn) } } else { // wait mode, fill up that queue (buffered channel, so it's ok) runner.in <- jIn - e.sendOutToScheduler(&jIn) + e.sendOutForRescheduling(&jIn) } } else { select { @@ -193,7 +194,7 @@ func (e *executor) start() { // complete. standardJobsWg.Add(1) go func(j internalJob) { - e.runJob(j, jIn.shouldSendOut) + e.runJob(j, jIn) standardJobsWg.Done() }(*j) } @@ -206,10 +207,10 @@ func (e *executor) start() { } } -func (e *executor) sendOutToScheduler(jIn *jobIn) { +func (e *executor) sendOutForRescheduling(jIn *jobIn) { if jIn.shouldSendOut { select { - case e.jobIDsOut <- jIn.id: + case e.jobsOutForRescheduling <- jIn.id: case <-e.ctx.Done(): return } @@ -250,7 +251,7 @@ func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWith return case <-j.ctx.Done(): return - case e.jobIDsOut <- j.id: + case e.jobsOutForRescheduling <- j.id: } } // remove the limiter block, as this particular job @@ -264,7 +265,7 @@ func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWith e.limitMode.singletonJobs[jIn.id] = struct{}{} e.limitMode.singletonJobsMu.Unlock() } - e.runJob(*j, jIn.shouldSendOut) + e.runJob(*j, jIn) if j.singletonMode { e.limitMode.singletonJobsMu.Lock() @@ -302,7 +303,7 @@ func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroup j := requestJobCtx(ctx, jIn.id, e.jobOutRequest) cancel() if j != nil { - e.runJob(*j, jIn.shouldSendOut) + e.runJob(*j, jIn) } // remove the limiter block to allow another job to be scheduled @@ -317,7 +318,7 @@ func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroup } } -func (e *executor) runJob(j internalJob, shouldSendOut bool) { +func (e *executor) runJob(j internalJob, jIn jobIn) { if j.ctx == nil { return } @@ -331,25 +332,23 @@ func (e *executor) runJob(j internalJob, shouldSendOut bool) { if e.elector != nil { if err := e.elector.IsLeader(j.ctx); err != nil { + e.sendOutForRescheduling(&jIn) return } } else if e.locker != nil { lock, err := e.locker.Lock(j.ctx, j.name) if err != nil { + e.sendOutForRescheduling(&jIn) return } defer func() { _ = lock.Unlock(j.ctx) }() } _ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name) - if shouldSendOut { - select { - case <-e.ctx.Done(): - return - case <-j.ctx.Done(): - return - case e.jobIDsOut <- j.id: - } + e.sendOutForRescheduling(&jIn) + select { + case e.jobsOutCompleted <- j.id: + case <-e.ctx.Done(): } startTime := time.Now() diff --git a/vendor/github.com/go-co-op/gocron/v2/job.go b/vendor/github.com/go-co-op/gocron/v2/job.go index 3dd2a1cd1..7641dbe82 100644 --- a/vendor/github.com/go-co-op/gocron/v2/job.go +++ b/vendor/github.com/go-co-op/gocron/v2/job.go @@ -24,7 +24,9 @@ type internalJob struct { name string tags []string jobSchedule - lastRun, nextRun time.Time + lastScheduledRun time.Time + nextScheduled time.Time + lastRun time.Time function any parameters []any timer clockwork.Timer @@ -148,6 +150,9 @@ type durationJobDefinition struct { } func (d durationJobDefinition) setup(j *internalJob, _ *time.Location) error { + if d.duration == 0 { + return ErrDurationJobIntervalZero + } j.jobSchedule = &durationJob{duration: d.duration} return nil } @@ -678,18 +683,18 @@ func (d dailyJob) next(lastRun time.Time) time.Time { func (d dailyJob) nextDay(lastRun time.Time, firstPass bool) time.Time { for _, at := range d.atTimes { - // sub the at time hour/min/sec onto the lastRun's values + // sub the at time hour/min/sec onto the lastScheduledRun's values // to use in checks to see if we've got our next run time atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day(), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location()) if firstPass && atDate.After(lastRun) { // checking to see if it is after i.e. greater than, - // and not greater or equal as our lastRun day/time + // and not greater or equal as our lastScheduledRun day/time // will be in the loop, and we don't want to select it again return atDate } else if !firstPass && !atDate.Before(lastRun) { // now that we're looking at the next day, it's ok to consider - // the same at time that was last run (as lastRun has been incremented) + // the same at time that was last run (as lastScheduledRun has been incremented) return atDate } } @@ -724,18 +729,18 @@ func (w weeklyJob) nextWeekDayAtTime(lastRun time.Time, firstPass bool) time.Tim // weekDayDiff is used to add the correct amount to the atDate day below weekDayDiff := wd - lastRun.Weekday() for _, at := range w.atTimes { - // sub the at time hour/min/sec onto the lastRun's values + // sub the at time hour/min/sec onto the lastScheduledRun's values // to use in checks to see if we've got our next run time atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day()+int(weekDayDiff), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location()) if firstPass && atDate.After(lastRun) { // checking to see if it is after i.e. greater than, - // and not greater or equal as our lastRun day/time + // and not greater or equal as our lastScheduledRun day/time // will be in the loop, and we don't want to select it again return atDate } else if !firstPass && !atDate.Before(lastRun) { // now that we're looking at the next week, it's ok to consider - // the same at time that was last run (as lastRun has been incremented) + // the same at time that was last run (as lastScheduledRun has been incremented) return atDate } } @@ -792,7 +797,7 @@ func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass for _, day := range days { if day >= lastRun.Day() { for _, at := range m.atTimes { - // sub the day, and the at time hour/min/sec onto the lastRun's values + // sub the day, and the at time hour/min/sec onto the lastScheduledRun's values // to use in checks to see if we've got our next run time atDate := time.Date(lastRun.Year(), lastRun.Month(), day, at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location()) @@ -804,12 +809,12 @@ func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass if firstPass && atDate.After(lastRun) { // checking to see if it is after i.e. greater than, - // and not greater or equal as our lastRun day/time + // and not greater or equal as our lastScheduledRun day/time // will be in the loop, and we don't want to select it again return atDate } else if !firstPass && !atDate.Before(lastRun) { // now that we're looking at the next month, it's ok to consider - // the same at time that was lastRun (as lastRun has been incremented) + // the same at time that was lastScheduledRun (as lastScheduledRun has been incremented) return atDate } } @@ -889,7 +894,7 @@ func (j job) NextRun() (time.Time, error) { if ij == nil || ij.id == uuid.Nil { return time.Time{}, ErrJobNotFound } - return ij.nextRun, nil + return ij.nextScheduled, nil } func (j job) Tags() []string { diff --git a/vendor/github.com/go-co-op/gocron/v2/scheduler.go b/vendor/github.com/go-co-op/gocron/v2/scheduler.go index 32c7acc7c..dd1323c76 100644 --- a/vendor/github.com/go-co-op/gocron/v2/scheduler.go +++ b/vendor/github.com/go-co-op/gocron/v2/scheduler.go @@ -109,10 +109,11 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { singletonRunners: nil, logger: &noOpLogger{}, - jobsIn: make(chan jobIn), - jobIDsOut: make(chan uuid.UUID), - jobOutRequest: make(chan jobOutRequest, 1000), - done: make(chan error), + jobsIn: make(chan jobIn), + jobsOutForRescheduling: make(chan uuid.UUID), + jobsOutCompleted: make(chan uuid.UUID), + jobOutRequest: make(chan jobOutRequest, 1000), + done: make(chan error), } s := &scheduler{ @@ -147,8 +148,11 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { s.logger.Info("gocron: new scheduler created") for { select { - case id := <-s.exec.jobIDsOut: - s.selectExecJobIDsOut(id) + case id := <-s.exec.jobsOutForRescheduling: + s.selectExecJobsOutForRescheduling(id) + + case id := <-s.exec.jobsOutCompleted: + s.selectExecJobsOutCompleted(id) case in := <-s.newJobCh: s.selectNewJob(in) @@ -287,28 +291,16 @@ func (s *scheduler) selectRemoveJob(id uuid.UUID) { // Jobs coming back from the executor to the scheduler that // need to evaluated for rescheduling. -func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) { - j := s.jobs[id] - j.lastRun = j.nextRun - - // if the job has a limited number of runs set, we need to - // check how many runs have occurred and stop running this - // job if it has reached the limit. - if j.limitRunsTo != nil { - j.limitRunsTo.runCount = j.limitRunsTo.runCount + 1 - if j.limitRunsTo.runCount == j.limitRunsTo.limit { - go func() { - select { - case <-s.shutdownCtx.Done(): - return - case s.removeJobCh <- id: - } - }() - return - } +func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) { + j, ok := s.jobs[id] + if !ok { + // the job was removed while it was running, and + // so we don't need to reschedule it. + return } + j.lastScheduledRun = j.nextScheduled - next := j.next(j.lastRun) + next := j.next(j.lastScheduledRun) if next.IsZero() { // the job's next function will return zero for OneTime jobs. // since they are one time only, they do not need rescheduling. @@ -324,7 +316,7 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) { next = j.next(next) } } - j.nextRun = next + j.nextScheduled = next j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { // set the actual timer on the job here and listen for // shut down events so that the job doesn't attempt to @@ -342,6 +334,33 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) { s.jobs[id] = j } +func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) { + j, ok := s.jobs[id] + if !ok { + return + } + + // if the job has a limited number of runs set, we need to + // check how many runs have occurred and stop running this + // job if it has reached the limit. + if j.limitRunsTo != nil { + j.limitRunsTo.runCount = j.limitRunsTo.runCount + 1 + if j.limitRunsTo.runCount == j.limitRunsTo.limit { + go func() { + select { + case <-s.shutdownCtx.Done(): + return + case s.removeJobCh <- id: + } + }() + return + } + } + + j.lastRun = s.now() + s.jobs[id] = j +} + func (s *scheduler) selectJobOutRequest(out jobOutRequest) { if j, ok := s.jobs[out.id]; ok { select { @@ -381,7 +400,7 @@ func (s *scheduler) selectNewJob(in newJobIn) { } }) } - j.nextRun = next + j.nextScheduled = next } s.jobs[j.id] = j @@ -432,7 +451,7 @@ func (s *scheduler) selectStart() { } }) } - j.nextRun = next + j.nextScheduled = next s.jobs[id] = j } select { diff --git a/vendor/github.com/go-co-op/gocron/v2/util.go b/vendor/github.com/go-co-op/gocron/v2/util.go index 8bff9429e..18986b363 100644 --- a/vendor/github.com/go-co-op/gocron/v2/util.go +++ b/vendor/github.com/go-co-op/gocron/v2/util.go @@ -44,18 +44,12 @@ func requestJob(id uuid.UUID, ch chan jobOutRequest) *internalJob { func requestJobCtx(ctx context.Context, id uuid.UUID, ch chan jobOutRequest) *internalJob { resp := make(chan internalJob, 1) - select { - case <-ctx.Done(): - return nil - default: - } - select { case ch <- jobOutRequest{ id: id, outChan: resp, }: - default: + case <-ctx.Done(): return nil } var j internalJob diff --git a/vendor/modules.txt b/vendor/modules.txt index e2145422f..80ce206f1 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -29,7 +29,7 @@ github.com/evanphx/json-patch # github.com/fsnotify/fsnotify v1.7.0 ## explicit; go 1.17 github.com/fsnotify/fsnotify -# github.com/go-co-op/gocron/v2 v2.2.5 +# github.com/go-co-op/gocron/v2 v2.2.9 ## explicit; go 1.20 github.com/go-co-op/gocron/v2 # github.com/go-logr/logr v1.3.0