Skip to content

Commit

Permalink
ducky: tune partition_movement to be faster
Browse files Browse the repository at this point in the history
duration of partition_movement_test.py drops from
14 minutes 15.077 seconds to 12 minutes 39.137 seconds
  • Loading branch information
rystsov committed Jul 14, 2022
1 parent 9f048db commit cd60f49
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 28 deletions.
2 changes: 2 additions & 0 deletions tests/rptest/tests/end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from rptest.util import wait_until
from rptest.services.redpanda import RedpandaService
from rptest.clients.default import DefaultClient
from rptest.services.admin import Admin
from rptest.services.verifiable_consumer import VerifiableConsumer
from rptest.services.verifiable_producer import VerifiableProducer, is_int_with_prefix

Expand Down Expand Up @@ -90,6 +91,7 @@ def start_redpanda(self,
si_settings=self.si_settings)
self.redpanda.start()
self._client = DefaultClient(self.redpanda)
self._admin_client = Admin(self.redpanda)

@property
def s3_client(self):
Expand Down
60 changes: 32 additions & 28 deletions tests/rptest/tests/partition_movement.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,10 @@
# by the Apache License, Version 2.0

import random
from rptest.services.admin import Admin
from rptest.util import wait_until


class PartitionMovementMixin():
@staticmethod
def _random_partition(metadata):
topic = random.choice(metadata)
partition = random.choice(topic.partitions)
return topic.name, partition.id

@staticmethod
def _choose_replacement(admin, assignments, allow_no_ops=True):
"""
Expand Down Expand Up @@ -87,52 +80,63 @@ def keep(p):
return result

def _wait_post_move(self, topic, partition, assignments, timeout_sec):
admin = Admin(self.redpanda)

def status_done():
info = admin.get_partitions(topic, partition)
self.logger.info(
f"current assignments for {topic}-{partition}: {info}")
converged = self._equal_assignments(info["replicas"], assignments)
return converged and info["status"] == "done"

# wait until redpanda reports complete
wait_until(status_done, timeout_sec=timeout_sec, backoff_sec=2)

# wait until all nodes see new assignment
def derived_done():
info = self._get_current_partitions(admin, topic, partition)
info = self._get_current_partitions(self._admin_client, topic,
partition)
self.logger.info(
f"derived assignments for {topic}-{partition}: {info}")
return self._equal_assignments(info, assignments)

wait_until(derived_done, timeout_sec=timeout_sec, backoff_sec=2)
wait_until(derived_done, timeout_sec=timeout_sec, backoff_sec=1)

def _do_move_and_verify(self, topic, partition, timeout_sec):
admin = Admin(self.redpanda)
def status_done():
for node in self.redpanda.nodes:
info = self._admin_client.get_partitions(topic,
partition,
node=node)
self.logger.info(
f"current assignments for {topic}-{partition}: {info}")
converged = self._equal_assignments(info["replicas"],
assignments)
if converged and info["status"] == "done":
return True
return False

# wait until redpanda reports complete
wait_until(status_done, timeout_sec=timeout_sec, backoff_sec=1)

def _do_move_and_verify(self, topic, partition, timeout_sec):
# get the partition's replica set, including core assignments. the kafka
# api doesn't expose core information, so we use the redpanda admin api.
assignments = self._get_assignments(admin, topic, partition)
assignments = self._get_assignments(self._admin_client, topic,
partition)
self.logger.info(f"assignments for {topic}-{partition}: {assignments}")

# build new replica set by replacing a random assignment
selected, replacements = self._choose_replacement(admin, assignments)
selected, replacements = self._choose_replacement(
self._admin_client, assignments)
self.logger.info(
f"replacement for {topic}-{partition}:{len(selected)}: {selected} -> {replacements}"
)
self.logger.info(
f"new assignments for {topic}-{partition}: {assignments}")

r = admin.set_partition_replicas(topic, partition, assignments)
self._admin_client.set_partition_replicas(topic, partition,
assignments)

self._wait_post_move(topic, partition, assignments, timeout_sec)

return (topic, partition, assignments)

def _move_and_verify(self):
# choose a random topic-partition
metadata = self.client().describe_topics()
topic, partition = self._random_partition(metadata)
partitions = []
for p in self._admin_client.get_partitions():
if p["ns"] != "kafka":
continue
partitions.append((p["topic"], p["partition_id"]))
topic, partition = random.choice(partitions)
# timeout for __consumer_offsets topic has to be long enough
# to wait for compaction to finish. For resource constrained machines
# and redpanda debug builds it may take a very long time
Expand Down

0 comments on commit cd60f49

Please sign in to comment.