Skip to content

Commit

Permalink
[EventHubs] update test to test async producer (#19892)
Browse files Browse the repository at this point in the history
- call async Producer for testing/improving code coverage
- remove passing in fake kwarg to PartitionContext; untested lines in PartitionContext can only be tested with a user implemented CheckpointStore class, so not worrying about this
  • Loading branch information
swathipil committed Sep 13, 2021
1 parent 8f28e2a commit 7b2aeaf
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async def test_receive_no_partition_async(connstr_senders):

async def on_event(partition_context, event):
on_event.received += 1
await partition_context.update_checkpoint(event, fake_kwarg="arg") # ignores fake_kwarg
await partition_context.update_checkpoint(event)
on_event.namespace = partition_context.fully_qualified_namespace
on_event.eventhub_name = partition_context.eventhub_name
on_event.consumer_group = partition_context.consumer_group
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from azure.eventhub import EventData, TransportType
from azure.eventhub.exceptions import EventHubError
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient


@pytest.mark.liveTest
Expand All @@ -30,16 +30,27 @@ async def on_event(partition_context, event):

on_event.called = False
connection_str, senders = connstr_senders
# test async producer client
producer_client = EventHubProducerClient.from_connection_string(connection_str)
partitions = await producer_client.get_partition_ids()
senders = []
for p in partitions:
sender = producer_client._create_producer(partition_id=p)
senders.append(sender)

client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group='$default')
async with client:
task = asyncio.ensure_future(client.receive(on_event, partition_id="0", starting_position="@latest"))
await asyncio.sleep(10)
assert on_event.called is False
senders[0].send(EventData(b"Receiving only a single event"), partition_key='0')
await senders[0].send(EventData(b"Receiving only a single event"), partition_key='0')
await asyncio.sleep(10)
assert on_event.called is True

await task
for s in senders:
await s.close()
await producer_client.close()


@pytest.mark.parametrize("position, inclusive, expected_result",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def test_receive_no_partition(connstr_senders):

def on_event(partition_context, event):
on_event.received += 1
partition_context.update_checkpoint(event, fake_kwarg="arg") # ignores fake_kwarg
partition_context.update_checkpoint(event)
on_event.namespace = partition_context.fully_qualified_namespace
on_event.eventhub_name = partition_context.eventhub_name
on_event.consumer_group = partition_context.consumer_group
Expand Down

0 comments on commit 7b2aeaf

Please sign in to comment.