Implement automatic exposition format detection #14445

Merged: 10 commits, Apr 25, 2023
@@ -9,8 +9,8 @@
from math import isinf, isnan
from typing import List # noqa: F401

from prometheus_client.openmetrics.parser import text_fd_to_metric_families as parse_metric_families_strict
from prometheus_client.parser import text_fd_to_metric_families as parse_metric_families
from prometheus_client.openmetrics.parser import text_fd_to_metric_families as parse_openmetrics
from prometheus_client.parser import text_fd_to_metric_families as parse_prometheus

from ....config import is_affirmative
from ....constants import ServiceCheck
@@ -217,14 +217,17 @@ def __init__(self, check, config):

self.http = RequestsWrapper(config, self.check.init_config, self.check.HTTP_CONFIG_REMAPPER, self.check.log)

# Decide how strictly we will adhere to the latest version of the specification
if is_affirmative(config.get('use_latest_spec', False)):
self.parse_metric_families = parse_metric_families_strict
# https://github.com/prometheus/client_python/blob/v0.9.0/prometheus_client/openmetrics/exposition.py#L7
accept_header = 'application/openmetrics-text; version=0.0.1; charset=utf-8'
self._content_type = ''
self._use_latest_spec = is_affirmative(config.get('use_latest_spec', False))
# Accept headers are taken from:
# https://github.com/prometheus/prometheus/blob/v2.43.0/scrape/scrape.go#L787
if self._use_latest_spec:
accept_header = 'application/openmetrics-text;version=1.0.0,application/openmetrics-text;version=0.0.1'
else:
self.parse_metric_families = parse_metric_families
accept_header = 'text/plain'
accept_header = (
'application/openmetrics-text;version=1.0.0,application/openmetrics-text;version=0.0.1;q=0.75,'
'text/plain;version=0.0.4;q=0.5,*/*;q=0.1'
)

# Request the appropriate exposition format
if self.http.options['headers'].get('Accept') == '*/*':
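Note (illustrative sketch, not part of the diff): the default Accept header above ranks the exposition formats by quality factor, so a compliant endpoint should serve OpenMetrics 1.0.0 when it can, then OpenMetrics 0.0.1, then the Prometheus text format. The helper below is hypothetical and only assumes standard q-value semantics, where a missing q parameter means q=1.0.

def best_media_type(accept_header):
    """Return the highest-weighted media type offered in an Accept header."""
    offers = []
    for entry in accept_header.split(','):
        parts = [part.strip() for part in entry.split(';')]
        quality = 1.0  # a missing q parameter defaults to 1.0
        for param in parts[1:]:
            if param.startswith('q='):
                quality = float(param[2:])
        offers.append((quality, parts[0]))
    return max(offers)[1]

accept = (
    'application/openmetrics-text;version=1.0.0,application/openmetrics-text;version=0.0.1;q=0.75,'
    'text/plain;version=0.0.4;q=0.5,*/*;q=0.1'
)
print(best_media_type(accept))  # application/openmetrics-text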
@@ -279,6 +282,11 @@ def parse_metrics(self):
if self.raw_line_filter is not None:
line_streamer = self.filter_connection_lines(line_streamer)

# Since we determine `self.parse_metric_families` dynamically from the response and that's done as a
# side effect inside the `line_streamer` generator, we need to consume the first line in order to
# trigger that side effect.
line_streamer = chain([next(line_streamer)], line_streamer)

for metric in self.parse_metric_families(line_streamer):
self.submit_telemetry_number_of_total_metric_samples(metric)
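Note (illustrative sketch, not part of the diff): the `chain([next(line_streamer)], line_streamer)` call above works because a generator's body only starts executing once the generator is advanced. The names below are invented for the example; it shows that consuming one line triggers the header-capturing side effect while `chain` puts that line back so nothing is lost.

from itertools import chain

state = {'content_type': ''}

def stream_lines():
    # Side effect: only runs when the generator is first advanced
    state['content_type'] = 'application/openmetrics-text'
    yield '# TYPE up gauge'
    yield 'up 1'
    yield '# EOF'

lines = stream_lines()
assert state['content_type'] == ''               # generator body has not run yet
lines = chain([next(lines)], lines)              # prime it: the side effect fires here
assert state['content_type'] == 'application/openmetrics-text'
assert list(lines) == ['# TYPE up gauge', 'up 1', '# EOF']  # the first line is preserved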

@@ -289,6 +297,19 @@ def parse_metrics(self):

yield metric

@property
def parse_metric_families(self):
media_type = self._content_type.split(';')[0]
# Setting `use_latest_spec` forces the use of the OpenMetrics format; otherwise
# the format is chosen based on the media type declared in the response's Content-Type header.
# The selection is based on what Prometheus does:
# https://github.com/prometheus/prometheus/blob/v2.43.0/model/textparse/interface.go#L83-L90
return (
parse_openmetrics
if self._use_latest_spec or media_type == 'application/openmetrics-text'
else parse_prometheus
)
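Note (illustrative sketch, not part of the diff): the property only inspects the media type, i.e. everything before the first ';' in the Content-Type value, and anything other than application/openmetrics-text falls back to the Prometheus text parser. The sample header values and expected outcomes below are assumptions derived from that logic, not observed responses.

samples = {
    'application/openmetrics-text; version=1.0.0; charset=utf-8': 'openmetrics',
    'text/plain; version=0.0.4; charset=utf-8': 'prometheus',
    '': 'prometheus',  # a missing or unknown Content-Type falls back to the text parser
}

for content_type, expected in samples.items():
    media_type = content_type.split(';')[0]
    chosen = 'openmetrics' if media_type == 'application/openmetrics-text' else 'prometheus'
    assert chosen == expected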

def generate_sample_data(self, metric):
"""
Yield a sample of processed data.
@@ -341,6 +362,8 @@ def stream_connection_lines(self):
"""

with self.get_connection() as connection:
# Media type will be used to select parser dynamically
self._content_type = connection.headers.get('Content-Type', '')
for line in connection.iter_lines(decode_unicode=True):
yield line

@@ -380,6 +403,7 @@ def get_connection(self):
response.encoding = 'utf-8'

self.submit_telemetry_endpoint_response_size(response)

return response

def send_request(self, **kwargs):
@@ -421,10 +421,15 @@ def test_strict_latest_spec(self, dd_run_check):
check = get_check({'use_latest_spec': True})
check.configure_scrapers()
scraper = check.scrapers['test']
assert scraper.http.options['headers']['Accept'] == 'application/openmetrics-text; version=0.0.1; charset=utf-8'
assert scraper.http.options['headers']['Accept'] == (
'application/openmetrics-text;version=1.0.0,application/openmetrics-text;version=0.0.1'
)

def test_plain_text_spec(self, dd_run_check):
def test_dynamic_spec(self, dd_run_check):
check = get_check({'use_latest_spec': False})
check.configure_scrapers()
scraper = check.scrapers['test']
assert scraper.http.options['headers']['Accept'] == 'text/plain'
assert scraper.http.options['headers']['Accept'] == (
'application/openmetrics-text;version=1.0.0,application/openmetrics-text;version=0.0.1;q=0.75,'
'text/plain;version=0.0.4;q=0.5,*/*;q=0.1'
)
openmetrics/tests/conftest.py (30 additions, 15 deletions)
@@ -5,8 +5,11 @@
import os

import pytest
from prometheus_client import CollectorRegistry, Counter, Gauge, generate_latest
from prometheus_client.openmetrics.exposition import generate_latest as generate_latest_strict
from prometheus_client import CollectorRegistry, Counter, Gauge
from prometheus_client import generate_latest as generate_prometheus
from prometheus_client.exposition import CONTENT_TYPE_LATEST as PROMETHEUS_CONTENT_TYPE
from prometheus_client.openmetrics.exposition import CONTENT_TYPE_LATEST as OPENMETRICS_CONTENT_TYPE
from prometheus_client.openmetrics.exposition import generate_latest as generate_openmetrics

from datadog_checks.base import ensure_unicode
from datadog_checks.dev import docker_run
@@ -24,7 +27,7 @@ def dd_environment():


@pytest.fixture
def poll_mock(mock_http_response):
def example_metrics_registry():
registry = CollectorRegistry()
g1 = Gauge('metric1', 'processor usage', ['matched_label', 'node', 'flavor'], registry=registry)
g1.labels(matched_label="foobar", node="host1", flavor="test").set(99.9)
@@ -36,20 +39,32 @@ def poll_mock(mock_http_response):
c2.labels(node="host2").inc(42)
g3 = Gauge('metric3', 'memory usage', ['matched_label', 'node', 'timestamp'], registry=registry)
g3.labels(matched_label="foobar", node="host2", timestamp="456").set(float('inf'))
return registry

mock_http_response(ensure_unicode(generate_latest(registry)), normalize_content=False)

@pytest.fixture
def prometheus_payload(example_metrics_registry):
return ensure_unicode(generate_prometheus(example_metrics_registry))


@pytest.fixture
def strict_poll_mock(mock_http_response):
registry = CollectorRegistry()
g1 = Gauge('metric1', 'processor usage', ['matched_label', 'node', 'flavor'], registry=registry)
g1.labels(matched_label="foobar", node="host1", flavor="test").set(99.9)
g2 = Gauge('metric2', 'memory usage', ['matched_label', 'node', 'timestamp'], registry=registry)
g2.labels(matched_label="foobar", node="host2", timestamp="123").set(12.2)
c1 = Counter('counter1', 'hits', ['node'], registry=registry)
c1.labels(node="host2").inc(42)
g3 = Gauge('metric3', 'memory usage', ['matched_label', 'node', 'timestamp'], registry=registry)
g3.labels(matched_label="foobar", node="host2", timestamp="456").set(float('inf'))
def openmetrics_payload(example_metrics_registry):
return ensure_unicode(generate_openmetrics(example_metrics_registry))


mock_http_response(ensure_unicode(generate_latest_strict(registry)), normalize_content=False)
@pytest.fixture
def prometheus_poll_mock(mock_http_response, prometheus_payload):
mock_http_response(
prometheus_payload,
normalize_content=False,
headers={'Content-Type': PROMETHEUS_CONTENT_TYPE},
)


@pytest.fixture
def openmetrics_poll_mock(mock_http_response, openmetrics_payload):
mock_http_response(
openmetrics_payload,
normalize_content=False,
headers={'Content-Type': OPENMETRICS_CONTENT_TYPE},
)
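Note (illustrative sketch, not part of the diff): having one payload fixture per format is useful because the same registry serializes differently under each exposition format; most visibly, only the OpenMetrics output is terminated by an explicit '# EOF' marker. A quick stand-alone check using the same prometheus_client helpers imported above (the registry contents here are arbitrary):

from prometheus_client import CollectorRegistry, Counter, generate_latest
from prometheus_client.openmetrics.exposition import generate_latest as generate_openmetrics

registry = CollectorRegistry()
Counter('counter1', 'hits', ['node'], registry=registry).labels(node='host2').inc(42)

prometheus_text = generate_latest(registry).decode('utf-8')
openmetrics_text = generate_openmetrics(registry).decode('utf-8')

assert openmetrics_text.rstrip().endswith('# EOF')      # OpenMetrics payloads end with # EOF
assert not prometheus_text.rstrip().endswith('# EOF')   # the Prometheus text format does not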
openmetrics/tests/test_openmetrics.py (46 additions, 77 deletions)
@@ -9,103 +9,74 @@

from .common import CHECK_NAME

pytestmark = pytest.mark.usefixtures("poll_mock")
pytestmark = [
pytest.mark.skipif(PY2, reason='Test only available on Python 3'),
]

instance = {
'prometheus_url': 'http://localhost:10249/metrics',
'namespace': 'openmetrics',
'metrics': [{'metric1': 'renamed.metric1'}, 'metric2', 'counter1_total'],
'send_histograms_buckets': True,
'send_monotonic_counter': True,
}
instance_new = {
'openmetrics_endpoint': 'http://localhost:10249/metrics',
'namespace': 'openmetrics',
'metrics': [{'metric1': 'renamed.metric1'}, 'metric2', 'counter1', 'counter2'],
'collect_histogram_buckets': True,
}

instance_new_strict = {
'openmetrics_endpoint': 'http://localhost:10249/metrics',
'namespace': 'openmetrics',
'metrics': [{'metric1': 'renamed.metric1'}, 'metric2', 'counter1'],
'collect_histogram_buckets': True,
'use_latest_spec': True,
}

def test_openmetrics_check(dd_run_check, aggregator):
c = OpenMetricsCheck('openmetrics', {}, [instance])
dd_run_check(c)
aggregator.assert_metric(
CHECK_NAME + '.renamed.metric1',
tags=['node:host1', 'flavor:test', 'matched_label:foobar'],
metric_type=aggregator.GAUGE,
)
aggregator.assert_metric(
CHECK_NAME + '.metric2',
tags=['timestamp:123', 'node:host2', 'matched_label:foobar'],
metric_type=aggregator.GAUGE,
)
aggregator.assert_metric(
CHECK_NAME + '.counter1_total', tags=['node:host2'], metric_type=aggregator.MONOTONIC_COUNT
)
aggregator.assert_all_metrics_covered()

@pytest.mark.parametrize('poll_mock_fixture', ['prometheus_poll_mock', 'openmetrics_poll_mock'])
def test_openmetrics(aggregator, dd_run_check, request, poll_mock_fixture):
from datadog_checks.base.checks.openmetrics.v2.scraper import OpenMetricsScraper

request.getfixturevalue(poll_mock_fixture)

check = OpenMetricsCheck('openmetrics', {}, [instance_new])
scraper = OpenMetricsScraper(check, instance_new)
dd_run_check(check)

def test_openmetrics_check_counter_gauge(dd_run_check, aggregator):
instance['send_monotonic_counter'] = False
c = OpenMetricsCheck('openmetrics', {}, [instance])
dd_run_check(c)
aggregator.assert_metric(
CHECK_NAME + '.renamed.metric1',
tags=['node:host1', 'flavor:test', 'matched_label:foobar'],
'{}.renamed.metric1'.format(CHECK_NAME),
tags=['endpoint:http://localhost:10249/metrics', 'node:host1', 'flavor:test', 'matched_label:foobar'],
metric_type=aggregator.GAUGE,
)
aggregator.assert_metric(
CHECK_NAME + '.metric2',
tags=['timestamp:123', 'node:host2', 'matched_label:foobar'],
'{}.metric2'.format(CHECK_NAME),
tags=['endpoint:http://localhost:10249/metrics', 'timestamp:123', 'node:host2', 'matched_label:foobar'],
metric_type=aggregator.GAUGE,
)
aggregator.assert_metric(CHECK_NAME + '.counter1_total', tags=['node:host2'], metric_type=aggregator.GAUGE)
aggregator.assert_all_metrics_covered()


def test_invalid_metric(dd_run_check, aggregator):
"""
Testing that invalid values of metrics are discarded
"""
bad_metric_instance = {
'prometheus_url': 'http://localhost:10249/metrics',
'namespace': 'openmetrics',
'metrics': [{'metric1': 'renamed.metric1'}, 'metric2', 'metric3'],
'send_histograms_buckets': True,
}
c = OpenMetricsCheck('openmetrics', {}, [bad_metric_instance])
dd_run_check(c)
assert aggregator.metrics('metric3') == []


def test_openmetrics_wildcard(dd_run_check, aggregator):
instance_wildcard = {
'prometheus_url': 'http://localhost:10249/metrics',
'namespace': 'openmetrics',
'metrics': ['metric*'],
}

c = OpenMetricsCheck('openmetrics', {}, [instance_wildcard])
dd_run_check(c)
aggregator.assert_metric(
CHECK_NAME + '.metric1',
tags=['node:host1', 'flavor:test', 'matched_label:foobar'],
metric_type=aggregator.GAUGE,
'{}.counter1.count'.format(CHECK_NAME),
tags=['endpoint:http://localhost:10249/metrics', 'node:host2'],
metric_type=aggregator.MONOTONIC_COUNT,
)
aggregator.assert_metric(
CHECK_NAME + '.metric2',
tags=['timestamp:123', 'node:host2', 'matched_label:foobar'],
metric_type=aggregator.GAUGE,
'{}.counter2.count'.format(CHECK_NAME),
tags=['endpoint:http://localhost:10249/metrics', 'node:host2'],
metric_type=aggregator.MONOTONIC_COUNT,
)
aggregator.assert_all_metrics_covered()

assert check.http.options['headers']['Accept'] == '*/*'
assert scraper.http.options['headers']['Accept'] == (
'application/openmetrics-text;version=1.0.0,application/openmetrics-text;version=0.0.1;q=0.75,'
'text/plain;version=0.0.4;q=0.5,*/*;q=0.1'
)


@pytest.mark.skipif(PY2, reason='Test only available on Python 3')
def test_linkerd_v2_new(aggregator, dd_run_check):
def test_openmetrics_use_latest_spec(aggregator, dd_run_check, mock_http_response, openmetrics_payload, caplog):
from datadog_checks.base.checks.openmetrics.v2.scraper import OpenMetricsScraper

check = OpenMetricsCheck('openmetrics', {}, [instance_new])
scraper = OpenMetricsScraper(check, instance_new)
# We want to make sure that when `use_latest_spec` is enabled, we use the OpenMetrics parser
# even when the response's `Content-Type` doesn't declare the appropriate media type.
mock_http_response(openmetrics_payload, normalize_content=False)

check = OpenMetricsCheck('openmetrics', {}, [instance_new_strict])
scraper = OpenMetricsScraper(check, instance_new_strict)
dd_run_check(check)

aggregator.assert_metric(
@@ -123,12 +94,10 @@ def test_linkerd_v2_new(aggregator, dd_run_check):
tags=['endpoint:http://localhost:10249/metrics', 'node:host2'],
metric_type=aggregator.MONOTONIC_COUNT,
)
aggregator.assert_metric(
'{}.counter2.count'.format(CHECK_NAME),
tags=['endpoint:http://localhost:10249/metrics', 'node:host2'],
metric_type=aggregator.MONOTONIC_COUNT,
)
aggregator.assert_all_metrics_covered()

assert check.http.options['headers']['Accept'] == '*/*'
assert scraper.http.options['headers']['Accept'] == 'text/plain'
assert caplog.text == ''
assert scraper.http.options['headers']['Accept'] == (
'application/openmetrics-text;version=1.0.0,application/openmetrics-text;version=0.0.1'
)