Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receive] Export metrics about remote write requests per tenant #5424

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
4e0a874
Add write metrics to Thanos Receive
douglascamata Jun 13, 2022
81d03c3
Let the middleware count inflight HTTP requests
douglascamata Jun 14, 2022
8878786
Update Receive write metrics type & definition
douglascamata Jun 15, 2022
48d7ef0
Put option back in its place to avoid big diff
douglascamata Jun 15, 2022
d33957f
Fetch tenant from headers instead of context
douglascamata Jun 15, 2022
33f4c6a
Delete unnecessary tenant parser middleware
douglascamata Jun 15, 2022
3254efa
Refactor & reuse code for HTTP instrumentation
douglascamata Jun 17, 2022
7f1cecb
Add missing copyright to some files
douglascamata Jun 17, 2022
51b6d21
Add changelog entry for Receive & new HTTP metrics
douglascamata Jun 20, 2022
fe750d1
Merge branch 'main' of https://github.com/thanos-io/thanos into dcama…
douglascamata Jun 20, 2022
643ffb9
Remove TODO added by accident
douglascamata Jun 20, 2022
607c4e9
Make error handling code shorter
douglascamata Jun 21, 2022
95a7a37
Make switch statement simpler
douglascamata Jun 21, 2022
3b3592b
Remove method label from timeseries' metrics
douglascamata Jun 21, 2022
2dd4592
Count samples of all series instead of each
douglascamata Jun 21, 2022
0141ea9
Remove in-flight requests metric
douglascamata Jun 21, 2022
493ede8
Change timeseries/samples metrics to histograms
douglascamata Jun 21, 2022
26600c1
Merge branch 'main' of https://github.com/thanos-io/thanos into dcama…
douglascamata Jun 21, 2022
c4662b5
Fix Prometheus registry for histograms
douglascamata Jun 21, 2022
77f7bf7
Fix comment in NewHandler functions
douglascamata Jun 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support.
- [#5408](https://github.com/thanos-io/thanos/pull/5391) Receive: Add support for consistent hashrings.
- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Implement api/v1/status/tsdb.
- [#5424](https://github.com/thanos-io/thanos/pull/5424) Receive: export metrics regarding size of remote write requests

### Changed

Expand Down
80 changes: 25 additions & 55 deletions pkg/extprom/http/instrument_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/uber/jaeger-client-go"
)
Expand All @@ -37,81 +36,43 @@ func NewNopInstrumentationMiddleware() InstrumentationMiddleware {
}

type defaultInstrumentationMiddleware struct {
requestDuration *prometheus.HistogramVec
requestSize *prometheus.SummaryVec
requestsTotal *prometheus.CounterVec
responseSize *prometheus.SummaryVec
metrics *defaultMetrics
}

// NewInstrumentationMiddleware provides default InstrumentationMiddleware.
// Passing nil as buckets uses the default buckets.
func NewInstrumentationMiddleware(reg prometheus.Registerer, buckets []float64) InstrumentationMiddleware {
if buckets == nil {
buckets = []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}
return &defaultInstrumentationMiddleware{
metrics: newDefaultMetrics(reg, buckets, []string{}),
}

ins := defaultInstrumentationMiddleware{
requestDuration: promauto.With(reg).NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_duration_seconds",
Help: "Tracks the latencies for HTTP requests.",
Buckets: buckets,
},
[]string{"code", "handler", "method"},
),

requestSize: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Name: "http_request_size_bytes",
Help: "Tracks the size of HTTP requests.",
},
[]string{"code", "handler", "method"},
),

requestsTotal: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_total",
Help: "Tracks the number of HTTP requests.",
}, []string{"code", "handler", "method"},
),

responseSize: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Name: "http_response_size_bytes",
Help: "Tracks the size of HTTP responses.",
},
[]string{"code", "handler", "method"},
),
}
return &ins
}

// NewHandler wraps the given HTTP handler for instrumentation. It
// registers four metric collectors (if not already done) and reports HTTP
// metrics to the (newly or already) registered collectors: http_requests_total
// (CounterVec), http_request_duration_seconds (Histogram),
// http_request_size_bytes (Summary), http_response_size_bytes (Summary). Each
// has a constant label named "handler" with the provided handlerName as
// value. http_requests_total is a metric vector partitioned by HTTP method
// (label name "method") and HTTP status code (label name "code").
// http_request_size_bytes (Summary), http_response_size_bytes (Summary).
// Each has a constant label named "handler" with the provided handlerName as value.
func (ins *defaultInstrumentationMiddleware) NewHandler(handlerName string, handler http.Handler) http.HandlerFunc {
baseLabels := prometheus.Labels{"handler": handlerName}
return httpInstrumentationHandler(baseLabels, ins.metrics, handler)
}

func httpInstrumentationHandler(baseLabels prometheus.Labels, metrics *defaultMetrics, next http.Handler) http.HandlerFunc {
return promhttp.InstrumentHandlerRequestSize(
ins.requestSize.MustCurryWith(prometheus.Labels{"handler": handlerName}),
metrics.requestSize.MustCurryWith(baseLabels),
promhttp.InstrumentHandlerCounter(
ins.requestsTotal.MustCurryWith(prometheus.Labels{"handler": handlerName}),
metrics.requestsTotal.MustCurryWith(baseLabels),
promhttp.InstrumentHandlerResponseSize(
ins.responseSize.MustCurryWith(prometheus.Labels{"handler": handlerName}),
metrics.responseSize.MustCurryWith(baseLabels),
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
now := time.Now()

wd := &responseWriterDelegator{w: w}
handler.ServeHTTP(wd, r)
next.ServeHTTP(wd, r)

observer := ins.requestDuration.WithLabelValues(
wd.Status(),
handlerName,
strings.ToLower(r.Method),
)
requestLabels := prometheus.Labels{"code": wd.Status(), "method": strings.ToLower(r.Method)}
observer := metrics.requestDuration.MustCurryWith(baseLabels).With(requestLabels)
observer.Observe(time.Since(now).Seconds())

// If we find a tracingID we'll expose it as Exemplar.
Expand Down Expand Up @@ -164,3 +125,12 @@ func (wd *responseWriterDelegator) StatusCode() int {
func (wd *responseWriterDelegator) Status() string {
return fmt.Sprintf("%d", wd.StatusCode())
}

// NewInstrumentHandlerInflightTenant creates a middleware used to export the current amount of concurrent requests
// being handled. It has an optional tenant label whenever a tenant is present in the context.
func NewInstrumentHandlerInflightTenant(gaugeVec *prometheus.GaugeVec, tenantHeader string, next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
tenant := r.Header.Get(tenantHeader)
promhttp.InstrumentHandlerInFlight(gaugeVec.With(prometheus.Labels{"tenant": tenant}), next).ServeHTTP(w, r)
}
}
41 changes: 41 additions & 0 deletions pkg/extprom/http/instrument_tenant_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package http

import (
"net/http"

"github.com/prometheus/client_golang/prometheus"
)

type tenantInstrumentationMiddleware struct {
metrics *defaultMetrics
tenantHeaderName string
}

// NewTenantInstrumentationMiddleware provides the same instrumentation as defaultInstrumentationMiddleware,
// but with a tenant label fetched from the given tenantHeaderName header.
// Passing nil as buckets uses the default buckets.
func NewTenantInstrumentationMiddleware(tenantHeaderName string, reg prometheus.Registerer, buckets []float64) InstrumentationMiddleware {
return &tenantInstrumentationMiddleware{
tenantHeaderName: tenantHeaderName,
metrics: newDefaultMetrics(reg, buckets, []string{"tenant"}),
}
}

// NewHandler wraps the given HTTP handler for instrumentation. It
// registers four metric collectors (if not already done) and reports HTTP
// metrics to the (newly or already) registered collectors: http_requests_total
// (CounterVec), http_request_duration_seconds (Histogram),
// http_request_size_bytes (Summary), http_response_size_bytes (Summary).
// Each has a constant label named "handler" with the provided handlerName as value.
func (ins *tenantInstrumentationMiddleware) NewHandler(handlerName string, next http.Handler) http.HandlerFunc {
tenantWrapper := func(w http.ResponseWriter, r *http.Request) {
tenant := r.Header.Get(ins.tenantHeaderName)
baseLabels := prometheus.Labels{"handler": handlerName, "tenant": tenant}
handlerStack := httpInstrumentationHandler(baseLabels, ins.metrics, next)
handlerStack.ServeHTTP(w, r)
}
return tenantWrapper
}
54 changes: 54 additions & 0 deletions pkg/extprom/http/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package http

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type defaultMetrics struct {
requestDuration *prometheus.HistogramVec
requestSize *prometheus.SummaryVec
requestsTotal *prometheus.CounterVec
responseSize *prometheus.SummaryVec
}

func newDefaultMetrics(reg prometheus.Registerer, buckets []float64, extraLabels []string) *defaultMetrics {
if buckets == nil {
buckets = []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}
}

return &defaultMetrics{
requestDuration: promauto.With(reg).NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_duration_seconds",
Help: "Tracks the latencies for HTTP requests.",
Buckets: buckets,
},
append([]string{"code", "handler", "method"}, extraLabels...),
),
requestSize: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Name: "http_request_size_bytes",
Help: "Tracks the size of HTTP requests.",
},
append([]string{"code", "handler", "method"}, extraLabels...),
),
requestsTotal: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_total",
Help: "Tracks the number of HTTP requests.",
},
append([]string{"code", "handler", "method"}, extraLabels...),
),
responseSize: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Name: "http_response_size_bytes",
Help: "Tracks the size of HTTP responses.",
},
append([]string{"code", "handler", "method"}, extraLabels...),
),
}
}
76 changes: 57 additions & 19 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type Handler struct {
forwardRequests *prometheus.CounterVec
replications *prometheus.CounterVec
replicationFactor prometheus.Gauge

writeSamplesTotal *prometheus.HistogramVec
writeTimeseriesTotal *prometheus.HistogramVec
}

func NewHandler(logger log.Logger, o *Options) *Handler {
Expand Down Expand Up @@ -159,6 +162,24 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
Help: "The number of times to replicate incoming write requests.",
},
),
writeTimeseriesTotal: promauto.With(registerer).NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "thanos",
Subsystem: "receive",
Name: "write_timeseries",
Help: "The number of timeseries received in the incoming write requests.",
Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000},
}, []string{"code", "tenant"},
),
writeSamplesTotal: promauto.With(registerer).NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "thanos",
Subsystem: "receive",
Name: "write_samples",
Help: "The number of sampled received in the incoming write requests.",
Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000},
}, []string{"code", "tenant"},
),
}

h.forwardRequests.WithLabelValues(labelSuccess)
Expand All @@ -174,21 +195,34 @@ func NewHandler(logger log.Logger, o *Options) *Handler {

ins := extpromhttp.NewNopInstrumentationMiddleware()
if o.Registry != nil {
ins = extpromhttp.NewInstrumentationMiddleware(o.Registry,
ins = extpromhttp.NewTenantInstrumentationMiddleware(
o.TenantHeader,
o.Registry,
[]float64{0.001, 0.005, 0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.1, 0.25, 0.5, 0.75, 1, 2, 3, 4, 5},
)
}

readyf := h.testReady
instrf := func(name string, next func(w http.ResponseWriter, r *http.Request)) http.HandlerFunc {
next = ins.NewHandler(name, http.HandlerFunc(next))

if o.Tracer != nil {
next = tracing.HTTPMiddleware(o.Tracer, name, logger, http.HandlerFunc(next))
}
return next
}

h.router.Post("/api/v1/receive", instrf("receive", readyf(middleware.RequestID(http.HandlerFunc(h.receiveHTTP)))))
h.router.Post(
"/api/v1/receive",
instrf(
"receive",
readyf(
middleware.RequestID(
http.HandlerFunc(h.receiveHTTP),
),
),
),
)

statusAPI := statusapi.New(statusapi.Options{
GetStats: h.getStats,
Expand Down Expand Up @@ -409,26 +443,30 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
return
}

err = h.handleRequest(ctx, rep, tenant, &wreq)
if err != nil {
responseStatusCode := http.StatusOK
if err = h.handleRequest(ctx, rep, tenant, &wreq); err != nil {
level.Debug(tLogger).Log("msg", "failed to handle request", "err", err)
switch determineWriteErrorCause(err, 1) {
case errNotReady:
responseStatusCode = http.StatusServiceUnavailable
case errUnavailable:
responseStatusCode = http.StatusServiceUnavailable
case errConflict:
responseStatusCode = http.StatusConflict
case errBadReplica:
responseStatusCode = http.StatusBadRequest
default:
level.Error(tLogger).Log("err", err, "msg", "internal server error")
responseStatusCode = http.StatusInternalServerError
}
http.Error(w, err.Error(), responseStatusCode)
}

switch determineWriteErrorCause(err, 1) {
case nil:
return
case errNotReady:
http.Error(w, err.Error(), http.StatusServiceUnavailable)
case errUnavailable:
http.Error(w, err.Error(), http.StatusServiceUnavailable)
case errConflict:
http.Error(w, err.Error(), http.StatusConflict)
case errBadReplica:
http.Error(w, err.Error(), http.StatusBadRequest)
default:
level.Error(tLogger).Log("err", err, "msg", "internal server error")
http.Error(w, err.Error(), http.StatusInternalServerError)
h.writeTimeseriesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(len(wreq.Timeseries)))
totalSamples := 0
for _, timeseries := range wreq.Timeseries {
totalSamples += len(timeseries.Samples)
}
h.writeSamplesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(totalSamples))
}

// forward accepts a write request, batches its time series by
Expand Down