diff --git a/src/v/cluster/metadata_dissemination_types.h b/src/v/cluster/metadata_dissemination_types.h index 6f1a358a584a..bb881956e2dc 100644 --- a/src/v/cluster/metadata_dissemination_types.h +++ b/src/v/cluster/metadata_dissemination_types.h @@ -15,6 +15,7 @@ #include "model/fundamental.h" #include "model/metadata.h" #include "reflection/adl.h" +#include "serde/serde.h" #include @@ -22,11 +23,13 @@ namespace cluster { -struct ntp_leader { +struct ntp_leader : serde::envelope> { model::ntp ntp; model::term_id term; std::optional leader_id; + ntp_leader() noexcept = default; + ntp_leader( model::ntp ntp, model::term_id term, @@ -44,14 +47,19 @@ struct ntp_leader { l.leader_id ? l.leader_id.value()() : -1); return o; } + + auto serde_fields() { return std::tie(ntp, term, leader_id); } }; -struct ntp_leader_revision { +struct ntp_leader_revision + : serde::envelope> { model::ntp ntp; model::term_id term; std::optional leader_id; model::revision_id revision; + ntp_leader_revision() noexcept = default; + ntp_leader_revision( model::ntp ntp, model::term_id term, @@ -73,37 +81,60 @@ struct ntp_leader_revision { r.revision); return o; } + + auto serde_fields() { return std::tie(ntp, term, leader_id, revision); } }; -struct update_leadership_request { +struct update_leadership_request + : serde::envelope> { std::vector leaders; + update_leadership_request() noexcept = default; + explicit update_leadership_request(std::vector leaders) : leaders(std::move(leaders)) {} + + auto serde_fields() { return std::tie(leaders); } }; -struct update_leadership_request_v2 { +struct update_leadership_request_v2 + : serde::envelope> { static constexpr int8_t version = 0; std::vector leaders; + update_leadership_request_v2() noexcept = default; + explicit update_leadership_request_v2( std::vector leaders) : leaders(std::move(leaders)) {} + + auto serde_fields() { return std::tie(leaders); } }; -struct update_leadership_reply { - update_leadership_reply() = default; +struct update_leadership_reply + : serde::envelope> { + update_leadership_reply() noexcept = default; + + auto serde_fields() { return std::tie(); } }; -struct get_leadership_request { - get_leadership_request() = default; +struct get_leadership_request + : serde::envelope> { + get_leadership_request() noexcept = default; + + auto serde_fields() { return std::tie(); } }; -struct get_leadership_reply { +struct get_leadership_reply + : serde::envelope> { std::vector leaders; + get_leadership_reply() noexcept = default; + explicit get_leadership_reply(std::vector leaders) : leaders(std::move(leaders)) {} + + auto serde_fields() { return std::tie(leaders); } }; } // namespace cluster diff --git a/src/v/cluster/tests/serialization_rt_test.cc b/src/v/cluster/tests/serialization_rt_test.cc index 2b02a92d109d..05f741ae72cf 100644 --- a/src/v/cluster/tests/serialization_rt_test.cc +++ b/src/v/cluster/tests/serialization_rt_test.cc @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0 #include "cluster/health_monitor_types.h" +#include "cluster/metadata_dissemination_types.h" #include "cluster/tests/utils.h" #include "cluster/types.h" #include "model/compression.h" @@ -452,3 +453,78 @@ SEASTAR_THREAD_TEST_CASE(partition_status_serialization_old_version) { BOOST_CHECK(result[i].leader_id == original[i].leader_id); } } + +template +void roundtrip_test(const T original) { + auto serde_in = original; + auto adl_in = original; + + auto serde_out = serde::to_iobuf(std::move(serde_in)); + auto adl_out = reflection::to_iobuf(std::move(adl_in)); + + auto from_serde = serde::from_iobuf(std::move(serde_out)); + auto from_adl = reflection::from_iobuf(std::move(adl_out)); + + BOOST_REQUIRE(original == from_serde); + BOOST_REQUIRE(original == from_adl); +} + +SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) { + roundtrip_test(cluster::ntp_leader( + model::ntp( + model::ns("a namespace"), + model::topic("a topic"), + model::partition_id(287)), + model::term_id(1234), + model::node_id(2))); + + roundtrip_test(cluster::ntp_leader_revision( + model::ntp( + model::ns("a namespace"), + model::topic("a topic"), + model::partition_id(287)), + model::term_id(1234), + model::node_id(2), + model::revision_id(888))); + + roundtrip_test(cluster::update_leadership_request({ + cluster::ntp_leader( + model::ntp( + model::ns("a namespace"), + model::topic("a topic"), + model::partition_id(287)), + model::term_id(1234), + model::node_id(2)), + })); + + roundtrip_test(cluster::update_leadership_request_v2({ + cluster::ntp_leader_revision( + model::ntp( + model::ns("a namespace"), + model::topic("a topic"), + model::partition_id(287)), + model::term_id(1234), + model::node_id(2), + model::revision_id(8888)), + })); + + roundtrip_test(cluster::update_leadership_reply()); + + roundtrip_test(cluster::get_leadership_request()); + + roundtrip_test(cluster::get_leadership_reply({ + cluster::ntp_leader( + model::ntp( + model::ns("a namespace"), + model::topic("a topic"), + model::partition_id(287)), + model::term_id(1234), + model::node_id(2)), + })); + + roundtrip_test( + cluster::allocate_id_request(model::timeout_clock::duration(234234))); + + roundtrip_test( + cluster::allocate_id_reply(23433, cluster::errc::invalid_node_operation)); +} diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index e08317e7aa46..5fec8bbfa7a8 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -22,6 +22,7 @@ #include "model/timeout_clock.h" #include "raft/types.h" #include "security/acl.h" +#include "serde/serde.h" #include "storage/ntp_config.h" #include "tristate.h" #include "utils/to_string.h" @@ -41,20 +42,30 @@ using broker_ptr = ss::lw_shared_ptr; using cluster_version = named_type; constexpr cluster_version invalid_version = cluster_version{-1}; -struct allocate_id_request { +struct allocate_id_request + : serde::envelope> { model::timeout_clock::duration timeout; + allocate_id_request() noexcept = default; + explicit allocate_id_request(model::timeout_clock::duration timeout) : timeout(timeout) {} + + auto serde_fields() { return std::tie(timeout); } }; -struct allocate_id_reply { +struct allocate_id_reply + : serde::envelope> { int64_t id; errc ec; + allocate_id_reply() noexcept = default; + allocate_id_reply(int64_t id, errc ec) : id(id) , ec(ec) {} + + auto serde_fields() { return std::tie(id, ec); } }; enum class tx_errc {