Receive: Allow remote write request limits to be defined per file and tenant #5565

Merged: 24 commits, Sep 6, 2022

Commits
e50c690
Allow per-tenant limits to be configured via file
douglascamata Aug 2, 2022
da60fe7
Refactor Receive's limiting logic
douglascamata Aug 2, 2022
ddbde37
Fix some methods that were in plural
douglascamata Aug 2, 2022
62fa80b
Improve metric description
douglascamata Aug 2, 2022
bf6f7b7
Merge branch 'main' of https://github.com/thanos-io/thanos into limit…
douglascamata Aug 2, 2022
6afb222
Add a TODO for later
douglascamata Aug 2, 2022
9534029
Do some cleanup after moving limits to config file
douglascamata Aug 2, 2022
0bfd2ec
Isolate rest of limiting logic from the handler
douglascamata Aug 2, 2022
f67ff0b
Small refactor to the request limiter
douglascamata Aug 2, 2022
4cc442d
Rename MergeWith -> OverlayWith
douglascamata Aug 2, 2022
0b3046f
Merge branch 'main' of https://github.com/thanos-io/thanos into limit…
douglascamata Aug 3, 2022
6926b0e
Update changelog
douglascamata Aug 3, 2022
3e8df8d
Update documentation
douglascamata Aug 3, 2022
babbb22
Add missing copyright notice to few files
douglascamata Aug 3, 2022
a4ae1d1
Fix test after change in config file tenants
douglascamata Aug 3, 2022
b466ffa
Retrigger CI because of bundled-Cortex failing test
douglascamata Aug 4, 2022
9dd8644
Expose default limits as metrics
douglascamata Aug 4, 2022
ffb9842
Retrigger CI
douglascamata Aug 5, 2022
ff4efde
Merge branch 'main' of https://github.com/thanos-io/thanos into limit…
douglascamata Aug 9, 2022
dc6d25d
Merge branch 'main' of https://github.com/thanos-io/thanos into limit…
douglascamata Aug 9, 2022
e5c171d
Replace comment with a TODOs
douglascamata Aug 10, 2022
246ea5c
Merge branch 'main' of github.com:thanos-io/thanos into limits-per-te…
douglascamata Aug 29, 2022
25ba51d
Fix changelog after bad merge
douglascamata Aug 29, 2022
ca55514
Merge branch 'main' of github.com:thanos-io/thanos into limits-per-te…
douglascamata Aug 30, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -31,6 +31,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5566](https://github.com/thanos-io/thanos/pull/5566) Receive: Added experimental support to enable chunk write queue via `--tsdb.write-queue-size` flag.
- [#5575](https://github.com/thanos-io/thanos/pull/5575) Receive: Add support for gRPC compression with snappy.
- [#5439](https://github.com/thanos-io/thanos/pull/5439) Add Alert ThanosQueryOverload to Mixin.
- [#5565](https://github.com/thanos-io/thanos/pull/5565) Receive: Allow remote write request limits to be defined per file and tenant.

### Changed

87 changes: 36 additions & 51 deletions cmd/thanos/receive.go
@@ -194,6 +194,18 @@ func runReceive(
return errors.Wrap(err, "parse relabel configuration")
}

var limitsConfig *receive.RootLimitsConfig
if conf.limitsConfig != nil {
limitsContentYaml, err := conf.limitsConfig.Content()
Member commented:

As @fpetkovski mentioned, having some /~/reload endpoint like we have in Thanos Ruler, to reload such configuration would be amazing, as adding some tenant limits now, would mean redeploying.

Contributor Author replied:

Yup, leaving this for another PR to avoid this one getting too big.
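For illustration, a minimal sketch of the reload idea discussed above: an HTTP handler that re-reads a config file and atomically publishes the parsed result. The function name, parameters, and wiring are hypothetical and not part of this PR.

```go
package reloader

import (
	"net/http"
	"os"
	"sync/atomic"
)

// reloadHandler re-reads the given config file on each call and atomically
// publishes the parsed value; request handlers would read it back with
// cfg.Load() so new limits apply without a redeploy.
func reloadHandler(path string, parse func([]byte) (interface{}, error), cfg *atomic.Value) http.HandlerFunc {
	return func(w http.ResponseWriter, _ *http.Request) {
		content, err := os.ReadFile(path)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		parsed, err := parse(content)
		if err != nil {
			// Keep serving with the previous config if the new one is invalid.
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		cfg.Store(parsed)
		w.WriteHeader(http.StatusOK)
	}
}
```

Such a handler could be registered under an endpoint similar to Thanos Ruler's reload path.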

if err != nil {
return errors.Wrap(err, "get content of limit configuration")
}
limitsConfig, err = receive.ParseRootLimitConfig(limitsContentYaml)
if err != nil {
return errors.Wrap(err, "parse limit configuration")
}
}
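As a rough orientation for what ParseRootLimitConfig deals with, the YAML layout documented later in this PR maps onto a structure along these lines. All type and field names below are assumptions inferred from the YAML keys, not the actual pkg/receive definitions.

```go
package limits

import "gopkg.in/yaml.v2"

// Hypothetical mirror of the documented limits file layout.
type RootLimitsConfig struct {
	Write WriteLimitsConfig `yaml:"write"`
}

type WriteLimitsConfig struct {
	Global  GlobalLimitsConfig            `yaml:"global"`
	Default DefaultLimitsConfig           `yaml:"default"`
	Tenants map[string]TenantLimitsConfig `yaml:"tenants"`
}

type GlobalLimitsConfig struct {
	MaxConcurrency int64 `yaml:"max_concurrency"` // 0 or absent: gate disabled
}

type RequestLimitsConfig struct {
	SizeBytesLimit int64 `yaml:"size_bytes_limit"` // 0: unlimited
	SeriesLimit    int64 `yaml:"series_limit"`
	SamplesLimit   int64 `yaml:"samples_limit"`
}

type DefaultLimitsConfig struct {
	Request RequestLimitsConfig `yaml:"request"`
}

type TenantLimitsConfig struct {
	// Pointer so an omitted key can fall back to the defaults.
	Request *RequestLimitsConfig `yaml:"request"`
}

// parseRootLimitConfig shows only the parsing step; the real implementation
// also validates the config and exposes the resulting limits as metrics.
func parseRootLimitConfig(content []byte) (*RootLimitsConfig, error) {
	var root RootLimitsConfig
	if err := yaml.UnmarshalStrict(content, &root); err != nil {
		return nil, err
	}
	return &root, nil
}
```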

// Impose active series limit only if Receiver is in Router or RouterIngestor mode, and config is provided.
seriesLimitSupported := (receiveMode == receive.RouterOnly || receiveMode == receive.RouterIngestor) && conf.maxPerTenantLimit != 0

@@ -210,31 +222,28 @@
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Writer: writer,
ListenAddress: conf.rwAddress,
Registry: reg,
Endpoint: conf.endpoint,
TenantHeader: conf.tenantHeader,
TenantField: conf.tenantField,
DefaultTenantID: conf.defaultTenantID,
ReplicaHeader: conf.replicaHeader,
ReplicationFactor: conf.replicationFactor,
RelabelConfigs: relabelConfig,
ReceiverMode: receiveMode,
Tracer: tracer,
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
TSDBStats: dbs,
SeriesLimitSupported: seriesLimitSupported,
MaxPerTenantLimit: conf.maxPerTenantLimit,
MetaMonitoringUrl: conf.metaMonitoringUrl,
MetaMonitoringHttpClient: conf.metaMonitoringHttpClient,
MetaMonitoringLimitQuery: conf.metaMonitoringLimitQuery,
WriteSeriesLimit: conf.writeSeriesLimit,
WriteSamplesLimit: conf.writeSamplesLimit,
WriteRequestSizeLimit: conf.writeRequestSizeLimit,
WriteRequestConcurrencyLimit: conf.writeRequestConcurrencyLimit,
Writer: writer,
ListenAddress: conf.rwAddress,
Registry: reg,
Endpoint: conf.endpoint,
TenantHeader: conf.tenantHeader,
TenantField: conf.tenantField,
DefaultTenantID: conf.defaultTenantID,
ReplicaHeader: conf.replicaHeader,
ReplicationFactor: conf.replicationFactor,
RelabelConfigs: relabelConfig,
ReceiverMode: receiveMode,
Tracer: tracer,
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
TSDBStats: dbs,
LimitsConfig: limitsConfig,
SeriesLimitSupported: seriesLimitSupported,
MaxPerTenantLimit: conf.maxPerTenantLimit,
MetaMonitoringUrl: conf.metaMonitoringUrl,
MetaMonitoringHttpClient: conf.metaMonitoringHttpClient,
MetaMonitoringLimitQuery: conf.metaMonitoringLimitQuery,
})

grpcProbe := prober.NewGRPC()
@@ -810,10 +819,7 @@ type receiveConfig struct {
reqLogConfig *extflag.PathOrContent
relabelConfigPath *extflag.PathOrContent

writeSeriesLimit int64
writeSamplesLimit int64
writeRequestSizeLimit int64
writeRequestConcurrencyLimit int
limitsConfig *extflag.PathOrContent
}

func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -921,28 +927,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)

// TODO(douglascamata): Allow all these limits to be configured per tenant
// and move the configuration to a file. Then this is done, remove the
// "hidden" modifier on all these flags.
cmd.Flag("receive.write-request-limits.max-series",
"The maximum amount of series accepted in remote write requests."+
"The default is no limit, represented by 0.").
Default("0").Hidden().Int64Var(&rc.writeSeriesLimit)

cmd.Flag("receive.write-request-limits.max-samples",
"The maximum amount of samples accepted in remote write requests."+
"The default is no limit, represented by 0.").
Default("0").Hidden().Int64Var(&rc.writeSamplesLimit)

cmd.Flag("receive.write-request-limits.max-size-bytes",
"The maximum size (in bytes) of remote write requests."+
"The default is no limit, represented by 0.").
Default("0").Hidden().Int64Var(&rc.writeRequestSizeLimit)

cmd.Flag("receive.write-request-limits.max-concurrency",
"The maximum amount of remote write requests that will be concurrently processed while others wait."+
"The default is no limit, represented by 0.").
Default("0").Hidden().IntVar(&rc.writeRequestConcurrencyLimit)
rc.limitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden())
}

// determineMode returns the ReceiverMode that this receiver is configured to run in.
71 changes: 59 additions & 12 deletions docs/components/receive.md
@@ -84,17 +84,68 @@ Thanos Receive has some limits and gates that can be configured to control resou
- **Limits**: if a request hits any configured limit the client will receive an error response from the server.
- **Gates**: if a request hits a gate without capacity it will wait until the gate's capacity is replenished to be processed. It doesn't trigger an error response from the server.

To configure the gates and limits you can use one of the two options:

- `--receive.limits-config-file=<file-path>`: where `<file-path>` is the path to the YAML file.
- `--receive.limits-config=<content>`: where `<content>` is the content of YAML file.

By default all the limits and gates are **disabled**.
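For example, assuming a limits file at /etc/thanos/receive-limits.yaml (the path and accompanying flag values are only an illustration), a Receive instance could be started like this:

```bash
thanos receive \
  --remote-write.address=0.0.0.0:19291 \
  --tsdb.path=/var/thanos/receive \
  --receive.limits-config-file=/etc/thanos/receive-limits.yaml
```

Passing `--receive.limits-config=<content>` instead embeds the same YAML directly in the flag.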

### Understanding the configuration file

The configuration file follows a few standards:

1. The value `0` (zero) is used to explicitly define "there is no limit" (infinite limit).
2. In the configuration of default limits (in the `default` section) or global limits (in the `global` section), a value that is not present means "no limit".
3. In the configuration of per-tenant limits (in the `tenants` section), a value that is not present means it is the same as the default (see the short sketch below).
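A short sketch of rules 1 and 3 (the tenant names are made up): `0` switches a limit off entirely, while omitting a key inherits the default.

```yaml
write:
  default:
    request:
      series_limit: 1000
  tenants:
    tenant-a:
      request:
        series_limit: 0   # explicitly unlimited for this tenant (rule 1)
    tenant-b:
      request: {}         # series_limit omitted, inherits 1000 from default (rule 3)
```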

All the configuration for the remote write endpoint of Receive is contained in the `write` key. Inside it there are 3 subsections:

- `global`: limits and/or gates that are applied considering all the requests.
- `default`: the default values for limits in case a given tenant doesn't have any specified.
- `tenants`: the limits for a given tenant.

From the example configuration below, it's understood that:

1. This Receive instance has a max concurrency of 30.
2. This Receive instance has some default request limits that apply to all tenants, **unless** a given tenant has its own limits (i.e. the `acme` tenant, and partially the `ajax` tenant).
3. Tenant `acme` has no request limits.
4. Tenant `ajax` has a request series limit of 50000 and samples limit of 500. Their request size bytes limit is inherited from the default, 1024 bytes.

The next sections explain what each configuration value means.

```yaml mdox-exec="cat pkg/receive/testdata/limits_config/good_limits.yaml"
write:
  global:
    max_concurrency: 30
  default:
    request:
      size_bytes_limit: 1024
      series_limit: 1000
      samples_limit: 10
  tenants:
    acme:
      request:
        size_bytes_limit: 0
        series_limit: 0
        samples_limit: 0
    ajax:
      request:
        series_limit: 50000
        samples_limit: 500
```

Member commented on this example:

Idea from Community Hours: generalize on any label - not only scope to tenant labels.

This allows us to keep tenancy topic separate.

Perhaps this can help with other use cases 🤔

Contributor Author replied:

To make it clear for potential reviewers, from what I understood in the community hours, this PR can be reviewed and merged as is. The feature is documented as experimental and thus we can introduce changes to the configuration file and behavior in case we see fit, especially after the planned discussion regarding tenancy in Thanos.

**IMPORTANT**: this feature is experimental and a work-in-progress. It might change in the near future, i.e. configuration might move to a file (to allow easy configuration of different request limits per tenant) or its structure could change.

### Request limits
### Remote write request limits

Thanos Receive supports setting limits on the incoming remote write request sizes. These limits should help you to prevent a single tenant from being able to send big requests and possibly crash the Receive.

These limits are applied per request and can be configured with the following command line arguments:
These limits are applied per request and can be configured within the `request` key:

- `--receive.write-request-limits.max-size-bytes`: the maximum body size.
- `--receive.write-request-limits.max-series`: the maximum amount of series in a single remote write request.
- `--receive.write-request-limits.max-samples`: the maximum amount of samples in a single remote write request (summed from all series).
- `size_bytes_limit`: the maximum body size.
- `series_limit`: the maximum amount of series in a single remote write request.
- `samples_limit`: the maximum amount of samples in a single remote write request (summed from all series).

Any request above these limits will cause a 413 HTTP response (*Entity Too Large*) and should not be retried without modifications.
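As a minimal sketch of what "should not be retried without modifications" means on the client side (the function and its wiring are made up for illustration, not taken from Prometheus or this PR):

```go
package client

import (
	"bytes"
	"fmt"
	"net/http"
)

// sendRemoteWrite forwards one compressed remote write payload and treats a
// 413 as permanent: resending the same batch unchanged will always fail.
func sendRemoteWrite(endpoint string, compressedBody []byte) error {
	resp, err := http.Post(endpoint, "application/x-protobuf", bytes.NewReader(compressedBody))
	if err != nil {
		return err // transport errors can be retried by the caller
	}
	defer resp.Body.Close()

	switch {
	case resp.StatusCode == http.StatusRequestEntityTooLarge:
		// A size/series/samples limit was hit; split the batch or drop it.
		return fmt.Errorf("rejected by receiver limits: %s", resp.Status)
	case resp.StatusCode/100 != 2:
		return fmt.Errorf("remote write failed: %s", resp.Status)
	}
	return nil
}
```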

@@ -105,15 +156,11 @@ Future work that can improve this scenario:
- Proper handling of 413 responses in clients, given Receive can somehow communicate which limit was reached.
- Including in the 413 response which are the current limits that apply to the tenant.

By default all these limits are disabled.

## Request gates

The available request gates in Thanos Receive can be configured with the following command line arguments:
### Remote write request gates

- `--receive.write-request-limits.max-concurrency`: the maximum amount of remote write requests that will be concurrently worked on. Any request request that would exceed this limit will be accepted, but wait until the gate allows it to be processed.
The available request gates in Thanos Receive can be configured within the `global` key:

By default all gates are disabled.
- `max_concurrency`: the maximum amount of remote write requests that will be concurrently worked on. Any request that would exceed this limit will be accepted, but will wait until the gate allows it to be processed.

## Active Series Limiting (experimental)

81 changes: 31 additions & 50 deletions pkg/receive/handler.go
@@ -21,8 +21,6 @@ import (
extflag "github.com/efficientgo/tools/extkingpin"
"github.com/thanos-io/thanos/pkg/api"
statusapi "github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/httpconfig"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/promclient"
@@ -89,31 +87,28 @@

// Options for the web Handler.
type Options struct {
Writer *Writer
ListenAddress string
Registry *prometheus.Registry
TenantHeader string
TenantField string
DefaultTenantID string
ReplicaHeader string
Endpoint string
ReplicationFactor uint64
ReceiverMode ReceiverMode
Tracer opentracing.Tracer
TLSConfig *tls.Config
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
RelabelConfigs []*relabel.Config
TSDBStats TSDBStats
SeriesLimitSupported bool
MaxPerTenantLimit uint64
MetaMonitoringUrl *url.URL
MetaMonitoringHttpClient *extflag.PathOrContent
MetaMonitoringLimitQuery string
WriteSeriesLimit int64
WriteSamplesLimit int64
WriteRequestSizeLimit int64
WriteRequestConcurrencyLimit int
Writer *Writer
ListenAddress string
Registry *prometheus.Registry
TenantHeader string
TenantField string
DefaultTenantID string
ReplicaHeader string
Endpoint string
ReplicationFactor uint64
ReceiverMode ReceiverMode
Tracer opentracing.Tracer
TLSConfig *tls.Config
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
RelabelConfigs []*relabel.Config
TSDBStats TSDBStats
LimitsConfig *RootLimitsConfig
SeriesLimitSupported bool
MaxPerTenantLimit uint64
MetaMonitoringUrl *url.URL
MetaMonitoringHttpClient *extflag.PathOrContent
MetaMonitoringLimitQuery string
}

// activeSeriesLimiter encompasses active series limiting logic.
@@ -145,8 +140,7 @@ type Handler struct {
writeSamplesTotal *prometheus.HistogramVec
writeTimeseriesTotal *prometheus.HistogramVec

writeGate gate.Gate
requestLimiter requestLimiter
limiter *limiter
}
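For orientation, a sketch of the grouping the new `limiter` field implies, read off this diff (the method names come from how the handler uses it below; the actual type lives in the limiter code added by this PR and may differ):

```go
package receivesketch

import "context"

// Stand-ins for this sketch only; the real types are Thanos' gate.Gate and
// the requestLimiter interface that this PR moves behind the new limiter.
type writeGate interface {
	Start(ctx context.Context) error
	Done()
}

type requestLimits interface {
	AllowSizeBytes(tenant string, contentLength int64) bool
	AllowSeries(tenant string, amount int64) bool
	AllowSamples(tenant string, amount int64) bool
}

// limiter groups what the Handler previously carried as two separate fields:
// the global write gate (write.global.max_concurrency) and the per-tenant
// request limits (defaults overlaid with each tenant's own section).
type limiter struct {
	writeGate      writeGate
	requestLimiter requestLimits
}
```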

func NewHandler(logger log.Logger, o *Options) *Handler {
@@ -172,13 +166,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
Max: 30 * time.Second,
Jitter: true,
},
writeGate: gate.NewNoop(),
requestLimiter: newRequestLimiter(
o.WriteRequestSizeLimit,
o.WriteSeriesLimit,
o.WriteSamplesLimit,
registerer,
),
limiter: newLimiter(o.LimitsConfig, registerer),
forwardRequests: promauto.With(registerer).NewCounterVec(
prometheus.CounterOpts{
Name: "thanos_receive_forward_requests_total",
@@ -217,13 +205,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
),
}

if o.WriteRequestConcurrencyLimit > 0 {
h.writeGate = gate.New(
extprom.WrapRegistererWithPrefix("thanos_receive_write_request_concurrent_", registerer),
o.WriteRequestConcurrencyLimit,
)
}

h.forwardRequests.WithLabelValues(labelSuccess)
h.forwardRequests.WithLabelValues(labelError)
h.replications.WithLabelValues(labelSuccess)
@@ -442,15 +423,14 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
tLogger := log.With(h.logger, "tenant", tenant)

tracing.DoInSpan(r.Context(), "receive_write_gate_ismyturn", func(ctx context.Context) {
err = h.writeGate.Start(r.Context())
err = h.limiter.writeGate.Start(r.Context())
})
if err != nil {
level.Error(tLogger).Log("err", err, "msg", "internal server error")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

defer h.writeGate.Done()
defer h.limiter.writeGate.Done()

under, err := h.ActiveSeriesLimit.isUnderLimit(tenant, tLogger)
if err != nil {
@@ -463,11 +443,12 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
return
}

requestLimiter := h.limiter.requestLimiter
// io.ReadAll dynamically adjust the byte slice for read data, starting from 512B.
// Since this is receive hot path, grow upfront saving allocations and CPU time.
compressed := bytes.Buffer{}
if r.ContentLength >= 0 {
if !h.requestLimiter.AllowSizeBytes(tenant, r.ContentLength) {
if !requestLimiter.AllowSizeBytes(tenant, r.ContentLength) {
http.Error(w, "write request too large", http.StatusRequestEntityTooLarge)
return
}
@@ -487,7 +468,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
return
}

if !h.requestLimiter.AllowSizeBytes(tenant, int64(len(reqBuf))) {
if !requestLimiter.AllowSizeBytes(tenant, int64(len(reqBuf))) {
http.Error(w, "write request too large", http.StatusRequestEntityTooLarge)
return
}
@@ -523,7 +504,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
return
}

if !h.requestLimiter.AllowSeries(tenant, int64(len(wreq.Timeseries))) {
if !requestLimiter.AllowSeries(tenant, int64(len(wreq.Timeseries))) {
http.Error(w, "too many timeseries", http.StatusRequestEntityTooLarge)
return
}
@@ -532,7 +513,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
for _, timeseries := range wreq.Timeseries {
totalSamples += len(timeseries.Samples)
}
if !h.requestLimiter.AllowSamples(tenant, int64(totalSamples)) {
if !requestLimiter.AllowSamples(tenant, int64(totalSamples)) {
http.Error(w, "too many samples", http.StatusRequestEntityTooLarge)
return
}