Skip to content

Commit

Permalink
Fix build failures
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu committed Jul 7, 2024
1 parent 857d163 commit eaf6b61
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ def _schedule_message_on_hold(
assert self._scheduler is not None
assert self._callback is not None
if (
self.open_telemetry_enabled()
self._open_telemetry_enabled == True
and msg.open_telemetry_data
and msg.open_telemetry_data.subscribe_span
):
Expand Down Expand Up @@ -1134,7 +1134,7 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
received_messages = response._pb.received_messages

subscribe_spans = []
if self.open_telemetry_enabled():
if self._open_telemetry_enabled:
for received_message in response.received_messages:
parent_span_context = TraceContextTextMapPropagator().extract(
carrier=received_message.message,
Expand Down Expand Up @@ -1185,14 +1185,14 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
# Immediately (i.e. without waiting for the auto lease management)
# modack the messages we received, as this tells the server that we've
# received them.
if self.open_telemetry_enabled():
if self._open_telemetry_enabled:
for subscribe_span in subscribe_spans:
subscribe_span.add_event("modack start")
ack_id_gen = (message.ack_id for message in received_messages)
expired_ack_ids = self._send_lease_modacks(
ack_id_gen, self.ack_deadline, warn_on_invalid=False
)
if self.open_telemetry_enabled():
if self._open_telemetry_enabled:
for subscribe_span in subscribe_spans:
subscribe_span.add_event("modack end")

Expand All @@ -1213,7 +1213,7 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
self._scheduler.queue,
self._exactly_once_delivery_enabled,
)
if self.open_telemetry_enabled():
if self._open_telemetry_enabled:
message._open_telemetry_data = OpenTelemetryData(
subscribe_span=subscribe_spans[i],
)
Expand Down
19 changes: 16 additions & 3 deletions tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,7 @@ def make_running_manager(enable_open_telemetry=False, **kwargs):
manager._dispatcher = mock.create_autospec(dispatcher.Dispatcher, instance=True)
manager._leaser = mock.create_autospec(leaser.Leaser, instance=True)
manager._heartbeater = mock.create_autospec(heartbeater.Heartbeater, instance=True)
manager._open_telemetry_enabled = mock.Mock(return_value=enable_open_telemetry)
manager._open_telemetry_enabled = enable_open_telemetry
return (
manager,
manager._consumer,
Expand Down Expand Up @@ -1557,9 +1557,10 @@ def test__on_response_delivery_attempt():
assert msg2.delivery_attempt == 6


def test__on_response_mod_ack_otel(span_exporter):
@pytest.mark.parametrize("otel_enabled", [(True), (False)])
def test__on_response_mod_ack_otel(span_exporter, otel_enabled):
manager, _, dispatcher, leaser, _, scheduler = make_running_manager(
enable_open_telemetry=True
enable_open_telemetry=otel_enabled,
)
manager._callback = mock.sentinel.callback

Expand Down Expand Up @@ -1605,6 +1606,18 @@ def test__on_response_mod_ack_otel(span_exporter):
# Subscribe span would still be active, hence would not be exported.
spans = span_exporter.get_finished_spans()
assert len(spans) == 0
if otel_enabled:
call_args = scheduler.schedule.call_args_list
assert len(call_args) == 2
for call_arg in call_args:
args, _ = call_arg
otel_data = args[1].open_telemetry_data
assert otel_data.concurrency_control_span is not None
assert otel_data.concurrency_control_span.kind == trace.SpanKind.INTERNAL
assert (
otel_data.concurrency_control_span.name
== "subscriber concurrency control"
)


def test__on_response_modifies_ack_deadline():
Expand Down

0 comments on commit eaf6b61

Please sign in to comment.