Merge pull request #6032 from vbotbuildovich/backport-fixes-to-v22.2.x-629

[v22.2.x] More robust waiting for the quiescent state in partition balancer tests
ztlpn committed Aug 15, 2022
2 parents 5c2dcea + 7ae38b0 commit aab41ac
Showing 2 changed files with 62 additions and 58 deletions.
4 changes: 4 additions & 0 deletions src/v/redpanda/admin_server.cc
@@ -2851,6 +2851,10 @@ void admin_server::register_cluster_routes() {
std::get<cluster::partition_balancer_overview_reply>(result));
} else if (std::holds_alternative<model::node_id>(result)) {
auto node_id = std::get<model::node_id>(result);
vlog(
logger.debug,
"proxying the partition_balancer_overview call to node {}",
node_id);
auto rpc_result
= co_await _connection_cache.local()
.with_node_client<
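Note on the hunk above: it only adds a debug log on the already-existing path where a node that is not the controller leader proxies the partition_balancer_overview call to it. The test-side consequence shows up in the next file: a proxied request can fail with 503/504 while the target node is suspended or the controller is re-electing, so the tests now retry those codes. A minimal client-side sketch of such a retry loop, assuming the default admin port 9644 and the /v1/cluster/partition_balancer/status endpoint; the helper name and retry policy are illustrative, not part of the commit:

import time
import requests

def get_balancer_status(host, timeout_sec=60):
    # Retry on 503/504: the admin server may proxy this request to a
    # suspended node, or the controller leader may be mid-election.
    deadline = time.time() + timeout_sec
    while True:
        resp = requests.get(
            f"http://{host}:9644/v1/cluster/partition_balancer/status",
            timeout=10)
        if resp.status_code in (503, 504) and time.time() < deadline:
            time.sleep(1)
            continue
        resp.raise_for_status()
        return resp.json()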
116 changes: 58 additions & 58 deletions tests/rptest/tests/partition_balancer_test.py
@@ -34,16 +34,13 @@


class PartitionBalancerTest(EndToEndTest):
NODE_AVAILABILITY_TIMEOUT = 10

def __init__(self, ctx, *args, **kwargs):
super(PartitionBalancerTest, self).__init__(
ctx,
*args,
extra_rp_conf={
"partition_autobalancing_mode": "continuous",
"partition_autobalancing_node_availability_timeout_sec":
self.NODE_AVAILABILITY_TIMEOUT,
"partition_autobalancing_node_availability_timeout_sec": 10,
"partition_autobalancing_tick_interval_ms": 5000,
"raft_learner_recovery_rate": 1_000_000,
},
@@ -79,34 +76,16 @@ def node2partition_count(self):

return ret

def wait_node_availability_timeout(self, unavailable_node):
# we need to first wait for the controller to stabilize,
# because each time controller changes, unavailability timer
# starts anew.
unavailable_id = self.redpanda.idx(unavailable_node)
admin = Admin(self.redpanda)
hosts = [n.name for n in self.redpanda.nodes if n != unavailable_node]
controller_id = admin.await_stable_leader(
namespace="redpanda",
topic="controller",
partition=0,
timeout_s=60,
hosts=hosts,
check=lambda node_id: node_id != unavailable_id,
)
self.logger.info(
f"controller stable: {self.redpanda.get_node(controller_id).name}")

time.sleep(self.NODE_AVAILABILITY_TIMEOUT)

def wait_until_status(self, predicate, timeout_sec=120):
admin = Admin(self.redpanda)
# We may get a 504 if we proxy a status request to a suspended node.
# It is okay to retry (the controller leader will get re-elected in the meantime).
admin = Admin(self.redpanda, retry_codes=[503, 504])
start = time.time()

def check():
req_start = time.time()

status = admin.get_partition_balancer_status(timeout=1)
status = admin.get_partition_balancer_status(timeout=10)
self.logger.info(f"partition balancer status: {status}")

if "seconds_since_last_tick" not in status:
@@ -124,10 +103,23 @@ def check():
err_msg="failed to wait until status condition",
)

def wait_until_ready(self, timeout_sec=120):
return self.wait_until_status(
lambda status: status["status"] == "ready",
timeout_sec=timeout_sec)
def wait_until_ready(self,
timeout_sec=120,
expected_unavailable_node=None):
if expected_unavailable_node:
node_id = self.redpanda.idx(expected_unavailable_node)
self.logger.info(f"waiting for quiescent state, "
f"expected unavailable node: {node_id}")
else:
node_id = None
self.logger.info(f"waiting for quiescent state")

def predicate(status):
return (status["status"] == "ready" and
((node_id is None) or (node_id in status["violations"].get(
"unavailable_nodes", []))))

return self.wait_until_status(predicate, timeout_sec=timeout_sec)

def check_no_replicas_on_node(self, node):
node2pc = self.node2partition_count()
@@ -190,19 +182,6 @@ def make_unavailable(self,
self.cur_failure = FailureSpec(random.choice(failure_types), node)
self.f_injector._start_func(self.cur_failure.type)(
self.cur_failure.node)
self.test.wait_node_availability_timeout(node)

def step_and_wait_for_in_progress(self, node):
self.make_unavailable(node)
self.test.logger.info(f"waiting for partition balancer to kick in")
self.test.wait_until_status(lambda s: s["status"] == "in_progress"
or s["status"] == "ready")

def step_and_wait_for_ready(self, node, wait_for_node_status=False):
self.make_unavailable(node,
wait_for_previous_node=wait_for_node_status)
self.test.logger.info(f"waiting for quiescent state")
self.test.wait_until_ready()

@cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST)
def test_unavailable_nodes(self):
@@ -218,10 +197,30 @@ def test_unavailable_nodes(self):
with self.NodeStopper(self) as ns:
for n in range(7):
node = self.redpanda.nodes[n % len(self.redpanda.nodes)]
ns.make_unavailable(node)

if n in {1, 3, 4}:
ns.step_and_wait_for_in_progress(node)
# For these nodes we don't wait for the quiescent state,
# but just for the partition movement to start and then
# move to the next node.

self.logger.info(
f"waiting for partition balancer to kick in")

node_id = self.redpanda.idx(node)

def predicate(s):
if s["status"] == "in_progress":
return True
elif s["status"] == "ready":
return node_id in s["violations"].get(
"unavailable_nodes", [])
else:
return False

self.wait_until_status(predicate)
else:
ns.step_and_wait_for_ready(node)
self.wait_until_ready(expected_unavailable_node=node)
self.check_no_replicas_on_node(node)

# Restore the system to a fully healthy state before validation:
@@ -251,7 +250,7 @@ def test_movement_cancellations(self):
# clean node
ns.make_unavailable(empty_node,
failure_types=[FailureSpec.FAILURE_KILL])
self.wait_until_ready()
self.wait_until_ready(expected_unavailable_node=empty_node)
ns.make_available()
self.check_no_replicas_on_node(empty_node)

@@ -267,15 +266,14 @@
self.wait_until_status(lambda s: s["status"] == "in_progress")
ns.make_available()

with self.NodeStopper(self) as ns2:
# stop empty node
ns.make_unavailable(
empty_node, failure_types=[FailureSpec.FAILURE_KILL])
# stop empty node
ns.make_unavailable(empty_node,
failure_types=[FailureSpec.FAILURE_KILL])

# wait until movements are cancelled
self.wait_until_ready()
# wait until movements are cancelled
self.wait_until_ready(expected_unavailable_node=empty_node)

self.check_no_replicas_on_node(empty_node)
self.check_no_replicas_on_node(empty_node)

self.run_validation(consumer_timeout_sec=CONSUMER_TIMEOUT)

@@ -312,7 +310,8 @@ def check_rack_placement():
with self.NodeStopper(self) as ns:
for n in range(7):
node = self.redpanda.nodes[n % len(self.redpanda.nodes)]
ns.step_and_wait_for_ready(node, wait_for_node_status=True)
ns.make_unavailable(node, wait_for_previous_node=True)
self.wait_until_ready(expected_unavailable_node=node)
self.check_no_replicas_on_node(node)
check_rack_placement()

@@ -364,7 +363,7 @@ def get_node2partition_count():
ns.make_unavailable(node)
try:
admin_fuzz.pause()
self.wait_until_ready()
self.wait_until_ready(expected_unavailable_node=node)
node2partition_count = get_node2partition_count()
assert self.redpanda.idx(node) not in node2partition_count
finally:
@@ -543,9 +542,10 @@ def entered_maintenance_mode(node):
admin.patch_cluster_config(
{"raft_learner_recovery_rate": 1_000_000})

self.wait_until_status(lambda s: s["status"] == "ready")

if not kill_same_node:
if kill_same_node:
self.wait_until_ready()
else:
self.wait_until_ready(expected_unavailable_node=to_kill)
self.check_no_replicas_on_node(to_kill)

self.run_validation(enable_idempotence=False,
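The essence of the new waiting scheme, reduced to a standalone sketch (poll_status is a hypothetical stand-in for Admin.get_partition_balancer_status): a bare status == "ready" check can pass before the balancer has even noticed the downed node, so the wait additionally requires that node to show up in the unavailable_nodes violation list before declaring the cluster quiescent.

import time

def wait_quiescent(poll_status, unavailable_node_id, timeout_sec=120):
    # Quiescent means the balancer is idle *and* has observed the failure;
    # "ready" alone can be reported before the availability timeout fires.
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        status = poll_status()
        if (status["status"] == "ready"
                and unavailable_node_id in status["violations"].get(
                    "unavailable_nodes", [])):
            return status
        time.sleep(2)
    raise TimeoutError("balancer did not reach quiescent state")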
