Skip to content

Commit

Permalink
Merge pull request #5053 from dotnwat/rpc-types-serde-support
Browse files Browse the repository at this point in the history
cluster: add support for serde to metadata and id allocator service types
  • Loading branch information
dotnwat committed Jun 8, 2022
2 parents ebb8e46 + a411ebe commit b245f35
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 11 deletions.
49 changes: 40 additions & 9 deletions src/v/cluster/metadata_dissemination_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,21 @@
#include "model/fundamental.h"
#include "model/metadata.h"
#include "reflection/adl.h"
#include "serde/serde.h"

#include <fmt/ostream.h>

#include <ostream>

namespace cluster {

struct ntp_leader {
struct ntp_leader : serde::envelope<ntp_leader, serde::version<0>> {
model::ntp ntp;
model::term_id term;
std::optional<model::node_id> leader_id;

ntp_leader() noexcept = default;

ntp_leader(
model::ntp ntp,
model::term_id term,
Expand All @@ -44,14 +47,19 @@ struct ntp_leader {
l.leader_id ? l.leader_id.value()() : -1);
return o;
}

auto serde_fields() { return std::tie(ntp, term, leader_id); }
};

struct ntp_leader_revision {
struct ntp_leader_revision
: serde::envelope<ntp_leader_revision, serde::version<0>> {
model::ntp ntp;
model::term_id term;
std::optional<model::node_id> leader_id;
model::revision_id revision;

ntp_leader_revision() noexcept = default;

ntp_leader_revision(
model::ntp ntp,
model::term_id term,
Expand All @@ -73,37 +81,60 @@ struct ntp_leader_revision {
r.revision);
return o;
}

auto serde_fields() { return std::tie(ntp, term, leader_id, revision); }
};

struct update_leadership_request {
struct update_leadership_request
: serde::envelope<update_leadership_request, serde::version<0>> {
std::vector<ntp_leader> leaders;

update_leadership_request() noexcept = default;

explicit update_leadership_request(std::vector<ntp_leader> leaders)
: leaders(std::move(leaders)) {}

auto serde_fields() { return std::tie(leaders); }
};

struct update_leadership_request_v2 {
struct update_leadership_request_v2
: serde::envelope<update_leadership_request_v2, serde::version<0>> {
static constexpr int8_t version = 0;
std::vector<ntp_leader_revision> leaders;

update_leadership_request_v2() noexcept = default;

explicit update_leadership_request_v2(
std::vector<ntp_leader_revision> leaders)
: leaders(std::move(leaders)) {}

auto serde_fields() { return std::tie(leaders); }
};

struct update_leadership_reply {
update_leadership_reply() = default;
struct update_leadership_reply
: serde::envelope<update_leadership_reply, serde::version<0>> {
update_leadership_reply() noexcept = default;

auto serde_fields() { return std::tie(); }
};

struct get_leadership_request {
get_leadership_request() = default;
struct get_leadership_request
: serde::envelope<get_leadership_request, serde::version<0>> {
get_leadership_request() noexcept = default;

auto serde_fields() { return std::tie(); }
};

struct get_leadership_reply {
struct get_leadership_reply
: serde::envelope<get_leadership_reply, serde::version<0>> {
std::vector<ntp_leader> leaders;

get_leadership_reply() noexcept = default;

explicit get_leadership_reply(std::vector<ntp_leader> leaders)
: leaders(std::move(leaders)) {}

auto serde_fields() { return std::tie(leaders); }
};

} // namespace cluster
Expand Down
76 changes: 76 additions & 0 deletions src/v/cluster/tests/serialization_rt_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// by the Apache License, Version 2.0

#include "cluster/health_monitor_types.h"
#include "cluster/metadata_dissemination_types.h"
#include "cluster/tests/utils.h"
#include "cluster/types.h"
#include "model/compression.h"
Expand Down Expand Up @@ -452,3 +453,78 @@ SEASTAR_THREAD_TEST_CASE(partition_status_serialization_old_version) {
BOOST_CHECK(result[i].leader_id == original[i].leader_id);
}
}

template<typename T>
void roundtrip_test(const T original) {
auto serde_in = original;
auto adl_in = original;

auto serde_out = serde::to_iobuf(std::move(serde_in));
auto adl_out = reflection::to_iobuf(std::move(adl_in));

auto from_serde = serde::from_iobuf<T>(std::move(serde_out));
auto from_adl = reflection::from_iobuf<T>(std::move(adl_out));

BOOST_REQUIRE(original == from_serde);
BOOST_REQUIRE(original == from_adl);
}

SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
roundtrip_test(cluster::ntp_leader(
model::ntp(
model::ns("a namespace"),
model::topic("a topic"),
model::partition_id(287)),
model::term_id(1234),
model::node_id(2)));

roundtrip_test(cluster::ntp_leader_revision(
model::ntp(
model::ns("a namespace"),
model::topic("a topic"),
model::partition_id(287)),
model::term_id(1234),
model::node_id(2),
model::revision_id(888)));

roundtrip_test(cluster::update_leadership_request({
cluster::ntp_leader(
model::ntp(
model::ns("a namespace"),
model::topic("a topic"),
model::partition_id(287)),
model::term_id(1234),
model::node_id(2)),
}));

roundtrip_test(cluster::update_leadership_request_v2({
cluster::ntp_leader_revision(
model::ntp(
model::ns("a namespace"),
model::topic("a topic"),
model::partition_id(287)),
model::term_id(1234),
model::node_id(2),
model::revision_id(8888)),
}));

roundtrip_test(cluster::update_leadership_reply());

roundtrip_test(cluster::get_leadership_request());

roundtrip_test(cluster::get_leadership_reply({
cluster::ntp_leader(
model::ntp(
model::ns("a namespace"),
model::topic("a topic"),
model::partition_id(287)),
model::term_id(1234),
model::node_id(2)),
}));

roundtrip_test(
cluster::allocate_id_request(model::timeout_clock::duration(234234)));

roundtrip_test(
cluster::allocate_id_reply(23433, cluster::errc::invalid_node_operation));
}
15 changes: 13 additions & 2 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "model/timeout_clock.h"
#include "raft/types.h"
#include "security/acl.h"
#include "serde/serde.h"
#include "storage/ntp_config.h"
#include "tristate.h"
#include "utils/to_string.h"
Expand All @@ -41,20 +42,30 @@ using broker_ptr = ss::lw_shared_ptr<model::broker>;
using cluster_version = named_type<int64_t, struct cluster_version_tag>;
constexpr cluster_version invalid_version = cluster_version{-1};

struct allocate_id_request {
struct allocate_id_request
: serde::envelope<allocate_id_request, serde::version<0>> {
model::timeout_clock::duration timeout;

allocate_id_request() noexcept = default;

explicit allocate_id_request(model::timeout_clock::duration timeout)
: timeout(timeout) {}

auto serde_fields() { return std::tie(timeout); }
};

struct allocate_id_reply {
struct allocate_id_reply
: serde::envelope<allocate_id_reply, serde::version<0>> {
int64_t id;
errc ec;

allocate_id_reply() noexcept = default;

allocate_id_reply(int64_t id, errc ec)
: id(id)
, ec(ec) {}

auto serde_fields() { return std::tie(id, ec); }
};

enum class tx_errc {
Expand Down

0 comments on commit b245f35

Please sign in to comment.