Skip to content

Commit

Permalink
feat: add a simple way to ensure cron jobs to run at least once for e…
Browse files Browse the repository at this point in the history
…ach schedule.
  • Loading branch information
Reasno committed Mar 15, 2022
1 parent 7a37160 commit 101017f
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 0 deletions.
13 changes: 13 additions & 0 deletions cron/cancel.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
local job = KEYS[1]
local hostname = ARGV[1]

local host = redis.call('GET', job .. ':host')
if host == nil then
return -2
end

if host ~= hostname then
return -1
end
return redis.call('DEL', job .. ':host')

18 changes: 18 additions & 0 deletions cron/commit.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
local job = KEYS[1]
local hostname = ARGV[1]
local next = ARGV[2]
local expire = tonumber(ARGV[3])

local host = redis.call('GET', job .. ':host')
if host == nil then
return -2
end

if host ~= hostname then
return -1
end

redis.call('SET', job .. ':next', next)
redis.call('EXPIRE', job .. ':next', expire)
return redis.call('DEL', job .. ':host')

98 changes: 98 additions & 0 deletions cron/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package cron

import (
"context"
_ "embed"
"errors"
"fmt"
"math"
"os"
"runtime/debug"
"strings"
"time"

"github.com/DoNewsCode/core/logging"
"github.com/go-redis/redis/v8"

"github.com/go-kit/log"
"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -92,6 +97,92 @@ func WithTracing(tracer opentracing.Tracer) JobOption {
}
}

//go:embed start.lua
var startLua string

//go:embed commit.lua
var commitLua string

//go:embed cancel.lua
var cancelLua string

type PersistenceConfig struct {
// How long will the redis lock be held. By default, the lock will be held for a minute.
LockTTL time.Duration
// Only missed schedules before this duration can be compensated. By default, it is calculated from the gap between each run.
MaxRecoverableDuration time.Duration
// The prefix of keys in redis. Make sure each project use different keys to avoid collision.
KeyPrefix string
}

// WithPersistence ensures the job will be run at least once by committing
// successful runs into redis. If a schedule is missed, the driver will
// compensate the missing run in the next schedule. Users should use
// GetCurrentSchedule method to determine the targeted schedule of the current
// run, instead of relying on time.Now.
func WithPersistence(redis redis.UniversalClient, config PersistenceConfig) JobOption {
if config.LockTTL == 0 {
config.LockTTL = time.Minute
}
return func(descriptor *JobDescriptor) {
innerRun := descriptor.Run
descriptor.Run = func(ctx context.Context) error {
var (
expectedNext time.Time
expectedNextString interface{}
err error
current = GetCurrentSchedule(ctx)
next = GetNextSchedule(ctx)
)

hostname, _ := os.Hostname()
expire := calculateNextTTL(config, current, next)

cancel := func() {
redis.Eval(ctx, cancelLua, []string{descriptor.Name}, []string{hostname})
}

for next.Sub(descriptor.next) <= 0 {
keys := []string{strings.Join([]string{config.KeyPrefix, descriptor.Name}, ":")}
argv := []string{hostname, fmt.Sprintf("%.0f", config.LockTTL.Round(time.Second).Seconds())}
expectedNextString, err = redis.Eval(ctx, startLua, keys, argv).Result()
if err != nil {
cancel()
return fmt.Errorf("failed to start job: %w", err)
}
if expectedNextString == -2 {
cancel()
return errors.New("job is already running")
}
if expectedNextString == -1 {
expectedNext = current
} else {
expectedNext, err = time.Parse(time.RFC3339, expectedNextString.(string))
if err != nil {
cancel()
return fmt.Errorf("could not parse expected next time: %s", err)
}
}

current = expectedNext
next = descriptor.Schedule.Next(current)
ctx = context.WithValue(ctx, prevContextKey, current)
ctx = context.WithValue(ctx, nextContextKey, next)
err = innerRun(ctx)
if err != nil {
cancel()
return err
}
if err := redis.Eval(ctx, commitLua, keys, []string{hostname, next.Format(time.RFC3339), expire}).Err(); err != nil {
cancel()
return fmt.Errorf("failed to commit job: %w", err)
}
}
return nil
}
}
}

// SkipIfOverlap returns a new JobDescriptor that will skip the job if it overlaps with another job.
func SkipIfOverlap() JobOption {
ch := make(chan struct{}, 1)
Expand Down Expand Up @@ -155,3 +246,10 @@ func Recover(logger log.Logger) JobOption {
}
}
}

func calculateNextTTL(config PersistenceConfig, current, next time.Time) string {
if config.MaxRecoverableDuration != 0 {
return fmt.Sprintf("%0.f", config.MaxRecoverableDuration.Seconds())
}
return fmt.Sprintf("%0.f", math.Max(3600, 2*float64(next.Sub(current).Seconds())+1))
}
33 changes: 33 additions & 0 deletions cron/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"bytes"
"context"
"errors"
"os"
"strings"
"testing"
"time"

"github.com/DoNewsCode/core/internal/stub"
"github.com/go-redis/redis/v8"
"github.com/stretchr/testify/assert"

"github.com/go-kit/log"
"github.com/opentracing/opentracing-go/mocktracer"
Expand All @@ -29,6 +32,36 @@ func (m mockParser) Parse(spec string) (cron.Schedule, error) {
}), nil
}

func TestJobPersistence(t *testing.T) {
if os.Getenv("REDIS_ADDR") == "" {
t.Skip("set REDIS_ADDR to run TestModule_ProvideRunGroup")
return
}
addrs := strings.Split(os.Getenv("REDIS_ADDR"), ",")

client := redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: addrs,
})
defer client.Close()

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

client.Set(ctx, "test:foo:next", time.Now().Round(time.Second).Format(time.RFC3339), 0)

c := New(Config{EnableSeconds: true})

var i int
c.Add("* * * * * *", func(ctx context.Context) error {
i++
return nil
}, WithPersistence(client, PersistenceConfig{KeyPrefix: "test"}), WithName("foo"))
c.Run(ctx)

assert.GreaterOrEqual(t, i, 2)
t.Log(i)
}

func TestJobOption(t *testing.T) {
t.Parallel()
var buf bytes.Buffer
Expand Down
20 changes: 20 additions & 0 deletions cron/start.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
local job = KEYS[1]
local hostname = ARGV[1]
local expire = tonumber(ARGV[2])

local host = redis.call('GET', job .. ':host')
if host ~= false and host ~= hostname then
return -2
end

redis.call('SET', job .. ':host', hostname)
redis.call('EXPIRE', job .. ':host', expire)

local expectedNext = redis.call('GET', job .. ':next')
if expectedNext == false then
return -1
end

return expectedNext


0 comments on commit 101017f

Please sign in to comment.