Skip to content

Commit

Permalink
Receive: allow unlimited head_series_limit tenants (thanos-io#6406)
Browse files Browse the repository at this point in the history
With this commit we now allow to configure tenants with unlimited active
series limit by setting the limit to `0`. Prior to this commit setting a
per tenant limit to `0` would cause the tenant to be unable to write any
metrics at all.

This fixes: thanos-io#6393

Signed-off-by: Jacob Baungard Hansen <jacobbaungard@redhat.com>
  • Loading branch information
jacobbaungard authored and pedro-stanaka committed Jun 27, 2023
1 parent f4e46d7 commit 78c3419
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5548](https://github.com/thanos-io/thanos/pull/5548) Query: Add experimental support for load balancing across multiple Store endpoints.
- [#6148](https://github.com/thanos-io/thanos/pull/6148) Query-frontend: Add `traceID` to slow query detected log line.
- [#6153](https://github.com/thanos-io/thanos/pull/6153) Query-frontend: Add `remote_user` (from http basic auth) and `remote_addr` to slow query detected log line.
- [#6406](https://github.com/thanos-io/thanos/pull/6406) Receive: Allow tenants to be configured with unlimited active series by setting head_series_limit to 0.

### Fixed

Expand Down
2 changes: 1 addition & 1 deletion docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ Under `global`:
- `meta_monitoring_http_client`: Optional YAML field specifying HTTP client config for meta-monitoring.

Under `default` and per `tenant`:
- `head_series_limit`: Specifies the total number of active (head) series for any tenant, across all replicas (including data replication), allowed by Thanos Receive.
- `head_series_limit`: Specifies the total number of active (head) series for any tenant, across all replicas (including data replication), allowed by Thanos Receive. Set to 0 for unlimited.

NOTE:
- It is possible that Receive ingests more active series than the specified limit, as it relies on meta-monitoring, which may not have the latest data for current number of active series of a tenant at all times.
Expand Down
5 changes: 5 additions & 0 deletions pkg/receive/head_series_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ func (h *headSeriesLimit) isUnderLimit(tenant string) (bool, error) {
limit = h.defaultLimit
}

// If tenant limit is 0 we treat it as unlimited.
if limit == 0 {
return true, nil
}

if v >= float64(limit) {
level.Error(h.logger).Log("msg", "tenant above limit", "tenant", tenant, "currentSeries", v, "limit", limit)
h.limitedRequests.WithLabelValues(tenant).Inc()
Expand Down
8 changes: 7 additions & 1 deletion test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ type ReceiveBuilder struct {
maxExemplars int
ingestion bool
limit int
tenantsLimits receive.TenantsWriteLimitsConfig
metaMonitoring string
metaMonitoringQuery string
hashringConfigs []receive.HashringConfig
Expand Down Expand Up @@ -562,9 +563,10 @@ func (r *ReceiveBuilder) WithRelabelConfigs(relabelConfigs []*relabel.Config) *R
return r
}

func (r *ReceiveBuilder) WithValidationEnabled(limit int, metaMonitoring string, query ...string) *ReceiveBuilder {
func (r *ReceiveBuilder) WithValidationEnabled(limit int, metaMonitoring string, tenantsLimits receive.TenantsWriteLimitsConfig, query ...string) *ReceiveBuilder {
r.limit = limit
r.metaMonitoring = metaMonitoring
r.tenantsLimits = tenantsLimits
if len(query) > 0 {
r.metaMonitoringQuery = query[0]
}
Expand Down Expand Up @@ -619,6 +621,10 @@ func (r *ReceiveBuilder) Init() *e2emon.InstrumentedRunnable {
},
}

if r.tenantsLimits != nil {
cfg.WriteLimits.TenantsLimits = r.tenantsLimits
}

b, err := yaml.Marshal(cfg)
if err != nil {
return &e2emon.InstrumentedRunnable{Runnable: e2e.NewFailedRunnable(r.Name(), errors.Wrapf(err, "generate limiting file: %v", hashring))}
Expand Down
49 changes: 42 additions & 7 deletions test/e2e/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,9 +813,13 @@ test_metric{a="2", b="2"} 1`)
},
}

i1Runnable := ingestor1.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init()
i2Runnable := ingestor2.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init()
i3Runnable := ingestor3.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init()
tenantsLimits := receive.TenantsWriteLimitsConfig{
"unlimited-tenant": receive.NewEmptyWriteLimitConfig().SetHeadSeriesLimit(0),
}

i1Runnable := ingestor1.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName), tenantsLimits).Init()
i2Runnable := ingestor2.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName), tenantsLimits).Init()
i3Runnable := ingestor3.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName), tenantsLimits).Init()

testutil.Ok(t, e2e.StartAndWaitReady(i1Runnable, i2Runnable, i3Runnable))

Expand All @@ -824,7 +828,7 @@ test_metric{a="2", b="2"} 1`)

testutil.Ok(t, querier.WaitSumMetricsWithOptions(e2emon.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics()))

// We run two avalanches, one tenant which exceeds the limit, and one tenant which remains under it.
// We run three avalanches, one tenant which exceeds the limit, one tenant which remains under it, and one for the unlimited tenant.

// Avalanche in this configuration, would send 5 requests each with 10 new timeseries.
// One request always fails due to TSDB not being ready for new tenant.
Expand Down Expand Up @@ -864,7 +868,26 @@ test_metric{a="2", b="2"} 1`)
TenantID: "under-tenant",
})

testutil.Ok(t, e2e.StartAndWaitReady(avalanche1, avalanche2))
// Avalanche in this configuration, would send 5 requests each with 10 new timeseries.
// One request always fails due to TSDB not being ready for new tenant.
// So without limiting we end up with 40 timeseries and 40 samples.
avalanche3 := e2ethanos.NewAvalanche(e, "avalanche-3",
e2ethanos.AvalancheOptions{
MetricCount: "10",
SeriesCount: "1",
MetricInterval: "30",
SeriesInterval: "3600",
ValueInterval: "3600",

RemoteURL: e2ethanos.RemoteWriteEndpoint(ingestor1.InternalEndpoint("remote-write")),
RemoteWriteInterval: "30s",
RemoteBatchSize: "10",
RemoteRequestCount: "5",

TenantID: "unlimited-tenant",
})

testutil.Ok(t, e2e.StartAndWaitReady(avalanche1, avalanche2, avalanche3))

// Here, 3/5 requests are failed due to limiting, as one request fails due to TSDB readiness and we ingest one initial request.
// 3 limited requests belong to the exceed-tenant.
Expand All @@ -876,7 +899,7 @@ test_metric{a="2", b="2"} 1`)
ingestor1Name := e.Name() + "-" + ingestor1.Name()
// Here for exceed-tenant we go above limit by 10, which results in 0 value.
queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string {
return fmt.Sprintf("sum(prometheus_tsdb_head_series{tenant=\"exceed-tenant\"}) - on() thanos_receive_head_series_limit{instance=\"%s:8080\", job=\"receive-i1\"}", ingestor1Name)
return fmt.Sprintf("sum(prometheus_tsdb_head_series{tenant=\"exceed-tenant\"}) - on() thanos_receive_head_series_limit{instance=\"%s:8080\", job=\"receive-i1\", tenant=\"\"}", ingestor1Name)
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
Expand All @@ -888,7 +911,7 @@ test_metric{a="2", b="2"} 1`)

// For under-tenant we stay at -5, as we have only pushed 5 series.
queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string {
return fmt.Sprintf("sum(prometheus_tsdb_head_series{tenant=\"under-tenant\"}) - on() thanos_receive_head_series_limit{instance=\"%s:8080\", job=\"receive-i1\"}", ingestor1Name)
return fmt.Sprintf("sum(prometheus_tsdb_head_series{tenant=\"under-tenant\"}) - on() thanos_receive_head_series_limit{instance=\"%s:8080\", job=\"receive-i1\", tenant=\"\"}", ingestor1Name)
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
Expand Down Expand Up @@ -918,6 +941,18 @@ test_metric{a="2", b="2"} 1`)
},
})

// Query meta-monitoring solution to assert that we have ingested some number of timeseries.
// Avalanche sometimes misses some requests due to TSDB readiness etc. In this case, as the
// limit is set to `0` we just want to make sure some timeseries are ingested.
queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { return "sum(prometheus_tsdb_head_series{tenant=\"unlimited-tenant\"}) >=bool 10" }, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
&model.Sample{
Metric: model.Metric{},
Value: model.SampleValue(1),
},
})

// Query meta-monitoring solution to assert that 3 requests were limited for exceed-tenant and none for under-tenant.
queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { return "thanos_receive_head_series_limited_requests_total" }, time.Now, promclient.QueryOptions{
Deduplicate: true,
Expand Down

0 comments on commit 78c3419

Please sign in to comment.