Create base error type for ingester per-instance errors
This allows us to decorate them with extra information for gRPC responses
and for our logging middleware (to prevent them from being logged, since
logging them increases resource usage).

Related #5581
Related weaveworks/common#299

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>
56quarters committed Aug 3, 2023
1 parent 1e2aca9 commit aec69ee
Showing 5 changed files with 57 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@
* [CHANGE] Querier: Renamed `-querier.prefer-streaming-chunks` to `-querier.prefer-streaming-chunks-from-ingesters` to enable streaming chunks from ingesters to queriers. #5182
* [CHANGE] Querier: `-query-frontend.cache-unaligned-requests` has been moved from a global flag to a per-tenant override. #5312
* [CHANGE] Ingester: removed `cortex_ingester_shipper_dir_syncs_total` and `cortex_ingester_shipper_dir_sync_failures_total` metrics. The former metric was not very useful, and the latter was never incremented. #5396
* [CHANGE] Ingester: Do not log errors related to hitting per-instance limits to reduce resource usage when ingesters are under pressure. #5585
* [CHANGE] gRPC clients: use default connect timeout of 5s, and therefore enable default connect backoff max delay of 5s. #5562
* [FEATURE] Cardinality API: Add a new `count_method` parameter which enables counting active series #5136
* [FEATURE] Query-frontend: added experimental support to cache cardinality, label names and label values query responses. The cache will be used when `-query-frontend.cache-results` is enabled, and `-query-frontend.results-cache-ttl-for-cardinality-query` or `-query-frontend.results-cache-ttl-for-labels-query` set to a value greater than 0. The following metrics have been added to track the query results cache hit ratio per `request_type`: #5212 #5235 #5426 #5524
2 changes: 1 addition & 1 deletion go.mod
@@ -84,7 +84,6 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 // indirect
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/go-test/deep v1.1.0 // indirect
github.com/hashicorp/go-retryablehttp v0.7.4 // indirect
@@ -100,6 +99,7 @@
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.1 // indirect
github.com/DmitriyVTitov/size v1.5.0 // indirect
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
16 changes: 13 additions & 3 deletions pkg/ingester/ingester.go
@@ -760,6 +760,11 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (

db, err := i.getOrCreateTSDB(userID, false)
if err != nil {
// Check for a particular per-instance limit and return that error directly
// since it contains extra information for gRPC and our logging middleware.
if errors.Is(err, errMaxTenantsReached) {
return nil, err
}
return nil, wrapWithUser(err, userID)
}

@@ -805,7 +810,12 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (
level.Warn(i.logger).Log("msg", "failed to rollback appender on error", "user", userID, "err", err)
}

return nil, err
// Check for a particular per-instance limit and return that error directly
// since it contains extra information for gRPC and our logging middleware.
if errors.Is(err, errMaxInMemorySeriesReached) {
return nil, err
}
return nil, wrapWithUser(err, userID)
}

// At this point all samples have been added to the appender, so we can track the time it took.
@@ -1038,7 +1048,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
}

// Otherwise, return a 500.
return wrapWithUser(err, userID)
return err
}

numNativeHistogramBuckets := -1
@@ -1079,7 +1089,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
continue
}

return wrapWithUser(err, userID)
return err
}
numNativeHistograms := len(ts.Histograms)
if numNativeHistograms > 0 {
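
Not part of the commit itself: a minimal sketch of why the errors.Is checks added above work. The per-instance limit errors are package-level sentinel values, so errors.Is matches them by identity, and it keeps matching if a caller wraps them with %w. The names below are illustrative stand-ins, not the Mimir identifiers.

package main

import (
	"errors"
	"fmt"
)

// errTenantLimit stands in for a sentinel like errMaxTenantsReached.
var errTenantLimit = errors.New("per-instance tenant limit reached")

func getOrCreateTSDB() error {
	// Returned unwrapped so callers can compare it by identity.
	return errTenantLimit
}

func main() {
	err := getOrCreateTSDB()
	fmt.Println(errors.Is(err, errTenantLimit))                               // true: same sentinel value
	fmt.Println(errors.Is(fmt.Errorf("user 1234: %w", err), errTenantLimit)) // still true when wrapped with %w
}
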
11 changes: 6 additions & 5 deletions pkg/ingester/ingester_test.go
@@ -63,7 +63,6 @@ import (
"github.com/grafana/mimir/pkg/storage/tsdb/block"
"github.com/grafana/mimir/pkg/usagestats"
"github.com/grafana/mimir/pkg/util"
util_log "github.com/grafana/mimir/pkg/util/log"
util_math "github.com/grafana/mimir/pkg/util/math"
"github.com/grafana/mimir/pkg/util/push"
util_test "github.com/grafana/mimir/pkg/util/test"
@@ -5461,7 +5460,7 @@ func TestIngester_PushInstanceLimits(t *testing.T) {
},
},

expectedErr: wrapWithUser(errMaxInMemorySeriesReached, "test"),
expectedErr: errMaxInMemorySeriesReached,
},

"should fail creating two users": {
@@ -5488,7 +5487,7 @@ func TestIngester_PushInstanceLimits(t *testing.T) {
),
},
},
expectedErr: wrapWithUser(errMaxTenantsReached, "user2"),
expectedErr: errMaxTenantsReached,
},

"should fail pushing samples in two requests due to rate limit": {
@@ -5687,10 +5686,12 @@ func TestIngester_inflightPushRequests(t *testing.T) {

time.Sleep(10 * time.Millisecond) // Give first goroutine a chance to start pushing...
req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase"), 1, 1024)
var optional middleware.OptionalLogging

_, err := i.Push(ctx, req)
require.Equal(t, errMaxInflightRequestsReached, err)
require.ErrorAs(t, err, &util_log.DoNotLogError{})
require.ErrorIs(t, err, errMaxInflightRequestsReached)
require.ErrorAs(t, err, &optional)
require.False(t, optional.ShouldLog(ctx, time.Duration(0)), "expected not to log via .ShouldLog()")
return nil
})

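
For context (not part of the diff): the ErrorAs assertion in the test above relies on the error exposing a ShouldLog method. A minimal sketch of how a logging middleware could use such an interface, assuming its shape mirrors weaveworks/common's middleware.OptionalLogging as exercised by the test; the types and function names here are hypothetical.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// optionalLogging mirrors the assumed shape of middleware.OptionalLogging.
type optionalLogging interface {
	ShouldLog(ctx context.Context, duration time.Duration) bool
}

// quietErr is a stand-in for instanceLimitErr: it opts out of being logged.
type quietErr struct{ msg string }

func (e *quietErr) Error() string                                 { return e.msg }
func (e *quietErr) ShouldLog(context.Context, time.Duration) bool { return false }

// logServerError shows how a logging middleware might skip errors that opt out.
func logServerError(ctx context.Context, err error) {
	var opt optionalLogging
	if errors.As(err, &opt) && !opt.ShouldLog(ctx, 0) {
		return // the error asked not to be logged
	}
	fmt.Println("server error:", err)
}

func main() {
	ctx := context.Background()
	logServerError(ctx, errors.New("unexpected failure")) // logged
	logServerError(ctx, &quietErr{msg: "limit reached"})  // skipped
}
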
43 changes: 36 additions & 7 deletions pkg/ingester/instance_limits.go
@@ -6,13 +6,15 @@
package ingester

import (
"context"
"flag"
"time"

"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"gopkg.in/yaml.v3"

"github.com/grafana/mimir/pkg/util/globalerror"
util_log "github.com/grafana/mimir/pkg/util/log"
)

const (
@@ -22,14 +24,41 @@ const (
maxInflightPushRequestsFlag = "ingester.instance-limits.max-inflight-push-requests"
)

// We don't include values in the messages for per-instance limits to avoid leaking Mimir cluster configuration to users.
var (
// We don't include values in the message to avoid leaking Mimir cluster configuration to users.
errMaxIngestionRateReached = errors.New(globalerror.IngesterMaxIngestionRate.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the samples ingestion rate limit", maxIngestionRateFlag))
errMaxTenantsReached = errors.New(globalerror.IngesterMaxTenants.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of tenants", maxInMemoryTenantsFlag))
errMaxInMemorySeriesReached = errors.New(globalerror.IngesterMaxInMemorySeries.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of in-memory series", maxInMemorySeriesFlag))
errMaxInflightRequestsReached = util_log.DoNotLogError{Err: errors.New(globalerror.IngesterMaxInflightPushRequests.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of inflight push requests", maxInflightPushRequestsFlag))}
errMaxIngestionRateReached = newInstanceLimitError(globalerror.IngesterMaxIngestionRate.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the samples ingestion rate limit", maxIngestionRateFlag))
errMaxTenantsReached = newInstanceLimitError(globalerror.IngesterMaxTenants.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of tenants", maxInMemoryTenantsFlag))
errMaxInMemorySeriesReached = newInstanceLimitError(globalerror.IngesterMaxInMemorySeries.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of in-memory series", maxInMemorySeriesFlag))
errMaxInflightRequestsReached = newInstanceLimitError(globalerror.IngesterMaxInflightPushRequests.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of inflight push requests", maxInflightPushRequestsFlag))
)

type instanceLimitErr struct {
msg string
status *status.Status
}

func newInstanceLimitError(msg string) error {
return &instanceLimitErr{
// Errors from hitting per-instance limits are always "unavailable" for gRPC
status: status.New(codes.Unavailable, msg),
msg: msg,
}
}

func (e *instanceLimitErr) ShouldLog(context.Context, time.Duration) bool {
// We increment metrics when hitting per-instance limits and so there's no need to
// log them, the error doesn't contain any interesting information for us.
return false
}

func (e *instanceLimitErr) GRPCStatus() *status.Status {
return e.status
}

func (e *instanceLimitErr) Error() string {
return e.msg
}

// InstanceLimits describes limits used by ingester. Reaching any of these will result in Push method to return
// (internal) error.
type InstanceLimits struct {
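
Not part of the diff: a minimal, self-contained sketch of how the gRPC layer picks up the status attached by newInstanceLimitError. status.FromError looks for a GRPCStatus() method, so these errors surface to clients as codes.Unavailable rather than codes.Unknown. The type below is an illustrative stand-in, not the ingester's own.

package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// limitErr is a stand-in for instanceLimitErr with a pre-built gRPC status.
type limitErr struct{ st *status.Status }

func (e *limitErr) Error() string               { return e.st.Message() }
func (e *limitErr) GRPCStatus() *status.Status  { return e.st }

func main() {
	var err error = &limitErr{st: status.New(codes.Unavailable, "per-instance limit reached")}

	// status.FromError detects the GRPCStatus() method and returns its status.
	s, ok := status.FromError(err)
	fmt.Println(ok, s.Code(), s.Message()) // true Unavailable per-instance limit reached
}
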
