[Perfstress][Storage] Added Datalake perf tests #15861

Merged · 7 commits · Mar 3, 2021
Changes shown from 5 commits.
eng/.docsettings.yml (1 addition, 0 deletions)
```diff
@@ -8,6 +8,7 @@ omitted_paths:
   - doc/*
   - sdk/**/samples/*
   - sdk/identity/azure-identity/tests/*
+  - sdk/**/tests/perfstress_tests/*

 language: python
 root_check_enabled: True
```
@@ -0,0 +1,49 @@
# DataLake Performance Tests

In order to run the performance tests, the `azure-devtools` package must be installed. This is done as part of the `dev_requirements`.
Start by creating a new virtual environment for your perf tests. This will need to be a Python 3 environment, preferably >=3.7.
Note that there are no T1 tests for this project.

### Setup for T2 perf test runs

```cmd
(env) ~/azure-storage-file-datalake> pip install -r dev_requirements.txt
(env) ~/azure-storage-file-datalake> pip install -e .
```
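The clients are built from the `AZURE_STORAGE_CONNECTION_STRING` environment variable (see the `_test_base` module below), so set it before running, for example:

```cmd
(env) ~/azure-storage-file-datalake> export AZURE_STORAGE_CONNECTION_STRING="<connection-string>"
```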

## Test commands

When `azure-devtools` is installed, you will have access to the `perfstress` command line tool, which will scan the current module for runnable perf tests. Only one test can be run at a time (i.e. there is no "run all" feature).

```cmd
(env) ~/azure-storage-file-datalake> cd tests
(env) ~/azure-storage-file-datalake/tests> perfstress
```
Using the `perfstress` command alone will list the available perf tests found.

### Common perf command line options
These options are available for all perf tests (an example combining several of them follows the list):
- `--duration=10` Number of seconds to run as many operations (the "run" function) as possible. Default is 10.
- `--iterations=1` Number of test iterations to run. Default is 1.
- `--parallel=1` Number of tests to run in parallel. Default is 1.
- `--no-client-share` Whether each parallel test instance should use its own client rather than sharing a single one. Default is False (sharing).
- `--warm-up=5` Number of seconds to spend warming up the connection before measuring begins. Default is 5.
- `--sync` Whether to run the tests synchronously rather than asynchronously. Default is False (async).
- `--no-cleanup` Whether to keep newly created resources after test run. Default is False (resources will be deleted).
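
For example, a longer run exercising several of these options at once might look like (values are illustrative):

```cmd
(env) ~/azure-storage-file-datalake/tests> perfstress DownloadTest --duration=30 --parallel=4 --warm-up=10
```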

### Common DataLake command line options
These options are available for all DataLake perf tests:
- `--size=10240` Size in bytes of data to be transferred in upload or download tests. Default is 10240.
- `--max-concurrency=1` Number of threads to concurrently upload/download a single operation using the SDK API parameter. Default is 1.

### T2 Tests
The tests currently written for the T2 SDK:
- `UploadTest` Uploads a stream of `size` bytes to a new File.
- `UploadFromFileTest` Uploads a local file of `size` bytes to a new File.
- `DownloadTest` Downloads a stream of `size` bytes.
- `AppendTest` Appends `size` bytes to an existing file.

## Example command
```cmd
(env) ~/azure-storage-file-datalake/tests> perfstress UploadTest --parallel=2 --size=10240
```
@@ -0,0 +1,71 @@
```python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import os
import uuid

from azure_devtools.perfstress_tests import PerfStressTest

from azure.core.exceptions import ResourceNotFoundError
from azure.storage.filedatalake import DataLakeServiceClient as SyncDataLakeServiceClient
from azure.storage.filedatalake.aio import DataLakeServiceClient as AsyncDataLakeServiceClient


class _ServiceTest(PerfStressTest):
    service_client = None
    async_service_client = None

    def __init__(self, arguments):
        super().__init__(arguments)
        connection_string = self.get_from_env("AZURE_STORAGE_CONNECTION_STRING")
        if not _ServiceTest.service_client or self.args.no_client_share:
            _ServiceTest.service_client = SyncDataLakeServiceClient.from_connection_string(conn_str=connection_string)
            _ServiceTest.async_service_client = AsyncDataLakeServiceClient.from_connection_string(conn_str=connection_string)
        self.service_client = _ServiceTest.service_client
        self.async_service_client = _ServiceTest.async_service_client

    async def close(self):
        await self.async_service_client.close()
        await super().close()

    @staticmethod
    def add_arguments(parser):
        super(_ServiceTest, _ServiceTest).add_arguments(parser)
        parser.add_argument('-c', '--max-concurrency', nargs='?', type=int, help='Maximum number of concurrent threads used for data transfer. Defaults to 1', default=1)
        parser.add_argument('-s', '--size', nargs='?', type=int, help='Size of data to transfer. Default is 10240.', default=10240)
        parser.add_argument('--no-client-share', action='store_true', help='Create one ServiceClient per test instance. Default is to share a single ServiceClient.', default=False)


class _FileSystemTest(_ServiceTest):
    fs_name = "perfstress-" + str(uuid.uuid4())

    def __init__(self, arguments):
        super().__init__(arguments)
        self.fs_client = self.service_client.get_file_system_client(self.fs_name)
        self.async_fs_client = self.async_service_client.get_file_system_client(self.fs_name)

    async def global_setup(self):
        await super().global_setup()
        await self.async_fs_client.create_file_system()

    async def global_cleanup(self):
        await self.async_fs_client.delete_file_system()
        await super().global_cleanup()

    async def close(self):
        await self.async_fs_client.close()
        await super().close()


class _FileTest(_FileSystemTest):
    def __init__(self, arguments):
        super().__init__(arguments)
        file_name = "sharefiletest-" + str(uuid.uuid4())
        self.file_client = self.fs_client.get_file_client(file_name)
        self.async_file_client = self.async_fs_client.get_file_client(file_name)

    async def close(self):
        await self.async_file_client.close()
        await super().close()
```
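
These base classes hold all the client plumbing (service connection, file system, file), so a concrete perf test only has to supply `run_sync`/`run_async`. As an illustration, a hypothetical test timing file creation (not part of this PR) could be as small as:

```python
# Hypothetical example built on the base classes above (not in this PR).
from ._test_base import _FileTest


class CreateFileTest(_FileTest):
    def run_sync(self):
        # Each timed operation (re)creates the same file.
        self.file_client.create_file()

    async def run_async(self):
        await self.async_file_client.create_file()
```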
@@ -0,0 +1,37 @@
```python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import uuid

from azure_devtools.perfstress_tests import RandomStream, AsyncRandomStream

from ._test_base import _FileSystemTest


class AppendTest(_FileSystemTest):
    def __init__(self, arguments):
        super().__init__(arguments)
        file_name = "filetest-" + str(uuid.uuid4())
        self.file_client = self.fs_client.get_file_client(file_name)
        self.async_file_client = self.async_fs_client.get_file_client(file_name)
        self.upload_stream = RandomStream(self.args.size)
        self.upload_stream_async = AsyncRandomStream(self.args.size)

    async def setup(self):
        await self.async_file_client.create_file()

    def run_sync(self):
        self.upload_stream.reset()
        self.file_client.append_data(
            self.upload_stream,
            length=self.args.size,
            offset=0)

    async def run_async(self):
        self.upload_stream_async.reset()
        await self.async_file_client.append_data(
            self.upload_stream_async,
            length=self.args.size,
            offset=0)
```

Review thread on the `append_data` call in `run_sync`:

> **Member:** Can appending to the same file over and over cause issues with service perf or correctness (e.g. is there a max file size)?
>
> **Member (Author):** I will check in with the Storage team to see if they can shed light on that.
>
> **Contributor:** There should be no problems. When you keep re-appending, you would be rewriting the appended data (i.e. the last append wins, assuming the same length and offset). As for the maximum size, it's 4.75 TiB (100 MiB × 50,000 blocks) for version 2016-05-31 and later, and 195 GiB (4 MiB × 50,000 blocks) for all older versions (since a Data Lake file is a block blob).
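
As the review thread notes, re-appending at the same offset just rewrites the same range. One related point when reading the numbers: in the Data Lake API, `append_data` only stages bytes, and they are not committed to the file until `flush_data` is called, so this test measures staging throughput only. A sketch of a full, committed append for comparison (not part of the test):

```python
# Stage the bytes, then commit them so they become readable.
# (The perf test above skips the flush to isolate append cost.)
data = b"x" * 1024
file_client.append_data(data, offset=0, length=len(data))
file_client.flush_data(len(data))  # offset here = total file length after the write
```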
@@ -0,0 +1,37 @@
```python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from azure_devtools.perfstress_tests import get_random_bytes, WriteStream

from ._test_base import _FileSystemTest


class DownloadTest(_FileSystemTest):
    def __init__(self, arguments):
        super().__init__(arguments)
        file_name = "downloadtest"
        self.file_client = self.fs_client.get_file_client(file_name)
        self.async_file_client = self.async_fs_client.get_file_client(file_name)
        self.download_stream = WriteStream()

    async def global_setup(self):
        await super().global_setup()
        data = get_random_bytes(self.args.size)
        await self.async_file_client.create_file()
        await self.async_file_client.upload_data(data, overwrite=True)

    def run_sync(self):
        self.download_stream.reset()
        stream = self.file_client.download_file(max_concurrency=self.args.max_concurrency)
        stream.readinto(self.download_stream)

    async def run_async(self):
        self.download_stream.reset()
        stream = await self.async_file_client.download_file(max_concurrency=self.args.max_concurrency)
        await stream.readinto(self.download_stream)

    async def close(self):
        await self.async_file_client.close()
        await super().close()
```
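
`WriteStream` (newly exported from `azure_devtools.perfstress_tests` in this PR, see the `__init__.py` diff below) serves as the sink that `readinto` writes into. A minimal sketch of the interface such a sink needs, assuming only what this test exercises (`write` and `reset`):

```python
class _DiscardSink:
    """Hypothetical sink with the same surface this test uses."""

    def write(self, data):
        # Report the bytes as consumed without storing them.
        return len(data)

    def reset(self):
        # Nothing is buffered here, so there is nothing to rewind between runs.
        pass
```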
@@ -0,0 +1,32 @@
```python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from ._test_base import _FileTest

from azure_devtools.perfstress_tests import RandomStream
from azure_devtools.perfstress_tests import AsyncRandomStream


class UploadTest(_FileTest):
    def __init__(self, arguments):
        super().__init__(arguments)
        self.upload_stream = RandomStream(self.args.size)
        self.upload_stream_async = AsyncRandomStream(self.args.size)

    def run_sync(self):
        self.upload_stream.reset()
        self.file_client.upload_data(
            self.upload_stream,
            length=self.args.size,
            overwrite=True,
            max_concurrency=self.args.max_concurrency)

    async def run_async(self):
        self.upload_stream_async.reset()
        await self.async_file_client.upload_data(
            self.upload_stream_async,
            length=self.args.size,
            overwrite=True,
            max_concurrency=self.args.max_concurrency)
```
@@ -0,0 +1,40 @@
```python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import os
import tempfile

from azure_devtools.perfstress_tests import get_random_bytes

from ._test_base import _FileTest


class UploadFromFileTest(_FileTest):
    temp_file = None

    async def global_setup(self):
        await super().global_setup()
        data = get_random_bytes(self.args.size)
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            UploadFromFileTest.temp_file = temp_file.name
            temp_file.write(data)

    async def global_cleanup(self):
        os.remove(UploadFromFileTest.temp_file)
        await super().global_cleanup()

    def run_sync(self):
        with open(UploadFromFileTest.temp_file, 'rb') as fp:
            self.file_client.upload_data(
                fp,
                overwrite=True,
                max_concurrency=self.args.max_concurrency)

    async def run_async(self):
        with open(UploadFromFileTest.temp_file, 'rb') as fp:
            await self.async_file_client.upload_data(
                fp,
                overwrite=True,
                max_concurrency=self.args.max_concurrency)
```
```diff
@@ -8,13 +8,14 @@

 from .perf_stress_runner import PerfStressRunner
 from .perf_stress_test import PerfStressTest
-from .random_stream import RandomStream, get_random_bytes
+from .random_stream import RandomStream, WriteStream, get_random_bytes
 from .async_random_stream import AsyncRandomStream

 __all__ = [
     "PerfStressRunner",
     "PerfStressTest",
     "RandomStream",
+    "WriteStream",
     "AsyncRandomStream",
     "get_random_bytes"
 ]
```
```diff
@@ -5,33 +5,51 @@

 from io import BytesIO

-from .random_stream import get_random_bytes
+from .random_stream import get_random_bytes, _DEFAULT_LENGTH


 class AsyncRandomStream(BytesIO):
-    def __init__(self, length, initial_buffer_length=1024 * 1024):
+    def __init__(self, length, initial_buffer_length=_DEFAULT_LENGTH):
         super().__init__()
         self._base_data = get_random_bytes(initial_buffer_length)
-        self._base_data_length = initial_buffer_length
+        self._data_length = length
+        self._base_buffer_length = initial_buffer_length
         self._position = 0
         self._remaining = length
+        self._closed = False
+
+    def reset(self):
+        self._position = 0
+        self._remaining = self._data_length
+        self._closed = False

     def read(self, size=None):
         if self._remaining == 0:
             return b""

         if size is None:
-            e = self._base_data_length
+            e = self._base_buffer_length
         else:
             e = size
         e = min(e, self._remaining)
-        if e > self._base_data_length:
+        if e > self._base_buffer_length:
             self._base_data = get_random_bytes(e)
-            self._base_data_length = e
+            self._base_buffer_length = e
         self._remaining = self._remaining - e
+        self._position += e
         return self._base_data[:e]

+    def seek(self, index, whence=0):
+        if whence == 0:
+            self._position = index
+        elif whence == 1:
+            self._position = self._position + index
+        elif whence == 2:
+            self._position = self._data_length - 1 + index
+
+    def tell(self):
+        return self._position
+
     def remaining(self):
         return self._remaining
```