Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

trafficdump: add support for parsing OTEL requests #6108

Merged
merged 3 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

### Tools

* [ENHANCEMENT] trafficdump: Trafficdump can now parse OTEL requests. Entire request is dumped to output, there's no filtering of fields or matching of series done. #6108

## 2.10.0

### Grafana Mimir
Expand Down
3 changes: 3 additions & 0 deletions tools/trafficdump/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
)

func main() {
// Clean up all flags registered via init() methods of 3rd-party libraries.
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)

parser := &parser{}

parser.RegisterFlags(flag.CommandLine)
Expand Down
10 changes: 8 additions & 2 deletions tools/trafficdump/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/prometheus/prometheus/model/labels"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"

"github.com/grafana/mimir/pkg/mimirpb"
)
Expand Down Expand Up @@ -42,7 +43,9 @@ type request struct {
Form url.Values `json:"form,omitempty"`
RawBody string `json:"raw_body,omitempty"`

PushRequest *pushRequest `json:"push,omitempty"`
PushRequest any `json:"push,omitempty"`

cleanup func()
}

type requestURL struct {
Expand All @@ -57,8 +60,11 @@ type pushRequest struct {
Metadata []*mimirpb.MetricMetadata `json:"metadata,omitempty"`

Error string `json:"error,omitempty"`
}

cleanup func()
type otlpPushRequest struct {
pmetricotlp.ExportRequest
Error string `json:"error,omitempty"`
}

type timeseries struct {
Expand Down
90 changes: 81 additions & 9 deletions tools/trafficdump/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/grafana/regexp"
"github.com/prometheus/prometheus/model/labels"
promql_parser "github.com/prometheus/prometheus/promql/parser"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util"
Expand Down Expand Up @@ -55,12 +56,12 @@ func (rp *parser) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&rp.pathRegexpStr, "path", "", "Output only requests matching this URL path (regex).")
f.StringVar(&rp.ignorePathRegexpStr, "ignore-path", "", "If not empty, and URL path matches this regex, request is ignored.")
f.StringVar(&rp.tenantRegexpStr, "tenant", "", "Output only requests for this tenant (regex).")
f.StringVar(&rp.metricSelector, "select", "", "If set, only output write requests that include series matching this selector. Used only when -request.decode-remote-write is enabled.")
f.StringVar(&rp.metricSelector, "select", "", "If set, only output write requests that include series matching this selector. Used only when -request.decode-remote-write is enabled. (Only works for Prometheus requests)")

f.BoolVar(&rp.requestHeaders, "request.headers", false, "Include request headers in the output")
f.BoolVar(&rp.requestRawBody, "request.raw-body", false, "Include raw request body in the output")
f.BoolVar(&rp.decodePush, "request.decode-remote-write", false, "Decode remote-write requests (only for POST requests that contain /push in the path)")
f.BoolVar(&rp.includeSamples, "request.samples", false, "Include samples in the output. Used only when -request.decode-remote-write is enabled.")
f.BoolVar(&rp.decodePush, "request.decode-remote-write", false, "Decode Prometheus or OTEL metrics requests. Only for POST requests that contain /push (Prometheus) or /metrics (OTEL) in the path")
f.BoolVar(&rp.includeSamples, "request.samples", false, "Include samples in the output. Used only when -request.decode-remote-write is enabled. (Only works with Prometheus requests)")

f.BoolVar(&rp.responseHeaders, "response.headers", false, "Include HTTP headers in the response")
f.BoolVar(&rp.responseRawBody, "response.raw-body", false, "Include raw body in the response")
Expand Down Expand Up @@ -133,7 +134,16 @@ func (rp *parser) processHTTPRequest(req *http.Request, body []byte) *request {

if rp.decodePush && req.Method == "POST" && strings.Contains(req.URL.Path, "/push") {
var matched bool
r.PushRequest, matched = rp.decodePushRequest(req, body, rp.matchers)
r.PushRequest, r.cleanup, matched = rp.decodePushRequest(req, body, rp.matchers)
if !matched {
r.ignored = true
}
}

// Support POST to /otlp/v1/metrics
if rp.decodePush && req.Method == "POST" && strings.Contains(req.URL.Path, "/metrics") {
var matched bool
r.PushRequest, matched = rp.decodeOTLPRequest(req, body)
if !matched {
r.ignored = true
}
Expand All @@ -151,7 +161,7 @@ var bufferPool = sync.Pool{
New: func() interface{} { return &bufHolder{buf: make([]byte, 256*1024)} },
}

func (rp *parser) decodePushRequest(req *http.Request, body []byte, matchers []*labels.Matcher) (*pushRequest, bool) {
func (rp *parser) decodePushRequest(req *http.Request, body []byte, matchers []*labels.Matcher) (*pushRequest, func(), bool) {
res := &pushRequest{Version: req.Header.Get("X-Prometheus-Remote-Write-Version")}

bufHolder := bufferPool.Get().(*bufHolder)
Expand All @@ -165,7 +175,7 @@ func (rp *parser) decodePushRequest(req *http.Request, body []byte, matchers []*
if err != nil {
cleanup()
res.Error = fmt.Errorf("failed to decode decodePush request: %s", err).Error()
return nil, true
return nil, nil, true
}

// If decoding allocated a bigger buffer, put that one back in the pool.
Expand All @@ -186,7 +196,7 @@ func (rp *parser) decodePushRequest(req *http.Request, body []byte, matchers []*

if !matched {
cleanup()
return nil, false
return nil, nil, false
}
}

Expand All @@ -208,8 +218,7 @@ func (rp *parser) decodePushRequest(req *http.Request, body []byte, matchers []*

res.Metadata = wr.Metadata

res.cleanup = cleanup
return res, true
return res, cleanup, true
}

func matches(lbls labels.Labels, matchers []*labels.Matcher) bool {
Expand Down Expand Up @@ -268,3 +277,66 @@ func (rp *parser) processHTTPResponse(resp *http.Response, body []byte) *respons

return &out
}

const (
pbContentType = "application/x-protobuf"
jsonContentType = "application/json"
)

func (rp *parser) decodeOTLPRequest(req *http.Request, body []byte) (*otlpPushRequest, bool) {
res := &otlpPushRequest{}

var decoderFunc func(buf []byte) (pmetricotlp.ExportRequest, error)

contentType := req.Header.Get("Content-Type")
switch contentType {
case pbContentType:
decoderFunc = func(buf []byte) (pmetricotlp.ExportRequest, error) {
req := pmetricotlp.NewExportRequest()
return req, req.UnmarshalProto(buf)
}

case jsonContentType:
decoderFunc = func(buf []byte) (pmetricotlp.ExportRequest, error) {
req := pmetricotlp.NewExportRequest()
return req, req.UnmarshalJSON(buf)
}

default:
res.Error = fmt.Sprintf("unsupported content type: %s", contentType)
return nil, false
}

// Handle compression.
contentEncoding := req.Header.Get("Content-Encoding")
switch contentEncoding {
case "gzip":
gr, err := gzip.NewReader(bytes.NewReader(body))
if err != nil {
res.Error = fmt.Sprintf("failed to decode gzip request: %v", err)
return nil, true
}

body, err = io.ReadAll(gr)
if err != nil {
res.Error = fmt.Sprintf("failed to decode gzip request: %v", err)
return nil, true
}

case "":
// No compression.

default:
res.Error = fmt.Sprintf("unsupported compression for otel: %s", contentEncoding)
return nil, true
}

var err error
res.ExportRequest, err = decoderFunc(body)
if err != nil {
res.Error = fmt.Sprintf("failed to decode request: %v", err)
return nil, true
}

return res, true
}
4 changes: 2 additions & 2 deletions tools/trafficdump/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func (p *processor) run() {
}

func (p *processor) print(req *request, resp *response) {
if req != nil && req.PushRequest != nil && req.PushRequest.cleanup != nil {
defer req.PushRequest.cleanup()
if req != nil && req.cleanup != nil {
defer req.cleanup()
}

if req != nil && req.ignored {
Expand Down
Loading