From a5397132c047ad78cbf2a7eb94fa2b9168470bdf Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 27 Jan 2021 16:53:29 +0800 Subject: [PATCH] Add a _doc_count field for single-histogram metricsets (#4647) * model: _doc_count for single histogram metricsets If a metricset is published with a single histogram, add a _doc_count field which is the sum of the histogram's counts. * systemtest: check _doc_count in txmetrics docs # Conflicts: # changelogs/head.asciidoc --- model/metricset.go | 12 ++++ model/metricset_test.go | 1 + systemtest/aggregation_test.go | 53 ++++++++++++++-- .../TestTransactionAggregation.approved.json | 61 ++++++++++++++++++- ...ansactionAggregationShutdown.approved.json | 1 + systemtest/estest/search.go | 3 +- 6 files changed, 121 insertions(+), 10 deletions(-) diff --git a/model/metricset.go b/model/metricset.go index 2eaa773f33d..427057bcef1 100644 --- a/model/metricset.go +++ b/model/metricset.go @@ -75,6 +75,9 @@ type Metricset struct { Labels common.MapStr // Samples holds the metrics in the set. + // + // If Samples holds a single histogram metric, then the sum of its Counts + // will be used to set a _doc_count field in the transformed beat.Event. Samples []Sample // TimeseriesInstanceID holds an optional identifier for the timeseries @@ -163,6 +166,15 @@ func (me *Metricset) Transform(ctx context.Context, cfg *transform.Config) []bea continue } } + if len(me.Samples) == 1 && len(me.Samples[0].Counts) > 0 { + // We have a single histogram metric; add a _doc_count field which holds the sum of counts. + // See https://www.elastic.co/guide/en/elasticsearch/reference/master/mapping-doc-count-field.html + var total int64 + for _, count := range me.Samples[0].Counts { + total += count + } + fields.Put("_doc_count", total) + } me.Metadata.Set(fields, me.Labels) diff --git a/model/metricset_test.go b/model/metricset_test.go index 32e7ce7bba1..94952651c2b 100644 --- a/model/metricset_test.go +++ b/model/metricset_test.go @@ -171,6 +171,7 @@ func TestTransform(t *testing.T) { }, }, }, + "_doc_count": int64(6), // 1+2+3 }, }, Msg: "Payload with transaction duration.", diff --git a/systemtest/aggregation_test.go b/systemtest/aggregation_test.go index 6875050caf6..ee08ff32465 100644 --- a/systemtest/aggregation_test.go +++ b/systemtest/aggregation_test.go @@ -18,7 +18,10 @@ package systemtest_test import ( + "context" + "encoding/json" "net/http" + "strings" "testing" "time" @@ -30,6 +33,7 @@ import ( "github.com/elastic/apm-server/systemtest" "github.com/elastic/apm-server/systemtest/apmservertest" "github.com/elastic/apm-server/systemtest/estest" + "github.com/elastic/go-elasticsearch/v7/esapi" ) func TestTransactionAggregation(t *testing.T) { @@ -56,14 +60,18 @@ func TestTransactionAggregation(t *testing.T) { // Send some transactions to the server to be aggregated. tracer := srv.Tracer() - tx := tracer.StartTransaction("name", "backend") - req, _ := http.NewRequest("GET", "/", nil) - tx.Context.SetHTTPRequest(req) - tx.Duration = time.Second - tx.End() + for i, name := range []string{"abc", "def"} { + for j := 0; j < (i+1)*5; j++ { + tx := tracer.StartTransaction(name, "backend") + req, _ := http.NewRequest("GET", "/", nil) + tx.Context.SetHTTPRequest(req) + tx.Duration = time.Second + tx.End() + } + } tracer.Flush(nil) - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", + result := systemtest.Elasticsearch.ExpectMinDocs(t, 2, "apm-*", estest.ExistsQuery{Field: "transaction.duration.histogram"}, ) systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits, "@timestamp") @@ -71,6 +79,39 @@ func TestTransactionAggregation(t *testing.T) { // Make sure apm-server.aggregation.txmetrics metrics are published. Metric values are unit tested. doc := getBeatsMonitoringStats(t, srv, nil) assert.True(t, gjson.GetBytes(doc.RawSource, "beats_stats.metrics.apm-server.aggregation.txmetrics").Exists()) + + // Make sure the _doc_count field is added such that aggregations return + // the appropriate per-bucket doc_count values. + result = estest.SearchResult{} + _, err = systemtest.Elasticsearch.Do(context.Background(), &esapi.SearchRequest{ + Index: []string{"apm-*"}, + Body: strings.NewReader(`{ + "size": 0, + "query": {"exists":{"field":"transaction.duration.histogram"}}, + "aggs": { + "transaction_names": { + "terms": {"field": "transaction.name"} + } + } +} +`), + }, &result) + require.NoError(t, err) + require.Contains(t, result.Aggregations, "transaction_names") + + type aggregationBucket struct { + Key string `json:"key"` + DocCount int `json:"doc_count"` + } + var aggregationResult struct { + Buckets []aggregationBucket `json:"buckets"` + } + err = json.Unmarshal(result.Aggregations["transaction_names"], &aggregationResult) + require.NoError(t, err) + assert.Equal(t, []aggregationBucket{ + {Key: "def", DocCount: 10}, + {Key: "abc", DocCount: 5}, + }, aggregationResult.Buckets) } func TestTransactionAggregationShutdown(t *testing.T) { diff --git a/systemtest/approvals/TestTransactionAggregation.approved.json b/systemtest/approvals/TestTransactionAggregation.approved.json index 858696b540f..1e5ea0b9b81 100644 --- a/systemtest/approvals/TestTransactionAggregation.approved.json +++ b/systemtest/approvals/TestTransactionAggregation.approved.json @@ -2,6 +2,7 @@ "events": [ { "@timestamp": "dynamic", + "_doc_count": 5, "agent": { "name": "go" }, @@ -35,20 +36,74 @@ } }, "timeseries": { - "instance": "systemtest:name:865d6816622184cd" + "instance": "systemtest:abc:29d4be3fbd7f200f" }, "transaction": { "duration": { "histogram": { "counts": [ - 1 + 5 ], "values": [ 1003519 ] } }, - "name": "name", + "name": "abc", + "root": true, + "type": "backend" + } + }, + { + "@timestamp": "dynamic", + "_doc_count": 10, + "agent": { + "name": "go" + }, + "ecs": { + "version": "dynamic" + }, + "event": { + "ingested": "dynamic", + "outcome": "unknown" + }, + "host": { + "hostname": "beowulf", + "name": "beowulf" + }, + "observer": { + "ephemeral_id": "dynamic", + "hostname": "dynamic", + "id": "dynamic", + "type": "apm-server", + "version": "dynamic", + "version_major": "dynamic" + }, + "processor": { + "event": "metric", + "name": "metric" + }, + "service": { + "name": "systemtest", + "node": { + "name": "beowulf" + } + }, + "timeseries": { + "instance": "systemtest:def:41b636ee77bc8c1c" + }, + "transaction": { + "duration": { + "histogram": { + "counts": [ + 10 + ], + "values": [ + 1003519 + ] + } + }, + "name": "def", "root": true, "type": "backend" } diff --git a/systemtest/approvals/TestTransactionAggregationShutdown.approved.json b/systemtest/approvals/TestTransactionAggregationShutdown.approved.json index fc4a64cfdb2..6576e0568c1 100644 --- a/systemtest/approvals/TestTransactionAggregationShutdown.approved.json +++ b/systemtest/approvals/TestTransactionAggregationShutdown.approved.json @@ -2,6 +2,7 @@ "events": [ { "@timestamp": "dynamic", + "_doc_count": 1, "agent": { "name": "go" }, diff --git a/systemtest/estest/search.go b/systemtest/estest/search.go index 3cd02a52cf7..169aba00b96 100644 --- a/systemtest/estest/search.go +++ b/systemtest/estest/search.go @@ -81,7 +81,8 @@ func (r *SearchRequest) Do(ctx context.Context, out *SearchResult, opts ...Reque } type SearchResult struct { - Hits SearchHits `json:"hits"` + Hits SearchHits `json:"hits"` + Aggregations map[string]json.RawMessage `json:"aggregations"` } type SearchHits struct {