Skip to content

Commit

Permalink
Merge pull request #6315 from VadimPlh/fix-con-rate-test
Browse files Browse the repository at this point in the history
[ducktape] Fix connection rate test
  • Loading branch information
Matko Medenjak committed Sep 15, 2022
2 parents 2253454 + ede9131 commit 1e5dbcc
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 20 deletions.
11 changes: 6 additions & 5 deletions tests/rptest/services/rpk_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ def __init__(self,
group='',
save_msgs=True,
fetch_max_bytes=None,
num_msgs=None):
num_msgs=None,
retry_sec=5):
super(RpkConsumer, self).__init__(context, num_nodes=1)
self._redpanda = redpanda
self._topic = topic
Expand All @@ -48,9 +49,9 @@ def __init__(self,
self._save_msgs = save_msgs
self._fetch_max_bytes = fetch_max_bytes
self._num_msgs = num_msgs
self._retry_sec = retry_sec

def _worker(self, idx, node):
retry_sec = 5
err = None

self._stopping.clear()
Expand All @@ -65,11 +66,11 @@ def _worker(self, idx, node):

err = e
self._redpanda.logger.error(
f"Consumer failed with error: '{e}'. Retrying in {retry_sec} seconds."
f"Consumer failed with error: '{e}'. Retrying in {self._retry_sec} seconds."
)
attempt += 1
self._stopping.wait(retry_sec)
time.sleep(retry_sec)
self._stopping.wait(self._retry_sec)
time.sleep(self._retry_sec)
else:
err = None

Expand Down
30 changes: 15 additions & 15 deletions tests/rptest/tests/connection_rate_limit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from rptest.tests.prealloc_nodes import PreallocNodesTest
from rptest.services.redpanda import ResourceSettings
from rptest.services.kgo_verifier_services import KgoVerifierProducer
from rptest.services.kaf_consumer import KafConsumer
from rptest.services.rpk_consumer import RpkConsumer
from rptest.services.metrics_check import MetricCheck

RATE_METRIC = "vectorized_kafka_rpc_connections_wait_rate_total"
Expand Down Expand Up @@ -52,8 +52,12 @@ def __init__(self, test_context):
self.preallocated_nodes)

def start_consumer(self):
return KafConsumer(self.test_context, self.redpanda, self.topics[0],
self.READ_COUNT, "oldest")
return RpkConsumer(context=self.test_context,
redpanda=self.redpanda,
topic=self.topics[0],
num_msgs=1,
save_msgs=True,
retry_sec=(1 / self.RATE_LIMIT))

def stop_consumer(self, consumer):
try:
Expand All @@ -77,17 +81,15 @@ def consumed():
need_finish = True
for i in range(consumers_count):
self.logger.debug(
f"Offset for {i} consumer: {consumers[i].offset}")
f"Offset for {i} consumer: {len(consumers[i].messages)}")

if consumers[i].done is True:
self.stop_consumer(consumers[i])
self.logger.debug(f"Rerun consumer {i}")
consumers[i] = self.start_consumer()
consumers[i].start()

if (consumers[i].offset.get(0) is
None) or consumers[i].offset[0] == 0:
if len(consumers[i].messages) == 0:
need_finish = False
if consumers[i].done is True:
self.stop_consumer(consumers[i])
self.logger.debug(f"Rerun consumer {i}")
consumers[i] = self.start_consumer()
consumers[i].start()

return need_finish

Expand All @@ -105,7 +107,7 @@ def consumed():
def get_read_time(self, consumers_count):
deltas = list()

for i in range(10):
for i in range(20):
connection_time = self.read_data(consumers_count)
deltas.append(connection_time)

Expand All @@ -114,8 +116,6 @@ def get_read_time(self, consumers_count):

return sum(deltas) / len(deltas)

@ok_to_fail # https://github.com/redpanda-data/redpanda/issues/5276
# https://github.com/redpanda-data/redpanda/issues/6074
@cluster(num_nodes=8)
def connection_rate_test(self):
self._producer.start(clean=False)
Expand Down

0 comments on commit 1e5dbcc

Please sign in to comment.