Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a _doc_count field for single-histogram metricsets #4647

Merged
merged 4 commits into from
Jan 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ https://github.com/elastic/apm-server/compare/7.11\...master[View commits]
==== Added
* Jaeger gRPC is now served over the same port as the Elastic APM agent protocol {pull}4618[4618]
* Support for reloading config in Fleet mode, gracefully stopping the HTTP server and starting a new one {pull}4623[4623]
* Add a `_doc_count` field to transaction histogram docs {pull}4647[4647]

[float]
==== Deprecated
Expand Down
12 changes: 12 additions & 0 deletions model/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ type Metricset struct {
Labels common.MapStr

// Samples holds the metrics in the set.
//
// If Samples holds a single histogram metric, then the sum of its Counts
// will be used to set a _doc_count field in the transformed beat.Event.
Samples []Sample

// TimeseriesInstanceID holds an optional identifier for the timeseries
Expand Down Expand Up @@ -163,6 +166,15 @@ func (me *Metricset) Transform(ctx context.Context, cfg *transform.Config) []bea
continue
}
}
if len(me.Samples) == 1 && len(me.Samples[0].Counts) > 0 {
// We have a single histogram metric; add a _doc_count field which holds the sum of counts.
// See https://www.elastic.co/guide/en/elasticsearch/reference/master/mapping-doc-count-field.html
var total int64
for _, count := range me.Samples[0].Counts {
total += count
}
fields.Put("_doc_count", total)
}

me.Metadata.Set(fields, me.Labels)

Expand Down
1 change: 1 addition & 0 deletions model/metricset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func TestTransform(t *testing.T) {
},
},
},
"_doc_count": int64(6), // 1+2+3
},
},
Msg: "Payload with transaction duration.",
Expand Down
53 changes: 47 additions & 6 deletions systemtest/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
package systemtest_test

import (
"context"
"encoding/json"
"net/http"
"strings"
"testing"
"time"

Expand All @@ -30,6 +33,7 @@ import (
"github.com/elastic/apm-server/systemtest"
"github.com/elastic/apm-server/systemtest/apmservertest"
"github.com/elastic/apm-server/systemtest/estest"
"github.com/elastic/go-elasticsearch/v7/esapi"
)

func TestTransactionAggregation(t *testing.T) {
Expand All @@ -56,21 +60,58 @@ func TestTransactionAggregation(t *testing.T) {

// Send some transactions to the server to be aggregated.
tracer := srv.Tracer()
tx := tracer.StartTransaction("name", "backend")
req, _ := http.NewRequest("GET", "/", nil)
tx.Context.SetHTTPRequest(req)
tx.Duration = time.Second
tx.End()
for i, name := range []string{"abc", "def"} {
for j := 0; j < (i+1)*5; j++ {
tx := tracer.StartTransaction(name, "backend")
req, _ := http.NewRequest("GET", "/", nil)
tx.Context.SetHTTPRequest(req)
tx.Duration = time.Second
tx.End()
}
}
tracer.Flush(nil)

result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*",
result := systemtest.Elasticsearch.ExpectMinDocs(t, 2, "apm-*",
estest.ExistsQuery{Field: "transaction.duration.histogram"},
)
systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits, "@timestamp")

// Make sure apm-server.aggregation.txmetrics metrics are published. Metric values are unit tested.
doc := getBeatsMonitoringStats(t, srv, nil)
assert.True(t, gjson.GetBytes(doc.RawSource, "beats_stats.metrics.apm-server.aggregation.txmetrics").Exists())

// Make sure the _doc_count field is added such that aggregations return
// the appropriate per-bucket doc_count values.
result = estest.SearchResult{}
_, err = systemtest.Elasticsearch.Do(context.Background(), &esapi.SearchRequest{
Index: []string{"apm-*"},
Body: strings.NewReader(`{
"size": 0,
"query": {"exists":{"field":"transaction.duration.histogram"}},
"aggs": {
"transaction_names": {
"terms": {"field": "transaction.name"}
}
}
}
`),
}, &result)
require.NoError(t, err)
require.Contains(t, result.Aggregations, "transaction_names")

type aggregationBucket struct {
Key string `json:"key"`
DocCount int `json:"doc_count"`
}
var aggregationResult struct {
Buckets []aggregationBucket `json:"buckets"`
}
err = json.Unmarshal(result.Aggregations["transaction_names"], &aggregationResult)
require.NoError(t, err)
assert.Equal(t, []aggregationBucket{
{Key: "def", DocCount: 10},
{Key: "abc", DocCount: 5},
}, aggregationResult.Buckets)
}

func TestTransactionAggregationShutdown(t *testing.T) {
Expand Down
61 changes: 58 additions & 3 deletions systemtest/approvals/TestTransactionAggregation.approved.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"events": [
{
"@timestamp": "dynamic",
"_doc_count": 5,
"agent": {
"name": "go"
},
Expand Down Expand Up @@ -35,20 +36,74 @@
}
},
"timeseries": {
"instance": "systemtest:name:865d6816622184cd"
"instance": "systemtest:abc:29d4be3fbd7f200f"
},
"transaction": {
"duration": {
"histogram": {
"counts": [
1
5
],
"values": [
1003519
]
}
},
"name": "name",
"name": "abc",
"root": true,
"type": "backend"
}
},
{
"@timestamp": "dynamic",
"_doc_count": 10,
"agent": {
"name": "go"
},
"ecs": {
"version": "dynamic"
},
"event": {
"ingested": "dynamic",
"outcome": "unknown"
},
"host": {
"hostname": "beowulf",
"name": "beowulf"
},
"observer": {
"ephemeral_id": "dynamic",
"hostname": "dynamic",
"id": "dynamic",
"type": "apm-server",
"version": "dynamic",
"version_major": "dynamic"
},
"processor": {
"event": "metric",
"name": "metric"
},
"service": {
"name": "systemtest",
"node": {
"name": "beowulf"
}
},
"timeseries": {
"instance": "systemtest:def:41b636ee77bc8c1c"
},
"transaction": {
"duration": {
"histogram": {
"counts": [
10
],
"values": [
1003519
]
}
},
"name": "def",
"root": true,
"type": "backend"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"events": [
{
"@timestamp": "dynamic",
"_doc_count": 1,
"agent": {
"name": "go"
},
Expand Down
3 changes: 2 additions & 1 deletion systemtest/estest/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func (r *SearchRequest) Do(ctx context.Context, out *SearchResult, opts ...Reque
}

type SearchResult struct {
Hits SearchHits `json:"hits"`
Hits SearchHits `json:"hits"`
Aggregations map[string]json.RawMessage `json:"aggregations"`
}

type SearchHits struct {
Expand Down