diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 2c97020b6c41e..b14622fd619ef 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -40,6 +40,7 @@ #include #include #include +#include #include #include @@ -243,6 +244,8 @@ ss::future<> consensus::stop() { co_await _event_manager.stop(); co_await _append_requests_buffer.stop(); co_await _batcher.stop(); + + _op_lock.broken(); co_await _bg.close(); // close writer if we have to @@ -547,8 +550,11 @@ void consensus::dispatch_recovery(follower_index_metadata& idx) { ss::future> consensus::linearizable_barrier() { using ret_t = result; - - ss::semaphore_units<> u = co_await _op_lock.get_units(); + try { + ss::semaphore_units<> u = co_await _op_lock.get_units(); + } catch (const ss::broken_semaphore&) { + co_return make_error_code(errc::shutting_down); + } if (_vstate != vote_state::leader) { co_return result(make_error_code(errc::not_leader)); @@ -893,27 +899,30 @@ void consensus::arm_vote_timeout() { ss::future consensus::update_group_member(model::broker broker) { - return _op_lock.get_units().then( - [this, broker = std::move(broker)](ss::semaphore_units<> u) mutable { - auto cfg = _configuration_manager.get_latest(); - if (!cfg.contains_broker(broker.id())) { - vlog( - _ctxlog.warn, - "Node with id {} does not exists in current configuration"); - return ss::make_ready_future( - errc::node_does_not_exists); - } - // update broker information - cfg.update(std::move(broker)); + return _op_lock.get_units() + .then( + [this, broker = std::move(broker)](ss::semaphore_units<> u) mutable { + auto cfg = _configuration_manager.get_latest(); + if (!cfg.contains_broker(broker.id())) { + vlog( + _ctxlog.warn, + "Node with id {} does not exists in current configuration"); + return ss::make_ready_future( + errc::node_does_not_exists); + } + // update broker information + cfg.update(std::move(broker)); - return replicate_configuration(std::move(u), std::move(cfg)); - }); + return replicate_configuration(std::move(u), std::move(cfg)); + }) + .handle_exception_type( + [](const ss::broken_semaphore&) { return errc::shutting_down; }); } template ss::future consensus::change_configuration(Func&& f) { - return _op_lock.get_units().then( - [this, f = std::forward(f)](ss::semaphore_units<> u) mutable { + return _op_lock.get_units() + .then([this, f = std::forward(f)](ss::semaphore_units<> u) mutable { // prevent updating configuration if last configuration wasn't // committed if (_configuration_manager.get_latest_offset() > _commit_index) { @@ -937,7 +946,9 @@ ss::future consensus::change_configuration(Func&& f) { std::move(u), std::move(res.value())); } return ss::make_ready_future(res.error()); - }); + }) + .handle_exception_type( + [](const ss::broken_semaphore&) { return errc::shutting_down; }); } ss::future consensus::add_group_members( @@ -1000,7 +1011,12 @@ ss::future consensus::replace_configuration( template ss::future consensus::interrupt_configuration_change(model::revision_id revision, Func f) { - auto u = co_await _op_lock.get_units(); + ss::semaphore_units<> u; + try { + u = co_await _op_lock.get_units(); + } catch (const ss::broken_semaphore&) { + co_return errc::shutting_down; + } auto latest_cfg = config(); // latest configuration is of joint type if (latest_cfg.get_state() == configuration_state::simple) { @@ -1035,7 +1051,12 @@ consensus::cancel_configuration_change(model::revision_id revision) { if (!ec) { // current leader is not a voter, step down if (!config().is_voter(_self)) { - auto u = co_await _op_lock.get_units(); + ss::semaphore_units<> u; + try { + u = co_await _op_lock.get_units(); + } catch (const ss::broken_semaphore&) { + co_return errc::shutting_down; + } do_step_down("current leader is not voter"); } } @@ -1049,7 +1070,12 @@ consensus::abort_configuration_change(model::revision_id revision) { _ctxlog.info, "requested abort of current configuration change - {}", config()); - auto u = co_await _op_lock.get_units(); + ss::semaphore_units<> u; + try { + u = co_await _op_lock.get_units(); + } catch (const ss::broken_semaphore&) { + co_return errc::shutting_down; + } auto latest_cfg = config(); // latest configuration must be of joint type if (latest_cfg.get_state() == configuration_state::simple) { @@ -1292,6 +1318,8 @@ ss::future<> consensus::do_start() { _term, _configuration_manager.get_latest()); }); + }).handle_exception_type([](const ss::broken_semaphore&){ + return errc::shutting_down; }); } @@ -1396,6 +1424,13 @@ ss::future consensus::vote(vote_request&& r) { .term = _term, .granted = false, .log_ok = false}; + }) + .handle_exception_type([this, target_node_id](const ss::broken_semaphore&){ + return vote_reply{ + .target_node_id = target_node_id, + .term = _term, + .granted = false, + .log_ok = false}; }); }); } @@ -1803,6 +1838,8 @@ ss::future consensus::install_snapshot(install_snapshot_request&& r) { return _op_lock.with([this, r = std::move(r)]() mutable { return do_install_snapshot(std::move(r)); + }).handle_exception_type([](const ss::broken_semaphore&) { + return errc::shutting_down; }); } @@ -2013,6 +2050,8 @@ ss::future<> consensus::write_snapshot(write_snapshot_cfg cfg) { return do_write_snapshot(cfg.last_included_index, std::move(cfg.data)) .then([] { return true; }); + }).handle_exception_type([](const ss::broken_semaphore&) { + return false; }); if (!updated) { @@ -2368,6 +2407,9 @@ ss::future<> consensus::refresh_commit_index() { return f.then([this, u = std::move(u)]() mutable { return do_maybe_update_leader_commit_idx(std::move(u)); }); + }) + .handle_exception_type([](const ss::broken_semaphore&) { + //ignore exception, shutting down }); } @@ -2734,8 +2776,12 @@ consensus::prepare_transfer_leadership(vnode target_rni) { // Enforce ordering wrt anyone currently doing an append under op_lock { + try{ auto units = co_await _op_lock.get_units(); vlog(_ctxlog.trace, "transfer leadership: cleared oplock"); + } catch(const ss::broken_semaphore&) { + co_return make_error_code(errc::shutting_down); + } } // Allow any buffered batches to complete, to avoid racing @@ -2988,6 +3034,7 @@ consensus::do_transfer_leadership(std::optional target) { // now and new leader sending a vote for its new term. // (If we accepted more writes, our log could get // ahead of new leader, and it could lose election) + try { auto units = co_await _op_lock.get_units(); do_step_down("leadership_transfer"); if (_leader_id) { @@ -2996,6 +3043,9 @@ consensus::do_transfer_leadership(std::optional target) { } co_return make_error_code(errc::success); + } catch( const ss::broken_semaphore& ) { + co_return errc::shutting_down; + } } }); });