diff --git a/go.mod b/go.mod index ce0b461c46c..7ee0c2ae0ad 100644 --- a/go.mod +++ b/go.mod @@ -12,26 +12,26 @@ require ( github.com/aws/aws-sdk-go v1.19.18 // indirect github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect github.com/census-instrumentation/opencensus-proto v0.2.0 + github.com/go-kit/kit v0.8.0 github.com/gogo/googleapis v1.2.0 // indirect github.com/golang/protobuf v1.3.1 github.com/google/go-cmp v0.3.0 github.com/gorilla/mux v1.6.2 github.com/grpc-ecosystem/grpc-gateway v1.8.5 - github.com/guillermo/go.procstat v0.0.0-20131123175440-34c2813d2e7f github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/jaegertracing/jaeger v1.9.0 + github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024 github.com/omnition/scribe-go v0.0.0-20190131012523-9e3c68f31124 github.com/opentracing/opentracing-go v1.1.0 // indirect github.com/openzipkin/zipkin-go v0.1.6 github.com/orijtech/prometheus-go-metrics-exporter v0.0.3-0.20190313163149-b321c5297f60 - github.com/orijtech/promreceiver v0.0.6 github.com/pkg/errors v0.8.0 github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a // indirect github.com/prometheus/client_golang v0.9.2 + github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1 github.com/prometheus/prometheus v0.0.0-20190131111325-62e591f928dd github.com/rs/cors v1.6.0 - github.com/shirou/gopsutil v2.18.12+incompatible github.com/soheilhy/cmux v0.1.4 github.com/spf13/cast v1.2.0 github.com/spf13/cobra v0.0.3 diff --git a/go.sum b/go.sum index 5ce7ab77af4..5531be9ab8c 100644 --- a/go.sum +++ b/go.sum @@ -108,8 +108,6 @@ github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.0 h1:kbxbvI4Un1LUWKxufD+BiE6AEExYYgkQLQmLFqA1LFk= -github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.0-20160529050041-d9eb7a3d35ec/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -145,8 +143,6 @@ github.com/grpc-ecosystem/grpc-gateway v1.6.3/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpg github.com/grpc-ecosystem/grpc-gateway v1.8.5 h1:2+KSC78XiO6Qy0hIjfc1OD9H+hsaJdJlb8Kqsd41CTE= github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw= -github.com/guillermo/go.procstat v0.0.0-20131123175440-34c2813d2e7f h1:5qK7cub9F9wqib56+0HZlXgPn24GtmEVRoETcwQoOyA= -github.com/guillermo/go.procstat v0.0.0-20131123175440-34c2813d2e7f/go.mod h1:ovoU5+mwafQ5XoEAuIEA9EMocbfVJ0vDacPD67dpL4k= github.com/hashicorp/consul v0.0.0-20180615161029-bed22a81e9fd h1:auIpcMc3+//R94n6tzTN+sJDiNvL3k5+Rus62AtvO4M= github.com/hashicorp/consul v0.0.0-20180615161029-bed22a81e9fd/go.mod h1:mFrjN1mfidgJfYP1xrJCF+AfRhr6Eaqhb2+sfyn/OOI= github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= @@ -250,8 +246,6 @@ github.com/openzipkin/zipkin-go v0.1.6 
h1:yXiysv1CSK7Q5yjGy1710zZGnsbMUIjluWBxtL github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= github.com/orijtech/prometheus-go-metrics-exporter v0.0.3-0.20190313163149-b321c5297f60 h1:vN7d/Zv6aOXqhspiqoEMkb6uFHNARVESmYn5XtNeyrk= github.com/orijtech/prometheus-go-metrics-exporter v0.0.3-0.20190313163149-b321c5297f60/go.mod h1:+Mu9w51Uc2RNKSUTA95d6Pvy8cxFiRX3ANRPlCcnGLA= -github.com/orijtech/promreceiver v0.0.6 h1:2e3OEMZEiQkDk6/xoqmC8MIpkf2XDEcG7i0kFgxtQRY= -github.com/orijtech/promreceiver v0.0.6/go.mod h1:JYRuGbKzdsEv4QcKeFBAFgs8KHmz+m7fv0l1moal21c= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c h1:Lgl0gzECD8GnQ5QCWA8o6BtfL6mDH5rQgM4/fX3avOs= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= @@ -303,8 +297,6 @@ github.com/sasha-s/go-deadlock v0.0.0-20161201235124-341000892f3d/go.mod h1:StQn github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= -github.com/shirou/gopsutil v2.18.12+incompatible h1:1eaJvGomDnH74/5cF4CTmTbLHAriGFsTZppLXDX93OM= -github.com/shirou/gopsutil v2.18.12+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= github.com/shurcooL/vfsgen v0.0.0-20180711163814-62bca832be04/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= @@ -428,8 +420,6 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20180831171423-11092d34479b h1:lohp5blsw53GBXtLyLNaTXPXS9pJ1tiTw61ZHUoE9Qw= -google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb h1:i1Ppqkc3WQXikh8bXiwHqAN5Rv3/qDCcRk0/Otx73BY= diff --git a/receiver/prometheusreceiver/README.md b/receiver/prometheusreceiver/README.md new file mode 100644 index 00000000000..e7638112163 --- /dev/null +++ b/receiver/prometheusreceiver/README.md @@ -0,0 +1,465 @@ +# Prometheus Receiver Design Spec + +## Design Goal + +### Provide a seamless onboarding experience for users who are already familiar with Prometheus scrape config + +Prometheus has a very powerful config system for user to config how Prometheus can scrape the metrics data from any +application which expose a Prometheus format metrics endpoint. It provides very useful features like filtering unwanted +metrics, relabeling tags..etc. 
The original Prometheus receiver of OpenCensus took the approach of using Prometheus' own +scraper's source code as a library to achieve this goal. Overall the idea was great, however, the original +implementation has a lot of glitches, it cannot be fixed by small patches. This new Prometheus receiver is going to +follow the same idea of leveraging Prometheus sourcecode, with a proper implementation. + +### Map Prometheus metrics to the corresponding OpenCensus metrics properly + +Prometheus receiver shall be able to map Prometheus metrics to ocagent's proto based metrics, it shall respect the +original metric name, value, timestamp, as well as tags. It doesn't need to provide one-to-one mapping, since supported +metric types are different from the two systems. However, it shall not drop data. + +### Parity between Prometheus and ocagent Prometheus exporter + +Prometheus itself can also used as an exporter, that it can expose the metrics it scrape from other system with its own +metrics endpoint, so is ocagent. We shall be able to retain parity from the following two setups: + +1. app -> promethues -> metric-endpoint +2. app -> ocgent-with-prometheus-receiver -> ocagent-promethues-exporter-metrics-endpoint + + +## Prometheus Text Format Overview + +Prometheus text format is a line orient format. For each non-empty line, which not begins with #, is a metric data +point with includes a metric name and its value, which is of float64 type, as well as some optional data such as tags +and timestamp, which is in milliseconds. For lines begin with #, they are either comments, which need to be filtered, +or metadata, which including type hints and units that are usually indicating the beginning of a new individual metric +or a group of new metrics. More details of Prometheus text format can be found from its +[official document](https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format). + +### Metric types +Based on this document, Prometheus supports the following 5 types of metrics: +* Counter +* Gauge +* Histogram +* Summary +* Untyped + +However, this is not the whole story, from the implementation details of Prometheus scraper, which the receiver based on, +it supports a couple more undocumented metrics types, including: + +* Gaugehistogram +* Info +* Statset + +More details can be found from the +[prometheus text parser source code]( https://github.com/prometheus/prometheus/blob/master/pkg/textparse/interface.go#L82) + + +### Metric Grouping + +Other than metric types, the type hint comment and metric grouping are also important to know in order to parse Prometheus +text metrics properly. From any Prometheus metrics endpoints, metrics are usually grouped together by starting with a +comment section which includes some very important information like type hints about the metrics, and metrics points of the same +group will have the same metric name but a different set of tag values, for example: + +``` +# HELP container_cpu_load_average_10s Value of container cpu load average over the last 10 seconds. +# TYPE container_cpu_load_average_10s gauge +container_cpu_load_average_10s{id="/",image="",name=""} 0 +container_cpu_load_average_10s{id="/000-metadata",image="",name=""} 0 +container_cpu_load_average_10s{id="/001-sysfs",image="",name=""} 0 +``` + +The above example was taken from an cadvisor metric endpoint, the type hint tells that the name of this metric group is +`container_cpu_load_average_10s` and it's of `gague` type. 
Then it follows by some individual metric points which are of the +same metric name. For each individual metric within this group, they share the same set of tag keys, with unique value sets. + + +## Prometheus Metric Scraper Anatomy + +The metrics scraper is a component which is used to scrape remote Prometheus metric endpoints, it is also the component +which ocagent Prometheus receiver is based on. It's important to understand how it works in order to implement the receiver +properly. + +### Major components of Prometheus Scape package + +- **[ScapeManager](https://github.com/prometheus/prometheus/blob/v2.9.2/scrape/manager.go):** +the component which loads the scrape_config, and manage the scraping tasks + +- **[ScrapePool](https://github.com/prometheus/prometheus/blob/d3245f15022551c6fc8281766ea62db4d71e2747/scrape/scrape.go#L154-L439):** +an object which manage scrapes for a sets of targets + +- **[Scraper](https://github.com/prometheus/prometheus/blob/d3245f15022551c6fc8281766ea62db4d71e2747/scrape/scrape.go#L506-L511):** +a http client to fetch data from remote metrics endpoints + +- **[Target](https://github.com/prometheus/prometheus/blob/v2.9.2/scrape/target.go):** +the remote metric endpoint, as well as related relabing settings and other metadata + +- **[TextParser](https://github.com/prometheus/prometheus/tree/v2.9.2/pkg/textparse):** +a DFA style streaming decoder/parser for prometheus text format + +- **[Appendable](https://github.com/prometheus/prometheus/blob/d3245f15022551c6fc8281766ea62db4d71e2747/scrape/manager.go#L37-L39):** +it is used to acquire a storage appender instance at the beginning of each scrapeLoop run + +- **[storage.Appender](https://github.com/prometheus/prometheus/blob/d3245f15022551c6fc8281766ea62db4d71e2747/storage/interface.go#L86-L95):** +an abstraction of the metric storage which can be a filesystem, a database or an remote endpoint...etc. As for OpenCensus prometheus receiver, this is +also the interface we need to implement to provide a customized storage appender which is backed by an ocagent metrics sink. + +- **[ScrapeLoop](https://github.com/prometheus/prometheus/blob/d3245f15022551c6fc8281766ea62db4d71e2747/scrape/scrape.go#L586-L1024):** +the actual scrape pipeline which performs the main scraping and ingestion logic. + + +### Prometheus ScrapeLoop workflow explained +Each scraping cycle is trigger by an configured interval, its workflow is as shown in the flowchart below: + +![ScrapeLoop Flowchart](scrapeloop-flowchart.png) + +It basically does the following things in turn: + + 1. make a http call to fetch data from the binding [target](#target)'s metrics endpoint with [scraper](#scraper) + 2. acquired a [storage appender](#storage-appender) instance with the [Appendable](#appendable) interface + 3. feed the data to a textParser + 4. parse and feed metric data points to storage appender + 5. commit if success or rollback + 6. report task status + + +## Implementing Prometheus storage.Appender with ocagent metrics sink + +### The storage.Appender interface +As discussed in the previous section, the storage.Appender is the most important piece of components for us to implement so as to bring the two worlds together. +It has a very simple interface which is defined below: +```go +type Appender interface { + Add(l labels.Labels, t int64, v float64) (uint64, error) + + + AddFast(l labels.Labels, ref uint64, t int64, v float64) error + + + // Commit submits the collected samples and purges the batch. 
+ Commit() error + + + Rollback() error +} +``` + +*Note: the above code belongs to the Prometheus project, its license can be found [here](https://github.com/prometheus/prometheus/blob/v2.9.2/LICENSE)* + + +One can see that the interface is very simple, it only has 4 methods: `Add`, `AddFast`, `Commit` and `Rollback`. +The last two methods are easy to understand: `Commit` is called when the processing of the scraped page is completed and +success, whereas `Rollback` is called if error occurs in between the process. + +However for the two methods starting with 'Add', there's no document on the Prometheus project for how they should be used. +By examining the scrapeLoop source code, as well as some storage.Appender implementations. It indicates that the first +method `Add` is always used for the first time when a unique metrics, which means the combination of metric name and its +tags are unique, is seen for the first time. The `Add` method can return a non zero reference number, then the scrapeLoop +can cache this number with the metric's uniq signature. The next time, such as the next scrape cycle of the same target, +when the metric is seen again by matching its signature, it will call the `AddFast` method with the cached reference number. +This reference number might make sense to databases which has unique key as numbers, however, in our use case, it's not +necessary, thus we can always return 0 ref number from the `Add` method to skip this caching mechanism. + + +### Challenges and solutions +Even though the definition of this interface is very simple, however, to implement it properly is a bit challenging, given that +every time the Add/AddFast method is called, it only provides the information about the current data point, the context of what metric group +this data point belonging to is not provided, we have to keep track of it internally within the appender. And this is not the whole story, +there are a couple other issues we need to address, including: + +1. Have a way to link the Target with the current appender instance + +The labels provided to the Add/AddFast methods dose not include some target specified information such as `job name` +which is important construct the +[Node proto](https://github.com/census-instrumentation/opencensus-proto/blob/e2601ef16f8a085a69d94ace5133f97438f8945f/src/opencensus/proto/agent/common/v1/common.proto#L36-L51) +object of OpenCensus. The target object is not accessible from the Appender interface, however, we can get it from the ScrapeManager, when designing the +ocagent appender, we need to have a way to inject the binding target into the appender instance. + + +3. Group metrics from the same family together + +In OpenCensus, metric points of the same name are usually grouped together as one timeseries but different data points. +It's important for the appender to keep track of the metric family changes, and group metrics of the same family together. +Keep in mind that the Add/AddFast method is operated in a streaming manner, ScrapeLoop does not provide any direct hints on metric name change, +the appender itself need to keep track of it. It's also important to know that for some special types such as `histogram` +and `summary`, not all the data points have the same name, there are some special metric points has postfix like `_sum` and `_count`, +we need to handle this properly, and do not consider this is a metric family change. + + +4. 
Group complex metrics such as histogram together in proper order + +In Prometheus, a single aggregated type of metric data such as `histogram` and `summary` is represent by multiple metric data points, such as +buckets and quantiles as well as the additional `_sum` and `_count` data. ScrapeLoop will feed them into the appender individually. The ocagent +appender need to have a way to bundle them together to transform them into a single Metric Datapoint Proto object. + +5. Tags need to handle carefully + +ScrapeLoop strips out any tag with empty value, however, in OpenCensus, the tag keys is stored separately, we need to able to get all the possible tag keys +of the same metric family before committing the metric family to the sink. + + +## Prometheus Metric to OpenCensus Metric Proto Mapping + + +### Target as Node +The Target of Prometheus is defined by the scrape_config, it has the information like `hostname` of the remote service, +and a user defined `job name` which can be used as the service name. These two piece of information makes it a great fit +to map it into the `Node` proto of the OpenCensus MetricsData type, as shown below: + +```go +type MetricsData struct { + Node *commonpb.Node + Resource *resourcepb.Resource + Metrics []*metricspb.Metric +} +``` + +The scrape page as whole also can be fit into the above `MetricsData` data structure, and all the metrics data points +can be stored with the `Metrics` array. We will explain the mappings of individual metirc types in the following couple sections + + +### Metric Value Mapping + In OpenCensus, metrics value types can be either `int64` or `float64`, while in Prometheus the value can be safely assumed it's always `float64` based on the +[Prometheus Text Format Document](https://prometheus.io/docs/instrumenting/exposition_formats/#text-format-details) as quoted below: + +> value is a float represented as required by Go's ParseFloat() function. In addition to standard numerical values, Nan, +Inf, and -Inf are valid values representing not a number, positive infinity, and negative infinity, respectively. + +It will make sense for us to stick with this data type as much as possible across all metrics types + + +### Counter +Counter as described in the [Prometheus Metric Types Document](https://prometheus.io/docs/concepts/metric_types/#counter), +> is a cumulative metric that represents a single monotonically increasing counter whose value can only increase or be +> reset to zero on restart + +It is one of the most simple metric types we can be found in both systems. Examples of Prometheus Counters is as shown +below: +``` +# HELP http_requests_total The total number of HTTP requests. 
+# TYPE http_requests_total counter +http_requests_total{method="post",code="200"} 1027 +http_requests_total{method="post",code="400"} 3 +``` + +The corresponding Ocagent Metric will be: +```go +metrics := []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "http_requests_total", + Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, + LabelKeys: []*metricspb.LabelKey{{Key: "method"}, {Key: "code"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "post", HasValue: true}, {Value: "200", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 1027.0}}, + }, + }, + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "post", HasValue: false}, {Value: "400", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 3.0}}, + }, + }, + }, + }, +} +``` + +*Note: `tsOc` is a timestamp object, which is based on the timestamp provided by a scrapLoop. In most cases, it is +the timestamp when a target is scrapped, however, it can also be the timestamp recorded with a metric* + + +### Gauge +Gauge, as described in the [Prometheus Metric Types Document](https://prometheus.io/docs/concepts/metric_types/#guage), +> is a metric that represents a single numerical value that can arbitrarily go up and down + +``` +# HELP gauge_test some test gauges. +# TYPE gauge_test gague +gauge_test{id="1",foo="bar"} 1.0 +gauge_test{id="2",foo=""} 2.0 + +``` + +A major different between Gauges of Prometheus and OpenCensus are the value types. In Prometheus, as mentioned earlier, +all values can be considered as float type, however, in OpenCensus, Gauges can either be `Int64` or `Double`. To make +the transformation easier, we always assume the data type is `Double`. + +The corresponding Ocagent Metric of the above examples will be: +```go +metrics := []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "gauge_test", + Type: metricspb.MetricDescriptor_GAUGE_DOUBLE, + LabelKeys: []*metricspb.LabelKey{{Key: "id"}, {Key: "foo"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "1", HasValue: true}, {Value: "bar", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 1.0}}, + }, + }, + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "2", HasValue: true}, {Value: "", HasValue: false}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 2.0}}, + }, + }, + }, + }, +} +``` + + +### Histogram +Histogram is a complex data type, in Prometheus, it uses multiple data points to represent a single histogram. Its +description can be found from: [Prometheus Histogram](https://prometheus.io/docs/concepts/metric_types/#histogram). 
+ +An example of histogram is as shown below: +``` +# HELP hist_test This is my histogram vec +# TYPE hist_test histogram +hist_test_bucket{t1="1",,le="10.0"} 1.0 +hist_test_bucket{t1="1",le="20.0"} 3.0 +hist_test_bucket{t1="1",le="+inf"} 10.0 +hist_test_sum{t1="1"} 100.0 +hist_test_count{t1="1"} 10.0 +hist_test_bucket{t1="2",,le="10.0"} 10.0 +hist_test_bucket{t1="2",le="20.0"} 30.0 +hist_test_bucket{t1="2",le="+inf"} 100.0 +hist_test_sum{t1="2"} 10000.0 +hist_test_count{t1="2"} 100.0 + +``` + +Its corresponding Ocagent metrics will be: +```go +metrics := []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "hist_test", + Type: metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION, + LabelKeys: []*metricspb.LabelKey{{Key: "t1"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "1", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DistributionValue{ + DistributionValue: &metricspb.DistributionValue{ + BucketOptions: &metricspb.DistributionValue_BucketOptions{ + Type: &metricspb.DistributionValue_BucketOptions_Explicit_{ + Explicit: &metricspb.DistributionValue_BucketOptions_Explicit{ + Bounds: []float64{10, 20}, + }, + }, + }, + Count: 10, + Sum: 100.0, + Buckets: []*metricspb.DistributionValue_Bucket{{Count: 1}, {Count: 2}, {Count: 7}}, + }}}, + }, + }, + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "2", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DistributionValue{ + DistributionValue: &metricspb.DistributionValue{ + BucketOptions: &metricspb.DistributionValue_BucketOptions{ + Type: &metricspb.DistributionValue_BucketOptions_Explicit_{ + Explicit: &metricspb.DistributionValue_BucketOptions_Explicit{ + Bounds: []float64{10, 20}, + }, + }, + }, + Count: 100, + Sum: 10000.0, + Buckets: []*metricspb.DistributionValue_Bucket{{Count: 10}, {Count: 20}, {Count: 70}}, + }}}, + }, + }, + }, + }, +} + +``` + +There's an important difference between Prometheus bucket and OpenCensus bucket that, bucket counts from Prometheus are +cumulative, to transform this into OpenCensus format, one need to apply the following formula: +```CurrentOCBucketVlaue = CurrentPrometheusBucketValue - PrevPrometheusBucketValue``` + +OpenCensus does not use `+inf` as bound, one needs to remove it to generate the Bounds of the OpenCensus Bounds. + +Other than that, the `SumOfSquaredDeviation`, which is required by OpenCensus format for histogram, is not provided by +Prometheus. We have to set this value to `0` instead. + +### Gaugehistogram + +This is an undocumented data type, it shall be same as regular [Histogram](#histogram) + +### Summary + +Same as histogram, summary is also a complex metric type which is represent by multiple data points. A detailed +description can be found from [Prometheus Summary](https://prometheus.io/docs/concepts/metric_types/#summary) +``` +# HELP go_gc_duration_seconds A summary of the GC invocation durations. 
+# TYPE go_gc_duration_seconds summary +go_gc_duration_seconds{quantile="0"} 0.0001271 +go_gc_duration_seconds{quantile="0.25"} 0.0002455 +go_gc_duration_seconds{quantile="0.5"} 0.0002904 +go_gc_duration_seconds{quantile="0.75"} 0.0003426 +go_gc_duration_seconds{quantile="1"} 0.0023638 +go_gc_duration_seconds_sum 17.391350544 +go_gc_duration_seconds_count 52489 +``` + +The corresponding Ocagent metrics is as shown below: + +```go +metrics := []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "go_gc_duration_seconds", + Type: metricspb.MetricDescriptor_SUMMARY, + LabelKeys: []*metricspb.LabelKey{}}, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_SummaryValue{ + SummaryValue: &metricspb.SummaryValue{ + Sum: &wrappers.DoubleValue{Value: 17.391350544}, + Count: &wrappers.Int64Value{Value: 52489}, + Snapshot: &metricspb.SummaryValue_Snapshot{ + PercentileValues: []*metricspb.SummaryValue_Snapshot_ValueAtPercentile{ + {Percentile: 0.0, Value: 0.0001271}, + {Percentile: 25.0, Value: 0.0002455}, + {Percentile: 50.0, Value: 0.0002904}, + {Percentile: 75.0, Value: 0.0003426}, + {Percentile: 100.0, Value: 0.0023638}, + }, + }}}}, + }, + }, + }, + }, +} + +``` + +The major difference between the two systems is that in Prometheus it uses `quantile`, while in OpenCensus `percentile` +is used. Other than that, OpenCensus has optional values for `Sum` and `Count` of a snapshot, however, they are not +provided in Prometheus, and `nil` will be used for these values. + +### Others + +For any other Prometheus metrics types, they will make to the [Guage](#gague) type of Ocagent \ No newline at end of file diff --git a/receiver/prometheusreceiver/internal/internal_test.go b/receiver/prometheusreceiver/internal/internal_test.go new file mode 100644 index 00000000000..9ea4a417be3 --- /dev/null +++ b/receiver/prometheusreceiver/internal/internal_test.go @@ -0,0 +1,79 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package internal + +import ( + "context" + "errors" + "github.com/open-telemetry/opentelemetry-service/data" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/scrape" + "go.uber.org/zap" + "sync" +) + +// test helpers + +var zapLogger *zap.Logger +var testLogger *zap.SugaredLogger + +func init() { + zl, _ := zap.NewDevelopment() + zapLogger = zl + testLogger = zapLogger.Sugar() +} + +type mockMetadataCache struct { + data map[string]scrape.MetricMetadata +} + +func (m *mockMetadataCache) Metadata(metricName string) (scrape.MetricMetadata, bool) { + mm, ok := m.data[metricName] + return mm, ok +} + +func (m *mockMetadataCache) SharedLabels() labels.Labels { + return labels.FromStrings("__scheme__", "http") +} + +func newMockConsumer() *mockConsumer { + return &mockConsumer{ + Metrics: make(chan *data.MetricsData, 1), + } +} + +type mockConsumer struct { + Metrics chan *data.MetricsData + consumOnce sync.Once +} + +func (m *mockConsumer) ConsumeMetricsData(ctx context.Context, md data.MetricsData) error { + m.consumOnce.Do(func() { + m.Metrics <- &md + }) + return nil +} + +type mockMetadataSvc struct { + caches map[string]*mockMetadataCache +} + +func (mm *mockMetadataSvc) Get(job, instance string) (MetadataCache, error) { + if mc, ok := mm.caches[job+"_"+instance]; ok { + return mc, nil + } + + return nil, errors.New("cache not found") +} diff --git a/receiver/prometheusreceiver/internal/logger.go b/receiver/prometheusreceiver/internal/logger.go new file mode 100644 index 00000000000..bcfa89151c0 --- /dev/null +++ b/receiver/prometheusreceiver/internal/logger.go @@ -0,0 +1,43 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import "go.uber.org/zap" +import gokitLog "github.com/go-kit/kit/log" + +// NewZapToGokitLogAdapter create an adapter for zap.Logger to gokitLog.Logger +func NewZapToGokitLogAdapter(logger *zap.Logger) gokitLog.Logger { + // need to skip two levels in order to get the correct caller + // one for this method, the other for gokitLog + logger = logger.WithOptions(zap.AddCallerSkip(2)) + return &zapToGokitLogAdapter{l: logger.Sugar()} +} + +type zapToGokitLogAdapter struct { + l *zap.SugaredLogger +} + +func (w *zapToGokitLogAdapter) Log(keyvals ...interface{}) error { + if len(keyvals)%2 == 0 { + // expecting key value pairs, the number of items need to be even + w.l.Infow("", keyvals...) + } else { + // in case something goes wrong + w.l.Info(keyvals...) + } + return nil +} + +var _ gokitLog.Logger = (*zapToGokitLogAdapter)(nil) diff --git a/receiver/prometheusreceiver/internal/metadata.go b/receiver/prometheusreceiver/internal/metadata.go new file mode 100644 index 00000000000..ba362e8f728 --- /dev/null +++ b/receiver/prometheusreceiver/internal/metadata.go @@ -0,0 +1,66 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "errors" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/scrape" +) + +// MetadataService is an adapter to scrapeManager and provide only the functionality which is needed +type MetadataService interface { + Get(job, instance string) (MetadataCache, error) +} + +// MetadataCache is an adapter to prometheus' scrape.Target and provide only the functionality which is needed +type MetadataCache interface { + Metadata(metricName string) (scrape.MetricMetadata, bool) + SharedLabels() labels.Labels +} + +type mService struct { + sm *scrape.Manager +} + +func (t *mService) Get(job, instance string) (MetadataCache, error) { + targetGroup, ok := t.sm.TargetsAll()[job] + if !ok { + return nil, errors.New("unable to find a target group with job=" + job) + } + + // from the same targetGroup, instance is not going to be duplicated + for _, target := range targetGroup { + if target.DiscoveredLabels().Get(model.AddressLabel) == instance { + return &mCache{target}, nil + } + } + + return nil, errors.New("unable to find a target with job=" + job + ", and instance=" + instance) +} + +// adapter to get metadata from scrape.Target +type mCache struct { + t *scrape.Target +} + +func (m *mCache) Metadata(metricName string) (scrape.MetricMetadata, bool) { + return m.t.Metadata(metricName) +} + +func (m *mCache) SharedLabels() labels.Labels { + return m.t.DiscoveredLabels() +} diff --git a/receiver/prometheusreceiver/internal/metricsbuilder.go b/receiver/prometheusreceiver/internal/metricsbuilder.go new file mode 100644 index 00000000000..45abe3b1c5b --- /dev/null +++ b/receiver/prometheusreceiver/internal/metricsbuilder.go @@ -0,0 +1,549 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package internal + +import ( + "errors" + "fmt" + "github.com/golang/protobuf/ptypes/wrappers" + "go.uber.org/zap" + "sort" + "strconv" + "strings" + + commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" + metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" + "github.com/golang/protobuf/ptypes/timestamp" + "github.com/open-telemetry/opentelemetry-service/data" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" + "github.com/prometheus/prometheus/scrape" +) + +const metricsSuffixCount = "_count" +const metricsSuffixBucket = "_bucket" +const metricsSuffixSum = "_sum" + +var trimmableSuffixes = []string{metricsSuffixBucket, metricsSuffixCount, metricsSuffixSum} +var errNoDataToBuild = errors.New("there's no data to build") +var errNoBoundaryLabel = errors.New("given metricType has no BucketLabel or QuantileLabel") +var errEmptyBoundaryLabel = errors.New("BucketLabel or QuantileLabel is empty") +var dummyMetric = &data.MetricsData{ + Node: &commonpb.Node{ + Identifier: &commonpb.ProcessIdentifier{HostName: "127.0.0.1"}, + ServiceInfo: &commonpb.ServiceInfo{Name: "internal"}, + }, + Metrics: make([]*metricspb.Metric, 0), +} + +type metricBuilder struct { + node *commonpb.Node + ts int64 + hasData bool + hasInternalMetric bool + mc MetadataCache + metrics []*metricspb.Metric + currentMetricFamily string + currentMetricLabelKeys map[string]int + currentMetadata *scrape.MetricMetadata + currentMetric *metricspb.Metric + currentDpGroups map[string][]*dataPoint + currentDpGOrder map[string]int + currentMetadataGroups map[string]*dataPointGroupMetadata + logger *zap.SugaredLogger +} + +type dataPoint struct { + value float64 + boundary float64 + ts int64 +} + +type dataPointGroupMetadata struct { + ts int64 + ls *labels.Labels + count float64 + hasCount bool + sum float64 + hasSum bool +} + +// newMetricBuilder creates a MetricBuilder which is allowed to feed all the datapoints from a single prometheus +// scraped page by calling its AddDataPoint function, and turn them into an opencensus data.MetricsData object +// by calling its Build function +func newMetricBuilder(node *commonpb.Node, mc MetadataCache, logger *zap.SugaredLogger) *metricBuilder { + + return &metricBuilder{ + node: node, + mc: mc, + metrics: make([]*metricspb.Metric, 0), + logger: logger, + } +} + +// AddDataPoint is for feeding prometheus data points in its processing order +func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) error { + metricName := ls.Get(model.MetricNameLabel) + if metricName == "" { + return errMetricNameNotFound + } else if shouldSkip(metricName) { + b.hasInternalMetric = true + lm := ls.Map() + delete(lm, model.MetricNameLabel) + b.logger.Infow("skip internal metric", "name", metricName, "ts", t, "value", v, "labels", lm) + return nil + } + + b.hasData = true + if err := b.initNewMetricIfNeeded(metricName, ls); err != nil { + return err + } + + // update the labelKeys array, need to do it for every metrics as scrapeLoop can remove labels with empty values + // only when we complete the whole MetricFamily, we can get the full set of label names + b.updateLabelKeys(ls) + groupKey := dpgSignature(b.currentMetricLabelKeys, ls) + mg, ok := b.currentMetadataGroups[groupKey] + if !ok { + mg = &dataPointGroupMetadata{ts: t, ls: &ls} + b.currentMetadataGroups[groupKey] = mg + } + switch mtype := b.currentMetric.MetricDescriptor.Type; mtype { + 
case metricspb.MetricDescriptor_GAUGE_DISTRIBUTION: + fallthrough + case metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION: + fallthrough + case metricspb.MetricDescriptor_SUMMARY: + if strings.HasSuffix(metricName, metricsSuffixSum) { + mg.sum = v + mg.hasSum = true + } else if strings.HasSuffix(metricName, metricsSuffixCount) { + mg.count = v + mg.hasCount = true + } else { + pg, ok := b.currentDpGroups[groupKey] + if !ok { + pg = make([]*dataPoint, 0) + b.currentDpGOrder[groupKey] = len(b.currentDpGOrder) + } + boundary, err := getBoundary(mtype, ls) + if err != nil { + return err + } + dp := &dataPoint{ + value: v, + boundary: boundary, + } + b.currentDpGroups[groupKey] = append(pg, dp) + } + default: + pg, ok := b.currentDpGroups[groupKey] + if !ok { + pg = make([]*dataPoint, 0, 4) + b.currentDpGOrder[groupKey] = len(b.currentDpGOrder) + } + dp := &dataPoint{ + value: v, + ts: t, + } + b.currentDpGroups[groupKey] = append(pg, dp) + } + + return nil +} + +// Build is to build an opencensus data.MetricsData based on all added data points +func (b *metricBuilder) Build() (*data.MetricsData, error) { + if !b.hasData { + if b.hasInternalMetric { + return dummyMetric, nil + } + return nil, errNoDataToBuild + } + + if err := b.completeCurrentMetric(); err != nil { + return nil, err + } + return &data.MetricsData{ + Node: b.node, + Metrics: b.metrics, + }, nil +} + +func (b *metricBuilder) initNewMetricIfNeeded(metricName string, _ labels.Labels) error { + metricFamily := normalizeMetricName(metricName) + // the 2nd case is for poorly named metrics + if b.currentMetricFamily != metricFamily && !(metricFamily != metricName && metricName == b.currentMetricFamily) { + if b.currentMetric != nil { + if err := b.completeCurrentMetric(); err != nil { + return err + } + } + + // setup new metric group + metadata, ok := b.mc.Metadata(metricFamily) + + // perform a 2nd lookup with the original metric name + // this can only happen if there's a metric is not histogram or summary and ends with one of the suffixes + // not a good name, but you can still do it that way. 
hence perform a 2nd lookup with original name + if !ok && metricName != metricFamily { + metricFamily = metricName + metadata, ok = b.mc.Metadata(metricFamily) + // still not found, this can happen for metrics has no TYPE HINTS + if !ok { + metadata.Metric = metricFamily + metadata.Type = textparse.MetricTypeUnknown + } + } + b.currentMetricFamily = metricFamily + b.currentMetadata = &metadata + b.currentDpGroups = make(map[string][]*dataPoint) + b.currentDpGOrder = make(map[string]int) + b.currentMetadataGroups = make(map[string]*dataPointGroupMetadata) + b.currentMetric = &metricspb.Metric{ + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: metricFamily, + Description: metadata.Help, + Unit: heuristicalMetricAndKnownUnits(metricFamily, metadata.Unit), + Type: convToOCAMetricType(metadata.Type), + // Due to the fact that scrapeLoop strips any tags with emtpy value, we cannot get the + // full set of labels by just looking at the first metrics, thus setting the LabelKeys of + // this proto need to be done after finished the whole group + }, + Timeseries: make([]*metricspb.TimeSeries, 0), + } + b.currentMetricLabelKeys = make(map[string]int) + } + return nil +} + +func (b *metricBuilder) updateLabelKeys(ls labels.Labels) { + for _, l := range ls { + if isUsefulLabel(b.currentMetric.MetricDescriptor.Type, l.Name) { + if _, ok := b.currentMetricLabelKeys[l.Name]; !ok { + b.currentMetricLabelKeys[l.Name] = len(b.currentMetricLabelKeys) + } + } + } +} + +func (b *metricBuilder) getLabelKeys() []*metricspb.LabelKey { + lks := make([]*metricspb.LabelKey, len(b.currentMetricLabelKeys)) + + for k, v := range b.currentMetricLabelKeys { + lks[v] = &metricspb.LabelKey{Key: k} + } + + return lks +} + +func (b *metricBuilder) getLabelValues(ls *labels.Labels) []*metricspb.LabelValue { + lvs := make([]*metricspb.LabelValue, len(b.currentMetricLabelKeys)) + for k, v := range b.currentMetricLabelKeys { + value := ls.Get(k) + lvs[v] = &metricspb.LabelValue{Value: value, HasValue: value != ""} + } + + return lvs +} + +func (b *metricBuilder) completeCurrentMetric() error { + switch mtype := b.currentMetric.MetricDescriptor.Type; mtype { + case metricspb.MetricDescriptor_GAUGE_DISTRIBUTION, metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION: + groups := b.currentDpGroupOrdered() + if len(groups) == 0 { + // this can happen if only sum or count is added, but not following data points + return errors.New("no data point added to summary") + } + for _, gk := range groups { + pts := b.currentDpGroups[gk] + mg, ok := b.currentMetadataGroups[gk] + if !ok || !(mg.hasSum && mg.hasCount) { + return errors.New("count or sum is missing: " + gk) + } + + // always use the first timestamp for the whole metric group + ts := timestampFromMs(mg.ts) + // sort it by boundary + sort.Slice(pts, func(i, j int) bool { + return pts[i].boundary < pts[j].boundary + }) + + // for OCAgent Proto, the bounds won't include +inf + bounds := make([]float64, len(pts)-1) + buckets := make([]*metricspb.DistributionValue_Bucket, len(pts)) + + for i := 0; i < len(pts); i++ { + if i != len(pts)-1 { + // not need to add +inf as bound to oc proto + bounds[i] = pts[i].boundary + } + adjustedCount := pts[i].value + if i != 0 { + adjustedCount -= pts[i-1].value + } + buckets[i] = &metricspb.DistributionValue_Bucket{Count: int64(adjustedCount)} + } + + distrValue := &metricspb.DistributionValue{ + BucketOptions: &metricspb.DistributionValue_BucketOptions{ + Type: &metricspb.DistributionValue_BucketOptions_Explicit_{ + Explicit: 
&metricspb.DistributionValue_BucketOptions_Explicit{ + Bounds: bounds, + }, + }, + }, + Count: int64(mg.count), + Sum: mg.sum, + Buckets: buckets, + // SumOfSquaredDeviation: // there's no way to compute this value from prometheus data + } + + timeseries := &metricspb.TimeSeries{ + StartTimestamp: ts, + LabelValues: b.getLabelValues(mg.ls), + Points: []*metricspb.Point{ + {Timestamp: ts, Value: &metricspb.Point_DistributionValue{DistributionValue: distrValue}}, + }, + } + + b.currentMetric.MetricDescriptor.LabelKeys = b.getLabelKeys() + b.currentMetric.Timeseries = append(b.currentMetric.Timeseries, timeseries) + } + case metricspb.MetricDescriptor_SUMMARY: + groups := b.currentDpGroupOrdered() + if len(groups) == 0 { + // this can happen if only sum or count is added, but not following data points + return errors.New("no data point added to summary") + } + for _, gk := range groups { + pts := b.currentDpGroups[gk] + mg, ok := b.currentMetadataGroups[gk] + if !ok || !(mg.hasSum && mg.hasCount) { + return errors.New("count or sum is missing: " + gk) + } + + // always use the first timestamp for the whole metric group + ts := timestampFromMs(mg.ts) + // sort it by boundary + sort.Slice(pts, func(i, j int) bool { + return pts[i].boundary < pts[j].boundary + }) + + percentiles := make([]*metricspb.SummaryValue_Snapshot_ValueAtPercentile, len(pts)) + + for i, p := range pts { + percentiles[i] = + &metricspb.SummaryValue_Snapshot_ValueAtPercentile{Percentile: p.boundary * 100, Value: p.value} + } + + // Based on the summary description from https://prometheus.io/docs/concepts/metric_types/#summary + // the quantiles are calculated over a sliding time window, however, the count is the total count of + // observations and the corresponding sum is a sum of all observed values, thus the sum and count used + // at the global level of the metricspb.SummaryValue + + summaryValue := &metricspb.SummaryValue{ + Sum: &wrappers.DoubleValue{Value: mg.sum}, + Count: &wrappers.Int64Value{Value: int64(mg.count)}, + Snapshot: &metricspb.SummaryValue_Snapshot{ + PercentileValues: percentiles, + }, + } + + timeseries := &metricspb.TimeSeries{ + StartTimestamp: ts, + LabelValues: b.getLabelValues(mg.ls), + Points: []*metricspb.Point{ + {Timestamp: ts, Value: &metricspb.Point_SummaryValue{SummaryValue: summaryValue}}, + }, + } + + b.currentMetric.MetricDescriptor.LabelKeys = b.getLabelKeys() + b.currentMetric.Timeseries = append(b.currentMetric.Timeseries, timeseries) + } + default: + for _, gk := range b.currentDpGroupOrdered() { + pts := b.currentDpGroups[gk] + mg := b.currentMetadataGroups[gk] + + var startTs *timestamp.Timestamp + // do not set startTs if metric type is Gauge as per comment + if mtype != metricspb.MetricDescriptor_GAUGE_DOUBLE { + startTs = timestampFromMs(mg.ts) + } + + for _, p := range pts { + timeseries := &metricspb.TimeSeries{ + StartTimestamp: startTs, + Points: []*metricspb.Point{{Timestamp: timestampFromMs(p.ts), Value: &metricspb.Point_DoubleValue{DoubleValue: p.value}}}, + LabelValues: b.getLabelValues(mg.ls), + } + b.currentMetric.Timeseries = append(b.currentMetric.Timeseries, timeseries) + } + + b.currentMetric.MetricDescriptor.LabelKeys = b.getLabelKeys() + } + } + + b.metrics = append(b.metrics, b.currentMetric) + b.currentMetric = nil + return nil +} + +func (b *metricBuilder) currentDpGroupOrdered() []string { + groupKeys := make([]string, len(b.currentDpGOrder)) + for k, v := range b.currentDpGOrder { + groupKeys[v] = k + } + + return groupKeys +} + +func isUsefulLabel(mType 
metricspb.MetricDescriptor_Type, labelKey string) bool { + result := false + switch labelKey { + case model.MetricNameLabel: + case model.InstanceLabel: + case model.SchemeLabel: + case model.MetricsPathLabel: + case model.JobLabel: + case model.BucketLabel: + result = mType != metricspb.MetricDescriptor_GAUGE_DISTRIBUTION && + mType != metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION + case model.QuantileLabel: + result = mType != metricspb.MetricDescriptor_SUMMARY + default: + result = true + } + return result +} + +func dpgSignature(knownLabelKeys map[string]int, ls labels.Labels) string { + sign := make([]string, len(knownLabelKeys)) + for k, v := range knownLabelKeys { + sign[v] = ls.Get(k) + } + + return fmt.Sprintf("%#v", sign) +} + +func normalizeMetricName(name string) string { + for _, s := range trimmableSuffixes { + if strings.HasSuffix(name, s) && name != s { + return strings.TrimSuffix(name, s) + } + } + return name +} + +func getBoundary(metricType metricspb.MetricDescriptor_Type, labels labels.Labels) (float64, error) { + labelName := "" + if metricType == metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION || + metricType == metricspb.MetricDescriptor_GAUGE_DISTRIBUTION { + labelName = model.BucketLabel + } else if metricType == metricspb.MetricDescriptor_SUMMARY { + labelName = model.QuantileLabel + } else { + return 0, errNoBoundaryLabel + } + + v := labels.Get(labelName) + if v == "" { + return 0, errEmptyBoundaryLabel + } + + return strconv.ParseFloat(v, 64) +} + +func convToOCAMetricType(metricType textparse.MetricType) metricspb.MetricDescriptor_Type { + switch metricType { + case textparse.MetricTypeCounter: + // always use float64, as it's the internal data type used in prometheus + return metricspb.MetricDescriptor_CUMULATIVE_DOUBLE + case textparse.MetricTypeGauge: + return metricspb.MetricDescriptor_GAUGE_DOUBLE + case textparse.MetricTypeHistogram: + return metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION + case textparse.MetricTypeGaugeHistogram: + return metricspb.MetricDescriptor_GAUGE_DISTRIBUTION + case textparse.MetricTypeSummary: + return metricspb.MetricDescriptor_SUMMARY + default: + // including: textparse.MetricTypeUnknown, textparse.MetricTypeInfo, textparse.MetricTypeStateset + return metricspb.MetricDescriptor_UNSPECIFIED + } +} + +/* + code borrowed from the original promreceiver +*/ + +func heuristicalMetricAndKnownUnits(metricName, parsedUnit string) string { + if parsedUnit != "" { + return parsedUnit + } + lastUnderscoreIndex := strings.LastIndex(metricName, "_") + if lastUnderscoreIndex <= 0 || lastUnderscoreIndex >= len(metricName)-1 { + return "" + } + + unit := "" + + supposedUnit := metricName[lastUnderscoreIndex+1:] + switch strings.ToLower(supposedUnit) { + case "millisecond", "milliseconds", "ms": + unit = "ms" + case "second", "seconds", "s": + unit = "s" + case "microsecond", "microseconds", "us": + unit = "us" + case "nanosecond", "nanoseconds", "ns": + unit = "ns" + case "byte", "bytes", "by": + unit = "By" + case "bit", "bits": + unit = "Bi" + case "kilogram", "kilograms", "kg": + unit = "kg" + case "gram", "grams", "g": + unit = "g" + case "meter", "meters", "metre", "metres", "m": + unit = "m" + case "kilometer", "kilometers", "kilometre", "kilometres", "km": + unit = "km" + case "milimeter", "milimeters", "milimetre", "milimetres", "mm": + unit = "mm" + case "nanogram", "ng", "nanograms": + unit = "ng" + } + + return unit +} + +func timestampFromMs(timeAtMs int64) *timestamp.Timestamp { + secs, ns := timeAtMs/1e3, 
(timeAtMs%1e3)*1e6 + return ×tamp.Timestamp{ + Seconds: secs, + Nanos: int32(ns), + } +} + +func shouldSkip(metricName string) bool { + if metricName == "up" || strings.HasPrefix(metricName, "scrape_") { + return true + } + return false +} diff --git a/receiver/prometheusreceiver/internal/metricsbuilder_test.go b/receiver/prometheusreceiver/internal/metricsbuilder_test.go new file mode 100644 index 00000000000..8ae4e83c88d --- /dev/null +++ b/receiver/prometheusreceiver/internal/metricsbuilder_test.go @@ -0,0 +1,866 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "github.com/golang/protobuf/ptypes/wrappers" + "reflect" + "testing" + + commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" + metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" + "github.com/open-telemetry/opentelemetry-service/exporter/exportertest" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" + "github.com/prometheus/prometheus/scrape" +) + +func Test_isUsefulLabel(t *testing.T) { + type args struct { + mType metricspb.MetricDescriptor_Type + labelKey string + } + tests := []struct { + name string + args args + want bool + }{ + {"metricName", args{metricspb.MetricDescriptor_GAUGE_DOUBLE, model.MetricNameLabel}, false}, + {"instance", args{metricspb.MetricDescriptor_GAUGE_DOUBLE, model.InstanceLabel}, false}, + {"scheme", args{metricspb.MetricDescriptor_GAUGE_DOUBLE, model.SchemeLabel}, false}, + {"metricPath", args{metricspb.MetricDescriptor_GAUGE_DOUBLE, model.MetricsPathLabel}, false}, + {"job", args{metricspb.MetricDescriptor_GAUGE_DOUBLE, model.JobLabel}, false}, + {"bucket", args{metricspb.MetricDescriptor_GAUGE_DOUBLE, model.BucketLabel}, true}, + {"bucketForGaugeDistribution", args{metricspb.MetricDescriptor_GAUGE_DISTRIBUTION, model.BucketLabel}, false}, + {"bucketForCumulativeDistribution", args{metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION, model.BucketLabel}, false}, + {"Quantile", args{metricspb.MetricDescriptor_GAUGE_DOUBLE, model.QuantileLabel}, true}, + {"QuantileForSummay", args{metricspb.MetricDescriptor_SUMMARY, model.QuantileLabel}, false}, + {"other", args{metricspb.MetricDescriptor_GAUGE_DOUBLE, "other"}, true}, + {"empty", args{metricspb.MetricDescriptor_GAUGE_DOUBLE, ""}, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isUsefulLabel(tt.args.mType, tt.args.labelKey); got != tt.want { + t.Errorf("isUsefulLabel() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_dpgSignature(t *testing.T) { + knownLabelKeys := map[string]int{"a": 0, "b": 1} + + tests := []struct { + name string + ls labels.Labels + want string + }{ + {"1st label", labels.FromStrings("a", "va"), `[]string{"va", ""}`}, + {"2nd label", labels.FromStrings("b", "vb"), `[]string{"", "vb"}`}, + {"two labels", labels.FromStrings("a", "va", "b", "vb"), 
`[]string{"va", "vb"}`}, + {"extra label", labels.FromStrings("a", "va", "b", "vb", "x", "xa"), `[]string{"va", "vb"}`}, + {"different order", labels.FromStrings("b", "vb", "a", "va"), `[]string{"va", "vb"}`}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := dpgSignature(knownLabelKeys, tt.ls); got != tt.want { + t.Errorf("dpgSignature() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_normalizeMetricName(t *testing.T) { + tests := []struct { + name string + mname string + want string + }{ + {"normal", "normal", "normal"}, + {"count", "foo_count", "foo"}, + {"bucket", "foo_bucket", "foo"}, + {"sum", "foo_sum", "foo"}, + {"no_prefix", "_sum", "_sum"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := normalizeMetricName(tt.mname); got != tt.want { + t.Errorf("normalizeMetricName() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_getBoundary(t *testing.T) { + ls := labels.FromStrings("le", "100.0", "foo", "bar", "quantile", "0.5") + ls2 := labels.FromStrings("foo", "bar") + ls3 := labels.FromStrings("le", "xyz", "foo", "bar", "quantile", "0.5") + type args struct { + metricType metricspb.MetricDescriptor_Type + labels labels.Labels + } + tests := []struct { + name string + args args + want float64 + wantErr bool + }{ + {"histogram", args{metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION, ls}, 100.0, false}, + {"gaugehistogram", args{metricspb.MetricDescriptor_GAUGE_DISTRIBUTION, ls}, 100.0, false}, + {"gaugehistogram_no_label", args{metricspb.MetricDescriptor_GAUGE_DISTRIBUTION, ls2}, 0, true}, + {"gaugehistogram_bad_value", args{metricspb.MetricDescriptor_GAUGE_DISTRIBUTION, ls3}, 0, true}, + {"summary", args{metricspb.MetricDescriptor_SUMMARY, ls}, 0.5, false}, + {"otherType", args{metricspb.MetricDescriptor_GAUGE_DOUBLE, ls}, 0, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := getBoundary(tt.args.metricType, tt.args.labels) + if (err != nil) != tt.wantErr { + t.Errorf("getBoundary() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("getBoundary() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_convToOCAMetricType(t *testing.T) { + tests := []struct { + name string + metricType textparse.MetricType + want metricspb.MetricDescriptor_Type + }{ + {"counter", textparse.MetricTypeCounter, metricspb.MetricDescriptor_CUMULATIVE_DOUBLE}, + {"gauge", textparse.MetricTypeGauge, metricspb.MetricDescriptor_GAUGE_DOUBLE}, + {"histogram", textparse.MetricTypeHistogram, metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION}, + {"guageHistogram", textparse.MetricTypeGaugeHistogram, metricspb.MetricDescriptor_GAUGE_DISTRIBUTION}, + {"summary", textparse.MetricTypeSummary, metricspb.MetricDescriptor_SUMMARY}, + {"info", textparse.MetricTypeInfo, metricspb.MetricDescriptor_UNSPECIFIED}, + {"stateset", textparse.MetricTypeStateset, metricspb.MetricDescriptor_UNSPECIFIED}, + {"unknown", textparse.MetricTypeUnknown, metricspb.MetricDescriptor_UNSPECIFIED}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := convToOCAMetricType(tt.metricType); !reflect.DeepEqual(got, tt.want) { + t.Errorf("convToOCAMetricType() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_metricBuilder(t *testing.T) { + type pt struct { + lb labels.Labels + v float64 + hasErr bool + } + + node := &commonpb.Node{ + ServiceInfo: &commonpb.ServiceInfo{Name: "myjob"}, + Identifier: &commonpb.ProcessIdentifier{ + HostName: 
"example.com", + }, + } + + ts := int64(1555366610000) + tsOc := timestampFromMs(ts) + + mc := &mockMetadataCache{ + data: map[string]scrape.MetricMetadata{ + "counter_test": {Metric: "counter_test", Type: textparse.MetricTypeCounter, Help: "", Unit: ""}, + "gauge_test": {Metric: "gauge_test", Type: textparse.MetricTypeGauge, Help: "", Unit: ""}, + "hist_test": {Metric: "hist_test", Type: textparse.MetricTypeHistogram, Help: "", Unit: ""}, + "ghist_test": {Metric: "ghist_test", Type: textparse.MetricTypeGaugeHistogram, Help: "", Unit: ""}, + "summary_test": {Metric: "summary_test", Type: textparse.MetricTypeSummary, Help: "", Unit: ""}, + "unknown_test": {Metric: "unknown_test", Type: textparse.MetricTypeUnknown, Help: "", Unit: ""}, + "poor_name_count": {Metric: "poor_name_count", Type: textparse.MetricTypeCounter, Help: "", Unit: ""}, + "up": {Metric: "up", Type: textparse.MetricTypeCounter, Help: "", Unit: ""}, + "scrape_foo": {Metric: "scrape_foo", Type: textparse.MetricTypeCounter, Help: "", Unit: ""}, + }, + } + + tests := []struct { + name string + pts []*pt + processErr bool + buildErr bool + metrics []*metricspb.Metric + }{ + { + name: "counters", + pts: []*pt{ + {createLabels("counter_test", "t1", "1"), 1.0, false}, + {createLabels("counter_test", "t2", "2"), 2.0, false}, + {createLabels("counter_test", "t1", "3", "t2", "4"), 3.0, false}, + {createLabels("counter_test"), 4.0, false}, + }, + buildErr: false, + metrics: []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "counter_test", + Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, + LabelKeys: []*metricspb.LabelKey{{Key: "t1"}, {Key: "t2"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "1", HasValue: true}, {Value: "", HasValue: false}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 1.0}}, + }, + }, + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "", HasValue: false}, {Value: "2", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 2.0}}, + }, + }, + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "3", HasValue: true}, {Value: "4", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 3.0}}, + }, + }, + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "", HasValue: false}, {Value: "", HasValue: false}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 4.0}}, + }, + }, + }, + }, + }, + }, + { + name: "gauge", + pts: []*pt{ + {createLabels("gauge_test", "t1", "1"), 1.0, false}, + {createLabels("gauge_test", "t2", "2"), 2.0, false}, + {createLabels("gauge_test", "t1", "3", "t2", "4"), 3.0, false}, + {createLabels("gauge_test"), 4.0, false}, + }, + buildErr: false, + metrics: []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "gauge_test", + Type: metricspb.MetricDescriptor_GAUGE_DOUBLE, + LabelKeys: []*metricspb.LabelKey{{Key: "t1"}, {Key: "t2"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + LabelValues: []*metricspb.LabelValue{{Value: "1", HasValue: true}, {Value: "", HasValue: false}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 1.0}}, + }, + }, + { + LabelValues: []*metricspb.LabelValue{{Value: "", HasValue: false}, {Value: "2", HasValue: 
true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 2.0}}, + }, + }, + { + LabelValues: []*metricspb.LabelValue{{Value: "3", HasValue: true}, {Value: "4", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 3.0}}, + }, + }, + { + LabelValues: []*metricspb.LabelValue{{Value: "", HasValue: false}, {Value: "", HasValue: false}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 4.0}}, + }, + }, + }, + }, + }, + }, + { + name: "two_groups", + pts: []*pt{ + {createLabels("counter_test", "t1", "1"), 1.0, false}, + {createLabels("counter_test", "t2", "2"), 2.0, false}, + {createLabels("gauge_test", "t1", "1"), 1.0, false}, + {createLabels("gauge_test", "t2", "2"), 2.0, false}, + }, + buildErr: false, + metrics: []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "counter_test", + Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, + LabelKeys: []*metricspb.LabelKey{{Key: "t1"}, {Key: "t2"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "1", HasValue: true}, {Value: "", HasValue: false}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 1.0}}, + }, + }, + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "", HasValue: false}, {Value: "2", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 2.0}}, + }, + }, + }, + }, + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "gauge_test", + Type: metricspb.MetricDescriptor_GAUGE_DOUBLE, + LabelKeys: []*metricspb.LabelKey{{Key: "t1"}, {Key: "t2"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + LabelValues: []*metricspb.LabelValue{{Value: "1", HasValue: true}, {Value: "", HasValue: false}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 1.0}}, + }, + }, + { + LabelValues: []*metricspb.LabelValue{{Value: "", HasValue: false}, {Value: "2", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 2.0}}, + }, + }, + }, + }, + }, + }, + { + name: "histogram", + pts: []*pt{ + {createLabels("hist_test", "t1", "1", "le", "10"), 1.0, false}, + {createLabels("hist_test", "t1", "1", "le", "20"), 3.0, false}, + {createLabels("hist_test", "t1", "1", "le", "+inf"), 10.0, false}, + {createLabels("hist_test_sum", "t1", "1"), 100.0, false}, + {createLabels("hist_test_count", "t1", "1"), 10.0, false}, + {createLabels("hist_test", "t1", "2", "le", "10"), 10.0, false}, + {createLabels("hist_test", "t1", "2", "le", "20"), 30.0, false}, + {createLabels("hist_test", "t1", "2", "le", "+inf"), 100.0, false}, + {createLabels("hist_test_sum", "t1", "2"), 10000.0, false}, + {createLabels("hist_test_count", "t1", "2"), 100.0, false}, + }, + buildErr: false, + metrics: []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "hist_test", + Type: metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION, + LabelKeys: []*metricspb.LabelKey{{Key: "t1"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "1", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DistributionValue{ + DistributionValue: &metricspb.DistributionValue{ + BucketOptions: 
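+											// "le" buckets are cumulative in the exposition format; the builder is
+											// expected to emit per-bucket counts instead (the cumulative 1, 3, 10
+											// above become 1, 2, 7 below, with "+inf" carrying the total count).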
&metricspb.DistributionValue_BucketOptions{ + Type: &metricspb.DistributionValue_BucketOptions_Explicit_{ + Explicit: &metricspb.DistributionValue_BucketOptions_Explicit{ + Bounds: []float64{10, 20}, + }, + }, + }, + Count: 10, + Sum: 100.0, + Buckets: []*metricspb.DistributionValue_Bucket{{Count: 1}, {Count: 2}, {Count: 7}}, + }}}, + }, + }, + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "2", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DistributionValue{ + DistributionValue: &metricspb.DistributionValue{ + BucketOptions: &metricspb.DistributionValue_BucketOptions{ + Type: &metricspb.DistributionValue_BucketOptions_Explicit_{ + Explicit: &metricspb.DistributionValue_BucketOptions_Explicit{ + Bounds: []float64{10, 20}, + }, + }, + }, + Count: 100, + Sum: 10000.0, + Buckets: []*metricspb.DistributionValue_Bucket{{Count: 10}, {Count: 20}, {Count: 70}}, + }}}, + }, + }, + }, + }, + }, + }, + { + name: "gaugehistogram", + pts: []*pt{ + {createLabels("ghist_test", "t1", "1", "le", "10"), 1.0, false}, + {createLabels("ghist_test", "t1", "1", "le", "20"), 3.0, false}, + {createLabels("ghist_test", "t1", "1", "le", "+inf"), 10.0, false}, + {createLabels("ghist_test_sum", "t1", "1"), 100.0, false}, + {createLabels("ghist_test_count", "t1", "1"), 10.0, false}, + }, + buildErr: false, + metrics: []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "ghist_test", + Type: metricspb.MetricDescriptor_GAUGE_DISTRIBUTION, + LabelKeys: []*metricspb.LabelKey{{Key: "t1"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "1", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DistributionValue{ + DistributionValue: &metricspb.DistributionValue{ + BucketOptions: &metricspb.DistributionValue_BucketOptions{ + Type: &metricspb.DistributionValue_BucketOptions_Explicit_{ + Explicit: &metricspb.DistributionValue_BucketOptions_Explicit{ + Bounds: []float64{10, 20}, + }, + }, + }, + Count: 10, + Sum: 100.0, + Buckets: []*metricspb.DistributionValue_Bucket{{Count: 1}, {Count: 2}, {Count: 7}}, + }}}, + }, + }, + }, + }, + }, + }, + { + name: "histogram_mixed_oder", + pts: []*pt{ + {createLabels("ghist_test", "t1", "1", "le", "10"), 1.0, false}, + {createLabels("ghist_test_sum", "t1", "1"), 100.0, false}, + {createLabels("ghist_test", "t1", "1", "le", "+inf"), 10.0, false}, + {createLabels("ghist_test_count", "t1", "1"), 10.0, false}, + {createLabels("ghist_test", "t1", "1", "le", "20"), 3.0, false}, + }, + buildErr: false, + metrics: []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "ghist_test", + Type: metricspb.MetricDescriptor_GAUGE_DISTRIBUTION, + LabelKeys: []*metricspb.LabelKey{{Key: "t1"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "1", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DistributionValue{ + DistributionValue: &metricspb.DistributionValue{ + BucketOptions: &metricspb.DistributionValue_BucketOptions{ + Type: &metricspb.DistributionValue_BucketOptions_Explicit_{ + Explicit: &metricspb.DistributionValue_BucketOptions_Explicit{ + Bounds: []float64{10, 20}, + }, + }, + }, + Count: 10, + Sum: 100.0, + Buckets: []*metricspb.DistributionValue_Bucket{{Count: 1}, {Count: 2}, {Count: 7}}, + }}}, + }, + }, + }, + }, + }, + }, + { + name: "summary", + pts: 
[]*pt{ + {createLabels("summary_test", "t1", "1", "quantile", "0.5"), 1.0, false}, + {createLabels("summary_test", "t1", "1", "quantile", "0.9"), 3.0, false}, + {createLabels("summary_test_sum", "t1", "1"), 100.0, false}, + {createLabels("summary_test_count", "t1", "1"), 1000.0, false}, + }, + buildErr: false, + metrics: []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "summary_test", + Type: metricspb.MetricDescriptor_SUMMARY, + LabelKeys: []*metricspb.LabelKey{{Key: "t1"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "1", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_SummaryValue{ + SummaryValue: &metricspb.SummaryValue{ + Sum: &wrappers.DoubleValue{Value: 100.0}, + Count: &wrappers.Int64Value{Value: 1000}, + Snapshot: &metricspb.SummaryValue_Snapshot{ + PercentileValues: []*metricspb.SummaryValue_Snapshot_ValueAtPercentile{ + {Percentile: 50.0, Value: 1}, + {Percentile: 90.0, Value: 3}, + }, + }}}}, + }, + }, + }, + }, + }, + }, + { + name: "unknowns", + pts: []*pt{ + {createLabels("unknown_test", "t1", "1"), 1.0, false}, + {createLabels("unknown_test", "t2", "2"), 2.0, false}, + {createLabels("unknown_test", "t1", "3", "t2", "4"), 3.0, false}, + {createLabels("unknown_test"), 4.0, false}, + }, + buildErr: false, + metrics: []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "unknown_test", + Type: metricspb.MetricDescriptor_UNSPECIFIED, + LabelKeys: []*metricspb.LabelKey{{Key: "t1"}, {Key: "t2"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "1", HasValue: true}, {Value: "", HasValue: false}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 1.0}}, + }, + }, + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "", HasValue: false}, {Value: "2", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 2.0}}, + }, + }, + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "3", HasValue: true}, {Value: "4", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 3.0}}, + }, + }, + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "", HasValue: false}, {Value: "", HasValue: false}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 4.0}}, + }, + }, + }, + }, + }, + }, + { + name: "no-hints-individual-family", + pts: []*pt{ + {createLabels("metric_family1", "t1", "1"), 1.0, false}, + {createLabels("metric_family2", "t2", "2"), 2.0, false}, + }, + buildErr: false, + metrics: []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "metric_family1", + Type: metricspb.MetricDescriptor_UNSPECIFIED, + LabelKeys: []*metricspb.LabelKey{{Key: "t1"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "1", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 1.0}}, + }, + }, + }, + }, + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "metric_family2", + Type: metricspb.MetricDescriptor_UNSPECIFIED, + LabelKeys: []*metricspb.LabelKey{{Key: "t2"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: tsOc, 
+ LabelValues: []*metricspb.LabelValue{{Value: "2", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 2.0}}, + }, + }, + }, + }, + }, + }, + { + name: "poor_name_count", + pts: []*pt{ + {createLabels("poor_name_count", "t1", "1"), 1.0, false}, + {createLabels("poor_name_count", "t2", "2"), 2.0, false}, + }, + buildErr: false, + metrics: []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "poor_name_count", + Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, + LabelKeys: []*metricspb.LabelKey{{Key: "t1"}, {Key: "t2"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "1", HasValue: true}, {Value: "", HasValue: false}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 1.0}}, + }, + }, + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "", HasValue: false}, {Value: "2", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 2.0}}, + }, + }, + }, + }, + }, + }, + { + name: "poor_name_nohint_count", + pts: []*pt{ + {createLabels("poor_name_nohint_count", "t1", "1"), 1.0, false}, + {createLabels("poor_name_nohint_count", "t2", "2"), 2.0, false}, + }, + buildErr: false, + metrics: []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "poor_name_nohint_count", + Type: metricspb.MetricDescriptor_UNSPECIFIED, + LabelKeys: []*metricspb.LabelKey{{Key: "t1"}, {Key: "t2"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "1", HasValue: true}, {Value: "", HasValue: false}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 1.0}}, + }, + }, + { + StartTimestamp: tsOc, + LabelValues: []*metricspb.LabelValue{{Value: "", HasValue: false}, {Value: "2", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: tsOc, Value: &metricspb.Point_DoubleValue{DoubleValue: 2.0}}, + }, + }, + }, + }, + }, + }, + { + name: "noDataAdded", + pts: []*pt{}, + buildErr: true, + }, + { + name: "emptyMetricName", + pts: []*pt{ + {createLabels("counter_test", "t1", "1"), 1.0, false}, + {createLabels("counter_test", "t2", "2"), 2.0, false}, + {createLabels("", "t1", "3", "t2", "4"), 3.0, true}, + }, + }, + { + name: "bad_label_histogram", + pts: []*pt{ + {createLabels("ghist_test", "t1", "1"), 1.0, true}, + }, + buildErr: true, + }, + { + name: "incomplete_histogram_no_sum_count", + pts: []*pt{ + {createLabels("ghist_test", "t1", "1", "le", "10"), 1.0, false}, + }, + buildErr: true, + }, + { + name: "incomplete_histogram_no_buckets", + pts: []*pt{ + {createLabels("ghist_test_sum", "t1", "1"), 1.0, false}, + {createLabels("ghist_test_count", "t1", "1"), 1.0, false}, + }, + buildErr: true, + }, + { + name: "bad_label_summary", + pts: []*pt{ + {createLabels("summary_test", "t1", "1"), 1.0, true}, + }, + buildErr: true, + }, + { + name: "incomplete_summary_no_sum_count", + pts: []*pt{ + {createLabels("summary_test", "t1", "1", "quantile", "0.5"), 1.0, false}, + }, + buildErr: true, + }, + { + name: "incomplete_summary_no_sum_count", + pts: []*pt{ + {createLabels("summary_test_sum", "t1", "1"), 1.0, false}, + {createLabels("summary_test_count", "t1", "1"), 1.0, false}, + }, + buildErr: true, + }, + { + name: "incomplete_previous", + pts: []*pt{ + {createLabels("summary_test", "t1", "1", 
"quantile", "0.5"), 1.0, false}, + {createLabels("new_metric", "t1", "1"), 1.0, true}, + }, + }, + { + name: "skipped", + pts: []*pt{ + {createLabels("up", "t1", "1"), 1.0, false}, + {createLabels("scrape_foo", "t1", "1"), 1.0, false}, + }, + buildErr: false, + metrics: make([]*metricspb.Metric, 0), + }, + { + name: "nodata", + pts: []*pt{}, + buildErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + b := newMetricBuilder(node, mc, testLogger) + for _, v := range tt.pts { + if err := b.AddDataPoint(v.lb, ts, v.v); (err != nil) != v.hasErr { + t.Errorf("metricBuilder.AddDataPoint() error = %v, wantErr %v", err, v.hasErr) + } + if v.hasErr { + // any error in between will cause whole page to fail on scrapeLoop + // no need to continue + return + } + } + + mt, err := b.Build() + + if err != nil { + if !tt.buildErr { + t.Errorf("metricBuilder.Build() error = %v, wantErr %v", err, tt.buildErr) + } + return + } else if tt.buildErr { + t.Errorf("metricBuilder.Build() error = %v, wantErr %v", err, tt.buildErr) + return + } + + if !reflect.DeepEqual(mt.Metrics, tt.metrics) { + t.Errorf("metricBuilder.Build() metric = %v, want %v", + string(exportertest.ToJSON(mt.Metrics)), string(exportertest.ToJSON(tt.metrics))) + } + }) + } +} + +func createLabels(mFamily string, tagPairs ...string) labels.Labels { + lm := make(map[string]string) + lm[model.MetricNameLabel] = mFamily + if len(tagPairs)%2 != 0 { + panic("tag pairs is not even") + } + + for i := 0; i < len(tagPairs); i += 2 { + lm[tagPairs[i]] = tagPairs[i+1] + } + + return labels.FromMap(lm) +} + +func Test_heuristicalMetricAndKnownUnits(t *testing.T) { + tests := []struct { + metricName string + parsedUnit string + want string + }{ + {"test", "ms", "ms"}, + {"millisecond", "", ""}, + {"test_millisecond", "", "ms"}, + {"test_milliseconds", "", "ms"}, + {"test_ms", "", "ms"}, + {"test_second", "", "s"}, + {"test_seconds", "", "s"}, + {"test_s", "", "s"}, + {"test_microsecond", "", "us"}, + {"test_microseconds", "", "us"}, + {"test_us", "", "us"}, + {"test_nanosecond", "", "ns"}, + {"test_nanoseconds", "", "ns"}, + {"test_ns", "", "ns"}, + {"test_byte", "", "By"}, + {"test_bytes", "", "By"}, + {"test_by", "", "By"}, + {"test_bit", "", "Bi"}, + {"test_bits", "", "Bi"}, + {"test_kilogram", "", "kg"}, + {"test_kilograms", "", "kg"}, + {"test_kg", "", "kg"}, + {"test_gram", "", "g"}, + {"test_grams", "", "g"}, + {"test_g", "", "g"}, + {"test_nanogram", "", "ng"}, + {"test_nanograms", "", "ng"}, + {"test_ng", "", "ng"}, + {"test_meter", "", "m"}, + {"test_meters", "", "m"}, + {"test_metre", "", "m"}, + {"test_metres", "", "m"}, + {"test_m", "", "m"}, + {"test_kilometer", "", "km"}, + {"test_kilometers", "", "km"}, + {"test_kilometre", "", "km"}, + {"test_kilometres", "", "km"}, + {"test_km", "", "km"}, + {"test_milimeter", "", "mm"}, + {"test_milimeters", "", "mm"}, + {"test_milimetre", "", "mm"}, + {"test_milimetres", "", "mm"}, + {"test_mm", "", "mm"}, + } + for _, tt := range tests { + t.Run(tt.metricName, func(t *testing.T) { + if got := heuristicalMetricAndKnownUnits(tt.metricName, tt.parsedUnit); got != tt.want { + t.Errorf("heuristicalMetricAndKnownUnits() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/receiver/prometheusreceiver/internal/ocastore.go b/receiver/prometheusreceiver/internal/ocastore.go new file mode 100644 index 00000000000..47db09791d6 --- /dev/null +++ b/receiver/prometheusreceiver/internal/ocastore.go @@ -0,0 +1,110 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under 
the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"context"
+	"errors"
+	"github.com/open-telemetry/opentelemetry-service/consumer"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/scrape"
+	"github.com/prometheus/prometheus/storage"
+	"go.uber.org/zap"
+	"io"
+	"sync"
+	"sync/atomic"
+)
+
+const (
+	runningStateInit = iota
+	runningStateReady
+	runningStateStop
+)
+
+var idSeq int64
+var errAlreadyStop = errors.New("ocaStore already stopped")
+
+// OcaStore is an interface that combines io.Closer and Prometheus' scrape.Appendable
+type OcaStore interface {
+	scrape.Appendable
+	io.Closer
+	SetScrapeManager(*scrape.Manager)
+}
+
+// ocaStore is an OpenCensus metrics store for Prometheus
+type ocaStore struct {
+	running int32
+	logger  *zap.SugaredLogger
+	sink    consumer.MetricsConsumer
+	mc      *mService
+	once    *sync.Once
+	ctx     context.Context
+}
+
+// NewOcaStore returns an ocaStore instance, which can act as Prometheus' scrape.Appendable
+func NewOcaStore(ctx context.Context, sink consumer.MetricsConsumer, logger *zap.SugaredLogger) OcaStore {
+	return &ocaStore{
+		running: runningStateInit,
+		ctx:     ctx,
+		sink:    sink,
+		logger:  logger,
+		once:    &sync.Once{},
+	}
+}
+
+// SetScrapeManager configures the underlying scrape.Manager; OcaStore needs it set before it
+// can accept any Appender() request
+func (o *ocaStore) SetScrapeManager(scrapeManager *scrape.Manager) {
+	if scrapeManager != nil && atomic.CompareAndSwapInt32(&o.running, runningStateInit, runningStateReady) {
+		o.mc = &mService{scrapeManager}
+	}
+}
+
+func (o *ocaStore) Appender() (storage.Appender, error) {
+	state := atomic.LoadInt32(&o.running)
+	if state == runningStateReady {
+		return newTransaction(o.ctx, o.mc, o.sink, o.logger), nil
+	} else if state == runningStateInit {
+		return nil, errors.New("ScrapeManager is not set")
+	}
+	// once stopped, return a dummy appender rather than an error, otherwise callers may panic
+	return noopAppender(true), nil
+}
+
+func (o *ocaStore) Close() error {
+	atomic.CompareAndSwapInt32(&o.running, runningStateReady, runningStateStop)
+	return nil
+}
+
+// noopAppender always returns an error on any operation
+type noopAppender bool
+
+func (noopAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) {
+	return 0, errAlreadyStop
+}
+
+func (noopAppender) AddFast(l labels.Labels, ref uint64, t int64, v float64) error {
+	return errAlreadyStop
+}
+
+func (noopAppender) Commit() error {
+	return errAlreadyStop
+}
+
+func (noopAppender) Rollback() error {
+	return errAlreadyStop
+}
+
+var _ storage.Appender = noopAppender(true)
diff --git a/receiver/prometheusreceiver/internal/ocastore_test.go b/receiver/prometheusreceiver/internal/ocastore_test.go
new file mode 100644
index 00000000000..f9f29baf465
--- /dev/null
+++ b/receiver/prometheusreceiver/internal/ocastore_test.go
@@ -0,0 +1,312 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"context"
+	"fmt"
+	"github.com/go-kit/kit/log"
+	"github.com/open-telemetry/opentelemetry-service/data"
+	"github.com/prometheus/prometheus/discovery"
+	"net/http"
+	"net/http/httptest"
+	"net/url"
+	"reflect"
+	"testing"
+	"time"
+
+	metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
+	"github.com/prometheus/prometheus/config"
+	sd_config "github.com/prometheus/prometheus/discovery/config"
+	"github.com/prometheus/prometheus/scrape"
+)
+
+func TestOcaStore(t *testing.T) {
+
+	o := NewOcaStore(context.Background(), nil, nil)
+
+	_, err := o.Appender()
+	if err == nil {
+		t.Fatal("expecting error, but got nil")
+	}
+
+	o.SetScrapeManager(nil)
+	_, err = o.Appender()
+	if err == nil {
+		t.Fatal("expecting error, but got nil")
+	}
+
+	o.SetScrapeManager(&scrape.Manager{})
+
+	app, err := o.Appender()
+	if app == nil {
+		t.Fatalf("expecting app, but got error %v\n", err)
+	}
+
+	_ = o.Close()
+
+	app, err = o.Appender()
+	if app == nil || err != nil {
+		t.Fatalf("expect app!=nil and err==nil, got app=%v and err=%v", app, err)
+	}
+
+}
+
+func TestOcaStoreIntegration(t *testing.T) {
+	// verify at a high level
+	type v struct {
+		mname     string
+		mtype     metricspb.MetricDescriptor_Type
+		numLbKeys int
+		numTs     int
+	}
+	tests := []struct {
+		name string
+		page string
+		mv   []v
+	}{
+		{
+			name: "prometheus_text_format_example",
+			page: testData1,
+			mv: []v{
+				{
+					mname:     "http_requests_total",
+					mtype:     metricspb.MetricDescriptor_CUMULATIVE_DOUBLE,
+					numLbKeys: 2,
+					numTs:     2,
+				},
+				{
+					mname:     "http_request_duration_seconds",
+					mtype:     metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION,
+					numLbKeys: 0,
+					numTs:     1,
+				},
+				{
+					mname:     "rpc_duration_seconds",
+					mtype:     metricspb.MetricDescriptor_SUMMARY,
+					numLbKeys: 0,
+					numTs:     1,
+				},
+			},
+		},
+		{
+			name: "test_my_histogram_vec",
+			page: testDataHistoVec,
+			mv: []v{
+				{
+					mname:     "test_my_histogram_vec",
+					mtype:     metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION,
+					numLbKeys: 2,
+					numTs:     2,
+				},
+			},
+		},
+		{
+			name: "test_my_summary_vec",
+			page: testDataSummaryVec,
+			mv: []v{
+				{
+					mname:     "test_my_summary",
+					mtype:     metricspb.MetricDescriptor_SUMMARY,
+					numLbKeys: 0,
+					numTs:     1,
+				},
+				{
+					mname:     "test_my_summary_vec",
+					mtype:     metricspb.MetricDescriptor_SUMMARY,
+					numLbKeys: 2,
+					numTs:     3,
+				},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			srv := startMockServer(tt.page)
+			target, _ := url.Parse(srv.URL)
+			defer srv.Close()
+			mc, cancel := startScraper(target.Host)
+			defer cancel()
+
+			var d *data.MetricsData
+			select {
+			case d = <-mc.Metrics:
+			case <-time.After(time.Second * 10):
+				t.Error("no data came back in 10 seconds")
+			}
+
+			if len(tt.mv) != len(d.Metrics) {
+				t.Errorf("Number of metrics got=%v expect=%v\n", len(d.Metrics), len(tt.mv))
+			}
+
+			for i, dd := range d.Metrics {
+				got := v{
+					mname:     dd.MetricDescriptor.Name,
+					mtype:     dd.MetricDescriptor.Type,
+					numLbKeys: len(dd.MetricDescriptor.LabelKeys),
+					numTs:     len(dd.Timeseries),
+				}
+				if !reflect.DeepEqual(got, tt.mv[i]) {
t.Errorf("got %v, expect %v", got, tt.mv[i]) + } + } + }) + } + +} + +func startMockServer(page string) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.Write([]byte(page)) + })) +} + +func startScraper(target string) (*mockConsumer, context.CancelFunc) { + rawCfg := fmt.Sprintf(scrapCfgTpl, target) + + cfg, err := config.Load(rawCfg) + if err != nil { + fmt.Println(err) + return nil, nil + } + con := newMockConsumer() + o := NewOcaStore(context.Background(), con, testLogger) + scrapeManager := scrape.NewManager(log.NewNopLogger(), o) + o.SetScrapeManager(scrapeManager) + + ctx := context.Background() + ctxScrape, cancelScrape := context.WithCancel(ctx) + + discoveryManagerScrape := discovery.NewManager(ctxScrape, NewZapToGokitLogAdapter(zapLogger)) + go discoveryManagerScrape.Run() + scrapeManager.ApplyConfig(cfg) + + // Run the scrape manager. + syncConfig := make(chan bool) + errsChan := make(chan error, 1) + go func() { + defer close(errsChan) + + <-time.After(100 * time.Millisecond) + close(syncConfig) + if err := scrapeManager.Run(discoveryManagerScrape.SyncCh()); err != nil { + errsChan <- err + } + }() + <-syncConfig + // By this point we've given time to the scrape manager + // to start applying its original configuration. + + discoveryCfg := make(map[string]sd_config.ServiceDiscoveryConfig) + for _, scrapeConfig := range cfg.ScrapeConfigs { + discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfig + } + + // Now trigger the discovery notification to the scrape manager. + discoveryManagerScrape.ApplyConfig(discoveryCfg) + + return con, cancelScrape +} + +var scrapCfgTpl = ` + +scrape_configs: +- job_name: 'test' + scrape_interval: 80ms + static_configs: + - targets: ['%s'] +` + +// https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-format-example +var testData1 = ` +# HELP http_requests_total The total number of HTTP requests. +# TYPE http_requests_total counter +http_requests_total{method="post",code="200"} 1027 1395066363000 +http_requests_total{method="post",code="400"} 3 1395066363000 + +# A histogram, which has a pretty complex representation in the text format: +# HELP http_request_duration_seconds A histogram of the request duration. +# TYPE http_request_duration_seconds histogram +http_request_duration_seconds_bucket{le="0.05"} 24054 +http_request_duration_seconds_bucket{le="0.1"} 33444 +http_request_duration_seconds_bucket{le="0.2"} 100392 +http_request_duration_seconds_bucket{le="0.5"} 129389 +http_request_duration_seconds_bucket{le="1"} 133988 +http_request_duration_seconds_bucket{le="+Inf"} 144320 +http_request_duration_seconds_sum 53423 +http_request_duration_seconds_count 144320 + +# Finally a summary, which has a complex representation, too: +# HELP rpc_duration_seconds A summary of the RPC duration in seconds. 
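+# (a summary is exposed as quantile-labeled samples plus _sum and _count
+# series; the receiver folds these back into a single SUMMARY metric)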
+# TYPE rpc_duration_seconds summary +rpc_duration_seconds{quantile="0.01"} 3102 +rpc_duration_seconds{quantile="0.05"} 3272 +rpc_duration_seconds{quantile="0.5"} 4773 +rpc_duration_seconds{quantile="0.9"} 9001 +rpc_duration_seconds{quantile="0.99"} 76656 +rpc_duration_seconds_sum 1.7560473e+07 +rpc_duration_seconds_count 2693 +` + +var testDataHistoVec = ` +# HELP test_my_histogram_vec This is my histogram vec +# TYPE test_my_histogram_vec histogram +test_my_histogram_vec_bucket{bar="",foo="f",le="0.0"} 0.0 +test_my_histogram_vec_bucket{bar="",foo="f",le="30.0"} 2.0 +test_my_histogram_vec_bucket{bar="",foo="f",le="50.0"} 5.0 +test_my_histogram_vec_bucket{bar="",foo="f",le="80.0"} 8.0 +test_my_histogram_vec_bucket{bar="",foo="f",le="100.0"} 9.0 +test_my_histogram_vec_bucket{bar="",foo="f",le="200.0"} 9.0 +test_my_histogram_vec_bucket{bar="",foo="f",le="+Inf"} 9.0 +test_my_histogram_vec_sum{bar="",foo="f"} 420.06922114567107 +test_my_histogram_vec_count{bar="",foo="f"} 9.0 +test_my_histogram_vec_bucket{bar="b",foo="",le="0.0"} 0.0 +test_my_histogram_vec_bucket{bar="b",foo="",le="30.0"} 4.0 +test_my_histogram_vec_bucket{bar="b",foo="",le="50.0"} 6.0 +test_my_histogram_vec_bucket{bar="b",foo="",le="80.0"} 9.0 +test_my_histogram_vec_bucket{bar="b",foo="",le="100.0"} 9.0 +test_my_histogram_vec_bucket{bar="b",foo="",le="200.0"} 9.0 +test_my_histogram_vec_bucket{bar="b",foo="",le="+Inf"} 9.0 +test_my_histogram_vec_sum{bar="b",foo=""} 307.6198217635357 +test_my_histogram_vec_count{bar="b",foo=""} 9.0 +` + +var testDataSummaryVec = ` +# HELP test_my_summary This is my summary +# TYPE test_my_summary summary +test_my_summary{quantile="0.5"} 5.297674177868265 +test_my_summary{quantile="0.9"} 9.178352863969653 +test_my_summary{quantile="0.99"} 9.178352863969653 +test_my_summary_sum 41.730240267137724 +test_my_summary_count 9.0 +# HELP test_my_summary_vec This is my summary vec +# TYPE test_my_summary_vec summary +test_my_summary_vec{bar="b1",foo="f1",quantile="0.5"} 7.11378856098672 +test_my_summary_vec{bar="b1",foo="f1",quantile="0.9"} 8.533665390719884 +test_my_summary_vec{bar="b1",foo="f1",quantile="0.99"} 8.533665390719884 +test_my_summary_vec_sum{bar="b1",foo="f1"} 55.1297982492356 +test_my_summary_vec_count{bar="b1",foo="f1"} 9.0 +test_my_summary_vec{bar="b3",foo="f1",quantile="0.5"} 3.430603231384155 +test_my_summary_vec{bar="b3",foo="f1",quantile="0.9"} 9.938629091300923 +test_my_summary_vec{bar="b3",foo="f1",quantile="0.99"} 9.938629091300923 +test_my_summary_vec_sum{bar="b3",foo="f1"} 40.31948120896123 +test_my_summary_vec_count{bar="b3",foo="f1"} 9.0 +test_my_summary_vec{bar="b3",foo="f2",quantile="0.5"} 5.757833591018714 +test_my_summary_vec{bar="b3",foo="f2",quantile="0.9"} 8.186181950691724 +test_my_summary_vec{bar="b3",foo="f2",quantile="0.99"} 8.186181950691724 +test_my_summary_vec_sum{bar="b3",foo="f2"} 46.82000375730371 +test_my_summary_vec_count{bar="b3",foo="f2"} 9.0 +` diff --git a/receiver/prometheusreceiver/internal/transaction.go b/receiver/prometheusreceiver/internal/transaction.go new file mode 100644 index 00000000000..0c83350a218 --- /dev/null +++ b/receiver/prometheusreceiver/internal/transaction.go @@ -0,0 +1,147 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"context"
+	"errors"
+	commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
+	"github.com/open-telemetry/opentelemetry-service/consumer"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/storage"
+	"go.uber.org/zap"
+	"strings"
+	"sync/atomic"
+)
+
+const (
+	portAttr   = "port"
+	schemeAttr = "scheme"
+)
+
+var errMetricNameNotFound = errors.New("metricName not found from labels")
+var errTransactionAborted = errors.New("transaction aborted")
+var errNoJobInstance = errors.New("job or instance cannot be found from labels")
+
+// A transaction corresponds to an individual scrape operation or staleness report.
+// Whenever the Prometheus receiver scrapes a target metrics endpoint, a page of raw metrics is returned and a
+// transaction, acting as the appender, is created to process it. The scrapeLoop calls the Add or AddFast
+// methods to insert data points; when it is done, it calls either Commit, meaning success, and the data is
+// flushed to the downstream consumer, or Rollback, and all data points are discarded.
+type transaction struct {
+	id            int64
+	ctx           context.Context
+	isNew         bool
+	sink          consumer.MetricsConsumer
+	ms            MetadataService
+	metricBuilder *metricBuilder
+	logger        *zap.SugaredLogger
+}
+
+func newTransaction(ctx context.Context, ms MetadataService, sink consumer.MetricsConsumer, logger *zap.SugaredLogger) *transaction {
+	return &transaction{
+		id:     atomic.AddInt64(&idSeq, 1),
+		ctx:    ctx,
+		isNew:  true,
+		sink:   sink,
+		ms:     ms,
+		logger: logger,
+	}
+}
+
+// ensure *transaction implements the storage.Appender interface
+var _ storage.Appender = (*transaction)(nil)
+
+// The first return value is undocumented, but it is used by AddFast. It appears to be a unique key for the
+// stored sample, so that subsequent operations can be performed faster with that key; in this case,
+// returning 0, as the Prometheus remote store does, is sufficient.
+func (tr *transaction) Add(l labels.Labels, t int64, v float64) (uint64, error) {
+	return 0, tr.AddFast(l, 0, t, v)
+}
+
+// returning an error from this method causes the whole appending transaction to be aborted and fail
+func (tr *transaction) AddFast(ls labels.Labels, _ uint64, t int64, v float64) error {
+	select {
+	case <-tr.ctx.Done():
+		return errTransactionAborted
+	default:
+	}
+	if tr.isNew {
+		if err := tr.initTransaction(ls); err != nil {
+			return err
+		}
+	}
+	return tr.metricBuilder.AddDataPoint(ls, t, v)
+}
+
+func (tr *transaction) initTransaction(ls labels.Labels) error {
+	job, instance := ls.Get(model.JobLabel), ls.Get(model.InstanceLabel)
+	if job == "" || instance == "" {
+		return errNoJobInstance
+	}
+	// discover the binding target when this method is called for the first time during a transaction
+	mc, err := tr.ms.Get(job, instance)
+	if err != nil {
+		return err
+	}
+	node := createNode(job, instance, mc.SharedLabels().Get(model.SchemeLabel))
+	tr.metricBuilder = newMetricBuilder(node, mc, tr.logger)
+	tr.isNew = false
+	return nil
+}
+
+// submit metrics data to consumers
+func (tr *transaction) Commit() error {
+	if tr.isNew {
+		// When it cannot connect to the target, the scrape loop still commits even though it never added
+		// any data points, so the transaction may not have been initialized.
+		return nil
+	}
+
+	md, err := tr.metricBuilder.Build()
+	if err != nil {
+		return err
+	}
+
+	if md != dummyMetric {
+		return tr.sink.ConsumeMetricsData(context.Background(), *md)
+	}
+
+	return nil
+}
+
+func (tr *transaction) Rollback() error {
+	return nil
+}
+
+func createNode(job, instance, scheme string) *commonpb.Node {
+	split := strings.Split(instance, ":")
+	host, port := split[0], "80"
+	if len(split) >= 2 {
+		port = split[1]
+	}
+	return &commonpb.Node{
+		ServiceInfo: &commonpb.ServiceInfo{Name: job},
+		Identifier: &commonpb.ProcessIdentifier{
+			HostName: host,
+		},
+		Attributes: map[string]string{
+			portAttr:   port,
+			schemeAttr: scheme,
+		},
+	}
+}
diff --git a/receiver/prometheusreceiver/internal/transaction_test.go b/receiver/prometheusreceiver/internal/transaction_test.go
new file mode 100644
index 00000000000..40a7e5aea9f
--- /dev/null
+++ b/receiver/prometheusreceiver/internal/transaction_test.go
@@ -0,0 +1,94 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
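+
+// A typical round-trip through the appender, as exercised by the tests below
+// (values are illustrative; the labels must carry "job" and "instance" so
+// that initTransaction can resolve the scrape target):
+//
+//	tr := newTransaction(ctx, ms, sink, logger)
+//	_, err := tr.Add(labels.FromStrings("__name__", "foo",
+//		"job", "test", "instance", "localhost:8080"), ts, 1.0)
+//	err = tr.Commit() // builds the MetricsData and hands it to the sink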
+
+package internal
+
+import (
+	"context"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/scrape"
+	"reflect"
+	"testing"
+	"time"
+)
+
+func Test_transaction(t *testing.T) {
+	ms := &mockMetadataSvc{
+		caches: map[string]*mockMetadataCache{
+			"test_localhost:8080": {data: map[string]scrape.MetricMetadata{}},
+		},
+	}
+
+	t.Run("Commit Without Adding", func(t *testing.T) {
+		mcon := newMockConsumer()
+		tr := newTransaction(context.Background(), ms, mcon, testLogger)
+		if got := tr.Commit(); got != nil {
+			t.Errorf("expecting nil from Commit() but got err %v", got)
+		}
+	})
+
+	t.Run("Rollback does nothing", func(t *testing.T) {
+		mcon := newMockConsumer()
+		tr := newTransaction(context.Background(), ms, mcon, testLogger)
+		if got := tr.Rollback(); got != nil {
+			t.Errorf("expecting nil from Rollback() but got err %v", got)
+		}
+	})
+
+	badLabels := labels.Labels([]labels.Label{{Name: "foo", Value: "bar"}})
+	t.Run("Add One No Target", func(t *testing.T) {
+		mcon := newMockConsumer()
+		tr := newTransaction(context.Background(), ms, mcon, testLogger)
+		if _, got := tr.Add(badLabels, time.Now().Unix()*1000, 1.0); got == nil {
+			t.Errorf("expecting error from Add() but got nil")
+		}
+	})
+
+	jobNotFoundLb := labels.Labels([]labels.Label{
+		{Name: "instance", Value: "localhost:8080"},
+		{Name: "job", Value: "test2"},
+		{Name: "foo", Value: "bar"}})
+	t.Run("Add One Job not found", func(t *testing.T) {
+		mcon := newMockConsumer()
+		tr := newTransaction(context.Background(), ms, mcon, testLogger)
+		if _, got := tr.Add(jobNotFoundLb, time.Now().Unix()*1000, 1.0); got == nil {
+			t.Errorf("expecting error from Add() but got nil")
+		}
+	})
+
+	goodLabels := labels.Labels([]labels.Label{{Name: "instance", Value: "localhost:8080"},
+		{Name: "job", Value: "test"},
+		{Name: "__name__", Value: "foo"}})
+	t.Run("Add One Good", func(t *testing.T) {
+		mcon := newMockConsumer()
+		tr := newTransaction(context.Background(), ms, mcon, testLogger)
+		if _, got := tr.Add(goodLabels, time.Now().Unix()*1000, 1.0); got != nil {
+			t.Errorf("expecting error == nil from Add() but got: %v\n", got)
+		}
+		if got := tr.Commit(); got != nil {
+			t.Errorf("expecting nil from Commit() but got err %v", got)
+		}
+
+		expected := createNode("test", "localhost:8080", "http")
+		md := <-mcon.Metrics
+		if !reflect.DeepEqual(md.Node, expected) {
+			t.Errorf("generated node %v and expected node %v are different\n", md.Node, expected)
+		}
+
+		if len(md.Metrics) != 1 {
+			t.Errorf("expecting one metric, but got %v\n", len(md.Metrics))
+		}
+	})
+
+}
diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go
index 633c79b98e7..dee1285235c 100644
--- a/receiver/prometheusreceiver/metrics_receiver.go
+++ b/receiver/prometheusreceiver/metrics_receiver.go
@@ -1,4 +1,4 @@
-// Copyright 2019, OpenTelemetry Authors
+// Copyright 2019, OpenCensus Authors
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
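The remaining hunks replace the promreceiver-based implementation with one that runs Prometheus' own discovery and scrape managers on top of the OcaStore introduced above. Condensed from the code below (error handling elided), the wiring looks roughly like:

```go
app := internal.NewOcaStore(ctx, pr.consumer, pr.logger.Sugar())
l := internal.NewZapToGokitLogAdapter(pr.logger)      // scrape/discovery expect a go-kit logger
scrapeManager := scrape.NewManager(l, app)            // scraped samples flow into the store
app.SetScrapeManager(scrapeManager)                   // the store needs it for metadata lookups
discoveryManagerScrape := discovery.NewManager(ctx, l)
go discoveryManagerScrape.Run()
go scrapeManager.Run(discoveryManagerScrape.SyncCh()) // targets come from discovery
```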
@@ -18,18 +18,21 @@ import ( "context" "errors" "fmt" + "github.com/prometheus/prometheus/discovery" + "github.com/prometheus/prometheus/scrape" + "github.com/prometheus/prometheus/storage" + "go.uber.org/zap" "sync" "time" - "github.com/spf13/viper" - "gopkg.in/yaml.v2" - - agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" "github.com/open-telemetry/opentelemetry-service/consumer" - "github.com/open-telemetry/opentelemetry-service/data" "github.com/open-telemetry/opentelemetry-service/receiver" - "github.com/orijtech/promreceiver" + "github.com/open-telemetry/opentelemetry-service/receiver/prometheusreceiver/internal" + "github.com/prometheus/prometheus/config" + sd_config "github.com/prometheus/prometheus/discovery/config" + "github.com/spf13/viper" + "gopkg.in/yaml.v2" ) // Configuration defines the behavior and targets of the Prometheus scrapers. @@ -42,9 +45,12 @@ type Configuration struct { // Preceiver is the type that provides Prometheus scraper/receiver functionality. type Preceiver struct { startOnce sync.Once - recv *promreceiver.Receiver + stopOnce sync.Once + ocaStore storage.Appender cfg *Configuration consumer consumer.MetricsConsumer + cancel context.CancelFunc + logger *zap.Logger } var _ receiver.MetricsReceiver = (*Preceiver)(nil) @@ -60,7 +66,7 @@ const ( ) // New creates a new prometheus.Receiver reference. -func New(v *viper.Viper, next consumer.MetricsConsumer) (*Preceiver, error) { +func New(logger *zap.Logger, v *viper.Viper, next consumer.MetricsConsumer) (*Preceiver, error) { var cfg Configuration // Unmarshal our config values (using viper's mapstructure) @@ -88,6 +94,7 @@ func New(v *viper.Viper, next consumer.MetricsConsumer) (*Preceiver, error) { pr := &Preceiver{ cfg: &cfg, consumer: next, + logger: logger, } return pr, nil } @@ -102,55 +109,62 @@ func (pr *Preceiver) MetricsSource() string { // StartMetricsReception is the method that starts Prometheus scraping and it // is controlled by having previously defined a Configuration using perhaps New. func (pr *Preceiver) StartMetricsReception(ctx context.Context, asyncErrorChan chan<- error) error { - var err = errAlreadyStarted pr.startOnce.Do(func() { - if pr.consumer == nil { - err = errNilMetricsReceiverSink + c, cancel := context.WithCancel(ctx) + pr.cancel = cancel + app := internal.NewOcaStore(c, pr.consumer, pr.logger.Sugar()) + // need to use a logger with the gokitLog interface + l := internal.NewZapToGokitLogAdapter(pr.logger) + scrapeManager := scrape.NewManager(l, app) + app.SetScrapeManager(scrapeManager) + discoveryManagerScrape := discovery.NewManager(ctx, l) + go func() { + if err := discoveryManagerScrape.Run(); err != nil { + asyncErrorChan <- err + } + }() + if err := scrapeManager.ApplyConfig(pr.cfg.ScrapeConfig); err != nil { + asyncErrorChan <- err return } - tms := &promMetricsReceiverToOpenCensusMetricsReceiver{nextConsumer: pr.consumer} - cfg := pr.cfg - pr.recv, err = promreceiver.ReceiverFromConfig( - context.Background(), - tms, - cfg.ScrapeConfig, - promreceiver.WithBufferPeriod(cfg.BufferPeriod), - promreceiver.WithBufferCount(cfg.BufferCount)) + // Run the scrape manager. 
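+		// The goroutine below gives scrapeManager a ~100ms head start to pick up its
+		// initial configuration before the discovery config is applied further down;
+		// scrapeManager.Run then blocks for the lifetime of the receiver, reporting
+		// any failure on errsChan.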
+ syncConfig := make(chan bool) + errsChan := make(chan error, 1) + go func() { + defer close(errsChan) + <-time.After(100 * time.Millisecond) + close(syncConfig) + if err := scrapeManager.Run(discoveryManagerScrape.SyncCh()); err != nil { + errsChan <- err + } + }() + <-syncConfig + // By this point we've given time to the scrape manager + // to start applying its original configuration. + + discoveryCfg := make(map[string]sd_config.ServiceDiscoveryConfig) + for _, scrapeConfig := range pr.cfg.ScrapeConfig.ScrapeConfigs { + discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfig + } + + // Now trigger the discovery notification to the scrape manager. + if err := discoveryManagerScrape.ApplyConfig(discoveryCfg); err != nil { + errsChan <- err + } }) - return err + return nil } // Flush triggers the Flush method on the underlying Prometheus scrapers and instructs // them to immediately sned over the metrics they've collected, to the MetricsConsumer. +// it's not needed on the new prometheus receiver implementation, let it do nothing func (pr *Preceiver) Flush() { - pr.recv.Flush() + } // StopMetricsReception stops and cancels the underlying Prometheus scrapers. func (pr *Preceiver) StopMetricsReception(ctx context.Context) error { - pr.Flush() - return pr.recv.Cancel() -} - -type promMetricsReceiverToOpenCensusMetricsReceiver struct { - nextConsumer consumer.MetricsConsumer -} - -var _ promreceiver.MetricsSink = (*promMetricsReceiverToOpenCensusMetricsReceiver)(nil) - -var errNilRequest = errors.New("expecting a non-nil request") - -// ReceiveMetrics is a converter that enables MetricsReceivers to act as MetricsSink. -func (pmrtomr *promMetricsReceiverToOpenCensusMetricsReceiver) ReceiveMetrics(ctx context.Context, ereq *agentmetricspb.ExportMetricsServiceRequest) error { - if ereq == nil { - return errNilRequest - } - - err := pmrtomr.nextConsumer.ConsumeMetricsData(ctx, data.MetricsData{ - Node: ereq.Node, - Resource: ereq.Resource, - Metrics: ereq.Metrics, - }) - return err + pr.stopOnce.Do(pr.cancel) + return nil } diff --git a/receiver/prometheusreceiver/metrics_receiver_test.go b/receiver/prometheusreceiver/metrics_receiver_test.go index 93c75c5f05a..cbdb844cf52 100644 --- a/receiver/prometheusreceiver/metrics_receiver_test.go +++ b/receiver/prometheusreceiver/metrics_receiver_test.go @@ -1,4 +1,4 @@ -// Copyright 2019, OpenTelemetry Authors +// Copyright 2019, OpenCensus Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
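The test updates that follow track the new constructor signature and the switch to exact comparisons; in particular, every call site now passes a logger first, for example:

```go
logger, _ := zap.NewDevelopment()
precv, err := New(logger, v, cms) // was: New(v, cms)
```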
@@ -17,28 +17,30 @@ package prometheusreceiver import ( "context" "fmt" + "go.opencensus.io/metric/metricdata" + "go.opencensus.io/metric/metricproducer" + "go.uber.org/zap" "net" "net/http" "net/http/httptest" "net/url" + "reflect" + "sort" "testing" "time" - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "contrib.go.opencensus.io/exporter/prometheus" - "github.com/spf13/viper" - "go.opencensus.io/metric/metricdata" - "go.opencensus.io/metric/metricproducer" - commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" "github.com/golang/protobuf/ptypes/timestamp" + "github.com/open-telemetry/opentelemetry-service/data" "github.com/open-telemetry/opentelemetry-service/exporter/exportertest" "github.com/open-telemetry/opentelemetry-service/internal/config/viperutils" + "github.com/spf13/viper" ) +var logger, _ = zap.NewDevelopment() + type scrapeCounter struct { scrapeTrackCh chan bool shutdownCh <-chan bool @@ -48,7 +50,7 @@ type scrapeCounter struct { func (sc *scrapeCounter) ServeHTTP(rw http.ResponseWriter, req *http.Request) { select { case <-sc.shutdownCh: - http.Error(rw, "shuting down", http.StatusGone) + http.Error(rw, "shutting down", http.StatusGone) default: sc.scrapeTrackCh <- true @@ -59,19 +61,19 @@ func (sc *scrapeCounter) ServeHTTP(rw http.ResponseWriter, req *http.Request) { func TestNew(t *testing.T) { v := viper.New() - _, err := New(v, nil) + _, err := New(logger, v, nil) if err != errNilScrapeConfig { t.Fatalf("Expected errNilScrapeConfig but did not get it.") } v.Set("config", nil) - _, err = New(v, nil) + _, err = New(logger, v, nil) if err != errNilScrapeConfig { t.Fatalf("Expected errNilScrapeConfig but did not get it.") } v.Set("config.blah", "some_value") - _, err = New(v, nil) + _, err = New(logger, v, nil) if err != errNilScrapeConfig { t.Fatalf("Expected errNilScrapeConfig but did not get it.") } @@ -103,12 +105,9 @@ func TestEndToEnd(t *testing.T) { config: scrape_configs: - job_name: 'demo' - scrape_interval: %s - static_configs: - targets: ['%s'] - buffer_period: 500ms buffer_count: 2 `, scrapePeriod, cstURL.Host) @@ -121,7 +120,7 @@ buffer_count: 2 } cms := new(exportertest.SinkMetricsExporter) - precv, err := New(v, cms) + precv, err := New(logger, v, cms) if err != nil { t.Fatalf("Failed to create promreceiver: %v", err) } @@ -197,44 +196,98 @@ buffer_count: 2 // Pause for the next scrape precv.Flush() - close(shutdownCh) - gotMDs := cms.AllMetrics() - if len(gotMDs) == 0 { - t.Errorf("Want at least one Metric. Got zero.") - } + got := cms.AllMetrics() - // Now compare the received metrics data with what we expect. - wantNode := &commonpb.Node{ - Identifier: &commonpb.ProcessIdentifier{ - HostName: host, - }, - ServiceInfo: &commonpb.ServiceInfo{ - Name: "demo", - }, - Attributes: map[string]string{ - "scheme": "http", - "port": port, - }, - } + // Unfortunately we can't control the time that Prometheus produces from scraping, + // hence for equality, we manually have to retrieve the times recorded by Prometheus, + // but indexed by each unique MetricDescriptor.Name(). 
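+	// The index maps each metric name to the start timestamp Prometheus recorded for
+	// it, e.g. retrievedTimestamps["e2ereceiver_e2e_calls"] (actual values vary per run).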
+ retrievedTimestamps := indexTimestampsByMetricDescriptorName(got) - wantMetricPb1 := &metricspb.Metric{ - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: "e2ereceiver_e2e_calls", - Description: "The number of calls", - Type: metricspb.MetricDescriptor_CUMULATIVE_INT64, - LabelKeys: []*metricspb.LabelKey{ - {Key: "method"}, + // Now compare the received metrics data with what we expect. + want1 := []data.MetricsData{ + { + Node: &commonpb.Node{ + Identifier: &commonpb.ProcessIdentifier{ + HostName: host, + }, + ServiceInfo: &commonpb.ServiceInfo{ + Name: "demo", + }, + Attributes: map[string]string{ + "scheme": "http", + "port": port, + }, }, - }, - Timeseries: []*metricspb.TimeSeries{ - { - LabelValues: []*metricspb.LabelValue{ - {Value: "a.b.c.run"}, + Metrics: []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "e2ereceiver_e2e_calls", + Description: "The number of calls", + Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, + LabelKeys: []*metricspb.LabelKey{ + {Key: "method"}, + }, + }, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: retrievedTimestamps["e2ereceiver_e2e_calls"], + LabelValues: []*metricspb.LabelValue{ + {Value: "a.b.c.run", HasValue: true}, + }, + Points: []*metricspb.Point{ + { + Timestamp: retrievedTimestamps["e2ereceiver_e2e_calls"], + Value: &metricspb.Point_DoubleValue{ + DoubleValue: 1, + }, + }, + }, + }, + }, }, - Points: []*metricspb.Point{ - { - Value: &metricspb.Point_Int64Value{ - Int64Value: 1, + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "e2ereceiver_e2e_call_latency", + Description: "The latency in milliseconds per call", + Type: metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION, + LabelKeys: []*metricspb.LabelKey{ + {Key: "method"}, + }, + }, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: retrievedTimestamps["e2ereceiver_e2e_call_latency"], + LabelValues: []*metricspb.LabelValue{ + {Value: "a.b.c.run", HasValue: true}, + }, + Points: []*metricspb.Point{ + { + Timestamp: retrievedTimestamps["e2ereceiver_e2e_call_latency"], + Value: &metricspb.Point_DistributionValue{ + DistributionValue: &metricspb.DistributionValue{ + Count: 1, + Sum: 180, + BucketOptions: &metricspb.DistributionValue_BucketOptions{ + Type: &metricspb.DistributionValue_BucketOptions_Explicit_{ + Explicit: &metricspb.DistributionValue_BucketOptions_Explicit{ + Bounds: []float64{0, 100, 200, 500, 1000, 5000}, + }, + }, + }, + // size(bounds) + 1 (= N) buckets. 
The boundaries for bucket + Buckets: []*metricspb.DistributionValue_Bucket{ + {}, + {Count: 1}, + {}, + {}, + {}, + {}, + {}, + }, + }, + }, + }, + }, }, }, }, @@ -242,42 +295,89 @@ buffer_count: 2 }, } - wantMetricPb2 := &metricspb.Metric{ - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: "e2ereceiver_e2e_call_latency", - Description: "The latency in milliseconds per call", - Type: metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION, - LabelKeys: []*metricspb.LabelKey{ - {Key: "method"}, + want2 := []data.MetricsData{ + { + Node: &commonpb.Node{ + Identifier: &commonpb.ProcessIdentifier{ + HostName: host, + }, + ServiceInfo: &commonpb.ServiceInfo{ + Name: "demo", + }, + Attributes: map[string]string{ + "scheme": "http", + "port": port, + }, }, - }, - Timeseries: []*metricspb.TimeSeries{ - { - LabelValues: []*metricspb.LabelValue{ - {Value: "a.b.c.run"}, + Metrics: []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "e2ereceiver_e2e_calls", + Description: "The number of calls", + Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, + LabelKeys: []*metricspb.LabelKey{ + {Key: "method"}, + }, + }, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: retrievedTimestamps["e2ereceiver_e2e_calls"], + LabelValues: []*metricspb.LabelValue{ + {Value: "a.b.c.run", HasValue: true}, + }, + Points: []*metricspb.Point{ + { + Timestamp: retrievedTimestamps["e2ereceiver_e2e_calls"], + Value: &metricspb.Point_DoubleValue{ + DoubleValue: 1, + }, + }, + }, + }, + }, }, - Points: []*metricspb.Point{ - { - Value: &metricspb.Point_DistributionValue{ - DistributionValue: &metricspb.DistributionValue{ - Count: 1, - Sum: 180, - SumOfSquaredDeviation: 1, - BucketOptions: &metricspb.DistributionValue_BucketOptions{ - Type: &metricspb.DistributionValue_BucketOptions_Explicit_{ - Explicit: &metricspb.DistributionValue_BucketOptions_Explicit{ - Bounds: []float64{0, 100, 200, 500, 1000, 5000}, + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "e2ereceiver_e2e_call_latency", + Description: "The latency in milliseconds per call", + Type: metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION, + LabelKeys: []*metricspb.LabelKey{ + {Key: "method"}, + }, + }, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: retrievedTimestamps["e2ereceiver_e2e_call_latency"], + LabelValues: []*metricspb.LabelValue{ + {Value: "a.b.c.run", HasValue: true}, + }, + Points: []*metricspb.Point{ + { + Timestamp: retrievedTimestamps["e2ereceiver_e2e_call_latency"], + Value: &metricspb.Point_DistributionValue{ + DistributionValue: &metricspb.DistributionValue{ + Count: 1, + Sum: 180, + BucketOptions: &metricspb.DistributionValue_BucketOptions{ + Type: &metricspb.DistributionValue_BucketOptions_Explicit_{ + Explicit: &metricspb.DistributionValue_BucketOptions_Explicit{ + Bounds: []float64{0, 100, 200, 500, 1000, 5000}, + }, + }, + }, + // size(bounds) + 1 (= N) buckets. The boundaries for bucket + Buckets: []*metricspb.DistributionValue_Bucket{ + {}, + {Count: 1}, + {}, + {}, + {}, + {}, + {}, + }, }, }, }, - Buckets: []*metricspb.DistributionValue_Bucket{ - {}, - {Count: 1}, - {}, - {}, - {}, - {}, - }, }, }, }, @@ -286,22 +386,61 @@ buffer_count: 2 }, } - for _, md := range gotMDs { - node := md.Node - if diff := cmpNodePb(node, wantNode); diff != "" { - t.Errorf("Mismatch Node\n-Got +Want:\n%s", diff) - } - metricPbs := md.Metrics - if len(metricPbs) != 1 { - t.Errorf("Want 1 metric, got %d", len(metricPbs)) + // Firstly sort them so that comparisons return stable results. 
+	byMetricsSorter(t, got)
+	byMetricsSorter(t, want1)
+	byMetricsSorter(t, want2)
+
+	// These tests rely on nondeterministic behavior and imprecise timing, so the best
+	// we can do is accept any one of the variants we expect.
+	wantPermutations := [][]data.MetricsData{
+		want1, want2,
+	}
+
+	for _, want := range wantPermutations {
+		if reflect.DeepEqual(got, want) {
+			return
 		}
-		metricPb := metricPbs[0]
-		diff1 := cmpMetricPb(metricPb, wantMetricPb1)
-		diff2 := cmpMetricPb(metricPb, wantMetricPb2)
-		if diff1 != "" && diff2 != "" {
-			t.Errorf("Metric doesn't match with either of wanted metrics\n-Got +Want:\n%s\n-Got +Want:\n%s", diff1, diff2)
+	}
+	t.Errorf("different metric got:\n%v\nwant one of:\n%v\n", string(exportertest.ToJSON(got)),
+		string(exportertest.ToJSON(wantPermutations)))
+}
+
+func byMetricsSorter(t *testing.T, mds []data.MetricsData) {
+	for i, md := range mds {
+		eMetrics := md.Metrics
+		sort.Slice(eMetrics, func(i, j int) bool {
+			mdi, mdj := eMetrics[i], eMetrics[j]
+			return mdi.GetMetricDescriptor().GetName() < mdj.GetMetricDescriptor().GetName()
+		})
+		md.Metrics = eMetrics
+		mds[i] = md
+	}
+
+	// Then sort the MetricsData themselves, by their JSON encodings, for stable ordering.
+	sort.Slice(mds, func(i, j int) bool {
+		mdi, mdj := mds[i], mds[j]
+		return string(exportertest.ToJSON(mdi)) < string(exportertest.ToJSON(mdj))
+	})
+}
+
+func indexTimestampsByMetricDescriptorName(mds []data.MetricsData) map[string]*timestamp.Timestamp {
+	index := make(map[string]*timestamp.Timestamp)
+	for _, md := range mds {
+		for _, eimetric := range md.Metrics {
+			for _, eiTimeSeries := range eimetric.Timeseries {
+				if ts := eiTimeSeries.GetStartTimestamp(); ts != nil {
+					index[eimetric.GetMetricDescriptor().GetName()] = ts
+					break
+				}
+			}
 		}
 	}
+	return index
 }
 
 type fakeProducer struct {
@@ -311,23 +450,3 @@
 func (producer *fakeProducer) Read() []*metricdata.Metric {
 	return producer.metrics
 }
-
-func cmpNodePb(got, want *commonpb.Node) string {
-	// Ignore all "XXX_sizecache" fields.
-	return cmp.Diff(
-		got,
-		want,
-		cmpopts.IgnoreFields(commonpb.Node{}, "XXX_sizecache"),
-		cmpopts.IgnoreFields(commonpb.ProcessIdentifier{}, "XXX_sizecache"),
-		cmpopts.IgnoreFields(commonpb.ServiceInfo{}, "XXX_sizecache"))
-}
-
-func cmpMetricPb(got, want *metricspb.Metric) string {
-	// Start and end time are non-deteministic. Ignore them when do the comparison.
-	return cmp.Diff(
-		got,
-		want,
-		cmpopts.IgnoreTypes(&timestamp.Timestamp{}),
-		cmpopts.IgnoreFields(metricspb.MetricDescriptor{}, "XXX_sizecache"),
-		cmpopts.IgnoreFields(metricspb.LabelKey{}, "XXX_sizecache"))
-}
diff --git a/receiver/prometheusreceiver/scrapeloop-flowchart.png b/receiver/prometheusreceiver/scrapeloop-flowchart.png
new file mode 100644
index 00000000000..5853a9df927
Binary files /dev/null and b/receiver/prometheusreceiver/scrapeloop-flowchart.png differ
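Taken together, the receiver is now used as sketched below, based on the API in this change; this assumes the *viper.Viper has already been populated with the scrape config under the "config" key, and uses the test sink as a stand-in for any consumer.MetricsConsumer:

```go
package main

import (
	"context"
	"log"

	"github.com/open-telemetry/opentelemetry-service/exporter/exportertest"
	"github.com/open-telemetry/opentelemetry-service/receiver/prometheusreceiver"
	"github.com/spf13/viper"
	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewDevelopment()
	v := viper.New() // assumed to carry scrape_configs under the "config" key
	cms := new(exportertest.SinkMetricsExporter)

	precv, err := prometheusreceiver.New(logger, v, cms)
	if err != nil {
		log.Fatal(err) // errNilScrapeConfig when no config was provided
	}
	asyncErrs := make(chan error, 1)
	if err := precv.StartMetricsReception(context.Background(), asyncErrs); err != nil {
		log.Fatal(err)
	}
	defer precv.StopMetricsReception(context.Background())
}
```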