Merge pull request redpanda-data#5141 from bharathv/omb
perf/tests: Define an OMB perf run template
bharathv committed Jul 8, 2022
2 parents 33d149c + 9255930 commit acb4e2e
Showing 9 changed files with 371 additions and 107 deletions.
2 changes: 1 addition & 1 deletion tests/docker/Dockerfile
@@ -143,7 +143,7 @@ RUN python3 -m pip install --upgrade --force pip && \

# Install the OMB tool
RUN git -C /opt clone https://github.com/redpanda-data/openmessaging-benchmark.git && \
    cd /opt/openmessaging-benchmark && git reset --hard cde313756faa1ebd0114a35d6fc3b09198559738 && mvn package
    cd /opt/openmessaging-benchmark && git reset --hard 3f664931cb3f879ce11bf823d5bfdf91b07d989e && mvn package

# Compile and install rust-based workload generator.
# Install & remove compiler in the same step, to avoid bulking
50 changes: 50 additions & 0 deletions tests/rptest/perf/openmessaging_perf_test.py
@@ -0,0 +1,50 @@
# Copyright 2022 Vectorized, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

import statistics

from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.cluster import cluster
from rptest.services.openmessaging_benchmark import OpenMessagingBenchmark
from ducktape.mark import parametrize


class RedPandaOpenMessagingBenchmarkPerf(RedpandaTest):
    # We need to be a little generous with the wait time here as the benchmark
    # run often takes longer than the configured time, typically while
    # consumers catch up on the produced backlog when consumerBacklogSizeGB is
    # configured (check the OMB documentation for details).
    BENCHMARK_WAIT_TIME_MIN = 30

    def __init__(self, ctx):
        self._ctx = ctx
        super(RedPandaOpenMessagingBenchmarkPerf,
              self).__init__(test_context=ctx, num_brokers=3)

    @cluster(num_nodes=9)
    @parametrize(driver_idx="ACK_ALL_GROUP_LINGER_1MS_IDEM_MAX_IN_FLIGHT",
                 workload_idx="TOPIC1_PART100_1KB_4PROD_1250K_RATE")
    def test_perf_with_idempotence_and_max_in_flight(self, driver_idx,
                                                     workload_idx):
        """
        Runs the ACK_ALL_GROUP_LINGER_1MS_IDEM_MAX_IN_FLIGHT driver with the
        TOPIC1_PART100_1KB_4PROD_1250K_RATE workload; runtime is ~1hr.
        """

        # Make sure this runs in a dedicated environment as the perf run is
        # long-running and requires significant resources.
        assert self.redpanda.dedicated_nodes

        benchmark = OpenMessagingBenchmark(self._ctx, self.redpanda,
                                           driver_idx, workload_idx)
        benchmark.start()
        benchmark_time_min = benchmark.benchmark_time(
        ) + RedPandaOpenMessagingBenchmarkPerf.BENCHMARK_WAIT_TIME_MIN
        benchmark.wait(timeout_sec=benchmark_time_min * 60)
        benchmark.check_succeed()
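
Note: the new tests/rptest/services/openmessaging_benchmark_configs.py (one of the 9 changed files) is not expanded on this page. A minimal sketch of the shape the test above assumes — the dictionary keys appear in the tests, but every value and field below is an illustrative assumption inferred from how the service consumes DRIVERS and WORKLOADS, not the PR's actual configurations:

# Hypothetical sketch of OMBSampleConfigurations; values are illustrative.
class OMBSampleConfigurations:
    # Driver templates, rendered into omb_driver.yaml by the service.
    DRIVERS = {
        "SIMPLE_DRIVER": {"name": "simple-driver"},
        "ACK_ALL_GROUP_LINGER_1MS": {"name": "ack-all-group-linger-1ms"},
        "ACK_ALL_GROUP_LINGER_1MS_IDEM_MAX_IN_FLIGHT": {
            "name": "ack-all-group-linger-1ms-idem-max-in-flight",
        },
    }

    # Workload name -> (template vars for omb_workload.yaml, validator).
    # The first element must carry "name", "test_duration_minutes" and
    # "warmup_duration_minutes", since the service reads those keys.
    WORKLOADS = {
        "SIMPLE_WORKLOAD": ({
            "name": "simple-workload",
            "test_duration_minutes": 5,
            "warmup_duration_minutes": 5,
        }, {}),
        "TOPIC1_PART100_1KB_4PROD_1250K_RATE": ({
            "name": "topic1-part100-1kb-4prod-1250k-rate",
            "test_duration_minutes": 45,
            "warmup_duration_minutes": 15,
        }, {}),
    }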
41 changes: 6 additions & 35 deletions tests/rptest/scale_tests/openmessaging_benchmark_test.py
@@ -12,6 +12,7 @@
from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.cluster import cluster
from rptest.services.openmessaging_benchmark import OpenMessagingBenchmark
from ducktape.mark import matrix


class OpenBenchmarkTest(RedpandaTest):
@@ -23,41 +24,11 @@ def __init__(self, ctx):
                                                num_brokers=3)

    @cluster(num_nodes=6)
    def test_default_omb_configuration(self):
        benchmark = OpenMessagingBenchmark(self._ctx, self.redpanda)
        benchmark.start()
        benchmark_time_min = benchmark.benchmark_time(
        ) + OpenBenchmarkTest.BENCHMARK_WAIT_TIME_MIN
        benchmark.wait(timeout_sec=benchmark_time_min * 60)
        benchmark.check_succeed()

    @cluster(num_nodes=8)
    def test_multiple_topics_omb(self):
        benchmark = OpenMessagingBenchmark(self._ctx, self.redpanda)
        benchmark.set_configuration({"topics": 2})
        benchmark.start()
        benchmark_time_min = benchmark.benchmark_time(
        ) + OpenBenchmarkTest.BENCHMARK_WAIT_TIME_MIN
        benchmark.wait(timeout_sec=benchmark_time_min * 60)
        benchmark.check_succeed()

    @cluster(num_nodes=8)
    def test_multiple_producers_consumers_omb(self):
        benchmark = OpenMessagingBenchmark(self._ctx, self.redpanda)
        benchmark.set_configuration({
            "producers_per_topic": 2,
            "consumer_per_subscription": 2
        })
        benchmark.start()
        benchmark_time_min = benchmark.benchmark_time(
        ) + OpenBenchmarkTest.BENCHMARK_WAIT_TIME_MIN
        benchmark.wait(timeout_sec=benchmark_time_min * 60)
        benchmark.check_succeed()

    @cluster(num_nodes=7)
    def test_multiple_subscriptions_omb(self):
        benchmark = OpenMessagingBenchmark(self._ctx, self.redpanda)
        benchmark.set_configuration({"subscriptions_per_topic": 2})
    @matrix(driver=["SIMPLE_DRIVER", "ACK_ALL_GROUP_LINGER_1MS"],
            workload=["SIMPLE_WORKLOAD"])
    def test_default_omb_configuration(self, driver, workload):
        benchmark = OpenMessagingBenchmark(self._ctx, self.redpanda, driver,
                                           workload)
        benchmark.start()
        benchmark_time_min = benchmark.benchmark_time(
        ) + OpenBenchmarkTest.BENCHMARK_WAIT_TIME_MIN
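
For reference, the four hand-written variants above collapse into a single matrix-parametrized test. A minimal sketch of what ducktape's @matrix does with the decorated test (the class body here is illustrative, not part of the PR):

from ducktape.mark import matrix


class MatrixExpansionSketch:
    @matrix(driver=["SIMPLE_DRIVER", "ACK_ALL_GROUP_LINGER_1MS"],
            workload=["SIMPLE_WORKLOAD"])
    def test_omb(self, driver, workload):
        # ducktape's runner expands the decorator into the cross product of
        # its keyword arguments, so this runs as two test cases:
        #   (driver="SIMPLE_DRIVER", workload="SIMPLE_WORKLOAD")
        #   (driver="ACK_ALL_GROUP_LINGER_1MS", workload="SIMPLE_WORKLOAD")
        pass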
106 changes: 48 additions & 58 deletions tests/rptest/services/openmessaging_benchmark.py
@@ -16,6 +16,7 @@
from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from rptest.services.utils import BadLogLines, NodeCrash
from rptest.services.openmessaging_benchmark_configs import OMBSampleConfigurations


# Benchmark worker used by the benchmark process to run consumers and producers
@@ -113,82 +114,60 @@ def get_adresses(self):
# Benchmark process service
class OpenMessagingBenchmark(Service):
    PERSISTENT_ROOT = "/var/lib/openmessaging"
    RESULTS_DIR = os.path.join(PERSISTENT_ROOT, "results")
    CHARTS_DIR = os.path.join(PERSISTENT_ROOT, "charts")
    STDOUT_STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "benchmark.log")
    OPENMESSAGING_DIR = "/opt/openmessaging-benchmark"
    DRIVER_FILE = os.path.join(OPENMESSAGING_DIR,
                               "driver-redpanda/redpanda-ducktape.yaml")
    WORKLOAD_FILE = os.path.join(OPENMESSAGING_DIR, "workloads/ducktape.yaml")
    OUTPUT_FILE = os.path.join(PERSISTENT_ROOT, "result.json")
    NUM_WORKERS = 2

    logs = {
        "open_messaging_benchmark_stdout_stderr": {
            "path": STDOUT_STDERR_CAPTURE,
        # Includes charts/ and results/ directories along with benchmark.log
        "open_messaging_benchmark_root": {
            "path": PERSISTENT_ROOT,
            "collect_default": True
        },
        "open_messaging_benchmark_result": {
            "path": OUTPUT_FILE,
            "collect_default": True
        }
    }

    def __init__(self, ctx, redpanda):
    def __init__(self,
                 ctx,
                 redpanda,
                 driver="SIMPLE_DRIVER",
                 workload="SIMPLE_WORKLOAD"):
        """
        Creates a utility that can run OpenMessagingBenchmark (OMB) tests in
        ducktape. See OMB documentation for definitions of driver/workload
        files.
        """
        super(OpenMessagingBenchmark, self).__init__(ctx, num_nodes=1)
        self._ctx = ctx
        self.redpanda = redpanda
        self.set_default_configuration()
        self.workers = None

    def set_default_configuration(self):
        self.configuration = {
            "topics": 1,
            "partitions_per_topic": 3,
            "subscriptions_per_topic": 1,
            "producers_per_topic": 1,
            "consumer_per_subscription": 1,
            "consumer_backlog_size_GB": 0,
        }
        if self.redpanda.dedicated_nodes:
            self.configuration["producer_rate"] = 0
            self.configuration["test_duration_minutes"] = 10
            self.configuration["warmup_duration_minutes"] = 5
        elif os.environ.get('CI', None) == 'true':
            self.configuration["producer_rate"] = 10
            self.configuration["test_duration_minutes"] = 5
            self.configuration["warmup_duration_minutes"] = 5
        else:
            self.configuration["producer_rate"] = 10
            self.configuration["test_duration_minutes"] = 1
            self.configuration["warmup_duration_minutes"] = 1

    def set_configuration(self, config):
        for key, value in config.items():
            self.configuration[key] = value
        self.driver = OMBSampleConfigurations.DRIVERS[driver]
        self.workload = OMBSampleConfigurations.WORKLOADS[workload][0]
        self.validator = OMBSampleConfigurations.WORKLOADS[workload][1]
        output_file_name = "result-{}-{}.json".format(self.driver["name"],
                                                      self.workload["name"])
        self.output_file = os.path.join(OpenMessagingBenchmark.RESULTS_DIR,
                                        output_file_name)
        self.logger.info("Using driver: %s, workload: %s", self.driver["name"],
                         self.workload["name"])

    def _create_benchmark_workload_file(self, node):
        conf = self.render("omb_workload.yaml", **self.configuration)

        self.logger.debug(
            "Open Messaging Benchmark: Workload configuration: \n %s", conf)
        conf = self.render("omb_workload.yaml", **self.workload)
        self.logger.info("Rendered workload config: \n %s", conf)
        node.account.create_file(OpenMessagingBenchmark.WORKLOAD_FILE, conf)

    def _create_benchmark_driver_file(self, node):
        rp_node = self.redpanda.nodes[0].account.hostname
        conf = self.render("omb_driver.yaml", redpanda_node=rp_node)

        self.logger.debug(
            "Open Messaging Benchmark: Driver configuration: \n %s", conf)
        self.driver["redpanda_node"] = self.redpanda.nodes[0].account.hostname
        conf = self.render("omb_driver.yaml", **self.driver)
        self.logger.info("Rendered driver config: \n %s", conf)
        node.account.create_file(OpenMessagingBenchmark.DRIVER_FILE, conf)

    def _create_workers(self):
        consumer_workers_amount = self.configuration[
            "topics"] * self.configuration[
                "subscriptions_per_topic"] * self.configuration[
                    "consumer_per_subscription"]
        producer_workers_amount = self.configuration[
            "topics"] * self.configuration["producers_per_topic"]
        workers_amount = consumer_workers_amount + producer_workers_amount
        self.workers = OpenMessagingBenchmarkWorkers(
            self._ctx, num_workers=workers_amount)
            self._ctx, num_workers=OpenMessagingBenchmark.NUM_WORKERS)
        self.workers.start()

    def start_node(self, node):
@@ -212,11 +191,19 @@ def start_node(self, node):
            bin/benchmark \
            --drivers {OpenMessagingBenchmark.DRIVER_FILE} \
            --workers {worker_nodes} \
            --output {OpenMessagingBenchmark.OUTPUT_FILE} \
            --output {self.output_file} \
            {OpenMessagingBenchmark.WORKLOAD_FILE} >> {OpenMessagingBenchmark.STDOUT_STDERR_CAPTURE} 2>&1 \
            & disown"

        node.account.mkdirs(OpenMessagingBenchmark.PERSISTENT_ROOT)
        # This command generates charts and returns metrics (e.g. latency
        # quantiles and throughput) that we can use to check whether they
        # fall within the expected range.
        self.chart_cmd = f"""cd {OpenMessagingBenchmark.OPENMESSAGING_DIR} &&
        bin/generate_charts.py --results {OpenMessagingBenchmark.RESULTS_DIR} --output {OpenMessagingBenchmark.CHARTS_DIR}
        """

        node.account.mkdirs(OpenMessagingBenchmark.RESULTS_DIR)
        node.account.mkdirs(OpenMessagingBenchmark.CHARTS_DIR)
        self.node = node

        with node.account.monitor_log(
                OpenMessagingBenchmark.STDOUT_STDERR_CAPTURE) as monitor:
@@ -250,10 +237,13 @@ def check_succeed(self):
        self.workers.check_has_errors()
        for node in self.nodes:
            # Here we check that OMB finished and wrote its result file
            assert node.account.exists(
                OpenMessagingBenchmark.OUTPUT_FILE
            ), f"{node.account.hostname} OMB is not finished"
            assert node.account.exists(
                self.output_file), f"{node.account.hostname} OMB is not finished"
            self.raise_on_bad_log_lines(node)
        # Generate charts from the result
        self.logger.info(f"Generating charts with command {self.chart_cmd}")
        metrics = json.loads(self.node.account.ssh_output(self.chart_cmd))
        OMBSampleConfigurations.validate_metrics(metrics, self.validator)

    def wait_node(self, node, timeout_sec):
        process_pid = node.account.java_pids("benchmark")
@@ -287,5 +277,5 @@ def clean_node(self, node):
                                         allow_fail=True)

    def benchmark_time(self):
        return self.configuration["test_duration_minutes"] + self.configuration[
        return self.workload["test_duration_minutes"] + self.workload[
            "warmup_duration_minutes"]
