Abstract the logic of the circuit breaker
Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>
damnever committed Feb 19, 2024
1 parent 07d9134 commit e299abe
Showing 7 changed files with 153 additions and 152 deletions.
26 changes: 14 additions & 12 deletions docs/components/query-frontend.md
@@ -77,12 +77,13 @@ config:
max_get_multi_batch_size: 0
dns_provider_update_interval: 0s
auto_discovery: false
set_async_circuit_breaker_enabled: false
set_async_circuit_breaker_half_open_max_requests: 0
set_async_circuit_breaker_open_duration: 0s
set_async_circuit_breaker_min_requests: 0
set_async_circuit_breaker_consecutive_failures: 0
set_async_circuit_breaker_failure_percent: 0
set_async_circuit_breaker_config:
enabled: false
half_open_max_requests: 0
open_duration: 0s
min_requests: 0
consecutive_failures: 0
failure_percent: 0
expiration: 0s
```

@@ -138,12 +139,13 @@ config:
master_name: ""
max_async_buffer_size: 10000
max_async_concurrency: 20
set_async_circuit_breaker_enabled: false
set_async_circuit_breaker_half_open_max_requests: 10
set_async_circuit_breaker_open_duration: 5s
set_async_circuit_breaker_min_requests: 50
set_async_circuit_breaker_consecutive_failures: 5
set_async_circuit_breaker_failure_percent: 0.05
set_async_circuit_breaker_config:
enabled: false
half_open_max_requests: 10
open_duration: 5s
min_requests: 50
consecutive_failures: 5
failure_percent: 0.05
expiration: 24h0m0s
```
39 changes: 21 additions & 18 deletions docs/components/store.md
@@ -325,12 +325,13 @@ config:
max_get_multi_batch_size: 0
dns_provider_update_interval: 0s
auto_discovery: false
set_async_circuit_breaker_enabled: false
set_async_circuit_breaker_half_open_max_requests: 0
set_async_circuit_breaker_open_duration: 0s
set_async_circuit_breaker_min_requests: 0
set_async_circuit_breaker_consecutive_failures: 0
set_async_circuit_breaker_failure_percent: 0
set_async_circuit_breaker_config:
enabled: false
half_open_max_requests: 0
open_duration: 0s
min_requests: 0
consecutive_failures: 0
failure_percent: 0
enabled_items: []
ttl: 0s
```
@@ -346,16 +347,17 @@ While the remaining settings are **optional**:
- `max_async_concurrency`: maximum number of concurrent asynchronous operations that can occur.
- `max_async_buffer_size`: maximum number of enqueued asynchronous operations allowed.
- `max_get_multi_concurrency`: maximum number of concurrent connections when fetching keys. If set to `0`, the concurrency is unlimited.
- `set_async_circuit_breaker_enabled`: `true` to enable circuite breaker for asynchronous operations. The circuit breaker consists of three states: closed, half-open, and open. It begins in the closed state. When the total requests exceed `set_async_circuit_breaker_min_requests`, and either consecutive failures occur or the failure percentage is excessively high according to the configured values, the circuit breaker transitions to the open state. This results in the rejection of all asynchronous operations. After `set_async_circuit_breaker_open_duration`, the circuit breaker transitions to the half-open state, where it allows `set_async_circuit_breaker_half_open_max_requests` asynchronous operations to be processed in order to test if the conditions have improved. If they have not, the state transitions back to open; if they have, it transitions to the closed state. Following each 10 seconds interval in the closed state, the circuit breaker resets its metrics and repeats this cycle.
- `set_async_circuit_breaker_half_open_max_requests`: maximum number of requests allowed to pass through when the circuit breaker is half-open. If set to 0, the circuit breaker allows only 1 request.
- `set_async_circuit_breaker_open_duration`: the period of the open state after which the state of the circuit breaker becomes half-open. If set to 0, the circuit breaker resets it to 60 seconds.
- `set_async_circuit_breaker_min_requests`: minimal requests to trigger the circuit breaker, 0 signifies no requirements.
- `set_async_circuit_breaker_consecutive_failures`: consecutive failures based on `set_async_circuit_breaker_min_requests` to determine if the circuit breaker should open.
- `set_async_circuit_breaker_failure_percent`: the failure percentage, which is based on `set_async_circuit_breaker_min_requests`, to determine if the circuit breaker should open.
- `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, the keys are internally split into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited.
- `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value of memcached `-I` flag (defaults to 1MB) in order to avoid wasting network round trips to store items larger than the max item size allowed in memcached. If set to `0`, the item size is unlimited.
- `dns_provider_update_interval`: the DNS discovery update interval.
- `auto_discovery`: whether to use the auto-discovery mechanism for memcached.
- `set_async_circuit_breaker_config`: the configuration for the circuit breaker for asynchronous set operations.
- `enabled`: `true` to enable the circuit breaker for asynchronous operations. The circuit breaker has three states: closed, half-open, and open. It begins in the closed state. When the number of requests reaches `min_requests`, and either the number of consecutive failures reaches `consecutive_failures` or the failure ratio reaches `failure_percent`, the circuit breaker transitions to the open state and rejects all asynchronous operations. After `open_duration`, the circuit breaker transitions to the half-open state, where it allows `half_open_max_requests` asynchronous operations to be processed in order to test whether conditions have improved. If they have not, the state transitions back to open; if they have, it transitions to the closed state. While closed, the circuit breaker resets its metrics every 10 seconds and repeats this cycle.
- `half_open_max_requests`: maximum number of requests allowed to pass through when the circuit breaker is half-open. If set to 0, the circuit breaker allows only 1 request.
- `open_duration`: the period of the open state after which the state of the circuit breaker becomes half-open. If set to 0, the circuit breaker utilizes the default value of 60 seconds.
- `min_requests`: minimum number of requests required before the circuit breaker can open; 0 means no minimum.
- `consecutive_failures`: consecutive failures based on `min_requests` to determine if the circuit breaker should open.
- `failure_percent`: the failure percentage, which is based on `min_requests`, to determine if the circuit breaker should open.
- `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached.
- `ttl`: ttl to store index cache items in memcached.
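
The trip condition described in the list above can be sketched in plain Go. This is a minimal illustration, not the Thanos code: `tripConfig`, `counts`, and `shouldOpen` are hypothetical names, and the explicit guard for zero requests is an assumption added so the ratio is never computed on an empty window.

```go
package main

import "fmt"

// tripConfig mirrors the three thresholds from the docs (hypothetical type).
type tripConfig struct {
	MinRequests         uint32
	ConsecutiveFailures uint32
	FailurePercent      float64 // a ratio in (0,1], e.g. 0.05 for 5%
}

// counts is a hypothetical snapshot of the metrics gathered while closed.
type counts struct {
	Requests            uint32
	TotalFailures       uint32
	ConsecutiveFailures uint32
}

// shouldOpen reports whether the breaker should move from closed to open:
// enough requests were seen, and either threshold was reached.
func shouldOpen(c counts, cfg tripConfig) bool {
	// Assumption: treat an empty window as healthy to avoid dividing by zero.
	if c.Requests == 0 || c.Requests < cfg.MinRequests {
		return false
	}
	ratio := float64(c.TotalFailures) / float64(c.Requests)
	return c.ConsecutiveFailures >= cfg.ConsecutiveFailures || ratio >= cfg.FailurePercent
}

func main() {
	cfg := tripConfig{MinRequests: 50, ConsecutiveFailures: 5, FailurePercent: 0.05}

	// Below min_requests: stays closed even though every request failed.
	fmt.Println(shouldOpen(counts{Requests: 40, TotalFailures: 40, ConsecutiveFailures: 40}, cfg))
	// 10% failures over 100 requests: opens on the ratio threshold.
	fmt.Println(shouldOpen(counts{Requests: 100, TotalFailures: 10, ConsecutiveFailures: 1}, cfg))
	// 2% failures and only 2 in a row: stays closed.
	fmt.Println(shouldOpen(counts{Requests: 100, TotalFailures: 2, ConsecutiveFailures: 2}, cfg))
}
```

Note that `failure_percent` is compared as a ratio, so `0.05` means 5% of requests in the current window.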

@@ -388,12 +390,13 @@ config:
master_name: ""
max_async_buffer_size: 10000
max_async_concurrency: 20
set_async_circuit_breaker_enabled: false
set_async_circuit_breaker_half_open_max_requests: 10
set_async_circuit_breaker_open_duration: 5s
set_async_circuit_breaker_min_requests: 50
set_async_circuit_breaker_consecutive_failures: 5
set_async_circuit_breaker_failure_percent: 0.05
set_async_circuit_breaker_config:
enabled: false
half_open_max_requests: 10
open_duration: 5s
min_requests: 50
consecutive_failures: 5
failure_percent: 0.05
enabled_items: []
ttl: 0s
```
69 changes: 67 additions & 2 deletions pkg/cacheutil/cacheutil.go
@@ -5,14 +5,29 @@ package cacheutil

import (
"context"
"time"

"golang.org/x/sync/errgroup"

"github.com/pkg/errors"
"github.com/sony/gobreaker"
"golang.org/x/sync/errgroup"

"github.com/thanos-io/thanos/pkg/gate"
)

var (
errCircuitBreakerConsecutiveFailuresNotPositive = errors.New("circuit breaker: consecutive failures must be greater than 0")
errCircuitBreakerFailurePercentInvalid = errors.New("circuit breaker: failure percent must be in range (0,1]")

defaultCircuitBreakerConfig = CircuitBreakerConfig{
Enabled: false,
HalfOpenMaxRequests: 10,
OpenDuration: 5 * time.Second,
MinRequests: 50,
ConsecutiveFailures: 5,
FailurePercent: 0.05,
}
)

// doWithBatch runs f over the input in batches, limited by the gate. batchSize==0 means one batch. gate==nil means no gate.
func doWithBatch(ctx context.Context, totalSize int, batchSize int, ga gate.Gate, f func(startIndex, endIndex int) error) error {
if totalSize == 0 {
@@ -48,6 +63,39 @@ type CircuitBreaker interface {
Execute(func() error) error
}

// CircuitBreakerConfig is the config for the circuit breaker.
type CircuitBreakerConfig struct {
// Enabled enables the circuit breaker.
Enabled bool `yaml:"enabled"`

// HalfOpenMaxRequests is the maximum number of requests allowed to pass through
// when the circuit breaker is half-open.
// If set to 0, the circuit breaker allows only 1 request.
HalfOpenMaxRequests uint32 `yaml:"half_open_max_requests"`
// OpenDuration is the period of the open state after which the state of the circuit breaker becomes half-open.
// If set to 0, the circuit breaker utilizes the default value of 60 seconds.
OpenDuration time.Duration `yaml:"open_duration"`
// MinRequests is minimal requests to trigger the circuit breaker.
MinRequests uint32 `yaml:"min_requests"`
// ConsecutiveFailures represents consecutive failures based on MinRequests to determine if the circuit breaker should open.
ConsecutiveFailures uint32 `yaml:"consecutive_failures"`
// FailurePercent represents the failure percentage, which is based on MinRequests, to determine if the circuit breaker should open.
FailurePercent float64 `yaml:"failure_percent"`
}

func (c CircuitBreakerConfig) validate() error {
if !c.Enabled {
return nil
}
if c.ConsecutiveFailures == 0 {
return errCircuitBreakerConsecutiveFailuresNotPositive
}
if c.FailurePercent <= 0 || c.FailurePercent > 1 {
return errCircuitBreakerFailurePercentInvalid
}
return nil
}

type noopCircuitBreaker struct{}

func (noopCircuitBreaker) Execute(f func() error) error { return f() }
@@ -62,3 +110,20 @@ func (cb gobreakerCircuitBreaker) Execute(f func() error) error {
})
return err
}

func newCircuitBreaker(name string, config CircuitBreakerConfig) CircuitBreaker {
if !config.Enabled {
return noopCircuitBreaker{}
}
return gobreakerCircuitBreaker{gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: name,
MaxRequests: config.HalfOpenMaxRequests,
Interval: 10 * time.Second,
Timeout: config.OpenDuration,
ReadyToTrip: func(counts gobreaker.Counts) bool {
return counts.Requests >= config.MinRequests &&
(counts.ConsecutiveFailures >= uint32(config.ConsecutiveFailures) ||
float64(counts.TotalFailures)/float64(counts.Requests) >= config.FailurePercent)
},
})}
}
56 changes: 9 additions & 47 deletions pkg/cacheutil/memcached_client.go
@@ -41,11 +41,9 @@ const (
)

var (
errMemcachedConfigNoAddrs = errors.New("no memcached addresses provided")
errMemcachedDNSUpdateIntervalNotPositive = errors.New("DNS provider update interval must be positive")
errMemcachedMaxAsyncConcurrencyNotPositive = errors.New("max async concurrency must be positive")
errCircuitBreakerConsecutiveFailuresNotPositive = errors.New("set async circuit breaker: consecutive failures must be greater than 0")
errCircuitBreakerFailurePercentInvalid = errors.New("set async circuit breaker: failure percent must be in range (0,1]")
errMemcachedConfigNoAddrs = errors.New("no memcached addresses provided")
errMemcachedDNSUpdateIntervalNotPositive = errors.New("DNS provider update interval must be positive")
errMemcachedMaxAsyncConcurrencyNotPositive = errors.New("max async concurrency must be positive")

defaultMemcachedClientConfig = MemcachedClientConfig{
Timeout: 500 * time.Millisecond,
@@ -58,12 +56,7 @@ var (
DNSProviderUpdateInterval: 10 * time.Second,
AutoDiscovery: false,

SetAsyncCircuitBreakerEnabled: false,
SetAsyncCircuitBreakerHalfOpenMaxRequests: 10,
SetAsyncCircuitBreakerOpenDuration: 5 * time.Second,
SetAsyncCircuitBreakerMinRequests: 50,
SetAsyncCircuitBreakerConsecutiveFailures: 5,
SetAsyncCircuitBreakerFailurePercent: 0.05,
SetAsyncCircuitBreaker: defaultCircuitBreakerConfig,
}
)

@@ -152,21 +145,8 @@ type MemcachedClientConfig struct {
// AutoDiscovery configures memcached client to perform auto-discovery instead of DNS resolution
AutoDiscovery bool `yaml:"auto_discovery"`

// SetAsyncCircuitBreakerEnabled enables circuite breaker for SetAsync operations.
SetAsyncCircuitBreakerEnabled bool `yaml:"set_async_circuit_breaker_enabled"`
// SetAsyncCircuitBreakerHalfOpenMaxRequests is the maximum number of requests allowed to pass through
// when the circuit breaker is half-open.
// If set to 0, the circuit breaker allows only 1 request.
SetAsyncCircuitBreakerHalfOpenMaxRequests uint32 `yaml:"set_async_circuit_breaker_half_open_max_requests"`
// SetAsyncCircuitBreakerOpenDuration is the period of the open state after which the state of the circuit breaker becomes half-open.
// If set to 0, the circuit breaker resets it to 60 seconds.
SetAsyncCircuitBreakerOpenDuration time.Duration `yaml:"set_async_circuit_breaker_open_duration"`
// SetAsyncCircuitBreakerMinRequests is minimal requests to trigger the circuit breaker.
SetAsyncCircuitBreakerMinRequests uint32 `yaml:"set_async_circuit_breaker_min_requests"`
// SetAsyncCircuitBreakerConsecutiveFailures represents consecutive failures based on CircuitBreakerMinRequests to determine if the circuit breaker should open.
SetAsyncCircuitBreakerConsecutiveFailures uint32 `yaml:"set_async_circuit_breaker_consecutive_failures"`
// SetAsyncCircuitBreakerFailurePercent represents the failure percentage, which is based on CircuitBreakerMinRequests, to determine if the circuit breaker should open.
SetAsyncCircuitBreakerFailurePercent float64 `yaml:"set_async_circuit_breaker_failure_percent"`
// SetAsyncCircuitBreaker configures the circuit breaker for SetAsync operations.
SetAsyncCircuitBreaker CircuitBreakerConfig `yaml:"set_async_circuit_breaker_config"`
}

func (c *MemcachedClientConfig) validate() error {
@@ -184,13 +164,8 @@ func (c *MemcachedClientConfig) validate() error {
return errMemcachedMaxAsyncConcurrencyNotPositive
}

if c.SetAsyncCircuitBreakerEnabled {
if c.SetAsyncCircuitBreakerConsecutiveFailures == 0 {
return errCircuitBreakerConsecutiveFailuresNotPositive
}
if c.SetAsyncCircuitBreakerFailurePercent <= 0 || c.SetAsyncCircuitBreakerFailurePercent > 1 {
return errCircuitBreakerFailurePercentInvalid
}
if err := c.SetAsyncCircuitBreaker.validate(); err != nil {
return err
}
return nil
}
@@ -314,20 +289,7 @@ func newMemcachedClient(
gate.Gets,
),
p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency),
setAsyncCircuitBreaker: noopCircuitBreaker{},
}
if config.SetAsyncCircuitBreakerEnabled {
c.setAsyncCircuitBreaker = gobreakerCircuitBreaker{gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "memcached-set-async",
MaxRequests: config.SetAsyncCircuitBreakerHalfOpenMaxRequests,
Interval: 10 * time.Second,
Timeout: config.SetAsyncCircuitBreakerOpenDuration,
ReadyToTrip: func(counts gobreaker.Counts) bool {
return counts.Requests >= config.SetAsyncCircuitBreakerMinRequests &&
(counts.ConsecutiveFailures >= uint32(config.SetAsyncCircuitBreakerConsecutiveFailures) ||
float64(counts.TotalFailures)/float64(counts.Requests) >= config.SetAsyncCircuitBreakerFailurePercent)
},
})}
setAsyncCircuitBreaker: newCircuitBreaker("memcached-set-async", config.SetAsyncCircuitBreaker),
}

c.clientInfo = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
54 changes: 30 additions & 24 deletions pkg/cacheutil/memcached_client_test.go
@@ -64,33 +64,39 @@ func TestMemcachedClientConfig_validate(t *testing.T) {
},
"should fail on circuit_breaker_consecutive_failures = 0": {
config: MemcachedClientConfig{
Addresses: []string{"127.0.0.1:11211"},
MaxAsyncConcurrency: 1,
DNSProviderUpdateInterval: time.Second,
SetAsyncCircuitBreakerEnabled: true,
SetAsyncCircuitBreakerConsecutiveFailures: 0,
Addresses: []string{"127.0.0.1:11211"},
MaxAsyncConcurrency: 1,
DNSProviderUpdateInterval: time.Second,
SetAsyncCircuitBreaker: CircuitBreakerConfig{
Enabled: true,
ConsecutiveFailures: 0,
},
},
expected: errCircuitBreakerConsecutiveFailuresNotPositive,
},
"should fail on circuit_breaker_failure_percent <= 0": {
config: MemcachedClientConfig{
Addresses: []string{"127.0.0.1:11211"},
MaxAsyncConcurrency: 1,
DNSProviderUpdateInterval: time.Second,
SetAsyncCircuitBreakerEnabled: true,
SetAsyncCircuitBreakerConsecutiveFailures: 1,
SetAsyncCircuitBreakerFailurePercent: 0,
Addresses: []string{"127.0.0.1:11211"},
MaxAsyncConcurrency: 1,
DNSProviderUpdateInterval: time.Second,
SetAsyncCircuitBreaker: CircuitBreakerConfig{
Enabled: true,
ConsecutiveFailures: 1,
FailurePercent: 0,
},
},
expected: errCircuitBreakerFailurePercentInvalid,
},
"should fail on circuit_breaker_failure_percent >= 1": {
config: MemcachedClientConfig{
Addresses: []string{"127.0.0.1:11211"},
MaxAsyncConcurrency: 1,
DNSProviderUpdateInterval: time.Second,
SetAsyncCircuitBreakerEnabled: true,
SetAsyncCircuitBreakerConsecutiveFailures: 1,
SetAsyncCircuitBreakerFailurePercent: 1.1,
Addresses: []string{"127.0.0.1:11211"},
MaxAsyncConcurrency: 1,
DNSProviderUpdateInterval: time.Second,
SetAsyncCircuitBreaker: CircuitBreakerConfig{
Enabled: true,
ConsecutiveFailures: 1,
FailurePercent: 1.1,
},
},
expected: errCircuitBreakerFailurePercentInvalid,
},
@@ -719,12 +725,12 @@ func TestMemcachedClient_SetAsync_CircuitBreaker(t *testing.T) {
t.Run(testdata.name, func(t *testing.T) {
config := defaultMemcachedClientConfig
config.Addresses = []string{"127.0.0.1:11211"}
config.SetAsyncCircuitBreakerEnabled = true
config.SetAsyncCircuitBreakerOpenDuration = 2 * time.Millisecond
config.SetAsyncCircuitBreakerHalfOpenMaxRequests = 100
config.SetAsyncCircuitBreakerMinRequests = testdata.minRequests
config.SetAsyncCircuitBreakerConsecutiveFailures = testdata.consecutiveFailures
config.SetAsyncCircuitBreakerFailurePercent = testdata.failurePercent
config.SetAsyncCircuitBreaker.Enabled = true
config.SetAsyncCircuitBreaker.OpenDuration = 2 * time.Millisecond
config.SetAsyncCircuitBreaker.HalfOpenMaxRequests = 100
config.SetAsyncCircuitBreaker.MinRequests = testdata.minRequests
config.SetAsyncCircuitBreaker.ConsecutiveFailures = testdata.consecutiveFailures
config.SetAsyncCircuitBreaker.FailurePercent = testdata.failurePercent

backendMock := newMemcachedClientBackendMock()
backendMock.setErrors = testdata.setErrors
@@ -746,7 +752,7 @@ func TestMemcachedClient_SetAsync_CircuitBreaker(t *testing.T) {
testutil.Ok(t, client.SetAsync(strconv.Itoa(testdata.setErrors), []byte("value"), time.Second))
testutil.Equals(t, gobreaker.StateOpen, cbimpl.State(), "state should be open")

time.Sleep(config.SetAsyncCircuitBreakerOpenDuration)
time.Sleep(config.SetAsyncCircuitBreaker.OpenDuration)
for i := testdata.setErrors; i < testdata.setErrors+10; i++ {
testutil.Ok(t, client.SetAsync(strconv.Itoa(i), []byte("value"), time.Second))
}