Skip to content

Commit

Permalink
Rename label name path to request_type and label value write to push
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
  • Loading branch information
duricanikolic committed Jun 10, 2024
1 parent 2a002ef commit 3037b08
Show file tree
Hide file tree
Showing 8 changed files with 634 additions and 634 deletions.
86 changes: 43 additions & 43 deletions pkg/ingester/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,48 +26,48 @@ const (
circuitBreakerResultOpen = "circuit_breaker_open"
circuitBreakerDefaultPushTimeout = 2 * time.Second
circuitBreakerDefaultReadTimeout = 30 * time.Second
circuitBreakerWritePath = "write"
circuitBreakerReadPath = "read"
circuitBreakerPushRequestType = "push"
circuitBreakerReadRequestType = "read"
)

type circuitBreakerMetrics struct {
circuitBreakerTransitions *prometheus.CounterVec
circuitBreakerResults *prometheus.CounterVec
}

func newCircuitBreakerMetrics(r prometheus.Registerer, currentStateFn func(string) circuitbreaker.State, paths []string) *circuitBreakerMetrics {
func newCircuitBreakerMetrics(r prometheus.Registerer, currentStateFn func(string) circuitbreaker.State, requestTypes []string) *circuitBreakerMetrics {
cbMetrics := &circuitBreakerMetrics{
circuitBreakerTransitions: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_circuit_breaker_transitions_total",
Help: "Number of times the circuit breaker has entered a state.",
}, []string{"path", "state"}),
}, []string{"request_type", "state"}),
circuitBreakerResults: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_circuit_breaker_results_total",
Help: "Results of executing requests via the circuit breaker.",
}, []string{"path", "result"}),
}, []string{"request_type", "result"}),
}
circuitBreakerCurrentStateGaugeFn := func(path string, state circuitbreaker.State) prometheus.GaugeFunc {
circuitBreakerCurrentStateGaugeFn := func(requestType string, state circuitbreaker.State) prometheus.GaugeFunc {
return promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_ingester_circuit_breaker_current_state",
Help: "Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name.",
ConstLabels: map[string]string{"path": path, "state": state.String()},
ConstLabels: map[string]string{"request_type": requestType, "state": state.String()},
}, func() float64 {
if currentStateFn(path) == state {
if currentStateFn(requestType) == state {
return 1
}
return 0
})
}
for _, path := range paths {
for _, requestType := range requestTypes {
for _, s := range []circuitbreaker.State{circuitbreaker.OpenState, circuitbreaker.HalfOpenState, circuitbreaker.ClosedState} {
circuitBreakerCurrentStateGaugeFn(path, s)
// We initialize all possible states for the given path and the circuitBreakerTransitions metrics
cbMetrics.circuitBreakerTransitions.WithLabelValues(path, s.String())
circuitBreakerCurrentStateGaugeFn(requestType, s)
// We initialize all possible states for the given requestType and the circuitBreakerTransitions metrics
cbMetrics.circuitBreakerTransitions.WithLabelValues(requestType, s.String())
}

for _, r := range []string{circuitBreakerResultSuccess, circuitBreakerResultError, circuitBreakerResultOpen} {
// We initialize all possible results for the given path and the circuitBreakerResults metrics
cbMetrics.circuitBreakerResults.WithLabelValues(path, r)
// We initialize all possible results for the given requestType and the circuitBreakerResults metrics
cbMetrics.circuitBreakerResults.WithLabelValues(requestType, r)
}
}
return cbMetrics
Expand Down Expand Up @@ -97,46 +97,46 @@ func (cfg *CircuitBreakerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.
// circuitBreaker abstracts the ingester's server-side circuit breaker functionality.
// A nil *circuitBreaker is a valid noop implementation.
type circuitBreaker struct {
cfg CircuitBreakerConfig
path string
logger log.Logger
metrics *circuitBreakerMetrics
active atomic.Bool
cb circuitbreaker.CircuitBreaker[any]
cfg CircuitBreakerConfig
requestType string
logger log.Logger
metrics *circuitBreakerMetrics
active atomic.Bool
cb circuitbreaker.CircuitBreaker[any]

// testRequestDelay is needed for testing purposes to simulate long-lasting requests
testRequestDelay time.Duration
}

func newCircuitBreaker(cfg CircuitBreakerConfig, metrics *circuitBreakerMetrics, path string, logger log.Logger) *circuitBreaker {
func newCircuitBreaker(cfg CircuitBreakerConfig, metrics *circuitBreakerMetrics, requestType string, logger log.Logger) *circuitBreaker {
if !cfg.Enabled {
return nil
}
active := atomic.NewBool(false)
cb := circuitBreaker{
cfg: cfg,
path: path,
logger: logger,
metrics: metrics,
active: *active,
cfg: cfg,
requestType: requestType,
logger: logger,
metrics: metrics,
active: *active,
}

circuitBreakerTransitionsCounterFn := func(metrics *circuitBreakerMetrics, path string, state circuitbreaker.State) prometheus.Counter {
return metrics.circuitBreakerTransitions.WithLabelValues(path, state.String())
circuitBreakerTransitionsCounterFn := func(metrics *circuitBreakerMetrics, requestType string, state circuitbreaker.State) prometheus.Counter {
return metrics.circuitBreakerTransitions.WithLabelValues(requestType, state.String())
}

cbBuilder := circuitbreaker.Builder[any]().
WithDelay(cfg.CooldownPeriod).
OnClose(func(event circuitbreaker.StateChangedEvent) {
circuitBreakerTransitionsCounterFn(cb.metrics, path, circuitbreaker.ClosedState).Inc()
circuitBreakerTransitionsCounterFn(cb.metrics, requestType, circuitbreaker.ClosedState).Inc()
level.Info(logger).Log("msg", "circuit breaker is closed", "previous", event.OldState, "current", event.NewState)
}).
OnOpen(func(event circuitbreaker.StateChangedEvent) {
circuitBreakerTransitionsCounterFn(cb.metrics, path, circuitbreaker.OpenState).Inc()
circuitBreakerTransitionsCounterFn(cb.metrics, requestType, circuitbreaker.OpenState).Inc()
level.Warn(logger).Log("msg", "circuit breaker is open", "previous", event.OldState, "current", event.NewState)
}).
OnHalfOpen(func(event circuitbreaker.StateChangedEvent) {
circuitBreakerTransitionsCounterFn(cb.metrics, path, circuitbreaker.HalfOpenState).Inc()
circuitBreakerTransitionsCounterFn(cb.metrics, requestType, circuitbreaker.HalfOpenState).Inc()
level.Info(logger).Log("msg", "circuit breaker is half-open", "previous", event.OldState, "current", event.NewState)
})

Expand Down Expand Up @@ -205,8 +205,8 @@ func (cb *circuitBreaker) tryAcquirePermit() (bool, error) {
return false, nil
}
if !cb.cb.TryAcquirePermit() {
cb.metrics.circuitBreakerResults.WithLabelValues(cb.path, circuitBreakerResultOpen).Inc()
return false, newCircuitBreakerOpenError(cb.path, cb.cb.RemainingDelay())
cb.metrics.circuitBreakerResults.WithLabelValues(cb.requestType, circuitBreakerResultOpen).Inc()
return false, newCircuitBreakerOpenError(cb.requestType, cb.cb.RemainingDelay())
}
return true, nil
}
Expand Down Expand Up @@ -237,12 +237,12 @@ func (cb *circuitBreaker) recordResult(errs ...error) error {
for _, err := range errs {
if err != nil && isCircuitBreakerFailure(err) {
cb.cb.RecordFailure()
cb.metrics.circuitBreakerResults.WithLabelValues(cb.path, circuitBreakerResultError).Inc()
cb.metrics.circuitBreakerResults.WithLabelValues(cb.requestType, circuitBreakerResultError).Inc()
return err
}
}
cb.cb.RecordSuccess()
cb.metrics.circuitBreakerResults.WithLabelValues(cb.path, circuitBreakerResultSuccess).Inc()
cb.metrics.circuitBreakerResults.WithLabelValues(cb.requestType, circuitBreakerResultSuccess).Inc()
return nil
}

Expand All @@ -253,22 +253,22 @@ type ingesterCircuitBreaker struct {

func newIngesterCircuitBreaker(pushCfg CircuitBreakerConfig, readCfg CircuitBreakerConfig, logger log.Logger, registerer prometheus.Registerer) *ingesterCircuitBreaker {
prCB := &ingesterCircuitBreaker{}
state := func(path string) circuitbreaker.State {
switch path {
case circuitBreakerWritePath:
state := func(requestType string) circuitbreaker.State {
switch requestType {
case circuitBreakerPushRequestType:
if prCB.push.isActive() {
return prCB.push.cb.State()
}
case circuitBreakerReadPath: