Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kibrantn/servicebus/track 2 stress test cleanup #11

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdk/servicebus/azure-servicebus/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
if not any([arg.startswith('test_stress') or arg.endswith('StressTest') for arg in sys.argv]):
collect_ignore.append("tests/stress_tests")

# Allow us to pass stress_test_duration from the command line.
def pytest_addoption(parser):
parser.addoption('--stress_test_duration_seconds', action="store", default=None)

# Note: This is duplicated between here and the basic conftest, so that it does not throw warnings if you're
# running locally to this SDK. (Everything works properly, pytest just makes a bit of noise.)
def pytest_configure(config):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import time
from datetime import datetime, timedelta
import concurrent
import sys
import uuid

from azure.servicebus import ServiceBusClient, Message, BatchMessage
Expand All @@ -18,22 +19,33 @@ class ReceiveType:
pull="pull"


class StressTestResults:
total_sent=0
total_received=0
time_elapsed=None
state_by_sender={}
state_by_receiver={}
class StressTestResults(object):
def __init__(self):
self.total_sent=0
self.total_received=0
self.time_elapsed=None
self.state_by_sender={}
self.state_by_receiver={}

def __repr__(self):
return str(vars(self))

class StressTestRunnerState:

class StressTestRunnerState(object):
'''Per-runner state, e.g. if you spawn 3 senders each will have this as their state object,
which will be coalesced at completion into StressTestResults'''
total_sent=0
total_received=0
def __init__(self):
self.total_sent=0
self.total_received=0

def __repr__(self):
return str(vars(self))


class StressTestRunner:
'''Framework for running a service bus stress test.
Duration can be overriden via the --stress_test_duration flag from the command line'''

def __init__(self,
senders,
receivers,
Expand All @@ -58,6 +70,11 @@ def __init__(self,
# If we ever require multiple runs of this one after another, just make Run() reset this.
self._state = StressTestRunnerState()

self._duration_override = None
for arg in sys.argv:
if arg.startswith('--stress_test_duration_seconds='):
self._duration_override = timedelta(seconds=int(arg.split('=')[1]))


# Plugin functions the caller can override to further tailor the test.
@staticmethod
Expand Down Expand Up @@ -103,7 +120,7 @@ def _ConstructMessage(self):
message = Message(self.PreProcessMessageBody("a" * self.message_size))
self.PreProcessMessage(message)
batch.add(message)
self.PreProcessBatch(batch)
self.PreProcessMessageBatch(batch)
return batch
else:
message = Message(self.PreProcessMessageBody("a" * self.message_size))
Expand Down Expand Up @@ -147,7 +164,7 @@ def _Receive(self, receiver, end_time):

def Run(self):
start_time = datetime.now()
end_time = start_time + self.duration
end_time = start_time + (self._duration_override or self.duration)
sent_messages = 0
received_messages = 0
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as proc_pool:
Expand All @@ -160,5 +177,6 @@ def Run(self):
result.total_sent = sum([r.total_sent for r in result.state_by_sender.values()])
result.total_received = sum([r.total_received for r in result.state_by_receiver.values()])
result.time_elapsed = end_time - start_time
print("Stress test completed. Results:\n", result)
return result

This file was deleted.

This file was deleted.

This file was deleted.

Loading