Skip to content

Commit

Permalink
tests/services: optionally start redpandas in parallel
Browse files Browse the repository at this point in the history
This is an efficiency/quality of life improvement for
working with tests that start larger numbers of nodes.

Leave the default as serial startup, because it makes logs
easier to read.
  • Loading branch information
jcsp committed Aug 8, 2022
1 parent 087d653 commit ae72725
Showing 1 changed file with 40 additions and 4 deletions.
44 changes: 40 additions & 4 deletions tests/rptest/services/redpanda.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
import concurrent.futures
import copy

import time
Expand Down Expand Up @@ -625,8 +626,38 @@ def get_node_memory_mb(self):
memory_kb = int(line.strip().split()[1])
return memory_kb / 1024

def start(self, nodes=None, clean_nodes=True, start_si=True):
"""Start the service on all nodes."""
def _for_nodes(self, nodes, cb: callable, *, parallel: bool):
if not parallel:
# Trivial case: just loop and call
for n in nodes:
cb(n)
return

node_futures = []
with concurrent.futures.ThreadPoolExecutor(
max_workers=len(nodes)) as executor:
for node in nodes:
f = executor.submit(cb, node)
node_futures.append((node, f))

for node, f in node_futures:
f.result()

def start(self,
nodes=None,
clean_nodes=True,
start_si=True,
parallel: bool = False):
"""
Start the service on all nodes.
By default, nodes are started in serial: this makes logs easier to
read and simplifies debugging. For tests starting larger numbers of
nodes where serialized startup becomes annoying, pass parallel=True.
:param parallel: if true, run clean and start operations in parallel
for the nodes being started.
"""
to_start = nodes if nodes is not None else self.nodes
assert all((node in self.nodes for node in to_start))
self.logger.info("%s: starting service" % self.who_am_i())
Expand All @@ -638,7 +669,8 @@ def start(self, nodes=None, clean_nodes=True, start_si=True):
self.logger.debug(
self.who_am_i() +
": killing processes and attempting to clean up before starting")
for node in to_start:

def clean_one(node):
try:
self.stop_node(node)
except Exception:
Expand All @@ -658,14 +690,18 @@ def start(self, nodes=None, clean_nodes=True, start_si=True):
f"Error cleaning node {node.account.hostname}:")
raise

self._for_nodes(to_start, clean_one, parallel=parallel)

if first_start:
self.write_tls_certs()
self.write_bootstrap_cluster_config()

for node in to_start:
def start_one(node):
self.logger.debug("%s: starting node" % self.who_am_i(node))
self.start_node(node)

self._for_nodes(to_start, start_one, parallel=parallel)

if self._start_duration_seconds < 0:
self._start_duration_seconds = time.time() - self._start_time

Expand Down

0 comments on commit ae72725

Please sign in to comment.