Skip to content

Commit

Permalink
ducky: improve kafka_cli_consumer logging
Browse files Browse the repository at this point in the history
  • Loading branch information
rystsov committed Aug 18, 2022
1 parent f4b3458 commit e38344f
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions tests/rptest/services/kafka_cli_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def script(self):
return self._cli._script("kafka-console-consumer.sh")

def _worker(self, _, node):
self.logger.debug("%s: starting worker thread" % self.who_am_i(node))
self._stopping.clear()
try:

Expand Down Expand Up @@ -74,6 +75,8 @@ def _worker(self, _, node):
self.logger.debug(f"consumed: '{line}'")
self._messages.append(line)
except:
self.logger.exception("%s: something is wrong" %
self.who_am_i(node))
if self._stopping.is_set():
# Expect a non-zero exit code when killing during teardown
pass
Expand All @@ -89,13 +92,21 @@ def wait_for_messages(self, messages, timeout=30):

def wait_for_started(self, timeout=10):
def all_started():
return all([
len(node.account.java_pids("ConsoleConsumer")) == 1
for node in self.nodes
])
for node in self.nodes:
pids = node.account.java_pids("ConsoleConsumer")
self.logger.debug(
"%s: ConsoleConsumer pids: %s" %
(self.who_am_i(node), ",".join(map(str, pids))))
if len(pids) != 1:
return False
return True

wait_until(all_started, timeout, backoff_sec=1)

def stop_node(self, node):
self._stopping.set()
self.logger.exception("%s: stop requested" % self.who_am_i(node))
pids = node.account.java_pids("ConsoleConsumer")
self.logger.debug("%s: ConsoleConsumer pids: %s" %
(self.who_am_i(node), ",".join(map(str, pids))))
node.account.kill_process("java", clean_shutdown=True)

0 comments on commit e38344f

Please sign in to comment.