diff --git a/tools/azure-devtools/setup.py b/tools/azure-devtools/setup.py index ba7da7982693..d800c2b0fbca 100644 --- a/tools/azure-devtools/setup.py +++ b/tools/azure-devtools/setup.py @@ -49,13 +49,27 @@ packages=[ 'azure_devtools', 'azure_devtools.scenario_tests', + 'azure_devtools.perfstress_tests', 'azure_devtools.ci_tools', ], + entry_points={ + 'console_scripts': [ + 'perfstress = azure_devtools.perfstress_tests:run_perfstress_cmd', + 'systemperf = azure_devtools.perfstress_tests:run_system_perfstress_tests_cmd', + ], + }, extras_require={ 'ci_tools':[ "PyGithub>=1.40", # Can Merge PR after 1.36, "requests" and tests after 1.40 "GitPython", "requests>=2.0" + ], + 'systemperf':[ + "aiohttp>=3.0", + "requests>=2.0", + "tornado==6.0.3" + "pycurl==7.43.0.5" + "httpx==0.11.1" ] }, package_dir={'': 'src'}, diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/__init__.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/__init__.py new file mode 100644 index 000000000000..e3c21008314b --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/__init__.py @@ -0,0 +1,34 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import os +import asyncio + +from .perf_stress_runner import PerfStressRunner +from .perf_stress_test import PerfStressTest +from .random_stream import RandomStream, get_random_bytes +from .async_random_stream import AsyncRandomStream + +__all__ = [ + "PerfStressRunner", + "PerfStressTest", + "RandomStream", + "AsyncRandomStream", + "get_random_bytes" +] + + +def run_perfstress_cmd(): + main_loop = PerfStressRunner() + loop = asyncio.get_event_loop() + loop.run_until_complete(main_loop.start()) + + +def run_system_perfstress_tests_cmd(): + root_dir = os.path.dirname(os.path.abspath(__file__)) + sys_test_dir = os.path.join(root_dir, 'system_perfstress') + main_loop = PerfStressRunner(test_folder_path=sys_test_dir) + loop = asyncio.get_event_loop() + loop.run_until_complete(main_loop.start()) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/async_random_stream.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/async_random_stream.py new file mode 100644 index 000000000000..11f0d663c416 --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/async_random_stream.py @@ -0,0 +1,39 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from io import BytesIO + +from .random_stream import get_random_bytes + + +class AsyncRandomStream(BytesIO): + def __init__(self, length, initial_buffer_length=1024 * 1024): + super().__init__() + self._base_data = get_random_bytes(initial_buffer_length) + self._base_data_length = initial_buffer_length + self._position = 0 + self._remaining = length + self._closed = False + + def read(self, size=None): + if self._remaining == 0: + return b"" + + if size is None: + e = self._base_data_length + else: + e = size + e = min(e, self._remaining) + if e > self._base_data_length: + self._base_data = get_random_bytes(e) + self._base_data_length = e + self._remaining = self._remaining - e + return self._base_data[:e] + + def remaining(self): + return self._remaining + + def close(self): + self._closed = True diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py new file mode 100644 index 000000000000..f90c246d848c --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py @@ -0,0 +1,199 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import argparse +import asyncio +import time +import inspect +import logging +import os +import pkgutil +import sys +import threading + +from .perf_stress_test import PerfStressTest +from .repeated_timer import RepeatedTimer + + +class PerfStressRunner: + def __init__(self, test_folder_path=None): + if test_folder_path is None: + # Use current working directory + test_folder_path = os.getcwd() + + self.logger = logging.getLogger(__name__) + self.logger.setLevel(level=logging.INFO) + handler = logging.StreamHandler() + handler.setLevel(level=logging.INFO) + self.logger.addHandler(handler) + + #NOTE: If you need to support registering multiple test locations, move this into Initialize, call lazily on Run, expose RegisterTestLocation function. + self._discover_tests(test_folder_path) + self._parse_args() + + + def _parse_args(self): + # First, detect which test we're running. + arg_parser = argparse.ArgumentParser( + description='Python Perf Test Runner', + usage='{} []'.format(__file__)) + + # NOTE: remove this and add another help string to query for available tests + # if/when # of classes become enough that this isn't practical. + arg_parser.add_argument('test', help='Which test to run. Supported tests: {}'.format(" ".join(sorted(self._test_classes.keys())))) + + args = arg_parser.parse_args(sys.argv[1:2]) + try: + self._test_class_to_run = self._test_classes[args.test] + except KeyError as e: + self.logger.error("Invalid test: {}\n Test must be one of: {}\n".format(args.test, " ".join(sorted(self._test_classes.keys())))) + raise + + # Next, parse args for that test. We also do global args here too so as not to confuse the initial test parse. + per_test_arg_parser = argparse.ArgumentParser( + description=self._test_class_to_run.__doc__ or args.test, + usage='{} {} []'.format(__file__, args.test)) + + # Global args + per_test_arg_parser.add_argument('-p', '--parallel', nargs='?', type=int, help='Degree of parallelism to run with. Default is 1.', default=1) + per_test_arg_parser.add_argument('-d', '--duration', nargs='?', type=int, help='Duration of the test in seconds. Default is 10.', default=10) + per_test_arg_parser.add_argument('-i', '--iterations', nargs='?', type=int, help='Number of iterations in the main test loop. Default is 1.', default=1) + per_test_arg_parser.add_argument('-w', '--warmup', nargs='?', type=int, help='Duration of warmup in seconds. Default is 5.', default=5) + per_test_arg_parser.add_argument('--no-cleanup', action='store_true', help='Do not run cleanup logic. Default is false.', default=False) + per_test_arg_parser.add_argument('--sync', action='store_true', help='Run tests in sync mode. Default is False.', default=False) + + # Per-test args + self._test_class_to_run.add_arguments(per_test_arg_parser) + self.per_test_args = per_test_arg_parser.parse_args(sys.argv[2:]) + + self.logger.info("") + self.logger.info("=== Options ===") + self.logger.info(args) + self.logger.info(self.per_test_args) + self.logger.info("") + + + def _discover_tests(self, test_folder_path): + self._test_classes = {} + + # Dynamically enumerate all python modules under the tests path for classes that implement PerfStressTest + for loader, name, _ in pkgutil.walk_packages([test_folder_path]): + try: + module = loader.find_module(name).load_module(name) + except Exception as e: + self.logger.warn("Unable to load module {}: {}".format(name, e)) + continue + for name, value in inspect.getmembers(module): + + if name.startswith('_'): + continue + if inspect.isclass(value) and issubclass(value, PerfStressTest) and value != PerfStressTest: + self.logger.info("Loaded test class: {}".format(name)) + self._test_classes[name] = value + + + async def start(self): + self.logger.info("=== Setup ===") + + tests = [] + for _ in range(0, self.per_test_args.parallel): + tests.append(self._test_class_to_run(self.per_test_args)) + + try: + try: + await tests[0].global_setup() + try: + await asyncio.gather(*[test.setup() for test in tests]) + + self.logger.info("") + + if self.per_test_args.warmup > 0: + await self._run_tests(tests, self.per_test_args.warmup, "Warmup") + + for i in range(0, self.per_test_args.iterations): + title = "Test" + if self.per_test_args.iterations > 1: + title += " " + (i + 1) + await self._run_tests(tests, self.per_test_args.duration, title) + except Exception as e: + print("Exception: " + str(e)) + finally: + if not self.per_test_args.no_cleanup: + self.logger.info("=== Cleanup ===") + await asyncio.gather(*[test.cleanup() for test in tests]) + except Exception as e: + print("Exception: " + str(e)) + finally: + if not self.per_test_args.no_cleanup: + await tests[0].global_cleanup() + except Exception as e: + print("Exception: " + str(e)) + finally: + await asyncio.gather(*[test.close() for test in tests]) + + + async def _run_tests(self, tests, duration, title): + self._completed_operations = [0] * len(tests) + self._last_completion_times = [0] * len(tests) + self._last_total_operations = -1 + + status_thread = RepeatedTimer(1, self._print_status, title) + + if self.per_test_args.sync: + threads = [] + for id, test in enumerate(tests): + thread = threading.Thread(target=lambda: self._run_sync_loop(test, duration, id)) + threads.append(thread) + thread.start() + for thread in threads: + thread.join() + else: + await asyncio.gather(*[self._run_async_loop(test, duration, id) for id, test in enumerate(tests)]) + + status_thread.stop() + + self.logger.info("") + self.logger.info("=== Results ===") + + total_operations = sum(self._completed_operations) + operations_per_second = sum(map( + lambda x: x[0] / x[1], + zip(self._completed_operations, self._last_completion_times))) + seconds_per_operation = 1 / operations_per_second + weighted_average_seconds = total_operations / operations_per_second + + self.logger.info("Completed {} operations in a weighted-average of {:.2f}s ({:.2f} ops/s, {:.3f} s/op)".format( + total_operations, weighted_average_seconds, operations_per_second, seconds_per_operation)) + self.logger.info("") + + + def _run_sync_loop(self, test, duration, id): + start = time.time() + runtime = 0 + while runtime < duration: + test.run_sync() + runtime = time.time() - start + self._completed_operations[id] += 1 + self._last_completion_times[id] = runtime + + + async def _run_async_loop(self, test, duration, id): + start = time.time() + runtime = 0 + while runtime < duration: + await test.run_async() + runtime = time.time() - start + self._completed_operations[id] += 1 + self._last_completion_times[id] = runtime + + + def _print_status(self, title): + if self._last_total_operations == -1: + self._last_total_operations = 0 + self.logger.info("=== {} ===\nCurrent\t\tTotal".format(title)) + + total_operations = sum(self._completed_operations) + self.logger.info("{}\t\t{}".format(total_operations - self._last_total_operations, total_operations)) + self._last_total_operations = total_operations diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py new file mode 100644 index 000000000000..c59a7973216d --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py @@ -0,0 +1,63 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import os + + +class PerfStressTest: + '''Base class for implementing a python perf test. + + - run_sync and run_async must be implemented. + - global_setup and global_cleanup are optional and run once, ever, regardless of parallelism. + - setup and cleanup are run once per test instance (where each instance runs in its own thread/process), regardless of #iterations. + - close is run once per test instance, after cleanup and global_cleanup. + - run_sync/run_async are run once per iteration. + ''' + args = {} + + def __init__(self, arguments): + self.args = arguments + + async def global_setup(self): + return + + async def global_cleanup(self): + return + + async def setup(self): + return + + async def cleanup(self): + return + + async def close(self): + return + + def __enter__(self): + return + + def __exit__(self, exc_type, exc_value, traceback): + return + + def run_sync(self): + raise Exception('run_sync must be implemented for {}'.format(self.__class__.__name__)) + + async def run_async(self): + raise Exception('run_async must be implemented for {}'.format(self.__class__.__name__)) + + @staticmethod + def add_arguments(parser): + """ + Override this method to add test-specific argparser args to the class. + These are accessible in __init__() and the self.args property. + """ + return + + @staticmethod + def get_from_env(variable): + value = os.environ.get(variable) + if not value: + raise Exception("Undefined environment variable {}".format(variable)) + return value diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/random_stream.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/random_stream.py new file mode 100644 index 000000000000..0ab5283c288f --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/random_stream.py @@ -0,0 +1,35 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import os + +def get_random_bytes(buffer_length): + return os.urandom(buffer_length) + + +class RandomStream: + def __init__(self, length, initial_buffer_length=1024*1024): + self._base_data = get_random_bytes(initial_buffer_length) + self._base_data_length = initial_buffer_length + self._position = 0 + self._remaining = length + + def read(self, size=None): + if self._remaining == 0: + return None + + if size is None: + e = self._base_data_length + else: + e = size + e = min(e, self._remaining) + if e > self._base_data_length: + self._base_data = get_random_bytes(e) + self._base_data_length = e + self._remaining = self._remaining - e + return self._base_data[:e] + + def remaining(self): + return self._remaining \ No newline at end of file diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/repeated_timer.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/repeated_timer.py new file mode 100644 index 000000000000..902203567524 --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/repeated_timer.py @@ -0,0 +1,37 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from threading import Timer + +# Credit to https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds +class RepeatedTimer(object): + def __init__(self, interval, function, *args, **kwargs): + self._timer = None + self.interval = interval + self.function = function + self.args = args + self.kwargs = kwargs + self.is_running = False + self.start() + + + def _run(self): + self.is_running = False + self.start() + self.function(*self.args, **self.kwargs) + + + def start(self): + if not self.is_running: + #NOTE: If there is a concern about perf impact of this Timer, we'd need to convert to multiprocess and use IPC. + + self._timer = Timer(self.interval, self._run) + self._timer.start() + self.is_running = True + + + def stop(self): + self._timer.cancel() + self.is_running = False \ No newline at end of file diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/__init__.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/__init__.py new file mode 100644 index 000000000000..34913fb394d7 --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/__init__.py @@ -0,0 +1,4 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/aiohttp_get_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/aiohttp_get_test.py new file mode 100644 index 000000000000..4f0d03352618 --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/aiohttp_get_test.py @@ -0,0 +1,25 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import aiohttp + +from azure_devtools.perfstress_tests import PerfStressTest + + +class AioHttpGetTest(PerfStressTest): + + async def global_setup(self): + type(self).session = aiohttp.ClientSession() + + async def global_cleanup(self): + await type(self).session.close() + + async def run_async(self): + async with type(self).session.get(self.Arguments.url) as response: + await response.text() + + @staticmethod + def add_arguments(parser): + parser.add_argument('-u', '--url', required=True) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/httpx_get_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/httpx_get_test.py new file mode 100644 index 000000000000..08c6214ab6b3 --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/httpx_get_test.py @@ -0,0 +1,25 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import httpx + +from azure_devtools.perfstress_tests import PerfStressTest + + +class HttpxGetTest(PerfStressTest): + + async def global_setup(self): + type(self).client = httpx.AsyncClient() + + async def global_cleanup(self): + await type(self).client.aclose() + + async def run_async(self): + response = await type(self).client.get(self.Arguments.url) + _ = response.text + + @staticmethod + def add_arguments(parser): + parser.add_argument('-u', '--url', required=True) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/no_op_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/no_op_test.py new file mode 100644 index 000000000000..ff3f696ffa3c --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/no_op_test.py @@ -0,0 +1,14 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from azure_devtools.perfstress_tests import PerfStressTest + + +class NoOpTest(PerfStressTest): + def run_sync(self): + pass + + async def run_async(self): + pass diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/requests_get_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/requests_get_test.py new file mode 100644 index 000000000000..ee04fa180b90 --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/requests_get_test.py @@ -0,0 +1,21 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import requests + +from azure_devtools.perfstress_tests import PerfStressTest + + +class RequestsGetTest(PerfStressTest): + + async def global_setup(self): + type(self).session = requests.Session() + + def run_sync(self): + type(self).session.get(self.Arguments.url).text + + @staticmethod + def add_arguments(parser): + parser.add_argument('-u', '--url', required=True) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/sleep_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/sleep_test.py new file mode 100644 index 000000000000..eb92d6b54b65 --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/sleep_test.py @@ -0,0 +1,25 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import math +import time +import asyncio + +from azure_devtools.perfstress_tests import PerfStressTest + + +# Used for verifying the perf framework correctly computes average throughput across parallel tests of different speed +class SleepTest(PerfStressTest): + instance_count = 0 + + def __init__(self): + type(self).instance_count += 1 + self.seconds_per_operation = math.pow(2, type(self).instance_count) + + def run_sync(self): + time.sleep(self.seconds_per_operation) + + async def run_async(self): + await asyncio.sleep(self.seconds_per_operation) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/socket_http_get_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/socket_http_get_test.py new file mode 100644 index 000000000000..1563c9324501 --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/socket_http_get_test.py @@ -0,0 +1,33 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import asyncio +from urllib.parse import urlparse + +from azure_devtools.perfstress_tests import PerfStressTest + + +class SocketHttpGetTest(PerfStressTest): + + async def setup(self): + parsed_url = urlparse(self.Arguments.url) + hostname = parsed_url.hostname + port = parsed_url.port + path = parsed_url.path + + message = f'GET {path} HTTP/1.1\r\nHost: {hostname}:{port}\r\n\r\n' + self.message_bytes = message.encode() + self.reader, self.writer = await asyncio.open_connection(parsed_url.hostname, parsed_url.port) + + async def cleanup(self): + self.writer.close() + + async def run_async(self): + self.writer.write(self.message_bytes) + await self.reader.read(200) + + @staticmethod + def add_arguments(parser): + parser.add_argument('-u', '--url', required=True) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/tornado_get_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/tornado_get_test.py new file mode 100644 index 000000000000..adda137f5f3a --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/tornado_get_test.py @@ -0,0 +1,22 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from tornado import httpclient + +from azure_devtools.perfstress_tests import PerfStressTest + + +class TornadoGetTest(PerfStressTest): + + async def global_setup(self): + httpclient.AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient") + type(self).client = httpclient.AsyncHTTPClient() + + async def run_async(self): + await type(self).client.fetch(self.Arguments.url) + + @staticmethod + def add_arguments(parser): + parser.add_argument('-u', '--url', required=True)