diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d98c2c5995..9a45203253e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Implement LowMemory temporality + ([#3223](https://github.com/open-telemetry/opentelemetry-python/pull/3223)) - PeriodicExportingMetricReader will continue if collection times out ([#3100](https://github.com/open-telemetry/opentelemetry-python/pull/3100)) - Fix formatting of ConsoleMetricExporter. diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py index 8abb381cce8..d5ba83c9397 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py @@ -11,7 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import dataclasses +from dataclasses import replace from logging import getLogger from os import environ from typing import Dict, Iterable, List, Optional, Sequence @@ -120,15 +120,17 @@ def __init__( ) instrument_class_temporality = {} - if ( + + otel_exporter_otlp_metrics_temporality_preference = ( environ.get( OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, "CUMULATIVE", ) .upper() .strip() - == "DELTA" - ): + ) + + if otel_exporter_otlp_metrics_temporality_preference == "DELTA": instrument_class_temporality = { Counter: AggregationTemporality.DELTA, UpDownCounter: AggregationTemporality.CUMULATIVE, @@ -137,7 +139,27 @@ def __init__( ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, ObservableGauge: AggregationTemporality.CUMULATIVE, } + + elif otel_exporter_otlp_metrics_temporality_preference == "LOWMEMORY": + instrument_class_temporality = { + Counter: AggregationTemporality.DELTA, + UpDownCounter: AggregationTemporality.CUMULATIVE, + Histogram: AggregationTemporality.DELTA, + ObservableCounter: AggregationTemporality.CUMULATIVE, + ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, + ObservableGauge: AggregationTemporality.CUMULATIVE, + } + else: + if otel_exporter_otlp_metrics_temporality_preference != ( + "CUMULATIVE" + ): + _logger.warning( + "Unrecognized OTEL_EXPORTER_METRICS_TEMPORALITY_PREFERENCE" + " value found: " + f"{otel_exporter_otlp_metrics_temporality_preference}, " + "using CUMULATIVE" + ) instrument_class_temporality = { Counter: AggregationTemporality.CUMULATIVE, UpDownCounter: AggregationTemporality.CUMULATIVE, @@ -146,6 +168,7 @@ def __init__( ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, ObservableGauge: AggregationTemporality.CUMULATIVE, } + instrument_class_temporality.update(preferred_temporality or {}) MetricExporter.__init__( @@ -359,7 +382,7 @@ def _split_metrics_data( for resource_metrics in metrics_data.resource_metrics: split_scope_metrics: List[ScopeMetrics] = [] split_resource_metrics.append( - dataclasses.replace( + replace( resource_metrics, scope_metrics=split_scope_metrics, ) @@ -367,7 +390,7 @@ def _split_metrics_data( for scope_metrics in resource_metrics.scope_metrics: split_metrics: List[Metric] = [] split_scope_metrics.append( - dataclasses.replace( + replace( scope_metrics, metrics=split_metrics, ) @@ -375,9 +398,9 @@ def _split_metrics_data( for metric in scope_metrics.metrics: split_data_points: List[DataPointT] = [] split_metrics.append( - dataclasses.replace( + replace( metric, - data=dataclasses.replace( + data=replace( metric.data, data_points=split_data_points, ), @@ -396,22 +419,22 @@ def _split_metrics_data( batch_size = 0 split_data_points = [] split_metrics = [ - dataclasses.replace( + replace( metric, - data=dataclasses.replace( + data=replace( metric.data, data_points=split_data_points, ), ) ] split_scope_metrics = [ - dataclasses.replace( + replace( scope_metrics, metrics=split_metrics, ) ] split_resource_metrics = [ - dataclasses.replace( + replace( resource_metrics, scope_metrics=split_scope_metrics, ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py index 64ab205ad0e..308774c3c34 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py @@ -18,6 +18,7 @@ # pylint: disable=too-many-lines from logging import WARNING +from os import environ from os.path import dirname from typing import List from unittest import TestCase @@ -1532,6 +1533,111 @@ def test_shutdown_wait_last_export(self): finally: export_thread.join() + def test_aggregation_temporality(self): + + otlp_metric_exporter = OTLPMetricExporter() + + for ( + temporality + ) in otlp_metric_exporter._preferred_temporality.values(): + self.assertEqual(temporality, AggregationTemporality.CUMULATIVE) + + with patch.dict( + environ, + {OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "CUMULATIVE"}, + ): + + otlp_metric_exporter = OTLPMetricExporter() + + for ( + temporality + ) in otlp_metric_exporter._preferred_temporality.values(): + self.assertEqual( + temporality, AggregationTemporality.CUMULATIVE + ) + + with patch.dict( + environ, {OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "ABC"} + ): + + with self.assertLogs(level=WARNING): + otlp_metric_exporter = OTLPMetricExporter() + + for ( + temporality + ) in otlp_metric_exporter._preferred_temporality.values(): + self.assertEqual( + temporality, AggregationTemporality.CUMULATIVE + ) + + with patch.dict( + environ, + {OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "DELTA"}, + ): + + otlp_metric_exporter = OTLPMetricExporter() + + self.assertEqual( + otlp_metric_exporter._preferred_temporality[Counter], + AggregationTemporality.DELTA, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[UpDownCounter], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[Histogram], + AggregationTemporality.DELTA, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[ObservableCounter], + AggregationTemporality.DELTA, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[ + ObservableUpDownCounter + ], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[ObservableGauge], + AggregationTemporality.CUMULATIVE, + ) + + with patch.dict( + environ, + {OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "LOWMEMORY"}, + ): + + otlp_metric_exporter = OTLPMetricExporter() + + self.assertEqual( + otlp_metric_exporter._preferred_temporality[Counter], + AggregationTemporality.DELTA, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[UpDownCounter], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[Histogram], + AggregationTemporality.DELTA, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[ObservableCounter], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[ + ObservableUpDownCounter + ], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[ObservableGauge], + AggregationTemporality.CUMULATIVE, + ) + def _resource_metrics( index: int, scope_metrics: List[ScopeMetrics] diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index c04f5bbbcee..ffd5102a2d9 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -138,15 +138,17 @@ def __init__( ) instrument_class_temporality = {} - if ( + + otel_exporter_otlp_metrics_temporality_preference = ( environ.get( OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, "CUMULATIVE", ) .upper() .strip() - == "DELTA" - ): + ) + + if otel_exporter_otlp_metrics_temporality_preference == "DELTA": instrument_class_temporality = { Counter: AggregationTemporality.DELTA, UpDownCounter: AggregationTemporality.CUMULATIVE, @@ -155,7 +157,27 @@ def __init__( ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, ObservableGauge: AggregationTemporality.CUMULATIVE, } + + elif otel_exporter_otlp_metrics_temporality_preference == "LOWMEMORY": + instrument_class_temporality = { + Counter: AggregationTemporality.DELTA, + UpDownCounter: AggregationTemporality.CUMULATIVE, + Histogram: AggregationTemporality.DELTA, + ObservableCounter: AggregationTemporality.CUMULATIVE, + ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, + ObservableGauge: AggregationTemporality.CUMULATIVE, + } + else: + if otel_exporter_otlp_metrics_temporality_preference != ( + "CUMULATIVE" + ): + _logger.warning( + "Unrecognized OTEL_EXPORTER_METRICS_TEMPORALITY_PREFERENCE" + " value found: " + f"{otel_exporter_otlp_metrics_temporality_preference}, " + "using CUMULATIVE" + ) instrument_class_temporality = { Counter: AggregationTemporality.CUMULATIVE, UpDownCounter: AggregationTemporality.CUMULATIVE, @@ -164,6 +186,7 @@ def __init__( ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, ObservableGauge: AggregationTemporality.CUMULATIVE, } + instrument_class_temporality.update(preferred_temporality or {}) MetricExporter.__init__( diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 45bc0c6faaf..9f57a23ae2a 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -12,11 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import unittest +from logging import WARNING +from os import environ +from unittest import TestCase from unittest.mock import patch -import requests -import responses +from requests import Session +from requests.models import Response +from responses import POST, activate, add from opentelemetry.exporter.otlp.proto.http import Compression from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( @@ -36,10 +39,20 @@ OTEL_EXPORTER_OTLP_METRICS_COMPRESSION, OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, OTEL_EXPORTER_OTLP_METRICS_HEADERS, + OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, OTEL_EXPORTER_OTLP_TIMEOUT, ) +from opentelemetry.sdk.metrics import ( + Counter, + Histogram, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +) from opentelemetry.sdk.metrics.export import ( + AggregationTemporality, MetricExportResult, MetricsData, ResourceMetrics, @@ -58,7 +71,7 @@ # pylint: disable=protected-access -class TestOTLPMetricExporter(unittest.TestCase): +class TestOTLPMetricExporter(TestCase): def setUp(self): self.metrics = { @@ -97,7 +110,7 @@ def test_constructor_default(self): self.assertEqual(exporter._timeout, DEFAULT_TIMEOUT) self.assertIs(exporter._compression, DEFAULT_COMPRESSION) self.assertEqual(exporter._headers, {}) - self.assertIsInstance(exporter._session, requests.Session) + self.assertIsInstance(exporter._session, Session) @patch.dict( "os.environ", @@ -129,7 +142,7 @@ def test_exporter_metrics_env_take_priority(self): "metricenv3": "==val3==", }, ) - self.assertIsInstance(exporter._session, requests.Session) + self.assertIsInstance(exporter._session, Session) @patch.dict( "os.environ", @@ -149,7 +162,7 @@ def test_exporter_constructor_take_priority(self): headers={"testHeader1": "value1", "testHeader2": "value2"}, timeout=20, compression=Compression.NoCompression, - session=requests.Session(), + session=Session(), ) self.assertEqual(exporter._endpoint, "example.com/1234") @@ -160,7 +173,7 @@ def test_exporter_constructor_take_priority(self): exporter._headers, {"testHeader1": "value1", "testHeader2": "value2"}, ) - self.assertIsInstance(exporter._session, requests.Session) + self.assertIsInstance(exporter._session, Session) @patch.dict( "os.environ", @@ -228,9 +241,9 @@ def test_headers_parse_from_env(self): ), ) - @patch.object(requests.Session, "post") + @patch.object(Session, "post") def test_success(self, mock_post): - resp = requests.models.Response() + resp = Response() resp.status_code = 200 mock_post.return_value = resp @@ -241,9 +254,9 @@ def test_success(self, mock_post): MetricExportResult.SUCCESS, ) - @patch.object(requests.Session, "post") + @patch.object(Session, "post") def test_failure(self, mock_post): - resp = requests.models.Response() + resp = Response() resp.status_code = 401 mock_post.return_value = resp @@ -254,10 +267,10 @@ def test_failure(self, mock_post): MetricExportResult.FAILURE, ) - @patch.object(requests.Session, "post") + @patch.object(Session, "post") def test_serialization(self, mock_post): - resp = requests.models.Response() + resp = Response() resp.status_code = 200 mock_post.return_value = resp @@ -276,7 +289,7 @@ def test_serialization(self, mock_post): timeout=exporter._timeout, ) - @responses.activate + @activate @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.backoff") @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep") def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff): @@ -289,8 +302,8 @@ def generate_delays(*args, **kwargs): mock_backoff.expo.configure_mock(**{"side_effect": generate_delays}) # return a retryable error - responses.add( - responses.POST, + add( + POST, "http://metrics.example.com/export", json={"error": "something exploded"}, status=500, @@ -303,3 +316,108 @@ def generate_delays(*args, **kwargs): exporter.export(metrics_data) mock_sleep.assert_called_once_with(1) + + def test_aggregation_temporality(self): + + otlp_metric_exporter = OTLPMetricExporter() + + for ( + temporality + ) in otlp_metric_exporter._preferred_temporality.values(): + self.assertEqual(temporality, AggregationTemporality.CUMULATIVE) + + with patch.dict( + environ, + {OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "CUMULATIVE"}, + ): + + otlp_metric_exporter = OTLPMetricExporter() + + for ( + temporality + ) in otlp_metric_exporter._preferred_temporality.values(): + self.assertEqual( + temporality, AggregationTemporality.CUMULATIVE + ) + + with patch.dict( + environ, {OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "ABC"} + ): + + with self.assertLogs(level=WARNING): + otlp_metric_exporter = OTLPMetricExporter() + + for ( + temporality + ) in otlp_metric_exporter._preferred_temporality.values(): + self.assertEqual( + temporality, AggregationTemporality.CUMULATIVE + ) + + with patch.dict( + environ, + {OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "DELTA"}, + ): + + otlp_metric_exporter = OTLPMetricExporter() + + self.assertEqual( + otlp_metric_exporter._preferred_temporality[Counter], + AggregationTemporality.DELTA, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[UpDownCounter], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[Histogram], + AggregationTemporality.DELTA, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[ObservableCounter], + AggregationTemporality.DELTA, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[ + ObservableUpDownCounter + ], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[ObservableGauge], + AggregationTemporality.CUMULATIVE, + ) + + with patch.dict( + environ, + {OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "LOWMEMORY"}, + ): + + otlp_metric_exporter = OTLPMetricExporter() + + self.assertEqual( + otlp_metric_exporter._preferred_temporality[Counter], + AggregationTemporality.DELTA, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[UpDownCounter], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[Histogram], + AggregationTemporality.DELTA, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[ObservableCounter], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[ + ObservableUpDownCounter + ], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + otlp_metric_exporter._preferred_temporality[ObservableGauge], + AggregationTemporality.CUMULATIVE, + ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py index 376fb187dc1..38819943265 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py @@ -576,9 +576,11 @@ variable allows users to set the default aggregation temporality policy to use on the basis of instrument kind. The valid (case-insensitive) values are: -``CUMULATIVE``: Choose ``CUMULATIVE`` aggregation temporality for all instrument kinds. -``DELTA``: Choose ``DELTA`` aggregation temporality for ``Counter``, ``Asynchronous Counter`` and ``Histogram``. -Choose ``CUMULATIVE`` aggregation temporality for ``UpDownCounter`` and ``Asynchronous UpDownCounter``. +``CUMULATIVE``: Use ``CUMULATIVE`` aggregation temporality for all instrument kinds. +``DELTA``: Use ``DELTA`` aggregation temporality for ``Counter``, ``Asynchronous Counter`` and ``Histogram``. +Use ``CUMULATIVE`` aggregation temporality for ``UpDownCounter`` and ``Asynchronous UpDownCounter``. +``LOWMEMORY``: Use ``DELTA`` aggregation temporality for ``Counter`` and ``Histogram``. +Use ``CUMULATIVE`` aggregation temporality for ``UpDownCounter``, ``AsynchronousCounter`` and ``Asynchronous UpDownCounter``. """ OTEL_EXPORTER_JAEGER_GRPC_INSECURE = "OTEL_EXPORTER_JAEGER_GRPC_INSECURE"