Skip to content

Commit

Permalink
tests: cleanup in ManyPartitionsTest
Browse files Browse the repository at this point in the history
This is followup from PR
redpanda-data#5816
  • Loading branch information
jcsp committed Aug 11, 2022
1 parent c119573 commit c0b95f7
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 31 deletions.
58 changes: 27 additions & 31 deletions tests/rptest/scale_tests/many_partitions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import time
import concurrent.futures
from collections import defaultdict
from collections import Counter

from ducktape.utils.util import wait_until, TimeoutError
import numpy
Expand All @@ -34,10 +34,10 @@
# how big the nodes or how many of them, the test does not exceed this.
# This limit represents bottlenecks in central controller/health
# functions.
HARD_PARTITION_LIMIT = 100000
HARD_PARTITION_LIMIT = 50000

# How many partitions we will create per shard: this is the primary scaling
# factor that controler how many partitions a given cluster will get.
# factor that controls how many partitions a given cluster will get.
PARTITIONS_PER_SHARD = 1000

# Number of partitions to create when running in docker (i.e.
Expand Down Expand Up @@ -119,16 +119,11 @@ def __init__(self, redpanda, replication_factor):
# timeouts when waiting for traffic: it is not a scientific
# success condition for the tests.
if self.redpanda.dedicated_nodes:
# A 24 core i3en.6xlarge has about 2GB/s disk write
# A 24 core i3en.6xlarge has about 1GB/s disk write
# bandwidth. Divide by 2 to give comfortable room for variation.
# This is total bandwidth from a group of producers.

# self.expect_bandwidth = (node_count / replication_factor) * (
# self.node_cpus / 24.0) * 1E9

# FIXME: surprisingly low throughput sometimes.
self.expect_bandwidth = (node_count / replication_factor) * (
self.node_cpus / 24.0) * 200E6
self.node_cpus / 24.0) * 1E9

# Single-producer tests are slower, bottlenecked on the
# client side.
Expand Down Expand Up @@ -285,13 +280,16 @@ def _node_leadership_evacuated(self, topic_names: list[str],

def _node_leadership_balanced(self, topic_names: list[str],
p_per_topic: int):
node_leader_counts = defaultdict(int)
node_leader_counts = Counter()
any_incomplete = False
for tn in topic_names:
try:
partitions = list(self.rpk.describe_topic(tn, tolerant=True))
except RpkException as e:
self.logger.warn(f"rpk timeout: {e}")
# We can get e.g. timeouts from rpk if it is trying to describe
# a big topic on a heavily loaded cluster: treat these as retryable
# and let our caller call us again.
self.logger.warn(f"RPK error, assuming retryable: {e}")
return False

if len(partitions) < p_per_topic:
Expand All @@ -300,12 +298,12 @@ def _node_leadership_balanced(self, topic_names: list[str],
continue

assert len(partitions) == p_per_topic
for p in partitions:
node_leader_counts[p.leader] += 1
node_leader_counts.update(p.leader for p in partitions)

for n, c in node_leader_counts.items():
self.logger.info(f"node {n} leaderships: {c}")

assert len(node_leader_counts) <= len(self.redpanda.nodes)
if len(node_leader_counts) != len(self.redpanda.nodes):
self.logger.info("Not all nodes have leaderships")
return False
Expand Down Expand Up @@ -365,19 +363,14 @@ def setUp(self):
pass

def _get_fd_counts(self):
counts = {}
with concurrent.futures.ThreadPoolExecutor(
max_workers=len(self.redpanda.nodes)) as executor:
futs = {}
for node in self.redpanda.nodes:
futs[node.name] = executor.submit(
lambda: sum(1 for _ in self.redpanda.lsof_node(node)))

for node_name, fut in futs.items():
file_count = fut.result()
counts[node_name] = file_count

return counts
return list(
executor.map(
lambda n: tuple(
[n, sum(1 for _ in self.redpanda.lsof_node(n))]),
self.redpanda.nodes))

def _concurrent_restart(self):
"""
Expand Down Expand Up @@ -451,7 +444,7 @@ def _restart_stress(self, scale: ScaleParameters, topic_names: list,
# on each restart (https://github.com/redpanda-data/redpanda/issues/4057)
restart_count = 2

for node_name, file_count in self._get_fd_counts().items():
for node_name, file_count in self._get_fd_counts():
self.logger.info(
f"Open files before restarts on {node_name}: {file_count}")

Expand All @@ -472,7 +465,7 @@ def _restart_stress(self, scale: ScaleParameters, topic_names: list,

inter_restart_check()

for node_name, file_count in self._get_fd_counts().items():
for node_name, file_count in self._get_fd_counts():
self.logger.info(
f"Open files after {i} restarts on {node_name}: {file_count}"
)
Expand Down Expand Up @@ -500,7 +493,7 @@ def _write_and_random_read(self, scale: ScaleParameters, topic_names):
target_topic = topic_names[0]

# Assume fetches will be 10MB, the franz-go default
fetch_mb_per_partition = 10 * 1024 * 1024
fetch_bytes_per_partition = 10 * 1024 * 1024

# * Need enough data that if a consumer tried to fetch it all at once
# in a single request, it would run out of memory. OR the amount of
Expand All @@ -515,7 +508,8 @@ def _write_and_random_read(self, scale: ScaleParameters, topic_names):

write_bytes_per_topic = min(
int((self.redpanda.get_node_memory_mb() * 1024 * 1024) /
len(topic_names)), fetch_mb_per_partition * n_partitions) * 2
len(topic_names)),
fetch_bytes_per_partition * n_partitions) * 2

if not self.redpanda.dedicated_nodes:
# Docker developer mode: likely to be on a workstation with lots of RAM
Expand Down Expand Up @@ -738,8 +732,10 @@ def _test_many_partitions(self, compacted):

scale = ScaleParameters(self.redpanda, replication_factor)

# Run with one huge topic: this is the more stressful case for Redpanda, compared
# with multiple modestly-sized topics, so it's what we test to find the system's limits.
# Run with one huge topic: it is more stressful for redpanda when clients
# request the metadata for many partitions at once, and the simplest way
# to get traffic generators to do that without the clients supporting
# writing to arrays of topics is to put all the partitions into one topic.
n_topics = 1

# Partitions per topic
Expand Down Expand Up @@ -774,7 +770,7 @@ def _test_many_partitions(self, compacted):
backoff_sec=5)
self.logger.info(f"Initial elections done.")

for node_name, file_count in self._get_fd_counts().items():
for node_name, file_count in self._get_fd_counts():
self.logger.info(
f"Open files after initial elections on {node_name}: {file_count}"
)
Expand Down
1 change: 1 addition & 0 deletions tests/rptest/services/kgo_repeater_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def group_ready():
# in practice I see some a small minority of consumers drop out of the
# group sometimes when the cluster undergoes an all-node concurrent restart,
# and I don't want to stop the test for that.
# https://github.com/redpanda-data/redpanda/issues/5959
self.logger.debug(
f"group_ready: waiting for node count ({group.members} != {expect_members})"
)
Expand Down

0 comments on commit c0b95f7

Please sign in to comment.