Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.1.x] Mute just restarted nodes in leader_balancer #19832

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 58 additions & 11 deletions src/v/cluster/scheduling/leader_balancer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,20 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
_need_controller_refresh = false;
}

auto group_replicas = co_await collect_group_replicas_from_health_report();
auto health_report = co_await _health_monitor.get_cluster_health(
cluster_report_filter{},
force_refresh::no,
model::timeout_clock::now() + 5s);
if (!health_report) {
vlog(
clusterlog.warn,
"couldn't get health report: {}",
health_report.error());
co_return ss::stop_iteration::no;
}

auto group_replicas = co_await collect_group_replicas_from_health_report(
health_report.value());
auto index = build_index(std::move(group_replicas));
auto group_id_to_topic = build_group_id_to_topic_rev();

Expand All @@ -412,6 +425,8 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
co_return ss::stop_iteration::no;
}

auto muted_nodes = collect_muted_nodes(health_report.value());

auto mode = config::shard_local_cfg().leader_balancer_mode();
std::unique_ptr<leader_balancer_strategy> strategy;

Expand All @@ -422,12 +437,12 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
leader_balancer_types::random_hill_climbing_strategy>(
std::move(index),
std::move(group_id_to_topic),
leader_balancer_types::muted_index{muted_nodes(), {}});
leader_balancer_types::muted_index{std::move(muted_nodes), {}});
break;
case model::leader_balancer_mode::greedy_balanced_shards:
vlog(clusterlog.debug, "using greedy_balanced_shards");
strategy = std::make_unique<greedy_balanced_shards>(
std::move(index), muted_nodes());
std::move(index), std::move(muted_nodes));
break;
default:
vlog(clusterlog.error, "unexpected mode value: {}", mode);
Expand Down Expand Up @@ -564,7 +579,8 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
co_return ss::stop_iteration::no;
}

absl::flat_hash_set<model::node_id> leader_balancer::muted_nodes() const {
absl::flat_hash_set<model::node_id>
leader_balancer::collect_muted_nodes(const cluster_health_report& hr) {
absl::flat_hash_set<model::node_id> nodes;
const auto now = raft::clock_type::now();
for (const auto& follower : _raft0->get_follower_metrics()) {
Expand All @@ -578,6 +594,7 @@ absl::flat_hash_set<model::node_id> leader_balancer::muted_nodes() const {
std::chrono::duration_cast<std::chrono::milliseconds>(
last_hbeat_age)
.count());
continue;
}

if (auto nm = _members.get_node_metadata_ref(follower.id); nm) {
Expand All @@ -590,8 +607,42 @@ absl::flat_hash_set<model::node_id> leader_balancer::muted_nodes() const {
"Leadership rebalancer muting node {} in a maintenance "
"state.",
follower.id);
continue;
}
}

auto report_it = std::find_if(
hr.node_reports.begin(),
hr.node_reports.end(),
[id = follower.id](const node_health_report_ptr& n) {
return n->id == id;
});
if (report_it == hr.node_reports.end()) {
nodes.insert(follower.id);
vlog(
clusterlog.info,
"Leadership rebalancer muting node {} without a health report.",
follower.id);
continue;
}

auto uptime = (*report_it)->local_state.uptime;
if (uptime < leader_activation_delay) {
nodes.insert(follower.id);
vlog(
clusterlog.info,
"Leadership rebalancer muting node {} that "
"just restarted ({}s. ago)",
follower.id,
uptime / 1s);

// schedule a tick soon so that we can rebalance to the restarted
// node.
_timer.cancel();
_timer.arm(leader_activation_delay);

continue;
}
}
return nodes;
}
Expand Down Expand Up @@ -632,20 +683,16 @@ leader_balancer::build_group_id_to_topic_rev() const {
/// Returns nullopt if shard info from health report can not yet be used. In
/// this case callers have to rely on shard info from topic table.
ss::future<std::optional<leader_balancer::group_replicas_t>>
leader_balancer::collect_group_replicas_from_health_report() {
leader_balancer::collect_group_replicas_from_health_report(
const cluster_health_report& hr) {
if (!_feature_table.is_active(
features::feature::partition_shard_in_health_report)) {
co_return std::nullopt;
}

auto hr = co_await _health_monitor.get_cluster_health(
cluster_report_filter{},
force_refresh::no,
model::timeout_clock::now() + 5s);

group_replicas_t group_replicas;
ssx::async_counter counter;
for (const auto& node : hr.value().node_reports) {
for (const auto& node : hr.node_reports) {
for (const auto& topic : node->topics) {
auto maybe_meta = _topics.get_topic_metadata_ref(topic.tp_ns);
if (!maybe_meta) {
Expand Down
7 changes: 5 additions & 2 deletions src/v/cluster/scheduling/leader_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#pragma once
#include "absl/container/flat_hash_map.h"
#include "base/seastarx.h"
#include "cluster/health_monitor_types.h"
#include "cluster/partition_manager.h"
#include "cluster/scheduling/leader_balancer_probe.h"
#include "cluster/scheduling/leader_balancer_strategy.h"
Expand Down Expand Up @@ -105,12 +106,14 @@ class leader_balancer {

using group_replicas_t = absl::btree_map<raft::group_id, replicas_t>;
ss::future<std::optional<group_replicas_t>>
collect_group_replicas_from_health_report();
collect_group_replicas_from_health_report(const cluster_health_report&);
leader_balancer_types::group_id_to_topic_revision_t
build_group_id_to_topic_rev() const;
index_type build_index(std::optional<group_replicas_t>);
absl::flat_hash_set<model::node_id>
collect_muted_nodes(const cluster_health_report&);

leader_balancer_types::muted_groups_t muted_groups() const;
absl::flat_hash_set<model::node_id> muted_nodes() const;

ss::future<bool> do_transfer(reassignment);
ss::future<bool> do_transfer_local(reassignment) const;
Expand Down
2 changes: 1 addition & 1 deletion tests/rptest/tests/topic_recovery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1762,7 +1762,7 @@ def test_many_partitions(self, cloud_storage_type, check_mode):
num_topics=5,
num_partitions_per_topic=20,
check_mode=check_mode)
self.do_run(test_case)
self.do_run(test_case, upload_delay_sec=120)

@cluster(num_nodes=4,
log_allow_list=TRANSIENT_ERRORS +
Expand Down
Loading