Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[otlp] Fix panic in dropped count (again!) #3538

Merged
merged 1 commit into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
* [BUGFIX] Query-frontend: properly close gRPC streams to the query-scheduler to stop memory and goroutines leak. #3302
* [BUGFIX] Ruler: persist evaluation delay configured in the rulegroup. #3392
* [BUGFIX] Ring status pages: show 100% ownership as "100%", not "1e+02%". #3435
* [BUGFIX] Fix panics in OTLP ingest path when parse errors exist. #3538

### Mixin

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ require (
go.opentelemetry.io/collector/pdata v0.54.0
go.opentelemetry.io/otel v1.10.0
go.opentelemetry.io/otel/trace v1.10.0
go.uber.org/multierr v1.8.0
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8
google.golang.org/api v0.97.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
Expand Down Expand Up @@ -207,7 +208,6 @@ require (
go.opentelemetry.io/collector/semconv v0.54.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.36.0 // indirect
go.opentelemetry.io/otel/metric v0.32.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
Expand Down
12 changes: 2 additions & 10 deletions pkg/util/push/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.uber.org/multierr"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util"
Expand Down Expand Up @@ -132,7 +133,7 @@ func otelMetricsToTimeseries(ctx context.Context, discardedDueToOtelParseError *
return nil, err
}

dropped := md.DataPointCount() - sampleCountInMap(tsMap)
dropped := len(multierr.Errors(errs))
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
discardedDueToOtelParseError.WithLabelValues(userID).Add(float64(dropped))

parseErrs := errs.Error()
Expand Down Expand Up @@ -228,12 +229,3 @@ func TimeseriesToOTLPRequest(timeseries []prompb.TimeSeries) pmetricotlp.Request

return pmetricotlp.NewRequestFromMetrics(d)
}

func sampleCountInMap(tsMap map[string]*prompb.TimeSeries) int {
count := 0
for _, ts := range tsMap {
count += len(ts.Samples)
}

return count
}
66 changes: 66 additions & 0 deletions pkg/util/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,72 @@ func TestHandler_otlpDroppedMetricsPanic(t *testing.T) {
assert.Equal(t, 200, resp.Code)
}

func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) {
// After the above test, the panic occurred again.
// This test is to ensure that the panic is fixed for the new cases as well.

// First case is to make sure that target_info is counted correctly.
md := pmetric.NewMetrics()
const name = "foo"
attributes := pcommon.NewMap()
attributes.InsertString(model.MetricNameLabel, name)

resource1 := md.ResourceMetrics().AppendEmpty()
resource1.Resource().Attributes().InsertString("region", "us-central1")

metric1 := resource1.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
metric1.SetName(name)
metric1.SetDataType(pmetric.MetricDataTypeGauge)
datapoint1 := metric1.Gauge().DataPoints().AppendEmpty()
datapoint1.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
datapoint1.SetDoubleVal(0)
attributes.CopyTo(datapoint1.Attributes())
datapoint1.Attributes().InsertString("diff_label", "bar")

metric2 := resource1.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
metric2.SetName(name)
metric2.SetDataType(pmetric.MetricDataTypeGauge)

req := createOTLPRequest(t, pmetricotlp.NewRequestFromMetrics(md), false)
resp := httptest.NewRecorder()
handler := OTLPHandler(100000, nil, false, nil, func(ctx context.Context, pushReq *Request) (response *mimirpb.WriteResponse, err error) {
request, err := pushReq.WriteRequest()
assert.NoError(t, err)
assert.Len(t, request.Timeseries, 2)
assert.False(t, request.SkipLabelNameValidation)
pushReq.CleanUp()
return &mimirpb.WriteResponse{}, nil
})
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)

// Second case is to make sure that histogram metrics are counted correctly.
metric3 := resource1.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
metric3.SetName("http_request_duration_seconds")
metric3.SetDataType(pmetric.MetricDataTypeHistogram)
metric3.Histogram().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative)
datapoint3 := metric3.Histogram().DataPoints().AppendEmpty()
datapoint3.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
datapoint3.SetCount(50)
datapoint3.SetSum(100)
datapoint3.SetMExplicitBounds([]float64{0.1, 0.2, 0.3, 0.4, 0.5})
datapoint3.SetMBucketCounts([]uint64{10, 20, 30, 40, 50})
attributes.CopyTo(datapoint3.Attributes())

req = createOTLPRequest(t, pmetricotlp.NewRequestFromMetrics(md), false)
resp = httptest.NewRecorder()
handler = OTLPHandler(100000, nil, false, nil, func(ctx context.Context, pushReq *Request) (response *mimirpb.WriteResponse, err error) {
request, err := pushReq.WriteRequest()
assert.NoError(t, err)
assert.Len(t, request.Timeseries, 10) // 6 buckets (including +Inf) + 2 sum/count + 2 from the first case
assert.False(t, request.SkipLabelNameValidation)
pushReq.CleanUp()
return &mimirpb.WriteResponse{}, nil
})
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)
}

func TestHandler_otlpWriteWithCompression(t *testing.T) {
req := createOTLPRequest(t, createOTLPMetricRequest(t), true)
resp := httptest.NewRecorder()
Expand Down