Skip to content

Commit

Permalink
Make sure to check for inflight bytes even if new request size is not…
Browse files Browse the repository at this point in the history
… known.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
  • Loading branch information
pstibrany committed Nov 1, 2023
1 parent d699abe commit 6b294ad
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
12 changes: 9 additions & 3 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,8 +779,12 @@ func (i *Ingester) StartPushRequest(requestSize int64) error {

inflight := i.inflightPushRequests.Inc()
inflightBytes := int64(0)
rejectEqualInflightBytes := false
if requestSize > 0 {
inflightBytes = i.inflightPushRequestsBytes.Add(requestSize)
} else {
inflightBytes = i.inflightPushRequestsBytes.Load()
rejectEqualInflightBytes = true // if inflightBytes == limit, reject new request
}

finishRequestInDefer := true
Expand All @@ -797,9 +801,11 @@ func (i *Ingester) StartPushRequest(requestSize int64) error {
return errMaxInflightRequestsReached
}

if il.MaxInflightPushRequestsBytes > 0 && inflightBytes > il.MaxInflightPushRequestsBytes {
i.metrics.rejected.WithLabelValues(reasonIngesterMaxInflightPushRequestsBytes).Inc()
return errMaxInflightRequestsBytesReached
if il.MaxInflightPushRequestsBytes > 0 {
if (rejectEqualInflightBytes && inflightBytes >= il.MaxInflightPushRequestsBytes) || inflightBytes > il.MaxInflightPushRequestsBytes {
i.metrics.rejected.WithLabelValues(reasonIngesterMaxInflightPushRequestsBytes).Inc()
return errMaxInflightRequestsBytesReached
}
}

if il.MaxIngestionRate > 0 {
Expand Down
15 changes: 12 additions & 3 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6282,7 +6282,7 @@ func TestIngester_inflightPushRequestsBytes(t *testing.T) {
g.Go(func() error {
req := prepareRequestForTargetRequestDuration(ctx, t, i, targetRequestDuration)

// Update instance limits
// Update instance limits. Set limit to EXACTLY the request size.
limitsMx.Lock()
limits.MaxInflightPushRequestsBytes = int64(req.Size())
limitsMx.Unlock()
Expand All @@ -6295,7 +6295,7 @@ func TestIngester_inflightPushRequestsBytes(t *testing.T) {
})

g.Go(func() error {
req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase"), 1, 1024)
req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase1"), 1, 1024)

select {
case <-ctx.Done():
Expand All @@ -6308,6 +6308,15 @@ func TestIngester_inflightPushRequestsBytes(t *testing.T) {
return i.inflightPushRequests.Load()
})

// Starting push request fails
err = i.StartPushRequest(100)
require.ErrorIs(t, err, errMaxInflightRequestsBytesReached)

// Starting push request with unknown size fails
err = i.StartPushRequest(0)
require.ErrorIs(t, err, errMaxInflightRequestsBytesReached)

// Sending push request fails
_, err := i.Push(ctx, req)
require.ErrorIs(t, err, errMaxInflightRequestsBytesReached)

Expand All @@ -6329,7 +6338,7 @@ func TestIngester_inflightPushRequestsBytes(t *testing.T) {
# HELP cortex_ingester_instance_rejected_requests_total Requests rejected for hitting per-instance limits
# TYPE cortex_ingester_instance_rejected_requests_total counter
cortex_ingester_instance_rejected_requests_total{reason="ingester_max_inflight_push_requests"} 0
cortex_ingester_instance_rejected_requests_total{reason="ingester_max_inflight_push_requests_bytes"} 1
cortex_ingester_instance_rejected_requests_total{reason="ingester_max_inflight_push_requests_bytes"} 3
cortex_ingester_instance_rejected_requests_total{reason="ingester_max_ingestion_rate"} 0
cortex_ingester_instance_rejected_requests_total{reason="ingester_max_series"} 0
cortex_ingester_instance_rejected_requests_total{reason="ingester_max_tenants"} 0
Expand Down

0 comments on commit 6b294ad

Please sign in to comment.