From acde644322201518c6445ea1f40575df89d61dbd Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Mon, 1 Aug 2022 11:12:58 +0200 Subject: [PATCH 1/3] r/consensus: coroutinized raft::stop() method Signed-off-by: Michal Maslanka --- src/v/raft/consensus.cc | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 0e4abe85f1c8..2c97020b6c41 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -240,18 +240,16 @@ ss::future<> consensus::stop() { for (auto& idx : _fstats) { idx.second.follower_state_change.broken(); } - return _event_manager.stop() - .then([this] { return _append_requests_buffer.stop(); }) - .then([this] { return _batcher.stop(); }) - .then([this] { return _bg.close(); }) - .then([this] { - // close writer if we have to - if (likely(!_snapshot_writer)) { - return ss::now(); - } - return _snapshot_writer->close().then( - [this] { _snapshot_writer.reset(); }); - }); + co_await _event_manager.stop(); + co_await _append_requests_buffer.stop(); + co_await _batcher.stop(); + co_await _bg.close(); + + // close writer if we have to + if (unlikely(_snapshot_writer)) { + co_await _snapshot_writer->close(); + _snapshot_writer.reset(); + } } consensus::success_reply consensus::update_follower_index( From ab31867acd81ea3c00ea5e530f100a8350c17bd8 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Mon, 1 Aug 2022 11:56:09 +0200 Subject: [PATCH 2/3] r/consensus: break _op_lock when stopping consensus When raft stops the last step in stop sequence is closing gate. After gate is closed all the background futures are guaranteed to be finished. However it may happen that the fiber is waiting for an `_op_lock` while gate is being closed. We need to make sure that when gate is closed no other fiber can acquire the `_op_lock`. Marking `_op_lock` mutex as broken prevents all waiters for acquiring a mutex and at the same time accessing `consensus` state. Fixes: #5759 Signed-off-by: Michal Maslanka --- src/v/raft/consensus.cc | 102 ++++++++++++++++++++++++++++++---------- 1 file changed, 78 insertions(+), 24 deletions(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 2c97020b6c41..03b35530eebe 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -40,6 +40,7 @@ #include #include #include +#include #include #include @@ -243,6 +244,8 @@ ss::future<> consensus::stop() { co_await _event_manager.stop(); co_await _append_requests_buffer.stop(); co_await _batcher.stop(); + + _op_lock.broken(); co_await _bg.close(); // close writer if we have to @@ -547,8 +550,12 @@ void consensus::dispatch_recovery(follower_index_metadata& idx) { ss::future> consensus::linearizable_barrier() { using ret_t = result; - - ss::semaphore_units<> u = co_await _op_lock.get_units(); + ss::semaphore_units<> u; + try { + u = co_await _op_lock.get_units(); + } catch (const ss::broken_semaphore&) { + co_return make_error_code(errc::shutting_down); + } if (_vstate != vote_state::leader) { co_return result(make_error_code(errc::not_leader)); @@ -893,27 +900,32 @@ void consensus::arm_vote_timeout() { ss::future consensus::update_group_member(model::broker broker) { - return _op_lock.get_units().then( - [this, broker = std::move(broker)](ss::semaphore_units<> u) mutable { - auto cfg = _configuration_manager.get_latest(); - if (!cfg.contains_broker(broker.id())) { - vlog( - _ctxlog.warn, - "Node with id {} does not exists in current configuration"); - return ss::make_ready_future( - errc::node_does_not_exists); - } - // update broker information - cfg.update(std::move(broker)); - - return replicate_configuration(std::move(u), std::move(cfg)); - }); + return _op_lock.get_units() + .then( + [this, broker = std::move(broker)](ss::semaphore_units<> u) mutable { + auto cfg = _configuration_manager.get_latest(); + if (!cfg.contains_broker(broker.id())) { + vlog( + _ctxlog.warn, + "Node with id {} does not exists in current configuration"); + return ss::make_ready_future( + errc::node_does_not_exists); + } + // update broker information + cfg.update(std::move(broker)); + + return replicate_configuration(std::move(u), std::move(cfg)); + }) + .handle_exception_type( + [](const ss::broken_semaphore&) { + return make_error_code(errc::shutting_down); + }); } template ss::future consensus::change_configuration(Func&& f) { - return _op_lock.get_units().then( - [this, f = std::forward(f)](ss::semaphore_units<> u) mutable { + return _op_lock.get_units() + .then([this, f = std::forward(f)](ss::semaphore_units<> u) mutable { // prevent updating configuration if last configuration wasn't // committed if (_configuration_manager.get_latest_offset() > _commit_index) { @@ -937,7 +949,11 @@ ss::future consensus::change_configuration(Func&& f) { std::move(u), std::move(res.value())); } return ss::make_ready_future(res.error()); - }); + }) + .handle_exception_type( + [](const ss::broken_semaphore&) { + return make_error_code(errc::shutting_down); + }); } ss::future consensus::add_group_members( @@ -1000,7 +1016,12 @@ ss::future consensus::replace_configuration( template ss::future consensus::interrupt_configuration_change(model::revision_id revision, Func f) { - auto u = co_await _op_lock.get_units(); + ss::semaphore_units<> u; + try { + u = co_await _op_lock.get_units(); + } catch (const ss::broken_semaphore&) { + co_return errc::shutting_down; + } auto latest_cfg = config(); // latest configuration is of joint type if (latest_cfg.get_state() == configuration_state::simple) { @@ -1035,7 +1056,12 @@ consensus::cancel_configuration_change(model::revision_id revision) { if (!ec) { // current leader is not a voter, step down if (!config().is_voter(_self)) { - auto u = co_await _op_lock.get_units(); + ss::semaphore_units<> u; + try { + u = co_await _op_lock.get_units(); + } catch (const ss::broken_semaphore&) { + co_return errc::shutting_down; + } do_step_down("current leader is not voter"); } } @@ -1049,7 +1075,12 @@ consensus::abort_configuration_change(model::revision_id revision) { _ctxlog.info, "requested abort of current configuration change - {}", config()); - auto u = co_await _op_lock.get_units(); + ss::semaphore_units<> u; + try { + u = co_await _op_lock.get_units(); + } catch (const ss::broken_semaphore&) { + co_return errc::shutting_down; + } auto latest_cfg = config(); // latest configuration must be of joint type if (latest_cfg.get_state() == configuration_state::simple) { @@ -1292,7 +1323,7 @@ ss::future<> consensus::do_start() { _term, _configuration_manager.get_latest()); }); - }); + }).handle_exception_type([](const ss::broken_semaphore&){}); } ss::future<> @@ -1396,6 +1427,13 @@ ss::future consensus::vote(vote_request&& r) { .term = _term, .granted = false, .log_ok = false}; + }) + .handle_exception_type([this, target_node_id](const ss::broken_semaphore&){ + return vote_reply{ + .target_node_id = target_node_id, + .term = _term, + .granted = false, + .log_ok = false}; }); }); } @@ -1803,6 +1841,9 @@ ss::future consensus::install_snapshot(install_snapshot_request&& r) { return _op_lock.with([this, r = std::move(r)]() mutable { return do_install_snapshot(std::move(r)); + }).handle_exception_type([this](const ss::broken_semaphore&) { + return install_snapshot_reply{ + .term = _term, .bytes_stored = 0, .success = false}; }); } @@ -2013,6 +2054,8 @@ ss::future<> consensus::write_snapshot(write_snapshot_cfg cfg) { return do_write_snapshot(cfg.last_included_index, std::move(cfg.data)) .then([] { return true; }); + }).handle_exception_type([](const ss::broken_semaphore&) { + return false; }); if (!updated) { @@ -2368,6 +2411,9 @@ ss::future<> consensus::refresh_commit_index() { return f.then([this, u = std::move(u)]() mutable { return do_maybe_update_leader_commit_idx(std::move(u)); }); + }) + .handle_exception_type([](const ss::broken_semaphore&) { + //ignore exception, shutting down }); } @@ -2734,8 +2780,12 @@ consensus::prepare_transfer_leadership(vnode target_rni) { // Enforce ordering wrt anyone currently doing an append under op_lock { + try{ auto units = co_await _op_lock.get_units(); vlog(_ctxlog.trace, "transfer leadership: cleared oplock"); + } catch(const ss::broken_semaphore&) { + co_return make_error_code(errc::shutting_down); + } } // Allow any buffered batches to complete, to avoid racing @@ -2988,6 +3038,7 @@ consensus::do_transfer_leadership(std::optional target) { // now and new leader sending a vote for its new term. // (If we accepted more writes, our log could get // ahead of new leader, and it could lose election) + try { auto units = co_await _op_lock.get_units(); do_step_down("leadership_transfer"); if (_leader_id) { @@ -2996,6 +3047,9 @@ consensus::do_transfer_leadership(std::optional target) { } co_return make_error_code(errc::success); + } catch( const ss::broken_semaphore& ) { + co_return errc::shutting_down; + } } }); }); From a616d3598a7aa4f63ec22168863b120789ba8048 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Mon, 1 Aug 2022 12:34:41 +0200 Subject: [PATCH 3/3] r/consensus: applied formatting Signed-off-by: Michal Maslanka --- src/v/raft/consensus.cc | 517 ++++++++++++++++++++-------------------- 1 file changed, 264 insertions(+), 253 deletions(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 03b35530eebe..bc8b07fade32 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -916,10 +916,9 @@ consensus::update_group_member(model::broker broker) { return replicate_configuration(std::move(u), std::move(cfg)); }) - .handle_exception_type( - [](const ss::broken_semaphore&) { - return make_error_code(errc::shutting_down); - }); + .handle_exception_type([](const ss::broken_semaphore&) { + return make_error_code(errc::shutting_down); + }); } template @@ -950,10 +949,9 @@ ss::future consensus::change_configuration(Func&& f) { } return ss::make_ready_future(res.error()); }) - .handle_exception_type( - [](const ss::broken_semaphore&) { - return make_error_code(errc::shutting_down); - }); + .handle_exception_type([](const ss::broken_semaphore&) { + return make_error_code(errc::shutting_down); + }); } ss::future consensus::add_group_members( @@ -1129,201 +1127,209 @@ ss::future<> consensus::start() { ss::future<> consensus::do_start() { vlog(_ctxlog.info, "Starting"); - return _op_lock.with([this] { - read_voted_for(); - - /* - * temporary workaround: - * - * if the group's ntp matches the pattern, then do not load the initial - * configuration snapshto from the keyvalue store. more info here: - * - * https://github.com/redpanda-data/redpanda/issues/1870 - */ - const auto& ntp = _log.config().ntp(); - const auto normalized_ntp = fmt::format( - "{}.{}.{}", ntp.ns(), ntp.tp.topic(), ntp.tp.partition()); - const auto& patterns = config::shard_local_cfg() - .full_raft_configuration_recovery_pattern(); - auto initial_state = std::any_of( - patterns.cbegin(), - patterns.cend(), - [&normalized_ntp](const ss::sstring& pattern) { - return pattern == "*" || normalized_ntp.starts_with(pattern); - }); - if (!initial_state) { - initial_state = is_initial_state(); - } - - vlog( - _ctxlog.info, - "Starting with voted_for {} term {} initial_state {}", - _voted_for, - _term, - initial_state); - - return _configuration_manager.start(initial_state, _self.revision()) - .then([this] { - vlog( - _ctxlog.trace, - "Configuration manager started: {}", - _configuration_manager); - }) - .then([this, initial_state] { - offset_translator::must_reset must_reset{initial_state}; - - absl::btree_map offset2delta; - for (auto it = _configuration_manager.begin(); - it != _configuration_manager.end(); - ++it) { - offset2delta.emplace(it->first, it->second.idx()); - } - - auto bootstrap = offset_translator::bootstrap_state{ - .offset2delta = std::move(offset2delta), - .highest_known_offset - = _configuration_manager.get_highest_known_offset(), - }; + return _op_lock + .with([this] { + read_voted_for(); + + /* + * temporary workaround: + * + * if the group's ntp matches the pattern, then do not load the + * initial configuration snapshto from the keyvalue store. more info + * here: + * + * https://github.com/redpanda-data/redpanda/issues/1870 + */ + const auto& ntp = _log.config().ntp(); + const auto normalized_ntp = fmt::format( + "{}.{}.{}", ntp.ns(), ntp.tp.topic(), ntp.tp.partition()); + const auto& patterns = config::shard_local_cfg() + .full_raft_configuration_recovery_pattern(); + auto initial_state = std::any_of( + patterns.cbegin(), + patterns.cend(), + [&normalized_ntp](const ss::sstring& pattern) { + return pattern == "*" || normalized_ntp.starts_with(pattern); + }); + if (!initial_state) { + initial_state = is_initial_state(); + } - return _offset_translator.start(must_reset, std::move(bootstrap)); - }) - .then([this] { return hydrate_snapshot(); }) - .then([this] { - vlog( - _ctxlog.debug, - "Starting raft bootstrap from {}", - _configuration_manager.get_highest_known_offset()); - return details::read_bootstrap_state( - _log, - model::next_offset( - _configuration_manager.get_highest_known_offset()), - _as); - }) - .then([this](configuration_bootstrap_state st) { - auto lstats = _log.offsets(); + vlog( + _ctxlog.info, + "Starting with voted_for {} term {} initial_state {}", + _voted_for, + _term, + initial_state); + + return _configuration_manager.start(initial_state, _self.revision()) + .then([this] { + vlog( + _ctxlog.trace, + "Configuration manager started: {}", + _configuration_manager); + }) + .then([this, initial_state] { + offset_translator::must_reset must_reset{initial_state}; + + absl::btree_map offset2delta; + for (auto it = _configuration_manager.begin(); + it != _configuration_manager.end(); + ++it) { + offset2delta.emplace(it->first, it->second.idx()); + } - vlog(_ctxlog.info, "Read bootstrap state: {}", st); - vlog(_ctxlog.info, "Current log offsets: {}", lstats); + auto bootstrap = offset_translator::bootstrap_state{ + .offset2delta = std::move(offset2delta), + .highest_known_offset + = _configuration_manager.get_highest_known_offset(), + }; - // if log term is newer than the one comming from voted_for state, - // we reset voted_for state - if (lstats.dirty_offset_term > _term) { - _term = lstats.dirty_offset_term; - _voted_for = {}; - } - /** - * since we are starting, there were no new writes to the log - * before that point. It is safe to use dirty offset as a initial - * flushed offset since it is equal to last offset that exists on - * disk and was read in log recovery process. - */ - _flushed_offset = lstats.dirty_offset; - /** - * The configuration manager state may be divereged from the log - * state, as log is flushed lazily, we have to make sure that the - * log and configuration manager has exactly the same offsets - * range - */ - vlog( - _ctxlog.info, - "Truncating configurations at {}", - lstats.dirty_offset); + return _offset_translator.start( + must_reset, std::move(bootstrap)); + }) + .then([this] { return hydrate_snapshot(); }) + .then([this] { + vlog( + _ctxlog.debug, + "Starting raft bootstrap from {}", + _configuration_manager.get_highest_known_offset()); + return details::read_bootstrap_state( + _log, + model::next_offset( + _configuration_manager.get_highest_known_offset()), + _as); + }) + .then([this](configuration_bootstrap_state st) { + auto lstats = _log.offsets(); - auto f = _configuration_manager.truncate( - model::next_offset(lstats.dirty_offset)); + vlog(_ctxlog.info, "Read bootstrap state: {}", st); + vlog(_ctxlog.info, "Current log offsets: {}", lstats); - /** - * We read some batches from the log and have to update the - * configuration manager. - */ - if (st.config_batches_seen() > 0) { - f = f.then([this, st = std::move(st)]() mutable { - return _configuration_manager.add( - std::move(st).release_configurations()); - }); - } + // if log term is newer than the one comming from voted_for + // state, we reset voted_for state + if (lstats.dirty_offset_term > _term) { + _term = lstats.dirty_offset_term; + _voted_for = {}; + } + /** + * since we are starting, there were no new writes to the log + * before that point. It is safe to use dirty offset as a + * initial flushed offset since it is equal to last offset that + * exists on disk and was read in log recovery process. + */ + _flushed_offset = lstats.dirty_offset; + /** + * The configuration manager state may be divereged from the log + * state, as log is flushed lazily, we have to make sure that + * the log and configuration manager has exactly the same + * offsets range + */ + vlog( + _ctxlog.info, + "Truncating configurations at {}", + lstats.dirty_offset); + + auto f = _configuration_manager.truncate( + model::next_offset(lstats.dirty_offset)); + + /** + * We read some batches from the log and have to update the + * configuration manager. + */ + if (st.config_batches_seen() > 0) { + f = f.then([this, st = std::move(st)]() mutable { + return _configuration_manager.add( + std::move(st).release_configurations()); + }); + } - return f.then([this] { - update_follower_stats(_configuration_manager.get_latest()); - }); - }) - .then([this] { return _offset_translator.sync_with_log(_log, _as); }) - .then([this] { - /** - * fix for incorrectly persisted configuration index. In previous - * version of redpanda due to the issue with incorrectly assigned - * raft configuration indicies - * (https://github.com/redpanda-data/redpanda/issues/2326) there - * may be a persistent corruption in offset translation caused by - * incorrectly persited configuration index. It may cause log - * offset to be negative. Here we check if this problem exists and - * if so apply necessary offset translation. - */ - const auto so = start_offset(); - // no prefix truncation was applied we do not need adjustment - if (so <= model::offset(0)) { - return ss::now(); - } - auto delta = _configuration_manager.offset_delta(start_offset()); + return f.then([this] { + update_follower_stats(_configuration_manager.get_latest()); + }); + }) + .then( + [this] { return _offset_translator.sync_with_log(_log, _as); }) + .then([this] { + /** + * fix for incorrectly persisted configuration index. In + * previous version of redpanda due to the issue with + * incorrectly assigned raft configuration indicies + * (https://github.com/redpanda-data/redpanda/issues/2326) there + * may be a persistent corruption in offset translation caused + * by incorrectly persited configuration index. It may cause log + * offset to be negative. Here we check if this problem exists + * and if so apply necessary offset translation. + */ + const auto so = start_offset(); + // no prefix truncation was applied we do not need adjustment + if (so <= model::offset(0)) { + return ss::now(); + } + auto delta = _configuration_manager.offset_delta( + start_offset()); - if (so >= delta) { - return ss::now(); - } - // if start offset is smaller than offset delta we need to apply - // adjustment - const configuration_manager::configuration_idx new_idx(so()); - vlog( - _ctxlog.info, - "adjusting configuration index, current start offset: {}, " - "delta: {}, new initial index: {}", - so, - delta, - new_idx); - return _configuration_manager.adjust_configuration_idx(new_idx); - }) - .then([this] { - auto next_election = clock_type::now(); - // set last heartbeat timestamp to prevent skipping first - // election - _hbeat = clock_type::time_point::min(); - auto conf = _configuration_manager.get_latest().brokers(); - if (!conf.empty() && _self.id() == conf.begin()->id()) { - // for single node scenarios arm immediate election, - // use standard election timeout otherwise. - if (conf.size() > 1) { - next_election += _jit.next_duration(); - } - } else { - // current node is not a preselected leader, add 2x jitter - // to give opportunity to the preselected leader to win - // the first round - next_election += _jit.base_duration() - + 2 * _jit.next_jitter_duration(); - } - if (!_bg.is_closed()) { - _vote_timeout.rearm(next_election); - } - }) - .then([this] { - auto last_applied = read_last_applied(); - if (last_applied > _commit_index) { - _commit_index = last_applied; - maybe_update_last_visible_index(_commit_index); - vlog( - _ctxlog.trace, "Recovered commit_index: {}", _commit_index); - } - }) - .then([this] { return _event_manager.start(); }) - .then([this] { _append_requests_buffer.start(); }) - .then([this] { - vlog( - _ctxlog.info, - "started raft, log offsets: {}, term: {}, configuration: {}", - _log.offsets(), - _term, - _configuration_manager.get_latest()); - }); - }).handle_exception_type([](const ss::broken_semaphore&){}); + if (so >= delta) { + return ss::now(); + } + // if start offset is smaller than offset delta we need to apply + // adjustment + const configuration_manager::configuration_idx new_idx(so()); + vlog( + _ctxlog.info, + "adjusting configuration index, current start offset: {}, " + "delta: {}, new initial index: {}", + so, + delta, + new_idx); + return _configuration_manager.adjust_configuration_idx(new_idx); + }) + .then([this] { + auto next_election = clock_type::now(); + // set last heartbeat timestamp to prevent skipping first + // election + _hbeat = clock_type::time_point::min(); + auto conf = _configuration_manager.get_latest().brokers(); + if (!conf.empty() && _self.id() == conf.begin()->id()) { + // for single node scenarios arm immediate election, + // use standard election timeout otherwise. + if (conf.size() > 1) { + next_election += _jit.next_duration(); + } + } else { + // current node is not a preselected leader, add 2x jitter + // to give opportunity to the preselected leader to win + // the first round + next_election += _jit.base_duration() + + 2 * _jit.next_jitter_duration(); + } + if (!_bg.is_closed()) { + _vote_timeout.rearm(next_election); + } + }) + .then([this] { + auto last_applied = read_last_applied(); + if (last_applied > _commit_index) { + _commit_index = last_applied; + maybe_update_last_visible_index(_commit_index); + vlog( + _ctxlog.trace, + "Recovered commit_index: {}", + _commit_index); + } + }) + .then([this] { return _event_manager.start(); }) + .then([this] { _append_requests_buffer.start(); }) + .then([this] { + vlog( + _ctxlog.info, + "started raft, log offsets: {}, term: {}, configuration: {}", + _log.offsets(), + _term, + _configuration_manager.get_latest()); + }); + }) + .handle_exception_type([](const ss::broken_semaphore&) {}); } ss::future<> @@ -1428,7 +1434,8 @@ ss::future consensus::vote(vote_request&& r) { .granted = false, .log_ok = false}; }) - .handle_exception_type([this, target_node_id](const ss::broken_semaphore&){ + .handle_exception_type( + [this, target_node_id](const ss::broken_semaphore&) { return vote_reply{ .target_node_id = target_node_id, .term = _term, @@ -1839,12 +1846,13 @@ consensus::do_append_entries(append_entries_request&& r) { ss::future consensus::install_snapshot(install_snapshot_request&& r) { - return _op_lock.with([this, r = std::move(r)]() mutable { - return do_install_snapshot(std::move(r)); - }).handle_exception_type([this](const ss::broken_semaphore&) { - return install_snapshot_reply{ - .term = _term, .bytes_stored = 0, .success = false}; - }); + return _op_lock + .with([this, r = std::move(r)]() mutable { + return do_install_snapshot(std::move(r)); + }) + .handle_exception_type([this](const ss::broken_semaphore&) { + return install_snapshot_reply{.term = _term, .success = false}; + }); } ss::future<> consensus::hydrate_snapshot() { @@ -2036,27 +2044,29 @@ ss::future consensus::finish_snapshot( ss::future<> consensus::write_snapshot(write_snapshot_cfg cfg) { model::offset last_included_index = cfg.last_included_index; - bool updated = co_await _op_lock.with( - [this, cfg = std::move(cfg)]() mutable { - // do nothing, we already have snapshot for this offset - // MUST be checked under the _op_lock - if (cfg.last_included_index <= _last_snapshot_index) { - return ss::make_ready_future(false); - } - - auto max_offset = last_visible_index(); - vassert( - cfg.last_included_index <= max_offset, - "Can not take snapshot, requested offset: {} is greater than max " - "snapshot offset: {}", - cfg.last_included_index, - max_offset); - - return do_write_snapshot(cfg.last_included_index, std::move(cfg.data)) - .then([] { return true; }); - }).handle_exception_type([](const ss::broken_semaphore&) { - return false; - }); + bool updated = co_await _op_lock + .with([this, cfg = std::move(cfg)]() mutable { + // do nothing, we already have snapshot for this offset + // MUST be checked under the _op_lock + if (cfg.last_included_index <= _last_snapshot_index) { + return ss::make_ready_future(false); + } + + auto max_offset = last_visible_index(); + vassert( + cfg.last_included_index <= max_offset, + "Can not take snapshot, requested offset: {} is " + "greater than max " + "snapshot offset: {}", + cfg.last_included_index, + max_offset); + + return do_write_snapshot( + cfg.last_included_index, std::move(cfg.data)) + .then([] { return true; }); + }) + .handle_exception_type( + [](const ss::broken_semaphore&) { return false; }); if (!updated) { co_return; @@ -2398,23 +2408,24 @@ consensus::next_followers_request_seq() { } ss::future<> consensus::refresh_commit_index() { - return _op_lock.get_units().then([this](ss::semaphore_units<> u) mutable { - auto f = ss::now(); + return _op_lock.get_units() + .then([this](ss::semaphore_units<> u) mutable { + auto f = ss::now(); - if (_has_pending_flushes) { - f = flush_log(); - } + if (_has_pending_flushes) { + f = flush_log(); + } - if (!is_elected_leader()) { - return f; - } - return f.then([this, u = std::move(u)]() mutable { - return do_maybe_update_leader_commit_idx(std::move(u)); - }); - }) - .handle_exception_type([](const ss::broken_semaphore&) { - //ignore exception, shutting down - }); + if (!is_elected_leader()) { + return f; + } + return f.then([this, u = std::move(u)]() mutable { + return do_maybe_update_leader_commit_idx(std::move(u)); + }); + }) + .handle_exception_type([](const ss::broken_semaphore&) { + // ignore exception, shutting down + }); } void consensus::maybe_update_leader_commit_idx() { @@ -2780,10 +2791,10 @@ consensus::prepare_transfer_leadership(vnode target_rni) { // Enforce ordering wrt anyone currently doing an append under op_lock { - try{ - auto units = co_await _op_lock.get_units(); - vlog(_ctxlog.trace, "transfer leadership: cleared oplock"); - } catch(const ss::broken_semaphore&) { + try { + auto units = co_await _op_lock.get_units(); + vlog(_ctxlog.trace, "transfer leadership: cleared oplock"); + } catch (const ss::broken_semaphore&) { co_return make_error_code(errc::shutting_down); } } @@ -3039,16 +3050,16 @@ consensus::do_transfer_leadership(std::optional target) { // (If we accepted more writes, our log could get // ahead of new leader, and it could lose election) try { - auto units = co_await _op_lock.get_units(); - do_step_down("leadership_transfer"); - if (_leader_id) { - _leader_id = std::nullopt; - trigger_leadership_notification(); - } - - co_return make_error_code(errc::success); - } catch( const ss::broken_semaphore& ) { - co_return errc::shutting_down; + auto units = co_await _op_lock.get_units(); + do_step_down("leadership_transfer"); + if (_leader_id) { + _leader_id = std::nullopt; + trigger_leadership_notification(); + } + + co_return make_error_code(errc::success); + } catch (const ss::broken_semaphore&) { + co_return errc::shutting_down; } } });