From 744851b3cf3a58471e44c3a20f143275fa16cdce Mon Sep 17 00:00:00 2001 From: Shalev Roda <65566801+shalevr@users.noreply.github.com> Date: Wed, 12 Oct 2022 19:22:12 +0300 Subject: [PATCH] metric instrumentation Tornado (#1252) --- CHANGELOG.md | 3 + instrumentation/README.md | 2 +- .../instrumentation/tornado/__init__.py | 186 ++++++++++++-- .../instrumentation/tornado/client.py | 51 +++- .../instrumentation/tornado/package.py | 2 + .../tests/test_instrumentation.py | 8 +- .../tests/test_metrics_instrumentation.py | 235 ++++++++++++++++++ 7 files changed, 464 insertions(+), 23 deletions(-) create mode 100644 instrumentation/opentelemetry-instrumentation-tornado/tests/test_metrics_instrumentation.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c0743d209..6e26036875 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.13.0-0.34b0...HEAD) +- Add metric instrumentation for tornado + ([#1252](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1252)) + ### Added diff --git a/instrumentation/README.md b/instrumentation/README.md index 973adeee85..33e178af86 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -38,7 +38,7 @@ | [opentelemetry-instrumentation-sqlite3](./opentelemetry-instrumentation-sqlite3) | sqlite3 | No | [opentelemetry-instrumentation-starlette](./opentelemetry-instrumentation-starlette) | starlette ~= 0.13.0 | Yes | [opentelemetry-instrumentation-system-metrics](./opentelemetry-instrumentation-system-metrics) | psutil >= 5 | No -| [opentelemetry-instrumentation-tornado](./opentelemetry-instrumentation-tornado) | tornado >= 5.1.1 | No +| [opentelemetry-instrumentation-tornado](./opentelemetry-instrumentation-tornado) | tornado >= 5.1.1 | Yes | [opentelemetry-instrumentation-urllib](./opentelemetry-instrumentation-urllib) | urllib | No | [opentelemetry-instrumentation-urllib3](./opentelemetry-instrumentation-urllib3) | urllib3 >= 1.0.0, < 2.0.0 | Yes | [opentelemetry-instrumentation-wsgi](./opentelemetry-instrumentation-wsgi) | wsgi | Yes \ No newline at end of file diff --git a/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/__init__.py b/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/__init__.py index 59fc13944e..c316c8075f 100644 --- a/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/__init__.py @@ -157,7 +157,8 @@ def client_resposne_hook(span, future): from functools import partial from logging import getLogger from time import time_ns -from typing import Collection +from timeit import default_timer +from typing import Collection, Dict import tornado.web import wrapt @@ -177,6 +178,8 @@ def client_resposne_hook(span, future): http_status_to_status_code, unwrap, ) +from opentelemetry.metrics import get_meter +from opentelemetry.metrics._internal.instrument import Histogram from opentelemetry.propagators import textmap from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.trace.status import Status, StatusCode @@ -197,6 +200,14 @@ def client_resposne_hook(span, future): _HANDLER_CONTEXT_KEY = "_otel_trace_context_key" _OTEL_PATCHED_KEY = "_otel_patched_key" +_START_TIME = "start_time" +_CLIENT_DURATION_HISTOGRAM = "http.client.duration" +_CLIENT_REQUEST_SIZE_HISTOGRAM = "http.client.request.size" +_CLIENT_RESPONSE_SIZE_HISTOGRAM = "http.client.response.size" +_SERVER_DURATION_HISTOGRAM = "http.server.duration" +_SERVER_REQUEST_SIZE_HISTOGRAM = "http.server.request.size" +_SERVER_RESPONSE_SIZE_HISTOGRAM = "http.server.response.size" +_SERVER_ACTIVE_REQUESTS_HISTOGRAM = "http.server.active_requests" _excluded_urls = get_excluded_urls("TORNADO") _traced_request_attrs = get_traced_request_attrs("TORNADO") @@ -233,13 +244,21 @@ def _instrument(self, **kwargs): tracer_provider = kwargs.get("tracer_provider") tracer = trace.get_tracer(__name__, __version__, tracer_provider) + meter_provider = kwargs.get("meter_provider") + meter = get_meter(__name__, __version__, meter_provider) + + client_histograms = _create_client_histograms(meter) + server_histograms = _create_server_histograms(meter) + client_request_hook = kwargs.get("client_request_hook", None) client_response_hook = kwargs.get("client_response_hook", None) server_request_hook = kwargs.get("server_request_hook", None) def handler_init(init, handler, args, kwargs): cls = handler.__class__ - if patch_handler_class(tracer, cls, server_request_hook): + if patch_handler_class( + tracer, server_histograms, cls, server_request_hook + ): self.patched_handlers.append(cls) return init(*args, **kwargs) @@ -250,7 +269,13 @@ def handler_init(init, handler, args, kwargs): "tornado.httpclient", "AsyncHTTPClient.fetch", partial( - fetch_async, tracer, client_request_hook, client_response_hook + fetch_async, + tracer, + client_request_hook, + client_response_hook, + client_histograms[_CLIENT_DURATION_HISTOGRAM], + client_histograms[_CLIENT_REQUEST_SIZE_HISTOGRAM], + client_histograms[_CLIENT_RESPONSE_SIZE_HISTOGRAM], ), ) @@ -262,14 +287,71 @@ def _uninstrument(self, **kwargs): self.patched_handlers = [] -def patch_handler_class(tracer, cls, request_hook=None): +def _create_server_histograms(meter) -> Dict[str, Histogram]: + histograms = { + _SERVER_DURATION_HISTOGRAM: meter.create_histogram( + name="http.server.duration", + unit="ms", + description="measures the duration outbound HTTP requests", + ), + _SERVER_REQUEST_SIZE_HISTOGRAM: meter.create_histogram( + name="http.server.request.size", + unit="By", + description="measures the size of HTTP request messages (compressed)", + ), + _SERVER_RESPONSE_SIZE_HISTOGRAM: meter.create_histogram( + name="http.server.response.size", + unit="By", + description="measures the size of HTTP response messages (compressed)", + ), + _SERVER_ACTIVE_REQUESTS_HISTOGRAM: meter.create_up_down_counter( + name="http.server.active_requests", + unit="requests", + description="measures the number of concurrent HTTP requests that are currently in-flight", + ), + } + + return histograms + + +def _create_client_histograms(meter) -> Dict[str, Histogram]: + histograms = { + _CLIENT_DURATION_HISTOGRAM: meter.create_histogram( + name="http.client.duration", + unit="ms", + description="measures the duration outbound HTTP requests", + ), + _CLIENT_REQUEST_SIZE_HISTOGRAM: meter.create_histogram( + name="http.client.request.size", + unit="By", + description="measures the size of HTTP request messages (compressed)", + ), + _CLIENT_RESPONSE_SIZE_HISTOGRAM: meter.create_histogram( + name="http.client.response.size", + unit="By", + description="measures the size of HTTP response messages (compressed)", + ), + } + + return histograms + + +def patch_handler_class(tracer, server_histograms, cls, request_hook=None): if getattr(cls, _OTEL_PATCHED_KEY, False): return False setattr(cls, _OTEL_PATCHED_KEY, True) - _wrap(cls, "prepare", partial(_prepare, tracer, request_hook)) - _wrap(cls, "on_finish", partial(_on_finish, tracer)) - _wrap(cls, "log_exception", partial(_log_exception, tracer)) + _wrap( + cls, + "prepare", + partial(_prepare, tracer, server_histograms, request_hook), + ) + _wrap(cls, "on_finish", partial(_on_finish, tracer, server_histograms)) + _wrap( + cls, + "log_exception", + partial(_log_exception, tracer, server_histograms), + ) return True @@ -289,28 +371,40 @@ def _wrap(cls, method_name, wrapper): wrapt.apply_patch(cls, method_name, wrapper) -def _prepare(tracer, request_hook, func, handler, args, kwargs): - start_time = time_ns() +def _prepare( + tracer, server_histograms, request_hook, func, handler, args, kwargs +): + server_histograms[_START_TIME] = default_timer() + request = handler.request if _excluded_urls.url_disabled(request.uri): return func(*args, **kwargs) - ctx = _start_span(tracer, handler, start_time) + + _record_prepare_metrics(server_histograms, handler) + + ctx = _start_span(tracer, handler) if request_hook: request_hook(ctx.span, handler) return func(*args, **kwargs) -def _on_finish(tracer, func, handler, args, kwargs): +def _on_finish(tracer, server_histograms, func, handler, args, kwargs): response = func(*args, **kwargs) + + _record_on_finish_metrics(server_histograms, handler) + _finish_span(tracer, handler) + return response -def _log_exception(tracer, func, handler, args, kwargs): +def _log_exception(tracer, server_histograms, func, handler, args, kwargs): error = None if len(args) == 3: error = args[1] + _record_on_finish_metrics(server_histograms, handler, error) + _finish_span(tracer, handler, error) return func(*args, **kwargs) @@ -377,11 +471,11 @@ def _get_full_handler_name(handler): return f"{klass.__module__}.{klass.__qualname__}" -def _start_span(tracer, handler, start_time) -> _TraceContext: +def _start_span(tracer, handler) -> _TraceContext: span, token = _start_internal_or_server_span( tracer=tracer, span_name=_get_operation_name(handler, handler.request), - start_time=start_time, + start_time=time_ns(), context_carrier=handler.request.headers, context_getter=textmap.default_getter, ) @@ -423,7 +517,7 @@ def _finish_span(tracer, handler, error=None): if isinstance(error, tornado.web.HTTPError): status_code = error.status_code if not ctx and status_code == 404: - ctx = _start_span(tracer, handler, time_ns()) + ctx = _start_span(tracer, handler) else: status_code = 500 reason = None @@ -462,3 +556,65 @@ def _finish_span(tracer, handler, error=None): if ctx.token: context.detach(ctx.token) delattr(handler, _HANDLER_CONTEXT_KEY) + + +def _record_prepare_metrics(server_histograms, handler): + request_size = int(handler.request.headers.get("Content-Length", 0)) + metric_attributes = _create_metric_attributes(handler) + + server_histograms[_SERVER_REQUEST_SIZE_HISTOGRAM].record( + request_size, attributes=metric_attributes + ) + + active_requests_attributes = _create_active_requests_attributes( + handler.request + ) + server_histograms[_SERVER_ACTIVE_REQUESTS_HISTOGRAM].add( + 1, attributes=active_requests_attributes + ) + + +def _record_on_finish_metrics(server_histograms, handler, error=None): + elapsed_time = round( + (default_timer() - server_histograms[_START_TIME]) * 1000 + ) + + response_size = int(handler._headers.get("Content-Length", 0)) + metric_attributes = _create_metric_attributes(handler) + + if isinstance(error, tornado.web.HTTPError): + metric_attributes[SpanAttributes.HTTP_STATUS_CODE] = error.status_code + + server_histograms[_SERVER_RESPONSE_SIZE_HISTOGRAM].record( + response_size, attributes=metric_attributes + ) + + server_histograms[_SERVER_DURATION_HISTOGRAM].record( + elapsed_time, attributes=metric_attributes + ) + + active_requests_attributes = _create_active_requests_attributes( + handler.request + ) + server_histograms[_SERVER_ACTIVE_REQUESTS_HISTOGRAM].add( + -1, attributes=active_requests_attributes + ) + + +def _create_active_requests_attributes(request): + metric_attributes = { + SpanAttributes.HTTP_METHOD: request.method, + SpanAttributes.HTTP_SCHEME: request.protocol, + SpanAttributes.HTTP_FLAVOR: request.version, + SpanAttributes.HTTP_HOST: request.host, + SpanAttributes.HTTP_TARGET: request.path, + } + + return metric_attributes + + +def _create_metric_attributes(handler): + metric_attributes = _create_active_requests_attributes(handler.request) + metric_attributes[SpanAttributes.HTTP_STATUS_CODE] = handler.get_status() + + return metric_attributes diff --git a/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/client.py b/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/client.py index 9a682c6160..090f87a88b 100644 --- a/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/client.py +++ b/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/client.py @@ -41,7 +41,18 @@ def _normalize_request(args, kwargs): return (new_args, new_kwargs) -def fetch_async(tracer, request_hook, response_hook, func, _, args, kwargs): +def fetch_async( + tracer, + request_hook, + response_hook, + duration_histogram, + request_size_histogram, + response_size_histogram, + func, + _, + args, + kwargs, +): start_time = time_ns() # Return immediately if no args were provided (error) @@ -78,21 +89,34 @@ def fetch_async(tracer, request_hook, response_hook, func, _, args, kwargs): _finish_tracing_callback, span=span, response_hook=response_hook, + duration_histogram=duration_histogram, + request_size_histogram=request_size_histogram, + response_size_histogram=response_size_histogram, ) ) return future -def _finish_tracing_callback(future, span, response_hook): +def _finish_tracing_callback( + future, + span, + response_hook, + duration_histogram, + request_size_histogram, + response_size_histogram, +): status_code = None description = None exc = future.exception() + + response = future.result() + if span.is_recording() and exc: if isinstance(exc, HTTPError): status_code = exc.code description = f"{type(exc).__name__}: {exc}" else: - status_code = future.result().code + status_code = response.code if status_code is not None: span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, status_code) @@ -102,6 +126,27 @@ def _finish_tracing_callback(future, span, response_hook): description=description, ) ) + + metric_attributes = _create_metric_attributes(response) + request_size = int(response.request.headers.get("Content-Length", 0)) + response_size = int(response.headers.get("Content-Length", 0)) + + duration_histogram.record( + response.request_time, attributes=metric_attributes + ) + request_size_histogram.record(request_size, attributes=metric_attributes) + response_size_histogram.record(response_size, attributes=metric_attributes) + if response_hook: response_hook(span, future) span.end() + + +def _create_metric_attributes(response): + metric_attributes = { + SpanAttributes.HTTP_STATUS_CODE: response.code, + SpanAttributes.HTTP_URL: remove_url_credentials(response.request.url), + SpanAttributes.HTTP_METHOD: response.request.method, + } + + return metric_attributes diff --git a/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/package.py b/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/package.py index 195980dd07..734587b752 100644 --- a/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/package.py +++ b/instrumentation/opentelemetry-instrumentation-tornado/src/opentelemetry/instrumentation/tornado/package.py @@ -14,3 +14,5 @@ _instruments = ("tornado >= 5.1.1",) + +_supports_metrics = True diff --git a/instrumentation/opentelemetry-instrumentation-tornado/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-tornado/tests/test_instrumentation.py index 8dcc94b683..2b47ddc822 100644 --- a/instrumentation/opentelemetry-instrumentation-tornado/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-tornado/tests/test_instrumentation.py @@ -55,12 +55,12 @@ def get_app(self): return app def setUp(self): + super().setUp() TornadoInstrumentor().instrument( server_request_hook=getattr(self, "server_request_hook", None), client_request_hook=getattr(self, "client_request_hook", None), client_response_hook=getattr(self, "client_response_hook", None), ) - super().setUp() # pylint: disable=protected-access self.env_patch = patch.dict( "os.environ", @@ -110,9 +110,9 @@ def test_patch_references(self): def test_patch_applied_only_once(self): tracer = trace.get_tracer(__name__) - self.assertTrue(patch_handler_class(tracer, AsyncHandler)) - self.assertFalse(patch_handler_class(tracer, AsyncHandler)) - self.assertFalse(patch_handler_class(tracer, AsyncHandler)) + self.assertTrue(patch_handler_class(tracer, {}, AsyncHandler)) + self.assertFalse(patch_handler_class(tracer, {}, AsyncHandler)) + self.assertFalse(patch_handler_class(tracer, {}, AsyncHandler)) unpatch_handler_class(AsyncHandler) diff --git a/instrumentation/opentelemetry-instrumentation-tornado/tests/test_metrics_instrumentation.py b/instrumentation/opentelemetry-instrumentation-tornado/tests/test_metrics_instrumentation.py new file mode 100644 index 0000000000..14ef4b842a --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-tornado/tests/test_metrics_instrumentation.py @@ -0,0 +1,235 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from timeit import default_timer + +from tornado.testing import AsyncHTTPTestCase + +from opentelemetry import trace +from opentelemetry.instrumentation.tornado import TornadoInstrumentor +from opentelemetry.sdk.metrics.export import ( + HistogramDataPoint, + NumberDataPoint, +) +from opentelemetry.test.test_base import TestBase + +from .tornado_test_app import make_app + + +class TornadoTest(AsyncHTTPTestCase, TestBase): + # pylint:disable=no-self-use + def get_app(self): + tracer = trace.get_tracer(__name__) + app = make_app(tracer) + return app + + def get_sorted_metrics(self): + resource_metrics = ( + self.memory_metrics_reader.get_metrics_data().resource_metrics + ) + for metrics in resource_metrics: + for scope_metrics in metrics.scope_metrics: + all_metrics = list(scope_metrics.metrics) + return self.sorted_metrics(all_metrics) + + @staticmethod + def sorted_metrics(metrics): + """ + Sorts metrics by metric name. + """ + return sorted( + metrics, + key=lambda m: m.name, + ) + + def assert_metric_expected( + self, metric, expected_value, expected_attributes + ): + data_point = next(metric.data.data_points) + + if isinstance(data_point, HistogramDataPoint): + self.assertEqual( + data_point.sum, + expected_value, + ) + elif isinstance(data_point, NumberDataPoint): + self.assertEqual( + data_point.value, + expected_value, + ) + + self.assertDictEqual( + expected_attributes, + dict(data_point.attributes), + ) + + def assert_duration_metric_expected( + self, metric, duration_estimated, expected_attributes + ): + data_point = next(metric.data.data_points) + + self.assertAlmostEqual( + data_point.sum, + duration_estimated, + delta=200, + ) + + self.assertDictEqual( + expected_attributes, + dict(data_point.attributes), + ) + + def setUp(self): + super().setUp() + TornadoInstrumentor().instrument( + server_request_hook=getattr(self, "server_request_hook", None), + client_request_hook=getattr(self, "client_request_hook", None), + client_response_hook=getattr(self, "client_response_hook", None), + meter_provider=self.meter_provider, + ) + + def tearDown(self): + TornadoInstrumentor().uninstrument() + super().tearDown() + + +class TestTornadoInstrumentor(TornadoTest): + def test_basic_metrics(self): + start_time = default_timer() + response = self.fetch("/") + client_duration_estimated = (default_timer() - start_time) * 1000 + + metrics = self.get_sorted_metrics() + self.assertEqual(len(metrics), 7) + + ( + client_duration, + client_request_size, + client_response_size, + ) = metrics[:3] + + ( + server_active_request, + server_duration, + server_request_size, + server_response_size, + ) = metrics[3:] + + self.assertEqual( + server_active_request.name, "http.server.active_requests" + ) + self.assert_metric_expected( + server_active_request, + 0, + { + "http.method": "GET", + "http.flavor": "HTTP/1.1", + "http.scheme": "http", + "http.target": "/", + "http.host": response.request.headers["host"], + }, + ) + + self.assertEqual(server_duration.name, "http.server.duration") + self.assert_duration_metric_expected( + server_duration, + client_duration_estimated, + { + "http.status_code": response.code, + "http.method": "GET", + "http.flavor": "HTTP/1.1", + "http.scheme": "http", + "http.target": "/", + "http.host": response.request.headers["host"], + }, + ) + + self.assertEqual(server_request_size.name, "http.server.request.size") + self.assert_metric_expected( + server_request_size, + 0, + { + "http.status_code": 200, + "http.method": "GET", + "http.flavor": "HTTP/1.1", + "http.scheme": "http", + "http.target": "/", + "http.host": response.request.headers["host"], + }, + ) + + self.assertEqual( + server_response_size.name, "http.server.response.size" + ) + self.assert_metric_expected( + server_response_size, + len(response.body), + { + "http.status_code": response.code, + "http.method": "GET", + "http.flavor": "HTTP/1.1", + "http.scheme": "http", + "http.target": "/", + "http.host": response.request.headers["host"], + }, + ) + + self.assertEqual(client_duration.name, "http.client.duration") + self.assert_duration_metric_expected( + client_duration, + client_duration_estimated, + { + "http.status_code": response.code, + "http.method": "GET", + "http.url": response.effective_url, + }, + ) + + self.assertEqual(client_request_size.name, "http.client.request.size") + self.assert_metric_expected( + client_request_size, + 0, + { + "http.status_code": response.code, + "http.method": "GET", + "http.url": response.effective_url, + }, + ) + + self.assertEqual( + client_response_size.name, "http.client.response.size" + ) + self.assert_metric_expected( + client_response_size, + len(response.body), + { + "http.status_code": response.code, + "http.method": "GET", + "http.url": response.effective_url, + }, + ) + + def test_metric_uninstrument(self): + self.fetch("/") + TornadoInstrumentor().uninstrument() + self.fetch("/") + + metrics_list = self.memory_metrics_reader.get_metrics_data() + for resource_metric in metrics_list.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + for point in list(metric.data.data_points): + if isinstance(point, HistogramDataPoint): + self.assertEqual(point.count, 1)