diff --git a/tests/rptest/services/kafka_cli_consumer.py b/tests/rptest/services/kafka_cli_consumer.py index 16778094be82..ee54865a5f26 100644 --- a/tests/rptest/services/kafka_cli_consumer.py +++ b/tests/rptest/services/kafka_cli_consumer.py @@ -91,6 +91,15 @@ def wait_for_messages(self, messages, timeout=30): timeout, backoff_sec=2) + def wait_for_started(self, timeout=10): + def all_started(): + return all([ + len(node.account.java_pids("ConsoleConsumer")) == 1 + for node in self.nodes + ]) + + wait_until(all_started, timeout, backoff_sec=1) + def stop_node(self, node): self._stopping.set() node.account.kill_process("java", clean_shutdown=True)