Merge pull request #5371 from ztlpn/partition-autobalancer-dev
Partition balancer backend
ztlpn committed Jul 12, 2022
2 parents 8b95b18 + 4e67b24 commit 74c883a
Showing 46 changed files with 1,318 additions and 174 deletions.
10 changes: 10 additions & 0 deletions src/v/cluster/CMakeLists.txt
@@ -28,6 +28,13 @@ rpcgen(
INCLUDES ${CMAKE_BINARY_DIR}/src/v
)

rpcgen(
TARGET partition_balancer_rpc
IN_FILE ${CMAKE_CURRENT_SOURCE_DIR}/partition_balancer_rpc.json
OUT_FILE ${CMAKE_CURRENT_BINARY_DIR}/partition_balancer_rpc_service.h
INCLUDES ${CMAKE_BINARY_DIR}/src/v
)

v_cc_library(
NAME cluster
SRCS
@@ -95,12 +102,15 @@ v_cc_library(
drain_manager.cc
read_replica_manager.cc
partition_balancer_planner.cc
partition_balancer_backend.cc
partition_balancer_rpc_handler.cc
DEPS
Seastar::seastar
controller_rpc
metadata_rpc
id_allocator_rpc
tx_gateway_rpc
partition_balancer_rpc
v::raft
Roaring::roaring
absl::flat_hash_map
11 changes: 7 additions & 4 deletions src/v/cluster/cluster_utils.h
@@ -210,23 +210,26 @@ ss::future<std::error_code> replicate_and_wait(
ss::sharded<feature_table>& feature_table,
ss::sharded<ss::abort_source>& as,
Cmd&& cmd,
model::timeout_clock::time_point timeout) {
model::timeout_clock::time_point timeout,
std::optional<model::term_id> term = std::nullopt) {
const bool use_serde_serialization = feature_table.local().is_active(
feature::serde_raft_0);
return stm.invoke_on(
controller_stm_shard,
[cmd = std::forward<Cmd>(cmd),
term,
&as = as,
timeout,
use_serde_serialization](controller_stm& stm) mutable {
if (likely(use_serde_serialization)) {
auto b = serde_serialize_cmd(std::forward<Cmd>(cmd));
return stm.replicate_and_wait(std::move(b), timeout, as.local());
return stm.replicate_and_wait(
std::move(b), timeout, as.local(), term);
}
return serialize_cmd(std::forward<Cmd>(cmd))
.then([&stm, timeout, &as](model::record_batch b) {
.then([&stm, timeout, term, &as](model::record_batch b) {
return stm.replicate_and_wait(
std::move(b), timeout, as.local());
std::move(b), timeout, as.local(), term);
});
});
}
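The new optional term argument lets a caller pin the command to the controller term observed while it was planned; if leadership changes before the batch is replicated, the wait surfaces an error rather than committing under a newer term. A minimal caller sketch, assuming member names like _stm and _raft0 and a placeholder some_cmd (the leading parameters of replicate_and_wait sit outside this hunk, so treat the exact call shape as an assumption):

    // Capture the term up front; replicate_and_wait() forwards it to the
    // controller stm so the command is rejected if the term has moved on.
    auto planned_term = _raft0->term();
    return replicate_and_wait(
      _stm,
      _feature_table,
      _as,
      std::move(some_cmd),
      model::timeout_clock::now() + 10s,
      planned_term);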
25 changes: 25 additions & 0 deletions src/v/cluster/controller.cc
@@ -28,6 +28,7 @@
#include "cluster/members_table.h"
#include "cluster/metadata_dissemination_service.h"
#include "cluster/metrics_reporter.h"
#include "cluster/partition_balancer_backend.h"
#include "cluster/partition_leaders_table.h"
#include "cluster/partition_manager.h"
#include "cluster/raft0_utils.h"
@@ -335,6 +336,29 @@ ss::future<> controller::start() {
})
.then([this] {
return _metrics_reporter.invoke_on(0, &metrics_reporter::start);
})
.then([this] {
return _partition_balancer.start_single(
_raft0,
std::ref(_stm),
std::ref(_tp_state),
std::ref(_hm_frontend),
std::ref(_partition_allocator),
std::ref(_tp_frontend),
config::shard_local_cfg().partition_autobalancing_mode.bind(),
config::shard_local_cfg()
.partition_autobalancing_node_availability_timeout_sec.bind(),
config::shard_local_cfg()
.partition_autobalancing_max_disk_usage_percent.bind(),
config::shard_local_cfg()
.partition_autobalancing_tick_interval_ms.bind(),
config::shard_local_cfg()
.partition_autobalancing_movement_batch_size_bytes.bind());
})
.then([this] {
return _partition_balancer.invoke_on(
partition_balancer_backend::shard,
&partition_balancer_backend::start);
});
}

@@ -357,6 +381,7 @@ ss::future<> controller::stop() {
auto stop_leader_balancer = _leader_balancer ? _leader_balancer->stop()
: ss::now();
return stop_leader_balancer
.then([this] { return _partition_balancer.stop(); })
.then([this] { return _metrics_reporter.stop(); })
.then([this] { return _feature_manager.stop(); })
.then([this] { return _hm_frontend.stop(); })
5 changes: 5 additions & 0 deletions src/v/cluster/controller.h
@@ -99,6 +99,10 @@ class controller {

ss::sharded<shard_table>& get_shard_table() { return _shard_table; }

ss::sharded<partition_balancer_backend>& get_partition_balancer() {
return _partition_balancer;
}

ss::sharded<ss::abort_source>& get_abort_source() { return _as; }

bool is_raft0_leader() const {
@@ -159,6 +163,7 @@ class controller {
ss::sharded<feature_backend> _feature_backend; // instance per core
ss::sharded<feature_table>& _feature_table; // instance per core
std::unique_ptr<leader_balancer> _leader_balancer;
ss::sharded<partition_balancer_backend> _partition_balancer;
consensus_ptr _raft0;
ss::sharded<cloud_storage::remote>& _cloud_storage_api;
controller_probe _probe;
1 change: 1 addition & 0 deletions src/v/cluster/fwd.h
@@ -49,5 +49,6 @@ class feature_frontend;
class feature_manager;
class feature_table;
class drain_manager;
class partition_balancer_backend;

} // namespace cluster
4 changes: 3 additions & 1 deletion src/v/cluster/members_backend.cc
@@ -228,7 +228,9 @@ void members_backend::calculate_reallocations_after_decommissioned(
}
void members_backend::calculate_reallocations_after_node_added(
members_backend::update_meta& meta) const {
if (!config::shard_local_cfg().enable_auto_rebalance_on_node_add()) {
if (
config::shard_local_cfg().partition_autobalancing_mode()
== model::partition_autobalancing_mode::off) {
return;
}
auto& topics = _topics.local().topics_map();
218 changes: 218 additions & 0 deletions src/v/cluster/partition_balancer_backend.cc
@@ -0,0 +1,218 @@
/*
* Copyright 2022 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#include "cluster/partition_balancer_backend.h"

#include "cluster/health_monitor_frontend.h"
#include "cluster/logger.h"
#include "cluster/partition_balancer_planner.h"
#include "cluster/topics_frontend.h"
#include "random/generators.h"

#include <seastar/core/coroutine.hh>

#include <chrono>

using namespace std::chrono_literals;

namespace cluster {

static constexpr std::chrono::seconds controller_stm_sync_timeout = 10s;
static constexpr std::chrono::seconds add_move_cmd_timeout = 10s;

partition_balancer_backend::partition_balancer_backend(
consensus_ptr raft0,
ss::sharded<controller_stm>& controller_stm,
ss::sharded<topic_table>& topic_table,
ss::sharded<health_monitor_frontend>& health_monitor,
ss::sharded<partition_allocator>& partition_allocator,
ss::sharded<topics_frontend>& topics_frontend,
config::binding<model::partition_autobalancing_mode>&& mode,
config::binding<std::chrono::seconds>&& availability_timeout,
config::binding<unsigned>&& max_disk_usage_percent,
config::binding<std::chrono::milliseconds>&& tick_interval,
config::binding<size_t>&& movement_batch_size_bytes)
: _raft0(std::move(raft0))
, _controller_stm(controller_stm.local())
, _topic_table(topic_table.local())
, _health_monitor(health_monitor.local())
, _partition_allocator(partition_allocator.local())
, _topics_frontend(topics_frontend.local())
, _mode(std::move(mode))
, _availability_timeout(std::move(availability_timeout))
, _max_disk_usage_percent(std::move(max_disk_usage_percent))
, _tick_interval(std::move(tick_interval))
, _movement_batch_size_bytes(std::move(movement_batch_size_bytes))
, _timer([this] { tick(); }) {}

void partition_balancer_backend::start() {
if (is_enabled()) {
vlog(clusterlog.info, "partition autobalancing enabled");
_timer.arm(_tick_interval());
}

_mode.watch([this] { on_mode_changed(); });
}

void partition_balancer_backend::tick() {
ssx::background = ssx::spawn_with_gate_then(_gate, [this] {
return do_tick().finally([this] {
if (is_enabled() && !_gate.is_closed()) {
_timer.arm(_tick_interval());
}
});
}).handle_exception([](const std::exception_ptr& e) {
vlog(clusterlog.warn, "tick error: {}", e);
});
}

void partition_balancer_backend::on_mode_changed() {
if (_gate.is_closed()) {
return;
}

if (is_enabled()) {
vlog(clusterlog.info, "partition autobalancing enabled");
if (!_timer.armed()) {
_timer.arm(0ms);
}
} else {
vlog(clusterlog.info, "partition autobalancing disabled");
_timer.cancel();
}
}

ss::future<> partition_balancer_backend::stop() {
vlog(clusterlog.info, "stopping...");
_timer.cancel();
return _gate.close();
}

ss::future<> partition_balancer_backend::do_tick() {
if (!_raft0->is_leader()) {
vlog(clusterlog.debug, "not leader, skipping tick");
co_return;
}

vlog(clusterlog.debug, "tick");

auto current_term = _raft0->term();

co_await _controller_stm.wait(
_raft0->committed_offset(),
model::timeout_clock::now() + controller_stm_sync_timeout);

if (_raft0->term() != current_term) {
vlog(clusterlog.debug, "lost leadership, exiting");
co_return;
}

auto health_report
= co_await _health_monitor.get_current_cluster_health_snapshot(
cluster_report_filter{});

auto plan_data
= partition_balancer_planner(
planner_config{
.max_disk_usage_ratio = _max_disk_usage_percent() / 100.0,
.movement_disk_size_batch = _movement_batch_size_bytes(),
.node_availability_timeout_sec = _availability_timeout(),
},
_topic_table,
_partition_allocator)
.plan_reassignments(health_report, _raft0->get_follower_metrics());

if (!plan_data.violations.is_empty()) {
vlog(
clusterlog.info,
"violations: {} unavailable nodes, {} full nodes; planned {} "
"reassignments",
plan_data.violations.unavailable_nodes.size(),
plan_data.violations.full_nodes.size(),
plan_data.reassignments.size());
}

_last_leader_term = _raft0->term();
_last_tick_time = ss::lowres_clock::now();
_last_violations = std::move(plan_data.violations);
if (
_topic_table.has_updates_in_progress()
|| !plan_data.reassignments.empty()) {
_last_status = partition_balancer_status::in_progress;
} else if (plan_data.failed_reassignments_count > 0) {
_last_status = partition_balancer_status::stalled;
} else {
_last_status = partition_balancer_status::ready;
}

co_await ss::max_concurrent_for_each(
plan_data.reassignments,
32,
[this, current_term](ntp_reassignments& reassignment) {
vlog(
clusterlog.info,
"moving {} to {}",
reassignment.ntp,
reassignment.allocation_units.get_assignments().front().replicas);

return _topics_frontend
.move_partition_replicas(
reassignment.ntp,
reassignment.allocation_units.get_assignments().front().replicas,
model::timeout_clock::now() + add_move_cmd_timeout,
current_term)
.then([reassignment = std::move(reassignment)](auto errc) {
vlog(
clusterlog.info,
"{} reassignment submitted, errc: {}",
reassignment.ntp,
errc);
});
});
}

partition_balancer_overview_reply partition_balancer_backend::overview() const {
vassert(ss::this_shard_id() == shard, "called on a wrong shard");

partition_balancer_overview_reply ret;

if (_mode() != model::partition_autobalancing_mode::continuous) {
ret.status = partition_balancer_status::off;
ret.error = errc::feature_disabled;
return ret;
}

if (!is_leader()) {
ret.error = errc::not_leader;
return ret;
}

if (_raft0->term() != _last_leader_term) {
// we haven't done a single tick in this term yet, return empty response
ret.status = partition_balancer_status::starting;
ret.error = errc::success;
return ret;
}

ret.status = _last_status;
ret.violations = _last_violations;

auto now = ss::lowres_clock::now();
auto time_since_last_tick = now - _last_tick_time;
ret.last_tick_time = model::to_timestamp(
model::timestamp_clock::now()
- std::chrono::duration_cast<model::timestamp_clock::duration>(
time_since_last_tick));

ret.error = errc::success;
return ret;
}

} // namespace cluster
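
For a sense of how the single-shard backend is meant to be consumed, a hypothetical caller sketch (not part of this commit; only get_partition_balancer(), partition_balancer_backend::shard, and overview() come from the diff, the surrounding helper is assumed):

    // Hop to the balancer's home shard and read its cached overview.
    ss::future<cluster::partition_balancer_overview_reply>
    fetch_balancer_overview(cluster::controller& ctrl) {
        return ctrl.get_partition_balancer().invoke_on(
          cluster::partition_balancer_backend::shard,
          [](cluster::partition_balancer_backend& backend) {
              return backend.overview();
          });
    }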