Partition autobalancer planner cancel movement to unavailable nodes #5365

Merged · 4 commits · Jul 19, 2022
27 changes: 23 additions & 4 deletions src/v/cluster/partition_balancer_backend.cc
@@ -133,25 +133,44 @@ ss::future<> partition_balancer_backend::do_tick() {
vlog(
clusterlog.info,
"violations: {} unavailable nodes, {} full nodes; planned {} "
"reassignments",
"reassignments; cancelled {} reassignments",
plan_data.violations.unavailable_nodes.size(),
plan_data.violations.full_nodes.size(),
plan_data.reassignments.size());
plan_data.reassignments.size(),
plan_data.cancellations.size());
}

_last_leader_term = _raft0->term();
_last_tick_time = ss::lowres_clock::now();
_last_violations = std::move(plan_data.violations);

if (
_topic_table.has_updates_in_progress()
|| !plan_data.reassignments.empty()) {
_topic_table.has_updates_in_progress() || !plan_data.reassignments.empty()
|| !plan_data.cancellations.empty()) {
_last_status = partition_balancer_status::in_progress;
} else if (plan_data.failed_reassignments_count > 0) {
_last_status = partition_balancer_status::stalled;
} else {
_last_status = partition_balancer_status::ready;
}

co_await ss::max_concurrent_for_each(
plan_data.cancellations, 32, [this, current_term](model::ntp& ntp) {
vlog(clusterlog.info, "cancel movement for ntp {}", ntp);
return _topics_frontend
.cancel_moving_partition_replicas(
ntp,
model::timeout_clock::now() + add_move_cmd_timeout,
current_term)
.then([ntp = std::move(ntp)](auto errc) {
vlog(
clusterlog.info,
"{} movement cancellation submitted, errc: {}",
ntp,
errc);
});
});

co_await ss::max_concurrent_for_each(
plan_data.reassignments,
32,
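The dispatch loop above submits a cancellation for each planned ntp with at most 32 requests in flight (via ss::max_concurrent_for_each) and logs the resulting error code. As a rough illustration of that bounded-concurrency pattern — not Redpanda or Seastar code, just a standard-library sketch in which submit_cancellation() is a hypothetical stand-in for cancel_moving_partition_replicas() — the same idea looks like this:

// Sketch only: bounded-concurrency dispatch, analogous in spirit to the
// ss::max_concurrent_for_each(plan_data.cancellations, 32, ...) call above,
// but built from std::thread and std::counting_semaphore instead of Seastar
// futures.
#include <iostream>
#include <semaphore>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for cancel_moving_partition_replicas(); returns an
// error code (0 meaning success here).
int submit_cancellation(const std::string& ntp) {
    std::cout << "cancel movement for ntp " << ntp << "\n";
    return 0;
}

int main() {
    std::vector<std::string> cancellations{"kafka/topic-1/0", "kafka/topic-1/1"};
    std::counting_semaphore<32> slots(32); // at most 32 submissions in flight
    std::vector<std::thread> workers;
    for (const auto& ntp : cancellations) {
        slots.acquire(); // wait for a free slot before dispatching
        workers.emplace_back([&slots, ntp] {
            int errc = submit_cancellation(ntp);
            std::cout << ntp << " movement cancellation submitted, errc: "
                      << errc << "\n";
            slots.release();
        });
    }
    for (auto& t : workers) {
        t.join();
    }
    return 0;
}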
39 changes: 39 additions & 0 deletions src/v/cluster/partition_balancer_planner.cc
@@ -405,6 +405,41 @@ void partition_balancer_planner::get_full_node_reassignments(
}
}

/*
* Cancel a movement if the new assignment contains an unavailable node
* and the previous replica set doesn't contain this node
*/
void partition_balancer_planner::get_unavailable_node_movement_cancellations(
std::vector<model::ntp>& cancellations,
const reallocation_request_state& rrs) {
for (const auto& update : _topic_table.updates_in_progress()) {
if (
update.second.state
!= topic_table::in_progress_state::update_requested) {
continue;
}

absl::flat_hash_set<model::node_id> previous_replicas_set;
for (const auto& r : update.second.previous_replicas) {
previous_replicas_set.insert(r.node_id);
}

auto current_assignments = _topic_table.get_partition_assignment(
update.first);
if (!current_assignments.has_value()) {
continue;
}
for (const auto& r : current_assignments->replicas) {
if (
rrs.unavailable_nodes.contains(r.node_id)
&& !previous_replicas_set.contains(r.node_id)) {
cancellations.push_back(update.first);
continue;
}
}
}
}

partition_balancer_planner::plan_data
partition_balancer_planner::plan_reassignments(
const cluster_health_report& health_report,
@@ -416,6 +451,10 @@ partition_balancer_planner::plan_reassignments(
calculate_unavailable_nodes(follower_metrics, rrs, result);
calculate_nodes_with_disk_constraints_violation(rrs, result);

if (_topic_table.has_updates_in_progress()) {
get_unavailable_node_movement_cancellations(result.cancellations, rrs);
}

if (
!_topic_table.has_updates_in_progress()
&& !result.violations.is_empty()) {
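The criterion implemented in get_unavailable_node_movement_cancellations() above can be stated compactly: an in-progress move is cancelled when the current (target) replica set places a replica on an unavailable node that the previous replica set did not already contain. Below is a minimal, self-contained sketch of that rule — plain standard-library containers and integer node ids rather than Redpanda's internal types — mirroring the scenario exercised by the new fixture test further down (a move from nodes {0,1,2} to {1,2,3}, after which node 3 goes down):

// Sketch only: the cancellation rule in isolation, not Redpanda code.
#include <cassert>
#include <unordered_set>
#include <vector>

// Returns true if the in-progress move should be cancelled: some replica in
// the new assignment sits on an unavailable node that the previous replica
// set did not already contain.
bool should_cancel_move(
  const std::vector<int>& previous_replicas,
  const std::vector<int>& new_replicas,
  const std::unordered_set<int>& unavailable_nodes) {
    std::unordered_set<int> previous(
      previous_replicas.begin(), previous_replicas.end());
    for (int node : new_replicas) {
        if (unavailable_nodes.contains(node) && !previous.contains(node)) {
            return true;
        }
    }
    return false;
}

int main() {
    // Move {0,1,2} -> {1,2,3}; node 3 (a move target) goes down: cancel.
    assert(should_cancel_move({0, 1, 2}, {1, 2, 3}, {3}));
    // Same move, but only node 0 (already in the previous set) is down:
    // cancelling would not help, so the move is left alone.
    assert(!should_cancel_move({0, 1, 2}, {1, 2, 3}, {0}));
    return 0;
}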
5 changes: 5 additions & 0 deletions src/v/cluster/partition_balancer_planner.h
@@ -41,6 +41,7 @@ class partition_balancer_planner {
struct plan_data {
partition_balancer_violations violations;
std::vector<ntp_reassignments> reassignments;
std::vector<model::ntp> cancellations;
size_t failed_reassignments_count = 0;
};

@@ -77,6 +78,10 @@ class partition_balancer_planner {

void get_full_node_reassignments(plan_data&, reallocation_request_state&);

void get_unavailable_node_movement_cancellations(
std::vector<model::ntp>& cancellations,
const reallocation_request_state&);

void calculate_nodes_with_disk_constraints_violation(
reallocation_request_state&, plan_data&);

16 changes: 16 additions & 0 deletions src/v/cluster/tests/partition_balancer_planner_fixture.h
@@ -133,6 +133,22 @@ struct partition_balancer_planner_fixture {
}
}

cluster::move_partition_replicas_cmd make_move_partition_replicas_cmd(
model::ntp ntp, std::vector<model::broker_shard> replica_set) {
return cluster::move_partition_replicas_cmd(
std::move(ntp), std::move(replica_set));
}

void move_partition_replicas(cluster::ntp_reassignments& reassignment) {
auto cmd = make_move_partition_replicas_cmd(
reassignment.ntp,
reassignment.allocation_units.get_assignments().front().replicas);
auto res = workers.dispatcher
.apply_update(serialize_cmd(std::move(cmd)).get())
.get();
BOOST_REQUIRE_EQUAL(res, cluster::errc::success);
}

std::vector<raft::follower_metrics>
create_follower_metrics(const std::set<size_t>& unavailable_nodes = {}) {
std::vector<raft::follower_metrics> metrics;
55 changes: 55 additions & 0 deletions src/v/cluster/tests/partition_balancer_planner_test.cc
@@ -616,3 +616,58 @@ FIXTURE_TEST(test_lot_of_partitions, partition_balancer_planner_fixture) {
BOOST_REQUIRE_EQUAL(node_3_counter, node_4_counter);
BOOST_REQUIRE_EQUAL(node_4_counter, movement_batch_partitions_amount / 2);
}

/*
* 4 nodes; 1 topic; 1+1 node down
* Node 3 is down after planning
* Initial
* node_0: partitions: 1; down: True; disk: unfilled;
* node_1: partitions: 1; down: False; disk: unfilled;
* node_2: partitions: 1; down: False; disk: unfilled;
* node_3: partitions: 0; down: False; disk: unfilled;
* Expected
* node_0: partitions: 1;
* node_1: partitions: 1;
* node_2: partitions: 1;
* node_3: partitions: 0;
*/
FIXTURE_TEST(test_node_cancelation, partition_balancer_planner_fixture) {
vlog(logger.debug, "test_node_cancelation");
allocator_register_nodes(3);
create_topic("topic-1", 1, 3);
allocator_register_nodes(1);

auto hr = create_health_report();

std::set<size_t> unavailable_nodes = {0};
auto fm = create_follower_metrics(unavailable_nodes);

auto planner_result = planner.plan_reassignments(hr, fm);

BOOST_REQUIRE_EQUAL(planner_result.reassignments.size(), 1);

auto ntp = planner_result.reassignments.front().ntp;

std::unordered_set<model::node_id> expected_nodes(
{model::node_id(1), model::node_id(2), model::node_id(3)});

auto new_replicas = planner_result.reassignments.front()
.allocation_units.get_assignments()
.front()
.replicas;
check_expected_assignments(new_replicas, expected_nodes);

for (auto& reassignment : planner_result.reassignments) {
move_partition_replicas(reassignment);
}

hr = create_health_report();

unavailable_nodes = {0, 3};
fm = create_follower_metrics(unavailable_nodes);

planner_result = planner.plan_reassignments(hr, fm);
BOOST_REQUIRE(planner_result.reassignments.size() == 0);
BOOST_REQUIRE(planner_result.cancellations.size() == 1);
BOOST_REQUIRE(planner_result.cancellations.front() == ntp);
}
149 changes: 107 additions & 42 deletions tests/rptest/tests/partition_balancer_test.py
@@ -27,34 +27,39 @@ def __init__(self, ctx, *args, **kwargs):
ctx,
*args,
extra_rp_conf={
'partition_autobalancing_mode': 'continuous',
'partition_autobalancing_node_availability_timeout_sec':
"partition_autobalancing_mode": "continuous",
"partition_autobalancing_node_availability_timeout_sec":
self.NODE_AVAILABILITY_TIMEOUT,
'partition_autobalancing_tick_interval_ms': 5000,
'raft_learner_recovery_rate': 1_000_000,
"partition_autobalancing_tick_interval_ms": 5000,
"raft_learner_recovery_rate": 1_000_000,
},
**kwargs)
**kwargs,
)

def topic_partitions_replicas(self, topic):

def node2partition_count(self):
topics = [self.topic]
rpk = RpkTool(self.redpanda)
ret = {}
for topic in topics:
num_partitions = topic.partition_count
num_partitions = topic.partition_count

def all_partitions_ready():
try:
partitions = list(rpk.describe_topic(topic.name))
except RpkException:
return False
return (len(partitions) == num_partitions, partitions)
def all_partitions_ready():
try:
partitions = list(rpk.describe_topic(topic.name))
except RpkException:
return False
return (len(partitions) == num_partitions, partitions)

partitions = wait_until_result(
all_partitions_ready,
timeout_sec=120,
backoff_sec=1,
err_msg="failed to wait until all partitions have leaders")
return wait_until_result(
all_partitions_ready,
timeout_sec=120,
backoff_sec=1,
err_msg="failed to wait until all partitions have leaders",
)

def node2partition_count(self):
topics = [self.topic]
ret = {}
for topic in topics:
partitions = self.topic_partitions_replicas(topic)
for p in partitions:
for r in p.replicas:
ret[r] = ret.setdefault(r, 0) + 1
@@ -69,22 +74,26 @@ def check():
req_start = time.time()

status = admin.get_partition_balancer_status(timeout=1)
self.logger.info(f'partition balancer status: {status}')
self.logger.info(f"partition balancer status: {status}")

if 'seconds_since_last_tick' not in status:
if "seconds_since_last_tick" not in status:
return False
return (req_start - status['seconds_since_last_tick'] - 1 > start
and predicate(status), status)
return (
req_start - status["seconds_since_last_tick"] - 1 > start
and predicate(status),
status,
)

return wait_until_result(
check,
timeout_sec=timeout_sec,
backoff_sec=2,
err_msg="failed to wait until status condition")
err_msg="failed to wait until status condition",
)

def wait_until_ready(self, timeout_sec=120):
return self.wait_until_status(
lambda status: status['status'] == 'ready',
lambda status: status["status"] == "ready",
timeout_sec=timeout_sec)

@cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST)
@@ -106,8 +115,9 @@ def test_unavailable_nodes(self):
for n in range(10):
node = self.redpanda.nodes[n % 5]
failure_types = [
FailureSpec.FAILURE_KILL, FailureSpec.FAILURE_TERMINATE,
FailureSpec.FAILURE_SUSPEND
FailureSpec.FAILURE_KILL,
FailureSpec.FAILURE_TERMINATE,
FailureSpec.FAILURE_SUSPEND,
]
failure = FailureSpec(random.choice(failure_types), node)
f_injector._start_func(failure.type)(failure.node)
@@ -118,22 +128,77 @@

time.sleep(self.NODE_AVAILABILITY_TIMEOUT)

# TODO: enable when cancellation gets implemented
# wait_for_quiescent_state = random.random() < 0.5
wait_for_quiescent_state = True
self.wait_until_ready()

if wait_for_quiescent_state:
self.wait_until_ready()
node2pc = self.node2partition_count()
self.logger.info(f"partition counts after: {node2pc}")

node2pc = self.node2partition_count()
self.logger.info(f'partition counts after: {node2pc}')

assert sum(node2pc.values()) == total_replicas
assert self.redpanda.idx(node) not in node2pc
else:
self.wait_until_status(lambda s: s['status'] == 'in_progress'
or s['status'] == 'ready')
assert sum(node2pc.values()) == total_replicas
assert self.redpanda.idx(node) not in node2pc

prev_failure = failure

self.run_validation()

def _throttle_recovery(self, new_value):
self.redpanda.set_cluster_config(
{"raft_learner_recovery_rate": str(new_value)})

@cluster(num_nodes=6, log_allow_list=CHAOS_LOG_ALLOW_LIST)
def test_movement_cancellations(self):
self.start_redpanda(num_nodes=4)

self.topic = TopicSpec(partition_count=random.randint(20, 30))

f_injector = FailureInjector(self.redpanda)

self.client().create_topic(self.topic)

self.start_producer(1)
self.start_consumer(1)
self.await_startup()

empty_node = self.redpanda.nodes[-1]

# clean node
initial_failure = FailureSpec(FailureSpec.FAILURE_KILL, empty_node)
f_injector._start_func(initial_failure.type)(initial_failure.node)
time.sleep(self.NODE_AVAILABILITY_TIMEOUT)
self.wait_until_ready()
f_injector._stop_func(initial_failure.type)(initial_failure.node)
node2pc = self.node2partition_count()
self.logger.info(f"partition counts after: {node2pc}")
assert self.redpanda.idx(empty_node) not in node2pc

# throttle recovery to prevent partition move from finishing
self._throttle_recovery(10)

total_replicas = sum(self.node2partition_count().values())

for n in range(3):
node = self.redpanda.nodes[n % 3]
failure = FailureSpec(FailureSpec.FAILURE_KILL, node)
f_injector._start_func(failure.type)(failure.node)

time.sleep(self.NODE_AVAILABILITY_TIMEOUT)

# wait until movement start
self.wait_until_status(lambda s: s["status"] == "in_progress")
f_injector._stop_func(failure.type)(failure.node)

# stop empty node
f_injector._start_func(initial_failure.type)(initial_failure.node)
time.sleep(self.NODE_AVAILABILITY_TIMEOUT)

# wait until movements are cancelled
self.wait_until_ready()

node2pc = self.node2partition_count()
self.logger.info(f"partition counts after: {node2pc}")

assert sum(node2pc.values()) == total_replicas
assert self.redpanda.idx(empty_node) not in node2pc

f_injector._stop_func(initial_failure.type)(initial_failure.node)

self.run_validation()