Skip to content

Commit

Permalink
[otlp] Fix panic in dropped count (again!) (grafana#3538)
Browse files Browse the repository at this point in the history
This doesn't accurately count the dropped samples. For example
if a single metric with multiple samples is faulty, we get a single error
rather than an error per sample.

But I believe its the best best-effort measurement.

Before we used to do `DatapointCount() - samplesInMap()`

The problem is the following:
1. target_info is a synthetic metric added in Prometheus, so the final samples could higher.
2. A single histogram datapoint in OTLP corresponds to many samples in Prometheus.

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
  • Loading branch information
gouthamve authored and replay committed Nov 28, 2022
1 parent 54344f2 commit 3744cb5
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
* [BUGFIX] Query-frontend: properly close gRPC streams to the query-scheduler to stop memory and goroutines leak. #3302
* [BUGFIX] Ruler: persist evaluation delay configured in the rulegroup. #3392
* [BUGFIX] Ring status pages: show 100% ownership as "100%", not "1e+02%". #3435
* [BUGFIX] Fix panics in OTLP ingest path when parse errors exist. #3538

### Mixin

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ require (
go.opentelemetry.io/collector/pdata v0.54.0
go.opentelemetry.io/otel v1.10.0
go.opentelemetry.io/otel/trace v1.10.0
go.uber.org/multierr v1.8.0
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8
google.golang.org/api v0.97.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
Expand Down Expand Up @@ -207,7 +208,6 @@ require (
go.opentelemetry.io/collector/semconv v0.54.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.36.0 // indirect
go.opentelemetry.io/otel/metric v0.32.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
Expand Down
12 changes: 2 additions & 10 deletions pkg/util/push/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.uber.org/multierr"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util"
Expand Down Expand Up @@ -132,7 +133,7 @@ func otelMetricsToTimeseries(ctx context.Context, discardedDueToOtelParseError *
return nil, err
}

dropped := md.DataPointCount() - sampleCountInMap(tsMap)
dropped := len(multierr.Errors(errs))
discardedDueToOtelParseError.WithLabelValues(userID).Add(float64(dropped))

parseErrs := errs.Error()
Expand Down Expand Up @@ -228,12 +229,3 @@ func TimeseriesToOTLPRequest(timeseries []prompb.TimeSeries) pmetricotlp.Request

return pmetricotlp.NewRequestFromMetrics(d)
}

func sampleCountInMap(tsMap map[string]*prompb.TimeSeries) int {
count := 0
for _, ts := range tsMap {
count += len(ts.Samples)
}

return count
}
66 changes: 66 additions & 0 deletions pkg/util/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,72 @@ func TestHandler_otlpDroppedMetricsPanic(t *testing.T) {
assert.Equal(t, 200, resp.Code)
}

func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) {
// After the above test, the panic occurred again.
// This test is to ensure that the panic is fixed for the new cases as well.

// First case is to make sure that target_info is counted correctly.
md := pmetric.NewMetrics()
const name = "foo"
attributes := pcommon.NewMap()
attributes.InsertString(model.MetricNameLabel, name)

resource1 := md.ResourceMetrics().AppendEmpty()
resource1.Resource().Attributes().InsertString("region", "us-central1")

metric1 := resource1.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
metric1.SetName(name)
metric1.SetDataType(pmetric.MetricDataTypeGauge)
datapoint1 := metric1.Gauge().DataPoints().AppendEmpty()
datapoint1.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
datapoint1.SetDoubleVal(0)
attributes.CopyTo(datapoint1.Attributes())
datapoint1.Attributes().InsertString("diff_label", "bar")

metric2 := resource1.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
metric2.SetName(name)
metric2.SetDataType(pmetric.MetricDataTypeGauge)

req := createOTLPRequest(t, pmetricotlp.NewRequestFromMetrics(md), false)
resp := httptest.NewRecorder()
handler := OTLPHandler(100000, nil, false, nil, func(ctx context.Context, pushReq *Request) (response *mimirpb.WriteResponse, err error) {
request, err := pushReq.WriteRequest()
assert.NoError(t, err)
assert.Len(t, request.Timeseries, 2)
assert.False(t, request.SkipLabelNameValidation)
pushReq.CleanUp()
return &mimirpb.WriteResponse{}, nil
})
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)

// Second case is to make sure that histogram metrics are counted correctly.
metric3 := resource1.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
metric3.SetName("http_request_duration_seconds")
metric3.SetDataType(pmetric.MetricDataTypeHistogram)
metric3.Histogram().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative)
datapoint3 := metric3.Histogram().DataPoints().AppendEmpty()
datapoint3.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
datapoint3.SetCount(50)
datapoint3.SetSum(100)
datapoint3.SetMExplicitBounds([]float64{0.1, 0.2, 0.3, 0.4, 0.5})
datapoint3.SetMBucketCounts([]uint64{10, 20, 30, 40, 50})
attributes.CopyTo(datapoint3.Attributes())

req = createOTLPRequest(t, pmetricotlp.NewRequestFromMetrics(md), false)
resp = httptest.NewRecorder()
handler = OTLPHandler(100000, nil, false, nil, func(ctx context.Context, pushReq *Request) (response *mimirpb.WriteResponse, err error) {
request, err := pushReq.WriteRequest()
assert.NoError(t, err)
assert.Len(t, request.Timeseries, 10) // 6 buckets (including +Inf) + 2 sum/count + 2 from the first case
assert.False(t, request.SkipLabelNameValidation)
pushReq.CleanUp()
return &mimirpb.WriteResponse{}, nil
})
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)
}

func TestHandler_otlpWriteWithCompression(t *testing.T) {
req := createOTLPRequest(t, createOTLPMetricRequest(t), true)
resp := httptest.NewRecorder()
Expand Down

0 comments on commit 3744cb5

Please sign in to comment.