diff --git a/CHANGELOG.md b/CHANGELOG.md index dff32bafca..421900aa59 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2404](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2404)) - Remove SDK dependency from opentelemetry-instrumentation-grpc ([#2474](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2474)) +- `opentelemetry-instrumentation-elasticsearch` Improved support for version 8 + ([#2420](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2420)) ## Version 1.24.0/0.45b0 (2024-03-28) diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py index ceb50cac56..00bf397f6f 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py @@ -94,7 +94,7 @@ def response_hook(span, response): from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import unwrap from opentelemetry.semconv.trace import SpanAttributes -from opentelemetry.trace import SpanKind, get_tracer +from opentelemetry.trace import SpanKind, Status, StatusCode, get_tracer from .utils import sanitize_body @@ -103,6 +103,7 @@ def response_hook(span, response): es_transport_split = elasticsearch.VERSION[0] > 7 if es_transport_split: import elastic_transport + from elastic_transport._models import DefaultType logger = getLogger(__name__) @@ -173,7 +174,12 @@ def _instrument(self, **kwargs): def _uninstrument(self, **kwargs): # pylint: disable=no-member - unwrap(elasticsearch.Transport, "perform_request") + transport_class = ( + elastic_transport.Transport + if es_transport_split + else elasticsearch.Transport + ) + unwrap(transport_class, "perform_request") _regex_doc_url = re.compile(r"/_doc/([^/]+)") @@ -234,7 +240,21 @@ def wrapper(wrapped, _, args, kwargs): kind=SpanKind.CLIENT, ) as span: if callable(request_hook): - request_hook(span, method, url, kwargs) + if es_transport_split: + + def normalize_kwargs(k, v): + if isinstance(v, DefaultType): + v = str(v) + elif isinstance(v, elastic_transport.HttpHeaders): + v = dict(v) + return (k, v) + + hook_kwargs = dict( + normalize_kwargs(k, v) for k, v in kwargs.items() + ) + else: + hook_kwargs = kwargs + request_hook(span, method, url, hook_kwargs) if span.is_recording(): attributes = { @@ -260,16 +280,41 @@ def wrapper(wrapped, _, args, kwargs): span.set_attribute(key, value) rv = wrapped(*args, **kwargs) - if isinstance(rv, dict) and span.is_recording(): + + body = rv.body if es_transport_split else rv + if isinstance(body, dict) and span.is_recording(): for member in _ATTRIBUTES_FROM_RESULT: - if member in rv: + if member in body: span.set_attribute( f"elasticsearch.{member}", - str(rv[member]), + str(body[member]), + ) + + # since the transport split the raising of exceptions that set the error status + # are called after this code so need to set error status manually + if es_transport_split and span.is_recording(): + if not (method == "HEAD" and rv.meta.status == 404) and ( + not 200 <= rv.meta.status < 299 + ): + exception = elasticsearch.exceptions.HTTP_EXCEPTIONS.get( + rv.meta.status, elasticsearch.exceptions.ApiError + ) + message = str(body) + if isinstance(body, dict): + error = body.get("error", message) + if isinstance(error, dict) and "type" in error: + error = error["type"] + message = error + + span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{exception.__name__}: {message}", ) + ) if callable(response_hook): - response_hook(span, rv) + response_hook(span, body) return rv return wrapper diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/test-requirements-2.txt b/instrumentation/opentelemetry-instrumentation-elasticsearch/test-requirements-2.txt new file mode 100644 index 0000000000..69578e7392 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/test-requirements-2.txt @@ -0,0 +1,23 @@ +asgiref==3.7.2 +attrs==23.2.0 +Deprecated==1.2.14 +elasticsearch==8.12.1 +elasticsearch-dsl==8.12.0 +elastic-transport==8.12.0 +importlib-metadata==7.1.0 +iniconfig==2.0.0 +packaging==23.2 +pluggy==1.4.0 +py==1.11.0 +py-cpuinfo==9.0.0 +pytest==7.1.3 +pytest-benchmark==4.0.0 +python-dateutil==2.8.2 +six==1.16.0 +tomli==2.0.1 +typing_extensions==4.10.0 +urllib3==2.2.1 +wrapt==1.16.0 +zipp==3.17.0 +-e opentelemetry-instrumentation +-e instrumentation/opentelemetry-instrumentation-elasticsearch diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/helpers_es6.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/helpers_es6.py index b27d291ba3..8169eb25c4 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/helpers_es6.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/helpers_es6.py @@ -31,3 +31,9 @@ class Index: dsl_index_span_name = "Elasticsearch/test-index/doc/2" dsl_index_url = "/test-index/doc/2" dsl_search_method = "GET" + +perform_request_mock_path = "elasticsearch.connection.http_urllib3.Urllib3HttpConnection.perform_request" + + +def mock_response(body: str, status_code: int = 200): + return (status_code, {}, body) diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/helpers_es7.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/helpers_es7.py index b22df18452..3a2b18a931 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/helpers_es7.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/helpers_es7.py @@ -29,3 +29,9 @@ class Index: dsl_index_span_name = "Elasticsearch/test-index/_doc/:id" dsl_index_url = "/test-index/_doc/2" dsl_search_method = "POST" + +perform_request_mock_path = "elasticsearch.connection.http_urllib3.Urllib3HttpConnection.perform_request" + + +def mock_response(body: str, status_code: int=200): + return (status_code, {}, body) diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/helpers_es8.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/helpers_es8.py index 04ed2efda2..e16ec73ea5 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/helpers_es8.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/helpers_es8.py @@ -13,6 +13,8 @@ # limitations under the License. from elasticsearch_dsl import Document, Keyword, Text +from elastic_transport._node import NodeApiResponse +from elastic_transport import ApiResponseMeta, HttpHeaders class Article(Document): @@ -36,6 +38,23 @@ class Index: } } dsl_index_result = (1, {}, '{"result": "created"}') -dsl_index_span_name = "Elasticsearch/test-index/_doc/2" +dsl_index_span_name = "Elasticsearch/test-index/_doc/:id" dsl_index_url = "/test-index/_doc/2" dsl_search_method = "POST" + +perform_request_mock_path = ( + "elastic_transport._node._http_urllib3.Urllib3HttpNode.perform_request" +) + + +def mock_response(body: str, status_code: int = 200): + return NodeApiResponse( + ApiResponseMeta( + status=status_code, + headers=HttpHeaders({}), + duration=100, + http_version="1.1", + node="node", + ), + body.encode(), + ) diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py index 690cbe3d4c..b0ee170329 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py @@ -51,25 +51,25 @@ def normalize_arguments(doc_type, body=None): - if major_version == 7: - return {"document": body} if body else {} - return ( - {"body": body, "doc_type": doc_type} - if body - else {"doc_type": doc_type} - ) + if major_version < 7: + return ( + {"body": body, "doc_type": doc_type} + if body + else {"doc_type": doc_type} + ) + return {"document": body} if body else {} def get_elasticsearch_client(*args, **kwargs): client = Elasticsearch(*args, **kwargs) - if major_version == 7: + if major_version == 8: + client._verified_elasticsearch = True + elif major_version == 7: client.transport._verified_elasticsearch = True return client -@mock.patch( - "elasticsearch.connection.http_urllib3.Urllib3HttpConnection.perform_request" -) +@mock.patch(helpers.perform_request_mock_path) class TestElasticsearchIntegration(TestBase): search_attributes = { SpanAttributes.DB_SYSTEM: "elasticsearch", @@ -96,7 +96,7 @@ def tearDown(self): ElasticsearchInstrumentor().uninstrument() def test_instrumentor(self, request_mock): - request_mock.return_value = (1, {}, "{}") + request_mock.return_value = helpers.mock_response("{}") es = get_elasticsearch_client(hosts=["http://localhost:9200"]) es.index( @@ -147,7 +147,7 @@ def test_prefix_arg(self, request_mock): prefix = "prefix-from-env" ElasticsearchInstrumentor().uninstrument() ElasticsearchInstrumentor(span_name_prefix=prefix).instrument() - request_mock.return_value = (1, {}, "{}") + request_mock.return_value = helpers.mock_response("{}") self._test_prefix(prefix) def test_prefix_env(self, request_mock): @@ -156,7 +156,7 @@ def test_prefix_env(self, request_mock): os.environ[env_var] = prefix ElasticsearchInstrumentor().uninstrument() ElasticsearchInstrumentor().instrument() - request_mock.return_value = (1, {}, "{}") + request_mock.return_value = helpers.mock_response("{}") del os.environ[env_var] self._test_prefix(prefix) @@ -174,10 +174,8 @@ def _test_prefix(self, prefix): self.assertTrue(span.name.startswith(prefix)) def test_result_values(self, request_mock): - request_mock.return_value = ( - 1, - {}, - '{"found": false, "timed_out": true, "took": 7}', + request_mock.return_value = helpers.mock_response( + '{"found": false, "timed_out": true, "took": 7}' ) es = get_elasticsearch_client(hosts=["http://localhost:9200"]) es.get( @@ -201,9 +199,18 @@ def test_trace_error_unknown(self, request_mock): def test_trace_error_not_found(self, request_mock): msg = "record not found" - exc = elasticsearch.exceptions.NotFoundError(404, msg) - request_mock.return_value = (1, {}, "{}") - request_mock.side_effect = exc + if major_version == 8: + error = {"error": msg} + response = helpers.mock_response( + json.dumps(error), status_code=404 + ) + request_mock.return_value = response + exc = elasticsearch.exceptions.NotFoundError( + msg, meta=response.meta, body=None + ) + else: + exc = elasticsearch.exceptions.NotFoundError(404, msg) + request_mock.side_effect = exc self._test_trace_error(StatusCode.ERROR, exc) def _test_trace_error(self, code, exc): @@ -222,12 +229,13 @@ def _test_trace_error(self, code, exc): span = spans[0] self.assertFalse(span.status.is_ok) self.assertEqual(span.status.status_code, code) + message = getattr(exc, "message", str(exc)) self.assertEqual( - span.status.description, f"{type(exc).__name__}: {exc}" + span.status.description, f"{type(exc).__name__}: {message}" ) def test_parent(self, request_mock): - request_mock.return_value = (1, {}, "{}") + request_mock.return_value = helpers.mock_response("{}") es = get_elasticsearch_client(hosts=["http://localhost:9200"]) with self.tracer.start_as_current_span("parent"): es.index( @@ -245,7 +253,7 @@ def test_parent(self, request_mock): self.assertEqual(child.parent.span_id, parent.context.span_id) def test_multithread(self, request_mock): - request_mock.return_value = (1, {}, "{}") + request_mock.return_value = helpers.mock_response("{}") es = get_elasticsearch_client(hosts=["http://localhost:9200"]) ev = threading.Event() @@ -292,7 +300,9 @@ def target2(): self.assertIsNone(s3.parent) def test_dsl_search(self, request_mock): - request_mock.return_value = (1, {}, '{"hits": {"hits": []}}') + request_mock.return_value = helpers.mock_response( + '{"hits": {"hits": []}}' + ) client = get_elasticsearch_client(hosts=["http://localhost:9200"]) search = Search(using=client, index="test-index").filter( @@ -310,7 +320,9 @@ def test_dsl_search(self, request_mock): ) def test_dsl_search_sanitized(self, request_mock): - request_mock.return_value = (1, {}, '{"hits": {"hits": []}}') + request_mock.return_value = helpers.mock_response( + '{"hits": {"hits": []}}' + ) client = get_elasticsearch_client(hosts=["http://localhost:9200"]) search = Search(using=client, index="test-index").filter( "term", author="testing" @@ -327,7 +339,10 @@ def test_dsl_search_sanitized(self, request_mock): ) def test_dsl_create(self, request_mock): - request_mock.return_value = (1, {}, "{}") + request_mock.side_effect = [ + helpers.mock_response("{}", status_code=404), + helpers.mock_response("{}"), + ] client = get_elasticsearch_client(hosts=["http://localhost:9200"]) Article.init(using=client) @@ -354,7 +369,10 @@ def test_dsl_create(self, request_mock): ) def test_dsl_create_sanitized(self, request_mock): - request_mock.return_value = (1, {}, "{}") + request_mock.side_effect = [ + helpers.mock_response("{}", status_code=404), + helpers.mock_response("{}"), + ] client = get_elasticsearch_client(hosts=["http://localhost:9200"]) Article.init(using=client) @@ -370,7 +388,9 @@ def test_dsl_create_sanitized(self, request_mock): ) def test_dsl_index(self, request_mock): - request_mock.return_value = (1, {}, helpers.dsl_index_result[2]) + request_mock.return_value = helpers.mock_response( + helpers.dsl_index_result[2] + ) client = get_elasticsearch_client(hosts=["http://localhost:9200"]) article = Article( @@ -416,10 +436,8 @@ def request_hook(span, method, url, kwargs): ElasticsearchInstrumentor().uninstrument() ElasticsearchInstrumentor().instrument(request_hook=request_hook) - request_mock.return_value = ( - 1, - {}, - '{"found": false, "timed_out": true, "took": 7}', + request_mock.return_value = helpers.mock_response( + '{"found": false, "timed_out": true, "took": 7}' ) es = get_elasticsearch_client(hosts=["http://localhost:9200"]) index = "test-index" @@ -439,12 +457,26 @@ def request_hook(span, method, url, kwargs): "GET", spans[0].attributes[request_hook_method_attribute] ) expected_url = f"/{index}/_doc/{doc_id}" + if major_version == 8: + expected_url += "?realtime=true&refresh=true" self.assertEqual( expected_url, spans[0].attributes[request_hook_url_attribute], ) - if major_version == 7: + if major_version == 8: + expected_kwargs = { + "body": None, + "request_timeout": "", + "max_retries": "", + "retry_on_status": "", + "retry_on_timeout": "", + "client_meta": "", + "headers": { + "accept": "application/vnd.elasticsearch+json; compatible-with=8" + }, + } + elif major_version == 7: expected_kwargs = { **kwargs, "headers": {"accept": "application/json"}, @@ -452,8 +484,8 @@ def request_hook(span, method, url, kwargs): else: expected_kwargs = {**kwargs} self.assertEqual( - json.dumps(expected_kwargs), - spans[0].attributes[request_hook_kwargs_attribute], + expected_kwargs, + json.loads(spans[0].attributes[request_hook_kwargs_attribute]), ) def test_response_hook(self, request_mock): @@ -492,7 +524,9 @@ def response_hook(span, response): }, } - request_mock.return_value = (1, {}, json.dumps(response_payload)) + request_mock.return_value = helpers.mock_response( + json.dumps(response_payload) + ) es = get_elasticsearch_client(hosts=["http://localhost:9200"]) es.get( index="test-index", **normalize_arguments(doc_type="_doc"), id=1 @@ -512,7 +546,7 @@ def test_no_op_tracer_provider(self, request_mock): tracer_provider=trace.NoOpTracerProvider() ) response_payload = '{"found": false, "timed_out": true, "took": 7}' - request_mock.return_value = (1, {}, response_payload) + request_mock.return_value = helpers.mock_response(response_payload) es = get_elasticsearch_client(hosts=["http://localhost:9200"]) res = es.get( index="test-index", **normalize_arguments(doc_type="_doc"), id=1 @@ -543,7 +577,7 @@ def test_body_sanitization(self, _): ) def test_bulk(self, request_mock): - request_mock.return_value = (1, {}, "{}") + request_mock.return_value = helpers.mock_response("{}") es = get_elasticsearch_client(hosts=["http://localhost:9200"]) es.bulk( diff --git a/tox.ini b/tox.ini index ed74e485cd..ab199e4371 100644 --- a/tox.ini +++ b/tox.ini @@ -92,8 +92,9 @@ envlist = ; below mean these dependencies are being used: ; 0: elasticsearch-dsl==6.4.0 elasticsearch==6.8.2 ; 1: elasticsearch-dsl==7.4.1 elasticsearch==7.17.9 - py3{8,9,10,11}-test-instrumentation-elasticsearch-{0,1} - pypy3-test-instrumentation-elasticsearch-{0,1} + ; 2: elasticsearch-dsl>=8.0,<8.13 elasticsearch>=8.0,<8.13 + py3{8,9,10,11}-test-instrumentation-elasticsearch-{0,1,2} + pypy3-test-instrumentation-elasticsearch-{0,1,2} lint-instrumentation-elasticsearch ; opentelemetry-instrumentation-falcon @@ -716,6 +717,7 @@ commands_pre = elasticsearch: pip install opentelemetry-test-utils@{env:CORE_REPO}\#egg=opentelemetry-test-utils&subdirectory=tests/opentelemetry-test-utils elasticsearch-0: pip install -r {toxinidir}/instrumentation/opentelemetry-instrumentation-elasticsearch/test-requirements-0.txt elasticsearch-1: pip install -r {toxinidir}/instrumentation/opentelemetry-instrumentation-elasticsearch/test-requirements-1.txt + elasticsearch-2: pip install -r {toxinidir}/instrumentation/opentelemetry-instrumentation-elasticsearch/test-requirements-2.txt lint-instrumentation-elasticsearch: pip install -r {toxinidir}/instrumentation/opentelemetry-instrumentation-elasticsearch/test-requirements-1.txt asyncio: pip install opentelemetry-api@{env:CORE_REPO}\#egg=opentelemetry-api&subdirectory=opentelemetry-api