Commit
Merge pull request #5365 from ZeDRoman/partition-autobalancer-planner
Partition autobalancer planner: cancel movement to unavailable nodes
ZeDRoman committed Jul 19, 2022
2 parents db2761d + 141496f commit 66252c6
Showing 6 changed files with 245 additions and 46 deletions.
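
The core of the change is the planner-side criterion implemented in get_unavailable_node_movement_cancellations below: an in-progress move is cancelled when its target assignment contains a node that is currently unavailable and that was not already part of the previous replica set (cancelling reverts the partition to its previous replicas, so a move whose previous set already includes the unavailable node would gain nothing from being cancelled). A minimal sketch of that predicate, using simplified stand-in types (plain ints for node ids, strings for ntps) rather than the real topic_table structures:

#include <iostream>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for topic_table's in-progress update record.
struct in_progress_update {
    std::string ntp;                   // partition being moved
    std::vector<int> previous_replicas;
    std::vector<int> current_replicas; // target assignment of the move
};

// Cancel a move if its target assignment contains an unavailable node that
// was not already part of the previous replica set.
std::vector<std::string> moves_to_cancel(
  const std::vector<in_progress_update>& updates,
  const std::set<int>& unavailable_nodes) {
    std::vector<std::string> cancellations;
    for (const auto& u : updates) {
        std::set<int> previous(
          u.previous_replicas.begin(), u.previous_replicas.end());
        for (int node : u.current_replicas) {
            if (unavailable_nodes.count(node) && !previous.count(node)) {
                cancellations.push_back(u.ntp);
                break; // one such replica is enough to cancel this move
            }
        }
    }
    return cancellations;
}

int main() {
    // One move in flight: {0,1,2} -> {1,2,3}; node 3 then becomes unavailable.
    std::vector<in_progress_update> updates{
      {"kafka/topic-1/0", {0, 1, 2}, {1, 2, 3}}};
    for (const auto& ntp : moves_to_cancel(updates, {0, 3})) {
        std::cout << "cancel " << ntp << "\n"; // prints: cancel kafka/topic-1/0
    }
}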
27 changes: 23 additions & 4 deletions src/v/cluster/partition_balancer_backend.cc
@@ -133,25 +133,44 @@ ss::future<> partition_balancer_backend::do_tick() {
vlog(
clusterlog.info,
"violations: {} unavailable nodes, {} full nodes; planned {} "
"reassignments",
"reassignments; cancelled {} reassignments",
plan_data.violations.unavailable_nodes.size(),
plan_data.violations.full_nodes.size(),
plan_data.reassignments.size());
plan_data.reassignments.size(),
plan_data.cancellations.size());
}

_last_leader_term = _raft0->term();
_last_tick_time = ss::lowres_clock::now();
_last_violations = std::move(plan_data.violations);

if (
_topic_table.has_updates_in_progress()
|| !plan_data.reassignments.empty()) {
_topic_table.has_updates_in_progress() || !plan_data.reassignments.empty()
|| !plan_data.cancellations.empty()) {
_last_status = partition_balancer_status::in_progress;
} else if (plan_data.failed_reassignments_count > 0) {
_last_status = partition_balancer_status::stalled;
} else {
_last_status = partition_balancer_status::ready;
}

co_await ss::max_concurrent_for_each(
plan_data.cancellations, 32, [this, current_term](model::ntp& ntp) {
vlog(clusterlog.info, "cancel movement for ntp {}", ntp);
return _topics_frontend
.cancel_moving_partition_replicas(
ntp,
model::timeout_clock::now() + add_move_cmd_timeout,
current_term)
.then([ntp = std::move(ntp)](auto errc) {
vlog(
clusterlog.info,
"{} movement cancellation submitted, errc: {}",
ntp,
errc);
});
});

co_await ss::max_concurrent_for_each(
plan_data.reassignments,
32,
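The backend hunk above submits the planner's cancellations with ss::max_concurrent_for_each, keeping at most 32 cancellation requests in flight, the same pattern it already uses for reassignments. A standalone sketch of that submission pattern, assuming hypothetical ntp strings and a sleep in place of the real _topics_frontend.cancel_moving_partition_replicas call:

#include <chrono>
#include <string>
#include <vector>
#include <seastar/core/app-template.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/sleep.hh>
#include <seastar/util/log.hh>

static seastar::logger lg("cancel-demo");

int main(int argc, char** argv) {
    seastar::app_template app;
    return app.run(argc, argv, [] {
        // Hypothetical list of partitions whose moves should be cancelled.
        std::vector<std::string> ntps{
          "kafka/topic-1/0", "kafka/topic-1/1", "kafka/topic-1/2"};
        return seastar::do_with(std::move(ntps), [](auto& ntps) {
            // At most 32 cancellation futures are outstanding at any time.
            return seastar::max_concurrent_for_each(
              ntps, 32, [](const std::string& ntp) {
                  lg.info("cancel movement for ntp {}", ntp);
                  // Stand-in for cancel_moving_partition_replicas(...).
                  return seastar::sleep(std::chrono::milliseconds(10));
              });
        });
    });
}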
39 changes: 39 additions & 0 deletions src/v/cluster/partition_balancer_planner.cc
@@ -405,6 +405,41 @@ void partition_balancer_planner::get_full_node_reassignments(
}
}

/*
* Cancel a movement if the new assignment contains an unavailable node
* and the previous replica set doesn't contain this node
*/
void partition_balancer_planner::get_unavailable_node_movement_cancellations(
std::vector<model::ntp>& cancellations,
const reallocation_request_state& rrs) {
for (const auto& update : _topic_table.updates_in_progress()) {
if (
update.second.state
!= topic_table::in_progress_state::update_requested) {
continue;
}

absl::flat_hash_set<model::node_id> previous_replicas_set;
for (const auto& r : update.second.previous_replicas) {
previous_replicas_set.insert(r.node_id);
}

auto current_assignments = _topic_table.get_partition_assignment(
update.first);
if (!current_assignments.has_value()) {
continue;
}
for (const auto& r : current_assignments->replicas) {
if (
rrs.unavailable_nodes.contains(r.node_id)
&& !previous_replicas_set.contains(r.node_id)) {
cancellations.push_back(update.first);
break;
}
}
}
}

partition_balancer_planner::plan_data
partition_balancer_planner::plan_reassignments(
const cluster_health_report& health_report,
@@ -416,6 +451,10 @@ partition_balancer_planner::plan_reassignments(
calculate_unavailable_nodes(follower_metrics, rrs, result);
calculate_nodes_with_disk_constraints_violation(rrs, result);

if (_topic_table.has_updates_in_progress()) {
get_unavailable_node_movement_cancellations(result.cancellations, rrs);
}

if (
!_topic_table.has_updates_in_progress()
&& !result.violations.is_empty()) {
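For reference, the backend change earlier in this commit derives the reported balancer status from the plan produced here: any in-progress updates, planned reassignments, or planned cancellations report in_progress; otherwise a non-zero failed-reassignment count reports stalled; otherwise ready. A minimal restatement of that mapping, with a plain struct standing in for plan_data and enum names following partition_balancer_status:

#include <cstddef>
#include <iostream>

enum class balancer_status { ready, in_progress, stalled };

// Simplified stand-in for the inputs the backend inspects after planning.
struct plan_summary {
    bool updates_in_progress = false;     // _topic_table.has_updates_in_progress()
    std::size_t reassignments = 0;        // plan_data.reassignments.size()
    std::size_t cancellations = 0;        // plan_data.cancellations.size()
    std::size_t failed_reassignments = 0; // plan_data.failed_reassignments_count
};

// Mirrors the status selection in partition_balancer_backend::do_tick().
balancer_status derive_status(const plan_summary& s) {
    if (s.updates_in_progress || s.reassignments > 0 || s.cancellations > 0) {
        return balancer_status::in_progress;
    }
    if (s.failed_reassignments > 0) {
        return balancer_status::stalled;
    }
    return balancer_status::ready;
}

int main() {
    plan_summary s;
    s.cancellations = 1; // cancellations alone now keep the balancer "in progress"
    std::cout << (derive_status(s) == balancer_status::in_progress
                    ? "in_progress"
                    : "other")
              << "\n";
}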
5 changes: 5 additions & 0 deletions src/v/cluster/partition_balancer_planner.h
@@ -41,6 +41,7 @@ class partition_balancer_planner {
struct plan_data {
partition_balancer_violations violations;
std::vector<ntp_reassignments> reassignments;
std::vector<model::ntp> cancellations;
size_t failed_reassignments_count = 0;
};

@@ -77,6 +78,10 @@ class partition_balancer_planner {

void get_full_node_reassignments(plan_data&, reallocation_request_state&);

void get_unavailable_node_movement_cancellations(
std::vector<model::ntp>& cancellations,
const reallocation_request_state&);

void calculate_nodes_with_disk_constraints_violation(
reallocation_request_state&, plan_data&);

16 changes: 16 additions & 0 deletions src/v/cluster/tests/partition_balancer_planner_fixture.h
@@ -133,6 +133,22 @@ struct partition_balancer_planner_fixture {
}
}

cluster::move_partition_replicas_cmd make_move_partition_replicas_cmd(
model::ntp ntp, std::vector<model::broker_shard> replica_set) {
return cluster::move_partition_replicas_cmd(
std::move(ntp), std::move(replica_set));
}

void move_partition_replicas(cluster::ntp_reassignments& reassignment) {
auto cmd = make_move_partition_replicas_cmd(
reassignment.ntp,
reassignment.allocation_units.get_assignments().front().replicas);
auto res = workers.dispatcher
.apply_update(serialize_cmd(std::move(cmd)).get())
.get();
BOOST_REQUIRE_EQUAL(res, cluster::errc::success);
}

std::vector<raft::follower_metrics>
create_follower_metrics(const std::set<size_t>& unavailable_nodes = {}) {
std::vector<raft::follower_metrics> metrics;
55 changes: 55 additions & 0 deletions src/v/cluster/tests/partition_balancer_planner_test.cc
@@ -616,3 +616,58 @@ FIXTURE_TEST(test_lot_of_partitions, partition_balancer_planner_fixture) {
BOOST_REQUIRE_EQUAL(node_3_counter, node_4_counter);
BOOST_REQUIRE_EQUAL(node_4_counter, movement_batch_partitions_amount / 2);
}

/*
* 4 nodes; 1 topic; 1+1 node down
* Node 3 is down after planning
* Initial
* node_0: partitions: 1; down: True; disk: unfilled;
* node_1: partitions: 1; down: False; disk: unfilled;
* node_2: partitions: 1; down: False; disk: unfilled;
* node_3: partitions: 0; down: False; disk: unfilled;
* Expected
* node_0: partitions: 1;
* node_1: partitions: 1;
* node_2: partitions: 1;
* node_3: partitions: 0;
*/
FIXTURE_TEST(test_node_cancellation, partition_balancer_planner_fixture) {
vlog(logger.debug, "test_node_cancellation");
allocator_register_nodes(3);
create_topic("topic-1", 1, 3);
allocator_register_nodes(1);

auto hr = create_health_report();

std::set<size_t> unavailable_nodes = {0};
auto fm = create_follower_metrics(unavailable_nodes);

auto planner_result = planner.plan_reassignments(hr, fm);

BOOST_REQUIRE_EQUAL(planner_result.reassignments.size(), 1);

auto ntp = planner_result.reassignments.front().ntp;

std::unordered_set<model::node_id> expected_nodes(
{model::node_id(1), model::node_id(2), model::node_id(3)});

auto new_replicas = planner_result.reassignments.front()
.allocation_units.get_assignments()
.front()
.replicas;
check_expected_assignments(new_replicas, expected_nodes);

for (auto& reassignment : planner_result.reassignments) {
move_partition_replicas(reassignment);
}

hr = create_health_report();

unavailable_nodes = {0, 3};
fm = create_follower_metrics(unavailable_nodes);

planner_result = planner.plan_reassignments(hr, fm);
BOOST_REQUIRE(planner_result.reassignments.size() == 0);
BOOST_REQUIRE(planner_result.cancellations.size() == 1);
BOOST_REQUIRE(planner_result.cancellations.front() == ntp);
}
149 changes: 107 additions & 42 deletions tests/rptest/tests/partition_balancer_test.py
@@ -27,34 +27,39 @@ def __init__(self, ctx, *args, **kwargs):
ctx,
*args,
extra_rp_conf={
'partition_autobalancing_mode': 'continuous',
'partition_autobalancing_node_availability_timeout_sec':
"partition_autobalancing_mode": "continuous",
"partition_autobalancing_node_availability_timeout_sec":
self.NODE_AVAILABILITY_TIMEOUT,
'partition_autobalancing_tick_interval_ms': 5000,
'raft_learner_recovery_rate': 1_000_000,
"partition_autobalancing_tick_interval_ms": 5000,
"raft_learner_recovery_rate": 1_000_000,
},
**kwargs)
**kwargs,
)

def topic_partitions_replicas(self, topic):

def node2partition_count(self):
topics = [self.topic]
rpk = RpkTool(self.redpanda)
ret = {}
for topic in topics:
num_partitions = topic.partition_count
num_partitions = topic.partition_count

def all_partitions_ready():
try:
partitions = list(rpk.describe_topic(topic.name))
except RpkException:
return False
return (len(partitions) == num_partitions, partitions)
def all_partitions_ready():
try:
partitions = list(rpk.describe_topic(topic.name))
except RpkException:
return False
return (len(partitions) == num_partitions, partitions)

partitions = wait_until_result(
all_partitions_ready,
timeout_sec=120,
backoff_sec=1,
err_msg="failed to wait until all partitions have leaders")
return wait_until_result(
all_partitions_ready,
timeout_sec=120,
backoff_sec=1,
err_msg="failed to wait until all partitions have leaders",
)

def node2partition_count(self):
topics = [self.topic]
ret = {}
for topic in topics:
partitions = self.topic_partitions_replicas(topic)
for p in partitions:
for r in p.replicas:
ret[r] = ret.setdefault(r, 0) + 1
@@ -69,22 +74,26 @@ def check():
req_start = time.time()

status = admin.get_partition_balancer_status(timeout=1)
self.logger.info(f'partition balancer status: {status}')
self.logger.info(f"partition balancer status: {status}")

if 'seconds_since_last_tick' not in status:
if "seconds_since_last_tick" not in status:
return False
return (req_start - status['seconds_since_last_tick'] - 1 > start
and predicate(status), status)
return (
req_start - status["seconds_since_last_tick"] - 1 > start
and predicate(status),
status,
)

return wait_until_result(
check,
timeout_sec=timeout_sec,
backoff_sec=2,
err_msg="failed to wait until status condition")
err_msg="failed to wait until status condition",
)

def wait_until_ready(self, timeout_sec=120):
return self.wait_until_status(
lambda status: status['status'] == 'ready',
lambda status: status["status"] == "ready",
timeout_sec=timeout_sec)

@cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST)
@@ -106,8 +115,9 @@ def test_unavailable_nodes(self):
for n in range(10):
node = self.redpanda.nodes[n % 5]
failure_types = [
FailureSpec.FAILURE_KILL, FailureSpec.FAILURE_TERMINATE,
FailureSpec.FAILURE_SUSPEND
FailureSpec.FAILURE_KILL,
FailureSpec.FAILURE_TERMINATE,
FailureSpec.FAILURE_SUSPEND,
]
failure = FailureSpec(random.choice(failure_types), node)
f_injector._start_func(failure.type)(failure.node)
@@ -118,22 +128,77 @@

time.sleep(self.NODE_AVAILABILITY_TIMEOUT)

# TODO: enable when cancellation gets implemented
# wait_for_quiescent_state = random.random() < 0.5
wait_for_quiescent_state = True
self.wait_until_ready()

if wait_for_quiescent_state:
self.wait_until_ready()
node2pc = self.node2partition_count()
self.logger.info(f"partition counts after: {node2pc}")

node2pc = self.node2partition_count()
self.logger.info(f'partition counts after: {node2pc}')

assert sum(node2pc.values()) == total_replicas
assert self.redpanda.idx(node) not in node2pc
else:
self.wait_until_status(lambda s: s['status'] == 'in_progress'
or s['status'] == 'ready')
assert sum(node2pc.values()) == total_replicas
assert self.redpanda.idx(node) not in node2pc

prev_failure = failure

self.run_validation()

def _throttle_recovery(self, new_value):
self.redpanda.set_cluster_config(
{"raft_learner_recovery_rate": str(new_value)})

@cluster(num_nodes=6, log_allow_list=CHAOS_LOG_ALLOW_LIST)
def test_movement_cancellations(self):
self.start_redpanda(num_nodes=4)

self.topic = TopicSpec(partition_count=random.randint(20, 30))

f_injector = FailureInjector(self.redpanda)

self.client().create_topic(self.topic)

self.start_producer(1)
self.start_consumer(1)
self.await_startup()

empty_node = self.redpanda.nodes[-1]

# drain the node: kill it, let the balancer move its partitions away, then restart it
initial_failure = FailureSpec(FailureSpec.FAILURE_KILL, empty_node)
f_injector._start_func(initial_failure.type)(initial_failure.node)
time.sleep(self.NODE_AVAILABILITY_TIMEOUT)
self.wait_until_ready()
f_injector._stop_func(initial_failure.type)(initial_failure.node)
node2pc = self.node2partition_count()
self.logger.info(f"partition counts after: {node2pc}")
assert self.redpanda.idx(empty_node) not in node2pc

# throttle recovery to prevent partition move from finishing
self._throttle_recovery(10)

total_replicas = sum(self.node2partition_count().values())

for n in range(3):
node = self.redpanda.nodes[n % 3]
failure = FailureSpec(FailureSpec.FAILURE_KILL, node)
f_injector._start_func(failure.type)(failure.node)

time.sleep(self.NODE_AVAILABILITY_TIMEOUT)

# wait until movement start
self.wait_until_status(lambda s: s["status"] == "in_progress")
f_injector._stop_func(failure.type)(failure.node)

# stop empty node
f_injector._start_func(initial_failure.type)(initial_failure.node)
time.sleep(self.NODE_AVAILABILITY_TIMEOUT)

# wait until movements are cancelled
self.wait_until_ready()

node2pc = self.node2partition_count()
self.logger.info(f"partition counts after: {node2pc}")

assert sum(node2pc.values()) == total_replicas
assert self.redpanda.idx(empty_node) not in node2pc

f_injector._stop_func(initial_failure.type)(initial_failure.node)

self.run_validation()
