Skip to content

Commit

Permalink
Implement LowMemory temporality
Browse files Browse the repository at this point in the history
Fixes #3075
  • Loading branch information
ocelotl committed Mar 16, 2023
1 parent a70e4a0 commit 7d4587e
Show file tree
Hide file tree
Showing 6 changed files with 309 additions and 35 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Implement LowMemory temporality
([#3223](https://github.com/open-telemetry/opentelemetry-python/pull/3223))
- PeriodicExportingMetricReader will continue if collection times out
([#3100](https://github.com/open-telemetry/opentelemetry-python/pull/3100))
- Fix formatting of ConsoleMetricExporter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import dataclasses
from dataclasses import replace
from logging import getLogger
from os import environ
from typing import Dict, Iterable, List, Optional, Sequence
Expand Down Expand Up @@ -120,15 +120,17 @@ def __init__(
)

instrument_class_temporality = {}
if (

otel_exporter_otlp_metrics_temporality_preference = (
environ.get(
OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
"CUMULATIVE",
)
.upper()
.strip()
== "DELTA"
):
)

if otel_exporter_otlp_metrics_temporality_preference == "DELTA":
instrument_class_temporality = {
Counter: AggregationTemporality.DELTA,
UpDownCounter: AggregationTemporality.CUMULATIVE,
Expand All @@ -137,7 +139,27 @@ def __init__(
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

elif otel_exporter_otlp_metrics_temporality_preference == "LOWMEMORY":
instrument_class_temporality = {
Counter: AggregationTemporality.DELTA,
UpDownCounter: AggregationTemporality.CUMULATIVE,
Histogram: AggregationTemporality.DELTA,
ObservableCounter: AggregationTemporality.CUMULATIVE,
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

else:
if otel_exporter_otlp_metrics_temporality_preference != (
"CUMULATIVE"
):
_logger.warning(
"Unrecognized OTEL_EXPORTER_METRICS_TEMPORALITY_PREFERENCE"
" value found: "
f"{otel_exporter_otlp_metrics_temporality_preference}, "
"using CUMULATIVE"
)
instrument_class_temporality = {
Counter: AggregationTemporality.CUMULATIVE,
UpDownCounter: AggregationTemporality.CUMULATIVE,
Expand All @@ -146,6 +168,7 @@ def __init__(
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

instrument_class_temporality.update(preferred_temporality or {})

MetricExporter.__init__(
Expand Down Expand Up @@ -359,25 +382,25 @@ def _split_metrics_data(
for resource_metrics in metrics_data.resource_metrics:
split_scope_metrics: List[ScopeMetrics] = []
split_resource_metrics.append(
dataclasses.replace(
replace(
resource_metrics,
scope_metrics=split_scope_metrics,
)
)
for scope_metrics in resource_metrics.scope_metrics:
split_metrics: List[Metric] = []
split_scope_metrics.append(
dataclasses.replace(
replace(
scope_metrics,
metrics=split_metrics,
)
)
for metric in scope_metrics.metrics:
split_data_points: List[DataPointT] = []
split_metrics.append(
dataclasses.replace(
replace(
metric,
data=dataclasses.replace(
data=replace(
metric.data,
data_points=split_data_points,
),
Expand All @@ -396,22 +419,22 @@ def _split_metrics_data(
batch_size = 0
split_data_points = []
split_metrics = [
dataclasses.replace(
replace(
metric,
data=dataclasses.replace(
data=replace(
metric.data,
data_points=split_data_points,
),
)
]
split_scope_metrics = [
dataclasses.replace(
replace(
scope_metrics,
metrics=split_metrics,
)
]
split_resource_metrics = [
dataclasses.replace(
replace(
resource_metrics,
scope_metrics=split_scope_metrics,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

# pylint: disable=too-many-lines
from logging import WARNING
from os import environ
from os.path import dirname
from typing import List
from unittest import TestCase
Expand Down Expand Up @@ -1532,6 +1533,111 @@ def test_shutdown_wait_last_export(self):
finally:
export_thread.join()

def test_aggregation_temporality(self):

otlp_metric_exporter = OTLPMetricExporter()

for (
temporality
) in otlp_metric_exporter._preferred_temporality.values():
self.assertEqual(temporality, AggregationTemporality.CUMULATIVE)

with patch.dict(
environ,
{OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "CUMULATIVE"},
):

otlp_metric_exporter = OTLPMetricExporter()

for (
temporality
) in otlp_metric_exporter._preferred_temporality.values():
self.assertEqual(
temporality, AggregationTemporality.CUMULATIVE
)

with patch.dict(
environ, {OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "ABC"}
):

with self.assertLogs(level=WARNING):
otlp_metric_exporter = OTLPMetricExporter()

for (
temporality
) in otlp_metric_exporter._preferred_temporality.values():
self.assertEqual(
temporality, AggregationTemporality.CUMULATIVE
)

with patch.dict(
environ,
{OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "DELTA"},
):

otlp_metric_exporter = OTLPMetricExporter()

self.assertEqual(
otlp_metric_exporter._preferred_temporality[Counter],
AggregationTemporality.DELTA,
)
self.assertEqual(
otlp_metric_exporter._preferred_temporality[UpDownCounter],
AggregationTemporality.CUMULATIVE,
)
self.assertEqual(
otlp_metric_exporter._preferred_temporality[Histogram],
AggregationTemporality.DELTA,
)
self.assertEqual(
otlp_metric_exporter._preferred_temporality[ObservableCounter],
AggregationTemporality.DELTA,
)
self.assertEqual(
otlp_metric_exporter._preferred_temporality[
ObservableUpDownCounter
],
AggregationTemporality.CUMULATIVE,
)
self.assertEqual(
otlp_metric_exporter._preferred_temporality[ObservableGauge],
AggregationTemporality.CUMULATIVE,
)

with patch.dict(
environ,
{OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "LOWMEMORY"},
):

otlp_metric_exporter = OTLPMetricExporter()

self.assertEqual(
otlp_metric_exporter._preferred_temporality[Counter],
AggregationTemporality.DELTA,
)
self.assertEqual(
otlp_metric_exporter._preferred_temporality[UpDownCounter],
AggregationTemporality.CUMULATIVE,
)
self.assertEqual(
otlp_metric_exporter._preferred_temporality[Histogram],
AggregationTemporality.DELTA,
)
self.assertEqual(
otlp_metric_exporter._preferred_temporality[ObservableCounter],
AggregationTemporality.CUMULATIVE,
)
self.assertEqual(
otlp_metric_exporter._preferred_temporality[
ObservableUpDownCounter
],
AggregationTemporality.CUMULATIVE,
)
self.assertEqual(
otlp_metric_exporter._preferred_temporality[ObservableGauge],
AggregationTemporality.CUMULATIVE,
)


def _resource_metrics(
index: int, scope_metrics: List[ScopeMetrics]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,17 @@ def __init__(
)

instrument_class_temporality = {}
if (

otel_exporter_otlp_metrics_temporality_preference = (
environ.get(
OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
"CUMULATIVE",
)
.upper()
.strip()
== "DELTA"
):
)

if otel_exporter_otlp_metrics_temporality_preference == "DELTA":
instrument_class_temporality = {
Counter: AggregationTemporality.DELTA,
UpDownCounter: AggregationTemporality.CUMULATIVE,
Expand All @@ -155,7 +157,27 @@ def __init__(
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

elif otel_exporter_otlp_metrics_temporality_preference == "LOWMEMORY":
instrument_class_temporality = {
Counter: AggregationTemporality.DELTA,
UpDownCounter: AggregationTemporality.CUMULATIVE,
Histogram: AggregationTemporality.DELTA,
ObservableCounter: AggregationTemporality.CUMULATIVE,
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

else:
if otel_exporter_otlp_metrics_temporality_preference != (
"CUMULATIVE"
):
_logger.warning(
"Unrecognized OTEL_EXPORTER_METRICS_TEMPORALITY_PREFERENCE"
" value found: "
f"{otel_exporter_otlp_metrics_temporality_preference}, "
"using CUMULATIVE"
)
instrument_class_temporality = {
Counter: AggregationTemporality.CUMULATIVE,
UpDownCounter: AggregationTemporality.CUMULATIVE,
Expand All @@ -164,6 +186,7 @@ def __init__(
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

instrument_class_temporality.update(preferred_temporality or {})

MetricExporter.__init__(
Expand Down
Loading

0 comments on commit 7d4587e

Please sign in to comment.