From 9c32f97e914e7bba1a985bb0532062551458aae3 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Tue, 2 Nov 2021 13:22:32 +0100 Subject: [PATCH 01/21] r/types: renamed follower timestamps to better reflect their semantics Signed-off-by: Michal Maslanka (cherry picked from commit 87ec58a660456a75a3e4c73fc49af60aa64e4393) --- docs/rfcs/20200421_raft_recovery.md | 2 +- src/v/raft/consensus.cc | 16 +++++++++------- src/v/raft/consensus.h | 2 +- src/v/raft/heartbeat_manager.cc | 8 +++++--- src/v/raft/replicate_entries_stm.cc | 2 +- src/v/raft/types.h | 4 ++-- 6 files changed, 19 insertions(+), 15 deletions(-) diff --git a/docs/rfcs/20200421_raft_recovery.md b/docs/rfcs/20200421_raft_recovery.md index 9297357eb974..582c5be57027 100644 --- a/docs/rfcs/20200421_raft_recovery.md +++ b/docs/rfcs/20200421_raft_recovery.md @@ -210,7 +210,7 @@ In order to make it possible new fields have to be added to // next index to send to this follower model::offset next_index; // timestamp of last append_entries_rpc call - clock_type::time_point last_append_timestamp; + clock_type::time_point last_sent_append_entries_req_timesptamp; uint64_t failed_appends{0}; bool is_learner = false; bool is_recovering = false; diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 84d9fa2dd249..de19f712b004 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -164,7 +164,7 @@ clock_type::time_point consensus::majority_heartbeat() const { } if (auto it = _fstats.find(rni); it != _fstats.end()) { - return it->second.last_hbeat_timestamp; + return it->second.last_received_append_entries_reply_timestamp; } // if we do not know the follower state yet i.e. we have @@ -2127,19 +2127,21 @@ model::term_id consensus::get_term(model::offset o) { return _log.get_term(o).value_or(model::term_id{}); } -clock_type::time_point consensus::last_append_timestamp(vnode id) { - return _fstats.get(id).last_append_timestamp; +clock_type::time_point +consensus::last_sent_append_entries_req_timesptamp(vnode id) { + return _fstats.get(id).last_sent_append_entries_req_timesptamp; } void consensus::update_node_append_timestamp(vnode id) { if (auto it = _fstats.find(id); it != _fstats.end()) { - it->second.last_append_timestamp = clock_type::now(); + it->second.last_sent_append_entries_req_timesptamp = clock_type::now(); update_node_hbeat_timestamp(id); } } void consensus::update_node_hbeat_timestamp(vnode id) { - _fstats.get(id).last_hbeat_timestamp = clock_type::now(); + _fstats.get(id).last_received_append_entries_reply_timestamp + = clock_type::now(); } follower_req_seq consensus::next_follower_sequence(vnode id) { @@ -2811,7 +2813,7 @@ bool consensus::should_reconnect_follower(vnode id) { } if (auto it = _fstats.find(id); it != _fstats.end()) { - auto last_at = it->second.last_hbeat_timestamp; + auto last_at = it->second.last_received_append_entries_reply_timestamp; const auto fail_count = it->second.heartbeats_failed; auto is_live = last_at + _jit.base_duration() > clock_type::now(); @@ -2882,7 +2884,7 @@ std::vector consensus::get_follower_metrics() const { ret.reserve(_fstats.size()); auto dirty_offset = _log.offsets().dirty_offset; for (const auto& f : _fstats) { - auto last_hbeat = f.second.last_hbeat_timestamp; + auto last_hbeat = f.second.last_received_append_entries_reply_timestamp; auto is_live = last_hbeat + _jit.base_duration() > clock_type::now(); ret.push_back(follower_metrics{ .id = f.first.id(), diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h index 68d6210b083d..137e33add2a3 100644 --- 
a/src/v/raft/consensus.h +++ b/src/v/raft/consensus.h @@ -154,7 +154,7 @@ class consensus { const model::ntp& ntp() const { return _log.config().ntp(); } clock_type::time_point last_heartbeat() const { return _hbeat; }; - clock_type::time_point last_append_timestamp(vnode); + clock_type::time_point last_sent_append_entries_req_timesptamp(vnode); /** * \brief Persist snapshot with given data and start offset * diff --git a/src/v/raft/heartbeat_manager.cc b/src/v/raft/heartbeat_manager.cc index c3a0fffe74b9..6dac2a2e3609 100644 --- a/src/v/raft/heartbeat_manager.cc +++ b/src/v/raft/heartbeat_manager.cc @@ -83,9 +83,10 @@ static heartbeat_requests requests_for_range( if (ptr->are_heartbeats_suppressed(rni)) { return; } - auto last_append_timestamp = ptr->last_append_timestamp(rni); + auto last_sent_append_entries_req_timesptamp + = ptr->last_sent_append_entries_req_timesptamp(rni); - if (last_append_timestamp > last_heartbeat) { + if (last_sent_append_entries_req_timesptamp > last_heartbeat) { vlog( hbeatlog.trace, "Skipping sending beat to {} gr: {} last hb {}, last append " @@ -93,7 +94,8 @@ static heartbeat_requests requests_for_range( rni, ptr->group(), last_heartbeat.time_since_epoch().count(), - last_append_timestamp.time_since_epoch().count()); + last_sent_append_entries_req_timesptamp.time_since_epoch() + .count()); // we already sent heartbeat, skip it return; } diff --git a/src/v/raft/replicate_entries_stm.cc b/src/v/raft/replicate_entries_stm.cc index 3189ffe81ef8..2456c9800a93 100644 --- a/src/v/raft/replicate_entries_stm.cc +++ b/src/v/raft/replicate_entries_stm.cc @@ -208,7 +208,7 @@ inline bool replicate_entries_stm::should_skip_follower_request(vnode id) { const auto timeout = clock_type::now() - _ptr->_replicate_append_timeout; - return it->second.last_hbeat_timestamp < timeout + return it->second.last_received_append_entries_reply_timestamp < timeout || it->second.is_recovering; } diff --git a/src/v/raft/types.h b/src/v/raft/types.h index 75e981456e0a..ace4c9c918fc 100644 --- a/src/v/raft/types.h +++ b/src/v/raft/types.h @@ -79,8 +79,8 @@ struct follower_index_metadata { // next index to send to this follower model::offset next_index; // timestamp of last append_entries_rpc call - clock_type::time_point last_append_timestamp; - clock_type::time_point last_hbeat_timestamp; + clock_type::time_point last_sent_append_entries_req_timesptamp; + clock_type::time_point last_received_append_entries_reply_timestamp; uint32_t heartbeats_failed{0}; // The pair of sequences used to track append entries requests sent and // received by the follower. Every time append entries request is created From 82a0739f4fc04da7855ed085d0938647c9fcff9b Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Tue, 2 Nov 2021 13:24:49 +0100 Subject: [PATCH 02/21] r/consensus: do not update reply timestamp when sending request Do not update last received append entries reply timestamp when updating last sent append entries timestamp. 
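Keeping the two timestamps independent is what makes the follower liveness checks meaningful: dispatching an append_entries request must not make an unreachable follower look alive. A minimal sketch of the intended bookkeeping, using std::chrono and hypothetical names rather than the real raft::follower_index_metadata and clock_type:

    #include <chrono>

    // Illustrative only: mirrors the split introduced by this series, not the
    // actual redpanda types.
    struct follower_times {
        using clock = std::chrono::steady_clock;
        // refreshed whenever the leader dispatches an append_entries request
        clock::time_point last_sent_append_entries_req;
        // refreshed only when a reply actually arrives from the follower
        clock::time_point last_received_append_entries_reply;
    };

    // Liveness is judged from the reply timestamp alone; the sent timestamp is
    // only used to decide whether a heartbeat still needs to be sent.
    inline bool is_live(const follower_times& t,
                        std::chrono::milliseconds heartbeat_interval) {
        return t.last_received_append_entries_reply + heartbeat_interval
               > follower_times::clock::now();
    }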
Signed-off-by: Michal Maslanka (cherry picked from commit 2703dd438e13e60b36e1bb66da25a30081651bcd) --- src/v/raft/consensus.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index de19f712b004..f5e7ffcbadab 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -2135,7 +2135,6 @@ consensus::last_sent_append_entries_req_timesptamp(vnode id) { void consensus::update_node_append_timestamp(vnode id) { if (auto it = _fstats.find(id); it != _fstats.end()) { it->second.last_sent_append_entries_req_timesptamp = clock_type::now(); - update_node_hbeat_timestamp(id); } } From 9bb9007d2efca0feef7c7f1eb63b712e06aef2c3 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Thu, 4 Nov 2021 17:22:33 +0100 Subject: [PATCH 03/21] c/partition_leaders_table: store previous partition leader Kafka Metadata API responses should contain leader id even tho the partition leader is unknown in a given instance of time. In Kafka the previous partition leader is returned when leader id is unknown. In order to be able to act according to Kafka behavior without loosing the ability to track the current leader state added storing the previous leader id into partitions leader table. Signed-off-by: Michal Maslanka (cherry picked from commit 061b46189c764b8fe810180b4883d761927b9e4b) --- src/v/cluster/partition_leaders_table.cc | 51 ++++++++++++++++++------ src/v/cluster/partition_leaders_table.h | 22 +++++++++- 2 files changed, 58 insertions(+), 15 deletions(-) diff --git a/src/v/cluster/partition_leaders_table.cc b/src/v/cluster/partition_leaders_table.cc index 4ba905211b10..d13dcb81f2dd 100644 --- a/src/v/cluster/partition_leaders_table.cc +++ b/src/v/cluster/partition_leaders_table.cc @@ -10,6 +10,7 @@ #include "cluster/partition_leaders_table.h" #include "cluster/cluster_utils.h" +#include "cluster/logger.h" #include "cluster/topic_table.h" #include "model/fundamental.h" #include "model/metadata.h" @@ -37,19 +38,20 @@ ss::future<> partition_leaders_table::stop() { return ss::now(); } -std::optional partition_leaders_table::get_leader( +std::optional +partition_leaders_table::find_leader_meta( model::topic_namespace_view tp_ns, model::partition_id pid) const { const auto& topics_map = _topic_table.local().topics_map(); if (auto it = _leaders.find(leader_key_view{tp_ns, pid}); it != _leaders.end()) { - return it->second.id; + return it->second; } else if (auto it = topics_map.find(tp_ns); it != topics_map.end()) { // Possible leadership query for materialized topic, search for it // in the topics table. if (!it->second.is_topic_replicable()) { // Leadership properties of non replicated topic are that of its // parent - return get_leader( + return find_leader_meta( model::topic_namespace_view{ tp_ns.ns, it->second.get_source_topic()}, pid); @@ -58,6 +60,18 @@ std::optional partition_leaders_table::get_leader( return std::nullopt; } +std::optional partition_leaders_table::get_previous_leader( + model::topic_namespace_view tp_ns, model::partition_id pid) const { + const auto meta = find_leader_meta(tp_ns, pid); + return meta ? meta->previous_leader : std::nullopt; +} + +std::optional partition_leaders_table::get_leader( + model::topic_namespace_view tp_ns, model::partition_id pid) const { + const auto meta = find_leader_meta(tp_ns, pid); + return meta ? 
meta->current_leader : std::nullopt; +} + std::optional partition_leaders_table::get_leader(const model::ntp& ntp) const { return get_leader(model::topic_namespace_view(ntp), ntp.tp.partition); @@ -74,18 +88,29 @@ void partition_leaders_table::update_partition_leader( auto [new_it, _] = _leaders.emplace( leader_key{ model::topic_namespace(ntp.ns, ntp.tp.topic), ntp.tp.partition}, - leader_meta{leader_id, term}); + leader_meta{.current_leader = leader_id, .update_term = term}); it = new_it; + } else { + // existing partition + if (it->second.update_term > term) { + // Do nothing if update term is older + return; + } + // if current leader has value, store it as a previous leader + if (it->second.current_leader) { + it->second.previous_leader = it->second.current_leader; + } + it->second.current_leader = leader_id; + it->second.update_term = term; } - - if (it->second.update_term > term) { - // Do nothing if update term is older - return; - } - // existing partition - it->second.id = leader_id; - it->second.update_term = term; - + vlog( + clusterlog.trace, + "updated partition: {} leader: {{term: {}, current leader: {}, previous " + "leader: {}}}", + ntp, + it->second.update_term, + it->second.current_leader, + it->second.previous_leader); // notify waiters if update is setting the leader if (!leader_id) { return; diff --git a/src/v/cluster/partition_leaders_table.h b/src/v/cluster/partition_leaders_table.h index e81e46edc2b6..6ae611f05c85 100644 --- a/src/v/cluster/partition_leaders_table.h +++ b/src/v/cluster/partition_leaders_table.h @@ -22,6 +22,8 @@ #include #include +#include + namespace cluster { /// Partition leaders contains information about currently elected partition @@ -40,6 +42,14 @@ class partition_leaders_table { std::optional get_leader(model::topic_namespace_view, model::partition_id) const; + /** + * Returns previous reader of partition if available. 
This is required by + * Kafka metadata APIs since it require us to return former leader id even + * it the leader is not present in a given time point + */ + std::optional get_previous_leader( + model::topic_namespace_view, model::partition_id) const; + ss::future wait_for_leader( const model::ntp&, ss::lowres_clock::time_point, @@ -58,7 +68,7 @@ class partition_leaders_table { // clang-format on void for_each_leader(Func&& f) const { for (auto& [k, v] : _leaders) { - f(k.tp_ns, k.pid, v.id, v.update_term); + f(k.tp_ns, k.pid, v.current_leader, v.update_term); } } @@ -128,10 +138,18 @@ class partition_leaders_table { // in order to filter out reordered requests we store last update term struct leader_meta { - std::optional id; + // current leader id, this may be empty if a group is in the middle of + // leader election + std::optional current_leader; + // previous leader id, this is empty if and only if there were no leader + // elected for the topic before + std::optional previous_leader; model::term_id update_term; }; + std::optional + find_leader_meta(model::topic_namespace_view, model::partition_id) const; + absl::flat_hash_map _leaders; From 80c43f3fc3d26a5d5f2e8f0c46fdbe54fac760ae Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Mon, 8 Nov 2021 10:59:00 +0100 Subject: [PATCH 04/21] c/metadata_cache: added const qualifier to get_leader method Signed-off-by: Michal Maslanka (cherry picked from commit 3dc000cf36c390fd8ecab821a559c7c1bfa8d90e) --- src/v/cluster/metadata_cache.cc | 2 +- src/v/cluster/metadata_cache.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/v/cluster/metadata_cache.cc b/src/v/cluster/metadata_cache.cc index 0bedd54b26e3..a3ab8c5420a1 100644 --- a/src/v/cluster/metadata_cache.cc +++ b/src/v/cluster/metadata_cache.cc @@ -111,7 +111,7 @@ ss::future metadata_cache::get_leader( } std::optional -metadata_cache::get_leader_id(const model::ntp& ntp) { +metadata_cache::get_leader_id(const model::ntp& ntp) const { return _leaders.local().get_leader(ntp); } diff --git a/src/v/cluster/metadata_cache.h b/src/v/cluster/metadata_cache.h index ec771e5423f6..fffa8c2246cf 100644 --- a/src/v/cluster/metadata_cache.h +++ b/src/v/cluster/metadata_cache.h @@ -100,7 +100,7 @@ class metadata_cache { return contains(model::topic_namespace_view(ntp), ntp.tp.partition); } - std::optional get_leader_id(const model::ntp&); + std::optional get_leader_id(const model::ntp&) const; /// Returns metadata of all topics in cache internal format // const cache_t& all_metadata() const { return _cache; } From bc6b061fe5adc3f15669b3e58a9162ee1073d040 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Mon, 8 Nov 2021 10:59:25 +0100 Subject: [PATCH 05/21] c/metadata_cache: added api to query previous partition leader Signed-off-by: Michal Maslanka (cherry picked from commit d8fd61d006c805507995079e964375fe64592dff) --- src/v/cluster/metadata_cache.cc | 6 ++++++ src/v/cluster/metadata_cache.h | 3 +++ 2 files changed, 9 insertions(+) diff --git a/src/v/cluster/metadata_cache.cc b/src/v/cluster/metadata_cache.cc index a3ab8c5420a1..e7d784df33f2 100644 --- a/src/v/cluster/metadata_cache.cc +++ b/src/v/cluster/metadata_cache.cc @@ -115,6 +115,12 @@ metadata_cache::get_leader_id(const model::ntp& ntp) const { return _leaders.local().get_leader(ntp); } +std::optional +metadata_cache::get_previous_leader_id(const model::ntp& ntp) const { + return _leaders.local().get_previous_leader( + model::topic_namespace_view(ntp), ntp.tp.partition); +} + /// If present returns a leader of raft0 
group std::optional metadata_cache::get_controller_leader_id() { return _leaders.local().get_leader(model::controller_ntp); diff --git a/src/v/cluster/metadata_cache.h b/src/v/cluster/metadata_cache.h index fffa8c2246cf..62d7d9dbc795 100644 --- a/src/v/cluster/metadata_cache.h +++ b/src/v/cluster/metadata_cache.h @@ -101,6 +101,9 @@ class metadata_cache { } std::optional get_leader_id(const model::ntp&) const; + + std::optional + get_previous_leader_id(const model::ntp&) const; /// Returns metadata of all topics in cache internal format // const cache_t& all_metadata() const { return _cache; } From 5e9e05ff59eef2b7753589d66a71bd0c2251cba9 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Mon, 8 Nov 2021 11:23:37 +0100 Subject: [PATCH 06/21] k/metadata: use heuristic when there is no leader id available We use simple heuristic to tolerate isolation of a node hosting both partition leader and follower. Kafka clients request metadata refresh in case they receive error that is related with stale metadata - f.e. NOT_LEADER. Metadata request can be processed by any broker and there is no general rule for that which broker to choose to refresh metadata from. (f.e. Java kafka client uses the broker with active least loaded connection.) This may lead to the situation in which client will ask for metadata always the same broker. When that broker is isolated from rest of the cluster it will never update its metadata view. This way the client will always receive stale metadata. This behavior may lead to a live lock in an event of network partition. If current partition leader is isolated from the cluster it will keep answering with its id in the leader_id field for that partition (according to policy where we return a former leader - there is no leader for that broker, it is a candidate). Client will retry produce or fetch request and receive NOT_LEADER error, this will force client to request metadata update, broker will respond with the same metadata and the whole cycle will loop indefinitely. In order to break the loop and force client to make progress we use following heuristics: 1) when current leader is unknown, return former leader (Kafka behavior) 2) when current leader is unknown and previous leader is equal to current node id select random replica_id as a leader (indicate leader isolation) With those heuristics we will always force the client to communicate with the nodes that may not be partitioned. Signed-off-by: Michal Maslanka (cherry picked from commit 2a0269f142950acfc562f1cedc03b85c117edd7c) --- src/v/kafka/server/handlers/metadata.cc | 69 ++++++++++++++++++++++--- 1 file changed, 62 insertions(+), 7 deletions(-) diff --git a/src/v/kafka/server/handlers/metadata.cc b/src/v/kafka/server/handlers/metadata.cc index 50a5f9dc3678..6109661397b8 100644 --- a/src/v/kafka/server/handlers/metadata.cc +++ b/src/v/kafka/server/handlers/metadata.cc @@ -13,6 +13,7 @@ #include "cluster/topics_frontend.h" #include "cluster/types.h" #include "config/configuration.h" +#include "config/node_config.h" #include "kafka/server/errors.h" #include "kafka/server/handlers/details/security.h" #include "kafka/server/handlers/topics/topic_utils.h" @@ -29,17 +30,69 @@ namespace kafka { -metadata_response::topic -make_topic_response_from_topic_metadata(model::topic_metadata&& tp_md) { +static constexpr model::node_id no_leader(-1); +/** + * We use simple heuristic to tolerate isolation of a node hosting both + * partition leader and follower. 
+ * + * Kafka clients request metadata refresh in case they receive error that is + * related with stale metadata - f.e. NOT_LEADER. Metadata request can be + * processed by any broker and there is no general rule for that which + * broker to choose to refresh metadata from. (f.e. Java kafka client uses the + * broker with active least loaded connection.) This may lead to the situation + * in which client will ask for metadata always the same broker. When that + * broker is isolated from rest of the cluster it will never update its metadata + * view. This way the client will always receive stale metadata. + * + * This behavior may lead to a live lock in an event of network partition. If + * current partition leader is isolated from the cluster it will keep answering + * with its id in the leader_id field for that partition (according to policy + * where we return a former leader - there is no leader for that broker, it is a + * candidate). Client will retry produce or fetch request and receive NOT_LEADER + * error, this will force client to request metadata update, broker will respond + * with the same metadata and the whole cycle will loop indefinitely. + * + * In order to break the loop and force client to make progress we use following + * heuristics: + * + * 1) when current leader is unknown, return former leader (Kafka behavior) + * + * 2) when current leader is unknown and previous leader is equal to current + * node id select random replica_id as a leader (indicate leader isolation) + * + * With those heuristics we will always force the client to communicate with the + * nodes that may not be partitioned. + */ +model::node_id get_leader( + const model::ntp& ntp, + const cluster::metadata_cache& md_cache, + const std::vector& replicas) { + const auto current = md_cache.get_leader_id(ntp); + if (current) { + return *current; + } + + const auto previous = md_cache.get_previous_leader_id(ntp); + if (previous == config::node().node_id()) { + auto idx = fast_prng_source() % replicas.size(); + return replicas[idx]; + } + + return previous.value_or(no_leader); +} + +metadata_response::topic make_topic_response_from_topic_metadata( + const cluster::metadata_cache& md_cache, model::topic_metadata&& tp_md) { metadata_response::topic tp; tp.error_code = error_code::none; + auto tp_ns = tp_md.tp_ns; tp.name = std::move(tp_md.tp_ns.tp); tp.is_internal = false; // no internal topics yet std::transform( tp_md.partitions.begin(), tp_md.partitions.end(), std::back_inserter(tp.partitions), - [](model::partition_metadata& p_md) { + [tp_ns = std::move(tp_ns), &md_cache](model::partition_metadata& p_md) { std::vector replicas{}; replicas.reserve(p_md.replicas.size()); std::transform( @@ -50,7 +103,8 @@ make_topic_response_from_topic_metadata(model::topic_metadata&& tp_md) { metadata_response::partition p; p.error_code = error_code::none; p.partition_index = p_md.id; - p.leader_id = p_md.leader_node.value_or(model::node_id(-1)); + p.leader_id = get_leader( + model::ntp(tp_ns.ns, tp_ns.tp, p_md.id), md_cache, replicas); p.replica_nodes = std::move(replicas); p.isr_nodes = p.replica_nodes; p.offline_replicas = {}; @@ -95,9 +149,9 @@ create_topic(request_context& ctx, model::topic&& topic) { res, ctx.controller_api(), tout + model::timeout_clock::now()) - .then([tp_md = std::move(tp_md)]() mutable { + .then([&ctx, tp_md = std::move(tp_md)]() mutable { return make_topic_response_from_topic_metadata( - std::move(tp_md.value())); + ctx.metadata_cache(), std::move(tp_md.value())); }); }) .handle_exception([topic 
= std::move(topic)]( @@ -125,7 +179,8 @@ static metadata_response::topic make_topic_response( details::authorized_operations(ctx, md.tp_ns.tp)); } - auto res = make_topic_response_from_topic_metadata(std::move(md)); + auto res = make_topic_response_from_topic_metadata( + ctx.metadata_cache(), std::move(md)); res.topic_authorized_operations = auth_operations; return res; } From f799d6c8da4c79e0a68d8f54d346f12144152e7c Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Thu, 4 Nov 2021 17:29:22 +0100 Subject: [PATCH 07/21] ducky: added isolate node failure to failure injector Signed-off-by: Michal Maslanka (cherry picked from commit 7a83955324780381a722cb76f4132cb6f523501f) --- tests/rptest/services/failure_injector.py | 35 +++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/tests/rptest/services/failure_injector.py b/tests/rptest/services/failure_injector.py index cb8d1a517c99..cff9d960f834 100644 --- a/tests/rptest/services/failure_injector.py +++ b/tests/rptest/services/failure_injector.py @@ -18,6 +18,7 @@ class FailureSpec: FAILURE_KILL = 0 FAILURE_TERMINATE = 1 FAILURE_SUSPEND = 2 + FAILURE_ISOLATE = 3 FAILURE_TYPES = [FAILURE_KILL, FAILURE_SUSPEND, FAILURE_TERMINATE] @@ -34,6 +35,12 @@ class FailureInjector: def __init__(self, redpanda): self.redpanda = redpanda + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self._heal_all() + def inject_failure(self, spec): self.redpanda.logger.info(f"injecting failure: {spec}") self._start_func(spec.type)(spec.node) @@ -54,12 +61,16 @@ def _start_func(self, tp): return self._suspend elif tp == FailureSpec.FAILURE_TERMINATE: return self._terminate + elif tp == FailureSpec.FAILURE_ISOLATE: + return self._isolate def _stop_func(self, tp): if tp == FailureSpec.FAILURE_KILL or tp == FailureSpec.FAILURE_TERMINATE: return self._start elif tp == FailureSpec.FAILURE_SUSPEND: return self._continue + elif tp == FailureSpec.FAILURE_ISOLATE: + return self._heal def _kill(self, node): self.redpanda.logger.info( @@ -71,6 +82,30 @@ def _kill(self, node): err_msg="Redpanda failed to kill in %d seconds" % timeout_sec) + def _isolate(self, node): + self.redpanda.logger.info(f"isolating node {node.account}") + + cmd = "iptables -A OUTPUT -p tcp --destination-port 33145 -j DROP" + node.account.ssh(cmd) + cmd = "iptables -A INPUT -p tcp --destination-port 33145 -j DROP" + node.account.ssh(cmd) + + def _heal(self, node): + self.redpanda.logger.info(f"healing node {node.account}") + cmd = "iptables -D OUTPUT -p tcp --destination-port 33145 -j DROP" + node.account.ssh(cmd) + cmd = "iptables -D INPUT -p tcp --destination-port 33145 -j DROP" + node.account.ssh(cmd) + + def _heal_all(self): + self.redpanda.logger.info(f"healling all network failures") + for n in self.redpanda.nodes: + n.account.ssh("iptables -P INPUT ACCEPT") + n.account.ssh("iptables -P FORWARD ACCEPT") + n.account.ssh("iptables -P OUTPUT ACCEPT") + n.account.ssh("iptables -F") + n.account.ssh("iptables -X") + def _suspend(self, node): self.redpanda.logger.info( f"suspending redpanda on { self.redpanda.idx(node)}") From c6a136cca27186e3356a72d79ddf90f1174c0100 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Thu, 4 Nov 2021 17:30:07 +0100 Subject: [PATCH 08/21] ducky: log ping-pong e2e latency in raft availability tests Signed-off-by: Michal Maslanka (cherry picked from commit 61cde14fc2f2285c041258d930d9ef502fb9d2cf) --- tests/rptest/tests/raft_availability_test.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git 
a/tests/rptest/tests/raft_availability_test.py b/tests/rptest/tests/raft_availability_test.py index 95760223efa4..fa548da9e5e8 100644 --- a/tests/rptest/tests/raft_availability_test.py +++ b/tests/rptest/tests/raft_availability_test.py @@ -175,11 +175,13 @@ def _ping_pong(self): rpk = RpkTool(self.redpanda) payload = str(random.randint(0, 1000)) - + start = time.time() offset = rpk.produce(self.topic, "tkey", payload, timeout=5) consumed = kc.consume_one(self.topic, 0, offset) + latency = time.time() - start self.logger.info( - f"_ping_pong produced '{payload}' consumed '{consumed}'") + f"_ping_pong produced '{payload}' consumed '{consumed}' in {(latency)*1000.0:.2f} ms" + ) if consumed['payload'] != payload: raise RuntimeError(f"expected '{payload}' got '{consumed}'") From 56e067d62d6928fcd851710e9bc7233b64f1b46b Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Thu, 4 Nov 2021 17:31:37 +0100 Subject: [PATCH 09/21] ducky: added follower isolation raft availability test Signed-off-by: Michal Maslanka (cherry picked from commit a698c3419c2b9deb7b8a4ba456a77565b0174a9e) --- tests/rptest/tests/raft_availability_test.py | 30 ++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tests/rptest/tests/raft_availability_test.py b/tests/rptest/tests/raft_availability_test.py index fa548da9e5e8..a6517d78fc2b 100644 --- a/tests/rptest/tests/raft_availability_test.py +++ b/tests/rptest/tests/raft_availability_test.py @@ -15,11 +15,13 @@ from ducktape.mark.resource import cluster from ducktape.mark import parametrize +from ducktape.mark import ignore from ducktape.utils.util import wait_until from rptest.clients.kafka_cat import KafkaCat from rptest.clients.rpk import RpkTool, RpkException from rptest.clients.types import TopicSpec +from rptest.services.failure_injector import FailureInjector, FailureSpec from rptest.tests.redpanda_test import RedpandaTest from rptest.services.rpk_producer import RpkProducer from rptest.services.kaf_producer import KafProducer @@ -483,3 +485,31 @@ def test_leader_transfers_recovery(self, acks): producer.stop() producer.wait() producer.free() + + @cluster(num_nodes=3) + def test_follower_isolation(self): + """ + Simplest HA test. Stop the leader for our partition. Validate that + the cluster remains available afterwards, and that the expected + peer takes over as the new leader. 
+ """ + # Find which node is the leader + initial_leader_id, replicas = self._wait_for_leader() + assert initial_leader_id == replicas[0] + + self._expect_available() + + leader_node = self.redpanda.get_node(initial_leader_id) + self.logger.info( + f"Initial leader {initial_leader_id} {leader_node.account.hostname}" + ) + + with FailureInjector(self.redpanda) as fi: + # isolate one of the followers + fi.inject_failure( + FailureSpec(FailureSpec.FAILURE_ISOLATE, + self.redpanda.get_node(replicas[1]))) + + # expect messages to be produced and consumed without a timeout + for i in range(0, 128): + self._ping_pong() From e50e25436b3c5beace4b0ce68f0aadf44662aa69 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 11 Jun 2021 10:10:07 +0200 Subject: [PATCH 10/21] r/consensus: removed unused dispatch_flush_with_lock method Signed-off-by: Michal Maslanka (cherry picked from commit 60170bb2b2bb1b658f6ffda324fbee5cf76d23c3) --- src/v/raft/consensus.cc | 14 -------------- src/v/raft/consensus.h | 3 --- 2 files changed, 17 deletions(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index f5e7ffcbadab..0deaf73012fa 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -711,20 +711,6 @@ ss::future> consensus::do_append_replicate_relaxed( .finally([this, u = std::move(u)] { _probe.replicate_done(); }); } -void consensus::dispatch_flush_with_lock() { - if (!_has_pending_flushes) { - return; - } - (void)ss::with_gate(_bg, [this] { - return _op_lock.with([this] { - if (!_has_pending_flushes) { - return ss::make_ready_future<>(); - } - return flush_log(); - }); - }); -} - ss::future consensus::do_make_reader(storage::log_reader_config config) { // limit to last visible index diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h index 137e33add2a3..b8691dbd62ce 100644 --- a/src/v/raft/consensus.h +++ b/src/v/raft/consensus.h @@ -429,9 +429,6 @@ class consensus { /// \brief _does not_ hold the lock. ss::future<> flush_log(); - /// \brief called by the vote timer, to dispatch a write under - /// the ops semaphore - void dispatch_flush_with_lock(); void maybe_step_down(); From 6ef966a704578d00d8a07cd7ae89dd738f638cbc Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 11 Jun 2021 10:23:04 +0200 Subject: [PATCH 11/21] r/consensus: tracking flushed offset inside consensus Tracking last flushed offset inside `raft::consensus` as a part of its internal state. This approach will allow us to stop relying on committed offset information provided by storage layer and use it more like a file system where user has to track the offset which were safely persisted. Signed-off-by: Michal Maslanka (cherry picked from commit 5d30a3923c4fd3ea46cdbc4dbdf2bddbf53f636c) --- src/v/raft/consensus.cc | 24 ++++++++++++++++++++++-- src/v/raft/consensus.h | 1 + 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 0deaf73012fa..b6a00705491e 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -1053,7 +1053,13 @@ ss::future<> consensus::do_start() { _term = lstats.dirty_offset_term; _voted_for = {}; } - + /** + * since we are starting, there were no new writes to the log + * before that point. It is safe to use dirty offset as a initial + * flushed offset since it is equal to last offset that exists on + * disk and was read in log recovery process. 
+ */ + _flushed_offset = lstats.dirty_offset; /** * The configuration manager state may be divereged from the log * state, as log is flushed lazily, we have to make sure that the @@ -2015,7 +2021,21 @@ append_entries_reply consensus::make_append_entries_reply( ss::future<> consensus::flush_log() { _probe.log_flushed(); - return _log.flush().then([this] { _has_pending_flushes = false; }); + auto flushed_up_to = _log.offsets().dirty_offset; + return _log.flush().then([this, flushed_up_to] { + _flushed_offset = flushed_up_to; + // TODO: remove this assertion when we will remove committed_offset + // from storage. + auto lstats = _log.offsets(); + vassert( + lstats.committed_offset >= _flushed_offset, + "Raft incorrectly tracking flushed log offset. Expected offset: {}, " + " current log offsets: {}, log: {}", + _flushed_offset, + lstats, + _log); + _has_pending_flushes = false; + }); } ss::future consensus::disk_append( diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h index b8691dbd62ce..2093ea379e37 100644 --- a/src/v/raft/consensus.h +++ b/src/v/raft/consensus.h @@ -539,6 +539,7 @@ class consensus { // consensus state model::offset _commit_index; model::term_id _term; + model::offset _flushed_offset{}; // read at `ss::future<> start()` vnode _voted_for; From 76839badd3642924eb8fd282ef33b1c739aa93d4 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 11 Jun 2021 10:25:46 +0200 Subject: [PATCH 12/21] r/consensus: removed usage of log committed offset from raft Do not using log committed offset in raft protocol implementation will allow us to simplify offset management in storage layer. This way the only source of information about which offsets were safely stored on disk will be raft protocol that manages underlying log flushes. Signed-off-by: Michal Maslanka (cherry picked from commit 8f3d2d0c0108057172399f12aa873abced73046d) --- src/v/raft/append_entries_buffer.cc | 5 ++--- src/v/raft/consensus.cc | 35 +++++++++++++---------------- src/v/raft/vote_stm.cc | 2 +- 3 files changed, 19 insertions(+), 23 deletions(-) diff --git a/src/v/raft/append_entries_buffer.cc b/src/v/raft/append_entries_buffer.cc index 2d061e244e4d..ec5ac44c93af 100644 --- a/src/v/raft/append_entries_buffer.cc +++ b/src/v/raft/append_entries_buffer.cc @@ -137,14 +137,13 @@ void append_entries_buffer::propagate_results( replies.size()); auto resp_it = response_promises.begin(); for (auto& reply : replies) { - auto lstats = _consensus._log.offsets(); ss::visit( reply, - [&resp_it, &lstats](append_entries_reply r) { + [&resp_it, this](append_entries_reply r) { // this is important, we want to update response committed // offset here as we flushed after the response structure was // created - r.last_committed_log_index = lstats.committed_offset; + r.last_committed_log_index = _consensus._flushed_offset; resp_it->set_value(r); }, [&resp_it](std::exception_ptr& e) { diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index b6a00705491e..e9e9ecedf9d1 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -379,7 +379,7 @@ void consensus::maybe_promote_to_voter(vnode id) { } // do not promote to voter, learner is not up to date - if (it->second.match_index < _log.offsets().committed_offset) { + if (it->second.match_index < _flushed_offset) { return ss::now(); } @@ -1483,7 +1483,7 @@ consensus::do_append_entries(append_entries_request&& r) { reply.group = r.meta.group; reply.term = _term; reply.last_dirty_log_index = lstats.dirty_offset; - reply.last_committed_log_index = 
lstats.committed_offset; + reply.last_committed_log_index = _flushed_offset; reply.result = append_entries_reply::status::failure; _probe.append_request(); @@ -2007,14 +2007,13 @@ ss::future> consensus::dispatch_replicate( append_entries_reply consensus::make_append_entries_reply( vnode target_node, storage::append_result disk_results) { - auto lstats = _log.offsets(); append_entries_reply reply; reply.node_id = _self; reply.target_node_id = target_node; reply.group = _group; reply.term = _term; reply.last_dirty_log_index = disk_results.last_offset; - reply.last_committed_log_index = lstats.committed_offset; + reply.last_committed_log_index = _flushed_offset; reply.result = append_entries_reply::status::success; return reply; } @@ -2252,18 +2251,17 @@ consensus::do_maybe_update_leader_commit_idx(ss::semaphore_units<> u) { // If there exists an N such that N > commitIndex, a majority // of matchIndex[i] ≥ N, and log[N].term == currentTerm: // set commitIndex = N (§5.3, §5.4). - auto majority_match = config().quorum_match( - [this, committed_offset = lstats.committed_offset](vnode id) { - // current node - we just return commited offset - if (id == _self) { - return committed_offset; - } - if (auto it = _fstats.find(id); it != _fstats.end()) { - return it->second.match_committed_index(); - } + auto majority_match = config().quorum_match([this](vnode id) { + // current node - we just return commited offset + if (id == _self) { + return _flushed_offset; + } + if (auto it = _fstats.find(id); it != _fstats.end()) { + return it->second.match_committed_index(); + } - return model::offset{}; - }); + return model::offset{}; + }); /** * we have to make sure that we do not advance committed_index beyond the * point which is readable in log. Since we are not waiting for flush to @@ -2275,7 +2273,8 @@ consensus::do_maybe_update_leader_commit_idx(ss::semaphore_units<> u) { * batcher aren't readable since some of the writes are still in flight in * segment appender. 
*/ - majority_match = std::min(majority_match, lstats.committed_offset); + majority_match = std::min(majority_match, _flushed_offset); + if ( majority_match > _commit_index && _log.get_term(majority_match) == _term) { @@ -2306,14 +2305,12 @@ consensus::do_maybe_update_leader_commit_idx(ss::semaphore_units<> u) { } ss::future<> consensus::maybe_update_follower_commit_idx(model::offset request_commit_idx) { - auto lstats = _log.offsets(); // Raft paper: // // If leaderCommit > commitIndex, set commitIndex = // min(leaderCommit, index of last new entry) if (request_commit_idx > _commit_index) { - auto new_commit_idx = std::min( - request_commit_idx, lstats.committed_offset); + auto new_commit_idx = std::min(request_commit_idx, _flushed_offset); if (new_commit_idx != _commit_index) { _commit_index = new_commit_idx; vlog( diff --git a/src/v/raft/vote_stm.cc b/src/v/raft/vote_stm.cc index b3be18ce8a36..1ec26eab3d76 100644 --- a/src/v/raft/vote_stm.cc +++ b/src/v/raft/vote_stm.cc @@ -246,7 +246,7 @@ ss::future<> vote_stm::update_vote_state(ss::semaphore_units<> u) { // Set last heartbeat timestamp to max as we are the leader _ptr->_hbeat = clock_type::time_point::max(); vlog(_ctxlog.info, "became the leader term:{}", _ptr->term()); - _ptr->_last_quorum_replicated_index = _ptr->_log.offsets().committed_offset; + _ptr->_last_quorum_replicated_index = _ptr->_flushed_offset; _ptr->trigger_leadership_notification(); return replicate_config_as_new_leader(std::move(u)) From 48d441bb7c83f8032230ea7136415448292702d6 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 11 Jun 2021 11:03:30 +0200 Subject: [PATCH 13/21] r/types: renamed last_committed_log_index to last_flushed_log_index Renamed the `last_committed_log_index` field to `last_flushed_log_index` to better reflect the field meaning and prevent confusion with raft `commit_index` Signed-off-by: Michal Maslanka (cherry picked from commit 607119cbc71542334b85352bd886b8a7d7afb22f) --- src/v/raft/append_entries_buffer.cc | 2 +- src/v/raft/consensus.cc | 14 ++++++------ src/v/raft/replicate_entries_stm.cc | 2 +- src/v/raft/tests/type_serialization_tests.cc | 6 ++--- src/v/raft/types.cc | 24 ++++++++++---------- src/v/raft/types.h | 6 ++--- 6 files changed, 27 insertions(+), 27 deletions(-) diff --git a/src/v/raft/append_entries_buffer.cc b/src/v/raft/append_entries_buffer.cc index ec5ac44c93af..fe7273d126cc 100644 --- a/src/v/raft/append_entries_buffer.cc +++ b/src/v/raft/append_entries_buffer.cc @@ -143,7 +143,7 @@ void append_entries_buffer::propagate_results( // this is important, we want to update response committed // offset here as we flushed after the response structure was // created - r.last_committed_log_index = _consensus._flushed_offset; + r.last_flushed_log_index = _consensus._flushed_offset; resp_it->set_value(r); }, [&resp_it](std::exception_ptr& e) { diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index e9e9ecedf9d1..9d9c7e2e6260 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -318,9 +318,9 @@ consensus::success_reply consensus::update_follower_index( _ctxlog.trace, "Updated node {} last committed log index: {}", idx.node_id, - reply.last_committed_log_index); + reply.last_flushed_log_index); idx.last_dirty_log_index = reply.last_dirty_log_index; - idx.last_committed_log_index = reply.last_committed_log_index; + idx.last_flushed_log_index = reply.last_flushed_log_index; idx.next_index = details::next_offset(idx.last_dirty_log_index); } @@ -336,7 +336,7 @@ consensus::success_reply 
consensus::update_follower_index( // update follower state to allow recovery of follower with // missing entries idx.last_dirty_log_index = reply.last_dirty_log_index; - idx.last_committed_log_index = reply.last_committed_log_index; + idx.last_flushed_log_index = reply.last_flushed_log_index; idx.next_index = details::next_offset(idx.last_dirty_log_index); idx.follower_state_change.broadcast(); } @@ -418,7 +418,7 @@ void consensus::successfull_append_entries_reply( follower_index_metadata& idx, append_entries_reply reply) { // follower and leader logs matches idx.last_dirty_log_index = reply.last_dirty_log_index; - idx.last_committed_log_index = reply.last_committed_log_index; + idx.last_flushed_log_index = reply.last_flushed_log_index; idx.match_index = idx.last_dirty_log_index; idx.next_index = details::next_offset(idx.last_dirty_log_index); vlog( @@ -1483,7 +1483,7 @@ consensus::do_append_entries(append_entries_request&& r) { reply.group = r.meta.group; reply.term = _term; reply.last_dirty_log_index = lstats.dirty_offset; - reply.last_committed_log_index = _flushed_offset; + reply.last_flushed_log_index = _flushed_offset; reply.result = append_entries_reply::status::failure; _probe.append_request(); @@ -2013,7 +2013,7 @@ append_entries_reply consensus::make_append_entries_reply( reply.group = _group; reply.term = _term; reply.last_dirty_log_index = disk_results.last_offset; - reply.last_committed_log_index = _flushed_offset; + reply.last_flushed_log_index = _flushed_offset; reply.result = append_entries_reply::status::success; return reply; } @@ -2891,7 +2891,7 @@ std::vector consensus::get_follower_metrics() const { ret.push_back(follower_metrics{ .id = f.first.id(), .is_learner = f.second.is_learner, - .committed_log_index = f.second.last_committed_log_index, + .committed_log_index = f.second.last_flushed_log_index, .dirty_log_index = f.second.last_dirty_log_index, .match_index = f.second.match_index, .last_heartbeat = last_hbeat, diff --git a/src/v/raft/replicate_entries_stm.cc b/src/v/raft/replicate_entries_stm.cc index 2456c9800a93..9645dba64f66 100644 --- a/src/v/raft/replicate_entries_stm.cc +++ b/src/v/raft/replicate_entries_stm.cc @@ -64,7 +64,7 @@ ss::future> replicate_entries_stm::flush_log() { reply.term = _ptr->term(); // we just flushed offsets are the same reply.last_dirty_log_index = new_committed_offset; - reply.last_committed_log_index = new_committed_offset; + reply.last_flushed_log_index = new_committed_offset; reply.result = append_entries_reply::status::success; return ret_t(reply); }) diff --git a/src/v/raft/tests/type_serialization_tests.cc b/src/v/raft/tests/type_serialization_tests.cc index 5032d29dbbc1..eb9ea0dd3472 100644 --- a/src/v/raft/tests/type_serialization_tests.cc +++ b/src/v/raft/tests/type_serialization_tests.cc @@ -221,7 +221,7 @@ SEASTAR_THREAD_TEST_CASE(heartbeat_response_roundtrip) { .node_id = raft::vnode(model::node_id(1), model::revision_id(i)), .group = raft::group_id(i), .term = model::term_id(random_generators::get_int(-1, 1000)), - .last_committed_log_index = commited_idx, + .last_flushed_log_index = commited_idx, .last_dirty_log_index = dirty_idx, .result = raft::append_entries_reply::status::success}); } @@ -250,8 +250,8 @@ SEASTAR_THREAD_TEST_CASE(heartbeat_response_roundtrip) { BOOST_REQUIRE_EQUAL(expected[gr].group, result.meta[i].group); BOOST_REQUIRE_EQUAL(expected[gr].term, result.meta[i].term); BOOST_REQUIRE_EQUAL( - expected[gr].last_committed_log_index, - result.meta[i].last_committed_log_index); + 
expected[gr].last_flushed_log_index, + result.meta[i].last_flushed_log_index); BOOST_REQUIRE_EQUAL( expected[gr].last_dirty_log_index, result.meta[i].last_dirty_log_index); diff --git a/src/v/raft/types.cc b/src/v/raft/types.cc index 9a330a3e3092..484850cbede5 100644 --- a/src/v/raft/types.cc +++ b/src/v/raft/types.cc @@ -44,7 +44,7 @@ std::ostream& operator<<(std::ostream& o, const append_entries_reply& r) { << r.target_node_id << ", group: " << r.group << ", term:" << r.term << ", last_dirty_log_index:" << r.last_dirty_log_index - << ", last_committed_log_index:" << r.last_committed_log_index + << ", last_flushed_log_index:" << r.last_flushed_log_index << ", last_term_base_offset:" << r.last_term_base_offset << ", result: " << r.result << "}"; } @@ -58,7 +58,7 @@ std::ostream& operator<<(std::ostream& o, const vote_request& r) { } std::ostream& operator<<(std::ostream& o, const follower_index_metadata& i) { return o << "{node_id: " << i.node_id - << ", last_committed_log_idx: " << i.last_committed_log_index + << ", last_committed_log_idx: " << i.last_flushed_log_index << ", last_dirty_log_idx: " << i.last_dirty_log_index << ", match_index: " << i.match_index << ", next_index: " << i.next_index @@ -285,7 +285,7 @@ struct hbeat_response_array { explicit hbeat_response_array(size_t n) : groups(n) , terms(n) - , last_committed_log_index(n) + , last_flushed_log_index(n) , last_dirty_log_index(n) , last_term_base_offset(n) , revisions(n) @@ -293,7 +293,7 @@ struct hbeat_response_array { std::vector groups; std::vector terms; - std::vector last_committed_log_index; + std::vector last_flushed_log_index; std::vector last_dirty_log_index; std::vector last_term_base_offset; std::vector revisions; @@ -493,7 +493,7 @@ ss::future<> async_adl::to( constexpr bool operator()( const raft::append_entries_reply& lhs, const raft::append_entries_reply& rhs) const { - return lhs.last_committed_log_index < rhs.last_committed_log_index; + return lhs.last_flushed_log_index < rhs.last_flushed_log_index; } }; adl{}.to(out, reply.meta.size()); @@ -513,8 +513,8 @@ ss::future<> async_adl::to( encodee.groups[i] = reply.meta[i].group; encodee.terms[i] = std::max(model::term_id(-1), reply.meta[i].term); - encodee.last_committed_log_index[i] = std::max( - model::offset(-1), reply.meta[i].last_committed_log_index); + encodee.last_flushed_log_index[i] = std::max( + model::offset(-1), reply.meta[i].last_flushed_log_index); encodee.last_dirty_log_index[i] = std::max( model::offset(-1), reply.meta[i].last_dirty_log_index); encodee.last_term_base_offset[i] = std::max( @@ -528,7 +528,7 @@ ss::future<> async_adl::to( internal::encode_one_delta_array(out, encodee.terms); internal::encode_one_delta_array( - out, encodee.last_committed_log_index); + out, encodee.last_flushed_log_index); internal::encode_one_delta_array( out, encodee.last_dirty_log_index); internal::encode_one_delta_array( @@ -569,11 +569,11 @@ async_adl::from(iobuf_parser& in) { in, reply.meta[i - 1].term); } - reply.meta[0].last_committed_log_index = varlong_reader(in); + reply.meta[0].last_flushed_log_index = varlong_reader(in); for (size_t i = 1; i < size; ++i) { - reply.meta[i].last_committed_log_index + reply.meta[i].last_flushed_log_index = internal::read_one_varint_delta( - in, reply.meta[i - 1].last_committed_log_index); + in, reply.meta[i - 1].last_flushed_log_index); } reply.meta[0].last_dirty_log_index = varlong_reader(in); @@ -614,7 +614,7 @@ async_adl::from(iobuf_parser& in) { } for (auto& m : reply.meta) { - m.last_committed_log_index = 
decode_signed(m.last_committed_log_index); + m.last_flushed_log_index = decode_signed(m.last_flushed_log_index); m.last_dirty_log_index = decode_signed(m.last_dirty_log_index); m.last_term_base_offset = decode_signed(m.last_term_base_offset); m.node_id = raft::vnode( diff --git a/src/v/raft/types.h b/src/v/raft/types.h index ace4c9c918fc..a81361c28b99 100644 --- a/src/v/raft/types.h +++ b/src/v/raft/types.h @@ -67,14 +67,14 @@ struct follower_index_metadata { vnode node_id; // index of last known log for this follower - model::offset last_committed_log_index; + model::offset last_flushed_log_index; // index of last not flushed offset model::offset last_dirty_log_index; // index of log for which leader and follower logs matches model::offset match_index; // Used to establish index persistently replicated by majority constexpr model::offset match_committed_index() const { - return std::min(last_committed_log_index, match_index); + return std::min(last_flushed_log_index, match_index); } // next index to send to this follower model::offset next_index; @@ -238,7 +238,7 @@ struct append_entries_reply { /// \brief The recipient's last log index after it applied changes to /// the log. This is used to speed up finding the correct value for the /// nextIndex with a follower that is far behind a leader - model::offset last_committed_log_index; + model::offset last_flushed_log_index; model::offset last_dirty_log_index; // the last entry base offset used for the recovery speed up, the value is // only valid for not successfull append_entries reply From 65687cca6a17f5be173aa4fa6f11f81833347d63 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Tue, 9 Nov 2021 14:52:48 +0100 Subject: [PATCH 14/21] rpc/types: set data in rpc_client_ctx type constructor Data are always set together with header. We may leverage that fact and create appropriate constructor. Signed-off-by: Michal Maslanka (cherry picked from commit 32a6df85087ce01da860a038243462f79b614f67) --- src/v/rpc/transport.h | 4 +--- src/v/rpc/types.h | 6 ++++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/v/rpc/transport.h b/src/v/rpc/transport.h index 3c5f23b74a8a..b16056088345 100644 --- a/src/v/rpc/transport.h +++ b/src/v/rpc/transport.h @@ -149,9 +149,7 @@ result> map_result(const header& hdr, T data) { auto st = static_cast(hdr.meta); if (st == status::success) { - rpc::client_context ctx(hdr); - ctx.data = std::move(data); - return ret_t(std::move(ctx)); + return ret_t(rpc::client_context(hdr, std::move(data))); } if (st == status::request_timeout) { diff --git a/src/v/rpc/types.h b/src/v/rpc/types.h index b6851f7e3d06..6e9b0fb09619 100644 --- a/src/v/rpc/types.h +++ b/src/v/rpc/types.h @@ -233,8 +233,10 @@ struct method { /// \brief used in returned types for client::send_typed() calls template struct client_context { - explicit client_context(header h) - : hdr(std::move(h)) {} + client_context(header h, T data) + : hdr(h) + , data(std::move(data)) {} + header hdr; T data; }; From 67975b2358d3f0c56b5e80ae93551a30c858231b Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Tue, 9 Nov 2021 14:55:39 +0100 Subject: [PATCH 15/21] rpc/transport: fixed error parsing response with error status Our RPC implementation support propagating server errors to the client. For this we use a `meta` field in RPC response header. The status that is included into `header::meta` field indicates whether the request is successful or its execution failed on the server side. When status indicates an error response body is empty. 
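Conceptually the client therefore has to branch on that status before any body parsing is attempted; a simplified sketch of the required ordering, with a hypothetical helper rather than the real rpc::internal::parse_result shown in the diff below:

    #include <functional>
    #include <optional>
    #include <string>

    // Hypothetical illustration of the ordering; the real statuses live in
    // rpc::status and errors are mapped to rpc::errc.
    enum class status { success, request_timeout, server_error, method_not_found };

    // Returns the parsed payload only for a successful status; error responses
    // carry no body, so nothing is parsed for them.
    std::optional<std::string>
    handle_response(status st, const std::function<std::string()>& parse_body) {
        if (st != status::success) {
            // parsing an empty body here would fail and leave the response
            // promise broken on the client side
            return std::nullopt;
        }
        return parse_body();
    }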
Fixed an error caused by the fact that we tried to parse response body before we checked the status flag. This caused parsing error and broken promise exception on the client side. Signed-off-by: Michal Maslanka (cherry picked from commit 320b3ce9f176c625b1af52214997c2c046dcfdb3) --- src/v/rpc/transport.h | 54 ++++++++++++++++++++++++++----------------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/src/v/rpc/transport.h b/src/v/rpc/transport.h index b16056088345..91a72a499189 100644 --- a/src/v/rpc/transport.h +++ b/src/v/rpc/transport.h @@ -143,29 +143,46 @@ class transport final : public base_transport { }; namespace internal { + +inline errc map_server_error(status status) { + switch (status) { + case status::success: + return errc::success; + case status::request_timeout: + return errc::client_request_timeout; + case rpc::status::server_error: + return errc::service_error; + case status::method_not_found: + return errc::method_not_found; + }; +}; + template -result> map_result(const header& hdr, T data) { +ss::future>> parse_result( + ss::input_stream& in, std::unique_ptr sctx) { using ret_t = result>; - auto st = static_cast(hdr.meta); + // check status first + auto st = static_cast(sctx->get_header().meta); + // success case if (st == status::success) { - return ret_t(rpc::client_context(hdr, std::move(data))); - } - - if (st == status::request_timeout) { - return ret_t(errc::client_request_timeout); + return parse_type(in, sctx->get_header()) + .then([sctx = std::move(sctx)](T data) { + sctx->signal_body_parse(); + return ret_t( + rpc::client_context(sctx->get_header(), std::move(data))); + }); } - if (st == status::server_error) { - return ret_t(errc::service_error); - } - - if (st == status::method_not_found) { - return ret_t(errc::method_not_found); - } + /** + * signal that request body is parsed since it is empty when status + * indicates server error. + */ + sctx->signal_body_parse(); - return ret_t(errc::service_error); + return ss::make_ready_future(map_server_error(st)); } + } // namespace internal template @@ -191,12 +208,7 @@ transport::send_typed(Input r, uint32_t method_id, rpc::client_opts opts) { if (!sctx) { return ss::make_ready_future(sctx.error()); } - return parse_type(_in, sctx.value()->get_header()) - .then([sctx = std::move(sctx)](Output o) { - sctx.value()->signal_body_parse(); - return internal::map_result( - sctx.value()->get_header(), std::move(o)); - }); + return internal::parse_result(_in, std::move(sctx.value())); }); } From e4eeed932a3242228a37b37d47a815cd34246fff Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Tue, 9 Nov 2021 14:59:15 +0100 Subject: [PATCH 16/21] rpc/tests: added field to throwing echo service response Added field to response of a service that is supposed to thrown an exception during execution. This way we trigger situation in which response that contains error in header has different payload size than successful response. 
Signed-off-by: Michal Maslanka (cherry picked from commit 8ae27c0e94b939046a4015b6eb1a4a2517cbcf8b) --- src/v/rpc/test/rpc_gen_types.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/v/rpc/test/rpc_gen_types.h b/src/v/rpc/test/rpc_gen_types.h index 1b60cafac2d3..960c04364435 100644 --- a/src/v/rpc/test/rpc_gen_types.h +++ b/src/v/rpc/test/rpc_gen_types.h @@ -54,6 +54,8 @@ enum class failure_type { throw_exception, exceptional_future, none }; using throw_req = failure_type; -struct throw_resp {}; +struct throw_resp { + ss::sstring reply; +}; } // namespace echo From 8527b6024d0ae968c9a991e9ab4331a602c48dbe Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Tue, 9 Nov 2021 15:24:15 +0100 Subject: [PATCH 17/21] rpc/transport: forward exception to streaming context promise Streaming context promise must be set with exception when rpc body parsing failed. This way we will forward exception to rpc transport read dispatch loop. Not setting an exception in body parsing promise would lead to `broken_promise` exception since the promise would be deleted before setting its value Signed-off-by: Michal Maslanka (cherry picked from commit 6332a918a4d8673f28f075b88e4028c88c9f1dac) --- src/v/rpc/simple_protocol.cc | 3 +++ src/v/rpc/test/response_handler_tests.cc | 3 +++ src/v/rpc/transport.cc | 3 +++ src/v/rpc/transport.h | 15 ++++++++++++--- src/v/rpc/types.h | 1 + 5 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/v/rpc/simple_protocol.cc b/src/v/rpc/simple_protocol.cc index 7f451032faaf..f9dbbb7be7dd 100644 --- a/src/v/rpc/simple_protocol.cc +++ b/src/v/rpc/simple_protocol.cc @@ -33,6 +33,9 @@ struct server_context_impl final : streaming_context { ~server_context_impl() override { res.probe().request_completed(); } const header& get_header() const final { return hdr; } void signal_body_parse() final { pr.set_value(); } + void body_parse_exception(std::exception_ptr e) final { + pr.set_exception(std::move(e)); + } server::resources res; header hdr; ss::promise<> pr; diff --git a/src/v/rpc/test/response_handler_tests.cc b/src/v/rpc/test/response_handler_tests.cc index 2e06d190de1a..e3309235e37e 100644 --- a/src/v/rpc/test/response_handler_tests.cc +++ b/src/v/rpc/test/response_handler_tests.cc @@ -12,6 +12,8 @@ #include "test_utils/fixture.h" #include + +#include using namespace std::chrono_literals; // NOLINT struct test_str_ctx final : public rpc::streaming_context { @@ -22,6 +24,7 @@ struct test_str_ctx final : public rpc::streaming_context { virtual const rpc::header& get_header() const final { return hdr; } virtual void signal_body_parse() final {} + virtual void body_parse_exception(std::exception_ptr) final {} rpc::header hdr; ss::semaphore s{1}; diff --git a/src/v/rpc/transport.cc b/src/v/rpc/transport.cc index fa2026a9f44b..da1c58116d85 100644 --- a/src/v/rpc/transport.cc +++ b/src/v/rpc/transport.cc @@ -42,6 +42,9 @@ struct client_context_impl final : streaming_context { } const header& get_header() const final { return _h; } void signal_body_parse() final { pr.set_value(); } + void body_parse_exception(std::exception_ptr e) final { + pr.set_exception(std::move(e)); + } std::reference_wrapper _c; header _h; ss::promise<> pr; diff --git a/src/v/rpc/transport.h b/src/v/rpc/transport.h index 91a72a499189..65d5197aa9ac 100644 --- a/src/v/rpc/transport.h +++ b/src/v/rpc/transport.h @@ -167,10 +167,19 @@ ss::future>> parse_result( // success case if (st == status::success) { return parse_type(in, sctx->get_header()) - .then([sctx = std::move(sctx)](T data) { 
+ .then_wrapped([sctx = std::move(sctx)](ss::future data_fut) { + if (data_fut.failed()) { + sctx->body_parse_exception(data_fut.get_exception()); + /** + * we want to throw an exception when body parsing failed. + * this will invalidate the connection since it may not be + * valid any more. + */ + std::rethrow_exception(data_fut.get_exception()); + } sctx->signal_body_parse(); - return ret_t( - rpc::client_context(sctx->get_header(), std::move(data))); + return ret_t(rpc::client_context( + sctx->get_header(), std::move(data_fut.get()))); }); } diff --git a/src/v/rpc/types.h b/src/v/rpc/types.h index 6e9b0fb09619..b8a843ff32a8 100644 --- a/src/v/rpc/types.h +++ b/src/v/rpc/types.h @@ -156,6 +156,7 @@ class streaming_context { /// \brief because we parse the input as a _stream_ we need to signal /// to the dispatching thread that it can resume parsing for a new RPC virtual void signal_body_parse() = 0; + virtual void body_parse_exception(std::exception_ptr) = 0; /// \brief keep these units until destruction of context. /// usually, we want to keep the reservation of the memory size permanently From bfd51723a6a8667157a477046e93fd6b435ed14b Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Mon, 8 Nov 2021 18:04:59 +0100 Subject: [PATCH 18/21] r/consensus: logging truncation with info severity Log truncation should be rare event. Logging it with info will help us understand and track situations in which truncations happen. Signed-off-by: Michal Maslanka (cherry picked from commit 2c2ba7f2dc133c50a919ffa4dcec9733a36f2a0c) --- src/v/raft/consensus.cc | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 9d9c7e2e6260..e483cffe4bc3 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -1610,10 +1610,9 @@ consensus::do_append_entries(append_entries_request&& r) { auto truncate_at = details::next_offset( model::offset(r.meta.prev_log_index)); vlog( - _ctxlog.debug, - "Truncate log, request for the same term:{}. Request offset:{} " - "is " - "earlier than what we have:{}. Truncating to: {}", + _ctxlog.info, + "Truncating log in term: {}, Request previous log index: {} is " + "earlier than log end offset: {}. 
Truncating to: {}", r.meta.term, r.meta.prev_log_index, lstats.dirty_offset, From 96868aa889554b800c045106c34584c70cd01142 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Thu, 11 Nov 2021 10:51:55 +0100 Subject: [PATCH 19/21] r/consensus: update flushed offset when truncating log When truncating the log we need to update the raft flushed offset to reflect the truncated log state. Signed-off-by: Michal Maslanka (cherry picked from commit 4d875121d766bc2ad6fd22bfd55ab8bf8c08e481) --- src/v/raft/consensus.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index e483cffe4bc3..089af3600283 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -1633,6 +1633,10 @@ consensus::do_append_entries(append_entries_request&& r) { _last_quorum_replicated_index = std::min( details::prev_offset(truncate_at), _last_quorum_replicated_index); + // update flushed offset since truncation may happen to already + // flushed entries + _flushed_offset = std::min( + details::prev_offset(truncate_at), _flushed_offset); return _configuration_manager.truncate(truncate_at).then([this] { _probe.configuration_update(); From 4ceca9fb4bef0049c01258310fd7cb16343a7fa8 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Thu, 11 Nov 2021 11:30:11 +0100 Subject: [PATCH 20/21] r/consensus: updated flushed offset when hydrating a snapshot Signed-off-by: Michal Maslanka (cherry picked from commit 535b9955d726745b4ac84f7ac33a6b5f53d391ce) --- src/v/raft/consensus.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 089af3600283..87ff526ec9dc 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -1732,6 +1732,11 @@ ss::future<> consensus::truncate_to_latest_snapshot() { return _log.truncate_prefix(storage::truncate_prefix_config( details::next_offset(_last_snapshot_index), _scheduling.default_iopc)); + }) + .then([this] { + // when log was prefix truncate flushed offset should be equal to at + // least last snapshot index + _flushed_offset = std::max(_last_snapshot_index, _flushed_offset); }); } From 0a34ae057667ab5d7096a75d8fd7eaa4c68641d8 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Thu, 11 Nov 2021 11:47:17 +0100 Subject: [PATCH 21/21] r/consensus: fixed validating configuration change in leader transfer The redpanda raft implementation supports relaxed consistency. With relaxed consistency we do not always update `_commit_index` even though entries were replicated to all the nodes. When performing leadership transfer we need to take that into account. When the configuration is successfully replicated to a majority of nodes and it is not in a joint consensus state we are safe to proceed with the transfer.
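A rough sketch of that pre-transfer condition, using hypothetical plain-integer offsets rather than the real consensus state (the actual check in the diff below compares _configuration_manager.get_latest_offset() against last_visible_index() and rejects joint configurations):

    #include <cstdint>

    // Hypothetical illustration of the pre-transfer validation; not the actual
    // consensus state layout.
    struct transfer_state {
        int64_t latest_config_offset; // offset of the newest configuration entry
        int64_t commit_index;         // may lag under relaxed consistency
        int64_t last_visible_index;   // highest offset replicated to the majority
        bool    joint_configuration;  // true while a reconfiguration is in flight
    };

    // Comparing against commit_index would wrongly block the transfer under
    // relaxed consistency; the configuration only has to be visible on the
    // followers and the group must not be in joint consensus.
    inline bool can_transfer_leadership(const transfer_state& s) {
        return s.latest_config_offset <= s.last_visible_index
               && !s.joint_configuration;
    }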
Signed-off-by: Michal Maslanka (cherry picked from commit 42d3d54f9a2d4c5e8e232d95cc6480c5fee422e6) --- src/v/raft/consensus.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 87ff526ec9dc..67c627671b58 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -2577,14 +2577,16 @@ consensus::do_transfer_leadership(std::optional target) { make_error_code(errc::not_leader)); } - if (_configuration_manager.get_latest_offset() > _commit_index) { + auto conf = _configuration_manager.get_latest(); + if ( + _configuration_manager.get_latest_offset() > last_visible_index() + || conf.type() == configuration_type::joint) { vlog( _ctxlog.warn, "Cannot transfer leadership during configuration change"); return ss::make_ready_future( make_error_code(errc::configuration_change_in_progress)); } - auto conf = _configuration_manager.get_latest(); auto target_rni = conf.current_config().find(*target); if (!target_rni) {