Skip to content

Commit

Permalink
r/consensus: break _op_lock when stopping consensus
Browse files Browse the repository at this point in the history
When raft stops the last step in stop sequence is closing gate. After
gate is closed all the background futures are guaranteed to be finished.
However it may happen that the fiber is waiting for an `_op_lock` while
gate is being closed. We need to make sure that when gate is closed no
other fiber can acquire the `_op_lock`.

Marking `_op_lock` mutex as broken prevents all waiters for acquiring a
mutex and at the same time accessing `consensus` state.

Fixes: redpanda-data#5759

Signed-off-by: Michal Maslanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Aug 1, 2022
1 parent acde644 commit 708416c
Showing 1 changed file with 72 additions and 22 deletions.
94 changes: 72 additions & 22 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <seastar/core/fstream.hh>
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/util/defer.hh>

#include <fmt/ostream.h>
Expand Down Expand Up @@ -243,6 +244,8 @@ ss::future<> consensus::stop() {
co_await _event_manager.stop();
co_await _append_requests_buffer.stop();
co_await _batcher.stop();

_op_lock.broken();
co_await _bg.close();

// close writer if we have to
Expand Down Expand Up @@ -547,8 +550,11 @@ void consensus::dispatch_recovery(follower_index_metadata& idx) {

ss::future<result<model::offset>> consensus::linearizable_barrier() {
using ret_t = result<model::offset>;

ss::semaphore_units<> u = co_await _op_lock.get_units();
try {
ss::semaphore_units<> u = co_await _op_lock.get_units();
} catch (const ss::broken_semaphore&) {
co_return make_error_code(errc::shutting_down);
}

if (_vstate != vote_state::leader) {
co_return result<model::offset>(make_error_code(errc::not_leader));
Expand Down Expand Up @@ -893,27 +899,30 @@ void consensus::arm_vote_timeout() {

ss::future<std::error_code>
consensus::update_group_member(model::broker broker) {
return _op_lock.get_units().then(
[this, broker = std::move(broker)](ss::semaphore_units<> u) mutable {
auto cfg = _configuration_manager.get_latest();
if (!cfg.contains_broker(broker.id())) {
vlog(
_ctxlog.warn,
"Node with id {} does not exists in current configuration");
return ss::make_ready_future<std::error_code>(
errc::node_does_not_exists);
}
// update broker information
cfg.update(std::move(broker));
return _op_lock.get_units()
.then(
[this, broker = std::move(broker)](ss::semaphore_units<> u) mutable {
auto cfg = _configuration_manager.get_latest();
if (!cfg.contains_broker(broker.id())) {
vlog(
_ctxlog.warn,
"Node with id {} does not exists in current configuration");
return ss::make_ready_future<std::error_code>(
errc::node_does_not_exists);
}
// update broker information
cfg.update(std::move(broker));

return replicate_configuration(std::move(u), std::move(cfg));
});
return replicate_configuration(std::move(u), std::move(cfg));
})
.handle_exception_type(
[](const ss::broken_semaphore&) { return errc::shutting_down; });
}

template<typename Func>
ss::future<std::error_code> consensus::change_configuration(Func&& f) {
return _op_lock.get_units().then(
[this, f = std::forward<Func>(f)](ss::semaphore_units<> u) mutable {
return _op_lock.get_units()
.then([this, f = std::forward<Func>(f)](ss::semaphore_units<> u) mutable {
// prevent updating configuration if last configuration wasn't
// committed
if (_configuration_manager.get_latest_offset() > _commit_index) {
Expand All @@ -937,7 +946,9 @@ ss::future<std::error_code> consensus::change_configuration(Func&& f) {
std::move(u), std::move(res.value()));
}
return ss::make_ready_future<std::error_code>(res.error());
});
})
.handle_exception_type(
[](const ss::broken_semaphore&) { return errc::shutting_down; });
}

ss::future<std::error_code> consensus::add_group_members(
Expand Down Expand Up @@ -1000,7 +1011,12 @@ ss::future<std::error_code> consensus::replace_configuration(
template<typename Func>
ss::future<std::error_code>
consensus::interrupt_configuration_change(model::revision_id revision, Func f) {
auto u = co_await _op_lock.get_units();
ss::semaphore_units<> u;
try {
u = co_await _op_lock.get_units();
} catch (const ss::broken_semaphore&) {
co_return errc::shutting_down;
}
auto latest_cfg = config();
// latest configuration is of joint type
if (latest_cfg.get_state() == configuration_state::simple) {
Expand Down Expand Up @@ -1035,7 +1051,12 @@ consensus::cancel_configuration_change(model::revision_id revision) {
if (!ec) {
// current leader is not a voter, step down
if (!config().is_voter(_self)) {
auto u = co_await _op_lock.get_units();
ss::semaphore_units<> u;
try {
u = co_await _op_lock.get_units();
} catch (const ss::broken_semaphore&) {
co_return errc::shutting_down;
}
do_step_down("current leader is not voter");
}
}
Expand All @@ -1049,7 +1070,12 @@ consensus::abort_configuration_change(model::revision_id revision) {
_ctxlog.info,
"requested abort of current configuration change - {}",
config());
auto u = co_await _op_lock.get_units();
ss::semaphore_units<> u;
try {
u = co_await _op_lock.get_units();
} catch (const ss::broken_semaphore&) {
co_return errc::shutting_down;
}
auto latest_cfg = config();
// latest configuration must be of joint type
if (latest_cfg.get_state() == configuration_state::simple) {
Expand Down Expand Up @@ -1292,6 +1318,8 @@ ss::future<> consensus::do_start() {
_term,
_configuration_manager.get_latest());
});
}).handle_exception_type([](const ss::broken_semaphore&){
return errc::shutting_down;
});
}

Expand Down Expand Up @@ -1396,6 +1424,13 @@ ss::future<vote_reply> consensus::vote(vote_request&& r) {
.term = _term,
.granted = false,
.log_ok = false};
})
.handle_exception_type([this, target_node_id](const ss::broken_semaphore&){
return vote_reply{
.target_node_id = target_node_id,
.term = _term,
.granted = false,
.log_ok = false};
});
});
}
Expand Down Expand Up @@ -1803,6 +1838,8 @@ ss::future<install_snapshot_reply>
consensus::install_snapshot(install_snapshot_request&& r) {
return _op_lock.with([this, r = std::move(r)]() mutable {
return do_install_snapshot(std::move(r));
}).handle_exception_type([](const ss::broken_semaphore&) {
return errc::shutting_down;
});
}

Expand Down Expand Up @@ -2013,6 +2050,8 @@ ss::future<> consensus::write_snapshot(write_snapshot_cfg cfg) {

return do_write_snapshot(cfg.last_included_index, std::move(cfg.data))
.then([] { return true; });
}).handle_exception_type([](const ss::broken_semaphore&) {
return false;
});

if (!updated) {
Expand Down Expand Up @@ -2368,6 +2407,9 @@ ss::future<> consensus::refresh_commit_index() {
return f.then([this, u = std::move(u)]() mutable {
return do_maybe_update_leader_commit_idx(std::move(u));
});
})
.handle_exception_type([](const ss::broken_semaphore&) {
//ignore exception, shutting down
});
}

Expand Down Expand Up @@ -2734,8 +2776,12 @@ consensus::prepare_transfer_leadership(vnode target_rni) {

// Enforce ordering wrt anyone currently doing an append under op_lock
{
try{
auto units = co_await _op_lock.get_units();
vlog(_ctxlog.trace, "transfer leadership: cleared oplock");
} catch(const ss::broken_semaphore&) {
co_return make_error_code(errc::shutting_down);
}
}

// Allow any buffered batches to complete, to avoid racing
Expand Down Expand Up @@ -2988,6 +3034,7 @@ consensus::do_transfer_leadership(std::optional<model::node_id> target) {
// now and new leader sending a vote for its new term.
// (If we accepted more writes, our log could get
// ahead of new leader, and it could lose election)
try {
auto units = co_await _op_lock.get_units();
do_step_down("leadership_transfer");
if (_leader_id) {
Expand All @@ -2996,6 +3043,9 @@ consensus::do_transfer_leadership(std::optional<model::node_id> target) {
}

co_return make_error_code(errc::success);
} catch( const ss::broken_semaphore& ) {
co_return errc::shutting_down;
}
}
});
});
Expand Down

0 comments on commit 708416c

Please sign in to comment.