diff --git a/CHANGELOG.md b/CHANGELOG.md index 91fd714f8b9..dfa13e2de89 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,8 @@ ### Tools +* [ENHANCEMENT] trafficdump: Trafficdump can now parse OTEL requests. Entire request is dumped to output, there's no filtering of fields or matching of series done. #6108 + ## 2.10.0 ### Grafana Mimir diff --git a/tools/trafficdump/main.go b/tools/trafficdump/main.go index dd9c69ccc6c..5a8ac699575 100644 --- a/tools/trafficdump/main.go +++ b/tools/trafficdump/main.go @@ -29,6 +29,9 @@ import ( ) func main() { + // Clean up all flags registered via init() methods of 3rd-party libraries. + flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError) + parser := &parser{} parser.RegisterFlags(flag.CommandLine) diff --git a/tools/trafficdump/model.go b/tools/trafficdump/model.go index c66bef222ad..fb2028c9a82 100644 --- a/tools/trafficdump/model.go +++ b/tools/trafficdump/model.go @@ -11,6 +11,7 @@ import ( "time" "github.com/prometheus/prometheus/model/labels" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "github.com/grafana/mimir/pkg/mimirpb" ) @@ -42,7 +43,9 @@ type request struct { Form url.Values `json:"form,omitempty"` RawBody string `json:"raw_body,omitempty"` - PushRequest *pushRequest `json:"push,omitempty"` + PushRequest any `json:"push,omitempty"` + + cleanup func() } type requestURL struct { @@ -57,8 +60,11 @@ type pushRequest struct { Metadata []*mimirpb.MetricMetadata `json:"metadata,omitempty"` Error string `json:"error,omitempty"` +} - cleanup func() +type otlpPushRequest struct { + pmetricotlp.ExportRequest + Error string `json:"error,omitempty"` } type timeseries struct { diff --git a/tools/trafficdump/parser.go b/tools/trafficdump/parser.go index 9010498ea06..3899a0b4cc4 100644 --- a/tools/trafficdump/parser.go +++ b/tools/trafficdump/parser.go @@ -19,6 +19,7 @@ import ( "github.com/grafana/regexp" "github.com/prometheus/prometheus/model/labels" promql_parser "github.com/prometheus/prometheus/promql/parser" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/util" @@ -55,12 +56,12 @@ func (rp *parser) RegisterFlags(f *flag.FlagSet) { f.StringVar(&rp.pathRegexpStr, "path", "", "Output only requests matching this URL path (regex).") f.StringVar(&rp.ignorePathRegexpStr, "ignore-path", "", "If not empty, and URL path matches this regex, request is ignored.") f.StringVar(&rp.tenantRegexpStr, "tenant", "", "Output only requests for this tenant (regex).") - f.StringVar(&rp.metricSelector, "select", "", "If set, only output write requests that include series matching this selector. Used only when -request.decode-remote-write is enabled.") + f.StringVar(&rp.metricSelector, "select", "", "If set, only output write requests that include series matching this selector. Used only when -request.decode-remote-write is enabled. (Only works for Prometheus requests)") f.BoolVar(&rp.requestHeaders, "request.headers", false, "Include request headers in the output") f.BoolVar(&rp.requestRawBody, "request.raw-body", false, "Include raw request body in the output") - f.BoolVar(&rp.decodePush, "request.decode-remote-write", false, "Decode remote-write requests (only for POST requests that contain /push in the path)") - f.BoolVar(&rp.includeSamples, "request.samples", false, "Include samples in the output. Used only when -request.decode-remote-write is enabled.") + f.BoolVar(&rp.decodePush, "request.decode-remote-write", false, "Decode Prometheus or OTEL metrics requests. Only for POST requests that contain /push (Prometheus) or /metrics (OTEL) in the path") + f.BoolVar(&rp.includeSamples, "request.samples", false, "Include samples in the output. Used only when -request.decode-remote-write is enabled. (Only works with Prometheus requests)") f.BoolVar(&rp.responseHeaders, "response.headers", false, "Include HTTP headers in the response") f.BoolVar(&rp.responseRawBody, "response.raw-body", false, "Include raw body in the response") @@ -133,7 +134,16 @@ func (rp *parser) processHTTPRequest(req *http.Request, body []byte) *request { if rp.decodePush && req.Method == "POST" && strings.Contains(req.URL.Path, "/push") { var matched bool - r.PushRequest, matched = rp.decodePushRequest(req, body, rp.matchers) + r.PushRequest, r.cleanup, matched = rp.decodePushRequest(req, body, rp.matchers) + if !matched { + r.ignored = true + } + } + + // Support POST to /otlp/v1/metrics + if rp.decodePush && req.Method == "POST" && strings.Contains(req.URL.Path, "/metrics") { + var matched bool + r.PushRequest, matched = rp.decodeOTLPRequest(req, body) if !matched { r.ignored = true } @@ -151,7 +161,7 @@ var bufferPool = sync.Pool{ New: func() interface{} { return &bufHolder{buf: make([]byte, 256*1024)} }, } -func (rp *parser) decodePushRequest(req *http.Request, body []byte, matchers []*labels.Matcher) (*pushRequest, bool) { +func (rp *parser) decodePushRequest(req *http.Request, body []byte, matchers []*labels.Matcher) (*pushRequest, func(), bool) { res := &pushRequest{Version: req.Header.Get("X-Prometheus-Remote-Write-Version")} bufHolder := bufferPool.Get().(*bufHolder) @@ -165,7 +175,7 @@ func (rp *parser) decodePushRequest(req *http.Request, body []byte, matchers []* if err != nil { cleanup() res.Error = fmt.Errorf("failed to decode decodePush request: %s", err).Error() - return nil, true + return nil, nil, true } // If decoding allocated a bigger buffer, put that one back in the pool. @@ -186,7 +196,7 @@ func (rp *parser) decodePushRequest(req *http.Request, body []byte, matchers []* if !matched { cleanup() - return nil, false + return nil, nil, false } } @@ -208,8 +218,7 @@ func (rp *parser) decodePushRequest(req *http.Request, body []byte, matchers []* res.Metadata = wr.Metadata - res.cleanup = cleanup - return res, true + return res, cleanup, true } func matches(lbls labels.Labels, matchers []*labels.Matcher) bool { @@ -268,3 +277,66 @@ func (rp *parser) processHTTPResponse(resp *http.Response, body []byte) *respons return &out } + +const ( + pbContentType = "application/x-protobuf" + jsonContentType = "application/json" +) + +func (rp *parser) decodeOTLPRequest(req *http.Request, body []byte) (*otlpPushRequest, bool) { + res := &otlpPushRequest{} + + var decoderFunc func(buf []byte) (pmetricotlp.ExportRequest, error) + + contentType := req.Header.Get("Content-Type") + switch contentType { + case pbContentType: + decoderFunc = func(buf []byte) (pmetricotlp.ExportRequest, error) { + req := pmetricotlp.NewExportRequest() + return req, req.UnmarshalProto(buf) + } + + case jsonContentType: + decoderFunc = func(buf []byte) (pmetricotlp.ExportRequest, error) { + req := pmetricotlp.NewExportRequest() + return req, req.UnmarshalJSON(buf) + } + + default: + res.Error = fmt.Sprintf("unsupported content type: %s", contentType) + return nil, false + } + + // Handle compression. + contentEncoding := req.Header.Get("Content-Encoding") + switch contentEncoding { + case "gzip": + gr, err := gzip.NewReader(bytes.NewReader(body)) + if err != nil { + res.Error = fmt.Sprintf("failed to decode gzip request: %v", err) + return nil, true + } + + body, err = io.ReadAll(gr) + if err != nil { + res.Error = fmt.Sprintf("failed to decode gzip request: %v", err) + return nil, true + } + + case "": + // No compression. + + default: + res.Error = fmt.Sprintf("unsupported compression for otel: %s", contentEncoding) + return nil, true + } + + var err error + res.ExportRequest, err = decoderFunc(body) + if err != nil { + res.Error = fmt.Sprintf("failed to decode request: %v", err) + return nil, true + } + + return res, true +} diff --git a/tools/trafficdump/processor.go b/tools/trafficdump/processor.go index dbd75c6797b..a12953f213d 100644 --- a/tools/trafficdump/processor.go +++ b/tools/trafficdump/processor.go @@ -58,8 +58,8 @@ func (p *processor) run() { } func (p *processor) print(req *request, resp *response) { - if req != nil && req.PushRequest != nil && req.PushRequest.cleanup != nil { - defer req.PushRequest.cleanup() + if req != nil && req.cleanup != nil { + defer req.cleanup() } if req != nil && req.ignored {