Skip to content

Commit

Permalink
tests/services: enable running OMB with pre-allocated nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
jcsp committed Aug 8, 2022
1 parent bcb2102 commit 75f64f1
Showing 1 changed file with 31 additions and 7 deletions.
38 changes: 31 additions & 7 deletions tests/rptest/services/openmessaging_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import os
import json
import collections
from typing import Optional

from ducktape.services.service import Service
from ducktape.utils.util import wait_until
Expand All @@ -33,9 +34,17 @@ class OpenMessagingBenchmarkWorkers(Service):
}
}

def __init__(self, ctx, num_workers=3):
def __init__(self, ctx, num_workers=3, nodes: Optional[list] = None):
"""
:param num_workers: allocate this many nodes as workers
:param nodes: use these pre-allocated nodes as workers
"""
super(OpenMessagingBenchmarkWorkers,
self).__init__(ctx, num_nodes=num_workers)
self).__init__(ctx, num_nodes=0 if nodes else num_workers)

if nodes is not None:
assert len(nodes) > 0
self.nodes = nodes

def start_node(self, node):
self.logger.info("Starting Open Messaging Benchmark worker node on %s",
Expand Down Expand Up @@ -136,18 +145,31 @@ def __init__(self,
ctx,
redpanda,
driver="SIMPLE_DRIVER",
workload="SIMPLE_WORKLOAD"):
workload="SIMPLE_WORKLOAD",
node=None,
worker_nodes=None):
"""
Creates a utility that can run OpenMessagingBenchmark (OMB) tests in ducktape. See OMB
documentation for definitions of driver/workload files.
"""
super(OpenMessagingBenchmark, self).__init__(ctx, num_nodes=1)
super(OpenMessagingBenchmark,
self).__init__(ctx, num_nodes=0 if node else 1)

if node:
self.nodes = [node]

self._ctx = ctx
self.redpanda = redpanda
self.worker_nodes = worker_nodes
self.workers = None
self.driver = OMBSampleConfigurations.DRIVERS[driver]
self.workload = OMBSampleConfigurations.WORKLOADS[workload][0]
self.validator = OMBSampleConfigurations.WORKLOADS[workload][1]
if isinstance(workload, str):
self.workload = OMBSampleConfigurations.WORKLOADS[workload][0]
self.validator = OMBSampleConfigurations.WORKLOADS[workload][1]
else:
self.workload = workload[0]
self.validator = workload[1]

self.logger.info("Using driver: %s, workload: %s", self.driver["name"],
self.workload["name"])

Expand All @@ -164,7 +186,9 @@ def _create_benchmark_driver_file(self, node):

def _create_workers(self):
self.workers = OpenMessagingBenchmarkWorkers(
self._ctx, num_workers=OpenMessagingBenchmark.NUM_WORKERS)
self._ctx,
num_workers=OpenMessagingBenchmark.NUM_WORKERS,
nodes=self.worker_nodes)
self.workers.start()

def start_node(self, node):
Expand Down

0 comments on commit 75f64f1

Please sign in to comment.