Skip to content

Commit

Permalink
ducktape: Use rpkconsumer in conn_rate test
Browse files Browse the repository at this point in the history
  • Loading branch information
VadimPlh committed Sep 12, 2022
1 parent 9e6b5b3 commit ede9131
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions tests/rptest/tests/connection_rate_limit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from rptest.tests.prealloc_nodes import PreallocNodesTest
from rptest.services.redpanda import ResourceSettings
from rptest.services.kgo_verifier_services import KgoVerifierProducer
from rptest.services.kaf_consumer import KafConsumer
from rptest.services.rpk_consumer import RpkConsumer
from rptest.services.metrics_check import MetricCheck

RATE_METRIC = "vectorized_kafka_rpc_connections_wait_rate_total"
Expand Down Expand Up @@ -52,8 +52,12 @@ def __init__(self, test_context):
self.preallocated_nodes)

def start_consumer(self):
return KafConsumer(self.test_context, self.redpanda, self.topics[0],
self.READ_COUNT, "oldest")
return RpkConsumer(context=self.test_context,
redpanda=self.redpanda,
topic=self.topics[0],
num_msgs=1,
save_msgs=True,
retry_sec=(1 / self.RATE_LIMIT))

def stop_consumer(self, consumer):
try:
Expand All @@ -77,17 +81,15 @@ def consumed():
need_finish = True
for i in range(consumers_count):
self.logger.debug(
f"Offset for {i} consumer: {consumers[i].offset}")
f"Offset for {i} consumer: {len(consumers[i].messages)}")

if consumers[i].done is True:
self.stop_consumer(consumers[i])
self.logger.debug(f"Rerun consumer {i}")
consumers[i] = self.start_consumer()
consumers[i].start()

if (consumers[i].offset.get(0) is
None) or consumers[i].offset[0] == 0:
if len(consumers[i].messages) == 0:
need_finish = False
if consumers[i].done is True:
self.stop_consumer(consumers[i])
self.logger.debug(f"Rerun consumer {i}")
consumers[i] = self.start_consumer()
consumers[i].start()

return need_finish

Expand All @@ -105,7 +107,7 @@ def consumed():
def get_read_time(self, consumers_count):
deltas = list()

for i in range(10):
for i in range(20):
connection_time = self.read_data(consumers_count)
deltas.append(connection_time)

Expand All @@ -114,8 +116,6 @@ def get_read_time(self, consumers_count):

return sum(deltas) / len(deltas)

@ok_to_fail # https://github.com/redpanda-data/redpanda/issues/5276
# https://github.com/redpanda-data/redpanda/issues/6074
@cluster(num_nodes=8)
def connection_rate_test(self):
self._producer.start(clean=False)
Expand Down

0 comments on commit ede9131

Please sign in to comment.