Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHubs] Stress test refactor #20389

Merged
merged 11 commits into from
Sep 22, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/eventhub/azure-eventhub/stress/Chart.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
dependencies:
- name: stress-test-addons
repository: https://stresstestcharts.blob.core.windows.net/helm/
version: 0.1.6
digest: sha256:b97697ef5f303eec43e9a94fca8e312d20b8aed71318250499344aeca9880d31
generated: "2021-08-24T11:24:15.375395-07:00"
13 changes: 13 additions & 0 deletions sdk/eventhub/azure-eventhub/stress/Chart.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
apiVersion: v2
name: python-eventhubs-stress-test
description: python event hubs stress test.
version: 0.1.1
appVersion: v0.1
annotations:
stressTest: 'true'
namespace: python-eventhubs-stress-test-ns

dependencies:
- name: stress-test-addons
version: 0.1.6
repository: https://stresstestcharts.blob.core.windows.net/helm/
10 changes: 10 additions & 0 deletions sdk/eventhub/azure-eventhub/stress/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# syntax=docker/dockerfile:1

FROM python:3.8-slim-buster

WORKDIR /app

COPY ./scripts /app/stress/scripts

WORKDIR /app/stress/scripts
RUN pip3 install -r dev_requirement.txt
8 changes: 0 additions & 8 deletions sdk/eventhub/azure-eventhub/stress/dev_requirement.txt

This file was deleted.

5 changes: 5 additions & 0 deletions sdk/eventhub/azure-eventhub/stress/parameters.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentParameters.json#",
"contentVersion": "1.0.0.0",
"parameters": { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import os

from opencensus.ext.azure import metrics_exporter
from opencensus.stats import aggregation as aggregation_module
from opencensus.stats import measure as measure_module
Expand All @@ -26,7 +23,7 @@ def __init__(self, test_name, test_description=None):
self.desc = test_description

events_measure_name = "The number of events handled by " + self.name
events_measure_desc = "The number of events handled by " + self.desc if self.desc else None
events_measure_desc = "The number of events handled by" + self.desc if self.desc else None
memory_measure_name = "memory usage percentage for " + self.name
memory_measure_desc = "memory usage percentage for " + self.desc if self.desc else None
cpu_measure_name = "cpu usage percentage for " + self.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import logging
from collections import defaultdict
from functools import partial
from dotenv import load_dotenv

from azure.identity.aio import ClientSecretCredential
from azure.eventhub.aio import EventHubConsumerClient
Expand All @@ -22,6 +23,9 @@
from process_monitor import ProcessMonitor
from app_insights_metric import AzureMonitorMetric

ENV_FILE = os.environ.get('ENV_FILE')
load_dotenv(dotenv_path=ENV_FILE, override=True)


def parse_starting_position(args):
starting_position = None
Expand All @@ -39,25 +43,26 @@ def parse_starting_position(args):


parser = argparse.ArgumentParser()
parser.add_argument("--link_credit", default=3000, type=int)
parser.add_argument("--output_interval", type=float, default=1000)
parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30)
parser.add_argument("--consumer_group", help="Consumer group name", default="$default")
parser.add_argument("--link_credit", default=int(os.environ.get("LINK_CREDIT", 3000)), type=int)
parser.add_argument("--output_interval", type=float, default=int(os.environ.get("OUTPUT_INTERVAL", 5000)))
parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=int(os.environ.get("DURATION", 999999999)))
parser.add_argument("--consumer_group", help="Consumer group name", default=os.environ.get("CONSUMER_GROUP", "$default"))
parser.add_argument("--auth_timeout", help="Authorization Timeout", type=float, default=60)
parser.add_argument("--starting_offset", help="Starting offset", type=str)
parser.add_argument("--starting_offset", help="Starting offset", type=str, default=os.environ.get("STARTING_OFFSET", "-1"))
parser.add_argument("--starting_sequence_number", help="Starting sequence number", type=int)
parser.add_argument("--starting_datetime", help="Starting datetime string, should be format of YYYY-mm-dd HH:mm:ss", type=str)
parser.add_argument("--starting_datetime", help="Starting datetime string, should be format of YYYY-mm-dd HH:mm:ss")
parser.add_argument("--partitions", help="Number of partitions. 0 means to get partitions from eventhubs", type=int, default=0)
parser.add_argument("--recv_partition_id", help="Receive from a specific partition if this is set", type=int)
parser.add_argument("--max_batch_size", type=int, default=0,
parser.add_argument("--max_batch_size", type=int, default=int(os.environ.get("MAX_BATCH_SIZE", 0)),
help="Call EventHubConsumerClient.receive_batch() if not 0, otherwise call receive()")
parser.add_argument("--max_wait_time", type=float, default=0,
help="max_wait_time of EventHubConsumerClient.receive_batch() or EventHubConsumerClient.receive()")

parser.add_argument("--track_last_enqueued_event_properties", action="store_true")
parser.add_argument("--load_balancing_interval", help="time duration in seconds between two load balance", type=float, default=10)
parser.add_argument("--conn_str", help="EventHub connection string",
default=os.environ.get('EVENT_HUB_PERF_32_CONN_STR'))
parser.add_argument("--eventhub", help="Name of EventHub")
default=os.environ.get('EVENT_HUB_CONN_STR'))
parser.add_argument("--eventhub", help="Name of EventHub", default=os.environ.get('EVENT_HUB_NAME'))
parser.add_argument("--address", help="Address URI to the EventHub entity")
parser.add_argument("--sas_policy", help="Name of the shared access policy to authenticate with")
parser.add_argument("--sas_key", help="Shared access key")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import logging
from collections import defaultdict
from functools import partial
from dotenv import load_dotenv

from azure.identity import ClientSecretCredential
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
Expand All @@ -20,6 +21,9 @@
from process_monitor import ProcessMonitor
from app_insights_metric import AzureMonitorMetric

ENV_FILE = os.environ.get('ENV_FILE')
load_dotenv(dotenv_path=ENV_FILE, override=True)


def parse_starting_position(args):
starting_position = None
Expand All @@ -37,26 +41,26 @@ def parse_starting_position(args):


parser = argparse.ArgumentParser()
parser.add_argument("--link_credit", default=3000, type=int)
parser.add_argument("--output_interval", type=float, default=1000)
parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30)
parser.add_argument("--consumer_group", help="Consumer group name", default="$default")
parser.add_argument("--link_credit", default=int(os.environ.get("LINK_CREDIT", 3000)), type=int)
parser.add_argument("--output_interval", type=float, default=int(os.environ.get("OUTPUT_INTERVAL", 5000)))
parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=int(os.environ.get("DURATION", 999999999)))
parser.add_argument("--consumer_group", help="Consumer group name", default=os.environ.get("CONSUMER_GROUP", "$default"))
parser.add_argument("--auth_timeout", help="Authorization Timeout", type=float, default=60)
parser.add_argument("--starting_offset", help="Starting offset", type=str)
parser.add_argument("--starting_offset", help="Starting offset", type=str, default=os.environ.get("STARTING_OFFSET", "-1"))
parser.add_argument("--starting_sequence_number", help="Starting sequence number", type=int)
parser.add_argument("--starting_datetime", help="Starting datetime string, should be format of YYYY-mm-dd HH:mm:ss")
parser.add_argument("--partitions", help="Number of partitions. 0 means to get partitions from eventhubs", type=int, default=0)
parser.add_argument("--recv_partition_id", help="Receive from a specific partition if this is set", type=int)
parser.add_argument("--max_batch_size", type=int, default=0,
parser.add_argument("--max_batch_size", type=int, default=int(os.environ.get("MAX_BATCH_SIZE", 0)),
help="Call EventHubConsumerClient.receive_batch() if not 0, otherwise call receive()")
parser.add_argument("--max_wait_time", type=float, default=0,
help="max_wait_time of EventHubConsumerClient.receive_batch() or EventHubConsumerClient.receive()")

parser.add_argument("--track_last_enqueued_event_properties", action="store_true")
parser.add_argument("--load_balancing_interval", help="time duration in seconds between two load balance", type=float, default=10)
parser.add_argument("--conn_str", help="EventHub connection string",
default=os.environ.get('EVENT_HUB_PERF_32_CONN_STR'))
parser.add_argument("--eventhub", help="Name of EventHub")
default=os.environ.get('EVENT_HUB_CONN_STR'))
parser.add_argument("--eventhub", help="Name of EventHub", default=os.environ.get('EVENT_HUB_NAME'))
parser.add_argument("--address", help="Address URI to the EventHub entity")
parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with")
parser.add_argument("--sas-key", help="Shared access key")
Expand All @@ -82,7 +86,14 @@ def parse_starting_position(args):

args = parser.parse_args()
starting_position = parse_starting_position(args)
LOGGER = get_logger(args.log_filename, "stress_receive_sync", level=logging.INFO, print_console=args.print_console)
print_console = args.print_console or (os.environ.get("PRINT_CONSOLE") == "1")

LOGGER = get_logger(
args.log_filename,
"stress_receive_sync",
level=logging.INFO,
print_console=print_console
)
LOG_PER_COUNT = args.output_interval

start_time = time.perf_counter()
Expand All @@ -105,7 +116,6 @@ def on_event_received(process_monitor, partition_context, event):
recv_cnt_map[partition_context.partition_id] += 1 if event else 0
if recv_cnt_map[partition_context.partition_id] % LOG_PER_COUNT == 0:
total_time_elapsed = time.perf_counter() - start_time

partition_previous_time = recv_time_map.get(partition_context.partition_id)
partition_current_time = time.perf_counter()
recv_time_map[partition_context.partition_id] = partition_current_time
Expand Down Expand Up @@ -213,7 +223,11 @@ def create_client(args):

def run(args):

with ProcessMonitor("monitor_{}".format(args.log_filename), "consumer_stress_sync", print_console=args.print_console) as process_monitor:
with ProcessMonitor(
"monitor_{}".format(args.log_filename),
"consumer_stress_sync",
print_console=print_console
) as process_monitor:
kwargs_dict = {
"prefetch": args.link_credit,
"partition_id": str(args.recv_partition_id) if args.recv_partition_id else None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import time
import asyncio
from argparse import ArgumentParser
from dotenv import load_dotenv

from azure.eventhub import EventHubProducerClient, EventData, EventHubSharedKeyCredential, TransportType
from azure.eventhub.exceptions import EventHubError
Expand All @@ -20,6 +21,8 @@
from process_monitor import ProcessMonitor
from app_insights_metric import AzureMonitorMetric

ENV_FILE = os.environ.get('ENV_FILE')


def handle_exception(error, ignore_send_failure, stress_logger, azure_monitor_metric):
err_msg = "Sync send failed due to error: {}".format(repr(error))
Expand Down Expand Up @@ -89,9 +92,9 @@ async def stress_send_list_async(producer: EventHubProducerClientAsync, args, st
class StressTestRunner(object):
def __init__(self, argument_parser):
self.argument_parser = argument_parser
self.argument_parser.add_argument("-m", "--method", required=True)
self.argument_parser.add_argument("--output_interval", type=float, default=1000)
self.argument_parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30)
self.argument_parser.add_argument("-m", "--method", default="stress_send_list_sync")
self.argument_parser.add_argument("--output_interval", type=float, default=int(os.environ.get("OUTPUT_INTERVAL", 5000)))
self.argument_parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=int(os.environ.get("DURATION", 999999999)))
self.argument_parser.add_argument(
"--partitions",
help="Number of partitions. 0 means to get partitions from eventhubs",
Expand All @@ -109,9 +112,9 @@ def __init__(self, argument_parser):
type=str
)
self.argument_parser.add_argument("--conn_str", help="EventHub connection string",
default=os.environ.get('EVENT_HUB_PERF_32_CONN_STR'))
default=os.environ.get('EVENT_HUB_CONN_STR'))
parser.add_argument("--auth_timeout", help="Authorization Timeout", type=float, default=60)
self.argument_parser.add_argument("--eventhub", help="Name of EventHub")
self.argument_parser.add_argument("--eventhub", help="Name of EventHub", default=os.environ.get('EVENT_HUB_NAME'))
self.argument_parser.add_argument(
"--transport_type",
help="Transport type, 0 means AMQP, 1 means AMQP over WebSocket",
Expand Down Expand Up @@ -412,6 +415,7 @@ async def run_test_method_parallel_async(self, test_method, worker, logger, proc


if __name__ == '__main__':
load_dotenv(dotenv_path=ENV_FILE, override=True)
parser = ArgumentParser()
runner = StressTestRunner(parser)
runner.run()
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
psutil
azure-eventhub
azure-eventhub-checkpointstoreblob
azure-eventhub-checkpointstoreblob-aio
azure-servicebus==0.50.3
azure-storage-blob
azure-identity
opencensus-ext-azure
python-dotenv
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import os
import sys
import logging
from logging.handlers import RotatingFileHandler
Expand Down
31 changes: 31 additions & 0 deletions sdk/eventhub/azure-eventhub/stress/templates/network_loss.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
apiVersion: chaos-mesh.org/v1alpha1
kind: NetworkChaos
metadata:
name: '{{ $.Release.Name }}-{{ $.Release.Revision }}'
namespace: {{ $.Release.Namespace }}
annotations:
experiment.chaos-mesh.org/pause: 'true'
labels:
scenario: 'stress'
release: '{{ $.Release.Name }}'
revision: '{{ $.Release.Revision }}'
spec:
scheduler:
cron: '@every 30s'
duration: '10s'
action: loss
direction: to
externalTargets:
- 'eh-stress-{{ .Release.Name }}-{{ .Release.Revision}}.servicebus.windows.net'
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
mode: one
selector:
labelSelectors:
testInstance: 'deploy-python-eh-stress-instance'
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
chaos: 'true'
namespaces:
- {{ $.Release.Namespace }}
podPhaseSelectors:
- 'Running'
loss:
loss: '100'
correlation: '100'
16 changes: 16 additions & 0 deletions sdk/eventhub/azure-eventhub/stress/templates/testjob.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{{- include "stress-test-addons.deploy-job-template.from-pod" (list . "stress.python-eh-stress") -}}
{{- define "stress.python-eh-stress" -}}
metadata:
labels:
testName: "deploy-python-eh-stress"
testInstance: "deploy-python-eh-stress-instance"
chaos: "true"
spec:
containers:
- name: python-eh-stress
image: stresstestregistry.azurecr.io/yuling/py-eh-stress:v2
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
{{- include "stress-test-addons.container-env" . | nindent 6 }}
command: ['bash', '-c', 'python3 azure_eventhub_producer_stress.py & python3 azure_eventhub_consumer_stress_sync.py']
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
# async test command
# command: ['bash', '-c', 'python3 azure_eventhub_producer_stress.py -m stress_send_list_async & python3 azure_eventhub_consumer_stress_async.py']
{{- end -}}
Loading