From 3e5509d5e181aa3da8b45c2d4248035c6bce4847 Mon Sep 17 00:00:00 2001 From: Kieran Brantner-Magee Date: Mon, 30 Mar 2020 12:45:06 -0700 Subject: [PATCH 1/2] Fix small lingering bugs in stress_test, make duration overrideable from command line, remove historical stress tests now that they're ported. --- sdk/servicebus/azure-servicebus/conftest.py | 4 + .../tests/stress_tests/stress_test_base.py | 37 ++++-- ...stress_test_queue_peeklock_send_receive.py | 119 ------------------ ..._test_queue_peeklock_send_receive_batch.py | 89 ------------- ...s_test_queue_receivedelete_send_receive.py | 85 ------------- ..._queue_receivedelete_send_receive_batch.py | 88 ------------- ...tress_test_queue_reconnect_send_receive.py | 77 ------------ .../stress_test_queue_slow_send_receive.py | 79 ------------ ...ress_test_queue_slow_send_receive_batch.py | 82 ------------ .../tests/stress_tests/test_stress_queues.py | 7 +- 10 files changed, 32 insertions(+), 635 deletions(-) delete mode 100644 sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_peeklock_send_receive.py delete mode 100644 sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_peeklock_send_receive_batch.py delete mode 100644 sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_receivedelete_send_receive.py delete mode 100644 sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_receivedelete_send_receive_batch.py delete mode 100644 sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_reconnect_send_receive.py delete mode 100644 sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_slow_send_receive.py delete mode 100644 sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_slow_send_receive_batch.py diff --git a/sdk/servicebus/azure-servicebus/conftest.py b/sdk/servicebus/azure-servicebus/conftest.py index 5cda4d1f3389..6d440a7f620e 100644 --- a/sdk/servicebus/azure-servicebus/conftest.py +++ b/sdk/servicebus/azure-servicebus/conftest.py @@ -20,6 +20,10 @@ if not any([arg.startswith('test_stress') or arg.endswith('StressTest') for arg in sys.argv]): collect_ignore.append("tests/stress_tests") +# Allow us to pass stress_test_duration from the command line. +def pytest_addoption(parser): + parser.addoption('--stress_test_duration_seconds', action="store", default=None) + # Note: This is duplicated between here and the basic conftest, so that it does not throw warnings if you're # running locally to this SDK. (Everything works properly, pytest just makes a bit of noise.) def pytest_configure(config): diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py b/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py index bcc547d4a202..124f5944b957 100644 --- a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py +++ b/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py @@ -7,6 +7,7 @@ import time from datetime import datetime, timedelta import concurrent +import sys import uuid from azure.servicebus import ServiceBusClient, Message, BatchMessage @@ -18,22 +19,30 @@ class ReceiveType: pull="pull" -class StressTestResults: - total_sent=0 - total_received=0 - time_elapsed=None - state_by_sender={} - state_by_receiver={} +class StressTestResults(object): + def __init__(self): + self.total_sent=0 + self.total_received=0 + self.time_elapsed=None + self.state_by_sender={} + self.state_by_receiver={} + def __repr__(self): + return str(vars(self)) -class StressTestRunnerState: + +class StressTestRunnerState(object): '''Per-runner state, e.g. if you spawn 3 senders each will have this as their state object, which will be coalesced at completion into StressTestResults''' - total_sent=0 - total_received=0 + def __init__(self): + self.total_sent=0 + self.total_received=0 class StressTestRunner: + '''Framework for running a service bus stress test. + Duration can be overriden via the --stress_test_duration flag from the command line''' + def __init__(self, senders, receivers, @@ -58,6 +67,11 @@ def __init__(self, # If we ever require multiple runs of this one after another, just make Run() reset this. self._state = StressTestRunnerState() + self._duration_override = None + for arg in sys.argv: + if arg.startswith('--stress_test_duration_seconds='): + self._duration_override = timedelta(seconds=int(arg.split('=')[1])) + # Plugin functions the caller can override to further tailor the test. @staticmethod @@ -103,7 +117,7 @@ def _ConstructMessage(self): message = Message(self.PreProcessMessageBody("a" * self.message_size)) self.PreProcessMessage(message) batch.add(message) - self.PreProcessBatch(batch) + self.PreProcessMessageBatch(batch) return batch else: message = Message(self.PreProcessMessageBody("a" * self.message_size)) @@ -147,7 +161,7 @@ def _Receive(self, receiver, end_time): def Run(self): start_time = datetime.now() - end_time = start_time + self.duration + end_time = start_time + (self._duration_override or self.duration) sent_messages = 0 received_messages = 0 with concurrent.futures.ProcessPoolExecutor(max_workers=4) as proc_pool: @@ -160,5 +174,6 @@ def Run(self): result.total_sent = sum([r.total_sent for r in result.state_by_sender.values()]) result.total_received = sum([r.total_received for r in result.state_by_receiver.values()]) result.time_elapsed = end_time - start_time + print("Stress test completed. Results:\n", result) return result diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_peeklock_send_receive.py b/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_peeklock_send_receive.py deleted file mode 100644 index cffcf53a6174..000000000000 --- a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_peeklock_send_receive.py +++ /dev/null @@ -1,119 +0,0 @@ -#------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for -# license information. -#-------------------------------------------------------------------------- - -import os -import time -import uuid -from datetime import datetime, timedelta -import concurrent - -from azure.servicebus import ServiceBusClient -from azure.servicebus.common.message import BatchMessage -from azure.servicebus.common.constants import ReceiveSettleMode - - -def create_standard_queue(sb_config): - from azure.servicebus.control_client import ServiceBusService, Queue - queue_name = str(uuid.uuid4()) - queue_value = Queue( - lock_duration='PT30S', - requires_duplicate_detection=False, - dead_lettering_on_message_expiration=True, - requires_session=False) - client = ServiceBusService( - service_namespace=sb_config['hostname'], - shared_access_key_name=sb_config['key_name'], - shared_access_key_value=sb_config['access_key']) - if client.create_queue(queue_name, queue=queue_value, fail_on_exist=True): - return queue_name - raise ValueError("Queue creation failed.") - - -def cleanup_queue(servicebus_config, queue_name): - from azure.servicebus.control_client import ServiceBusService - client = ServiceBusService( - service_namespace=servicebus_config['hostname'], - shared_access_key_name=servicebus_config['key_name'], - shared_access_key_value=servicebus_config['access_key']) - client.delete_queue(queue_name) - - -def message_send_process(sb_config, queue, endtime): - - def message_batch(): - for i in range(5): - yield "Stress Test message no. {}".format(i) - - client = ServiceBusClient( - service_namespace=sb_config['hostname'], - shared_access_key_name=sb_config['key_name'], - shared_access_key_value=sb_config['access_key'], - debug=False) - - total = 0 - queue_client = client.get_queue(queue) - with queue_client.get_sender() as sender: - while endtime > datetime.now(): - message = BatchMessage(message_batch()) - sender.send(message) - total += 5 - time.sleep(0.01) - if total % 50 == 0: - print("Sent {} messages".format(total)) - return total - - -def message_receive_process(sb_config, queue, endtime): - client = ServiceBusClient( - service_namespace=sb_config['hostname'], - shared_access_key_name=sb_config['key_name'], - shared_access_key_value=sb_config['access_key'], - debug=False) - - queue_client = client.get_queue(queue) - with queue_client.get_receiver(idle_timeout=10, mode=ReceiveSettleMode.PeekLock, prefetch=10) as receiver: - total = 0 - for message in receiver: - message.complete() - total += 1 - if total % 50 == 0: - print("Received {} messages".format(total)) - if endtime <= datetime.now(): - break - - return total - - -def stress_test_queue_peeklock_send_receive(sb_config, queue): - starttime = datetime.now() - endtime = starttime + timedelta(hours=24) - sent_messages = 0 - received_messages = 0 - - with concurrent.futures.ProcessPoolExecutor(max_workers=4) as proc_pool: - senders = [proc_pool.submit(message_send_process, sb_config, queue, endtime) for i in range(2)] - receivers = [proc_pool.submit(message_receive_process, sb_config, queue, endtime) for i in range(2)] - - for done in concurrent.futures.as_completed(senders + receivers): - if done in senders: - sent_messages += done.result() - else: - received_messages += done.result() - print("Sent {} messages and received {} messages.".format(sent_messages, received_messages)) - - -if __name__ == '__main__': - live_config = {} - live_config['hostname'] = os.environ['SERVICE_BUS_HOSTNAME'] - live_config['key_name'] = os.environ['SERVICE_BUS_SAS_POLICY'] - live_config['access_key'] = os.environ['SERVICE_BUS_SAS_KEY'] - try: - test_queue = create_standard_queue(live_config) - print("Created queue {}".format(test_queue)) - stress_test_queue_peeklock_send_receive(live_config, test_queue) - finally: - print("Cleaning up queue {}".format(test_queue)) - cleanup_queue(live_config, test_queue) diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_peeklock_send_receive_batch.py b/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_peeklock_send_receive_batch.py deleted file mode 100644 index 634f4085b8df..000000000000 --- a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_peeklock_send_receive_batch.py +++ /dev/null @@ -1,89 +0,0 @@ -#------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for -# license information. -#-------------------------------------------------------------------------- - -import time -from datetime import datetime, timedelta -import concurrent - -import conftest -from azure.servicebus import ServiceBusClient -from azure.servicebus.common.message import BatchMessage -from azure.servicebus.common.constants import ReceiveSettleMode - - -def message_send_process(sb_config, queue, endtime): - - def message_batch(): - for i in range(5): - yield "Stress Test message no. {}".format(i) - - client = ServiceBusClient( - service_namespace=sb_config['hostname'], - shared_access_key_name=sb_config['key_name'], - shared_access_key_value=sb_config['access_key'], - debug=False) - - total = 0 - queue_client = client.get_queue(queue) - with queue_client.get_sender() as sender: - while endtime > datetime.now(): - message = BatchMessage(message_batch()) - sender.send(message) - total += 5 - time.sleep(0.01) - return total - - -def message_receive_process(sb_config, queue, endtime): - client = ServiceBusClient( - service_namespace=sb_config['hostname'], - shared_access_key_name=sb_config['key_name'], - shared_access_key_value=sb_config['access_key'], - debug=False) - - queue_client = client.get_queue(queue) - with queue_client.get_receiver(idle_timeout=10, mode=ReceiveSettleMode.PeekLock, prefetch=10) as receiver: - total = 0 - batch = receiver.fetch_next() - while batch: - for message in batch: - message.complete() - total += 1 - if endtime <= datetime.now(): - break - batch = receiver.fetch_next() - - return total - - -def stress_test_queue_peeklock_send_receive_batch(sb_config, queue): - starttime = datetime.now() - endtime = starttime + timedelta(seconds=30) - sent_messages = 0 - received_messages = 0 - - with concurrent.futures.ProcessPoolExecutor(max_workers=4) as proc_pool: - senders = [proc_pool.submit(message_send_process, sb_config, queue, endtime) for i in range(2)] - receivers = [proc_pool.submit(message_receive_process, sb_config, queue, endtime) for i in range(2)] - - for done in concurrent.futures.as_completed(senders + receivers): - if done in senders: - sent_messages += done.result() - else: - received_messages += done.result() - - print("Sent {} messages and received {} messages.".format(sent_messages, received_messages)) - - -if __name__ == '__main__': - live_config = conftest.get_live_servicebus_config() - queue_name = conftest.create_standard_queue(live_config) - print("Created queue {}".format(queue_name)) - try: - stress_test_queue_peeklock_send_receive_batch(live_config, queue_name) - finally: - print("Cleaning up queue {}".format(queue_name)) - conftest.cleanup_queue(live_config, queue_name) diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_receivedelete_send_receive.py b/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_receivedelete_send_receive.py deleted file mode 100644 index ecca0a43cb83..000000000000 --- a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_receivedelete_send_receive.py +++ /dev/null @@ -1,85 +0,0 @@ -#------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for -# license information. -#-------------------------------------------------------------------------- - -import time -from datetime import datetime, timedelta -import concurrent - -import conftest -from azure.servicebus import ServiceBusClient -from azure.servicebus.common.message import BatchMessage -from azure.servicebus.common.constants import ReceiveSettleMode - - -def message_send_process(sb_config, queue, endtime): - - def message_batch(): - for i in range(5): - yield "Stress Test message no. {}".format(i) - - client = ServiceBusClient( - service_namespace=sb_config['hostname'], - shared_access_key_name=sb_config['key_name'], - shared_access_key_value=sb_config['access_key'], - debug=False) - - total = 0 - queue_client = client.get_queue(queue) - with queue_client.get_sender() as sender: - while endtime > datetime.now(): - message = BatchMessage(message_batch()) - sender.send(message) - total += 5 - time.sleep(0.01) - return total - - -def message_receive_process(sb_config, queue, endtime): - client = ServiceBusClient( - service_namespace=sb_config['hostname'], - shared_access_key_name=sb_config['key_name'], - shared_access_key_value=sb_config['access_key'], - debug=False) - - queue_client = client.get_queue(queue) - with queue_client.get_receiver(idle_timeout=10, mode=ReceiveSettleMode.ReceiveAndDelete, prefetch=10) as receiver: - total = 0 - for _ in receiver: - total += 1 - if endtime <= datetime.now(): - break - - return total - - -def stress_test_queue_receivedelete_send_receive(sb_config, queue): - starttime = datetime.now() - endtime = starttime + timedelta(seconds=130) - sent_messages = 0 - received_messages = 0 - - with concurrent.futures.ProcessPoolExecutor(max_workers=4) as proc_pool: - senders = [proc_pool.submit(message_send_process, sb_config, queue, endtime) for i in range(2)] - receivers = [proc_pool.submit(message_receive_process, sb_config, queue, endtime) for i in range(2)] - - for done in concurrent.futures.as_completed(senders + receivers): - if done in senders: - sent_messages += done.result() - else: - received_messages += done.result() - - print("Sent {} messages and received {} messages.".format(sent_messages, received_messages)) - - -if __name__ == '__main__': - live_config = conftest.get_live_servicebus_config() - queue_name = conftest.create_standard_queue(live_config) - print("Created queue {}".format(queue_name)) - try: - stress_test_queue_receivedelete_send_receive(live_config, queue_name) - finally: - print("Cleaning up queue {}".format(queue_name)) - conftest.cleanup_queue(live_config, queue_name) diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_receivedelete_send_receive_batch.py b/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_receivedelete_send_receive_batch.py deleted file mode 100644 index 87b52d771eb9..000000000000 --- a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_receivedelete_send_receive_batch.py +++ /dev/null @@ -1,88 +0,0 @@ -#------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for -# license information. -#-------------------------------------------------------------------------- - -import time -from datetime import datetime, timedelta -import concurrent - -import conftest -from azure.servicebus import ServiceBusClient -from azure.servicebus.common.message import BatchMessage -from azure.servicebus.common.constants import ReceiveSettleMode - - -def message_send_process(sb_config, queue, endtime): - - def message_batch(): - for i in range(5): - yield "Stress Test message no. {}".format(i) - - client = ServiceBusClient( - service_namespace=sb_config['hostname'], - shared_access_key_name=sb_config['key_name'], - shared_access_key_value=sb_config['access_key'], - debug=False) - - total = 0 - queue_client = client.get_queue(queue) - with queue_client.get_sender() as sender: - while endtime > datetime.now(): - message = BatchMessage(message_batch()) - sender.send(message) - total += 5 - time.sleep(0.01) - return total - - -def message_receive_process(sb_config, queue, endtime): - client = ServiceBusClient( - service_namespace=sb_config['hostname'], - shared_access_key_name=sb_config['key_name'], - shared_access_key_value=sb_config['access_key'], - debug=False) - - queue_client = client.get_queue(queue) - with queue_client.get_receiver(idle_timeout=10, mode=ReceiveSettleMode.ReceiveAndDelete, prefetch=10) as receiver: - total = 0 - batch = receiver.fetch_next() - while batch: - for _ in batch: - total += 1 - if endtime <= datetime.now(): - break - batch = receiver.fetch_next() - - return total - - -def stress_test_queue_receivedelete_send_receive_batch(sb_config, queue): - starttime = datetime.now() - endtime = starttime + timedelta(seconds=30) - sent_messages = 0 - received_messages = 0 - - with concurrent.futures.ProcessPoolExecutor(max_workers=4) as proc_pool: - senders = [proc_pool.submit(message_send_process, sb_config, queue, endtime) for i in range(2)] - receivers = [proc_pool.submit(message_receive_process, sb_config, queue, endtime) for i in range(2)] - - for done in concurrent.futures.as_completed(senders + receivers): - if done in senders: - sent_messages += done.result() - else: - received_messages += done.result() - - print("Sent {} messages and received {} messages.".format(sent_messages, received_messages)) - - -if __name__ == '__main__': - live_config = conftest.get_live_servicebus_config() - queue_name = conftest.create_standard_queue(live_config) - print("Created queue {}".format(queue_name)) - try: - stress_test_queue_receivedelete_send_receive_batch(live_config, queue_name) - finally: - print("Cleaning up queue {}".format(queue_name)) - conftest.cleanup_queue(live_config, queue_name) diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_reconnect_send_receive.py b/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_reconnect_send_receive.py deleted file mode 100644 index 154c47b53672..000000000000 --- a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_reconnect_send_receive.py +++ /dev/null @@ -1,77 +0,0 @@ -#------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for -# license information. -#-------------------------------------------------------------------------- - -import time -from datetime import datetime, timedelta -import concurrent - -import conftest -from azure.servicebus import ServiceBusClient, Message - - -def message_send_process(sb_config, queue, endtime): - client = ServiceBusClient( - service_namespace=sb_config['hostname'], - shared_access_key_name=sb_config['key_name'], - shared_access_key_value=sb_config['access_key'], - debug=False) - - total = 0 - queue_client = client.get_queue(queue) - while endtime > datetime.now(): - queue_client.send(Message("Slow stress test message")) - total += 1 - time.sleep(3) - return total - - -def message_receive_process(sb_config, queue, endtime): - client = ServiceBusClient( - service_namespace=sb_config['hostname'], - shared_access_key_name=sb_config['key_name'], - shared_access_key_value=sb_config['access_key'], - debug=False) - - queue_client = client.get_queue(queue) - total = 0 - while endtime > datetime.now(): - with queue_client.get_receiver() as receiver: - batch = receiver.fetch_next() - for message in batch: - total += 1 - message.complete() - - return total - - -def stress_test_queue_slow_send_receive(sb_config, queue): - starttime = datetime.now() - endtime = starttime + timedelta(seconds=30) - sent_messages = 0 - received_messages = 0 - - with concurrent.futures.ProcessPoolExecutor(max_workers=4) as proc_pool: - senders = [proc_pool.submit(message_send_process, sb_config, queue, endtime) for i in range(1)] - receivers = [proc_pool.submit(message_receive_process, sb_config, queue, endtime) for i in range(1)] - - for done in concurrent.futures.as_completed(senders + receivers): - if done in senders: - sent_messages += done.result() - else: - received_messages += done.result() - - print("Sent {} messages and received {} messages.".format(sent_messages, received_messages)) - - -if __name__ == '__main__': - live_config = conftest.get_live_servicebus_config() - queue_name = conftest.create_standard_queue(live_config) - print("Created queue {}".format(queue_name)) - try: - stress_test_queue_slow_send_receive(live_config, queue_name) - finally: - print("Cleaning up queue {}".format(queue_name)) - conftest.cleanup_queue(live_config, queue_name) diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_slow_send_receive.py b/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_slow_send_receive.py deleted file mode 100644 index 1c13ae52d91f..000000000000 --- a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_slow_send_receive.py +++ /dev/null @@ -1,79 +0,0 @@ -#------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for -# license information. -#-------------------------------------------------------------------------- - -import time -from datetime import datetime, timedelta -import concurrent - -import conftest -from azure.servicebus import ServiceBusClient, Message -from azure.servicebus.common.constants import ReceiveSettleMode - - -def message_send_process(sb_config, queue, endtime): - client = ServiceBusClient( - service_namespace=sb_config['hostname'], - shared_access_key_name=sb_config['key_name'], - shared_access_key_value=sb_config['access_key'], - debug=False) - - total = 0 - queue_client = client.get_queue(queue) - with queue_client.get_sender() as sender: - while endtime > datetime.now(): - sender.send(Message("Slow stress test message")) - total += 1 - time.sleep(3600) - return total - - -def message_receive_process(sb_config, queue, endtime): - client = ServiceBusClient( - service_namespace=sb_config['hostname'], - shared_access_key_name=sb_config['key_name'], - shared_access_key_value=sb_config['access_key'], - debug=False) - - queue_client = client.get_queue(queue) - with queue_client.get_receiver(mode=ReceiveSettleMode.PeekLock) as receiver: - total = 0 - for message in receiver: - message.complete() - total += 1 - if endtime <= datetime.now(): - break - - return total - - -def stress_test_queue_slow_send_receive(sb_config, queue): - starttime = datetime.now() - endtime = starttime + timedelta(hours=3) - sent_messages = 0 - received_messages = 0 - - with concurrent.futures.ProcessPoolExecutor(max_workers=4) as proc_pool: - senders = [proc_pool.submit(message_send_process, sb_config, queue, endtime) for i in range(1)] - receivers = [proc_pool.submit(message_receive_process, sb_config, queue, endtime) for i in range(1)] - - for done in concurrent.futures.as_completed(senders + receivers): - if done in senders: - sent_messages += done.result() - else: - received_messages += done.result() - - print("Sent {} messages and received {} messages.".format(sent_messages, received_messages)) - - -if __name__ == '__main__': - live_config = conftest.get_live_servicebus_config() - queue_name = conftest.create_standard_queue(live_config) - print("Created queue {}".format(queue_name)) - try: - stress_test_queue_slow_send_receive(live_config, queue_name) - finally: - print("Cleaning up queue {}".format(queue_name)) - conftest.cleanup_queue(live_config, queue_name) diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_slow_send_receive_batch.py b/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_slow_send_receive_batch.py deleted file mode 100644 index c92fd6ba93f6..000000000000 --- a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_queue_slow_send_receive_batch.py +++ /dev/null @@ -1,82 +0,0 @@ -#------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for -# license information. -#-------------------------------------------------------------------------- - -import time -from datetime import datetime, timedelta -import concurrent - -import conftest -from azure.servicebus import ServiceBusClient, Message -from azure.servicebus.common.constants import ReceiveSettleMode - - -def message_send_process(sb_config, queue, endtime): - client = ServiceBusClient( - service_namespace=sb_config['hostname'], - shared_access_key_name=sb_config['key_name'], - shared_access_key_value=sb_config['access_key'], - debug=False) - - total = 0 - queue_client = client.get_queue(queue) - with queue_client.get_sender() as sender: - while endtime > datetime.now(): - sender.send(Message("Slow stress test message")) - total += 1 - time.sleep(3600) - return total - - -def message_receive_process(sb_config, queue, endtime): - client = ServiceBusClient( - service_namespace=sb_config['hostname'], - shared_access_key_name=sb_config['key_name'], - shared_access_key_value=sb_config['access_key'], - debug=False) - - queue_client = client.get_queue(queue) - with queue_client.get_receiver(mode=ReceiveSettleMode.PeekLock) as receiver: - total = 0 - batch = receiver.fetch_next() - while batch: - for message in batch: - message.complete() - total += 1 - if endtime <= datetime.now(): - break - batch = receiver.fetch_next() - - return total - - -def stress_test_queue_slow_send_receive(sb_config, queue): - starttime = datetime.now() - endtime = starttime + timedelta(hours=3) - sent_messages = 0 - received_messages = 0 - - with concurrent.futures.ProcessPoolExecutor(max_workers=4) as proc_pool: - senders = [proc_pool.submit(message_send_process, sb_config, queue, endtime) for i in range(1)] - receivers = [proc_pool.submit(message_receive_process, sb_config, queue, endtime) for i in range(2)] - - for done in concurrent.futures.as_completed(senders + receivers): - if done in senders: - sent_messages += done.result() - else: - received_messages += done.result() - - print("Sent {} messages and received {} messages.".format(sent_messages, received_messages)) - - -if __name__ == '__main__': - live_config = conftest.get_live_servicebus_config() - queue_name = conftest.create_standard_queue(live_config) - print("Created queue {}".format(queue_name)) - try: - stress_test_queue_slow_send_receive(live_config, queue_name) - finally: - print("Cleaning up queue {}".format(queue_name)) - conftest.cleanup_queue(live_config, queue_name) diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/test_stress_queues.py b/sdk/servicebus/azure-servicebus/tests/stress_tests/test_stress_queues.py index 67d9e1700d4a..befd1e7d89d8 100644 --- a/sdk/servicebus/azure-servicebus/tests/stress_tests/test_stress_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/stress_tests/test_stress_queues.py @@ -21,7 +21,6 @@ _logger = get_logger(logging.DEBUG) -#TODO: Make test duration settable via command line arg? class ServiceBusQueueStressTests(AzureMgmtTestCase): @pytest.mark.liveTest @@ -38,7 +37,6 @@ def test_stress_queue_send_and_receive(self, servicebus_namespace_connection_str duration=timedelta(seconds=60)) result = stress_test.Run() - print(result) assert(result.total_sent > 0) assert(result.total_received > 0) @@ -58,7 +56,6 @@ def test_stress_queue_send_and_pull_receive(self, servicebus_namespace_connectio duration=timedelta(seconds=60)) result = stress_test.Run() - print(result) assert(result.total_sent > 0) assert(result.total_received > 0) @@ -110,8 +107,8 @@ def test_stress_queue_receive_and_delete(self, servicebus_namespace_connection_s sb_client = ServiceBusClient.from_connection_string( servicebus_namespace_connection_string, debug=False) - stress_test = StressTestRunner(senders = [sb_client.get_queue_sender(servicebus_queue.name, mode=ReceiveSettleMode.ReceiveAndDelete)], - receivers = [sb_client.get_queue_receiver(servicebus_queue.name)], + stress_test = StressTestRunner(senders = [sb_client.get_queue_sender(servicebus_queue.name)], + receivers = [sb_client.get_queue_receiver(servicebus_queue.name, mode=ReceiveSettleMode.ReceiveAndDelete)], duration=timedelta(seconds=60)) result = stress_test.Run() From ab852458338c1cddaab9d5bee7da4b16cf95d194 Mon Sep 17 00:00:00 2001 From: Kieran Brantner-Magee Date: Mon, 30 Mar 2020 12:46:11 -0700 Subject: [PATCH 2/2] improve stress test result printout --- .../azure-servicebus/tests/stress_tests/stress_test_base.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py b/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py index 124f5944b957..a42b54548e2a 100644 --- a/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py +++ b/sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py @@ -38,6 +38,9 @@ def __init__(self): self.total_sent=0 self.total_received=0 + def __repr__(self): + return str(vars(self)) + class StressTestRunner: '''Framework for running a service bus stress test.