diff --git a/src/v/cluster/tests/serialization_rt_test.cc b/src/v/cluster/tests/serialization_rt_test.cc index e2fa744a7082..99d34637f639 100644 --- a/src/v/cluster/tests/serialization_rt_test.cc +++ b/src/v/cluster/tests/serialization_rt_test.cc @@ -17,6 +17,7 @@ #include "model/metadata.h" #include "model/tests/randoms.h" #include "model/timestamp.h" +#include "raft/types.h" #include "random/generators.h" #include "reflection/adl.h" #include "storage/types.h" @@ -1958,6 +1959,135 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) { // semantics are preserved. serde_roundtrip_test(data); } + { + raft::transfer_leadership_request data{ + .group = tests::random_named_int(), + }; + if (tests::random_bool()) { + data.target = tests::random_named_int(); + } + roundtrip_test(data); + } + { + raft::transfer_leadership_reply data{ + .success = tests::random_bool(), + .result = raft::errc::append_entries_dispatch_error, + }; + roundtrip_test(data); + } + { + raft::vnode data{ + tests::random_named_int(), + tests::random_named_int()}; + roundtrip_test(data); + } + { + raft::timeout_now_request data{ + .target_node_id = raft:: + vnode{tests::random_named_int(), tests::random_named_int()}, + .node_id = raft:: + vnode{tests::random_named_int(), tests::random_named_int()}, + .group = tests::random_named_int(), + .term = tests::random_named_int(), + }; + roundtrip_test(data); + } + { + raft::timeout_now_reply data{ + .target_node_id = raft:: + vnode{tests::random_named_int(), tests::random_named_int()}, + .term = tests::random_named_int(), + .result = raft::timeout_now_reply::status::failure, + }; + roundtrip_test(data); + } + { + const raft::install_snapshot_request orig{ + .target_node_id = raft:: + vnode{tests::random_named_int(), tests::random_named_int()}, + .term = tests::random_named_int(), + .group = tests::random_named_int(), + .node_id = raft:: + vnode{tests::random_named_int(), tests::random_named_int()}, + .last_included_index = tests::random_named_int(), + .file_offset = random_generators::get_int(), + .chunk = bytes_to_iobuf( + random_generators::get_bytes(random_generators::get_int(1024))), + .done = tests::random_bool(), + }; + /* + * manual adl/serde test to workaround iobuf being move-only + */ + { + raft::install_snapshot_request serde_in{ + .target_node_id = orig.target_node_id, + .term = orig.term, + .group = orig.group, + .node_id = orig.node_id, + .last_included_index = orig.last_included_index, + .file_offset = orig.file_offset, + .chunk = orig.chunk.copy(), + .done = orig.done, + }; + auto serde_out = serde::to_iobuf(std::move(serde_in)); + auto from_serde = serde::from_iobuf( + std::move(serde_out)); + + BOOST_REQUIRE(orig == from_serde); + } + { + raft::install_snapshot_request adl_in{ + .target_node_id = orig.target_node_id, + .term = orig.term, + .group = orig.group, + .node_id = orig.node_id, + .last_included_index = orig.last_included_index, + .file_offset = orig.file_offset, + .chunk = orig.chunk.copy(), + .done = orig.done, + }; + auto adl_out = reflection::to_iobuf(std::move(adl_in)); + auto from_adl + = reflection::from_iobuf( + std::move(adl_out)); + + BOOST_REQUIRE(orig == from_adl); + } + } + { + raft::install_snapshot_reply data{ + .target_node_id = raft:: + vnode{tests::random_named_int(), tests::random_named_int()}, + .term = tests::random_named_int(), + .bytes_stored = random_generators::get_int(), + .success = tests::random_bool(), + }; + roundtrip_test(data); + } + { + raft::vote_request data{ + .node_id = raft:: + vnode{tests::random_named_int(), tests::random_named_int()}, + .target_node_id = raft:: + vnode{tests::random_named_int(), tests::random_named_int()}, + .group = tests::random_named_int(), + .term = tests::random_named_int(), + .prev_log_index = tests::random_named_int(), + .prev_log_term = tests::random_named_int(), + .leadership_transfer = tests::random_bool(), + }; + roundtrip_test(data); + } + { + raft::vote_reply data{ + .target_node_id = raft:: + vnode{tests::random_named_int(), tests::random_named_int()}, + .term = tests::random_named_int(), + .granted = tests::random_bool(), + .log_ok = tests::random_bool(), + }; + roundtrip_test(data); + } } SEASTAR_THREAD_TEST_CASE(cluster_property_kv_exchangable_with_pair) { diff --git a/src/v/raft/group_configuration.h b/src/v/raft/group_configuration.h index 9373e03161ae..fda463826067 100644 --- a/src/v/raft/group_configuration.h +++ b/src/v/raft/group_configuration.h @@ -24,7 +24,7 @@ namespace raft { static constexpr model::revision_id no_revision{}; -class vnode { +class vnode : public serde::envelope> { public: constexpr vnode() = default; @@ -45,6 +45,8 @@ class vnode { constexpr model::node_id id() const { return _node_id; } constexpr model::revision_id revision() const { return _revision; } + auto serde_fields() { return std::tie(_node_id, _revision); } + private: model::node_id _node_id; model::revision_id _revision; diff --git a/src/v/raft/types.h b/src/v/raft/types.h index a9878f16c0c6..a017935fbb3d 100644 --- a/src/v/raft/types.h +++ b/src/v/raft/types.h @@ -278,7 +278,7 @@ struct heartbeat_reply { friend std::ostream& operator<<(std::ostream& o, const heartbeat_reply& r); }; -struct vote_request { +struct vote_request : serde::envelope> { vnode node_id; // node id to validate on receiver vnode target_node_id; @@ -295,9 +295,22 @@ struct vote_request { vnode target_node() const { return target_node_id; } friend std::ostream& operator<<(std::ostream& o, const vote_request& r); + + friend bool operator==(const vote_request&, const vote_request&) = default; + + auto serde_fields() { + return std::tie( + node_id, + target_node_id, + group, + term, + prev_log_index, + prev_log_term, + leadership_transfer); + } }; -struct vote_reply { +struct vote_reply : serde::envelope> { // node id to validate on receiver vnode target_node_id; /// \brief callee's term, for the caller to upate itself @@ -312,6 +325,12 @@ struct vote_reply { bool log_ok = false; friend std::ostream& operator<<(std::ostream& o, const vote_reply& r); + + friend bool operator==(const vote_reply&, const vote_reply&) = default; + + auto serde_fields() { + return std::tie(target_node_id, term, granted, log_ok); + } }; /// This structure is used by consensus to notify other systems about group @@ -374,7 +393,8 @@ struct snapshot_metadata { "version is equal to 3, please change it accordignly"); }; -struct install_snapshot_request { +struct install_snapshot_request + : serde::envelope> { // node id to validate on receiver vnode target_node_id; // leader’s term @@ -396,6 +416,22 @@ struct install_snapshot_request { vnode target_node() const { return target_node_id; } friend std::ostream& operator<<(std::ostream&, const install_snapshot_request&); + + friend bool + operator==(const install_snapshot_request&, const install_snapshot_request&) + = default; + + auto serde_fields() { + return std::tie( + target_node_id, + term, + group, + node_id, + last_included_index, + file_offset, + chunk, + done); + } }; class install_snapshot_request_foreign_wrapper { @@ -426,7 +462,8 @@ class install_snapshot_request_foreign_wrapper { ptr_t _ptr; }; -struct install_snapshot_reply { +struct install_snapshot_reply + : serde::envelope> { // node id to validate on receiver vnode target_node_id; // current term, for leader to update itself @@ -445,6 +482,14 @@ struct install_snapshot_reply { friend std::ostream& operator<<(std::ostream&, const install_snapshot_reply&); + + friend bool + operator==(const install_snapshot_reply&, const install_snapshot_reply&) + = default; + + auto serde_fields() { + return std::tie(target_node_id, term, bytes_stored, success); + } }; /** @@ -461,7 +506,8 @@ struct write_snapshot_cfg { iobuf data; }; -struct timeout_now_request { +struct timeout_now_request + : serde::envelope> { // node id to validate on receiver vnode target_node_id; @@ -471,27 +517,55 @@ struct timeout_now_request { raft::group_id target_group() const { return group; } vnode target_node() const { return target_node_id; } + + friend bool + operator==(const timeout_now_request&, const timeout_now_request&) + = default; + + auto serde_fields() { + return std::tie(target_node_id, node_id, group, term); + } }; -struct timeout_now_reply { +struct timeout_now_reply + : serde::envelope> { enum class status : uint8_t { success, failure }; // node id to validate on receiver vnode target_node_id; model::term_id term; status result; + + friend bool operator==(const timeout_now_reply&, const timeout_now_reply&) + = default; + + auto serde_fields() { return std::tie(target_node_id, term, result); } }; // if not target is specified then the most up-to-date node will be selected -struct transfer_leadership_request { +struct transfer_leadership_request + : serde::envelope> { group_id group; std::optional target; raft::group_id target_group() const { return group; } + + friend bool operator==( + const transfer_leadership_request&, const transfer_leadership_request&) + = default; + + auto serde_fields() { return std::tie(group, target); } }; -struct transfer_leadership_reply { +struct transfer_leadership_reply + : serde::envelope> { bool success{false}; raft::errc result; + + friend bool operator==( + const transfer_leadership_reply&, const transfer_leadership_reply&) + = default; + + auto serde_fields() { return std::tie(success, result); } }; // key types used to store data in key-value store @@ -571,4 +645,170 @@ struct adl { raft::snapshot_metadata from(iobuf_parser& in); }; +template<> +struct adl { + void to(iobuf& out, raft::transfer_leadership_request&& r) { + serialize(out, r.group, r.target); + } + raft::transfer_leadership_request from(iobuf_parser& in) { + auto group = adl{}.from(in); + auto target = adl>{}.from(in); + return {.group = group, .target = target}; + } +}; + +template<> +struct adl { + void to(iobuf& out, raft::transfer_leadership_reply&& r) { + serialize(out, r.success, r.result); + } + raft::transfer_leadership_reply from(iobuf_parser& in) { + auto success = adl{}.from(in); + auto result = adl{}.from(in); + return {.success = success, .result = result}; + } +}; + +template<> +struct adl { + void to(iobuf& out, raft::timeout_now_request&& r) { + serialize(out, r.target_node_id, r.node_id, r.group, r.term); + } + raft::timeout_now_request from(iobuf_parser& in) { + auto target_node_id = adl{}.from(in); + auto node_id = adl{}.from(in); + auto group = adl{}.from(in); + auto term = adl{}.from(in); + return { + .target_node_id = target_node_id, + .node_id = node_id, + .group = group, + .term = term, + }; + } +}; + +template<> +struct adl { + void to(iobuf& out, raft::timeout_now_reply&& r) { + serialize(out, r.target_node_id, r.term, r.result); + } + raft::timeout_now_reply from(iobuf_parser& in) { + auto target_node_id = adl{}.from(in); + auto term = adl{}.from(in); + auto result = adl{}.from(in); + return { + .target_node_id = target_node_id, + .term = term, + .result = result, + }; + } +}; + +template<> +struct adl { + void to(iobuf& out, raft::install_snapshot_request&& r) { + serialize( + out, + r.target_node_id, + r.term, + r.group, + r.node_id, + r.last_included_index, + r.file_offset, + std::move(r.chunk), + r.done); + } + raft::install_snapshot_request from(iobuf_parser& in) { + auto target_node_id = adl{}.from(in); + auto term = adl{}.from(in); + auto group = adl{}.from(in); + auto node_id = adl{}.from(in); + auto last_included_index = adl{}.from(in); + auto file_offset = adl{}.from(in); + auto chunk = adl{}.from(in); + auto done = adl{}.from(in); + return { + .target_node_id = target_node_id, + .term = term, + .group = group, + .node_id = node_id, + .last_included_index = last_included_index, + .file_offset = file_offset, + .chunk = std::move(chunk), + .done = done, + }; + } +}; + +template<> +struct adl { + void to(iobuf& out, raft::install_snapshot_reply&& r) { + serialize(out, r.target_node_id, r.term, r.bytes_stored, r.success); + } + raft::install_snapshot_reply from(iobuf_parser& in) { + auto target_node_id = adl{}.from(in); + auto term = adl{}.from(in); + auto bytes_stored = adl{}.from(in); + auto success = adl{}.from(in); + return { + .target_node_id = target_node_id, + .term = term, + .bytes_stored = bytes_stored, + .success = success, + }; + } +}; + +template<> +struct adl { + void to(iobuf& out, raft::vote_request&& r) { + serialize( + out, + r.node_id, + r.target_node_id, + r.group, + r.term, + r.prev_log_index, + r.prev_log_term, + r.leadership_transfer); + } + raft::vote_request from(iobuf_parser& in) { + auto node_id = adl{}.from(in); + auto target_node_id = adl{}.from(in); + auto group = adl{}.from(in); + auto term = adl{}.from(in); + auto prev_log_index = adl{}.from(in); + auto prev_log_term = adl{}.from(in); + auto leadership_transfer = adl{}.from(in); + return { + .node_id = node_id, + .target_node_id = target_node_id, + .group = group, + .term = term, + .prev_log_index = prev_log_index, + .prev_log_term = prev_log_term, + .leadership_transfer = leadership_transfer, + }; + } +}; + +template<> +struct adl { + void to(iobuf& out, raft::vote_reply&& r) { + serialize(out, r.target_node_id, r.term, r.granted, r.log_ok); + } + raft::vote_reply from(iobuf_parser& in) { + auto target_node_id = adl{}.from(in); + auto term = adl{}.from(in); + auto granted = adl{}.from(in); + auto log_ok = adl{}.from(in); + return { + .target_node_id = target_node_id, + .term = term, + .granted = granted, + .log_ok = log_ok, + }; + } +}; } // namespace reflection