Skip to content

Commit

Permalink
Distributor: Don't use global logger in push handlers (#6652)
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
aknuds1 committed Nov 15, 2023
1 parent 990f857 commit 6a24a60
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
* [ENHANCEMENT] Server: Add `-server.http-log-closed-connections-without-response-enabled` option to log details about connections to HTTP server that were closed before any data was sent back. This can happen if the client doesn't manage to send complete HTTP headers before the timeout. #6612
* [ENHANCEMENT] Query-frontend: include length of query, time since the earliest and latest points of a query, cached/uncached bytes in "query stats" logs. Time parameters (start/end/time) are always formatted as RFC3339 now. #6473 #6477
* [ENHANCEMENT] Distributor: added support for reducing the resolution of native histogram samples upon ingestion if the sample has too many buckets compared to `-validation.max-native-histogram-buckets`. This is enabled by default and can be turned off by setting `-validation.reduce-native-histogram-over-max-buckets` to `false`. #6535
* [ENHANCEMENT] Distributor: Include source IPs in OTLP push handler logs. #6652
* [BUGFIX] Distributor: return server overload error in the event of exceeding the ingestion rate limit. #6549
* [BUGFIX] Ring: Ensure network addresses used for component hash rings are formatted correctly when using IPv6. #6068
* [BUGFIX] Query-scheduler: don't retain connections from queriers that have shut down, leading to gradually increasing enqueue latency over time. #6100 #6145
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ const OTLPPushEndpoint = "/otlp/v1/metrics"
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, reg prometheus.Registerer, limits *validation.Overrides) {
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, reg, d.PushWithMiddlewares), true, false, "POST")
a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, a.logger), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, reg, d.PushWithMiddlewares, a.logger), true, false, "POST")

a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{
{Desc: "Ring status", Path: "/distributor/ring"},
Expand Down
24 changes: 11 additions & 13 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"net/http"
"time"

kitlog "github.com/go-kit/log"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/middleware"
Expand All @@ -27,7 +27,6 @@ import (

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/validation"
)
Expand All @@ -49,14 +48,13 @@ func OTLPHandler(
retryCfg RetryConfig,
reg prometheus.Registerer,
push PushFunc,
logger log.Logger,
) http.Handler {
discardedDueToOtelParseError := validation.DiscardedSamplesCounter(reg, otelParseError)

return handler(maxRecvMsgSize, sourceIPs, allowSkipLabelNameValidation, limits, retryCfg, push, func(ctx context.Context, r *http.Request, maxRecvMsgSize int, dst []byte, req *mimirpb.PreallocWriteRequest) ([]byte, error) {
return handler(maxRecvMsgSize, sourceIPs, allowSkipLabelNameValidation, limits, retryCfg, push, logger, func(ctx context.Context, r *http.Request, maxRecvMsgSize int, dst []byte, req *mimirpb.PreallocWriteRequest, logger log.Logger) ([]byte, error) {
var decoderFunc func(buf []byte) (pmetricotlp.ExportRequest, error)

logger := log.WithContext(ctx, log.Logger)

contentType := r.Header.Get("Content-Type")
switch contentType {
case pbContentType:
Expand Down Expand Up @@ -115,19 +113,19 @@ func OTLPHandler(
return body, err
}

log, ctx := spanlogger.NewWithLogger(ctx, logger, "Distributor.OTLPHandler.decodeAndConvert")
defer log.Span.Finish()
spanLogger, ctx := spanlogger.NewWithLogger(ctx, logger, "Distributor.OTLPHandler.decodeAndConvert")
defer spanLogger.Span.Finish()

log.SetTag("content_type", contentType)
log.SetTag("content_encoding", contentEncoding)
log.SetTag("content_length", r.ContentLength)
spanLogger.SetTag("content_type", contentType)
spanLogger.SetTag("content_encoding", contentEncoding)
spanLogger.SetTag("content_length", r.ContentLength)

otlpReq, err := decoderFunc(body)
if err != nil {
return body, err
}

level.Debug(log).Log("msg", "decoding complete, starting conversion")
level.Debug(spanLogger).Log("msg", "decoding complete, starting conversion")

metrics, err := otelMetricsToTimeseries(ctx, discardedDueToOtelParseError, logger, otlpReq.Metrics())
if err != nil {
Expand All @@ -145,7 +143,7 @@ func OTLPHandler(
exemplarCount += len(m.Exemplars)
}

level.Debug(log).Log(
level.Debug(spanLogger).Log(
"msg", "OTLP to Prometheus conversion complete",
"metric_count", metricCount,
"sample_count", sampleCount,
Expand Down Expand Up @@ -217,7 +215,7 @@ func otelMetricsToMetadata(md pmetric.Metrics) []*mimirpb.MetricMetadata {

}

func otelMetricsToTimeseries(ctx context.Context, discardedDueToOtelParseError *prometheus.CounterVec, logger kitlog.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
func otelMetricsToTimeseries(ctx context.Context, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
tsMap, errs := prometheusremotewrite.FromMetrics(md, prometheusremotewrite.Settings{})

if errs != nil {
Expand Down
15 changes: 9 additions & 6 deletions pkg/distributor/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strconv"
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/httpgrpc/server"
Expand All @@ -25,15 +26,15 @@ import (
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/globalerror"
"github.com/grafana/mimir/pkg/util/log"
utillog "github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/validation"
)

// PushFunc defines the type of the push. It is similar to http.HandlerFunc.
type PushFunc func(ctx context.Context, req *Request) error

// parserFunc defines how to read the body of the request from an HTTP request
type parserFunc func(ctx context.Context, r *http.Request, maxSize int, buffer []byte, req *mimirpb.PreallocWriteRequest) ([]byte, error)
type parserFunc func(ctx context.Context, r *http.Request, maxSize int, buffer []byte, req *mimirpb.PreallocWriteRequest, logger log.Logger) ([]byte, error)

// Wrap a slice in a struct so we can store a pointer in sync.Pool
type bufHolder struct {
Expand Down Expand Up @@ -84,8 +85,9 @@ func Handler(
limits *validation.Overrides,
retryCfg RetryConfig,
push PushFunc,
logger log.Logger,
) http.Handler {
return handler(maxRecvMsgSize, sourceIPs, allowSkipLabelNameValidation, limits, retryCfg, push, func(ctx context.Context, r *http.Request, maxRecvMsgSize int, dst []byte, req *mimirpb.PreallocWriteRequest) ([]byte, error) {
return handler(maxRecvMsgSize, sourceIPs, allowSkipLabelNameValidation, limits, retryCfg, push, logger, func(ctx context.Context, r *http.Request, maxRecvMsgSize int, dst []byte, req *mimirpb.PreallocWriteRequest, _ log.Logger) ([]byte, error) {
res, err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, dst, req, util.RawSnappy)
if errors.Is(err, util.MsgSizeTooLargeErr{}) {
err = distributorMaxWriteMessageSizeErr{actual: int(r.ContentLength), limit: maxRecvMsgSize}
Expand Down Expand Up @@ -113,22 +115,23 @@ func handler(
limits *validation.Overrides,
retryCfg RetryConfig,
push PushFunc,
logger log.Logger,
parser parserFunc,
) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := log.WithContext(ctx, log.Logger)
logger := utillog.WithContext(ctx, logger)
if sourceIPs != nil {
source := sourceIPs.Get(r)
if source != "" {
ctx = util.AddSourceIPsToOutgoingContext(ctx, source)
logger = log.WithSourceIPs(source, logger)
logger = utillog.WithSourceIPs(source, logger)
}
}
supplier := func() (*mimirpb.WriteRequest, func(), error) {
bufHolder := bufferPool.Get().(*bufHolder)
var req mimirpb.PreallocWriteRequest
buf, err := parser(ctx, r, maxRecvMsgSize, bufHolder.buf, &req)
buf, err := parser(ctx, r, maxRecvMsgSize, bufHolder.buf, &req, logger)
if err != nil {
// Check for httpgrpc error, default to client error if parsing failed
if _, ok := httpgrpc.HTTPResponseFromError(err); !ok {
Expand Down
29 changes: 15 additions & 14 deletions pkg/distributor/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/golang/snappy"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/httpgrpc"
Expand All @@ -43,7 +44,7 @@ import (
func TestHandler_remoteWrite(t *testing.T) {
req := createRequest(t, createPrometheusRemoteWriteProtobuf(t))
resp := httptest.NewRecorder()
handler := Handler(100000, nil, false, nil, RetryConfig{}, verifyWritePushFunc(t, mimirpb.API))
handler := Handler(100000, nil, false, nil, RetryConfig{}, verifyWritePushFunc(t, mimirpb.API), log.NewNopLogger())
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)
}
Expand Down Expand Up @@ -267,7 +268,7 @@ func TestHandlerOTLPPush(t *testing.T) {
req.Header.Set("Content-Encoding", tt.encoding)
}

handler := OTLPHandler(tt.maxMsgSize, nil, false, tt.enableOtelMetadataStorage, nil, RetryConfig{}, nil, tt.verifyFunc)
handler := OTLPHandler(tt.maxMsgSize, nil, false, tt.enableOtelMetadataStorage, nil, RetryConfig{}, nil, tt.verifyFunc, log.NewNopLogger())

resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
Expand Down Expand Up @@ -329,7 +330,7 @@ func TestHandler_otlpDroppedMetricsPanic(t *testing.T) {
assert.False(t, request.SkipLabelNameValidation)
pushReq.CleanUp()
return nil
})
}, log.NewNopLogger())
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)
}
Expand Down Expand Up @@ -369,7 +370,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) {
assert.False(t, request.SkipLabelNameValidation)
pushReq.CleanUp()
return nil
})
}, log.NewNopLogger())
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)

Expand All @@ -395,7 +396,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) {
assert.False(t, request.SkipLabelNameValidation)
pushReq.CleanUp()
return nil
})
}, log.NewNopLogger())
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)
}
Expand All @@ -417,7 +418,7 @@ func TestHandler_otlpWriteRequestTooBigWithCompression(t *testing.T) {

resp := httptest.NewRecorder()

handler := OTLPHandler(140, nil, false, true, nil, RetryConfig{}, nil, readBodyPushFunc(t))
handler := OTLPHandler(140, nil, false, true, nil, RetryConfig{}, nil, readBodyPushFunc(t), log.NewNopLogger())
handler.ServeHTTP(resp, req)
assert.Equal(t, http.StatusRequestEntityTooLarge, resp.Code)
body, err := io.ReadAll(resp.Body)
Expand All @@ -429,7 +430,7 @@ func TestHandler_mimirWriteRequest(t *testing.T) {
req := createRequest(t, createMimirWriteRequestProtobuf(t, false))
resp := httptest.NewRecorder()
sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)")
handler := Handler(100000, sourceIPs, false, nil, RetryConfig{}, verifyWritePushFunc(t, mimirpb.RULE))
handler := Handler(100000, sourceIPs, false, nil, RetryConfig{}, verifyWritePushFunc(t, mimirpb.RULE), log.NewNopLogger())
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)
}
Expand All @@ -441,7 +442,7 @@ func TestHandler_contextCanceledRequest(t *testing.T) {
handler := Handler(100000, sourceIPs, false, nil, RetryConfig{}, func(_ context.Context, req *Request) error {
defer req.CleanUp()
return fmt.Errorf("the request failed: %w", context.Canceled)
})
}, log.NewNopLogger())
handler.ServeHTTP(resp, req)
assert.Equal(t, 499, resp.Code)
}
Expand Down Expand Up @@ -547,7 +548,7 @@ func TestHandler_EnsureSkipLabelNameValidationBehaviour(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
resp := httptest.NewRecorder()
handler := Handler(100000, nil, tc.allowSkipLabelNameValidation, nil, RetryConfig{}, tc.verifyReqHandler)
handler := Handler(100000, nil, tc.allowSkipLabelNameValidation, nil, RetryConfig{}, tc.verifyReqHandler, log.NewNopLogger())
if !tc.includeAllowSkiplabelNameValidationHeader {
tc.req.Header.Set(SkipLabelNameValidationHeader, "true")
}
Expand Down Expand Up @@ -704,7 +705,7 @@ func BenchmarkPushHandler(b *testing.B) {
pushReq.CleanUp()
return nil
}
handler := Handler(100000, nil, false, nil, RetryConfig{}, pushFunc)
handler := Handler(100000, nil, false, nil, RetryConfig{}, pushFunc, log.NewNopLogger())
b.ResetTimer()
for iter := 0; iter < b.N; iter++ {
req.Body = bufCloser{Buffer: buf} // reset Body so it can be read each time round the loop
Expand Down Expand Up @@ -752,15 +753,15 @@ func TestHandler_ErrorTranslation(t *testing.T) {
}
for _, tc := range parserTestCases {
t.Run(tc.name, func(t *testing.T) {
parserFunc := func(context.Context, *http.Request, int, []byte, *mimirpb.PreallocWriteRequest) ([]byte, error) {
parserFunc := func(context.Context, *http.Request, int, []byte, *mimirpb.PreallocWriteRequest, log.Logger) ([]byte, error) {
return nil, tc.err
}
pushFunc := func(ctx context.Context, req *Request) error {
_, err := req.WriteRequest() // just read the body so we can trigger the parser
return err
}

h := handler(10, nil, false, nil, RetryConfig{}, pushFunc, parserFunc)
h := handler(10, nil, false, nil, RetryConfig{}, pushFunc, log.NewNopLogger(), parserFunc)

recorder := httptest.NewRecorder()
h.ServeHTTP(recorder, httptest.NewRequest(http.MethodPost, "/push", bufCloser{&bytes.Buffer{}}))
Expand Down Expand Up @@ -819,7 +820,7 @@ func TestHandler_ErrorTranslation(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
parserFunc := func(context.Context, *http.Request, int, []byte, *mimirpb.PreallocWriteRequest) ([]byte, error) {
parserFunc := func(context.Context, *http.Request, int, []byte, *mimirpb.PreallocWriteRequest, log.Logger) ([]byte, error) {
return nil, nil
}
pushFunc := func(ctx context.Context, req *Request) error {
Expand All @@ -829,7 +830,7 @@ func TestHandler_ErrorTranslation(t *testing.T) {
}
return tc.err
}
h := handler(10, nil, false, nil, RetryConfig{}, pushFunc, parserFunc)
h := handler(10, nil, false, nil, RetryConfig{}, pushFunc, log.NewNopLogger(), parserFunc)
recorder := httptest.NewRecorder()
h.ServeHTTP(recorder, httptest.NewRequest(http.MethodPost, "/push", bufCloser{&bytes.Buffer{}}))

Expand Down

0 comments on commit 6a24a60

Please sign in to comment.