Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Learners promotion v1 #15

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 48 additions & 9 deletions src/v/raft/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@
#include <optional>

namespace raft {
bool group_nodes::contains(model::node_id id) const {
auto v_it = std::find(std::cbegin(voters), std::cend(voters), id);
if (v_it != voters.cend()) {
return true;
}
auto l_it = std::find(std::cbegin(learners), std::cend(learners), id);
return l_it != learners.cend();
}

group_configuration::group_configuration(std::vector<model::broker> brokers)
: _brokers(std::move(brokers)) {
Expand Down Expand Up @@ -132,10 +140,9 @@ void group_configuration::add(std::vector<model::broker> brokers) {
}
}

// FIXME: add to learners when we will implement learners promotion
_old = _current;
for (auto& b : brokers) {
_current.voters.push_back(b.id());
_current.learners.push_back(b.id());
_brokers.push_back(std::move(b));
}
}
Expand Down Expand Up @@ -167,18 +174,38 @@ void group_configuration::remove(const std::vector<model::node_id>& ids) {
}

void group_configuration::replace(std::vector<model::broker> brokers) {
vassert(
!_old, "can not remove broker from joint configuration - {}", *this);
vassert(!_old, "can not replace joint configuration - {}", *this);

// add missing brokers, brokers have to contain both the new and old nodes

if (brokers == _brokers) {
bool has_all = std::all_of(
brokers.begin(), brokers.end(), [this](model::broker& b) {
return _current.contains(b.id());
});
// configurations are identical, do nothing
if (has_all) {
return;
}
}

_old = _current;
_current.learners.clear();
_current.voters.clear();
std::transform(
std::cbegin(brokers),
std::cend(brokers),
std::back_inserter(_current.voters),
[](const model::broker& broker) { return broker.id(); });

for (auto& br : brokers) {
auto was_voter = std::find(
std::cbegin(_old->voters),
std::cend(_old->voters),
br.id())
!= std::cend(_old->voters);

if (was_voter) {
_current.voters.push_back(br.id());
} else {
_current.learners.push_back(br.id());
}
}

for (auto& b : brokers) {
if (!contains_broker(b.id())) {
Expand All @@ -187,6 +214,18 @@ void group_configuration::replace(std::vector<model::broker> brokers) {
}
}

void group_configuration::promote_to_voter(model::node_id id) {
auto it = std::find(
std::cbegin(_current.learners), std::cend(_current.learners), id);
// do nothing
if (it == _current.learners.end()) {
return;
}
// add to voters
_current.learners.erase(it);
_current.voters.push_back(id);
}

void group_configuration::discard_old_config() {
vassert(
_old,
Expand Down
3 changes: 3 additions & 0 deletions src/v/raft/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct group_nodes {
std::vector<model::node_id> voters;
std::vector<model::node_id> learners;

bool contains(model::node_id id) const;
friend std::ostream& operator<<(std::ostream&, const group_nodes&);
friend bool operator==(const group_nodes&, const group_nodes&);
};
Expand Down Expand Up @@ -124,6 +125,8 @@ class group_configuration final {

int8_t version() const { return _version; }

void promote_to_voter(model::node_id id);

friend bool
operator==(const group_configuration&, const group_configuration&);

Expand Down
39 changes: 38 additions & 1 deletion src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,47 @@ consensus::success_reply consensus::update_follower_index(
// learners to nodes and perform data movement to added replicas
}

void consensus::maybe_promote_to_voter(model::node_id id) {
(void)ss::with_gate(_bg, [this, id] {
// is voter already
if (config().is_voter(id)) {
return ss::now();
}
auto it = _fstats.find(id);

// already removed
if (it == _fstats.end()) {
return ss::now();
}

// do not promote to voter, learner is not up to date
if (it->second.match_index < _log.offsets().committed_offset) {
return ss::now();
}

vlog(_ctxlog.trace, "promoting node {} to voter", id);
return _op_lock.get_units()
.then([this, id](ss::semaphore_units<> u) mutable {
auto latest_cfg = _configuration_manager.get_latest();
latest_cfg.promote_to_voter(id);

return replicate_configuration(
std::move(u), std::move(latest_cfg));
})
.then([this, id](std::error_code ec) {
vlog(
_ctxlog.trace, "node {} promotion result {}", id, ec.message());
});
}).handle_exception_type([](const ss::gate_closed_exception&) {});
}

void consensus::process_append_entries_reply(
model::node_id node,
result<append_entries_reply> r,
follower_req_seq seq_id) {
auto is_success = update_follower_index(node, std::move(r), seq_id);
if (is_success) {
maybe_promote_to_voter(node);
maybe_update_leader_commit_idx();
}
}
Expand Down Expand Up @@ -1425,7 +1460,9 @@ ss::future<> consensus::maybe_commit_configuration(ss::semaphore_units<> u) {

auto latest_cfg = _configuration_manager.get_latest();
// as a leader replicate new simple configuration
if (latest_cfg.type() == configuration_type::joint) {
if (
latest_cfg.type() == configuration_type::joint
&& !latest_cfg.current_config().voters.empty()) {
latest_cfg.discard_old_config();
vlog(
_ctxlog.trace,
Expand Down
1 change: 1 addition & 0 deletions src/v/raft/consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ class consensus {
ss::future<std::error_code> change_configuration(Func&&);

ss::future<> maybe_commit_configuration(ss::semaphore_units<>);
void maybe_promote_to_voter(model::node_id);

bytes last_applied_key() const;
// args
Expand Down
2 changes: 1 addition & 1 deletion src/v/raft/recovery_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ recovery_stm::recovery_stm(
: _ptr(p)
, _node_id(node_id)
, _prio(prio)
, _ctxlog(_ptr->group(), _ptr->ntp()) {}
, _ctxlog(_ptr->_ctxlog) {}

ss::future<> recovery_stm::do_recover() {
// We have to send all the records that leader have, event those that are
Expand Down
2 changes: 1 addition & 1 deletion src/v/raft/replicate_entries_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ replicate_entries_stm::replicate_entries_stm(
, _req(std::move(r))
, _followers_seq(std::move(seqs))
, _share_sem(1)
, _ctxlog(_ptr->group(), _ptr->ntp()) {}
, _ctxlog(_ptr->_ctxlog) {}

replicate_entries_stm::~replicate_entries_stm() {
auto gate_not_closed = _req_bg.get_count() > 0 && !_req_bg.is_closed();
Expand Down
2 changes: 1 addition & 1 deletion src/v/raft/vote_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ std::ostream& operator<<(std::ostream& o, const vote_stm::vmeta& m) {
vote_stm::vote_stm(consensus* p)
: _ptr(p)
, _sem(_ptr->config().unique_voter_count())
, _ctxlog(_ptr->group(), _ptr->ntp()) {}
, _ctxlog(_ptr->_ctxlog) {}

vote_stm::~vote_stm() {
if (_vote_bg.get_count() > 0 && !_vote_bg.is_closed()) {
Expand Down