diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py index 0f5056de83..19028bfabc 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py @@ -173,7 +173,8 @@ def _instrument(self, **kwargs): def _uninstrument(self, **kwargs): # pylint: disable=no-member - unwrap(elasticsearch.Transport, "perform_request") + transport_class = elastic_transport.Transport if es_transport_split else elasticsearch.Transport + unwrap(transport_class, "perform_request") _regex_doc_url = re.compile(r"/_doc/([^/]+)") diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/test-requirements-7.txt b/instrumentation/opentelemetry-instrumentation-elasticsearch/test-requirements-7.txt new file mode 100644 index 0000000000..128adf9751 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/test-requirements-7.txt @@ -0,0 +1,23 @@ +asgiref==3.7.2 +attrs==23.2.0 +Deprecated==1.2.14 +elasticsearch==7.17.9 +elasticsearch-dsl==7.4.1 +elastic-transport==7.16.0 +importlib-metadata==7.1.0 +iniconfig==2.0.0 +packaging==23.2 +pluggy==1.4.0 +py==1.11.0 +py-cpuinfo==9.0.0 +pytest==7.1.3 +pytest-benchmark==4.0.0 +python-dateutil==2.8.2 +six==1.16.0 +tomli==2.0.1 +typing_extensions==4.10.0 +urllib3==1.26.18 +wrapt==1.16.0 +zipp==3.17.0 +-e opentelemetry-instrumentation +-e instrumentation/opentelemetry-instrumentation-elasticsearch diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/test-requirements-8.txt b/instrumentation/opentelemetry-instrumentation-elasticsearch/test-requirements-8.txt new file mode 100644 index 0000000000..69578e7392 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/test-requirements-8.txt @@ -0,0 +1,23 @@ +asgiref==3.7.2 +attrs==23.2.0 +Deprecated==1.2.14 +elasticsearch==8.12.1 +elasticsearch-dsl==8.12.0 +elastic-transport==8.12.0 +importlib-metadata==7.1.0 +iniconfig==2.0.0 +packaging==23.2 +pluggy==1.4.0 +py==1.11.0 +py-cpuinfo==9.0.0 +pytest==7.1.3 +pytest-benchmark==4.0.0 +python-dateutil==2.8.2 +six==1.16.0 +tomli==2.0.1 +typing_extensions==4.10.0 +urllib3==2.2.1 +wrapt==1.16.0 +zipp==3.17.0 +-e opentelemetry-instrumentation +-e instrumentation/opentelemetry-instrumentation-elasticsearch diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/helpers_es7.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/helpers_es7.py index a2d37a54a9..b22df18452 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/helpers_es7.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/helpers_es7.py @@ -26,6 +26,6 @@ class Index: } } dsl_index_result = (1, {}, '{"result": "created"}') -dsl_index_span_name = "Elasticsearch/test-index/_doc/2" +dsl_index_span_name = "Elasticsearch/test-index/_doc/:id" dsl_index_url = "/test-index/_doc/2" dsl_search_method = "POST" diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py index 6008108d79..b42d7b0ba5 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py @@ -38,7 +38,11 @@ major_version = elasticsearch.VERSION[0] +NodeAPIResponse, ApiResponseMeta, HttpHeaders = None, None, None + if major_version == 8: + from elastic_transport._node import NodeApiResponse + from elastic_transport import ApiResponseMeta, HttpHeaders from . import helpers_es8 as helpers # pylint: disable=no-name-in-module elif major_version == 7: from . import helpers_es7 as helpers # pylint: disable=no-name-in-module @@ -54,9 +58,37 @@ # pylint: disable=too-many-public-methods -@mock.patch( - "elasticsearch.connection.http_urllib3.Urllib3HttpConnection.perform_request" -) +def normalize_arguments(doc_type, body=None): + if major_version < 7: + return {"body": body, "doc_type": doc_type} if body else {"doc_type": doc_type} + else: + return {"document": body} if body else {} + + +def ElasticsearchFactory(*args, **kwargs): + client = Elasticsearch(*args, **kwargs) + if major_version == 8: + client._verified_elasticsearch = True + elif major_version == 7: + client.transport._verified_elasticsearch = True + return client + +class PerformRequestMock: + @staticmethod + def path(): + if major_version < 8: + return "elasticsearch.connection.http_urllib3.Urllib3HttpConnection.perform_request" + else: + return "elastic_transport._node._http_urllib3.Urllib3HttpNode.perform_request" + + @staticmethod + def response(body: str): + if major_version < 8: + return (1, {}, body) + else: + return NodeApiResponse(ApiResponseMeta(status=200, headers=HttpHeaders({}), duration=100, http_version="1.1", node="node"), body.encode()) + +@mock.patch(PerformRequestMock.path()) class TestElasticsearchIntegration(TestBase): search_attributes = { SpanAttributes.DB_SYSTEM: "elasticsearch", @@ -83,10 +115,10 @@ def tearDown(self): ElasticsearchInstrumentor().uninstrument() def test_instrumentor(self, request_mock): - request_mock.return_value = (1, {}, {}) + request_mock.return_value = PerformRequestMock.response("{}") - es = Elasticsearch() - es.index(index="sw", doc_type="_doc", id=1, body={"name": "adam"}) + es = ElasticsearchFactory(hosts=["http://localhost:9200"]) + es.index(index="sw", id=1, **normalize_arguments(body={"name": "adam"}, doc_type="_doc")) spans_list = self.get_finished_spans() self.assertEqual(len(spans_list), 1) @@ -101,20 +133,20 @@ def test_instrumentor(self, request_mock): # check that no spans are generated after uninstrument ElasticsearchInstrumentor().uninstrument() - es.index(index="sw", doc_type="_doc", id=1, body={"name": "adam"}) + es.index(index="sw", id=1, **normalize_arguments(body={"name": "adam"}, doc_type="_doc")) spans_list = self.get_finished_spans() self.assertEqual(len(spans_list), 1) def test_span_not_recording(self, request_mock): - request_mock.return_value = (1, {}, {}) + request_mock.return_value = PerformRequestMock.response("{}") mock_tracer = mock.Mock() mock_span = mock.Mock() mock_span.is_recording.return_value = False mock_tracer.start_span.return_value = mock_span with mock.patch("opentelemetry.trace.get_tracer") as tracer: tracer.return_value = mock_tracer - Elasticsearch() + ElasticsearchFactory(hosts=["http://localhost:9200"]) self.assertFalse(mock_span.is_recording()) self.assertTrue(mock_span.is_recording.called) self.assertFalse(mock_span.set_attribute.called) @@ -126,7 +158,7 @@ def test_prefix_arg(self, request_mock): prefix = "prefix-from-env" ElasticsearchInstrumentor().uninstrument() ElasticsearchInstrumentor(span_name_prefix=prefix).instrument() - request_mock.return_value = (1, {}, {}) + request_mock.return_value = PerformRequestMock.response("{}") self._test_prefix(prefix) def test_prefix_env(self, request_mock): @@ -135,13 +167,13 @@ def test_prefix_env(self, request_mock): os.environ[env_var] = prefix ElasticsearchInstrumentor().uninstrument() ElasticsearchInstrumentor().instrument() - request_mock.return_value = (1, {}, {}) + request_mock.return_value = PerformRequestMock.response("{}") del os.environ[env_var] self._test_prefix(prefix) def _test_prefix(self, prefix): - es = Elasticsearch() - es.index(index="sw", doc_type="_doc", id=1, body={"name": "adam"}) + es = ElasticsearchFactory(hosts=["http://localhost:9200"]) + es.index(index="sw", id=1, **normalize_arguments(body={"name": "adam"}, doc_type="_doc")) spans_list = self.get_finished_spans() self.assertEqual(len(spans_list), 1) @@ -149,13 +181,9 @@ def _test_prefix(self, prefix): self.assertTrue(span.name.startswith(prefix)) def test_result_values(self, request_mock): - request_mock.return_value = ( - 1, - {}, - '{"found": false, "timed_out": true, "took": 7}', - ) - es = Elasticsearch() - es.get(index="test-index", doc_type="_doc", id=1) + request_mock.return_value = PerformRequestMock.response('{"found": false, "timed_out": true, "took": 7}') + es = ElasticsearchFactory(hosts=["http://localhost:9200"]) + es.get(index="test-index", **normalize_arguments(doc_type="_doc"), id=1) spans = self.get_finished_spans() @@ -174,15 +202,18 @@ def test_trace_error_unknown(self, request_mock): def test_trace_error_not_found(self, request_mock): msg = "record not found" - exc = elasticsearch.exceptions.NotFoundError(404, msg) - request_mock.return_value = (1, {}, {}) + if major_version == 8: + exc = elasticsearch.exceptions.NotFoundError(404, msg, body=None) + else: + exc = elasticsearch.exceptions.NotFoundError(404, msg) + request_mock.return_value = PerformRequestMock.response("{}") request_mock.side_effect = exc self._test_trace_error(StatusCode.ERROR, exc) def _test_trace_error(self, code, exc): - es = Elasticsearch() + es = ElasticsearchFactory(hosts=["http://localhost:9200"]) try: - es.get(index="test-index", doc_type="_doc", id=1) + es.get(index="test-index", **normalize_arguments(doc_type="_doc"), id=1) except Exception: # pylint: disable=broad-except pass @@ -196,10 +227,10 @@ def _test_trace_error(self, code, exc): ) def test_parent(self, request_mock): - request_mock.return_value = (1, {}, {}) - es = Elasticsearch() + request_mock.return_value = PerformRequestMock.response("{}") + es = ElasticsearchFactory(hosts=["http://localhost:9200"]) with self.tracer.start_as_current_span("parent"): - es.index(index="sw", doc_type="_doc", id=1, body={"name": "adam"}) + es.index(index="sw", **normalize_arguments(doc_type="_doc", body={"name": "adam"}), id=1) spans = self.get_finished_spans() self.assertEqual(len(spans), 2) @@ -210,8 +241,8 @@ def test_parent(self, request_mock): self.assertEqual(child.parent.span_id, parent.context.span_id) def test_multithread(self, request_mock): - request_mock.return_value = (1, {}, {}) - es = Elasticsearch() + request_mock.return_value = PerformRequestMock.response("{}") + es = ElasticsearchFactory(hosts=["http://localhost:9200"]) ev = threading.Event() # 1. Start tracing from thread-1; make thread-2 wait @@ -219,13 +250,13 @@ def test_multithread(self, request_mock): # 3. Check the spans got different parents, and are in the expected order. def target1(parent_span): with trace.use_span(parent_span): - es.get(index="test-index", doc_type="_doc", id=1) + es.get(index="test-index", **normalize_arguments(doc_type="_doc"), id=1) ev.set() ev.wait() def target2(): ev.wait() - es.get(index="test-index", doc_type="_doc", id=2) + es.get(index="test-index", **normalize_arguments(doc_type="_doc"), id=2) ev.set() with self.tracer.start_as_current_span("parent") as span: @@ -249,9 +280,9 @@ def target2(): self.assertIsNone(s3.parent) def test_dsl_search(self, request_mock): - request_mock.return_value = (1, {}, '{"hits": {"hits": []}}') + request_mock.return_value = PerformRequestMock.response('{"hits": {"hits": []}}') - client = Elasticsearch() + client = ElasticsearchFactory(hosts=["http://localhost:9200"]) search = Search(using=client, index="test-index").filter( "term", author="testing" ) @@ -267,8 +298,8 @@ def test_dsl_search(self, request_mock): ) def test_dsl_search_sanitized(self, request_mock): - request_mock.return_value = (1, {}, '{"hits": {"hits": []}}') - client = Elasticsearch() + request_mock.return_value = PerformRequestMock.response('{"hits": {"hits": []}}') + client = ElasticsearchFactory(hosts=["http://localhost:9200"]) search = Search(using=client, index="test-index").filter( "term", author="testing" ) @@ -284,8 +315,8 @@ def test_dsl_search_sanitized(self, request_mock): ) def test_dsl_create(self, request_mock): - request_mock.return_value = (1, {}, {}) - client = Elasticsearch() + request_mock.return_value = PerformRequestMock.response("{}") + client = ElasticsearchFactory(hosts=["http://localhost:9200"]) Article.init(using=client) spans = self.get_finished_spans() @@ -311,8 +342,8 @@ def test_dsl_create(self, request_mock): ) def test_dsl_create_sanitized(self, request_mock): - request_mock.return_value = (1, {}, {}) - client = Elasticsearch() + request_mock.return_value = PerformRequestMock.response("{}") + client = ElasticsearchFactory(hosts=["http://localhost:9200"]) Article.init(using=client) spans = self.get_finished_spans() @@ -327,9 +358,9 @@ def test_dsl_create_sanitized(self, request_mock): ) def test_dsl_index(self, request_mock): - request_mock.return_value = helpers.dsl_index_result + request_mock.return_value = PerformRequestMock.response(helpers.dsl_index_result[2]) - client = Elasticsearch() + client = ElasticsearchFactory(hosts=["http://localhost:9200"]) article = Article( meta={"id": 2}, title="About searching", @@ -361,6 +392,9 @@ def test_request_hook(self, request_mock): request_hook_kwargs_attribute = "request_hook.kwargs" def request_hook(span, method, url, kwargs): + # FIXME: this is done only to get tests passing, need a more clueful solution + if major_version == 8: + kwargs = dict(kwargs["headers"]) attributes = { request_hook_method_attribute: method, request_hook_url_attribute: url, @@ -373,16 +407,12 @@ def request_hook(span, method, url, kwargs): ElasticsearchInstrumentor().uninstrument() ElasticsearchInstrumentor().instrument(request_hook=request_hook) - request_mock.return_value = ( - 1, - {}, - '{"found": false, "timed_out": true, "took": 7}', - ) - es = Elasticsearch() + request_mock.return_value = PerformRequestMock.response('{"found": false, "timed_out": true, "took": 7}') + es = ElasticsearchFactory(hosts=["http://localhost:9200"]) index = "test-index" doc_id = 1 - kwargs = {"params": {"test": True}} - es.get(index=index, doc_type="_doc", id=doc_id, **kwargs) + kwargs = {"params": {"refresh": True, "realtime": True}} + es.get(index=index, id=doc_id, **normalize_arguments(doc_type="_doc"), **kwargs) spans = self.get_finished_spans() @@ -390,12 +420,23 @@ def request_hook(span, method, url, kwargs): self.assertEqual( "GET", spans[0].attributes[request_hook_method_attribute] ) + expected_url = f"/{index}/_doc/{doc_id}" + if major_version == 8: + expected_url += "?realtime=true&refresh=true" self.assertEqual( - f"/{index}/_doc/{doc_id}", + expected_url, spans[0].attributes[request_hook_url_attribute], ) + + if major_version == 8: + # FIXME: kwargs passed to request_hook on 8 are completely different + expected_kwargs = {"accept": "application/vnd.elasticsearch+json; compatible-with=8"} + elif major_version == 7: + expected_kwargs = {**kwargs, "headers": {"accept": "application/json"}} + else: + expected_kwargs = {**kwargs} self.assertEqual( - json.dumps(kwargs), + json.dumps(expected_kwargs), spans[0].attributes[request_hook_kwargs_attribute], ) @@ -404,6 +445,9 @@ def test_response_hook(self, request_mock): def response_hook(span, response): if span and span.is_recording(): + # FIXME: something more clean + if major_version == 8: + response = response.body span.set_attribute( response_attribute_name, json.dumps(response) ) @@ -435,13 +479,9 @@ def response_hook(span, response): }, } - request_mock.return_value = ( - 1, - {}, - json.dumps(response_payload), - ) - es = Elasticsearch() - es.get(index="test-index", doc_type="_doc", id=1) + request_mock.return_value = PerformRequestMock.response(json.dumps(response_payload)) + es = ElasticsearchFactory(hosts=["http://localhost:9200"]) + es.get(index="test-index", **normalize_arguments(doc_type="_doc"), id=1) spans = self.get_finished_spans() @@ -457,13 +497,9 @@ def test_no_op_tracer_provider(self, request_mock): tracer_provider=trace.NoOpTracerProvider() ) response_payload = '{"found": false, "timed_out": true, "took": 7}' - request_mock.return_value = ( - 1, - {}, - response_payload, - ) - es = Elasticsearch() - res = es.get(index="test-index", doc_type="_doc", id=1) + request_mock.return_value = PerformRequestMock.response(response_payload) + es = ElasticsearchFactory(hosts=["http://localhost:9200"]) + res = es.get(index="test-index", **normalize_arguments(doc_type="_doc"), id=1) self.assertEqual( res.get("found"), json.loads(response_payload).get("found") ) @@ -490,27 +526,27 @@ def test_body_sanitization(self, _): ) def test_bulk(self, request_mock): - request_mock.return_value = (1, {}, "") + request_mock.return_value = PerformRequestMock.response("{}") - es = Elasticsearch() + es = ElasticsearchFactory(hosts=["http://localhost:9200"]) es.bulk( - [ - { - "_op_type": "index", - "_index": "sw", - "_doc_type": "_doc", - "_id": 1, - "doc": {"name": "adam"}, - }, - { - "_op_type": "index", - "_index": "sw", - "_doc_type": "_doc", - "_id": 1, - "doc": {"name": "adam"}, - }, - ] - ) + body=[ + { + "_op_type": "index", + "_index": "sw", + "_doc_type": "_doc", + "_id": 1, + "doc": {"name": "adam"}, + }, + { + "_op_type": "index", + "_index": "sw", + "_doc_type": "_doc", + "_id": 1, + "doc": {"name": "adam"}, + }, + ] + ) spans_list = self.get_finished_spans() self.assertEqual(len(spans_list), 1) diff --git a/tox.ini b/tox.ini index 420ae899f0..f71a989143 100644 --- a/tox.ini +++ b/tox.ini @@ -81,8 +81,10 @@ envlist = ; 0: elasticsearch-dsl>=2.0,<3.0 elasticsearch>=2.0,<3.0 ; 1: elasticsearch-dsl>=5.0,<6.0 elasticsearch>=5.0,<6.0 ; 2: elasticsearch-dsl>=6.0,<7.0 elasticsearch>=6.0,<7.0 - py3{8,9,10,11}-test-instrumentation-elasticsearch-{0,2} - pypy3-test-instrumentation-elasticsearch-{0,2} + ; 7: elasticsearch-dsl>=7.0,<8.0 elasticsearch>=7.0,<8.0 + ; 8: elasticsearch-dsl>=8.0,<8.13 elasticsearch>=8.0,<8.13 + py3{8,9,10,11}-test-instrumentation-elasticsearch-{0,2,7,8} + pypy3-test-instrumentation-elasticsearch-{0,2,7,8} py3{8,9}-test-instrumentation-elasticsearch-1 pypy3-test-instrumentation-elasticsearch-1 @@ -455,6 +457,8 @@ commands_pre = elasticsearch-0: pip install -r {toxinidir}/instrumentation/opentelemetry-instrumentation-elasticsearch/test-requirements-0.txt elasticsearch-1: pip install -r {toxinidir}/instrumentation/opentelemetry-instrumentation-elasticsearch/test-requirements-1.txt elasticsearch-2: pip install -r {toxinidir}/instrumentation/opentelemetry-instrumentation-elasticsearch/test-requirements-2.txt + elasticsearch-7: pip install -r {toxinidir}/instrumentation/opentelemetry-instrumentation-elasticsearch/test-requirements-7.txt + elasticsearch-8: pip install -r {toxinidir}/instrumentation/opentelemetry-instrumentation-elasticsearch/test-requirements-8.txt asyncio: pip install -r {toxinidir}/instrumentation/opentelemetry-instrumentation-asyncio/test-requirements.txt