Skip to content

Commit

Permalink
ducktape: Increase consumer count for connection_rate test
Browse files Browse the repository at this point in the history
  • Loading branch information
VadimPlh committed Sep 5, 2022
1 parent 703105a commit 8742a51
Showing 1 changed file with 14 additions and 15 deletions.
29 changes: 14 additions & 15 deletions tests/rptest/tests/connection_rate_limit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class ConnectionRateLimitTest(PreallocNodesTest):
MSG_SIZE = 1000000
PRODUCE_COUNT = 10
READ_COUNT = 10
RANDOM_READ_PARALLEL = 3
RATE_LIMIT = 4
RANDOM_READ_PARALLEL = 20
RATE_LIMIT = 21
REFRESH_TOKENS_TIME_SEC = 2

topics = (TopicSpec(partition_count=1, replication_factor=1), )
Expand All @@ -46,14 +46,17 @@ def __init__(self, test_context):
extra_rp_conf={"kafka_connection_rate_limit": self.RATE_LIMIT},
resource_settings=resource_setting)

self._custom_node = test_context.cluster.alloc(
ClusterSpec.simple_linux(1))

self._producer = KgoVerifierProducer(test_context, self.redpanda,
self.topics[0], self.MSG_SIZE,
self.PRODUCE_COUNT,
self.preallocated_nodes)

def start_consumer(self):
return KafConsumer(self.test_context, self.redpanda, self.topics[0],
self.READ_COUNT, "oldest")
self.READ_COUNT, "oldest", self._custom_node)

def stop_consumer(self, consumer):
try:
Expand All @@ -79,15 +82,16 @@ def consumed():
self.logger.debug(
f"Offset for {i} consumer: {consumers[i].offset}")

if consumers[i].done is True:
self.stop_consumer(consumers[i])
self.logger.debug(f"Rerun consumer {i}")
consumers[i] = self.start_consumer()
consumers[i].start()

if (consumers[i].offset.get(0) is
None) or consumers[i].offset[0] == 0:
need_finish = False
if consumers[i].done is True:
self.logger.debug(f"Rerun consumer {i}")
consumers[i] = self.start_consumer()
consumers[i].start()
else:
if consumers[i].done is False:
self.stop_consumer(consumers[i])

return need_finish

Expand All @@ -97,9 +101,6 @@ def consumed():

finish = time.time()

for i in range(consumers_count):
self.stop_consumer(consumers[i])

return finish - start

def get_read_time(self, consumers_count):
Expand All @@ -114,9 +115,7 @@ def get_read_time(self, consumers_count):

return sum(deltas) / len(deltas)

@ok_to_fail # https://github.com/redpanda-data/redpanda/issues/5276
# https://github.com/redpanda-data/redpanda/issues/6074
@cluster(num_nodes=8)
@cluster(num_nodes=3)
def connection_rate_test(self):
self._producer.start(clean=False)
self._producer.wait()
Expand Down

0 comments on commit 8742a51

Please sign in to comment.