diff --git a/docs/rfcs/20200421_raft_recovery.md b/docs/rfcs/20200421_raft_recovery.md index 9297357eb974..582c5be57027 100644 --- a/docs/rfcs/20200421_raft_recovery.md +++ b/docs/rfcs/20200421_raft_recovery.md @@ -210,7 +210,7 @@ In order to make it possible new fields have to be added to // next index to send to this follower model::offset next_index; // timestamp of last append_entries_rpc call - clock_type::time_point last_append_timestamp; + clock_type::time_point last_sent_append_entries_req_timesptamp; uint64_t failed_appends{0}; bool is_learner = false; bool is_recovering = false; diff --git a/src/v/cluster/metadata_cache.cc b/src/v/cluster/metadata_cache.cc index 0bedd54b26e3..e7d784df33f2 100644 --- a/src/v/cluster/metadata_cache.cc +++ b/src/v/cluster/metadata_cache.cc @@ -111,10 +111,16 @@ ss::future metadata_cache::get_leader( } std::optional -metadata_cache::get_leader_id(const model::ntp& ntp) { +metadata_cache::get_leader_id(const model::ntp& ntp) const { return _leaders.local().get_leader(ntp); } +std::optional +metadata_cache::get_previous_leader_id(const model::ntp& ntp) const { + return _leaders.local().get_previous_leader( + model::topic_namespace_view(ntp), ntp.tp.partition); +} + /// If present returns a leader of raft0 group std::optional metadata_cache::get_controller_leader_id() { return _leaders.local().get_leader(model::controller_ntp); diff --git a/src/v/cluster/metadata_cache.h b/src/v/cluster/metadata_cache.h index ec771e5423f6..62d7d9dbc795 100644 --- a/src/v/cluster/metadata_cache.h +++ b/src/v/cluster/metadata_cache.h @@ -100,7 +100,10 @@ class metadata_cache { return contains(model::topic_namespace_view(ntp), ntp.tp.partition); } - std::optional get_leader_id(const model::ntp&); + std::optional get_leader_id(const model::ntp&) const; + + std::optional + get_previous_leader_id(const model::ntp&) const; /// Returns metadata of all topics in cache internal format // const cache_t& all_metadata() const { return _cache; } diff --git a/src/v/cluster/partition_leaders_table.cc b/src/v/cluster/partition_leaders_table.cc index 4ba905211b10..d13dcb81f2dd 100644 --- a/src/v/cluster/partition_leaders_table.cc +++ b/src/v/cluster/partition_leaders_table.cc @@ -10,6 +10,7 @@ #include "cluster/partition_leaders_table.h" #include "cluster/cluster_utils.h" +#include "cluster/logger.h" #include "cluster/topic_table.h" #include "model/fundamental.h" #include "model/metadata.h" @@ -37,19 +38,20 @@ ss::future<> partition_leaders_table::stop() { return ss::now(); } -std::optional partition_leaders_table::get_leader( +std::optional +partition_leaders_table::find_leader_meta( model::topic_namespace_view tp_ns, model::partition_id pid) const { const auto& topics_map = _topic_table.local().topics_map(); if (auto it = _leaders.find(leader_key_view{tp_ns, pid}); it != _leaders.end()) { - return it->second.id; + return it->second; } else if (auto it = topics_map.find(tp_ns); it != topics_map.end()) { // Possible leadership query for materialized topic, search for it // in the topics table. 
if (!it->second.is_topic_replicable()) { // Leadership properties of non replicated topic are that of its // parent - return get_leader( + return find_leader_meta( model::topic_namespace_view{ tp_ns.ns, it->second.get_source_topic()}, pid); @@ -58,6 +60,18 @@ std::optional partition_leaders_table::get_leader( return std::nullopt; } +std::optional partition_leaders_table::get_previous_leader( + model::topic_namespace_view tp_ns, model::partition_id pid) const { + const auto meta = find_leader_meta(tp_ns, pid); + return meta ? meta->previous_leader : std::nullopt; +} + +std::optional partition_leaders_table::get_leader( + model::topic_namespace_view tp_ns, model::partition_id pid) const { + const auto meta = find_leader_meta(tp_ns, pid); + return meta ? meta->current_leader : std::nullopt; +} + std::optional partition_leaders_table::get_leader(const model::ntp& ntp) const { return get_leader(model::topic_namespace_view(ntp), ntp.tp.partition); @@ -74,18 +88,29 @@ void partition_leaders_table::update_partition_leader( auto [new_it, _] = _leaders.emplace( leader_key{ model::topic_namespace(ntp.ns, ntp.tp.topic), ntp.tp.partition}, - leader_meta{leader_id, term}); + leader_meta{.current_leader = leader_id, .update_term = term}); it = new_it; + } else { + // existing partition + if (it->second.update_term > term) { + // Do nothing if update term is older + return; + } + // if current leader has value, store it as a previous leader + if (it->second.current_leader) { + it->second.previous_leader = it->second.current_leader; + } + it->second.current_leader = leader_id; + it->second.update_term = term; } - - if (it->second.update_term > term) { - // Do nothing if update term is older - return; - } - // existing partition - it->second.id = leader_id; - it->second.update_term = term; - + vlog( + clusterlog.trace, + "updated partition: {} leader: {{term: {}, current leader: {}, previous " + "leader: {}}}", + ntp, + it->second.update_term, + it->second.current_leader, + it->second.previous_leader); // notify waiters if update is setting the leader if (!leader_id) { return; diff --git a/src/v/cluster/partition_leaders_table.h b/src/v/cluster/partition_leaders_table.h index e81e46edc2b6..6ae611f05c85 100644 --- a/src/v/cluster/partition_leaders_table.h +++ b/src/v/cluster/partition_leaders_table.h @@ -22,6 +22,8 @@ #include #include +#include + namespace cluster { /// Partition leaders contains information about currently elected partition @@ -40,6 +42,14 @@ class partition_leaders_table { std::optional get_leader(model::topic_namespace_view, model::partition_id) const; + /** + * Returns previous reader of partition if available. 
This is required by + * Kafka metadata APIs since it require us to return former leader id even + * it the leader is not present in a given time point + */ + std::optional get_previous_leader( + model::topic_namespace_view, model::partition_id) const; + ss::future wait_for_leader( const model::ntp&, ss::lowres_clock::time_point, @@ -58,7 +68,7 @@ class partition_leaders_table { // clang-format on void for_each_leader(Func&& f) const { for (auto& [k, v] : _leaders) { - f(k.tp_ns, k.pid, v.id, v.update_term); + f(k.tp_ns, k.pid, v.current_leader, v.update_term); } } @@ -128,10 +138,18 @@ class partition_leaders_table { // in order to filter out reordered requests we store last update term struct leader_meta { - std::optional id; + // current leader id, this may be empty if a group is in the middle of + // leader election + std::optional current_leader; + // previous leader id, this is empty if and only if there were no leader + // elected for the topic before + std::optional previous_leader; model::term_id update_term; }; + std::optional + find_leader_meta(model::topic_namespace_view, model::partition_id) const; + absl::flat_hash_map _leaders; diff --git a/src/v/kafka/server/handlers/metadata.cc b/src/v/kafka/server/handlers/metadata.cc index 50a5f9dc3678..6109661397b8 100644 --- a/src/v/kafka/server/handlers/metadata.cc +++ b/src/v/kafka/server/handlers/metadata.cc @@ -13,6 +13,7 @@ #include "cluster/topics_frontend.h" #include "cluster/types.h" #include "config/configuration.h" +#include "config/node_config.h" #include "kafka/server/errors.h" #include "kafka/server/handlers/details/security.h" #include "kafka/server/handlers/topics/topic_utils.h" @@ -29,17 +30,69 @@ namespace kafka { -metadata_response::topic -make_topic_response_from_topic_metadata(model::topic_metadata&& tp_md) { +static constexpr model::node_id no_leader(-1); +/** + * We use simple heuristic to tolerate isolation of a node hosting both + * partition leader and follower. + * + * Kafka clients request metadata refresh in case they receive error that is + * related with stale metadata - f.e. NOT_LEADER. Metadata request can be + * processed by any broker and there is no general rule for that which + * broker to choose to refresh metadata from. (f.e. Java kafka client uses the + * broker with active least loaded connection.) This may lead to the situation + * in which client will ask for metadata always the same broker. When that + * broker is isolated from rest of the cluster it will never update its metadata + * view. This way the client will always receive stale metadata. + * + * This behavior may lead to a live lock in an event of network partition. If + * current partition leader is isolated from the cluster it will keep answering + * with its id in the leader_id field for that partition (according to policy + * where we return a former leader - there is no leader for that broker, it is a + * candidate). Client will retry produce or fetch request and receive NOT_LEADER + * error, this will force client to request metadata update, broker will respond + * with the same metadata and the whole cycle will loop indefinitely. 
+ * + * In order to break the loop and force client to make progress we use following + * heuristics: + * + * 1) when current leader is unknown, return former leader (Kafka behavior) + * + * 2) when current leader is unknown and previous leader is equal to current + * node id select random replica_id as a leader (indicate leader isolation) + * + * With those heuristics we will always force the client to communicate with the + * nodes that may not be partitioned. + */ +model::node_id get_leader( + const model::ntp& ntp, + const cluster::metadata_cache& md_cache, + const std::vector& replicas) { + const auto current = md_cache.get_leader_id(ntp); + if (current) { + return *current; + } + + const auto previous = md_cache.get_previous_leader_id(ntp); + if (previous == config::node().node_id()) { + auto idx = fast_prng_source() % replicas.size(); + return replicas[idx]; + } + + return previous.value_or(no_leader); +} + +metadata_response::topic make_topic_response_from_topic_metadata( + const cluster::metadata_cache& md_cache, model::topic_metadata&& tp_md) { metadata_response::topic tp; tp.error_code = error_code::none; + auto tp_ns = tp_md.tp_ns; tp.name = std::move(tp_md.tp_ns.tp); tp.is_internal = false; // no internal topics yet std::transform( tp_md.partitions.begin(), tp_md.partitions.end(), std::back_inserter(tp.partitions), - [](model::partition_metadata& p_md) { + [tp_ns = std::move(tp_ns), &md_cache](model::partition_metadata& p_md) { std::vector replicas{}; replicas.reserve(p_md.replicas.size()); std::transform( @@ -50,7 +103,8 @@ make_topic_response_from_topic_metadata(model::topic_metadata&& tp_md) { metadata_response::partition p; p.error_code = error_code::none; p.partition_index = p_md.id; - p.leader_id = p_md.leader_node.value_or(model::node_id(-1)); + p.leader_id = get_leader( + model::ntp(tp_ns.ns, tp_ns.tp, p_md.id), md_cache, replicas); p.replica_nodes = std::move(replicas); p.isr_nodes = p.replica_nodes; p.offline_replicas = {}; @@ -95,9 +149,9 @@ create_topic(request_context& ctx, model::topic&& topic) { res, ctx.controller_api(), tout + model::timeout_clock::now()) - .then([tp_md = std::move(tp_md)]() mutable { + .then([&ctx, tp_md = std::move(tp_md)]() mutable { return make_topic_response_from_topic_metadata( - std::move(tp_md.value())); + ctx.metadata_cache(), std::move(tp_md.value())); }); }) .handle_exception([topic = std::move(topic)]( @@ -125,7 +179,8 @@ static metadata_response::topic make_topic_response( details::authorized_operations(ctx, md.tp_ns.tp)); } - auto res = make_topic_response_from_topic_metadata(std::move(md)); + auto res = make_topic_response_from_topic_metadata( + ctx.metadata_cache(), std::move(md)); res.topic_authorized_operations = auth_operations; return res; } diff --git a/src/v/raft/append_entries_buffer.cc b/src/v/raft/append_entries_buffer.cc index 2d061e244e4d..fe7273d126cc 100644 --- a/src/v/raft/append_entries_buffer.cc +++ b/src/v/raft/append_entries_buffer.cc @@ -137,14 +137,13 @@ void append_entries_buffer::propagate_results( replies.size()); auto resp_it = response_promises.begin(); for (auto& reply : replies) { - auto lstats = _consensus._log.offsets(); ss::visit( reply, - [&resp_it, &lstats](append_entries_reply r) { + [&resp_it, this](append_entries_reply r) { // this is important, we want to update response committed // offset here as we flushed after the response structure was // created - r.last_committed_log_index = lstats.committed_offset; + r.last_flushed_log_index = _consensus._flushed_offset; resp_it->set_value(r); }, 
[&resp_it](std::exception_ptr& e) { diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 84d9fa2dd249..67c627671b58 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -164,7 +164,7 @@ clock_type::time_point consensus::majority_heartbeat() const { } if (auto it = _fstats.find(rni); it != _fstats.end()) { - return it->second.last_hbeat_timestamp; + return it->second.last_received_append_entries_reply_timestamp; } // if we do not know the follower state yet i.e. we have @@ -318,9 +318,9 @@ consensus::success_reply consensus::update_follower_index( _ctxlog.trace, "Updated node {} last committed log index: {}", idx.node_id, - reply.last_committed_log_index); + reply.last_flushed_log_index); idx.last_dirty_log_index = reply.last_dirty_log_index; - idx.last_committed_log_index = reply.last_committed_log_index; + idx.last_flushed_log_index = reply.last_flushed_log_index; idx.next_index = details::next_offset(idx.last_dirty_log_index); } @@ -336,7 +336,7 @@ consensus::success_reply consensus::update_follower_index( // update follower state to allow recovery of follower with // missing entries idx.last_dirty_log_index = reply.last_dirty_log_index; - idx.last_committed_log_index = reply.last_committed_log_index; + idx.last_flushed_log_index = reply.last_flushed_log_index; idx.next_index = details::next_offset(idx.last_dirty_log_index); idx.follower_state_change.broadcast(); } @@ -379,7 +379,7 @@ void consensus::maybe_promote_to_voter(vnode id) { } // do not promote to voter, learner is not up to date - if (it->second.match_index < _log.offsets().committed_offset) { + if (it->second.match_index < _flushed_offset) { return ss::now(); } @@ -418,7 +418,7 @@ void consensus::successfull_append_entries_reply( follower_index_metadata& idx, append_entries_reply reply) { // follower and leader logs matches idx.last_dirty_log_index = reply.last_dirty_log_index; - idx.last_committed_log_index = reply.last_committed_log_index; + idx.last_flushed_log_index = reply.last_flushed_log_index; idx.match_index = idx.last_dirty_log_index; idx.next_index = details::next_offset(idx.last_dirty_log_index); vlog( @@ -711,20 +711,6 @@ ss::future> consensus::do_append_replicate_relaxed( .finally([this, u = std::move(u)] { _probe.replicate_done(); }); } -void consensus::dispatch_flush_with_lock() { - if (!_has_pending_flushes) { - return; - } - (void)ss::with_gate(_bg, [this] { - return _op_lock.with([this] { - if (!_has_pending_flushes) { - return ss::make_ready_future<>(); - } - return flush_log(); - }); - }); -} - ss::future consensus::do_make_reader(storage::log_reader_config config) { // limit to last visible index @@ -1067,7 +1053,13 @@ ss::future<> consensus::do_start() { _term = lstats.dirty_offset_term; _voted_for = {}; } - + /** + * since we are starting, there were no new writes to the log + * before that point. It is safe to use dirty offset as a initial + * flushed offset since it is equal to last offset that exists on + * disk and was read in log recovery process. 
+ */ + _flushed_offset = lstats.dirty_offset; /** * The configuration manager state may be divereged from the log * state, as log is flushed lazily, we have to make sure that the @@ -1491,7 +1483,7 @@ consensus::do_append_entries(append_entries_request&& r) { reply.group = r.meta.group; reply.term = _term; reply.last_dirty_log_index = lstats.dirty_offset; - reply.last_committed_log_index = lstats.committed_offset; + reply.last_flushed_log_index = _flushed_offset; reply.result = append_entries_reply::status::failure; _probe.append_request(); @@ -1618,10 +1610,9 @@ consensus::do_append_entries(append_entries_request&& r) { auto truncate_at = details::next_offset( model::offset(r.meta.prev_log_index)); vlog( - _ctxlog.debug, - "Truncate log, request for the same term:{}. Request offset:{} " - "is " - "earlier than what we have:{}. Truncating to: {}", + _ctxlog.info, + "Truncating log in term: {}, Request previous log index: {} is " + "earlier than log end offset: {}. Truncating to: {}", r.meta.term, r.meta.prev_log_index, lstats.dirty_offset, @@ -1642,6 +1633,10 @@ consensus::do_append_entries(append_entries_request&& r) { _last_quorum_replicated_index = std::min( details::prev_offset(truncate_at), _last_quorum_replicated_index); + // update flushed offset since truncation may happen to already + // flushed entries + _flushed_offset = std::min( + details::prev_offset(truncate_at), _flushed_offset); return _configuration_manager.truncate(truncate_at).then([this] { _probe.configuration_update(); @@ -1737,6 +1732,11 @@ ss::future<> consensus::truncate_to_latest_snapshot() { return _log.truncate_prefix(storage::truncate_prefix_config( details::next_offset(_last_snapshot_index), _scheduling.default_iopc)); + }) + .then([this] { + // when log was prefix truncate flushed offset should be equal to at + // least last snapshot index + _flushed_offset = std::max(_last_snapshot_index, _flushed_offset); }); } @@ -2015,21 +2015,34 @@ ss::future> consensus::dispatch_replicate( append_entries_reply consensus::make_append_entries_reply( vnode target_node, storage::append_result disk_results) { - auto lstats = _log.offsets(); append_entries_reply reply; reply.node_id = _self; reply.target_node_id = target_node; reply.group = _group; reply.term = _term; reply.last_dirty_log_index = disk_results.last_offset; - reply.last_committed_log_index = lstats.committed_offset; + reply.last_flushed_log_index = _flushed_offset; reply.result = append_entries_reply::status::success; return reply; } ss::future<> consensus::flush_log() { _probe.log_flushed(); - return _log.flush().then([this] { _has_pending_flushes = false; }); + auto flushed_up_to = _log.offsets().dirty_offset; + return _log.flush().then([this, flushed_up_to] { + _flushed_offset = flushed_up_to; + // TODO: remove this assertion when we will remove committed_offset + // from storage. + auto lstats = _log.offsets(); + vassert( + lstats.committed_offset >= _flushed_offset, + "Raft incorrectly tracking flushed log offset. 
Expected offset: {}, " + " current log offsets: {}, log: {}", + _flushed_offset, + lstats, + _log); + _has_pending_flushes = false; + }); } ss::future consensus::disk_append( @@ -2127,19 +2140,20 @@ model::term_id consensus::get_term(model::offset o) { return _log.get_term(o).value_or(model::term_id{}); } -clock_type::time_point consensus::last_append_timestamp(vnode id) { - return _fstats.get(id).last_append_timestamp; +clock_type::time_point +consensus::last_sent_append_entries_req_timesptamp(vnode id) { + return _fstats.get(id).last_sent_append_entries_req_timesptamp; } void consensus::update_node_append_timestamp(vnode id) { if (auto it = _fstats.find(id); it != _fstats.end()) { - it->second.last_append_timestamp = clock_type::now(); - update_node_hbeat_timestamp(id); + it->second.last_sent_append_entries_req_timesptamp = clock_type::now(); } } void consensus::update_node_hbeat_timestamp(vnode id) { - _fstats.get(id).last_hbeat_timestamp = clock_type::now(); + _fstats.get(id).last_received_append_entries_reply_timestamp + = clock_type::now(); } follower_req_seq consensus::next_follower_sequence(vnode id) { @@ -2245,18 +2259,17 @@ consensus::do_maybe_update_leader_commit_idx(ss::semaphore_units<> u) { // If there exists an N such that N > commitIndex, a majority // of matchIndex[i] ≥ N, and log[N].term == currentTerm: // set commitIndex = N (§5.3, §5.4). - auto majority_match = config().quorum_match( - [this, committed_offset = lstats.committed_offset](vnode id) { - // current node - we just return commited offset - if (id == _self) { - return committed_offset; - } - if (auto it = _fstats.find(id); it != _fstats.end()) { - return it->second.match_committed_index(); - } + auto majority_match = config().quorum_match([this](vnode id) { + // current node - we just return commited offset + if (id == _self) { + return _flushed_offset; + } + if (auto it = _fstats.find(id); it != _fstats.end()) { + return it->second.match_committed_index(); + } - return model::offset{}; - }); + return model::offset{}; + }); /** * we have to make sure that we do not advance committed_index beyond the * point which is readable in log. Since we are not waiting for flush to @@ -2268,7 +2281,8 @@ consensus::do_maybe_update_leader_commit_idx(ss::semaphore_units<> u) { * batcher aren't readable since some of the writes are still in flight in * segment appender. 
*/ - majority_match = std::min(majority_match, lstats.committed_offset); + majority_match = std::min(majority_match, _flushed_offset); + if ( majority_match > _commit_index && _log.get_term(majority_match) == _term) { @@ -2299,14 +2313,12 @@ consensus::do_maybe_update_leader_commit_idx(ss::semaphore_units<> u) { } ss::future<> consensus::maybe_update_follower_commit_idx(model::offset request_commit_idx) { - auto lstats = _log.offsets(); // Raft paper: // // If leaderCommit > commitIndex, set commitIndex = // min(leaderCommit, index of last new entry) if (request_commit_idx > _commit_index) { - auto new_commit_idx = std::min( - request_commit_idx, lstats.committed_offset); + auto new_commit_idx = std::min(request_commit_idx, _flushed_offset); if (new_commit_idx != _commit_index) { _commit_index = new_commit_idx; vlog( @@ -2565,14 +2577,16 @@ consensus::do_transfer_leadership(std::optional target) { make_error_code(errc::not_leader)); } - if (_configuration_manager.get_latest_offset() > _commit_index) { + auto conf = _configuration_manager.get_latest(); + if ( + _configuration_manager.get_latest_offset() > last_visible_index() + || conf.type() == configuration_type::joint) { vlog( _ctxlog.warn, "Cannot transfer leadership during configuration change"); return ss::make_ready_future( make_error_code(errc::configuration_change_in_progress)); } - auto conf = _configuration_manager.get_latest(); auto target_rni = conf.current_config().find(*target); if (!target_rni) { @@ -2811,7 +2825,7 @@ bool consensus::should_reconnect_follower(vnode id) { } if (auto it = _fstats.find(id); it != _fstats.end()) { - auto last_at = it->second.last_hbeat_timestamp; + auto last_at = it->second.last_received_append_entries_reply_timestamp; const auto fail_count = it->second.heartbeats_failed; auto is_live = last_at + _jit.base_duration() > clock_type::now(); @@ -2882,12 +2896,12 @@ std::vector consensus::get_follower_metrics() const { ret.reserve(_fstats.size()); auto dirty_offset = _log.offsets().dirty_offset; for (const auto& f : _fstats) { - auto last_hbeat = f.second.last_hbeat_timestamp; + auto last_hbeat = f.second.last_received_append_entries_reply_timestamp; auto is_live = last_hbeat + _jit.base_duration() > clock_type::now(); ret.push_back(follower_metrics{ .id = f.first.id(), .is_learner = f.second.is_learner, - .committed_log_index = f.second.last_committed_log_index, + .committed_log_index = f.second.last_flushed_log_index, .dirty_log_index = f.second.last_dirty_log_index, .match_index = f.second.match_index, .last_heartbeat = last_hbeat, diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h index 68d6210b083d..2093ea379e37 100644 --- a/src/v/raft/consensus.h +++ b/src/v/raft/consensus.h @@ -154,7 +154,7 @@ class consensus { const model::ntp& ntp() const { return _log.config().ntp(); } clock_type::time_point last_heartbeat() const { return _hbeat; }; - clock_type::time_point last_append_timestamp(vnode); + clock_type::time_point last_sent_append_entries_req_timesptamp(vnode); /** * \brief Persist snapshot with given data and start offset * @@ -429,9 +429,6 @@ class consensus { /// \brief _does not_ hold the lock. 
ss::future<> flush_log(); - /// \brief called by the vote timer, to dispatch a write under - /// the ops semaphore - void dispatch_flush_with_lock(); void maybe_step_down(); @@ -542,6 +539,7 @@ class consensus { // consensus state model::offset _commit_index; model::term_id _term; + model::offset _flushed_offset{}; // read at `ss::future<> start()` vnode _voted_for; diff --git a/src/v/raft/heartbeat_manager.cc b/src/v/raft/heartbeat_manager.cc index c3a0fffe74b9..6dac2a2e3609 100644 --- a/src/v/raft/heartbeat_manager.cc +++ b/src/v/raft/heartbeat_manager.cc @@ -83,9 +83,10 @@ static heartbeat_requests requests_for_range( if (ptr->are_heartbeats_suppressed(rni)) { return; } - auto last_append_timestamp = ptr->last_append_timestamp(rni); + auto last_sent_append_entries_req_timesptamp + = ptr->last_sent_append_entries_req_timesptamp(rni); - if (last_append_timestamp > last_heartbeat) { + if (last_sent_append_entries_req_timesptamp > last_heartbeat) { vlog( hbeatlog.trace, "Skipping sending beat to {} gr: {} last hb {}, last append " @@ -93,7 +94,8 @@ static heartbeat_requests requests_for_range( rni, ptr->group(), last_heartbeat.time_since_epoch().count(), - last_append_timestamp.time_since_epoch().count()); + last_sent_append_entries_req_timesptamp.time_since_epoch() + .count()); // we already sent heartbeat, skip it return; } diff --git a/src/v/raft/replicate_entries_stm.cc b/src/v/raft/replicate_entries_stm.cc index 3189ffe81ef8..9645dba64f66 100644 --- a/src/v/raft/replicate_entries_stm.cc +++ b/src/v/raft/replicate_entries_stm.cc @@ -64,7 +64,7 @@ ss::future> replicate_entries_stm::flush_log() { reply.term = _ptr->term(); // we just flushed offsets are the same reply.last_dirty_log_index = new_committed_offset; - reply.last_committed_log_index = new_committed_offset; + reply.last_flushed_log_index = new_committed_offset; reply.result = append_entries_reply::status::success; return ret_t(reply); }) @@ -208,7 +208,7 @@ inline bool replicate_entries_stm::should_skip_follower_request(vnode id) { const auto timeout = clock_type::now() - _ptr->_replicate_append_timeout; - return it->second.last_hbeat_timestamp < timeout + return it->second.last_received_append_entries_reply_timestamp < timeout || it->second.is_recovering; } diff --git a/src/v/raft/tests/type_serialization_tests.cc b/src/v/raft/tests/type_serialization_tests.cc index 5032d29dbbc1..eb9ea0dd3472 100644 --- a/src/v/raft/tests/type_serialization_tests.cc +++ b/src/v/raft/tests/type_serialization_tests.cc @@ -221,7 +221,7 @@ SEASTAR_THREAD_TEST_CASE(heartbeat_response_roundtrip) { .node_id = raft::vnode(model::node_id(1), model::revision_id(i)), .group = raft::group_id(i), .term = model::term_id(random_generators::get_int(-1, 1000)), - .last_committed_log_index = commited_idx, + .last_flushed_log_index = commited_idx, .last_dirty_log_index = dirty_idx, .result = raft::append_entries_reply::status::success}); } @@ -250,8 +250,8 @@ SEASTAR_THREAD_TEST_CASE(heartbeat_response_roundtrip) { BOOST_REQUIRE_EQUAL(expected[gr].group, result.meta[i].group); BOOST_REQUIRE_EQUAL(expected[gr].term, result.meta[i].term); BOOST_REQUIRE_EQUAL( - expected[gr].last_committed_log_index, - result.meta[i].last_committed_log_index); + expected[gr].last_flushed_log_index, + result.meta[i].last_flushed_log_index); BOOST_REQUIRE_EQUAL( expected[gr].last_dirty_log_index, result.meta[i].last_dirty_log_index); diff --git a/src/v/raft/types.cc b/src/v/raft/types.cc index 9a330a3e3092..484850cbede5 100644 --- a/src/v/raft/types.cc +++ b/src/v/raft/types.cc @@ 
-44,7 +44,7 @@ std::ostream& operator<<(std::ostream& o, const append_entries_reply& r) { << r.target_node_id << ", group: " << r.group << ", term:" << r.term << ", last_dirty_log_index:" << r.last_dirty_log_index - << ", last_committed_log_index:" << r.last_committed_log_index + << ", last_flushed_log_index:" << r.last_flushed_log_index << ", last_term_base_offset:" << r.last_term_base_offset << ", result: " << r.result << "}"; } @@ -58,7 +58,7 @@ std::ostream& operator<<(std::ostream& o, const vote_request& r) { } std::ostream& operator<<(std::ostream& o, const follower_index_metadata& i) { return o << "{node_id: " << i.node_id - << ", last_committed_log_idx: " << i.last_committed_log_index + << ", last_committed_log_idx: " << i.last_flushed_log_index << ", last_dirty_log_idx: " << i.last_dirty_log_index << ", match_index: " << i.match_index << ", next_index: " << i.next_index @@ -285,7 +285,7 @@ struct hbeat_response_array { explicit hbeat_response_array(size_t n) : groups(n) , terms(n) - , last_committed_log_index(n) + , last_flushed_log_index(n) , last_dirty_log_index(n) , last_term_base_offset(n) , revisions(n) @@ -293,7 +293,7 @@ struct hbeat_response_array { std::vector groups; std::vector terms; - std::vector last_committed_log_index; + std::vector last_flushed_log_index; std::vector last_dirty_log_index; std::vector last_term_base_offset; std::vector revisions; @@ -493,7 +493,7 @@ ss::future<> async_adl::to( constexpr bool operator()( const raft::append_entries_reply& lhs, const raft::append_entries_reply& rhs) const { - return lhs.last_committed_log_index < rhs.last_committed_log_index; + return lhs.last_flushed_log_index < rhs.last_flushed_log_index; } }; adl{}.to(out, reply.meta.size()); @@ -513,8 +513,8 @@ ss::future<> async_adl::to( encodee.groups[i] = reply.meta[i].group; encodee.terms[i] = std::max(model::term_id(-1), reply.meta[i].term); - encodee.last_committed_log_index[i] = std::max( - model::offset(-1), reply.meta[i].last_committed_log_index); + encodee.last_flushed_log_index[i] = std::max( + model::offset(-1), reply.meta[i].last_flushed_log_index); encodee.last_dirty_log_index[i] = std::max( model::offset(-1), reply.meta[i].last_dirty_log_index); encodee.last_term_base_offset[i] = std::max( @@ -528,7 +528,7 @@ ss::future<> async_adl::to( internal::encode_one_delta_array(out, encodee.terms); internal::encode_one_delta_array( - out, encodee.last_committed_log_index); + out, encodee.last_flushed_log_index); internal::encode_one_delta_array( out, encodee.last_dirty_log_index); internal::encode_one_delta_array( @@ -569,11 +569,11 @@ async_adl::from(iobuf_parser& in) { in, reply.meta[i - 1].term); } - reply.meta[0].last_committed_log_index = varlong_reader(in); + reply.meta[0].last_flushed_log_index = varlong_reader(in); for (size_t i = 1; i < size; ++i) { - reply.meta[i].last_committed_log_index + reply.meta[i].last_flushed_log_index = internal::read_one_varint_delta( - in, reply.meta[i - 1].last_committed_log_index); + in, reply.meta[i - 1].last_flushed_log_index); } reply.meta[0].last_dirty_log_index = varlong_reader(in); @@ -614,7 +614,7 @@ async_adl::from(iobuf_parser& in) { } for (auto& m : reply.meta) { - m.last_committed_log_index = decode_signed(m.last_committed_log_index); + m.last_flushed_log_index = decode_signed(m.last_flushed_log_index); m.last_dirty_log_index = decode_signed(m.last_dirty_log_index); m.last_term_base_offset = decode_signed(m.last_term_base_offset); m.node_id = raft::vnode( diff --git a/src/v/raft/types.h b/src/v/raft/types.h index 
75e981456e0a..a81361c28b99 100644 --- a/src/v/raft/types.h +++ b/src/v/raft/types.h @@ -67,20 +67,20 @@ struct follower_index_metadata { vnode node_id; // index of last known log for this follower - model::offset last_committed_log_index; + model::offset last_flushed_log_index; // index of last not flushed offset model::offset last_dirty_log_index; // index of log for which leader and follower logs matches model::offset match_index; // Used to establish index persistently replicated by majority constexpr model::offset match_committed_index() const { - return std::min(last_committed_log_index, match_index); + return std::min(last_flushed_log_index, match_index); } // next index to send to this follower model::offset next_index; // timestamp of last append_entries_rpc call - clock_type::time_point last_append_timestamp; - clock_type::time_point last_hbeat_timestamp; + clock_type::time_point last_sent_append_entries_req_timesptamp; + clock_type::time_point last_received_append_entries_reply_timestamp; uint32_t heartbeats_failed{0}; // The pair of sequences used to track append entries requests sent and // received by the follower. Every time append entries request is created @@ -238,7 +238,7 @@ struct append_entries_reply { /// \brief The recipient's last log index after it applied changes to /// the log. This is used to speed up finding the correct value for the /// nextIndex with a follower that is far behind a leader - model::offset last_committed_log_index; + model::offset last_flushed_log_index; model::offset last_dirty_log_index; // the last entry base offset used for the recovery speed up, the value is // only valid for not successfull append_entries reply diff --git a/src/v/raft/vote_stm.cc b/src/v/raft/vote_stm.cc index b3be18ce8a36..1ec26eab3d76 100644 --- a/src/v/raft/vote_stm.cc +++ b/src/v/raft/vote_stm.cc @@ -246,7 +246,7 @@ ss::future<> vote_stm::update_vote_state(ss::semaphore_units<> u) { // Set last heartbeat timestamp to max as we are the leader _ptr->_hbeat = clock_type::time_point::max(); vlog(_ctxlog.info, "became the leader term:{}", _ptr->term()); - _ptr->_last_quorum_replicated_index = _ptr->_log.offsets().committed_offset; + _ptr->_last_quorum_replicated_index = _ptr->_flushed_offset; _ptr->trigger_leadership_notification(); return replicate_config_as_new_leader(std::move(u)) diff --git a/src/v/rpc/simple_protocol.cc b/src/v/rpc/simple_protocol.cc index 7f451032faaf..f9dbbb7be7dd 100644 --- a/src/v/rpc/simple_protocol.cc +++ b/src/v/rpc/simple_protocol.cc @@ -33,6 +33,9 @@ struct server_context_impl final : streaming_context { ~server_context_impl() override { res.probe().request_completed(); } const header& get_header() const final { return hdr; } void signal_body_parse() final { pr.set_value(); } + void body_parse_exception(std::exception_ptr e) final { + pr.set_exception(std::move(e)); + } server::resources res; header hdr; ss::promise<> pr; diff --git a/src/v/rpc/test/response_handler_tests.cc b/src/v/rpc/test/response_handler_tests.cc index 2e06d190de1a..e3309235e37e 100644 --- a/src/v/rpc/test/response_handler_tests.cc +++ b/src/v/rpc/test/response_handler_tests.cc @@ -12,6 +12,8 @@ #include "test_utils/fixture.h" #include + +#include using namespace std::chrono_literals; // NOLINT struct test_str_ctx final : public rpc::streaming_context { @@ -22,6 +24,7 @@ struct test_str_ctx final : public rpc::streaming_context { virtual const rpc::header& get_header() const final { return hdr; } virtual void signal_body_parse() final {} + virtual void 
body_parse_exception(std::exception_ptr) final {} rpc::header hdr; ss::semaphore s{1}; diff --git a/src/v/rpc/test/rpc_gen_types.h b/src/v/rpc/test/rpc_gen_types.h index 1b60cafac2d3..960c04364435 100644 --- a/src/v/rpc/test/rpc_gen_types.h +++ b/src/v/rpc/test/rpc_gen_types.h @@ -54,6 +54,8 @@ enum class failure_type { throw_exception, exceptional_future, none }; using throw_req = failure_type; -struct throw_resp {}; +struct throw_resp { + ss::sstring reply; +}; } // namespace echo diff --git a/src/v/rpc/transport.cc b/src/v/rpc/transport.cc index fa2026a9f44b..da1c58116d85 100644 --- a/src/v/rpc/transport.cc +++ b/src/v/rpc/transport.cc @@ -42,6 +42,9 @@ struct client_context_impl final : streaming_context { } const header& get_header() const final { return _h; } void signal_body_parse() final { pr.set_value(); } + void body_parse_exception(std::exception_ptr e) final { + pr.set_exception(std::move(e)); + } std::reference_wrapper _c; header _h; ss::promise<> pr; diff --git a/src/v/rpc/transport.h b/src/v/rpc/transport.h index 3c5f23b74a8a..65d5197aa9ac 100644 --- a/src/v/rpc/transport.h +++ b/src/v/rpc/transport.h @@ -143,31 +143,55 @@ class transport final : public base_transport { }; namespace internal { + +inline errc map_server_error(status status) { + switch (status) { + case status::success: + return errc::success; + case status::request_timeout: + return errc::client_request_timeout; + case rpc::status::server_error: + return errc::service_error; + case status::method_not_found: + return errc::method_not_found; + }; +}; + template -result> map_result(const header& hdr, T data) { +ss::future>> parse_result( + ss::input_stream& in, std::unique_ptr sctx) { using ret_t = result>; - auto st = static_cast(hdr.meta); + // check status first + auto st = static_cast(sctx->get_header().meta); + // success case if (st == status::success) { - rpc::client_context ctx(hdr); - ctx.data = std::move(data); - return ret_t(std::move(ctx)); - } - - if (st == status::request_timeout) { - return ret_t(errc::client_request_timeout); + return parse_type(in, sctx->get_header()) + .then_wrapped([sctx = std::move(sctx)](ss::future data_fut) { + if (data_fut.failed()) { + sctx->body_parse_exception(data_fut.get_exception()); + /** + * we want to throw an exception when body parsing failed. + * this will invalidate the connection since it may not be + * valid any more. + */ + std::rethrow_exception(data_fut.get_exception()); + } + sctx->signal_body_parse(); + return ret_t(rpc::client_context( + sctx->get_header(), std::move(data_fut.get()))); + }); } - if (st == status::server_error) { - return ret_t(errc::service_error); - } - - if (st == status::method_not_found) { - return ret_t(errc::method_not_found); - } + /** + * signal that request body is parsed since it is empty when status + * indicates server error. 
+ */ + sctx->signal_body_parse(); - return ret_t(errc::service_error); + return ss::make_ready_future(map_server_error(st)); } + } // namespace internal template @@ -193,12 +217,7 @@ transport::send_typed(Input r, uint32_t method_id, rpc::client_opts opts) { if (!sctx) { return ss::make_ready_future(sctx.error()); } - return parse_type(_in, sctx.value()->get_header()) - .then([sctx = std::move(sctx)](Output o) { - sctx.value()->signal_body_parse(); - return internal::map_result( - sctx.value()->get_header(), std::move(o)); - }); + return internal::parse_result(_in, std::move(sctx.value())); }); } diff --git a/src/v/rpc/types.h b/src/v/rpc/types.h index b6851f7e3d06..b8a843ff32a8 100644 --- a/src/v/rpc/types.h +++ b/src/v/rpc/types.h @@ -156,6 +156,7 @@ class streaming_context { /// \brief because we parse the input as a _stream_ we need to signal /// to the dispatching thread that it can resume parsing for a new RPC virtual void signal_body_parse() = 0; + virtual void body_parse_exception(std::exception_ptr) = 0; /// \brief keep these units until destruction of context. /// usually, we want to keep the reservation of the memory size permanently @@ -233,8 +234,10 @@ struct method { /// \brief used in returned types for client::send_typed() calls template struct client_context { - explicit client_context(header h) - : hdr(std::move(h)) {} + client_context(header h, T data) + : hdr(h) + , data(std::move(data)) {} + header hdr; T data; }; diff --git a/tests/rptest/services/failure_injector.py b/tests/rptest/services/failure_injector.py index cb8d1a517c99..cff9d960f834 100644 --- a/tests/rptest/services/failure_injector.py +++ b/tests/rptest/services/failure_injector.py @@ -18,6 +18,7 @@ class FailureSpec: FAILURE_KILL = 0 FAILURE_TERMINATE = 1 FAILURE_SUSPEND = 2 + FAILURE_ISOLATE = 3 FAILURE_TYPES = [FAILURE_KILL, FAILURE_SUSPEND, FAILURE_TERMINATE] @@ -34,6 +35,12 @@ class FailureInjector: def __init__(self, redpanda): self.redpanda = redpanda + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self._heal_all() + def inject_failure(self, spec): self.redpanda.logger.info(f"injecting failure: {spec}") self._start_func(spec.type)(spec.node) @@ -54,12 +61,16 @@ def _start_func(self, tp): return self._suspend elif tp == FailureSpec.FAILURE_TERMINATE: return self._terminate + elif tp == FailureSpec.FAILURE_ISOLATE: + return self._isolate def _stop_func(self, tp): if tp == FailureSpec.FAILURE_KILL or tp == FailureSpec.FAILURE_TERMINATE: return self._start elif tp == FailureSpec.FAILURE_SUSPEND: return self._continue + elif tp == FailureSpec.FAILURE_ISOLATE: + return self._heal def _kill(self, node): self.redpanda.logger.info( @@ -71,6 +82,30 @@ def _kill(self, node): err_msg="Redpanda failed to kill in %d seconds" % timeout_sec) + def _isolate(self, node): + self.redpanda.logger.info(f"isolating node {node.account}") + + cmd = "iptables -A OUTPUT -p tcp --destination-port 33145 -j DROP" + node.account.ssh(cmd) + cmd = "iptables -A INPUT -p tcp --destination-port 33145 -j DROP" + node.account.ssh(cmd) + + def _heal(self, node): + self.redpanda.logger.info(f"healing node {node.account}") + cmd = "iptables -D OUTPUT -p tcp --destination-port 33145 -j DROP" + node.account.ssh(cmd) + cmd = "iptables -D INPUT -p tcp --destination-port 33145 -j DROP" + node.account.ssh(cmd) + + def _heal_all(self): + self.redpanda.logger.info(f"healling all network failures") + for n in self.redpanda.nodes: + n.account.ssh("iptables -P INPUT ACCEPT") + n.account.ssh("iptables -P FORWARD 
ACCEPT") + n.account.ssh("iptables -P OUTPUT ACCEPT") + n.account.ssh("iptables -F") + n.account.ssh("iptables -X") + def _suspend(self, node): self.redpanda.logger.info( f"suspending redpanda on { self.redpanda.idx(node)}") diff --git a/tests/rptest/tests/raft_availability_test.py b/tests/rptest/tests/raft_availability_test.py index 95760223efa4..a6517d78fc2b 100644 --- a/tests/rptest/tests/raft_availability_test.py +++ b/tests/rptest/tests/raft_availability_test.py @@ -15,11 +15,13 @@ from ducktape.mark.resource import cluster from ducktape.mark import parametrize +from ducktape.mark import ignore from ducktape.utils.util import wait_until from rptest.clients.kafka_cat import KafkaCat from rptest.clients.rpk import RpkTool, RpkException from rptest.clients.types import TopicSpec +from rptest.services.failure_injector import FailureInjector, FailureSpec from rptest.tests.redpanda_test import RedpandaTest from rptest.services.rpk_producer import RpkProducer from rptest.services.kaf_producer import KafProducer @@ -175,11 +177,13 @@ def _ping_pong(self): rpk = RpkTool(self.redpanda) payload = str(random.randint(0, 1000)) - + start = time.time() offset = rpk.produce(self.topic, "tkey", payload, timeout=5) consumed = kc.consume_one(self.topic, 0, offset) + latency = time.time() - start self.logger.info( - f"_ping_pong produced '{payload}' consumed '{consumed}'") + f"_ping_pong produced '{payload}' consumed '{consumed}' in {(latency)*1000.0:.2f} ms" + ) if consumed['payload'] != payload: raise RuntimeError(f"expected '{payload}' got '{consumed}'") @@ -481,3 +485,31 @@ def test_leader_transfers_recovery(self, acks): producer.stop() producer.wait() producer.free() + + @cluster(num_nodes=3) + def test_follower_isolation(self): + """ + Simplest network partition test. Isolate one of the followers of our + partition and validate that the cluster remains available: produce and + consume requests keep succeeding while the follower is cut off. + """ + # Find which node is the leader + initial_leader_id, replicas = self._wait_for_leader() + assert initial_leader_id == replicas[0] + + self._expect_available() + + leader_node = self.redpanda.get_node(initial_leader_id) + self.logger.info( + f"Initial leader {initial_leader_id} {leader_node.account.hostname}" + ) + + with FailureInjector(self.redpanda) as fi: + # isolate one of the followers + fi.inject_failure( + FailureSpec(FailureSpec.FAILURE_ISOLATE, + self.redpanda.get_node(replicas[1]))) + + # expect messages to be produced and consumed without a timeout + for i in range(0, 128): + self._ping_pong()
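
For reference, the leader hint heuristic added in src/v/kafka/server/handlers/metadata.cc above boils down to three rules. The following minimal, self-contained sketch is illustrative only and not part of the patch: plain ints stand in for model::node_id, std::optional for the metadata cache lookups, std::rand() for fast_prng_source, and the helper name pick_leader_hint is made up for this example.

    // Sketch of the metadata leader hint heuristic (assumptions noted above).
    #include <cstdlib>
    #include <iostream>
    #include <optional>
    #include <vector>

    constexpr int no_leader = -1;

    int pick_leader_hint(
      std::optional<int> current_leader,  // stands in for md_cache.get_leader_id(ntp)
      std::optional<int> previous_leader, // stands in for md_cache.get_previous_leader_id(ntp)
      int self_id,                        // stands in for config::node().node_id()
      const std::vector<int>& replicas) {
        // 1) current leader is known: report it unchanged
        if (current_leader) {
            return *current_leader;
        }
        // 2) leader unknown and the previous leader is this node: this node may
        //    be an isolated ex-leader, so point the client at a random replica
        //    and force it to refresh metadata from a broker that can still see
        //    the rest of the cluster
        if (previous_leader == std::optional<int>(self_id)) {
            return replicas[static_cast<size_t>(std::rand()) % replicas.size()];
        }
        // 3) otherwise fall back to the former leader (plain Kafka behaviour),
        //    or -1 when no leader was ever elected for the partition
        return previous_leader.value_or(no_leader);
    }

    int main() {
        const std::vector<int> replicas{1, 2, 3};
        // election in progress, previous leader was this node (id 1):
        // the hint is a randomly chosen replica instead of always 1
        std::cout << pick_leader_hint(std::nullopt, 1, 1, replicas) << '\n';
        // election in progress, previous leader was node 2, we are node 1: 2
        std::cout << pick_leader_hint(std::nullopt, 2, 1, replicas) << '\n';
        // current leader known: returned as-is
        std::cout << pick_leader_hint(3, 2, 1, replicas) << '\n';
    }

Rule (2) is what breaks the livelock described in the patch comment: a client that keeps refreshing metadata from an isolated former leader gets redirected to another replica, which either is the new leader or at least serves metadata that reflects the rest of the cluster.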