Skip to content

Commit

Permalink
Merge pull request #41 from grafana/feat/limiter
Browse files Browse the repository at this point in the history
Feature: Add limiter package
  • Loading branch information
aknuds1 committed Oct 7, 2021
2 parents 8c49f9c + e4bf92c commit be68933
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 1 deletion.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
* [CHANGE] Added CHANGELOG.md and Pull Request template to reference the changelog
* [CHANGE] Remove `cortex_` prefix for metrics registered in the ring. #46
* [ENHANCEMENT] Add middleware package. #38
* [ENHANCEMENT] Add the ring package #45
* [ENHANCEMENT] Add limiter package. #41
122 changes: 122 additions & 0 deletions limiter/rate_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package limiter

import (
"sync"
"time"

"golang.org/x/time/rate"
)

// RateLimiterStrategy defines the interface which a pluggable strategy should
// implement. The returned limit and burst can change over the time, and the
// local rate limiter will apply them every recheckPeriod.
type RateLimiterStrategy interface {
Limit(tenantID string) float64
Burst(tenantID string) int
}

// RateLimiter is a multi-tenant local rate limiter based on golang.org/x/time/rate.
// It requires a custom strategy in input, which is used to get the limit and burst
// settings for each tenant.
type RateLimiter struct {
strategy RateLimiterStrategy
recheckPeriod time.Duration

tenantsLock sync.RWMutex
tenants map[string]*tenantLimiter
}

type tenantLimiter struct {
limiter *rate.Limiter
recheckAt time.Time
}

// NewRateLimiter makes a new multi-tenant rate limiter. Each per-tenant limiter
// is configured using the input strategy and its limit/burst is rechecked (and
// reconfigured if changed) every recheckPeriod.
func NewRateLimiter(strategy RateLimiterStrategy, recheckPeriod time.Duration) *RateLimiter {
return &RateLimiter{
strategy: strategy,
recheckPeriod: recheckPeriod,
tenants: map[string]*tenantLimiter{},
}
}

// AllowN reports whether n tokens may be consumed happen at time now.
func (l *RateLimiter) AllowN(now time.Time, tenantID string, n int) bool {
return l.getTenantLimiter(now, tenantID).AllowN(now, n)
}

// Limit returns the currently configured maximum overall tokens rate.
func (l *RateLimiter) Limit(now time.Time, tenantID string) float64 {
return float64(l.getTenantLimiter(now, tenantID).Limit())
}

// Burst returns the currently configured maximum burst size.
func (l *RateLimiter) Burst(now time.Time, tenantID string) int {
return l.getTenantLimiter(now, tenantID).Burst()
}

func (l *RateLimiter) getTenantLimiter(now time.Time, tenantID string) *rate.Limiter {
recheck := false

// Check if the per-tenant limiter already exists and if should
// be rechecked because the recheck period has elapsed
l.tenantsLock.RLock()
entry, ok := l.tenants[tenantID]
if ok && !now.Before(entry.recheckAt) {
recheck = true
}
l.tenantsLock.RUnlock()

// If the limiter already exist, we return it, making sure to recheck it
// if the recheck period has elapsed
if ok && recheck {
return l.recheckTenantLimiter(now, tenantID)
} else if ok {
return entry.limiter
}

// Create a new limiter
limit := rate.Limit(l.strategy.Limit(tenantID))
burst := l.strategy.Burst(tenantID)
limiter := rate.NewLimiter(limit, burst)

l.tenantsLock.Lock()
if entry, ok = l.tenants[tenantID]; !ok {
entry = &tenantLimiter{limiter, now.Add(l.recheckPeriod)}
l.tenants[tenantID] = entry
}
l.tenantsLock.Unlock()

return entry.limiter
}

func (l *RateLimiter) recheckTenantLimiter(now time.Time, tenantID string) *rate.Limiter {
limit := rate.Limit(l.strategy.Limit(tenantID))
burst := l.strategy.Burst(tenantID)

l.tenantsLock.Lock()
defer l.tenantsLock.Unlock()

entry := l.tenants[tenantID]

// We check again if the recheck period elapsed, cause it may
// have already been rechecked in the meanwhile.
if now.Before(entry.recheckAt) {
return entry.limiter
}

// Ensure the limiter's limit and burst match the expected value
if entry.limiter.Limit() != limit {
entry.limiter.SetLimitAt(now, limit)
}

if entry.limiter.Burst() != burst {
entry.limiter.SetBurstAt(now, burst)
}

entry.recheckAt = now.Add(l.recheckPeriod)

return entry.limiter
}
129 changes: 129 additions & 0 deletions limiter/rate_limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package limiter

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"golang.org/x/time/rate"
)

func TestRateLimiter_RecheckPeriod(t *testing.T) {
strategy := &increasingLimitStrategy{}
limiter := NewRateLimiter(strategy, 10*time.Second)
now := time.Now()

// Since the strategy increases the limit and burst value each time
// the strategy functions are called, we do assert if the recheck
// period is honored increasing the input time
assert.Equal(t, float64(1), limiter.Limit(now, "test"))
assert.Equal(t, 1, limiter.Burst(now, "test"))

assert.Equal(t, float64(1), limiter.Limit(now.Add(9*time.Second), "test"))
assert.Equal(t, 1, limiter.Burst(now.Add(9*time.Second), "test"))

assert.Equal(t, float64(2), limiter.Limit(now.Add(10*time.Second), "test"))
assert.Equal(t, 2, limiter.Burst(now.Add(10*time.Second), "test"))

assert.Equal(t, float64(2), limiter.Limit(now.Add(19*time.Second), "test"))
assert.Equal(t, 2, limiter.Burst(now.Add(19*time.Second), "test"))

assert.Equal(t, float64(3), limiter.Limit(now.Add(20*time.Second), "test"))
assert.Equal(t, 3, limiter.Burst(now.Add(20*time.Second), "test"))
}

func TestRateLimiter_AllowN(t *testing.T) {
strategy := &staticLimitStrategy{tenants: map[string]struct {
limit float64
burst int
}{
"tenant-1": {limit: 10, burst: 20},
"tenant-2": {limit: 20, burst: 40},
}}

limiter := NewRateLimiter(strategy, 10*time.Second)
now := time.Now()

// Tenant #1
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 8))
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 10))
assert.Equal(t, false, limiter.AllowN(now, "tenant-1", 3))
assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 2))

assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-1", 8))
assert.Equal(t, false, limiter.AllowN(now.Add(time.Second), "tenant-1", 3))
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-1", 2))

// Tenant #2
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 18))
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 20))
assert.Equal(t, false, limiter.AllowN(now, "tenant-2", 3))
assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 2))

assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-2", 18))
assert.Equal(t, false, limiter.AllowN(now.Add(time.Second), "tenant-2", 3))
assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-2", 2))
}

func BenchmarkRateLimiter_CustomMultiTenant(b *testing.B) {
strategy := &increasingLimitStrategy{}
limiter := NewRateLimiter(strategy, 10*time.Second)
now := time.Now()

b.ResetTimer()

for i := 0; i < b.N; i++ {
limiter.AllowN(now, "test", 1)
}
}

func BenchmarkRateLimiter_OriginalSingleTenant(b *testing.B) {
limiter := rate.NewLimiter(rate.Limit(1), 1)
now := time.Now()

b.ResetTimer()

for i := 0; i < b.N; i++ {
limiter.AllowN(now, 1)
}
}

type increasingLimitStrategy struct {
limit float64
burst int
}

func (s *increasingLimitStrategy) Limit(tenantID string) float64 {
s.limit++
return s.limit
}

func (s *increasingLimitStrategy) Burst(tenantID string) int {
s.burst++
return s.burst
}

type staticLimitStrategy struct {
tenants map[string]struct {
limit float64
burst int
}
}

func (s *staticLimitStrategy) Limit(tenantID string) float64 {
tenant, ok := s.tenants[tenantID]
if !ok {
return 0
}

return tenant.limit
}

func (s *staticLimitStrategy) Burst(tenantID string) int {
tenant, ok := s.tenants[tenantID]
if !ok {
return 0
}

return tenant.burst
}

0 comments on commit be68933

Please sign in to comment.