Skip to content

Commit

Permalink
feat: Add send notification task to locust load test file
Browse files Browse the repository at this point in the history
This adds a send notification task to locust. It also updates the register task to handle errors a bit better.

SYNC-3846

Note: I didn't add the unregister function as it seems like we don't need that currently, but it can be added if needed.
  • Loading branch information
b4handjr committed Sep 19, 2023
1 parent 58086e7 commit 453ba8d
Show file tree
Hide file tree
Showing 8 changed files with 824 additions and 560 deletions.
15 changes: 10 additions & 5 deletions tests/load/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ Follow the steps bellow to execute the load tests locally:
Environment variables, listed bellow or specified by [Locust][11], can be set in
`tests\load\docker-compose.yml`.

| Environment Variable | Node(s) | Description |
|----------------------|------------------|---------------------------------|
| SERVER_URL | master & worker | The autopush web socket address |
| ENDPOINT_URL | master & worker | The autopush HTTP address |
| Environment Variable | Node(s) | Description |
|----------------------|-----------------|--------------------------------------|
| SERVER_URL | master & worker | The autopush web socket address |
| ENDPOINT_URL | master & worker | The autopush HTTP address |
| AUTOPUSH_WAIT_TIME | master & worker | The wait time between task execution |

#### 2. Host Locust via Docker

Expand Down Expand Up @@ -173,7 +174,11 @@ the load test will stop automatically.

**Exceptions**

* The number of exceptions raised by the test framework should be `0`
* Exceptions indicate errors that occur during Locust's execution of the load tests and
should be minimal.
* The following exceptions are known to happen, but make sure their occurrence isn't
trending positively:
* ZeroStatusRequestError
* Locust reports Exceptions via the "autopush_exceptions.csv" file and the UI
(under the "Exceptions" tab)

Expand Down
24 changes: 24 additions & 0 deletions tests/load/locustfiles/args.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from locust import between, constant


def parse_wait_time(val: str):
"""Parse a wait_time
Either a single numeric (for `constant`) or two separated by a comma (arguments to `between`)
"""
match val.count(","):
case 0:
return constant(float_or_int(val))
case 1:
return between(*map(float_or_int, val.split(",", 1)))
case _:
raise ValueError("Invalid wait_time")


def float_or_int(val: str):
float_val: float = float(val)
return int(float_val) if float_val.is_integer() else float_val
14 changes: 14 additions & 0 deletions tests/load/locustfiles/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.


class ZeroStatusRequestError(Exception):
"""Custom exception for when a Locust request fails with a '0' status code."""

def __init__(self):
error_message: str = (
"A connection, timeout or similar error happened while sending a request "
"from Locust. Status Code: 0"
)
super().__init__(error_message)
4 changes: 3 additions & 1 deletion tests/load/locustfiles/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ class AutopushLoadTestShape(LoadTestShape):
"""

MAX_RUN_TIME: int = 600 # 10 minutes
MAX_USERS: int = 83300 # Calculation: 700 users * 119 locust workers (defined in setup_k8s.sh)
WORKER_COUNT: int = 119 # Must match value defined in setup_k8s.sh
USERS_PER_WORKER: int = 350 # Number of users supported on a worker running on a n1-standard-4
MAX_USERS: int = WORKER_COUNT * USERS_PER_WORKER
trend: QuadraticTrend
user_classes: list[Type[User]] = [AutopushUser]

Expand Down
197 changes: 154 additions & 43 deletions tests/load/locustfiles/locustfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@

"""Performance test module."""

import base64
import json
import random
import string
import time
import uuid
from typing import Any

from locust import FastHttpUser, between, events, task
from locust.exception import StopUser
from models import HelloMessage, RegisterMessage
from args import parse_wait_time
from exceptions import ZeroStatusRequestError
from locust import FastHttpUser, events, task
from models import HelloMessage, NotificationMessage, RegisterMessage
from pydantic import ValidationError
from websocket import create_connection


Expand All @@ -31,6 +36,18 @@ def _(parser: Any):
required=True,
help="Endpoint URL",
)
parser.add_argument(
"--wait_time",
type=str,
env_var="AUTOPUSH_WAIT_TIME",
help="AutopushUser wait time between tasks",
default="20, 25",
)


@events.test_start.add_listener
def _(environment, **kwargs):
environment.autopush_wait_time = parse_wait_time(environment.parsed_options.wait_time)


class TimeEvent:
Expand All @@ -47,14 +64,21 @@ def __enter__(self) -> object:
def __exit__(self, *args) -> None:
end_time: float = time.perf_counter()
exception: Any = None

if args[0] is not None:
exception = args[0], args[1]
exception_type = args[0]
exception_value = args[1]
traceback = args[2]

if not isinstance(exception_value, (AssertionError, ValidationError)):
# An unexpected exception occurred stop the user.
self.user.stop()
return None

# Assertion and Validation errors are expected exceptional outcomes should
# a message received from Autopush be invalid.
exception = exception_type, exception_value, traceback

if not isinstance(args[1], (AssertionError, type(None))):
self.user.environment.events.user_error.fire(
user_instance=self.user.context(), exception=args[1], tb=args[2]
)
exception = None
self.user.environment.events.request.fire(
request_type="WSS",
name=self.name,
Expand All @@ -66,24 +90,38 @@ def __exit__(self, *args) -> None:


class AutopushUser(FastHttpUser):
wait_time = between(30, 35)

def __init__(self, environment) -> None:
super().__init__(environment)
self.uaid: str = ""
self.channels: dict[str, str] = {}
self.ws: Any = None
self.headers = {"TTL": "60", "Content-Encoding": "aes128gcm", "Topic": "aaaa"}
self.encrypted_data = base64.urlsafe_b64decode(
"TestData"
+ "".join(
[
random.choice(string.ascii_letters + string.digits)
for i in range(0, random.randrange(1024, 4096, 2) - 8)
]
)
+ "=="
)

def wait_time(self):
return self.environment.autopush_wait_time(self)

def on_start(self) -> Any:
self.connect()
if not self.ws:
raise StopUser()
self.stop()

self.hello()
if not self.uaid:
raise StopUser()
self.stop()

self.register()
if not self.channels:
raise StopUser()
self.stop()

def on_stop(self) -> Any:
if self.ws:
Expand All @@ -96,54 +134,127 @@ def connect(self) -> None:
self.ws = create_connection(
self.environment.parsed_options.websocket_url,
header={"Origin": "http://localhost:1337"},
ssl=False,
timeout=30, # timeout defaults to None
)

def disconnect(self) -> None:
self.ws.close()
if self.ws:
self.ws.close()
self.ws = None

def hello(self) -> None:
"""
Send a 'hello' message to Autopush.
"""Send a 'hello' message to Autopush.
Connections must say hello after connecting to the server, otherwise the connection is
quickly dropped.
Raises:
AssertionError: If the user fails to send the hello
AssertionError: If the hello message response is empty or has an invalid status
ValidationError: If the hello message schema is not as expected
"""
with self._time_event(name="hello") as timer:
body = json.dumps(dict(messageType="hello", use_webpush=True))
body: str = json.dumps(
dict(
messageType="hello",
use_webpush=True,
uaid=self.uaid,
channelIDs=list(self.channels.keys()),
)
)
with self._time_event(name="hello - send") as timer:
self.ws.send(body)
reply = self.ws.recv()
assert reply, "No 'hello' response"
res: HelloMessage = HelloMessage(**json.loads(reply))
assert res.status == 200, f"Unexpected status. Expected: 200 Actual: {res.status}"
timer.response_length = len(reply.encode("utf-8"))
self.uaid = res.uaid

def register(self) -> None:
with self._time_event(name="hello - recv") as timer:
response: str = self.ws.recv()
assert response, "No 'hello' response"
message: HelloMessage = HelloMessage(**json.loads(response))
timer.response_length = len(response.encode("utf-8"))

if not self.uaid:
self.uaid = message.uaid

def ack(self) -> None:
"""Send an 'ack' message to push.
After sending a notification, the client must also send an 'ack' to the server to
confirm receipt. If there is a pending notification, this will try and receive it
before sending an acknowledgement.
Raises:
AssertionError: If the notification message response is empty
ValidationError: If the notification message schema is not as expected
"""
Send a 'register' message to Autopush. Subscribes to an Autopush channel.
with self._time_event(name="notification - recv") as timer:
response = self.ws.recv()
assert response, "No 'notification' response"
message: NotificationMessage = NotificationMessage(**json.loads(response))
timer.response_length = len(response.encode("utf-8"))

body: str = json.dumps(
dict(
messageType="ack",
updates=[dict(channelID=message.channelID, version=message.version)],
)
)
with self._time_event(name="ack - send"):
self.ws.send(body)

def post_notification(self, endpoint_url) -> bool:
"""Send a notification to Autopush.
Args:
endpoint_url: A channel destination endpoint url
Returns:
bool: Flag indicating if the post notification request was successful
Raises:
AssertionError: If the user fails to register a channel
ZeroStatusRequestError: In the event that Locust experiences a network issue while
sending a notification.
"""
with self.client.post(
url=endpoint_url,
name="notification",
data=self.encrypted_data,
headers=self.headers,
catch_response=True,
) as response:
if response.status_code == 0:
raise ZeroStatusRequestError()
if response.status_code != 201:
response.failure(f"{response.status_code=}, expected 201, {response.text=}")
return False
return True

def register(self) -> None:
"""Send a 'register' message to Autopush. Subscribes to an Autopush channel.
Raises:
AssertionError: If the register message response is empty or has an invalid status or
channel ID.
ValidationError: If the register message schema is not as expected
"""

chid: str = str(uuid.uuid4())
body = json.dumps(dict(messageType="register", channelID=chid))

with self._time_event(name="register") as timer:
body = json.dumps(dict(messageType="register", channelID=chid))
with self._time_event(name="register - send"):
self.ws.send(body)
reply = self.ws.recv()
assert reply, f"No 'register' response CHID: {chid}"
res: RegisterMessage = RegisterMessage(**json.loads(reply))
assert res.status == 200, f"Unexpected status. Expected: 200 Actual: {res.status}"
assert res.channelID == chid, f"Channel ID did not match, received {res.channelID}"
timer.response_length = len(reply.encode("utf-8"))
self.channels[chid] = res.pushEndpoint

@task
def do_nothing(self) -> None:
pass

with self._time_event(name="register - recv") as timer:
response: str = self.ws.recv()
assert response, "No 'notification' response"
message: RegisterMessage = RegisterMessage(**json.loads(response))
assert (
message.channelID == chid
), f"Channel ID Error. Expected: {chid} Actual: {message.channelID}"
timer.response_length = len(response.encode("utf-8"))

self.channels[chid] = message.pushEndpoint

@task(weight=95)
def send_direct_notification(self):
"""Sends a notification to a registered endpoint while connected to Autopush."""
endpoint_url = self.channels[random.choice(list(self.channels.keys()))]

if not self.post_notification(endpoint_url):
self.stop()

self.ack()
14 changes: 12 additions & 2 deletions tests/load/locustfiles/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class HelloMessage(BaseModel):

messageType: Literal["hello"]
uaid: str
status: int
status: Literal[200]
use_webpush: bool
broadcasts: dict[str, Any]

Expand All @@ -24,5 +24,15 @@ class RegisterMessage(BaseModel):

messageType: Literal["register"]
channelID: str
status: int
status: Literal[200]
pushEndpoint: str


class NotificationMessage(BaseModel):
"""Autopush 'ack' response message."""

data: str
headers: dict[str, str]
messageType: Literal["notification"]
channelID: str
version: str
Loading

0 comments on commit 453ba8d

Please sign in to comment.