From f4888970fbf0674aed6bee3e8e0fb6dc63056ac8 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 4 Aug 2021 17:38:25 +0200 Subject: [PATCH] Add backoff functionality Signed-off-by: Arve Knudsen --- go.mod | 3 + pkg/util/backoff.go | 117 +++++++++++++++++++++++++++++++++++++++ pkg/util/backoff_test.go | 103 ++++++++++++++++++++++++++++++++++ 3 files changed, 223 insertions(+) create mode 100644 go.mod create mode 100644 pkg/util/backoff.go create mode 100644 pkg/util/backoff_test.go diff --git a/go.mod b/go.mod new file mode 100644 index 000000000..01711a631 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/grafana/dskit + +go 1.16 diff --git a/pkg/util/backoff.go b/pkg/util/backoff.go new file mode 100644 index 000000000..2a9101dd8 --- /dev/null +++ b/pkg/util/backoff.go @@ -0,0 +1,117 @@ +package util + +import ( + "context" + "flag" + "fmt" + "math/rand" + "time" +) + +// BackoffConfig configures a Backoff +type BackoffConfig struct { + MinBackoff time.Duration `yaml:"min_period"` // start backoff at this level + MaxBackoff time.Duration `yaml:"max_period"` // increase exponentially to this level + MaxRetries int `yaml:"max_retries"` // give up after this many; zero means infinite retries +} + +// RegisterFlags for BackoffConfig. +func (cfg *BackoffConfig) RegisterFlags(prefix string, f *flag.FlagSet) { + f.DurationVar(&cfg.MinBackoff, prefix+".backoff-min-period", 100*time.Millisecond, "Minimum delay when backing off.") + f.DurationVar(&cfg.MaxBackoff, prefix+".backoff-max-period", 10*time.Second, "Maximum delay when backing off.") + f.IntVar(&cfg.MaxRetries, prefix+".backoff-retries", 10, "Number of times to backoff and retry before failing.") +} + +// Backoff implements exponential backoff with randomized wait times +type Backoff struct { + cfg BackoffConfig + ctx context.Context + numRetries int + nextDelayMin time.Duration + nextDelayMax time.Duration +} + +// NewBackoff creates a Backoff object. Pass a Context that can also terminate the operation. +func NewBackoff(ctx context.Context, cfg BackoffConfig) *Backoff { + return &Backoff{ + cfg: cfg, + ctx: ctx, + nextDelayMin: cfg.MinBackoff, + nextDelayMax: doubleDuration(cfg.MinBackoff, cfg.MaxBackoff), + } +} + +// Reset the Backoff back to its initial condition +func (b *Backoff) Reset() { + b.numRetries = 0 + b.nextDelayMin = b.cfg.MinBackoff + b.nextDelayMax = doubleDuration(b.cfg.MinBackoff, b.cfg.MaxBackoff) +} + +// Ongoing returns true if caller should keep going +func (b *Backoff) Ongoing() bool { + // Stop if Context has errored or max retry count is exceeded + return b.ctx.Err() == nil && (b.cfg.MaxRetries == 0 || b.numRetries < b.cfg.MaxRetries) +} + +// Err returns the reason for terminating the backoff, or nil if it didn't terminate +func (b *Backoff) Err() error { + if b.ctx.Err() != nil { + return b.ctx.Err() + } + if b.cfg.MaxRetries != 0 && b.numRetries >= b.cfg.MaxRetries { + return fmt.Errorf("terminated after %d retries", b.numRetries) + } + return nil +} + +// NumRetries returns the number of retries so far +func (b *Backoff) NumRetries() int { + return b.numRetries +} + +// Wait sleeps for the backoff time then increases the retry count and backoff time +// Returns immediately if Context is terminated +func (b *Backoff) Wait() { + // Increase the number of retries and get the next delay + sleepTime := b.NextDelay() + + if b.Ongoing() { + select { + case <-b.ctx.Done(): + case <-time.After(sleepTime): + } + } +} + +func (b *Backoff) NextDelay() time.Duration { + b.numRetries++ + + // Handle the edge case where the min and max have the same value + // (or due to some misconfig max is < min) + if b.nextDelayMin >= b.nextDelayMax { + return b.nextDelayMin + } + + // Add a jitter within the next exponential backoff range + sleepTime := b.nextDelayMin + time.Duration(rand.Int63n(int64(b.nextDelayMax-b.nextDelayMin))) + + // Apply the exponential backoff to calculate the next jitter + // range, unless we've already reached the max + if b.nextDelayMax < b.cfg.MaxBackoff { + b.nextDelayMin = doubleDuration(b.nextDelayMin, b.cfg.MaxBackoff) + b.nextDelayMax = doubleDuration(b.nextDelayMax, b.cfg.MaxBackoff) + } + + return sleepTime +} + +func doubleDuration(value time.Duration, max time.Duration) time.Duration { + value = value * 2 + + if value <= max { + return value + } + + return max +} diff --git a/pkg/util/backoff_test.go b/pkg/util/backoff_test.go new file mode 100644 index 000000000..b255db0e7 --- /dev/null +++ b/pkg/util/backoff_test.go @@ -0,0 +1,103 @@ +package util + +import ( + "context" + "testing" + "time" +) + +func TestBackoff_NextDelay(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + minBackoff time.Duration + maxBackoff time.Duration + expectedRanges [][]time.Duration + }{ + "exponential backoff with jitter honoring min and max": { + minBackoff: 100 * time.Millisecond, + maxBackoff: 10 * time.Second, + expectedRanges: [][]time.Duration{ + {100 * time.Millisecond, 200 * time.Millisecond}, + {200 * time.Millisecond, 400 * time.Millisecond}, + {400 * time.Millisecond, 800 * time.Millisecond}, + {800 * time.Millisecond, 1600 * time.Millisecond}, + {1600 * time.Millisecond, 3200 * time.Millisecond}, + {3200 * time.Millisecond, 6400 * time.Millisecond}, + {6400 * time.Millisecond, 10000 * time.Millisecond}, + {6400 * time.Millisecond, 10000 * time.Millisecond}, + }, + }, + "exponential backoff with max equal to the end of a range": { + minBackoff: 100 * time.Millisecond, + maxBackoff: 800 * time.Millisecond, + expectedRanges: [][]time.Duration{ + {100 * time.Millisecond, 200 * time.Millisecond}, + {200 * time.Millisecond, 400 * time.Millisecond}, + {400 * time.Millisecond, 800 * time.Millisecond}, + {400 * time.Millisecond, 800 * time.Millisecond}, + }, + }, + "exponential backoff with max equal to the end of a range + 1": { + minBackoff: 100 * time.Millisecond, + maxBackoff: 801 * time.Millisecond, + expectedRanges: [][]time.Duration{ + {100 * time.Millisecond, 200 * time.Millisecond}, + {200 * time.Millisecond, 400 * time.Millisecond}, + {400 * time.Millisecond, 800 * time.Millisecond}, + {800 * time.Millisecond, 801 * time.Millisecond}, + {800 * time.Millisecond, 801 * time.Millisecond}, + }, + }, + "exponential backoff with max equal to the end of a range - 1": { + minBackoff: 100 * time.Millisecond, + maxBackoff: 799 * time.Millisecond, + expectedRanges: [][]time.Duration{ + {100 * time.Millisecond, 200 * time.Millisecond}, + {200 * time.Millisecond, 400 * time.Millisecond}, + {400 * time.Millisecond, 799 * time.Millisecond}, + {400 * time.Millisecond, 799 * time.Millisecond}, + }, + }, + "min backoff is equal to max": { + minBackoff: 100 * time.Millisecond, + maxBackoff: 100 * time.Millisecond, + expectedRanges: [][]time.Duration{ + {100 * time.Millisecond, 100 * time.Millisecond}, + {100 * time.Millisecond, 100 * time.Millisecond}, + {100 * time.Millisecond, 100 * time.Millisecond}, + }, + }, + "min backoff is greater then max": { + minBackoff: 200 * time.Millisecond, + maxBackoff: 100 * time.Millisecond, + expectedRanges: [][]time.Duration{ + {200 * time.Millisecond, 200 * time.Millisecond}, + {200 * time.Millisecond, 200 * time.Millisecond}, + {200 * time.Millisecond, 200 * time.Millisecond}, + }, + }, + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + t.Parallel() + + b := NewBackoff(context.Background(), BackoffConfig{ + MinBackoff: testData.minBackoff, + MaxBackoff: testData.maxBackoff, + MaxRetries: len(testData.expectedRanges), + }) + + for _, expectedRange := range testData.expectedRanges { + delay := b.NextDelay() + + if delay < expectedRange[0] || delay > expectedRange[1] { + t.Errorf("%d expected to be within %d and %d", delay, expectedRange[0], expectedRange[1]) + } + } + }) + } +}