[v22.2.x] More robust waiting for the quiescent state in partition balancer tests #6032

Merged
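As context for the diff below: instead of sleeping for a fixed node-availability timeout after injecting a failure, the tests now poll the partition balancer status endpoint until it reports "ready" while still listing the expected unavailable node among its violations. A minimal standalone sketch of that polling loop (not part of the diff; it assumes an rptest-style Admin client exposing get_partition_balancer_status, and the helper name and timeouts are illustrative):

    import time

    def wait_until_quiescent(admin, expected_unavailable_node_id, timeout_sec=120):
        # Poll the admin API until the balancer reports the quiescent ("ready")
        # state while the expected node is still listed as unavailable.
        deadline = time.time() + timeout_sec
        while time.time() < deadline:
            status = admin.get_partition_balancer_status(timeout=10)
            unavailable = status.get("violations", {}).get("unavailable_nodes", [])
            if status.get("status") == "ready" and \
                    expected_unavailable_node_id in unavailable:
                return status
            time.sleep(2)
        raise TimeoutError("partition balancer did not reach the quiescent state")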
4 changes: 4 additions & 0 deletions src/v/redpanda/admin_server.cc
@@ -2851,6 +2851,10 @@ void admin_server::register_cluster_routes() {
std::get<cluster::partition_balancer_overview_reply>(result));
} else if (std::holds_alternative<model::node_id>(result)) {
auto node_id = std::get<model::node_id>(result);
vlog(
logger.debug,
"proxying the partition_balancer_overview call to node {}",
node_id);
auto rpc_result
= co_await _connection_cache.local()
.with_node_client<
116 changes: 58 additions & 58 deletions tests/rptest/tests/partition_balancer_test.py
@@ -34,16 +34,13 @@


class PartitionBalancerTest(EndToEndTest):
NODE_AVAILABILITY_TIMEOUT = 10

def __init__(self, ctx, *args, **kwargs):
super(PartitionBalancerTest, self).__init__(
ctx,
*args,
extra_rp_conf={
"partition_autobalancing_mode": "continuous",
"partition_autobalancing_node_availability_timeout_sec":
self.NODE_AVAILABILITY_TIMEOUT,
"partition_autobalancing_node_availability_timeout_sec": 10,
"partition_autobalancing_tick_interval_ms": 5000,
"raft_learner_recovery_rate": 1_000_000,
},
@@ -79,34 +76,16 @@ def node2partition_count(self):

return ret

def wait_node_availability_timeout(self, unavailable_node):
# we need to first wait for the controller to stabilize,
# because each time controller changes, unavailability timer
# starts anew.
unavailable_id = self.redpanda.idx(unavailable_node)
admin = Admin(self.redpanda)
hosts = [n.name for n in self.redpanda.nodes if n != unavailable_node]
controller_id = admin.await_stable_leader(
namespace="redpanda",
topic="controller",
partition=0,
timeout_s=60,
hosts=hosts,
check=lambda node_id: node_id != unavailable_id,
)
self.logger.info(
f"controller stable: {self.redpanda.get_node(controller_id).name}")

time.sleep(self.NODE_AVAILABILITY_TIMEOUT)

def wait_until_status(self, predicate, timeout_sec=120):
admin = Admin(self.redpanda)
# We may get a 504 if we proxy a status request to a suspended node.
# It is okay to retry (the controller leader will get re-elected in the meantime).
admin = Admin(self.redpanda, retry_codes=[503, 504])
start = time.time()

def check():
req_start = time.time()

status = admin.get_partition_balancer_status(timeout=1)
status = admin.get_partition_balancer_status(timeout=10)
self.logger.info(f"partition balancer status: {status}")

if "seconds_since_last_tick" not in status:
@@ -124,10 +103,23 @@ def check():
err_msg="failed to wait until status condition",
)

def wait_until_ready(self, timeout_sec=120):
return self.wait_until_status(
lambda status: status["status"] == "ready",
timeout_sec=timeout_sec)
def wait_until_ready(self,
timeout_sec=120,
expected_unavailable_node=None):
if expected_unavailable_node:
node_id = self.redpanda.idx(expected_unavailable_node)
self.logger.info(f"waiting for quiescent state, "
f"expected unavailable node: {node_id}")
else:
node_id = None
self.logger.info(f"waiting for quiescent state")

def predicate(status):
return (status["status"] == "ready" and
((node_id is None) or (node_id in status["violations"].get(
"unavailable_nodes", []))))

return self.wait_until_status(predicate, timeout_sec=timeout_sec)

def check_no_replicas_on_node(self, node):
node2pc = self.node2partition_count()
@@ -190,19 +182,6 @@ def make_unavailable(self,
self.cur_failure = FailureSpec(random.choice(failure_types), node)
self.f_injector._start_func(self.cur_failure.type)(
self.cur_failure.node)
self.test.wait_node_availability_timeout(node)

def step_and_wait_for_in_progress(self, node):
self.make_unavailable(node)
self.test.logger.info(f"waiting for partition balancer to kick in")
self.test.wait_until_status(lambda s: s["status"] == "in_progress"
or s["status"] == "ready")

def step_and_wait_for_ready(self, node, wait_for_node_status=False):
self.make_unavailable(node,
wait_for_previous_node=wait_for_node_status)
self.test.logger.info(f"waiting for quiescent state")
self.test.wait_until_ready()

@cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST)
def test_unavailable_nodes(self):
@@ -218,10 +197,30 @@ def test_unavailable_nodes(self):
with self.NodeStopper(self) as ns:
for n in range(7):
node = self.redpanda.nodes[n % len(self.redpanda.nodes)]
ns.make_unavailable(node)

if n in {1, 3, 4}:
ns.step_and_wait_for_in_progress(node)
# For these nodes we don't wait for the quiescent state,
# but just for the partition movement to start and then
# move to the next node.

self.logger.info(
f"waiting for partition balancer to kick in")

node_id = self.redpanda.idx(node)

def predicate(s):
if s["status"] == "in_progress":
return True
elif s["status"] == "ready":
return node_id in s["violations"].get(
"unavailable_nodes", [])
else:
return False

self.wait_until_status(predicate)
else:
ns.step_and_wait_for_ready(node)
self.wait_until_ready(expected_unavailable_node=node)
self.check_no_replicas_on_node(node)

# Restore the system to a fully healthy state before validation:
@@ -251,7 +250,7 @@ def test_movement_cancellations(self):
# clean node
ns.make_unavailable(empty_node,
failure_types=[FailureSpec.FAILURE_KILL])
self.wait_until_ready()
self.wait_until_ready(expected_unavailable_node=empty_node)
ns.make_available()
self.check_no_replicas_on_node(empty_node)

@@ -267,15 +266,14 @@ def test_movement_cancellations(self):
self.wait_until_status(lambda s: s["status"] == "in_progress")
ns.make_available()

with self.NodeStopper(self) as ns2:
# stop empty node
ns.make_unavailable(
empty_node, failure_types=[FailureSpec.FAILURE_KILL])
# stop empty node
ns.make_unavailable(empty_node,
failure_types=[FailureSpec.FAILURE_KILL])

# wait until movements are cancelled
self.wait_until_ready()
# wait until movements are cancelled
self.wait_until_ready(expected_unavailable_node=empty_node)

self.check_no_replicas_on_node(empty_node)
self.check_no_replicas_on_node(empty_node)

self.run_validation(consumer_timeout_sec=CONSUMER_TIMEOUT)

@@ -312,7 +310,8 @@ def check_rack_placement():
with self.NodeStopper(self) as ns:
for n in range(7):
node = self.redpanda.nodes[n % len(self.redpanda.nodes)]
ns.step_and_wait_for_ready(node, wait_for_node_status=True)
ns.make_unavailable(node, wait_for_previous_node=True)
self.wait_until_ready(expected_unavailable_node=node)
self.check_no_replicas_on_node(node)
check_rack_placement()

@@ -364,7 +363,7 @@ def get_node2partition_count():
ns.make_unavailable(node)
try:
admin_fuzz.pause()
self.wait_until_ready()
self.wait_until_ready(expected_unavailable_node=node)
node2partition_count = get_node2partition_count()
assert self.redpanda.idx(node) not in node2partition_count
finally:
@@ -543,9 +542,10 @@ def entered_maintenance_mode(node):
admin.patch_cluster_config(
{"raft_learner_recovery_rate": 1_000_000})

self.wait_until_status(lambda s: s["status"] == "ready")

if not kill_same_node:
if kill_same_node:
self.wait_until_ready()
else:
self.wait_until_ready(expected_unavailable_node=to_kill)
self.check_no_replicas_on_node(to_kill)

self.run_validation(enable_idempotence=False,