From 708416cee8b2c212bd29522c12a892c567ff7400 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Mon, 1 Aug 2022 11:56:09 +0200 Subject: [PATCH] r/consensus: break _op_lock when stopping consensus When raft stops, the last step in the stop sequence is closing the gate. After the gate is closed, all the background futures are guaranteed to be finished. However, it may happen that a fiber is waiting for `_op_lock` while the gate is being closed. We need to make sure that once the gate is closed no other fiber can acquire `_op_lock`. Marking the `_op_lock` mutex as broken prevents all waiters from acquiring the mutex and, consequently, from accessing `consensus` state. Fixes: #5759 Signed-off-by: Michal Maslanka --- src/v/raft/consensus.cc | 94 +++++++++++++++++++++++++++++++---------- 1 file changed, 72 insertions(+), 22 deletions(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 2c97020b6c41e..b14622fd619ef 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -40,6 +40,7 @@ #include #include #include +#include #include #include @@ -243,6 +244,8 @@ ss::future<> consensus::stop() { co_await _event_manager.stop(); co_await _append_requests_buffer.stop(); co_await _batcher.stop(); + + _op_lock.broken(); co_await _bg.close(); // close writer if we have to @@ -547,8 +550,11 @@ void consensus::dispatch_recovery(follower_index_metadata& idx) { ss::future> consensus::linearizable_barrier() { using ret_t = result; - - ss::semaphore_units<> u = co_await _op_lock.get_units(); + try { + ss::semaphore_units<> u = co_await _op_lock.get_units(); + } catch (const ss::broken_semaphore&) { + co_return make_error_code(errc::shutting_down); + } if (_vstate != vote_state::leader) { co_return result(make_error_code(errc::not_leader)); @@ -893,27 +899,30 @@ void consensus::arm_vote_timeout() { ss::future consensus::update_group_member(model::broker broker) { - return _op_lock.get_units().then( - [this, broker = std::move(broker)](ss::semaphore_units<> u) mutable { - auto 
cfg = _configuration_manager.get_latest(); - if (!cfg.contains_broker(broker.id())) { - vlog( - _ctxlog.warn, - "Node with id {} does not exists in current configuration"); - return ss::make_ready_future( - errc::node_does_not_exists); - } - // update broker information - cfg.update(std::move(broker)); + return _op_lock.get_units() + .then( + [this, broker = std::move(broker)](ss::semaphore_units<> u) mutable { + auto cfg = _configuration_manager.get_latest(); + if (!cfg.contains_broker(broker.id())) { + vlog( + _ctxlog.warn, + "Node with id {} does not exists in current configuration"); + return ss::make_ready_future( + errc::node_does_not_exists); + } + // update broker information + cfg.update(std::move(broker)); - return replicate_configuration(std::move(u), std::move(cfg)); - }); + return replicate_configuration(std::move(u), std::move(cfg)); + }) + .handle_exception_type( + [](const ss::broken_semaphore&) { return errc::shutting_down; }); } template ss::future consensus::change_configuration(Func&& f) { - return _op_lock.get_units().then( - [this, f = std::forward(f)](ss::semaphore_units<> u) mutable { + return _op_lock.get_units() + .then([this, f = std::forward(f)](ss::semaphore_units<> u) mutable { // prevent updating configuration if last configuration wasn't // committed if (_configuration_manager.get_latest_offset() > _commit_index) { @@ -937,7 +946,9 @@ ss::future consensus::change_configuration(Func&& f) { std::move(u), std::move(res.value())); } return ss::make_ready_future(res.error()); - }); + }) + .handle_exception_type( + [](const ss::broken_semaphore&) { return errc::shutting_down; }); } ss::future consensus::add_group_members( @@ -1000,7 +1011,12 @@ ss::future consensus::replace_configuration( template ss::future consensus::interrupt_configuration_change(model::revision_id revision, Func f) { - auto u = co_await _op_lock.get_units(); + ss::semaphore_units<> u; + try { + u = co_await _op_lock.get_units(); + } catch (const ss::broken_semaphore&) 
{ + co_return errc::shutting_down; + } auto latest_cfg = config(); // latest configuration is of joint type if (latest_cfg.get_state() == configuration_state::simple) { @@ -1035,7 +1051,12 @@ consensus::cancel_configuration_change(model::revision_id revision) { if (!ec) { // current leader is not a voter, step down if (!config().is_voter(_self)) { - auto u = co_await _op_lock.get_units(); + ss::semaphore_units<> u; + try { + u = co_await _op_lock.get_units(); + } catch (const ss::broken_semaphore&) { + co_return errc::shutting_down; + } do_step_down("current leader is not voter"); } } @@ -1049,7 +1070,12 @@ consensus::abort_configuration_change(model::revision_id revision) { _ctxlog.info, "requested abort of current configuration change - {}", config()); - auto u = co_await _op_lock.get_units(); + ss::semaphore_units<> u; + try { + u = co_await _op_lock.get_units(); + } catch (const ss::broken_semaphore&) { + co_return errc::shutting_down; + } auto latest_cfg = config(); // latest configuration must be of joint type if (latest_cfg.get_state() == configuration_state::simple) { @@ -1292,6 +1318,8 @@ ss::future<> consensus::do_start() { _term, _configuration_manager.get_latest()); }); + }).handle_exception_type([](const ss::broken_semaphore&){ + return errc::shutting_down; }); } @@ -1396,6 +1424,13 @@ ss::future consensus::vote(vote_request&& r) { .term = _term, .granted = false, .log_ok = false}; + }) + .handle_exception_type([this, target_node_id](const ss::broken_semaphore&){ + return vote_reply{ + .target_node_id = target_node_id, + .term = _term, + .granted = false, + .log_ok = false}; }); }); } @@ -1803,6 +1838,8 @@ ss::future consensus::install_snapshot(install_snapshot_request&& r) { return _op_lock.with([this, r = std::move(r)]() mutable { return do_install_snapshot(std::move(r)); + }).handle_exception_type([](const ss::broken_semaphore&) { + return errc::shutting_down; }); } @@ -2013,6 +2050,8 @@ ss::future<> consensus::write_snapshot(write_snapshot_cfg cfg) 
{ return do_write_snapshot(cfg.last_included_index, std::move(cfg.data)) .then([] { return true; }); + }).handle_exception_type([](const ss::broken_semaphore&) { + return false; }); if (!updated) { @@ -2368,6 +2407,9 @@ ss::future<> consensus::refresh_commit_index() { return f.then([this, u = std::move(u)]() mutable { return do_maybe_update_leader_commit_idx(std::move(u)); }); + }) + .handle_exception_type([](const ss::broken_semaphore&) { + //ignore exception, shutting down }); } @@ -2734,8 +2776,12 @@ consensus::prepare_transfer_leadership(vnode target_rni) { // Enforce ordering wrt anyone currently doing an append under op_lock { + try{ auto units = co_await _op_lock.get_units(); vlog(_ctxlog.trace, "transfer leadership: cleared oplock"); + } catch(const ss::broken_semaphore&) { + co_return make_error_code(errc::shutting_down); + } } // Allow any buffered batches to complete, to avoid racing @@ -2988,6 +3034,7 @@ consensus::do_transfer_leadership(std::optional target) { // now and new leader sending a vote for its new term. // (If we accepted more writes, our log could get // ahead of new leader, and it could lose election) + try { auto units = co_await _op_lock.get_units(); do_step_down("leadership_transfer"); if (_leader_id) { @@ -2996,6 +3043,9 @@ consensus::do_transfer_leadership(std::optional target) { } co_return make_error_code(errc::success); + } catch( const ss::broken_semaphore& ) { + co_return errc::shutting_down; + } } }); });