From 819be846669ec487d55fcad2423e80bc94ff380f Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Mon, 2 May 2022 11:29:19 +0200 Subject: [PATCH 1/2] tests/cli_consumer: added ability to consume from beginning Signed-off-by: Michal Maslanka --- tests/rptest/services/kafka_cli_consumer.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/rptest/services/kafka_cli_consumer.py b/tests/rptest/services/kafka_cli_consumer.py index 3b581e8ff4ef..f74f1eb4025b 100644 --- a/tests/rptest/services/kafka_cli_consumer.py +++ b/tests/rptest/services/kafka_cli_consumer.py @@ -24,6 +24,7 @@ def __init__(self, offset=None, partitions=None, isolation_level=None, + from_beginning=False, consumer_properties={}): super(KafkaCliConsumer, self).__init__(context, num_nodes=1) self._redpanda = redpanda @@ -32,6 +33,7 @@ def __init__(self, self._offset = offset self._partitions = partitions self._isolation_level = isolation_level + self._from_beginning = from_beginning self._consumer_properties = consumer_properties self._stopping = threading.Event() assert self._partitions is not None or self._group is not None, "either partitions or group have to be set" @@ -56,6 +58,8 @@ def _worker(self, _, node): cmd += ['--partition', ','.join(self._partitions)] if self._isolation_level is not None: cmd += ["--isolation-level", str(self._isolation_level)] + if self._from_beginning: + cmd += ["--from-beginning"] for k, v in self._consumer_properties.items(): cmd += ['--consumer-property', f"{k}={v}"] From c4382e1759a8646d9594677ea29fb5b170e86efb Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Mon, 2 May 2022 11:29:58 +0200 Subject: [PATCH 2/2] tests/fetch_long_poll: consume all messages from topic Signed-off-by: Michal Maslanka --- tests/rptest/tests/fetch_long_poll_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/rptest/tests/fetch_long_poll_test.py b/tests/rptest/tests/fetch_long_poll_test.py index 4e456ffa5f48..59e8eaa3e820 100644 --- a/tests/rptest/tests/fetch_long_poll_test.py +++ b/tests/rptest/tests/fetch_long_poll_test.py @@ -49,6 +49,7 @@ def fetch_long_poll_test(self): self.redpanda, topic=topic.name, group='test-gr-1', + from_beginning=True, consumer_properties={ 'fetch.min.bytes': 1024, 'fetch.max.wait.ms': 1000,