diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc
index 03b35530eebe..bc8b07fade32 100644
--- a/src/v/raft/consensus.cc
+++ b/src/v/raft/consensus.cc
@@ -916,10 +916,9 @@ consensus::update_group_member(model::broker broker) {
           return replicate_configuration(std::move(u), std::move(cfg));
       })
-      .handle_exception_type(
-        [](const ss::broken_semaphore&) {
-            return make_error_code(errc::shutting_down);
-        });
+      .handle_exception_type([](const ss::broken_semaphore&) {
+          return make_error_code(errc::shutting_down);
+      });
 }
 
 template<typename Func>
@@ -950,10 +949,9 @@ ss::future<std::error_code> consensus::change_configuration(Func&& f) {
           }
           return ss::make_ready_future<std::error_code>(res.error());
       })
-      .handle_exception_type(
-        [](const ss::broken_semaphore&) {
-            return make_error_code(errc::shutting_down);
-        });
+      .handle_exception_type([](const ss::broken_semaphore&) {
+          return make_error_code(errc::shutting_down);
+      });
 }
 
 ss::future<std::error_code> consensus::add_group_members(
@@ -1129,201 +1127,209 @@ ss::future<> consensus::start() {
 
 ss::future<> consensus::do_start() {
     vlog(_ctxlog.info, "Starting");
-    return _op_lock.with([this] {
-        read_voted_for();
-
-        /*
-         * temporary workaround:
-         *
-         * if the group's ntp matches the pattern, then do not load the initial
-         * configuration snapshto from the keyvalue store. more info here:
-         *
-         * https://github.com/redpanda-data/redpanda/issues/1870
-         */
-        const auto& ntp = _log.config().ntp();
-        const auto normalized_ntp = fmt::format(
-          "{}.{}.{}", ntp.ns(), ntp.tp.topic(), ntp.tp.partition());
-        const auto& patterns = config::shard_local_cfg()
-                                 .full_raft_configuration_recovery_pattern();
-        auto initial_state = std::any_of(
-          patterns.cbegin(),
-          patterns.cend(),
-          [&normalized_ntp](const ss::sstring& pattern) {
-              return pattern == "*" || normalized_ntp.starts_with(pattern);
-          });
-        if (!initial_state) {
-            initial_state = is_initial_state();
-        }
-
-        vlog(
-          _ctxlog.info,
-          "Starting with voted_for {} term {} initial_state {}",
-          _voted_for,
-          _term,
-          initial_state);
-
-        return _configuration_manager.start(initial_state, _self.revision())
-          .then([this] {
-              vlog(
-                _ctxlog.trace,
-                "Configuration manager started: {}",
-                _configuration_manager);
-          })
-          .then([this, initial_state] {
-              offset_translator::must_reset must_reset{initial_state};
-
-              absl::btree_map<model::offset, int64_t> offset2delta;
-              for (auto it = _configuration_manager.begin();
-                   it != _configuration_manager.end();
-                   ++it) {
-                  offset2delta.emplace(it->first, it->second.idx());
-              }
-
-              auto bootstrap = offset_translator::bootstrap_state{
-                .offset2delta = std::move(offset2delta),
-                .highest_known_offset
-                = _configuration_manager.get_highest_known_offset(),
-              };
+    return _op_lock
+      .with([this] {
+          read_voted_for();
+
+          /*
+           * temporary workaround:
+           *
+           * if the group's ntp matches the pattern, then do not load the
+           * initial configuration snapshot from the keyvalue store. more info
+           * here:
+           *
+           * https://github.com/redpanda-data/redpanda/issues/1870
+           */
+          const auto& ntp = _log.config().ntp();
+          const auto normalized_ntp = fmt::format(
+            "{}.{}.{}", ntp.ns(), ntp.tp.topic(), ntp.tp.partition());
+          const auto& patterns = config::shard_local_cfg()
+                                   .full_raft_configuration_recovery_pattern();
+          auto initial_state = std::any_of(
+            patterns.cbegin(),
+            patterns.cend(),
+            [&normalized_ntp](const ss::sstring& pattern) {
+                return pattern == "*" || normalized_ntp.starts_with(pattern);
+            });
+          if (!initial_state) {
+              initial_state = is_initial_state();
+          }
 
-              return _offset_translator.start(must_reset, std::move(bootstrap));
-          })
-          .then([this] { return hydrate_snapshot(); })
-          .then([this] {
-              vlog(
-                _ctxlog.debug,
-                "Starting raft bootstrap from {}",
-                _configuration_manager.get_highest_known_offset());
-              return details::read_bootstrap_state(
-                _log,
-                model::next_offset(
-                  _configuration_manager.get_highest_known_offset()),
-                _as);
-          })
-          .then([this](configuration_bootstrap_state st) {
-              auto lstats = _log.offsets();
+          vlog(
+            _ctxlog.info,
+            "Starting with voted_for {} term {} initial_state {}",
+            _voted_for,
+            _term,
+            initial_state);
+
+          return _configuration_manager.start(initial_state, _self.revision())
+            .then([this] {
+                vlog(
+                  _ctxlog.trace,
+                  "Configuration manager started: {}",
+                  _configuration_manager);
+            })
+            .then([this, initial_state] {
+                offset_translator::must_reset must_reset{initial_state};
+
+                absl::btree_map<model::offset, int64_t> offset2delta;
+                for (auto it = _configuration_manager.begin();
+                     it != _configuration_manager.end();
+                     ++it) {
+                    offset2delta.emplace(it->first, it->second.idx());
+                }
 
-              vlog(_ctxlog.info, "Read bootstrap state: {}", st);
-              vlog(_ctxlog.info, "Current log offsets: {}", lstats);
+                auto bootstrap = offset_translator::bootstrap_state{
+                  .offset2delta = std::move(offset2delta),
+                  .highest_known_offset
+                  = _configuration_manager.get_highest_known_offset(),
+                };
 
-              // if log term is newer than the one comming from voted_for state,
-              // we reset voted_for state
-              if (lstats.dirty_offset_term > _term) {
-                  _term = lstats.dirty_offset_term;
-                  _voted_for = {};
-              }
-              /**
-               * since we are starting, there were no new writes to the log
-               * before that point. It is safe to use dirty offset as a initial
-               * flushed offset since it is equal to last offset that exists on
-               * disk and was read in log recovery process.
-               */
-              _flushed_offset = lstats.dirty_offset;
-              /**
-               * The configuration manager state may be divereged from the log
-               * state, as log is flushed lazily, we have to make sure that the
-               * log and configuration manager has exactly the same offsets
-               * range
-               */
-              vlog(
-                _ctxlog.info,
-                "Truncating configurations at {}",
-                lstats.dirty_offset);
+                return _offset_translator.start(
+                  must_reset, std::move(bootstrap));
+            })
+            .then([this] { return hydrate_snapshot(); })
+            .then([this] {
+                vlog(
+                  _ctxlog.debug,
+                  "Starting raft bootstrap from {}",
+                  _configuration_manager.get_highest_known_offset());
+                return details::read_bootstrap_state(
+                  _log,
+                  model::next_offset(
+                    _configuration_manager.get_highest_known_offset()),
+                  _as);
+            })
+            .then([this](configuration_bootstrap_state st) {
+                auto lstats = _log.offsets();
 
-              auto f = _configuration_manager.truncate(
-                model::next_offset(lstats.dirty_offset));
-
-              /**
-               * We read some batches from the log and have to update the
-               * configuration manager.
-               */
-              if (st.config_batches_seen() > 0) {
-                  f = f.then([this, st = std::move(st)]() mutable {
-                      return _configuration_manager.add(
-                        std::move(st).release_configurations());
-                  });
-              }
+                vlog(_ctxlog.info, "Read bootstrap state: {}", st);
+                vlog(_ctxlog.info, "Current log offsets: {}", lstats);
+
+                // if log term is newer than the one coming from voted_for
+                // state, we reset voted_for state
+                if (lstats.dirty_offset_term > _term) {
+                    _term = lstats.dirty_offset_term;
+                    _voted_for = {};
+                }
+                /**
+                 * since we are starting, there were no new writes to the log
+                 * before that point. It is safe to use dirty offset as an
+                 * initial flushed offset since it is equal to last offset that
+                 * exists on disk and was read in log recovery process.
+                 */
+                _flushed_offset = lstats.dirty_offset;
+                /**
+                 * The configuration manager state may have diverged from the
+                 * log state, as log is flushed lazily, we have to make sure
+                 * that the log and configuration manager have exactly the same
+                 * offsets range
+                 */
+                vlog(
+                  _ctxlog.info,
+                  "Truncating configurations at {}",
+                  lstats.dirty_offset);
+
+                auto f = _configuration_manager.truncate(
+                  model::next_offset(lstats.dirty_offset));
+
+                /**
+                 * We read some batches from the log and have to update the
+                 * configuration manager.
+                 */
+                if (st.config_batches_seen() > 0) {
+                    f = f.then([this, st = std::move(st)]() mutable {
+                        return _configuration_manager.add(
+                          std::move(st).release_configurations());
+                    });
+                }
 
-              return f.then([this] {
-                  update_follower_stats(_configuration_manager.get_latest());
-              });
-          })
-          .then([this] { return _offset_translator.sync_with_log(_log, _as); })
-          .then([this] {
-              /**
-               * fix for incorrectly persisted configuration index. In previous
-               * version of redpanda due to the issue with incorrectly assigned
-               * raft configuration indicies
-               * (https://github.com/redpanda-data/redpanda/issues/2326) there
-               * may be a persistent corruption in offset translation caused by
-               * incorrectly persited configuration index. It may cause log
-               * offset to be negative. Here we check if this problem exists and
-               * if so apply necessary offset translation.
-               */
-              const auto so = start_offset();
-              // no prefix truncation was applied we do not need adjustment
-              if (so <= model::offset(0)) {
-                  return ss::now();
-              }
-              auto delta = _configuration_manager.offset_delta(start_offset());
+                return f.then([this] {
+                    update_follower_stats(_configuration_manager.get_latest());
+                });
+            })
+            .then(
+              [this] { return _offset_translator.sync_with_log(_log, _as); })
+            .then([this] {
+                /**
+                 * fix for incorrectly persisted configuration index. In
+                 * previous versions of redpanda due to the issue with
+                 * incorrectly assigned raft configuration indices
+                 * (https://github.com/redpanda-data/redpanda/issues/2326) there
+                 * may be a persistent corruption in offset translation caused
+                 * by an incorrectly persisted configuration index. It may cause
+                 * log offset to be negative. Here we check if this problem
+                 * exists and if so apply necessary offset translation.
+                 */
+                const auto so = start_offset();
+                // no prefix truncation was applied, we do not need adjustment
+                if (so <= model::offset(0)) {
+                    return ss::now();
+                }
+                auto delta = _configuration_manager.offset_delta(
+                  start_offset());
 
-              if (so >= delta) {
-                  return ss::now();
-              }
-              // if start offset is smaller than offset delta we need to apply
-              // adjustment
-              const configuration_manager::configuration_idx new_idx(so());
-              vlog(
-                _ctxlog.info,
-                "adjusting configuration index, current start offset: {}, "
-                "delta: {}, new initial index: {}",
-                so,
-                delta,
-                new_idx);
-              return _configuration_manager.adjust_configuration_idx(new_idx);
-          })
-          .then([this] {
-              auto next_election = clock_type::now();
-              // set last heartbeat timestamp to prevent skipping first
-              // election
-              _hbeat = clock_type::time_point::min();
-              auto conf = _configuration_manager.get_latest().brokers();
-              if (!conf.empty() && _self.id() == conf.begin()->id()) {
-                  // for single node scenarios arm immediate election,
-                  // use standard election timeout otherwise.
-                  if (conf.size() > 1) {
-                      next_election += _jit.next_duration();
-                  }
-              } else {
-                  // current node is not a preselected leader, add 2x jitter
-                  // to give opportunity to the preselected leader to win
-                  // the first round
-                  next_election += _jit.base_duration()
-                                   + 2 * _jit.next_jitter_duration();
-              }
-              if (!_bg.is_closed()) {
-                  _vote_timeout.rearm(next_election);
-              }
-          })
-          .then([this] {
-              auto last_applied = read_last_applied();
-              if (last_applied > _commit_index) {
-                  _commit_index = last_applied;
-                  maybe_update_last_visible_index(_commit_index);
-                  vlog(
-                    _ctxlog.trace, "Recovered commit_index: {}", _commit_index);
-              }
-          })
-          .then([this] { return _event_manager.start(); })
-          .then([this] { _append_requests_buffer.start(); })
-          .then([this] {
-              vlog(
-                _ctxlog.info,
-                "started raft, log offsets: {}, term: {}, configuration: {}",
-                _log.offsets(),
-                _term,
-                _configuration_manager.get_latest());
-          });
-    }).handle_exception_type([](const ss::broken_semaphore&){});
+                if (so >= delta) {
+                    return ss::now();
+                }
+                // if start offset is smaller than offset delta, we need to
+                // apply adjustment
+                const configuration_manager::configuration_idx new_idx(so());
+                vlog(
+                  _ctxlog.info,
+                  "adjusting configuration index, current start offset: {}, "
+                  "delta: {}, new initial index: {}",
+                  so,
+                  delta,
+                  new_idx);
+                return _configuration_manager.adjust_configuration_idx(new_idx);
+            })
+            .then([this] {
+                auto next_election = clock_type::now();
+                // set last heartbeat timestamp to prevent skipping first
+                // election
+                _hbeat = clock_type::time_point::min();
+                auto conf = _configuration_manager.get_latest().brokers();
+                if (!conf.empty() && _self.id() == conf.begin()->id()) {
+                    // for single node scenarios arm immediate election,
+                    // use standard election timeout otherwise.
+                    if (conf.size() > 1) {
+                        next_election += _jit.next_duration();
+                    }
+                } else {
+                    // current node is not a preselected leader, add 2x jitter
+                    // to give opportunity to the preselected leader to win
+                    // the first round
+                    next_election += _jit.base_duration()
+                                     + 2 * _jit.next_jitter_duration();
+                }
+                if (!_bg.is_closed()) {
+                    _vote_timeout.rearm(next_election);
+                }
+            })
+            .then([this] {
+                auto last_applied = read_last_applied();
+                if (last_applied > _commit_index) {
+                    _commit_index = last_applied;
+                    maybe_update_last_visible_index(_commit_index);
+                    vlog(
+                      _ctxlog.trace,
+                      "Recovered commit_index: {}",
+                      _commit_index);
+                }
+            })
+            .then([this] { return _event_manager.start(); })
+            .then([this] { _append_requests_buffer.start(); })
+            .then([this] {
+                vlog(
+                  _ctxlog.info,
+                  "started raft, log offsets: {}, term: {}, configuration: {}",
+                  _log.offsets(),
+                  _term,
+                  _configuration_manager.get_latest());
+            });
+      })
+      .handle_exception_type([](const ss::broken_semaphore&) {});
 }
 
 ss::future<>
@@ -1428,7 +1434,8 @@ ss::future<vote_reply> consensus::vote(vote_request&& r) {
             .granted = false,
             .log_ok = false};
       })
-      .handle_exception_type([this, target_node_id](const ss::broken_semaphore&){
+      .handle_exception_type(
+        [this, target_node_id](const ss::broken_semaphore&) {
           return vote_reply{
             .target_node_id = target_node_id,
             .term = _term,
@@ -1839,12 +1846,13 @@ consensus::do_append_entries(append_entries_request&& r) {
 
 ss::future<install_snapshot_reply>
 consensus::install_snapshot(install_snapshot_request&& r) {
-    return _op_lock.with([this, r = std::move(r)]() mutable {
-        return do_install_snapshot(std::move(r));
-    }).handle_exception_type([this](const ss::broken_semaphore&) {
-        return install_snapshot_reply{
-          .term = _term, .bytes_stored = 0, .success = false};
-    });
+    return _op_lock
+      .with([this, r = std::move(r)]() mutable {
+          return do_install_snapshot(std::move(r));
+      })
+      .handle_exception_type([this](const ss::broken_semaphore&) {
+          return install_snapshot_reply{.term = _term, .success = false};
+      });
 }
 
 ss::future<> consensus::hydrate_snapshot() {
@@ -2036,27 +2044,29 @@ ss::future<install_snapshot_reply> consensus::finish_snapshot(
 
 ss::future<> consensus::write_snapshot(write_snapshot_cfg cfg) {
     model::offset last_included_index = cfg.last_included_index;
-    bool updated = co_await _op_lock.with(
-      [this, cfg = std::move(cfg)]() mutable {
-          // do nothing, we already have snapshot for this offset
-          // MUST be checked under the _op_lock
-          if (cfg.last_included_index <= _last_snapshot_index) {
-              return ss::make_ready_future<bool>(false);
-          }
-
-          auto max_offset = last_visible_index();
-          vassert(
-            cfg.last_included_index <= max_offset,
-            "Can not take snapshot, requested offset: {} is greater than max "
-            "snapshot offset: {}",
-            cfg.last_included_index,
-            max_offset);
-
-          return do_write_snapshot(cfg.last_included_index, std::move(cfg.data))
-            .then([] { return true; });
-      }).handle_exception_type([](const ss::broken_semaphore&) {
-          return false;
-      });
+    bool updated = co_await _op_lock
+                     .with([this, cfg = std::move(cfg)]() mutable {
+                         // do nothing, we already have snapshot for this offset
+                         // MUST be checked under the _op_lock
+                         if (cfg.last_included_index <= _last_snapshot_index) {
+                             return ss::make_ready_future<bool>(false);
+                         }
+
+                         auto max_offset = last_visible_index();
+                         vassert(
+                           cfg.last_included_index <= max_offset,
+                           "Can not take snapshot, requested offset: {} is "
+                           "greater than max "
+                           "snapshot offset: {}",
+                           cfg.last_included_index,
+                           max_offset);
+
+                         return do_write_snapshot(
+                           cfg.last_included_index, std::move(cfg.data))
+                           .then([] { return true; });
+                     })
+                     .handle_exception_type(
+                       [](const ss::broken_semaphore&) { return false; });
 
     if (!updated) {
         co_return;
@@ -2398,23 +2408,24 @@ consensus::next_followers_request_seq() {
 }
 
 ss::future<> consensus::refresh_commit_index() {
-    return _op_lock.get_units().then([this](ss::semaphore_units<> u) mutable {
-        auto f = ss::now();
-
-        if (_has_pending_flushes) {
-            f = flush_log();
-        }
-
-        if (!is_elected_leader()) {
-            return f;
-        }
-        return f.then([this, u = std::move(u)]() mutable {
-            return do_maybe_update_leader_commit_idx(std::move(u));
-        });
-    })
-      .handle_exception_type([](const ss::broken_semaphore&) {
-          //ignore exception, shutting down
-      });
+    return _op_lock.get_units()
+      .then([this](ss::semaphore_units<> u) mutable {
+          auto f = ss::now();
+
+          if (_has_pending_flushes) {
+              f = flush_log();
+          }
+
+          if (!is_elected_leader()) {
+              return f;
+          }
+          return f.then([this, u = std::move(u)]() mutable {
+              return do_maybe_update_leader_commit_idx(std::move(u));
+          });
+      })
+      .handle_exception_type([](const ss::broken_semaphore&) {
+          // ignore exception, shutting down
+      });
 }
 
 void consensus::maybe_update_leader_commit_idx() {
@@ -2780,10 +2791,10 @@ consensus::prepare_transfer_leadership(vnode target_rni) {
 
     // Enforce ordering wrt anyone currently doing an append under op_lock
    {
-        try{
-            auto units = co_await _op_lock.get_units();
-            vlog(_ctxlog.trace, "transfer leadership: cleared oplock");
-        } catch(const ss::broken_semaphore&) {
+        try {
+            auto units = co_await _op_lock.get_units();
+            vlog(_ctxlog.trace, "transfer leadership: cleared oplock");
+        } catch (const ss::broken_semaphore&) {
             co_return make_error_code(errc::shutting_down);
         }
    }
@@ -3039,16 +3050,16 @@ consensus::do_transfer_leadership(std::optional<model::node_id> target) {
             // (If we accepted more writes, our log could get
             // ahead of new leader, and it could lose election)
             try {
-            auto units = co_await _op_lock.get_units();
-            do_step_down("leadership_transfer");
-            if (_leader_id) {
-                _leader_id = std::nullopt;
-                trigger_leadership_notification();
-            }
-
-            co_return make_error_code(errc::success);
-            } catch( const ss::broken_semaphore& ) {
-                co_return errc::shutting_down;
+                auto units = co_await _op_lock.get_units();
+                do_step_down("leadership_transfer");
+                if (_leader_id) {
+                    _leader_id = std::nullopt;
+                    trigger_leadership_notification();
+                }
+
+                co_return make_error_code(errc::success);
+            } catch (const ss::broken_semaphore&) {
+                co_return errc::shutting_down;
             }
         }
    });
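
The recurring pattern in this diff — every `_op_lock` acquisition translating the `ss::broken_semaphore` thrown after shutdown into a clean result instead of letting the exception escape — can be sketched in isolation. The sketch below is illustrative only: `with_op_lock` and the use of `std::errc::operation_canceled` are stand-ins invented for the example (the patch itself returns `raft::errc::shutting_down`), and only the Seastar calls shown (`get_units`, `semaphore_units<>`, `handle_exception_type`, the `broken_semaphore` exception) are taken from the real API.

    #include <seastar/core/future.hh>
    #include <seastar/core/semaphore.hh>

    #include <system_error>

    // Hypothetical helper mirroring the patch's pattern: run work while
    // holding the op lock, and once the semaphore has been broken during
    // shutdown, report the failure as an error code rather than rethrowing.
    seastar::future<std::error_code> with_op_lock(seastar::semaphore& op_lock) {
        return seastar::get_units(op_lock, 1)
          .then([](seastar::semaphore_units<> units) {
              // ... work performed under the lock ...
              return std::error_code{};
          })
          .handle_exception_type([](const seastar::broken_semaphore&) {
              // op_lock.broken() was called; every current and future waiter
              // fails with broken_semaphore, which is translated here.
              return std::make_error_code(std::errc::operation_canceled);
          });
    }

The point of doing the translation at each call site, rather than catching the exception in callers, is that the futures returned by these consensus entry points already carry error codes; a broken op lock during shutdown is then just one more expected error value rather than an exceptional control path.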