Ingester: Base memory utilization on Go heap size #6584

Merged · 3 commits · Nov 16, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@
* `http.StatusServiceUnavailable` (503) and `codes.Unknown` are replaced with `codes.Internal`.
* [CHANGE] Upgrade Node.js to v20. #6540
* [CHANGE] Querier: `cortex_querier_blocks_consistency_checks_failed_total` is now incremented when a block couldn't be queried from any attempted store-gateway as opposed to incremented after each attempt. Also `cortex_querier_blocks_consistency_checks_total` is incremented once per query as opposed to once per attempt (with 3 attempts). #6590
* [CHANGE] Ingester: Modify utilization based read path limiter to base memory usage on Go heap size. #6584
* [FEATURE] Distributor: added option `-distributor.retry-after-header.enabled` to include the `Retry-After` header in recoverable error responses. #6608
* [FEATURE] Query-frontend: add experimental support for query blocking. Queries are blocked on a per-tenant basis, configured via the limit `blocked_queries`. #5609
* [FEATURE] Vault: Added support for new Vault authentication methods: `AppRole`, `Kubernetes`, `UserPass` and `Token`. #6143
46 changes: 26 additions & 20 deletions pkg/util/limiter/utilization.go
@@ -5,6 +5,7 @@ package limiter
import (
"context"
"fmt"
"runtime"
"strings"
"time"

@@ -29,21 +30,32 @@ const (
)

type utilizationScanner interface {
// Scan returns CPU time in seconds and memory utilization in bytes, or an error.
// Scan returns CPU time in seconds and Go heap size in bytes, or an error.
Scan() (float64, uint64, error)
}

type procfsScanner struct {
// combinedScanner scans /proc for CPU utilization and Go runtime for heap size.
type combinedScanner struct {
proc procfs.Proc
}

func (s procfsScanner) Scan() (float64, uint64, error) {
func (s combinedScanner) Scan() (float64, uint64, error) {
ps, err := s.proc.Stat()
if err != nil {
return 0, 0, errors.Wrap(err, "failed to get process stats")
}

return ps.CPUTime(), uint64(ps.ResidentMemory()), nil
var m runtime.MemStats
runtime.ReadMemStats(&m)

return ps.CPUTime(), m.HeapInuse, nil
}

func newCombinedScanner() (combinedScanner, error) {
p, err := procfs.Self()
return combinedScanner{
proc: p,
}, err
}

// UtilizationBasedLimiter is a Service offering limiting based on CPU and memory utilization.
@@ -68,7 +80,7 @@ type UtilizationBasedLimiter struct {
cpuMovingAvg *math.EwmaRate
limitingReason atomic.String
currCPUUtil atomic.Float64
currMemoryUtil atomic.Uint64
currHeapSize atomic.Uint64
// For logging of input to CPU load EWMA calculation, keep window of source samples
cpuSamples *cpuSampleBuffer
}
@@ -104,7 +116,7 @@ func NewUtilizationBasedLimiter(cpuLimit float64, memoryLimit uint64, logCPUSamp
Name: "utilization_limiter_current_memory_usage_bytes",
Help: "Current memory usage calculated by utilization based limiter.",
}, func() float64 {
return float64(l.currMemoryUtil.Load())
return float64(l.currHeapSize.Load())
})
}

@@ -118,15 +130,9 @@ func (l *UtilizationBasedLimiter) LimitingReason() string {
}

func (l *UtilizationBasedLimiter) starting(_ context.Context) error {
p, err := procfs.Self()
if err != nil {
return errors.Wrap(err, "unable to detect CPU/memory utilization, unsupported platform. Please disable utilization based limiting")
}

l.utilizationScanner = procfsScanner{
proc: p,
}
return nil
var err error
l.utilizationScanner, err = newCombinedScanner()
return errors.Wrap(err, "unable to detect CPU/memory utilization, unsupported platform. Please disable utilization based limiting")
}

func (l *UtilizationBasedLimiter) update(_ context.Context) error {
@@ -137,7 +143,7 @@ func (l *UtilizationBasedLimiter) update(_ context.Context) error {
// compute and return the current CPU and memory utilization.
// This function must be called at a regular interval (resourceUtilizationUpdateInterval) to get a predictable behaviour.
func (l *UtilizationBasedLimiter) compute(nowFn func() time.Time) (currCPUUtil float64, currMemoryUtil uint64) {
cpuTime, currMemoryUtil, err := l.utilizationScanner.Scan()
cpuTime, currHeapSize, err := l.utilizationScanner.Scan()
if err != nil {
level.Warn(l.logger).Log("msg", "failed to get CPU and memory stats", "err", err.Error())
// Disable any limiting, since we can't tell resource utilization
@@ -148,7 +154,7 @@ func (l *UtilizationBasedLimiter) compute(nowFn func() time.Time) (currCPUUtil f
// Get wall time after CPU time, in case there's a delay before CPU time is returned,
// which would cause us to compute too high of a CPU load
now := nowFn()
l.currMemoryUtil.Store(currMemoryUtil)
l.currHeapSize.Store(currHeapSize)

// Add the instant CPU utilization to the moving average. The instant CPU
// utilization can only be computed starting from the 2nd tick.
@@ -190,7 +196,7 @@ func (l *UtilizationBasedLimiter) compute(nowFn func() time.Time) (currCPUUtil f
}

var reason string
if l.memoryLimit > 0 && currMemoryUtil >= l.memoryLimit {
if l.memoryLimit > 0 && currHeapSize >= l.memoryLimit {
reason = "memory"
} else if l.cpuLimit > 0 && currCPUUtil >= l.cpuLimit {
reason = "cpu"
@@ -210,11 +216,11 @@ func (l *UtilizationBasedLimiter) compute(nowFn func() time.Time) (currCPUUtil f
logger = log.WithSuffix(logger, "source_samples", l.cpuSamples.String())
}
level.Info(logger).Log("msg", "enabling resource utilization based limiting",
"reason", reason, "memory_limit", formatMemoryLimit(l.memoryLimit), "memory_utilization", formatMemory(currMemoryUtil),
"reason", reason, "memory_limit", formatMemoryLimit(l.memoryLimit), "memory_utilization", formatMemory(currHeapSize),
"cpu_limit", formatCPULimit(l.cpuLimit), "cpu_utilization", formatCPU(currCPUUtil))
} else {
level.Info(l.logger).Log("msg", "disabling resource utilization based limiting",
"memory_limit", formatMemoryLimit(l.memoryLimit), "memory_utilization", formatMemory(currMemoryUtil),
"memory_limit", formatMemoryLimit(l.memoryLimit), "memory_utilization", formatMemory(currHeapSize),
"cpu_limit", formatCPULimit(l.cpuLimit), "cpu_utilization", formatCPU(currCPUUtil))
}

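For orientation, here is a minimal standalone sketch — not part of this PR, and assuming only the github.com/prometheus/procfs dependency this package already uses — contrasting the two memory readings involved: the resident set size that the old procfsScanner reported, and the Go heap size from runtime.ReadMemStats that combinedScanner now reports.

package main

import (
	"fmt"
	"runtime"

	"github.com/prometheus/procfs"
)

func main() {
	// Go heap size, as the limiter now measures it: bytes in in-use heap spans.
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	fmt.Printf("heap in use: %d bytes\n", m.HeapInuse)

	// Resident set size, as the previous procfs-based scanner measured it.
	// RSS also counts non-heap memory (goroutine stacks, runtime overhead,
	// mapped files), so it is typically larger and slower to shrink.
	p, err := procfs.Self()
	if err != nil {
		panic(err) // no /proc available, e.g. unsupported platform
	}
	stat, err := p.Stat()
	if err != nil {
		panic(err)
	}
	fmt.Printf("resident memory: %d bytes\n", stat.ResidentMemory())
}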
46 changes: 46 additions & 0 deletions pkg/util/limiter/utilization_test.go
@@ -404,3 +404,49 @@ func generateLinearStepCPUUtilization(count int, from, step float64) []float64 {
}
return values
}

func BenchmarkUtilizationBasedLimiter(b *testing.B) {
const gigabyte = 1024 * 1024 * 1024

setup := func(cpuLimit float64, memoryLimit uint64) *UtilizationBasedLimiter {
lim := NewUtilizationBasedLimiter(cpuLimit, memoryLimit, false, log.NewNopLogger(), prometheus.NewPedanticRegistry())
s, err := newCombinedScanner()
require.NoError(b, err)
lim.utilizationScanner = s
require.Empty(b, lim.LimitingReason(), "Limiting should initially be disabled")

return lim
}

tim := time.Now()
nowFn := func() time.Time {
return tim
}

b.ResetTimer()

for i := 0; i < b.N; i++ {
lim := setup(0.11, gigabyte)

// Warm up the CPU utilization.
for i := 0; i < int(resourceUtilizationSlidingWindow.Seconds()); i++ {
lim.compute(nowFn)
tim = tim.Add(resourceUtilizationUpdateInterval)
}

lim.compute(nowFn)
tim = tim.Add(resourceUtilizationUpdateInterval)
}
}

func BenchmarkCombinedScanner(b *testing.B) {
s, err := newCombinedScanner()
require.NoError(b, err)

b.ResetTimer()

for i := 0; i < b.N; i++ {
_, _, err := s.Scan()
require.NoError(b, err)
}
}
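As a usage note (assuming the standard Go toolchain, nothing specific to this repository), the new benchmarks can be run from the repository root with:

go test -bench='BenchmarkUtilizationBasedLimiter|BenchmarkCombinedScanner' -benchmem ./pkg/util/limiter/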