From eaf6b61ad26c0d1090e5172e8dca7754fa63031a Mon Sep 17 00:00:00 2001 From: Mukund Date: Sat, 6 Jul 2024 07:26:58 +0000 Subject: [PATCH] Fix build failures --- .../_protocol/streaming_pull_manager.py | 10 +++++----- .../subscriber/test_streaming_pull_manager.py | 19 ++++++++++++++++--- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index eb2ff6fa9..136d7bc18 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -660,7 +660,7 @@ def _schedule_message_on_hold( assert self._scheduler is not None assert self._callback is not None if ( - self.open_telemetry_enabled() + self._open_telemetry_enabled == True and msg.open_telemetry_data and msg.open_telemetry_data.subscribe_span ): @@ -1134,7 +1134,7 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: received_messages = response._pb.received_messages subscribe_spans = [] - if self.open_telemetry_enabled(): + if self._open_telemetry_enabled: for received_message in response.received_messages: parent_span_context = TraceContextTextMapPropagator().extract( carrier=received_message.message, @@ -1185,14 +1185,14 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: # Immediately (i.e. without waiting for the auto lease management) # modack the messages we received, as this tells the server that we've # received them. - if self.open_telemetry_enabled(): + if self._open_telemetry_enabled: for subscribe_span in subscribe_spans: subscribe_span.add_event("modack start") ack_id_gen = (message.ack_id for message in received_messages) expired_ack_ids = self._send_lease_modacks( ack_id_gen, self.ack_deadline, warn_on_invalid=False ) - if self.open_telemetry_enabled(): + if self._open_telemetry_enabled: for subscribe_span in subscribe_spans: subscribe_span.add_event("modack end") @@ -1213,7 +1213,7 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: self._scheduler.queue, self._exactly_once_delivery_enabled, ) - if self.open_telemetry_enabled(): + if self._open_telemetry_enabled: message._open_telemetry_data = OpenTelemetryData( subscribe_span=subscribe_spans[i], ) diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index b595b7056..3e5520f85 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -1316,7 +1316,7 @@ def make_running_manager(enable_open_telemetry=False, **kwargs): manager._dispatcher = mock.create_autospec(dispatcher.Dispatcher, instance=True) manager._leaser = mock.create_autospec(leaser.Leaser, instance=True) manager._heartbeater = mock.create_autospec(heartbeater.Heartbeater, instance=True) - manager._open_telemetry_enabled = mock.Mock(return_value=enable_open_telemetry) + manager._open_telemetry_enabled = enable_open_telemetry return ( manager, manager._consumer, @@ -1557,9 +1557,10 @@ def test__on_response_delivery_attempt(): assert msg2.delivery_attempt == 6 -def test__on_response_mod_ack_otel(span_exporter): +@pytest.mark.parametrize("otel_enabled", [(True), (False)]) +def test__on_response_mod_ack_otel(span_exporter, otel_enabled): manager, _, dispatcher, leaser, _, scheduler = make_running_manager( - enable_open_telemetry=True + enable_open_telemetry=otel_enabled, ) manager._callback = mock.sentinel.callback @@ -1605,6 +1606,18 @@ def test__on_response_mod_ack_otel(span_exporter): # Subscribe span would still be active, hence would not be exported. spans = span_exporter.get_finished_spans() assert len(spans) == 0 + if otel_enabled: + call_args = scheduler.schedule.call_args_list + assert len(call_args) == 2 + for call_arg in call_args: + args, _ = call_arg + otel_data = args[1].open_telemetry_data + assert otel_data.concurrency_control_span is not None + assert otel_data.concurrency_control_span.kind == trace.SpanKind.INTERNAL + assert ( + otel_data.concurrency_control_span.name + == "subscriber concurrency control" + ) def test__on_response_modifies_ack_deadline():