From 6e6f6db23a907e05ae27bbaaa11f0d9a71cae8a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Wed, 19 Feb 2020 16:11:58 -0500 Subject: [PATCH 1/6] sdk/metrics/aggregators: make Counter threadsafe --- .../sdk/metrics/export/aggregate.py | 10 +++-- .../tests/metrics/export/test_export.py | 39 +++++++++++++++++++ 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 5c55ba038a..56a4d64a97 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -13,6 +13,7 @@ # limitations under the License. import abc +import threading from collections import namedtuple @@ -47,13 +48,16 @@ def __init__(self): super().__init__() self.current = 0 self.checkpoint = 0 + self._lock = threading.Lock() def update(self, value): - self.current += value + with self._lock: + self.current += value def take_checkpoint(self): - self.checkpoint = self.current - self.current = 0 + with self._lock: + self.checkpoint = self.current + self.current = 0 def merge(self, other): self.checkpoint += other.checkpoint diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index 5df6c6d08a..9da9dae73e 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import concurrent.futures +import random import unittest from unittest import mock @@ -222,6 +224,15 @@ def test_ungrouped_batcher_process_not_stateful(self): class TestCounterAggregator(unittest.TestCase): + @classmethod + def call_update(cls, counter): + update_total = 0 + for _ in range(0, 100000): + val = random.getrandbits(32) + counter.update(val) + update_total += val + return update_total + def test_update(self): counter = CounterAggregator() counter.update(1.0) @@ -243,6 +254,34 @@ def test_merge(self): counter.merge(counter2) self.assertEqual(counter.checkpoint, 4.0) + def test_concurrent_update(self): + counter = CounterAggregator() + + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + fut1 = executor.submit(self.call_update, counter) + fut2 = executor.submit(self.call_update, counter) + + updapte_total = fut1.result() + fut2.result() + + counter.take_checkpoint() + self.assertEqual(updapte_total, counter.checkpoint) + + def test_concurrent_update_and_checkpoint(self): + counter = CounterAggregator() + checkpoint_total = 0 + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + fut = executor.submit(self.call_update, counter) + + while fut.running(): + counter.take_checkpoint() + checkpoint_total += counter.checkpoint + + counter.take_checkpoint() + checkpoint_total += counter.checkpoint + + self.assertEqual(fut.result(), checkpoint_total) + class TestMinMaxSumCountAggregator(unittest.TestCase): def test_update(self): From 1f9444ec465b987f2f3603deb81655878d8dbe16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Wed, 19 Feb 2020 17:19:11 -0500 Subject: [PATCH 2/6] sdk/metrics/aggregator: make MinMaxSumCount threadsafe --- .../sdk/metrics/export/aggregate.py | 61 +++++++------ .../tests/metrics/export/test_export.py | 89 +++++++++++++++++-- 2 files changed, 111 insertions(+), 39 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py 
b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 56a4d64a97..9e707469f8 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -67,46 +67,45 @@ class MinMaxSumCountAggregator(Aggregator): """Agregator for Measure metrics that keeps min, max, sum and count.""" _TYPE = namedtuple("minmaxsumcount", "min max sum count") + _EMPTY = _TYPE(None, None, None, 0) @classmethod - def _min(cls, val1, val2): - if val1 is None and val2 is None: - return None - return min(val1 or val2, val2 or val1) - - @classmethod - def _max(cls, val1, val2): - if val1 is None and val2 is None: - return None - return max(val1 or val2, val2 or val1) - - @classmethod - def _sum(cls, val1, val2): - if val1 is None and val2 is None: - return None - return (val1 or 0) + (val2 or 0) + def _merge_checkpoint(cls, val1, val2): + if val1 is cls._EMPTY: + return val2 + if val2 is cls._EMPTY: + return val1 + return cls._TYPE( + min(val1.min, val2.min), + max(val1.max, val2.max), + val1.sum + val2.sum, + val1.count + val2.count, + ) def __init__(self): super().__init__() - self.current = self._TYPE(None, None, None, 0) - self.checkpoint = self._TYPE(None, None, None, 0) + self.current = self._EMPTY + self.checkpoint = self._EMPTY + self._lock = threading.Lock() def update(self, value): - self.current = self._TYPE( - self._min(self.current.min, value), - self._max(self.current.max, value), - self._sum(self.current.sum, value), - self.current.count + 1, - ) + with self._lock: + if self.current is self._EMPTY: + self.current = self._TYPE(value, value, value, 1) + else: + self.current = self._TYPE( + min(self.current.min, value), + max(self.current.max, value), + self.current.sum + value, + self.current.count + 1, + ) def take_checkpoint(self): - self.checkpoint = self.current - self.current = self._TYPE(None, None, None, 0) + with self._lock: + self.checkpoint = self.current + 
self.current = self._EMPTY def merge(self, other): - self.checkpoint = self._TYPE( - self._min(self.checkpoint.min, other.checkpoint.min), - self._max(self.checkpoint.max, other.checkpoint.max), - self._sum(self.checkpoint.sum, other.checkpoint.sum), - self.checkpoint.count + other.checkpoint.count, + self.checkpoint = self._merge_checkpoint( + self.checkpoint, other.checkpoint ) diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index 9da9dae73e..d5e166cd30 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -284,11 +284,28 @@ def test_concurrent_update_and_checkpoint(self): class TestMinMaxSumCountAggregator(unittest.TestCase): + @classmethod + def call_update(cls, mmsc): + min_ = 2 ** 32 + max_ = 0 + sum_ = 0 + count_ = 0 + for _ in range(0, 100000): + val = random.getrandbits(32) + mmsc.update(val) + if val < min_: + min_ = val + if val > max_: + max_ = val + sum_ += val + count_ += 1 + return MinMaxSumCountAggregator._TYPE(min_, max_, sum_, count_) + def test_update(self): mmsc = MinMaxSumCountAggregator() # test current values without any update self.assertEqual( - mmsc.current, (None, None, None, 0), + mmsc.current, MinMaxSumCountAggregator._EMPTY, ) # call update with some values @@ -306,7 +323,7 @@ def test_checkpoint(self): # take checkpoint wihtout any update mmsc.take_checkpoint() self.assertEqual( - mmsc.checkpoint, (None, None, None, 0), + mmsc.checkpoint, MinMaxSumCountAggregator._EMPTY, ) # call update with some values @@ -321,7 +338,7 @@ def test_checkpoint(self): ) self.assertEqual( - mmsc.current, (None, None, None, 0), + mmsc.current, MinMaxSumCountAggregator._EMPTY, ) def test_merge(self): @@ -338,14 +355,34 @@ def test_merge(self): self.assertEqual( mmsc1.checkpoint, - ( - min(checkpoint1.min, checkpoint2.min), - max(checkpoint1.max, checkpoint2.max), - checkpoint1.sum + checkpoint2.sum, - 
checkpoint1.count + checkpoint2.count, + MinMaxSumCountAggregator._merge_checkpoint( + checkpoint1, checkpoint2 ), ) + def test_merge_checkpoint(self): + func = MinMaxSumCountAggregator._merge_checkpoint + _type = MinMaxSumCountAggregator._TYPE + empty = MinMaxSumCountAggregator._EMPTY + + ret = func(empty, empty) + self.assertEqual(ret, empty) + + ret = func(empty, _type(0, 0, 0, 0)) + self.assertEqual(ret, _type(0, 0, 0, 0)) + + ret = func(_type(0, 0, 0, 0), empty) + self.assertEqual(ret, _type(0, 0, 0, 0)) + + ret = func(_type(0, 0, 0, 0), _type(0, 0, 0, 0)) + self.assertEqual(ret, _type(0, 0, 0, 0)) + + ret = func(_type(44, 23, 55, 86), empty) + self.assertEqual(ret, _type(44, 23, 55, 86)) + + ret = func(_type(3, 150, 101, 3), _type(1, 33, 44, 2)) + self.assertEqual(ret, _type(1, 150, 101 + 44, 2 + 3)) + def test_merge_with_empty(self): mmsc1 = MinMaxSumCountAggregator() mmsc2 = MinMaxSumCountAggregator() @@ -357,6 +394,42 @@ def test_merge_with_empty(self): self.assertEqual(mmsc1.checkpoint, checkpoint1) + def test_concurrent_update(self): + mmsc = MinMaxSumCountAggregator() + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex: + fut1 = ex.submit(self.call_update, mmsc) + fut2 = ex.submit(self.call_update, mmsc) + + ret1 = fut1.result() + ret2 = fut2.result() + + update_total = MinMaxSumCountAggregator._merge_checkpoint( + ret1, ret2 + ) + mmsc.take_checkpoint() + + self.assertEqual(update_total, mmsc.checkpoint) + + def test_concurrent_update_and_checkpoint(self): + mmsc = MinMaxSumCountAggregator() + checkpoint_total = MinMaxSumCountAggregator._TYPE(2 ** 32, 0, 0, 0) + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as ex: + fut = ex.submit(self.call_update, mmsc) + + while fut.running(): + mmsc.take_checkpoint() + checkpoint_total = MinMaxSumCountAggregator._merge_checkpoint( + checkpoint_total, mmsc.checkpoint + ) + + mmsc.take_checkpoint() + checkpoint_total = MinMaxSumCountAggregator._merge_checkpoint( + checkpoint_total, 
mmsc.checkpoint + ) + + self.assertEqual(checkpoint_total, fut.result()) + class TestController(unittest.TestCase): def test_push_controller(self): From 2482e09894934f30aa8aba1b798c2f4d8ea18b26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Fri, 21 Feb 2020 14:20:57 -0500 Subject: [PATCH 3/6] use future.done() instead of future.running() It's possible that future.running() returns False BEFORE the task is scheduled. --- opentelemetry-sdk/tests/metrics/export/test_export.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index d5e166cd30..c74d61b2fc 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -273,7 +273,7 @@ def test_concurrent_update_and_checkpoint(self): with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: fut = executor.submit(self.call_update, counter) - while fut.running(): + while not fut.done(): counter.take_checkpoint() checkpoint_total += counter.checkpoint @@ -417,7 +417,7 @@ def test_concurrent_update_and_checkpoint(self): with concurrent.futures.ThreadPoolExecutor(max_workers=1) as ex: fut = ex.submit(self.call_update, mmsc) - while fut.running(): + while not fut.done(): mmsc.take_checkpoint() checkpoint_total = MinMaxSumCountAggregator._merge_checkpoint( checkpoint_total, mmsc.checkpoint From 80d73abcb5e43fc8e79311cc127b98b072a8fd67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Mon, 24 Feb 2020 08:01:09 -0500 Subject: [PATCH 4/6] use float("inf") instead of 2 ** 32. 
--- opentelemetry-sdk/tests/metrics/export/test_export.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index c74d61b2fc..b7b34995d8 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -286,8 +286,8 @@ def test_concurrent_update_and_checkpoint(self): class TestMinMaxSumCountAggregator(unittest.TestCase): @classmethod def call_update(cls, mmsc): - min_ = 2 ** 32 - max_ = 0 + min_ = float("inf") + max_ = float("-inf") sum_ = 0 count_ = 0 for _ in range(0, 100000): From e9580d82a4d110518f358b8ca098bf912fe328f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Mon, 24 Feb 2020 21:09:13 -0500 Subject: [PATCH 5/6] use staticmethod instead of classmethod --- opentelemetry-sdk/tests/metrics/export/test_export.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index b7b34995d8..51d7aaaf4f 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -224,8 +224,8 @@ def test_ungrouped_batcher_process_not_stateful(self): class TestCounterAggregator(unittest.TestCase): - @classmethod - def call_update(cls, counter): + @staticmethod + def call_update(counter): update_total = 0 for _ in range(0, 100000): val = random.getrandbits(32) @@ -284,8 +284,8 @@ def test_concurrent_update_and_checkpoint(self): class TestMinMaxSumCountAggregator(unittest.TestCase): - @classmethod - def call_update(cls, mmsc): + @staticmethod + def call_update(mmsc): min_ = float("inf") max_ = float("-inf") sum_ = 0 From 53ae9ae902d800d4764368e8fea6009e0797443c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Tue, 25 Feb 2020 07:42:41 -0500 Subject: 
[PATCH 6/6] add locks on merge() too --- .../src/opentelemetry/sdk/metrics/export/aggregate.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 9e707469f8..f082cce891 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -60,7 +60,8 @@ def take_checkpoint(self): self.current = 0 def merge(self, other): - self.checkpoint += other.checkpoint + with self._lock: + self.checkpoint += other.checkpoint class MinMaxSumCountAggregator(Aggregator): @@ -106,6 +107,7 @@ def take_checkpoint(self): self.current = self._EMPTY def merge(self, other): - self.checkpoint = self._merge_checkpoint( - self.checkpoint, other.checkpoint - ) + with self._lock: + self.checkpoint = self._merge_checkpoint( + self.checkpoint, other.checkpoint + )