Skip to content

Commit

Permalink
ducktape: partition balancer cancellations tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ZeDRoman committed Jul 19, 2022
1 parent 919eafc commit 0cfcd63
Showing 1 changed file with 82 additions and 19 deletions.
101 changes: 82 additions & 19 deletions tests/rptest/tests/partition_balancer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,30 @@ def __init__(self, ctx, *args, **kwargs):
**kwargs,
)

def topic_partitions_replicas(self, topic):

rpk = RpkTool(self.redpanda)
num_partitions = topic.partition_count

def all_partitions_ready():
try:
partitions = list(rpk.describe_topic(topic.name))
except RpkException:
return False
return (len(partitions) == num_partitions, partitions)

return wait_until_result(
all_partitions_ready,
timeout_sec=30,
backoff_sec=1,
err_msg="failed to wait until all partitions have leaders",
)

def node2partition_count(self):
topics = [self.topic]
rpk = RpkTool(self.redpanda)
ret = {}
for topic in topics:
num_partitions = topic.partition_count

def all_partitions_ready():
try:
partitions = list(rpk.describe_topic(topic.name))
except RpkException:
return False
return (len(partitions) == num_partitions, partitions)

partitions = wait_until_result(
all_partitions_ready,
timeout_sec=30,
backoff_sec=1,
err_msg="failed to wait until all partitions have leaders",
)

partitions = self.topic_partitions_replicas(topic)
for p in partitions:
for r in p.replicas:
ret[r] = ret.setdefault(r, 0) + 1
Expand Down Expand Up @@ -124,8 +127,6 @@ def test_unavailable_nodes(self):

time.sleep(self.NODE_AVAILABILITY_TIMEOUT)

self.wait_until_ready()

node2pc = self.node2partition_count()
self.logger.info(f"partition counts after: {node2pc}")

Expand All @@ -135,3 +136,65 @@ def test_unavailable_nodes(self):
prev_failure = failure

self.run_validation()

def _throttle_recovery(self, new_value):
self.redpanda.set_cluster_config({"raft_learner_recovery_rate": str(new_value)})

@cluster(num_nodes=6, log_allow_list=CHAOS_LOG_ALLOW_LIST)
def test_movement_cancellations(self):
self.start_redpanda(num_nodes=4)

self.topic = TopicSpec(partition_count=random.randint(20, 30))

f_injector = FailureInjector(self.redpanda)

self.client().create_topic(self.topic)

self.start_producer(1)
self.start_consumer(1)
self.await_startup()

empty_node = self.redpanda.nodes[-1]

# clean node
initial_failure = FailureSpec(FailureSpec.FAILURE_KILL, empty_node)
f_injector._start_func(initial_failure.type)(initial_failure.node)
time.sleep(self.NODE_AVAILABILITY_TIMEOUT)
self.wait_until_ready()
f_injector._stop_func(initial_failure.type)(initial_failure.node)
node2pc = self.node2partition_count()
self.logger.info(f"partition counts after: {node2pc}")
assert self.redpanda.idx(empty_node) not in node2pc

# throttle recovery to prevent partition move from finishing
self._throttle_recovery(10)

total_replicas = sum(self.node2partition_count().values())

for n in range(3):
node = self.redpanda.nodes[n % 3]
failure = FailureSpec(FailureSpec.FAILURE_KILL, node)
f_injector._start_func(failure.type)(failure.node)

time.sleep(self.NODE_AVAILABILITY_TIMEOUT)

# wait until movement start
self.wait_until_status(lambda s: s["status"] == "in_progress")
f_injector._stop_func(failure.type)(failure.node)

# stop empty node
f_injector._start_func(initial_failure.type)(initial_failure.node)
time.sleep(self.NODE_AVAILABILITY_TIMEOUT)

# wait until movements are cancelled
self.wait_until_ready()

node2pc = self.node2partition_count()
self.logger.info(f"partition counts after: {node2pc}")

assert sum(node2pc.values()) == total_replicas
assert self.redpanda.idx(empty_node) not in node2pc

f_injector._stop_func(initial_failure.type)(initial_failure.node)

self.run_validation()

0 comments on commit 0cfcd63

Please sign in to comment.