tests: make ManyPartitionsTest flexible-size
This enables:
- Running on different instance types without
  hacking the test
- Running on local docker while developing the
  test itself.
jcsp committed Aug 8, 2022
1 parent 5e895f2 commit e248199
Showing 2 changed files with 143 additions and 35 deletions.
134 changes: 99 additions & 35 deletions tests/rptest/scale_tests/many_partitions_test.py
@@ -25,13 +25,21 @@
# franz-go default maxBrokerReadBytes -- --fetch-max-bytes may not exceed this
BIG_FETCH = 104857600

# The maximum partition count that's stable on a three node redpanda cluster
# with replicas=3.
# Don't go above this partition count. This limit reflects
# issues (at time of writing in redpanda 22.1) around disk contention on node startup,
# where ducktape's startup threshold is violated by the time it takes systems with
# more partitions to replay recent content on startup.
HARD_PARTITION_LIMIT = 10000
# The maximum total number of partitions in a Redpanda cluster. No matter
# how big the nodes or how many of them, the test does not exceed this.
# This limit represents bottlenecks in central controller/health
# functions.
HARD_PARTITION_LIMIT = 100000

# How many partitions we will create per shard: this value enables
# reaching HARD_PARTITION_LIMIT on 6x i3en.6xlarge nodes.
PARTITIONS_PER_SHARD = 2200

# Number of partitions to create when running in docker (i.e.
# when dedicated_nodes=false). This is independent of the
# amount of RAM or CPU that the nodes claim to have, because
# we know they are liable to be oversubscribed.
DOCKER_PARTITION_LIMIT = 1000


class ManyPartitionsTest(PreallocNodesTest):
@@ -134,10 +142,12 @@ def test_many_partitions(self):
* Run a general produce+consume workload to check that the system remains in
a functional state.
"""
# This test requires dedicated system resources to run reliably.
#assert self.redpanda.dedicated_nodes
This test dynamically scales with the nodes provided. It can run
on docker environments, but that is only for developers iterating
on the test itself: meaningful tests of scale must be done on
dedicated nodes.
"""

# Scale tests are not run on debug builds
assert not self.debug_mode
@@ -150,41 +160,87 @@ def test_many_partitions(self):
# We are validating that the system works up to the limit, and that it works
# up to the limit within the default per-partition memory footprint.
node_memory = self.redpanda.get_node_memory_mb()
node_cpus = self.redpanda.get_node_cpu_count()
node_disk_free = self.redpanda.get_node_disk_free()

self.logger.info(
f"Nodes have {node_cpus} cores, {node_memory}MB memory, {node_disk_free / (1024 * 1024)}MB free disk"
)

# HARD_PARTITION_LIMIT is for a 3 node cluster, adjust according to
# the number of nodes in this cluster.
partition_limit = HARD_PARTITION_LIMIT * (node_count / 3)
# On large nodes, reserve half of shard 0 to minimize interference
# between data and control plane, as control plane messages become
# very large.

mb_per_partition = 1
shard0_reserve = None
if node_cpus >= 8:
shard0_reserve = PARTITIONS_PER_SHARD / 2

# How much memory to reserve for internal partitions, such as
# id_allocator. This is intentionally higher than needed, to
# avoid having to update this test each time a new internal topic
# is added.
# Reserve a few slots for internal partitions, do not be
# super specific about how many because we may add some in
# future for e.g. audit logging.
internal_partition_slack = 10

# Calculate how many partitions we will aim to create, based
# on the size & count of nodes. This enables running the
# test on various instance sizes without explicitly adjusting.
partition_limit = (node_count * node_cpus * PARTITIONS_PER_SHARD
) / replication_factor - internal_partition_slack
if shard0_reserve:
partition_limit -= node_count * shard0_reserve

partition_limit = min(HARD_PARTITION_LIMIT, partition_limit)

if not self.redpanda.dedicated_nodes:
partition_limit = min(DOCKER_PARTITION_LIMIT, partition_limit)

self.logger.info(f"Selected partition limit {partition_limit}")

# Emulate seastar's policy for default reserved memory
reserved_memory = max(1536, int(0.07 * node_memory) + 1)
effective_node_memory = node_memory - reserved_memory
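# Illustrative example (assumed figure): a node reporting 196,608MB of RAM
# reserves max(1536, int(0.07 * 196608) + 1) = 13,763MB, leaving an
# effective_node_memory of roughly 182,845MB.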

# TODO: calculate an appropriate segment size for the disk space divided
# by the partition count, then set an appropriate retention.bytes and
# enable compaction, so that during the final stress period of the test,
# we are exercising compaction.
# Aim to use about half the disk space: set retention limits
# to enforce that. This enables traffic tests to run as long
# as they like without risking filling the disk.
partition_replicas_per_node = int(
(partition_limit * replication_factor) / node_count)
retention_bytes = int(
(node_disk_free / 2) / partition_replicas_per_node)

# Clamp memory if nodes have more memory than should be required
# to exercise the partition limit.
if effective_node_memory > HARD_PARTITION_LIMIT / mb_per_partition:
clamp_memory = mb_per_partition * (
(HARD_PARTITION_LIMIT + internal_partition_slack) +
reserved_memory)
# Choose an appropriate segment size to enable retention
# to kick in.
# TODO: redpanda should figure this out automatically by
# rolling segments pre-emptively if low on disk space
segment_size = int(retention_bytes / 2)

# Handy if hacking HARD_PARTITION_LIMIT to something low to run on a workstation
clamp_memory = max(clamp_memory, 500)
self.logger.info(
f"Selected retention.bytes={retention_bytes}, segment.bytes={segment_size}"
)
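# Illustrative example (assumed figures): with ~99,000 partitions at
# replication_factor=3 across 6 nodes, each node hosts ~49,500 replicas;
# with ~15TB of free disk per node, retention.bytes comes out around
# 150MB per partition and segment.bytes around 75MB.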

resource_settings = ResourceSettings(memory_mb=clamp_memory)
self.redpanda.set_resource_settings(resource_settings)
elif effective_node_memory < HARD_PARTITION_LIMIT / mb_per_partition:
mb_per_partition = 1
if not self.redpanda.dedicated_nodes:
# In docker, assume we're on a laptop drive and not doing
# real testing, so disable fsync to make test run faster.
resource_settings_args = {'bypass_fsync': True}

# In docker, make the test a bit more realistic by clamping
# memory if nodes have more memory than should be required
# to exercise the partition limit.
if effective_node_memory > partition_replicas_per_node / mb_per_partition:
clamp_memory = mb_per_partition * (
(partition_replicas_per_node + internal_partition_slack) +
reserved_memory)

# Handy if hacking HARD_PARTITION_LIMIT to something low to run on a workstation
clamp_memory = max(clamp_memory, 500)
resource_settings_args['memory_mb'] = clamp_memory

self.redpanda.set_resource_settings(
ResourceSettings(**resource_settings_args))
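# Illustrative example (assumed figures): a 3-node docker cluster capped at
# DOCKER_PARTITION_LIMIT=1000 partitions with replication_factor=3 hosts
# ~1000 replicas per node, so with the 1536MB reserved-memory floor the
# clamp works out to roughly 1 * ((1000 + 10) + 1536) = 2546MB per node.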

# Should not happen on the expected EC2 instance types where
# the cores-RAM ratio is sufficient to meet our shards-per-core
if effective_node_memory < partition_replicas_per_node / mb_per_partition:
raise RuntimeError(
f"Node memory is too small ({node_memory}MB - {reserved_memory}MB)"
)
Expand All @@ -209,7 +265,11 @@ def test_many_partitions(self):
f"Creating topic {tn} with {n_partitions} partitions")
self.rpk.create_topic(tn,
partitions=n_partitions,
replicas=replication_factor)
replicas=replication_factor,
config={
'segment.bytes': segment_size,
'retention.bytes': retention_bytes
})
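# For reference, the rpk call above is roughly equivalent to running
# (flags shown are illustrative, not taken from this change):
#   rpk topic create <name> -p <n_partitions> -r 3 \
#       -c segment.bytes=<segment_size> -c retention.bytes=<retention_bytes>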

self.logger.info(f"Awaiting elections...")
wait_until(lambda: self._all_elections_done(topic_names, n_partitions),
@@ -249,6 +309,7 @@ def test_many_partitions(self):
# Approx time to write or read all messages, for timeouts
# Pessimistic bandwidth guess, accounting for the sub-disk bandwidth
# that a single-threaded consumer may see

expect_bandwidth = 50 * 1024 * 1024

expect_transmit_time = int(write_bytes_per_topic / expect_bandwidth)
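# Illustrative example (assumed figure): if write_bytes_per_topic were
# 512MiB, expect_transmit_time would be int(512MiB / 50MiB/s) = 10 seconds.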
@@ -342,12 +403,13 @@ def get_fd_counts():

# Now that we've tested basic ability to form consensus and survive some
# restarts, move on to a more general stress test.

self.logger.info("Entering traffic stress test")
target_topic = topic_names[0]

stress_msg_size = 32768

stress_data_size = 1024 * 1024 * 1024 * 100

stress_msg_count = int(stress_data_size / stress_msg_size)
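# i.e. 100GiB of 32KiB messages: 1024**3 * 100 / 32768 = 3,276,800 messages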
fast_producer = FranzGoVerifiableProducer(
self.test_context,
@@ -377,6 +439,8 @@ def get_fd_counts():
rand_consumer.wait()

fast_producer.wait()
self.logger.info(
"Write+randread stress test complete, verifying sequentially")

seq_consumer = FranzGoVerifiableSeqConsumer(self.test_context,
self.redpanda,
44 changes: 44 additions & 0 deletions tests/rptest/services/redpanda.py
@@ -172,6 +172,10 @@ def __init__(self,
def memory_mb(self):
return self._memory_mb

@property
def num_cpus(self):
return self._num_cpus

def to_cli(self, *, dedicated_node):
"""
@@ -633,6 +637,46 @@ def get_node_memory_mb(self):
memory_kb = int(line.strip().split()[1])
return memory_kb / 1024

def get_node_cpu_count(self):
if self._resource_settings.num_cpus is not None:
self.logger.info(f"get_node_cpu_count: got from ResourceSettings")
return self._resource_settings.num_cpus
elif self._dedicated_nodes is False:
self.logger.info(
f"get_node_cpu_count: using ResourceSettings default")
return self._resource_settings.DEFAULT_NUM_CPUS
else:
self.logger.info(f"get_node_cpu_count: fetching from node")

# Assume nodes are symmetric, so we can just ask one
node = self.nodes[0]
core_count_str = node.account.ssh_output(
"cat /proc/cpuinfo | grep ^processor | wc -l")
return int(core_count_str.strip())

def get_node_disk_free(self):
# Assume nodes are symmetric, so we can just ask one
node = self.nodes[0]

if node.account.exists(self.PERSISTENT_ROOT):
df_path = self.PERSISTENT_ROOT
else:
# If dir doesn't exist yet, use the parent.
df_path = os.path.dirname(self.PERSISTENT_ROOT)

df_out = node.account.ssh_output(f"df --output=avail {df_path}")
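# Example output (illustrative) of `df --output=avail <dir>`:
#       Avail
#   7316394496
# i.e. a header row followed by the available space in KiB, which is what
# the parsing below relies on.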

avail_kb = int(df_out.strip().split(b"\n")[1].strip())

if not self.dedicated_nodes:
# Assume docker images share a filesystem. This may not
# be the truth (e.g. in CI they get independent XFS
# filesystems), but it's the safe assumption on e.g.
# a workstation.
avail_kb = int(avail_kb / len(self.nodes))

return avail_kb * 1024

def _for_nodes(self, nodes, cb: callable, *, parallel: bool):
if not parallel:
# Trivial case: just loop and call
