[Perfstress] Added framework (#15510)
* Migrated perf framework

* Updated framework

* Added T2 File tests

* Added legacy tests

* Fixed headers and imports

* Fix legacy test setup

* Python naming refactor

* Renamed directory

* Separated T1 tests

* Added env var utility

* Fixed concurrency

* Use random bytes for data

* Cleaned up some args

* Added system perf tests

* Support system test dependencies

* Optional client sharing

* Removed fileshare tests

* Fixed system tests
annatisch committed Dec 4, 2020
1 parent 71ebc5f commit 96ad9af
Showing 15 changed files with 590 additions and 0 deletions.
14 changes: 14 additions & 0 deletions tools/azure-devtools/setup.py
@@ -49,13 +49,27 @@
    packages=[
        'azure_devtools',
        'azure_devtools.scenario_tests',
        'azure_devtools.perfstress_tests',
        'azure_devtools.ci_tools',
    ],
    entry_points={
        'console_scripts': [
            'perfstress = azure_devtools.perfstress_tests:run_perfstress_cmd',
            'systemperf = azure_devtools.perfstress_tests:run_system_perfstress_tests_cmd',
        ],
    },
    extras_require={
        'ci_tools': [
            "PyGithub>=1.40",  # Can Merge PR after 1.36, "requests" and tests after 1.40
            "GitPython",
            "requests>=2.0"
        ],
        'systemperf': [
            "aiohttp>=3.0",
            "requests>=2.0",
            "tornado==6.0.3",
            "pycurl==7.43.0.5",
            "httpx==0.11.1"
        ]
    },
    package_dir={'': 'src'},
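The two console_scripts entries above resolve to module-level functions in azure_devtools.perfstress_tests, so the same runs can also be launched from Python. A minimal sketch, assuming the package is installed with the systemperf extra (pip install "azure-devtools[systemperf]"); the test name MyTest is hypothetical:

import sys
from azure_devtools.perfstress_tests import run_perfstress_cmd

# Equivalent to the console command: perfstress MyTest --parallel 4 --duration 30
sys.argv = ["perfstress", "MyTest", "--parallel", "4", "--duration", "30"]  # hypothetical test name
run_perfstress_cmd()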
34 changes: 34 additions & 0 deletions tools/azure-devtools/src/azure_devtools/perfstress_tests/__init__.py
@@ -0,0 +1,34 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import os
import asyncio

from .perf_stress_runner import PerfStressRunner
from .perf_stress_test import PerfStressTest
from .random_stream import RandomStream, get_random_bytes
from .async_random_stream import AsyncRandomStream

__all__ = [
    "PerfStressRunner",
    "PerfStressTest",
    "RandomStream",
    "AsyncRandomStream",
    "get_random_bytes"
]


def run_perfstress_cmd():
    main_loop = PerfStressRunner()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main_loop.start())


def run_system_perfstress_tests_cmd():
    root_dir = os.path.dirname(os.path.abspath(__file__))
    sys_test_dir = os.path.join(root_dir, 'system_perfstress')
    main_loop = PerfStressRunner(test_folder_path=sys_test_dir)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main_loop.start())
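run_system_perfstress_tests_cmd points the runner at the bundled system_perfstress folder; the same mechanism works for any directory of tests. A minimal sketch, assuming a local directory my_perf_tests containing PerfStressTest subclasses (the path is hypothetical):

import asyncio
from azure_devtools.perfstress_tests import PerfStressRunner

runner = PerfStressRunner(test_folder_path="./my_perf_tests")  # hypothetical test directory
asyncio.get_event_loop().run_until_complete(runner.start())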
39 changes: 39 additions & 0 deletions tools/azure-devtools/src/azure_devtools/perfstress_tests/async_random_stream.py
@@ -0,0 +1,39 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from io import BytesIO

from .random_stream import get_random_bytes


class AsyncRandomStream(BytesIO):
    def __init__(self, length, initial_buffer_length=1024 * 1024):
        super().__init__()
        self._base_data = get_random_bytes(initial_buffer_length)
        self._base_data_length = initial_buffer_length
        self._position = 0
        self._remaining = length
        self._closed = False

    def read(self, size=None):
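        # Serve up to `size` bytes (one full buffer if size is None) from a reusable
        # random buffer, regrowing the buffer only when a larger read is requested.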
        if self._remaining == 0:
            return b""

        if size is None:
            e = self._base_data_length
        else:
            e = size
        e = min(e, self._remaining)
        if e > self._base_data_length:
            self._base_data = get_random_bytes(e)
            self._base_data_length = e
        self._remaining = self._remaining - e
        return self._base_data[:e]

    def remaining(self):
        return self._remaining

    def close(self):
        self._closed = True
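A minimal sketch of the stream in use, assuming only the class above: reads are served from the shared random buffer until the requested length is exhausted.

from azure_devtools.perfstress_tests import AsyncRandomStream

stream = AsyncRandomStream(10 * 1024 * 1024)  # 10 MiB upload payload
total = 0
chunk = stream.read(64 * 1024)
while chunk:
    total += len(chunk)
    chunk = stream.read(64 * 1024)
assert total == 10 * 1024 * 1024
assert stream.remaining() == 0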
199 changes: 199 additions & 0 deletions tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py
@@ -0,0 +1,199 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import argparse
import asyncio
import time
import inspect
import logging
import os
import pkgutil
import sys
import threading

from .perf_stress_test import PerfStressTest
from .repeated_timer import RepeatedTimer


class PerfStressRunner:
    def __init__(self, test_folder_path=None):
        if test_folder_path is None:
            # Use current working directory
            test_folder_path = os.getcwd()

        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(level=logging.INFO)
        handler = logging.StreamHandler()
        handler.setLevel(level=logging.INFO)
        self.logger.addHandler(handler)

        # NOTE: If you need to support registering multiple test locations, move this
        # into Initialize, call lazily on Run, and expose a RegisterTestLocation function.
        self._discover_tests(test_folder_path)
        self._parse_args()


    def _parse_args(self):
        # First, detect which test we're running.
        arg_parser = argparse.ArgumentParser(
            description='Python Perf Test Runner',
            usage='{} <TEST> [<args>]'.format(__file__))

        # NOTE: remove this and add another help string to query for available tests
        # if/when the number of classes becomes large enough that this isn't practical.
        arg_parser.add_argument('test', help='Which test to run. Supported tests: {}'.format(" ".join(sorted(self._test_classes.keys()))))

        args = arg_parser.parse_args(sys.argv[1:2])
        try:
            self._test_class_to_run = self._test_classes[args.test]
        except KeyError:
            self.logger.error("Invalid test: {}\n Test must be one of: {}\n".format(args.test, " ".join(sorted(self._test_classes.keys()))))
            raise

        # Next, parse args for that test. We also do global args here too so as not to confuse the initial test parse.
        per_test_arg_parser = argparse.ArgumentParser(
            description=self._test_class_to_run.__doc__ or args.test,
            usage='{} {} [<args>]'.format(__file__, args.test))

        # Global args
        per_test_arg_parser.add_argument('-p', '--parallel', nargs='?', type=int, help='Degree of parallelism to run with. Default is 1.', default=1)
        per_test_arg_parser.add_argument('-d', '--duration', nargs='?', type=int, help='Duration of the test in seconds. Default is 10.', default=10)
        per_test_arg_parser.add_argument('-i', '--iterations', nargs='?', type=int, help='Number of iterations in the main test loop. Default is 1.', default=1)
        per_test_arg_parser.add_argument('-w', '--warmup', nargs='?', type=int, help='Duration of warmup in seconds. Default is 5.', default=5)
        per_test_arg_parser.add_argument('--no-cleanup', action='store_true', help='Do not run cleanup logic. Default is false.', default=False)
        per_test_arg_parser.add_argument('--sync', action='store_true', help='Run tests in sync mode. Default is False.', default=False)

        # Per-test args
        self._test_class_to_run.add_arguments(per_test_arg_parser)
        self.per_test_args = per_test_arg_parser.parse_args(sys.argv[2:])

        self.logger.info("")
        self.logger.info("=== Options ===")
        self.logger.info(args)
        self.logger.info(self.per_test_args)
        self.logger.info("")


    def _discover_tests(self, test_folder_path):
        self._test_classes = {}

        # Dynamically enumerate all python modules under the tests path for classes that implement PerfStressTest.
        for loader, name, _ in pkgutil.walk_packages([test_folder_path]):
            try:
                module = loader.find_module(name).load_module(name)
            except Exception as e:
                self.logger.warning("Unable to load module {}: {}".format(name, e))
                continue
            for name, value in inspect.getmembers(module):
                if name.startswith('_'):
                    continue
                if inspect.isclass(value) and issubclass(value, PerfStressTest) and value != PerfStressTest:
                    self.logger.info("Loaded test class: {}".format(name))
                    self._test_classes[name] = value


    async def start(self):
        self.logger.info("=== Setup ===")

        tests = []
        for _ in range(0, self.per_test_args.parallel):
            tests.append(self._test_class_to_run(self.per_test_args))

        try:
            try:
                await tests[0].global_setup()
                try:
                    await asyncio.gather(*[test.setup() for test in tests])

                    self.logger.info("")

                    if self.per_test_args.warmup > 0:
                        await self._run_tests(tests, self.per_test_args.warmup, "Warmup")

                    for i in range(0, self.per_test_args.iterations):
                        title = "Test"
                        if self.per_test_args.iterations > 1:
                            title += " " + str(i + 1)
                        await self._run_tests(tests, self.per_test_args.duration, title)
                except Exception as e:
                    print("Exception: " + str(e))
                finally:
                    if not self.per_test_args.no_cleanup:
                        self.logger.info("=== Cleanup ===")
                        await asyncio.gather(*[test.cleanup() for test in tests])
            except Exception as e:
                print("Exception: " + str(e))
            finally:
                if not self.per_test_args.no_cleanup:
                    await tests[0].global_cleanup()
        except Exception as e:
            print("Exception: " + str(e))
        finally:
            await asyncio.gather(*[test.close() for test in tests])


    async def _run_tests(self, tests, duration, title):
        self._completed_operations = [0] * len(tests)
        self._last_completion_times = [0] * len(tests)
        self._last_total_operations = -1

        status_thread = RepeatedTimer(1, self._print_status, title)

        if self.per_test_args.sync:
            threads = []
            for id, test in enumerate(tests):
                # Pass the loop variables as arguments rather than capturing them in a
                # lambda, so each thread is bound to its own test instance and id.
                thread = threading.Thread(target=self._run_sync_loop, args=(test, duration, id))
                threads.append(thread)
                thread.start()
            for thread in threads:
                thread.join()
        else:
            await asyncio.gather(*[self._run_async_loop(test, duration, id) for id, test in enumerate(tests)])

        status_thread.stop()

        self.logger.info("")
        self.logger.info("=== Results ===")

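        # Throughput is aggregated per worker: each worker contributes its own
        # completed-operations / elapsed-time rate, and the per-worker rates are summed.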
        total_operations = sum(self._completed_operations)
        operations_per_second = sum(map(
            lambda x: x[0] / x[1],
            zip(self._completed_operations, self._last_completion_times)))
        seconds_per_operation = 1 / operations_per_second
        weighted_average_seconds = total_operations / operations_per_second

        self.logger.info("Completed {} operations in a weighted-average of {:.2f}s ({:.2f} ops/s, {:.3f} s/op)".format(
            total_operations, weighted_average_seconds, operations_per_second, seconds_per_operation))
        self.logger.info("")


    def _run_sync_loop(self, test, duration, id):
        start = time.time()
        runtime = 0
        while runtime < duration:
            test.run_sync()
            runtime = time.time() - start
            self._completed_operations[id] += 1
            self._last_completion_times[id] = runtime


    async def _run_async_loop(self, test, duration, id):
        start = time.time()
        runtime = 0
        while runtime < duration:
            await test.run_async()
            runtime = time.time() - start
            self._completed_operations[id] += 1
            self._last_completion_times[id] = runtime


    def _print_status(self, title):
        if self._last_total_operations == -1:
            self._last_total_operations = 0
            self.logger.info("=== {} ===\nCurrent\t\tTotal".format(title))

        total_operations = sum(self._completed_operations)
        self.logger.info("{}\t\t{}".format(total_operations - self._last_total_operations, total_operations))
        self._last_total_operations = total_operations
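To make the weighted-average arithmetic in _run_tests concrete, a worked example with assumed numbers: one worker completing 100 operations in 10.0s and another completing 250 operations in 10.0s yield a summed rate, not a simple average of the two.

completed_operations = [100, 250]
last_completion_times = [10.0, 10.0]
ops_per_second = sum(c / t for c, t in zip(completed_operations, last_completion_times))  # 35.0
total = sum(completed_operations)                                                         # 350
weighted_average_seconds = total / ops_per_second                                         # 10.0
print("{:.2f} ops/s over {:.2f}s".format(ops_per_second, weighted_average_seconds))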
63 changes: 63 additions & 0 deletions tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py
@@ -0,0 +1,63 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import os


class PerfStressTest:
    '''Base class for implementing a Python perf test.

    - run_sync and run_async must be implemented.
    - global_setup and global_cleanup are optional and run only once, regardless of parallelism.
    - setup and cleanup are run once per test instance (each instance runs in its own thread or task), regardless of the number of iterations.
    - close is run once per test instance, after cleanup and global_cleanup.
    - run_sync/run_async are run once per iteration.
    '''
    args = {}

    def __init__(self, arguments):
        self.args = arguments

    async def global_setup(self):
        return

    async def global_cleanup(self):
        return

    async def setup(self):
        return

    async def cleanup(self):
        return

    async def close(self):
        return

    def __enter__(self):
        return

    def __exit__(self, exc_type, exc_value, traceback):
        return

    def run_sync(self):
        raise Exception('run_sync must be implemented for {}'.format(self.__class__.__name__))

    async def run_async(self):
        raise Exception('run_async must be implemented for {}'.format(self.__class__.__name__))

    @staticmethod
    def add_arguments(parser):
        """
        Override this method to add test-specific argparser args to the class.
        These are accessible in __init__() and the self.args property.
        """
        return

    @staticmethod
    def get_from_env(variable):
        value = os.environ.get(variable)
        if not value:
            raise Exception("Undefined environment variable {}".format(variable))
        return value
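A minimal sketch of a concrete test under assumed names (SleepTest and its --delay option are hypothetical): dropping a module like this into the runner's test folder is enough for _discover_tests to pick it up, after which it can be run as perfstress SleepTest.

import asyncio
import time

from azure_devtools.perfstress_tests import PerfStressTest


class SleepTest(PerfStressTest):
    def run_sync(self):
        time.sleep(self.args.delay)

    async def run_async(self):
        await asyncio.sleep(self.args.delay)

    @staticmethod
    def add_arguments(parser):
        parser.add_argument('--delay', nargs='?', type=float, help='Seconds to sleep per operation. Default is 0.001.', default=0.001)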