diff --git a/CHANGELOG.md b/CHANGELOG.md
index d43b64cfaf..a72276d75d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,7 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Fix formatting of ConsoleMetricExporter.
   ([#3197](https://github.com/open-telemetry/opentelemetry-python/pull/3197))
-## Version 1.16.0/0.37b0 (2023-02-17)
+- Add exponential histogram
+  ([#2964](https://github.com/open-telemetry/opentelemetry-python/pull/2964))
+
+## Version 1.16.0/0.37b0 (2023-02-17)
+
 - Change ``__all__`` to be statically defined.
   ([#3143](https://github.com/open-telemetry/opentelemetry-python/pull/3143))
 - Remove the ability to set a global metric prefix for Prometheus exporter
diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py
index 3c6f59c35f..942ef1d4aa 100644
--- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py
+++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py
@@ -60,6 +60,7 @@
     ResourceMetrics,
     ScopeMetrics,
     Sum,
+    ExponentialHistogram as ExponentialHistogramType,
 )

 _logger = getLogger(__name__)
@@ -266,6 +267,51 @@ def _translate_data(
                             metric.data.is_monotonic
                         )
                         pb2_metric.sum.data_points.append(pt)
+
+                    elif isinstance(metric.data, ExponentialHistogramType):
+                        for data_point in metric.data.data_points:
+
+                            if data_point.positive.bucket_counts:
+                                positive = pb2.ExponentialHistogramDataPoint.Buckets(
+                                    offset=data_point.positive.offset,
+                                    bucket_counts=data_point.positive.bucket_counts,
+                                )
+                            else:
+                                positive = None
+
+                            if data_point.negative.bucket_counts:
+                                negative = pb2.ExponentialHistogramDataPoint.Buckets(
+                                    offset=data_point.negative.offset,
+                                    bucket_counts=data_point.negative.bucket_counts,
+                                )
+                            else:
+                                negative = None
+
+                            pt = pb2.ExponentialHistogramDataPoint(
+                                attributes=self._translate_attributes(
+                                    data_point.attributes
+                                ),
+                                time_unix_nano=data_point.time_unix_nano,
+                                start_time_unix_nano=(
+                                    data_point.start_time_unix_nano
+                                ),
+                                count=data_point.count,
+                                sum=data_point.sum,
+                                scale=data_point.scale,
+                                zero_count=data_point.zero_count,
+                                positive=positive,
+                                negative=negative,
+                                flags=data_point.flags,
+                                max=data_point.max,
+                                min=data_point.min,
+                            )
+                            pb2_metric.exponential_histogram.aggregation_temporality = (
+                                metric.data.aggregation_temporality
+                            )
+                            pb2_metric.exponential_histogram.data_points.append(
+                                pt
+                            )
+
                     else:
                         _logger.warning(
                             "unsupported data type %s",
diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py
index 31bb878a5f..b38d91eb83 100644
--- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py
+++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py
@@ -62,7 +62,14 @@
     ObservableUpDownCounter,
     UpDownCounter,
 )
-from opentelemetry.sdk.metrics.export import AggregationTemporality, Gauge
+from opentelemetry.sdk.metrics.export import AggregationTemporality, Buckets
+from opentelemetry.sdk.metrics.export import (
+    ExponentialHistogram as ExponentialHistogramType,
+)
+from opentelemetry.sdk.metrics.export import (
+    ExponentialHistogramDataPoint,
+    Gauge,
+)
 from opentelemetry.sdk.metrics.export import Histogram as HistogramType
 from opentelemetry.sdk.metrics.export import (
     HistogramDataPoint,
@@ -163,6 +170,31 @@ def setUp(self):
             ),
         )

+        exponential_histogram = Metric(
+            name="exponential_histogram",
+            description="description",
+            unit="unit",
+            data=ExponentialHistogramType(
+                data_points=[
+                    ExponentialHistogramDataPoint(
+                        attributes={"a": 1, "b": True},
+                        start_time_unix_nano=0,
+                        time_unix_nano=1,
+                        count=2,
+                        sum=3,
+                        scale=4,
+                        zero_count=5,
+                        positive=Buckets(offset=6, bucket_counts=[7, 8]),
+                        negative=Buckets(offset=9, bucket_counts=[10, 11]),
+                        flags=12,
+                        min=13.0,
+                        max=14.0,
+                    )
+                ],
+                aggregation_temporality=AggregationTemporality.DELTA,
+            ),
+        )
+
         self.metrics = {
             "sum_int": MetricsData(
                 resource_metrics=[
@@ -276,6 +308,28 @@ def setUp(self):
                     )
                 ]
             ),
+            "exponential_histogram": MetricsData(
+                resource_metrics=[
+                    ResourceMetrics(
+                        resource=Resource(
+                            attributes={"a": 1, "b": False},
+                            schema_url="resource_schema_url",
+                        ),
+                        scope_metrics=[
+                            ScopeMetrics(
+                                scope=SDKInstrumentationScope(
+                                    name="first_name",
+                                    version="first_version",
+                                    schema_url="instrumentation_scope_schema_url",
+                                ),
+                                metrics=[exponential_histogram],
+                                schema_url="instrumentation_scope_schema_url",
+                            )
+                        ],
+                        schema_url="resource_schema_url",
+                    )
+                ]
+            ),
         }

         self.multiple_scope_histogram = MetricsData(
@@ -896,6 +950,80 @@ def test_translate_histogram(self):
         actual = self.exporter._translate_data(self.metrics["histogram"])
         self.assertEqual(expected, actual)

+    def test_translate_exponential_histogram(self):
+        expected = ExportMetricsServiceRequest(
+            resource_metrics=[
+                pb2.ResourceMetrics(
+                    resource=OTLPResource(
+                        attributes=[
+                            KeyValue(key="a", value=AnyValue(int_value=1)),
+                            KeyValue(
+                                key="b", value=AnyValue(bool_value=False)
+                            ),
+                        ]
+                    ),
+                    scope_metrics=[
+                        pb2.ScopeMetrics(
+                            scope=InstrumentationScope(
+                                name="first_name", version="first_version"
+                            ),
+                            metrics=[
+                                pb2.Metric(
+                                    name="exponential_histogram",
+                                    unit="unit",
+                                    description="description",
+                                    exponential_histogram=pb2.ExponentialHistogram(
+                                        data_points=[
+                                            pb2.ExponentialHistogramDataPoint(
+                                                attributes=[
+                                                    KeyValue(
+                                                        key="a",
+                                                        value=AnyValue(
+                                                            int_value=1
+                                                        ),
+                                                    ),
+                                                    KeyValue(
+                                                        key="b",
+                                                        value=AnyValue(
+                                                            bool_value=True
+                                                        ),
+                                                    ),
+                                                ],
+                                                start_time_unix_nano=0,
+                                                time_unix_nano=1,
+                                                count=2,
+                                                sum=3,
+                                                scale=4,
+                                                zero_count=5,
+                                                positive=pb2.ExponentialHistogramDataPoint.Buckets(
+                                                    offset=6,
+                                                    bucket_counts=[7, 8],
+                                                ),
+                                                negative=pb2.ExponentialHistogramDataPoint.Buckets(
+                                                    offset=9,
+                                                    bucket_counts=[10, 11],
+                                                ),
+                                                flags=12,
+                                                exemplars=[],
+                                                min=13.0,
+                                                max=14.0,
+                                            )
+                                        ],
+                                        aggregation_temporality=AggregationTemporality.DELTA,
+                                    ),
+                                )
+                            ],
+                        )
+                    ],
+                )
+            ]
+        )
+        # pylint: disable=protected-access
+        actual = self.exporter._translate_data(
+            self.metrics["exponential_histogram"]
+        )
+        self.assertEqual(expected, actual)
+
     def test_translate_multiple_scope_histogram(self):
         expected = ExportMetricsServiceRequest(
             resource_metrics=[
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py
index 4fb9041d4c..7312df1aa7 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py
@@ -12,6 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+# pylint: disable=too-many-lines
+
 from abc import ABC, abstractmethod
 from bisect import bisect_left
 from enum import IntEnum
@@ -31,8 +33,21 @@
     Synchronous,
     UpDownCounter,
 )
+from opentelemetry.sdk.metrics._internal.exponential_histogram.buckets import (
+    Buckets,
+)
+from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.exponent_mapping import (
+    ExponentMapping,
+)
+from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.logarithm_mapping import (
+    LogarithmMapping,
+)
 from opentelemetry.sdk.metrics._internal.measurement import Measurement
-from opentelemetry.sdk.metrics._internal.point import Gauge
+from opentelemetry.sdk.metrics._internal.point import Buckets as BucketsPoint
+from opentelemetry.sdk.metrics._internal.point import (
+    ExponentialHistogramDataPoint,
+    Gauge,
+)
 from opentelemetry.sdk.metrics._internal.point import (
     Histogram as HistogramPoint,
 )
@@ -349,6 +364,495 @@ def collect(
         return current_point

+
+# pylint: disable=protected-access
+class _ExponentialBucketHistogramAggregation(_Aggregation[HistogramPoint]):
+    # _min_max_size and _max_max_size are the smallest and largest values
+    # the max_size parameter may have, respectively.
+
+    # _min_max_size is the smallest reasonable value which is small enough
+    # to contain the entire normal floating point range at the minimum scale.
+    _min_max_size = 2
+
+    # _max_max_size is an arbitrary limit meant to limit accidental creation of
+    # giant exponential bucket histograms.
+    _max_max_size = 16384
+
+    def __init__(
+        self,
+        attributes: Attributes,
+        start_time_unix_nano: int,
+        # This is the default maximum number of buckets per positive or
+        # negative number range. The value 160 is specified by OpenTelemetry.
+        # See the derivation here:
+        # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exponential-bucket-histogram-aggregation
+        max_size: int = 160,
+    ):
+        super().__init__(attributes)
+        # max_size is the maximum capacity of the positive and negative
+        # buckets.
+        if max_size < self._min_max_size:
+            raise ValueError(
+                f"Buckets max size {max_size} is smaller than "
+                f"minimum max size {self._min_max_size}"
+            )
+
+        if max_size > self._max_max_size:
+            raise ValueError(
+                f"Buckets max size {max_size} is larger than "
+                f"maximum max size {self._max_max_size}"
+            )
+
+        self._max_size = max_size
+
+        # _sum is the sum of all the values aggregated by this aggregator.
+        self._sum = 0
+
+        # _count is the count of all calls to aggregate.
+        self._count = 0
+
+        # _zero_count is the count of all the calls to aggregate when the value
+        # to be aggregated is exactly 0.
+        self._zero_count = 0
+
+        # _min is the smallest value aggregated by this aggregator.
+        self._min = inf
+
+        # _max is the largest value aggregated by this aggregator.
+        self._max = -inf
+
+        # _positive holds the positive values.
+        self._positive = Buckets()
+
+        # _negative holds the negative values by their absolute value.
+        self._negative = Buckets()
+
+        # _mapping corresponds to the current scale, which is shared by both
+        # the positive and negative buckets.
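+        # (Reference note: for a given scale the histogram base is
+        # 2 ** (2 ** -scale), so scale 0 buckets by powers of 2, scale -1
+        # by powers of 4, and LogarithmMapping._max_scale (20) yields the
+        # finest base supported, 2 ** (2 ** -20), just above 1.)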
+ self._mapping = LogarithmMapping(LogarithmMapping._max_scale) + + self._instrument_temporality = AggregationTemporality.DELTA + self._start_time_unix_nano = start_time_unix_nano + + self._previous_scale = None + self._previous_start_time_unix_nano = None + self._previous_sum = None + self._previous_max = None + self._previous_min = None + self._previous_positive = None + self._previous_negative = None + + def aggregate(self, measurement: Measurement) -> None: + # pylint: disable=too-many-branches,too-many-statements, too-many-locals + + with self._lock: + + value = measurement.value + + # 0. Set the following attributes: + # _min + # _max + # _count + # _zero_count + # _sum + if value < self._min: + self._min = value + + if value > self._max: + self._max = value + + self._count += 1 + + if value == 0: + self._zero_count += 1 + # No need to do anything else if value is zero, just increment the + # zero count. + return + + self._sum += value + + # 1. Use the positive buckets for positive values and the negative + # buckets for negative values. + if value > 0: + buckets = self._positive + + else: + # Both exponential and logarithm mappings use only positive values + # so the absolute value is used here. + value = -value + buckets = self._negative + + # 2. Compute the index for the value at the current scale. + index = self._mapping.map_to_index(value) + + # IncrementIndexBy starts here + + # 3. Determine if a change of scale is needed. + is_rescaling_needed = False + + if len(buckets) == 0: + buckets.index_start = index + buckets.index_end = index + buckets.index_base = index + + elif ( + index < buckets.index_start + and (buckets.index_end - index) >= self._max_size + ): + is_rescaling_needed = True + low = index + high = buckets.index_end + + elif ( + index > buckets.index_end + and (index - buckets.index_start) >= self._max_size + ): + is_rescaling_needed = True + low = buckets.index_start + high = index + + # 4. Rescale the mapping if needed. + if is_rescaling_needed: + + self._downscale( + self._get_scale_change(low, high), + self._positive, + self._negative, + ) + + index = self._mapping.map_to_index(value) + + # 5. If the index is outside + # [buckets.index_start, buckets.index_end] readjust the buckets + # boundaries or add more buckets. + if index < buckets.index_start: + span = buckets.index_end - index + + if span >= len(buckets.counts): + buckets.grow(span + 1, self._max_size) + + buckets.index_start = index + + elif index > buckets.index_end: + span = index - buckets.index_start + + if span >= len(buckets.counts): + buckets.grow(span + 1, self._max_size) + + buckets.index_end = index + + # 6. Compute the index of the bucket to be incremented. + bucket_index = index - buckets.index_base + + if bucket_index < 0: + bucket_index += len(buckets.counts) + + # 7. Increment the bucket. + buckets.increment_bucket(bucket_index) + + def collect( + self, + aggregation_temporality: AggregationTemporality, + collection_start_nano: int, + ) -> Optional[_DataPointVarT]: + """ + Atomically return a point for the current value of the metric. 
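+
+        Returns None if no measurements have been aggregated since the
+        previous collection. When a cumulative reading is requested, the
+        previously collected buckets are merged into the current ones
+        before the point is produced.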
+ """ + # pylint: disable=too-many-statements, too-many-locals + + with self._lock: + if self._count == 0: + return None + + current_negative = self._negative + current_positive = self._positive + current_zero_count = self._zero_count + current_count = self._count + current_start_time_unix_nano = self._start_time_unix_nano + current_sum = self._sum + current_max = self._max + if current_max == -inf: + current_max = None + current_min = self._min + if current_min == inf: + current_min = None + + if self._count == self._zero_count: + current_scale = 0 + + else: + current_scale = self._mapping.scale + + self._negative = Buckets() + self._positive = Buckets() + self._start_time_unix_nano = collection_start_nano + self._sum = 0 + self._count = 0 + self._zero_count = 0 + self._min = inf + self._max = -inf + + current_point = ExponentialHistogramDataPoint( + attributes=self._attributes, + start_time_unix_nano=current_start_time_unix_nano, + time_unix_nano=collection_start_nano, + count=current_count, + sum=current_sum, + scale=current_scale, + zero_count=current_zero_count, + positive=BucketsPoint( + offset=current_positive.offset, + bucket_counts=current_positive.counts, + ), + negative=BucketsPoint( + offset=current_negative.offset, + bucket_counts=current_negative.counts, + ), + # FIXME: Find the right value for flags + flags=0, + min=current_min, + max=current_max, + ) + + if self._previous_scale is None or ( + self._instrument_temporality is aggregation_temporality + ): + self._previous_scale = current_scale + self._previous_start_time_unix_nano = ( + current_start_time_unix_nano + ) + self._previous_max = current_max + self._previous_min = current_min + self._previous_sum = current_sum + self._previous_positive = current_positive + self._previous_negative = current_negative + + return current_point + + min_scale = min(self._previous_scale, current_scale) + + low_positive, high_positive = self._get_low_high_previous_current( + self._previous_positive, current_positive, min_scale + ) + low_negative, high_negative = self._get_low_high_previous_current( + self._previous_negative, current_negative, min_scale + ) + + min_scale = min( + min_scale + - self._get_scale_change(low_positive, high_positive), + min_scale + - self._get_scale_change(low_negative, high_negative), + ) + + # FIXME Go implementation checks if the histogram (not the mapping + # but the histogram) has a count larger than zero, if not, scale + # (the histogram scale) would be zero. 
See exponential.go 191 + self._downscale( + self._mapping.scale - min_scale, + self._previous_positive, + self._previous_negative, + ) + + if aggregation_temporality is AggregationTemporality.CUMULATIVE: + + start_time_unix_nano = self._previous_start_time_unix_nano + sum_ = current_sum + self._previous_sum + # Only update min/max on delta -> cumulative + max_ = max(current_max, self._previous_max) + min_ = min(current_min, self._previous_min) + + self._merge( + self._previous_positive, + current_positive, + current_scale, + min_scale, + aggregation_temporality, + ) + self._merge( + self._previous_negative, + current_negative, + current_scale, + min_scale, + aggregation_temporality, + ) + + else: + start_time_unix_nano = self._previous_start_time_unix_nano + sum_ = current_sum - self._previous_sum + max_ = current_max + min_ = current_min + + self._merge( + self._previous_positive, + current_positive, + current_scale, + min_scale, + aggregation_temporality, + ) + self._merge( + self._previous_negative, + current_negative, + current_scale, + min_scale, + aggregation_temporality, + ) + + current_point = ExponentialHistogramDataPoint( + attributes=self._attributes, + start_time_unix_nano=start_time_unix_nano, + time_unix_nano=collection_start_nano, + count=current_count, + sum=sum_, + scale=current_scale, + zero_count=current_zero_count, + positive=BucketsPoint( + offset=current_positive.offset, + bucket_counts=current_positive.counts, + ), + negative=BucketsPoint( + offset=current_negative.offset, + bucket_counts=current_negative.counts, + ), + # FIXME: Find the right value for flags + flags=0, + min=min_, + max=max_, + ) + + self._previous_scale = current_scale + self._previous_positive = current_positive + self._previous_negative = current_negative + self._previous_start_time_unix_nano = current_start_time_unix_nano + self._previous_sum = current_sum + + return current_point + + def _get_low_high_previous_current( + self, previous_point_buckets, current_point_buckets, min_scale + ): + + (previous_point_low, previous_point_high) = self._get_low_high( + previous_point_buckets, min_scale + ) + (current_point_low, current_point_high) = self._get_low_high( + current_point_buckets, min_scale + ) + + if current_point_low > current_point_high: + low = previous_point_low + high = previous_point_high + + elif previous_point_low > previous_point_high: + low = current_point_low + high = current_point_high + + else: + low = min(previous_point_low, current_point_low) + high = max(previous_point_high, current_point_high) + + return low, high + + def _get_low_high(self, buckets, min_scale): + if buckets.counts == [0]: + return 0, -1 + + shift = self._mapping._scale - min_scale + + return buckets.index_start >> shift, buckets.index_end >> shift + + def _get_scale_change(self, low, high): + + change = 0 + + while high - low >= self._max_size: + high = high >> 1 + low = low >> 1 + + change += 1 + + return change + + def _downscale(self, change: int, positive, negative): + + if change == 0: + return + + if change < 0: + raise Exception("Invalid change of scale") + + new_scale = self._mapping.scale - change + + positive.downscale(change) + negative.downscale(change) + + if new_scale <= 0: + mapping = ExponentMapping(new_scale) + else: + mapping = LogarithmMapping(new_scale) + + self._mapping = mapping + + def _merge( + self, + previous_buckets, + current_buckets, + current_scale, + min_scale, + aggregation_temporality, + ): + + current_change = current_scale - min_scale + + for current_bucket_index, 
current_bucket in enumerate(
+            current_buckets.counts
+        ):
+
+            if current_bucket == 0:
+                continue
+
+            # Not considering the case where len(previous_buckets) == 0. This
+            # would not happen because self._previous_point is only assigned to
+            # an ExponentialHistogramDataPoint object if self._count != 0.
+
+            index = (
+                current_buckets.offset + current_bucket_index
+            ) >> current_change
+
+            if index < previous_buckets.index_start:
+                span = previous_buckets.index_end - index
+
+                if span >= self._max_size:
+                    raise Exception("Incorrect merge scale")
+
+                if span >= len(previous_buckets.counts):
+                    previous_buckets.grow(span + 1, self._max_size)
+
+                previous_buckets.index_start = index
+
+            if index > previous_buckets.index_end:
+                span = index - previous_buckets.index_start
+
+                if span >= self._max_size:
+                    raise Exception("Incorrect merge scale")
+
+                if span >= len(previous_buckets.counts):
+                    previous_buckets.grow(span + 1, self._max_size)
+
+                previous_buckets.index_end = index
+
+            bucket_index = index - previous_buckets.index_base
+
+            if bucket_index < 0:
+                bucket_index += len(previous_buckets.counts)
+
+            if aggregation_temporality is AggregationTemporality.DELTA:
+                current_bucket = -current_bucket
+
+            previous_buckets.increment_bucket(
+                bucket_index, increment=current_bucket
+            )
+
+
 class Aggregation(ABC):
     """
     Base class for all aggregation types.
@@ -433,6 +937,27 @@ def _create_aggregation(
         raise Exception(f"Invalid instrument type {type(instrument)} found")

+
+class ExponentialBucketHistogramAggregation(Aggregation):
+    def __init__(
+        self,
+        max_size: int = 160,
+    ):
+
+        self._max_size = max_size
+
+    def _create_aggregation(
+        self,
+        instrument: Instrument,
+        attributes: Attributes,
+        start_time_unix_nano: int,
+    ) -> _Aggregation:
+        return _ExponentialBucketHistogramAggregation(
+            attributes,
+            start_time_unix_nano,
+            max_size=self._max_size,
+        )
+
+
 class ExplicitBucketHistogramAggregation(Aggregation):
     """This aggregation informs the SDK to collect:
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/buckets.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/buckets.py
new file mode 100644
index 0000000000..5c6b04bd39
--- /dev/null
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/buckets.py
@@ -0,0 +1,176 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from math import ceil, log2
+
+
+class Buckets:
+
+    # No method of this class is protected by locks because instances of this
+    # class are only used in methods that are protected by locks themselves.
+
+    def __init__(self):
+        self._counts = [0]
+
+        # The term index refers to the number of the exponential histogram
+        # bucket used to determine its boundaries. The lower boundary of a
+        # bucket is determined by base ** index and the upper boundary of a
+        # bucket is determined by base ** (index + 1). index values are
+        # signed to account for values less than or equal to 1.
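+        # (Illustrative note, assuming base 2 — scale 0: bucket index 0
+        # covers the interval (1, 2], index 1 covers (2, 4] and index -1
+        # covers (0.5, 1], so a measurement of exactly 1 lands in bucket
+        # -1; this is why the indices must be signed.)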
+
+        # self._index_* will all have values equal to a certain index that is
+        # determined by the corresponding mapping _map_to_index function and
+        # the value of the index depends on the value passed to _map_to_index.
+
+        # Index of the 0th position in self._counts: self._counts[0] is the
+        # count in the bucket with index self.__index_base.
+        self.__index_base = 0
+
+        # self.__index_start is the smallest index value represented in
+        # self._counts.
+        self.__index_start = 0
+
+        # self.__index_end is the largest index value represented in
+        # self._counts.
+        self.__index_end = 0
+
+    @property
+    def index_start(self) -> int:
+        return self.__index_start
+
+    @index_start.setter
+    def index_start(self, value: int) -> None:
+        self.__index_start = value
+
+    @property
+    def index_end(self) -> int:
+        return self.__index_end
+
+    @index_end.setter
+    def index_end(self, value: int) -> None:
+        self.__index_end = value
+
+    @property
+    def index_base(self) -> int:
+        return self.__index_base
+
+    @index_base.setter
+    def index_base(self, value: int) -> None:
+        self.__index_base = value
+
+    @property
+    def counts(self):
+        return self._counts
+
+    def grow(self, needed: int, max_size: int) -> None:
+
+        size = len(self._counts)
+        bias = self.__index_base - self.__index_start
+        old_positive_limit = size - bias
+
+        # 2 ** ceil(log2(needed)) finds the smallest power of two that is
+        # greater than or equal to needed:
+        # 2 ** ceil(log2(1)) == 1
+        # 2 ** ceil(log2(2)) == 2
+        # 2 ** ceil(log2(3)) == 4
+        # 2 ** ceil(log2(4)) == 4
+        # 2 ** ceil(log2(5)) == 8
+        # 2 ** ceil(log2(6)) == 8
+        # 2 ** ceil(log2(7)) == 8
+        # 2 ** ceil(log2(8)) == 8
+        new_size = min(2 ** ceil(log2(needed)), max_size)
+
+        new_positive_limit = new_size - bias
+
+        tmp = [0] * new_size
+        tmp[new_positive_limit:] = self._counts[old_positive_limit:]
+        tmp[0:old_positive_limit] = self._counts[0:old_positive_limit]
+        self._counts = tmp
+
+    @property
+    def offset(self) -> int:
+        return self.__index_start
+
+    def __len__(self) -> int:
+        if len(self._counts) == 0:
+            return 0
+
+        if self.__index_end == self.__index_start and self[0] == 0:
+            return 0
+
+        return self.__index_end - self.__index_start + 1
+
+    def __getitem__(self, key: int) -> int:
+        bias = self.__index_base - self.__index_start
+
+        if key < bias:
+            key += len(self._counts)
+
+        key -= bias
+
+        return self._counts[key]
+
+    def downscale(self, amount: int) -> None:
+        """
+        Rotates, then collapses every 2 ** amount buckets into one.
+        """
+
+        bias = self.__index_base - self.__index_start
+
+        if bias != 0:
+
+            self.__index_base = self.__index_start
+
+            # [0, 1, 2, 3, 4] Original backing array
+
+            self._counts = self._counts[::-1]
+            # [4, 3, 2, 1, 0]
+
+            self._counts = (
+                self._counts[:bias][::-1] + self._counts[bias:][::-1]
+            )
+            # [3, 4, 0, 1, 2] This is a rotation of the backing array.
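+            # (Illustrative note: after this rotation the logical bucket
+            # order matches the index order, so the collapse below can merge
+            # in place; e.g. counts [1, 1, 1, 1] at indices 0..3 downscaled
+            # by amount=1 become [2, 2] at indices 0..1.)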
+ + size = 1 + self.__index_end - self.__index_start + each = 1 << amount + inpos = 0 + outpos = 0 + + pos = self.__index_start + + while pos <= self.__index_end: + mod = pos % each + if mod < 0: + mod += each + + index = mod + + while index < each and inpos < size: + + if outpos != inpos: + self._counts[outpos] += self._counts[inpos] + self._counts[inpos] = 0 + + inpos += 1 + pos += 1 + index += 1 + + outpos += 1 + + self.__index_start >>= amount + self.__index_end >>= amount + self.__index_base = self.__index_start + + def increment_bucket(self, bucket_index: int, increment: int = 1) -> None: + self._counts[bucket_index] += increment diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py index 4602b7b6bc..bef57eaab0 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py @@ -31,12 +31,14 @@ ExplicitBucketHistogramAggregation, _DropAggregation, _ExplicitBucketHistogramAggregation, + _ExponentialBucketHistogramAggregation, _LastValueAggregation, _SumAggregation, ) from opentelemetry.sdk.metrics._internal.export import AggregationTemporality from opentelemetry.sdk.metrics._internal.measurement import Measurement from opentelemetry.sdk.metrics._internal.point import ( + ExponentialHistogram, Gauge, Histogram, Metric, @@ -192,6 +194,18 @@ def collect(self) -> MetricsData: ): continue + elif isinstance( + # pylint: disable=protected-access + view_instrument_match._aggregation, + _ExponentialBucketHistogramAggregation, + ): + data = ExponentialHistogram( + data_points=view_instrument_match.collect( + aggregation_temporality, collection_start_nanos + ), + aggregation_temporality=aggregation_temporality, + ) + metrics.append( Metric( # pylint: disable=protected-access diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/point.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/point.py index 410c7754d8..cba37e7fdf 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/point.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/point.py @@ -60,6 +60,48 @@ def to_json(self, indent=4) -> str: return dumps(asdict(self), indent=indent) +@dataclass(frozen=True) +class Buckets: + offset: int + bucket_counts: Sequence[int] + + +@dataclass(frozen=True) +class ExponentialHistogramDataPoint: + """Single data point in a timeseries whose boundaries are defined by an + exponential function. This timeseries describes the time-varying scalar + value of a metric. + """ + + attributes: Attributes + start_time_unix_nano: int + time_unix_nano: int + count: int + sum: Union[int, float] + scale: int + zero_count: int + positive: Buckets + negative: Buckets + flags: int + min: float + max: float + + def to_json(self, indent=4) -> str: + return dumps(asdict(self), indent=indent) + + +@dataclass(frozen=True) +class ExponentialHistogram: + """Represents the type of a metric that is calculated by aggregating as an + ExponentialHistogram of all reported measurements over a time interval. 
+ """ + + data_points: Sequence[ExponentialHistogramDataPoint] + aggregation_temporality: ( + "opentelemetry.sdk.metrics.export.AggregationTemporality" + ) + + @dataclass(frozen=True) class Sum: """Represents the type of a scalar metric that is calculated as a sum of diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py index a87beadce5..97c31b97ec 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py @@ -24,9 +24,12 @@ ) # The point module is not in the export directory to avoid a circular import. -from opentelemetry.sdk.metrics._internal.point import ( +from opentelemetry.sdk.metrics._internal.point import ( # noqa: F401 + Buckets, DataPointT, DataT, + ExponentialHistogram, + ExponentialHistogramDataPoint, Gauge, Histogram, HistogramDataPoint, diff --git a/opentelemetry-sdk/tests/metrics/exponential_histogram/test_exponential_bucket_histogram_aggregation.py b/opentelemetry-sdk/tests/metrics/exponential_histogram/test_exponential_bucket_histogram_aggregation.py new file mode 100644 index 0000000000..65a437bc6d --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/exponential_histogram/test_exponential_bucket_histogram_aggregation.py @@ -0,0 +1,1008 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+from itertools import permutations
+from math import ldexp
+from sys import float_info
+from types import MethodType
+from unittest import TestCase
+from unittest.mock import Mock, patch
+
+from opentelemetry.sdk.metrics._internal.aggregation import (
+    AggregationTemporality,
+    _ExponentialBucketHistogramAggregation,
+)
+from opentelemetry.sdk.metrics._internal.exponential_histogram.buckets import (
+    Buckets,
+)
+from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.exponent_mapping import (
+    ExponentMapping,
+)
+from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.ieee_754 import (
+    MAX_NORMAL_EXPONENT,
+    MIN_NORMAL_EXPONENT,
+)
+from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.logarithm_mapping import (
+    LogarithmMapping,
+)
+from opentelemetry.sdk.metrics._internal.measurement import Measurement
+
+
+def get_counts(buckets: Buckets) -> list:
+
+    counts = []
+
+    for index in range(len(buckets)):
+        counts.append(buckets[index])
+
+    return counts
+
+
+def center_val(mapping: ExponentMapping, index: int) -> float:
+    return (
+        mapping.get_lower_boundary(index)
+        + mapping.get_lower_boundary(index + 1)
+    ) / 2
+
+
+def swap(
+    first: _ExponentialBucketHistogramAggregation,
+    second: _ExponentialBucketHistogramAggregation,
+):
+
+    for attribute in [
+        "_positive",
+        "_negative",
+        "_sum",
+        "_count",
+        "_zero_count",
+        "_min",
+        "_max",
+        "_mapping",
+    ]:
+        temp = getattr(first, attribute)
+        setattr(first, attribute, getattr(second, attribute))
+        setattr(second, attribute, temp)
+
+
+class TestExponentialBucketHistogramAggregation(TestCase):
+    def assertInEpsilon(self, first, second, epsilon):
+        self.assertLessEqual(first, (second * (1 + epsilon)))
+        self.assertGreaterEqual(first, (second * (1 - epsilon)))
+
+    def require_equal(self, a, b):
+
+        if a._sum == 0 or b._sum == 0:
+            self.assertAlmostEqual(a._sum, b._sum, delta=1e-6)
+        else:
+            self.assertInEpsilon(a._sum, b._sum, 1e-6)
+
+        self.assertEqual(a._count, b._count)
+        self.assertEqual(a._zero_count, b._zero_count)
+
+        self.assertEqual(a._mapping.scale, b._mapping.scale)
+
+        self.assertEqual(len(a._positive), len(b._positive))
+        self.assertEqual(len(a._negative), len(b._negative))
+
+        for index in range(len(a._positive)):
+            self.assertEqual(a._positive[index], b._positive[index])
+
+        for index in range(len(a._negative)):
+            self.assertEqual(a._negative[index], b._negative[index])
+
+    def test_alternating_growth_0(self):
+        """
+        Tests insertion of [2, 4, 1]. The index of 2 (i.e., 0) becomes
+        `indexBase`, the 4 goes to its right and the 1 goes in the last
+        position of the backing array. With 3 binary orders of magnitude
+        and MaxSize=4, this must finish with scale=0; with minimum value 1
+        this must finish with offset=-1 (all scales).
+ + """ + + # The corresponding Go test is TestAlternatingGrowth1 where: + # agg := NewFloat64(NewConfig(WithMaxSize(4))) + # agg is an instance of github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/histogram/structure.Histogram[float64] + + exponential_histogram_aggregation = ( + _ExponentialBucketHistogramAggregation(Mock(), Mock(), max_size=4) + ) + + exponential_histogram_aggregation.aggregate(Measurement(2, Mock())) + exponential_histogram_aggregation.aggregate(Measurement(4, Mock())) + exponential_histogram_aggregation.aggregate(Measurement(1, Mock())) + + self.assertEqual( + exponential_histogram_aggregation._positive.offset, -1 + ) + self.assertEqual(exponential_histogram_aggregation._mapping.scale, 0) + self.assertEqual( + get_counts(exponential_histogram_aggregation._positive), [1, 1, 1] + ) + + def test_alternating_growth_1(self): + """ + Tests insertion of [2, 2, 4, 1, 8, 0.5]. The test proceeds as¶ + above but then downscales once further to scale=-1, thus index -1¶ + holds range [0.25, 1.0), index 0 holds range [1.0, 4), index 1¶ + holds range [4, 16).¶ + """ + + exponential_histogram_aggregation = ( + _ExponentialBucketHistogramAggregation(Mock(), Mock(), max_size=4) + ) + + exponential_histogram_aggregation.aggregate(Measurement(2, Mock())) + exponential_histogram_aggregation.aggregate(Measurement(2, Mock())) + exponential_histogram_aggregation.aggregate(Measurement(2, Mock())) + exponential_histogram_aggregation.aggregate(Measurement(1, Mock())) + exponential_histogram_aggregation.aggregate(Measurement(8, Mock())) + exponential_histogram_aggregation.aggregate(Measurement(0.5, Mock())) + + self.assertEqual( + exponential_histogram_aggregation._positive.offset, -1 + ) + self.assertEqual(exponential_histogram_aggregation._mapping.scale, -1) + self.assertEqual( + get_counts(exponential_histogram_aggregation._positive), [2, 3, 1] + ) + + def test_permutations(self): + """ + Tests that every permutation of certain sequences with maxSize=2 + results¶ in the same scale=-1 histogram. 
+ """ + + for test_values, expected in [ + [ + [0.5, 1.0, 2.0], + { + "scale": -1, + "offset": -1, + "len": 2, + "at_0": 2, + "at_1": 1, + }, + ], + [ + [1.0, 2.0, 4.0], + { + "scale": -1, + "offset": -1, + "len": 2, + "at_0": 1, + "at_1": 2, + }, + ], + [ + [0.25, 0.5, 1], + { + "scale": -1, + "offset": -2, + "len": 2, + "at_0": 1, + "at_1": 2, + }, + ], + ]: + + for permutation in permutations(test_values): + + exponential_histogram_aggregation = ( + _ExponentialBucketHistogramAggregation( + Mock(), Mock(), max_size=2 + ) + ) + + for value in permutation: + + exponential_histogram_aggregation.aggregate( + Measurement(value, Mock()) + ) + + self.assertEqual( + exponential_histogram_aggregation._mapping.scale, + expected["scale"], + ) + self.assertEqual( + exponential_histogram_aggregation._positive.offset, + expected["offset"], + ) + self.assertEqual( + len(exponential_histogram_aggregation._positive), + expected["len"], + ) + self.assertEqual( + exponential_histogram_aggregation._positive[0], + expected["at_0"], + ) + self.assertEqual( + exponential_histogram_aggregation._positive[1], + expected["at_1"], + ) + + def test_ascending_sequence(self): + + for max_size in [3, 4, 6, 9]: + for offset in range(-5, 6): + for init_scale in [0, 4]: + self.ascending_sequence_test(max_size, offset, init_scale) + + def ascending_sequence_test( + self, max_size: int, offset: int, init_scale: int + ): + + for step in range(max_size, max_size * 4): + + exponential_histogram_aggregation = ( + _ExponentialBucketHistogramAggregation( + Mock(), Mock(), max_size=max_size + ) + ) + + if init_scale <= 0: + mapping = ExponentMapping(init_scale) + else: + mapping = LogarithmMapping(init_scale) + + min_val = center_val(mapping, offset) + max_val = center_val(mapping, offset + step) + + sum_ = 0.0 + + for index in range(max_size): + value = center_val(mapping, offset + index) + exponential_histogram_aggregation.aggregate( + Measurement(value, Mock()) + ) + sum_ += value + + self.assertEqual( + init_scale, exponential_histogram_aggregation._mapping._scale + ) + self.assertEqual( + offset, exponential_histogram_aggregation._positive.offset + ) + + exponential_histogram_aggregation.aggregate( + Measurement(max_val, Mock()) + ) + sum_ += max_val + + self.assertNotEqual( + 0, exponential_histogram_aggregation._positive[0] + ) + + # The maximum-index filled bucket is at or + # above the mid-point, (otherwise we + # downscaled too much). + + max_fill = 0 + total_count = 0 + + for index in range( + len(exponential_histogram_aggregation._positive) + ): + total_count += exponential_histogram_aggregation._positive[ + index + ] + if exponential_histogram_aggregation._positive[index] != 0: + max_fill = index + + # FIXME the corresponding Go code is + # require.GreaterOrEqual(t, maxFill, uint32(maxSize)/2), make sure + # this is actually equivalent. 
+            self.assertGreaterEqual(max_fill, int(max_size / 2))
+
+            self.assertGreaterEqual(max_size + 1, total_count)
+            self.assertGreaterEqual(
+                max_size + 1, exponential_histogram_aggregation._count
+            )
+            self.assertGreaterEqual(
+                sum_, exponential_histogram_aggregation._sum
+            )
+
+            if init_scale <= 0:
+                mapping = ExponentMapping(
+                    exponential_histogram_aggregation._mapping.scale
+                )
+            else:
+                mapping = LogarithmMapping(
+                    exponential_histogram_aggregation._mapping.scale
+                )
+            index = mapping.map_to_index(min_val)
+
+            self.assertEqual(
+                index, exponential_histogram_aggregation._positive.offset
+            )
+
+            index = mapping.map_to_index(max_val)
+
+            self.assertEqual(
+                index,
+                exponential_histogram_aggregation._positive.offset
+                + len(exponential_histogram_aggregation._positive)
+                - 1,
+            )
+
+    def test_reset(self):
+
+        for increment in [0x1, 0x100, 0x10000, 0x100000000, 0x200000000]:
+
+            def mock_increment(self, bucket_index: int) -> None:
+                """
+                Increments a bucket
+                """
+
+                self._counts[bucket_index] += increment
+
+            exponential_histogram_aggregation = (
+                _ExponentialBucketHistogramAggregation(
+                    Mock(), Mock(), max_size=256
+                )
+            )
+
+            self.assertEqual(
+                exponential_histogram_aggregation._count,
+                exponential_histogram_aggregation._zero_count,
+            )
+            self.assertEqual(0, exponential_histogram_aggregation._sum)
+            expect = 0
+
+            for value in range(2, 257):
+                expect += value * increment
+                with patch.object(
+                    exponential_histogram_aggregation._positive,
+                    "increment_bucket",
+                    # new=positive_mock
+                    MethodType(
+                        mock_increment,
+                        exponential_histogram_aggregation._positive,
+                    ),
+                ):
+                    exponential_histogram_aggregation.aggregate(
+                        Measurement(value, Mock())
+                    )
+            exponential_histogram_aggregation._count *= increment
+            exponential_histogram_aggregation._sum *= increment
+
+            self.assertEqual(expect, exponential_histogram_aggregation._sum)
+            self.assertEqual(
+                255 * increment, exponential_histogram_aggregation._count
+            )
+
+            # See test_integer_aggregation about why scale is 5, len is
+            # 256 - ((1 << scale) - 1) and offset is (1 << scale) - 1.
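+            # (Derivation sketch: the values 2..256 span 7 binary orders of
+            # magnitude; at scale 5 each power of 2 is split into 2 ** 5
+            # buckets, so roughly 7 * 32 = 224 indices are occupied, which
+            # fits max_size=256, while scale 6 would need about 448.)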
+ scale = exponential_histogram_aggregation._mapping.scale + self.assertEqual(5, scale) + + self.assertEqual( + 256 - ((1 << scale) - 1), + len(exponential_histogram_aggregation._positive), + ) + self.assertEqual( + (1 << scale) - 1, + exponential_histogram_aggregation._positive.offset, + ) + + for index in range(0, 256): + self.assertLessEqual( + exponential_histogram_aggregation._positive[index], + 6 * increment, + ) + + def test_move_into(self): + + exponential_histogram_aggregation_0 = ( + _ExponentialBucketHistogramAggregation( + Mock(), Mock(), max_size=256 + ) + ) + exponential_histogram_aggregation_1 = ( + _ExponentialBucketHistogramAggregation( + Mock(), Mock(), max_size=256 + ) + ) + + expect = 0 + + for index in range(2, 257): + expect += index + exponential_histogram_aggregation_0.aggregate( + Measurement(index, Mock()) + ) + exponential_histogram_aggregation_0.aggregate( + Measurement(0, Mock()) + ) + + swap( + exponential_histogram_aggregation_0, + exponential_histogram_aggregation_1, + ) + + self.assertEqual(0, exponential_histogram_aggregation_0._sum) + self.assertEqual(0, exponential_histogram_aggregation_0._count) + self.assertEqual(0, exponential_histogram_aggregation_0._zero_count) + + self.assertEqual(expect, exponential_histogram_aggregation_1._sum) + self.assertEqual(255 * 2, exponential_histogram_aggregation_1._count) + self.assertEqual(255, exponential_histogram_aggregation_1._zero_count) + + scale = exponential_histogram_aggregation_1._mapping.scale + self.assertEqual(5, scale) + + self.assertEqual( + 256 - ((1 << scale) - 1), + len(exponential_histogram_aggregation_1._positive), + ) + self.assertEqual( + (1 << scale) - 1, + exponential_histogram_aggregation_1._positive.offset, + ) + + for index in range(0, 256): + self.assertLessEqual( + exponential_histogram_aggregation_1._positive[index], 6 + ) + + def test_very_large_numbers(self): + + exponential_histogram_aggregation = ( + _ExponentialBucketHistogramAggregation(Mock(), Mock(), max_size=2) + ) + + def expect_balanced(count: int): + self.assertEqual( + 2, len(exponential_histogram_aggregation._positive) + ) + self.assertEqual( + -1, exponential_histogram_aggregation._positive.offset + ) + self.assertEqual( + count, exponential_histogram_aggregation._positive[0] + ) + self.assertEqual( + count, exponential_histogram_aggregation._positive[1] + ) + + exponential_histogram_aggregation.aggregate( + Measurement(2**-100, Mock()) + ) + exponential_histogram_aggregation.aggregate( + Measurement(2**100, Mock()) + ) + + self.assertLessEqual( + 2**100, (exponential_histogram_aggregation._sum * (1 + 1e-5)) + ) + self.assertGreaterEqual( + 2**100, (exponential_histogram_aggregation._sum * (1 - 1e-5)) + ) + + self.assertEqual(2, exponential_histogram_aggregation._count) + self.assertEqual(-7, exponential_histogram_aggregation._mapping.scale) + + expect_balanced(1) + + exponential_histogram_aggregation.aggregate( + Measurement(2**-127, Mock()) + ) + exponential_histogram_aggregation.aggregate( + Measurement(2**128, Mock()) + ) + + self.assertLessEqual( + 2**128, (exponential_histogram_aggregation._sum * (1 + 1e-5)) + ) + self.assertGreaterEqual( + 2**128, (exponential_histogram_aggregation._sum * (1 - 1e-5)) + ) + + self.assertEqual(4, exponential_histogram_aggregation._count) + self.assertEqual(-7, exponential_histogram_aggregation._mapping.scale) + + expect_balanced(2) + + exponential_histogram_aggregation.aggregate( + Measurement(2**-129, Mock()) + ) + exponential_histogram_aggregation.aggregate( + Measurement(2**255, 
Mock()) + ) + + self.assertLessEqual( + 2**255, (exponential_histogram_aggregation._sum * (1 + 1e-5)) + ) + self.assertGreaterEqual( + 2**255, (exponential_histogram_aggregation._sum * (1 - 1e-5)) + ) + self.assertEqual(6, exponential_histogram_aggregation._count) + self.assertEqual(-8, exponential_histogram_aggregation._mapping.scale) + + expect_balanced(3) + + def test_full_range(self): + + exponential_histogram_aggregation = ( + _ExponentialBucketHistogramAggregation(Mock(), Mock(), max_size=2) + ) + + exponential_histogram_aggregation.aggregate( + Measurement(float_info.max, Mock()) + ) + exponential_histogram_aggregation.aggregate(Measurement(1, Mock())) + exponential_histogram_aggregation.aggregate( + Measurement(2**-1074, Mock()) + ) + + self.assertEqual( + float_info.max, exponential_histogram_aggregation._sum + ) + self.assertEqual(3, exponential_histogram_aggregation._count) + self.assertEqual( + ExponentMapping._min_scale, + exponential_histogram_aggregation._mapping.scale, + ) + + self.assertEqual( + _ExponentialBucketHistogramAggregation._min_max_size, + len(exponential_histogram_aggregation._positive), + ) + self.assertEqual( + -1, exponential_histogram_aggregation._positive.offset + ) + self.assertLessEqual(exponential_histogram_aggregation._positive[0], 2) + self.assertLessEqual(exponential_histogram_aggregation._positive[1], 1) + + def test_aggregator_min_max(self): + + exponential_histogram_aggregation = ( + _ExponentialBucketHistogramAggregation(Mock(), Mock()) + ) + + for value in [1, 3, 5, 7, 9]: + exponential_histogram_aggregation.aggregate( + Measurement(value, Mock()) + ) + + self.assertEqual(1, exponential_histogram_aggregation._min) + self.assertEqual(9, exponential_histogram_aggregation._max) + + exponential_histogram_aggregation = ( + _ExponentialBucketHistogramAggregation(Mock(), Mock()) + ) + + for value in [-1, -3, -5, -7, -9]: + exponential_histogram_aggregation.aggregate( + Measurement(value, Mock()) + ) + + self.assertEqual(-9, exponential_histogram_aggregation._min) + self.assertEqual(-1, exponential_histogram_aggregation._max) + + def test_aggregator_copy_swap(self): + + exponential_histogram_aggregation_0 = ( + _ExponentialBucketHistogramAggregation(Mock(), Mock()) + ) + for value in [1, 3, 5, 7, 9, -1, -3, -5]: + exponential_histogram_aggregation_0.aggregate( + Measurement(value, Mock()) + ) + exponential_histogram_aggregation_1 = ( + _ExponentialBucketHistogramAggregation(Mock(), Mock()) + ) + for value in [5, 4, 3, 2]: + exponential_histogram_aggregation_1.aggregate( + Measurement(value, Mock()) + ) + exponential_histogram_aggregation_2 = ( + _ExponentialBucketHistogramAggregation(Mock(), Mock()) + ) + + swap( + exponential_histogram_aggregation_0, + exponential_histogram_aggregation_1, + ) + + exponential_histogram_aggregation_2._positive.__init__() + exponential_histogram_aggregation_2._negative.__init__() + exponential_histogram_aggregation_2._sum = 0 + exponential_histogram_aggregation_2._count = 0 + exponential_histogram_aggregation_2._zero_count = 0 + exponential_histogram_aggregation_2._min = 0 + exponential_histogram_aggregation_2._max = 0 + exponential_histogram_aggregation_2._mapping = LogarithmMapping( + LogarithmMapping._max_scale + ) + + for attribute in [ + "_positive", + "_negative", + "_sum", + "_count", + "_zero_count", + "_min", + "_max", + "_mapping", + ]: + setattr( + exponential_histogram_aggregation_2, + attribute, + getattr(exponential_histogram_aggregation_1, attribute), + ) + + self.require_equal( + 
exponential_histogram_aggregation_1,
+            exponential_histogram_aggregation_2,
+        )
+
+    def test_zero_count_by_increment(self):
+
+        exponential_histogram_aggregation_0 = (
+            _ExponentialBucketHistogramAggregation(Mock(), Mock())
+        )
+
+        increment = 10
+
+        for _ in range(increment):
+            exponential_histogram_aggregation_0.aggregate(
+                Measurement(0, Mock())
+            )
+        exponential_histogram_aggregation_1 = (
+            _ExponentialBucketHistogramAggregation(Mock(), Mock())
+        )
+
+        # positive_mock = Mock(wraps=exponential_histogram_aggregation_1._positive)
+        def mock_increment(self, bucket_index: int) -> None:
+            """
+            Increments a bucket
+            """
+
+            self._counts[bucket_index] += increment
+
+        with patch.object(
+            exponential_histogram_aggregation_1._positive,
+            "increment_bucket",
+            # new=positive_mock
+            MethodType(
+                mock_increment, exponential_histogram_aggregation_1._positive
+            ),
+        ):
+            exponential_histogram_aggregation_1.aggregate(
+                Measurement(0, Mock())
+            )
+            exponential_histogram_aggregation_1._count *= increment
+            exponential_histogram_aggregation_1._zero_count *= increment
+
+        self.require_equal(
+            exponential_histogram_aggregation_0,
+            exponential_histogram_aggregation_1,
+        )
+
+    def test_one_count_by_increment(self):
+
+        exponential_histogram_aggregation_0 = (
+            _ExponentialBucketHistogramAggregation(Mock(), Mock())
+        )
+
+        increment = 10
+
+        for _ in range(increment):
+            exponential_histogram_aggregation_0.aggregate(
+                Measurement(1, Mock())
+            )
+        exponential_histogram_aggregation_1 = (
+            _ExponentialBucketHistogramAggregation(Mock(), Mock())
+        )
+
+        # positive_mock = Mock(wraps=exponential_histogram_aggregation_1._positive)
+        def mock_increment(self, bucket_index: int) -> None:
+            """
+            Increments a bucket
+            """
+
+            self._counts[bucket_index] += increment
+
+        with patch.object(
+            exponential_histogram_aggregation_1._positive,
+            "increment_bucket",
+            # new=positive_mock
+            MethodType(
+                mock_increment, exponential_histogram_aggregation_1._positive
+            ),
+        ):
+            exponential_histogram_aggregation_1.aggregate(
+                Measurement(1, Mock())
+            )
+            exponential_histogram_aggregation_1._count *= increment
+            exponential_histogram_aggregation_1._sum *= increment
+
+        self.require_equal(
+            exponential_histogram_aggregation_0,
+            exponential_histogram_aggregation_1,
+        )
+
+    def test_boundary_statistics(self):
+
+        total = MAX_NORMAL_EXPONENT - MIN_NORMAL_EXPONENT + 1
+
+        for scale in range(
+            LogarithmMapping._min_scale, LogarithmMapping._max_scale + 1
+        ):
+
+            above = 0
+            below = 0
+
+            if scale <= 0:
+                mapping = ExponentMapping(scale)
+            else:
+                mapping = LogarithmMapping(scale)
+
+            for exp in range(MIN_NORMAL_EXPONENT, MAX_NORMAL_EXPONENT + 1):
+                value = ldexp(1, exp)
+
+                index = mapping.map_to_index(value)
+
+                try:
+                    boundary = mapping.get_lower_boundary(index + 1)
+                except Exception as error:  # pylint: disable=broad-except
+                    self.fail(f"Unexpected exception {error} raised")
+
+                if boundary < value:
+                    above += 1
+                elif boundary > value:
+                    below += 1
+
+            self.assertInEpsilon(0.5, above / total, 0.05)
+            self.assertInEpsilon(0.5, below / total, 0.06)
+
+    def test_min_max_size(self):
+        """
+        Tests that the minimum max_size is the right value.
+        """
+
+        exponential_histogram_aggregation = (
+            _ExponentialBucketHistogramAggregation(
+                Mock(),
+                Mock(),
+                max_size=_ExponentialBucketHistogramAggregation._min_max_size,
+            )
+        )
+
+        # The minimum and maximum normal floating point values are used here
+        # to make sure the mapping can contain the full range of values.
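+        # (Note: at the lowest scale supported by ExponentMapping the base is
+        # so large that the entire normal floating point range
+        # [float_info.min, float_info.max] maps to just two bucket indices,
+        # which is what makes _min_max_size == 2 sufficient here.)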
+        exponential_histogram_aggregation.aggregate(Mock(value=float_info.min))
+        exponential_histogram_aggregation.aggregate(Mock(value=float_info.max))
+
+        # This means the smallest max_size is enough for the full range of the
+        # normal floating point values.
+        self.assertEqual(
+            len(exponential_histogram_aggregation._positive._counts),
+            exponential_histogram_aggregation._min_max_size,
+        )
+
+    def test_aggregate_collect(self):
+        """
+        Tests a repeated cycle of aggregation and collection.
+        """
+        exponential_histogram_aggregation = (
+            _ExponentialBucketHistogramAggregation(
+                Mock(),
+                Mock(),
+            )
+        )
+
+        exponential_histogram_aggregation.aggregate(Measurement(2, Mock()))
+        exponential_histogram_aggregation.collect(
+            AggregationTemporality.CUMULATIVE, 0
+        )
+        exponential_histogram_aggregation.aggregate(Measurement(2, Mock()))
+        exponential_histogram_aggregation.collect(
+            AggregationTemporality.CUMULATIVE, 0
+        )
+        exponential_histogram_aggregation.aggregate(Measurement(2, Mock()))
+        exponential_histogram_aggregation.collect(
+            AggregationTemporality.CUMULATIVE, 0
+        )
+
+    def test_collect_results_cumulative(self):
+        exponential_histogram_aggregation = (
+            _ExponentialBucketHistogramAggregation(
+                Mock(),
+                Mock(),
+            )
+        )
+
+        self.assertEqual(exponential_histogram_aggregation._mapping._scale, 20)
+
+        exponential_histogram_aggregation.aggregate(Measurement(2, Mock()))
+        self.assertEqual(exponential_histogram_aggregation._mapping._scale, 20)
+
+        exponential_histogram_aggregation.aggregate(Measurement(4, Mock()))
+        self.assertEqual(exponential_histogram_aggregation._mapping._scale, 7)
+
+        exponential_histogram_aggregation.aggregate(Measurement(1, Mock()))
+        self.assertEqual(exponential_histogram_aggregation._mapping._scale, 6)
+
+        collection_0 = exponential_histogram_aggregation.collect(
+            AggregationTemporality.CUMULATIVE, Mock()
+        )
+
+        self.assertEqual(len(collection_0.positive.bucket_counts), 160)
+
+        self.assertEqual(collection_0.count, 3)
+        self.assertEqual(collection_0.sum, 7)
+        self.assertEqual(collection_0.scale, 6)
+        self.assertEqual(collection_0.zero_count, 0)
+        self.assertEqual(
+            collection_0.positive.bucket_counts,
+            [1, *[0] * 63, 1, *[0] * 31, 1, *[0] * 63],
+        )
+        self.assertEqual(collection_0.flags, 0)
+        self.assertEqual(collection_0.min, 1)
+        self.assertEqual(collection_0.max, 4)
+
+        exponential_histogram_aggregation.aggregate(Measurement(1, Mock()))
+        exponential_histogram_aggregation.aggregate(Measurement(8, Mock()))
+        exponential_histogram_aggregation.aggregate(Measurement(0.5, Mock()))
+        exponential_histogram_aggregation.aggregate(Measurement(0.1, Mock()))
+        exponential_histogram_aggregation.aggregate(Measurement(0.045, Mock()))
+
+        collection_1 = exponential_histogram_aggregation.collect(
+            AggregationTemporality.CUMULATIVE, Mock()
+        )
+
+        previous_count = collection_1.positive.bucket_counts[0]
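+
+        # (Note: count_counts below run-length encodes the bucket counts
+        # into [count, repetitions] pairs; it appears to be a debugging aid
+        # for deriving the expected bucket layout asserted further down.)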
+ count_counts = [[previous_count, 0]] + + for count in collection_1.positive.bucket_counts: + if count == previous_count: + count_counts[-1][1] += 1 + else: + previous_count = count + count_counts.append([previous_count, 1]) + + self.assertEqual(collection_1.count, 5) + self.assertEqual(collection_1.sum, 16.645) + self.assertEqual(collection_1.scale, 4) + self.assertEqual(collection_1.zero_count, 0) + + self.assertEqual( + collection_1.positive.bucket_counts, + [ + 1, + *[0] * 15, + 1, + *[0] * 47, + 1, + *[0] * 40, + 1, + *[0] * 17, + 1, + *[0] * 36, + ], + ) + self.assertEqual(collection_1.flags, 0) + self.assertEqual(collection_1.min, 0.045) + self.assertEqual(collection_1.max, 8) + + def test_merge_collect_cumulative(self): + exponential_histogram_aggregation = ( + _ExponentialBucketHistogramAggregation(Mock(), Mock(), max_size=4) + ) + + for value in [2, 4, 8, 16]: + exponential_histogram_aggregation.aggregate( + Measurement(value, Mock()) + ) + + self.assertEqual(exponential_histogram_aggregation._mapping.scale, 0) + self.assertEqual(exponential_histogram_aggregation._positive.offset, 0) + self.assertEqual( + exponential_histogram_aggregation._positive.counts, [1, 1, 1, 1] + ) + + result = exponential_histogram_aggregation.collect( + AggregationTemporality.CUMULATIVE, + 0, + ) + + for value in [1, 2, 4, 8]: + exponential_histogram_aggregation.aggregate( + Measurement(1 / value, Mock()) + ) + + self.assertEqual(exponential_histogram_aggregation._mapping.scale, 0) + self.assertEqual( + exponential_histogram_aggregation._positive.offset, -4 + ) + self.assertEqual( + exponential_histogram_aggregation._positive.counts, [1, 1, 1, 1] + ) + + result_1 = exponential_histogram_aggregation.collect( + AggregationTemporality.CUMULATIVE, + 0, + ) + + self.assertEqual(result.scale, result_1.scale) + + def test_merge_collect_delta(self): + exponential_histogram_aggregation = ( + _ExponentialBucketHistogramAggregation(Mock(), Mock(), max_size=4) + ) + + for value in [2, 4, 8, 16]: + exponential_histogram_aggregation.aggregate( + Measurement(value, Mock()) + ) + + self.assertEqual(exponential_histogram_aggregation._mapping.scale, 0) + self.assertEqual(exponential_histogram_aggregation._positive.offset, 0) + self.assertEqual( + exponential_histogram_aggregation._positive.counts, [1, 1, 1, 1] + ) + + result = exponential_histogram_aggregation.collect( + AggregationTemporality.DELTA, + 0, + ) + + for value in [1, 2, 4, 8]: + exponential_histogram_aggregation.aggregate( + Measurement(1 / value, Mock()) + ) + + self.assertEqual(exponential_histogram_aggregation._mapping.scale, 0) + self.assertEqual( + exponential_histogram_aggregation._positive.offset, -4 + ) + self.assertEqual( + exponential_histogram_aggregation._positive.counts, [1, 1, 1, 1] + ) + + result_1 = exponential_histogram_aggregation.collect( + AggregationTemporality.DELTA, + 0, + ) + + self.assertEqual(result.scale, result_1.scale)