Skip to content

Commit

Permalink
Merge pull request #4963 from dotnwat/rpc-serde
Browse files Browse the repository at this point in the history
rpc: adaptive rpc message encoding
  • Loading branch information
dotnwat committed Jul 19, 2022
2 parents de6bcdb + 5d201d7 commit 5808440
Show file tree
Hide file tree
Showing 15 changed files with 867 additions and 85 deletions.
1 change: 0 additions & 1 deletion src/v/cluster/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ class controller {
ss::sharded<topics_frontend> _tp_frontend; // instance per core
ss::sharded<controller_backend> _backend; // instance per core
ss::sharded<controller_stm> _stm; // single instance
ss::sharded<controller_service> _service; // instance per core
ss::sharded<controller_api> _api; // instance per core
ss::sharded<members_frontend> _members_frontend; // instance per core
ss::sharded<members_backend> _members_backend; // single instance
Expand Down
1 change: 0 additions & 1 deletion src/v/cluster/fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ namespace cluster {

class controller;
class controller_backend;
class controller_service;
class controller_stm_shard;
class id_allocator_frontend;
class rm_partition_frontend;
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/partition_balancer_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ struct partition_balancer_violations
model::node_id id;
model::timestamp unavailable_since;

unavailable_node() noexcept = default;
unavailable_node(model::node_id id, model::timestamp unavailable_since)
: id(id)
, unavailable_since(unavailable_since) {}
Expand All @@ -65,6 +66,7 @@ struct partition_balancer_violations
model::node_id id;
uint32_t disk_used_percent;

full_node() noexcept = default;
full_node(model::node_id id, uint32_t disk_used_percent)
: id(id)
, disk_used_percent(disk_used_percent) {}
Expand Down
6 changes: 6 additions & 0 deletions src/v/coproc/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ enum class topic_ingestion_policy : int8_t { earliest = 0, stored, latest };

/// \brief type to use for registration/deregistration of a topic
struct enable_copros_request {
using rpc_serde_exempt = std::true_type;
struct data {
script_id id;
iobuf source_code;
Expand All @@ -57,6 +58,7 @@ struct enable_copros_request {
/// \brief registration acks per copro, responses are organized in the
/// same order as the list of topics in the 'topics' array
struct enable_copros_reply {
using rpc_serde_exempt = std::true_type;
using topic_policy = std::pair<model::topic, topic_ingestion_policy>;
struct script_metadata {
script_id id;
Expand All @@ -76,19 +78,22 @@ using state_size_t = named_type<int64_t, struct state_size_tag>;
/// \brief deregistration request, remove all topics registered to a coprocessor
/// with id 'script_id'.
struct disable_copros_request {
using rpc_serde_exempt = std::true_type;
std::vector<script_id> ids;
};

/// \brief deregistration acks per topic, responses are organized in the
/// same order as the list of topics in the 'ids' array
struct disable_copros_reply {
using rpc_serde_exempt = std::true_type;
using ack = std::pair<script_id, disable_response_code>;
std::vector<ack> acks;
};

/// \brief Request that co-processors with the given script ids, process batches
/// from the reader whose source topic is the given ntp
struct process_batch_request {
using rpc_serde_exempt = std::true_type;
struct data {
std::vector<script_id> ids;
model::ntp ntp;
Expand All @@ -100,6 +105,7 @@ struct process_batch_request {
/// \brief Response from the above request, acks from script ids that have
/// processed the record and produce new batches on a new materialized ntp
struct process_batch_reply {
using rpc_serde_exempt = std::true_type;
struct data {
script_id id;
model::ntp source;
Expand Down
188 changes: 186 additions & 2 deletions src/v/rpc/parse_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,191 @@ inline void validate_payload_and_header(const iobuf& io, const header& h) {
}
}

/*
* the transition from adl to serde encoding in rpc requires a period of time
* where both encodings are supported for all message types. however, we do not
* want to extend this requirement to brand new messages / services, nor to rpc
* types used in coproc which will remain in legacy adl format for now.
*
* we use the type system to enforce these rules and allow types to be opt-out
* on a case-by-case basis for adl (new messages) or serde (legacy like coproc).
*
* the `rpc_adl_exempt` and `rpc_serde_exempt` type trait helpers can be used to
* opt-out a type T from adl or serde support. a type is marked exempt by
* defining the type `T::rpc_(adl|serde)_exempt`. the typedef may be defined as
* any type such as std::{void_t, true_type}.
*
* Example:
*
* struct exempt_msg {
* using rpc_adl_exempt = std::true_type;
* ...
* };
*
* then use the `is_rpc_adl_exempt` or `is_rpc_serde_exempt` concept to test.
*/
template<typename T>
concept is_rpc_adl_exempt = requires {
typename T::rpc_adl_exempt;
};

template<typename T>
concept is_rpc_serde_exempt = requires {
typename T::rpc_serde_exempt;
};

/*
* Encode a client request for the given transport version.
*
* Unless the message type T is explicitly exempt from adl<> support, type T
* must be supported by both adl<> and serde encoding frameworks. When the type
* is not exempt from adl<> support, serde is used when the version >= v2.
*
* The returned version indicates what level of encoding is used. This is always
* equal to the input version, except for serde-only messags which return v2.
* Callers are expected to further validate the runtime implications of this.
*/
template<typename T>
ss::future<transport_version>
encode_for_version(iobuf& out, T msg, transport_version version) {
static_assert(!is_rpc_adl_exempt<T> || !is_rpc_serde_exempt<T>);

if constexpr (is_rpc_serde_exempt<T>) {
return reflection::async_adl<T>{}.to(out, std::move(msg)).then([] {
return transport_version::v0;
});
} else if constexpr (is_rpc_adl_exempt<T>) {
return ss::do_with(std::move(msg), [&out](T& msg) {
return serde::write_async(out, std::move(msg)).then([] {
return transport_version::v2;
});
});
} else {
if (version < transport_version::v2) {
return reflection::async_adl<T>{}
.to(out, std::move(msg))
.then([version] { return version; });
} else {
return ss::do_with(std::move(msg), [&out, version](T& msg) {
return serde::write_async(out, std::move(msg)).then([version] {
return version;
});
});
}
}
}

/*
* Decode a client request at the given transport version.
*/
template<typename T>
ss::future<T>
decode_for_version(iobuf_parser& parser, transport_version version) {
static_assert(!is_rpc_adl_exempt<T> || !is_rpc_serde_exempt<T>);

if constexpr (is_rpc_serde_exempt<T>) {
if (version != transport_version::v0) {
return ss::make_exception_future<T>(std::runtime_error(fmt::format(
"Unexpected adl-only message {} at {} != v0",
typeid(T).name(),
version)));
}
return reflection::async_adl<T>{}.from(parser);
} else if constexpr (is_rpc_adl_exempt<T>) {
if (version < transport_version::v2) {
return ss::make_exception_future<T>(std::runtime_error(fmt::format(
"Unexpected serde-only message {} at {} < v2",
typeid(T).name(),
version)));
}
return serde::read_async<T>(parser);
} else {
if (version < transport_version::v2) {
return reflection::async_adl<T>{}.from(parser);
} else {
return serde::read_async<T>(parser);
}
}
}

/*
* type used to factor out version-specific functionality from request handling
* in services. this is used so that tests can specialize behavior.
*
* this is the default mixin that is used by the code generator.
*/
struct default_message_codec {
/*
* decodes a request (server) or response (client)
*/
template<typename T>
static ss::future<T>
decode(iobuf_parser& parser, transport_version version) {
return decode_for_version<T>(parser, version);
}

/*
* Used by the server to determine which version use when sending a response
* back to the client. The default behavior is maintain the same version as
* the received request.
*/
static transport_version response_version(const header& h) {
return h.version;
}

/*
* encodes a request (client) or response (server)
*/
template<typename T>
static ss::future<transport_version>
encode(iobuf& out, T msg, transport_version version) {
return encode_for_version(out, std::move(msg), version);
}
};

/*
* service specialization mixin to create a v0 compliant service. a v0 service
* encodes and decodes using adl, ignores versions on requests, and sends
* replies with v0 in the header.
*
* example:
* using echo_service_v0 = echo_service_base<v0_message_codec>;
*
* Note that for serde-supported messages a vassert(false) is generated. First,
* the v0_message_encoder is only used in tests. Second, serde usage is not
* possible in v0 servers, so this restriction is realistic. And from a
* practical standpoint this allows us to avoid bifurcation of services (or more
* sfinae magic) in tests so that serde-only types were never present within a
* service configured with a v0_message_encoder.
*/
struct v0_message_codec {
template<typename T>
static ss::future<T> decode(iobuf_parser& parser, transport_version) {
if constexpr (is_rpc_adl_exempt<T>) {
vassert(false, "Cannot use serde-only types in v0 server");
} else {
return reflection::async_adl<T>{}.from(parser);
}
}

static transport_version response_version(const header&) {
return transport_version::v0;
}

template<typename T>
static ss::future<transport_version>
encode(iobuf& out, T msg, transport_version) {
if constexpr (is_rpc_adl_exempt<T>) {
vassert(false, "Cannot use serde-only types in v0 server");
} else {
return reflection::async_adl<T>{}.to(out, std::move(msg)).then([] {
return transport_version::v0;
});
}
}
};

template<typename T, typename Codec>
ss::future<T> parse_type(ss::input_stream<char>& in, const header& h) {
return read_iobuf_exactly(in, h.payload_size).then([h](iobuf io) {
validate_payload_and_header(io, h);
Expand All @@ -104,8 +288,8 @@ ss::future<T> parse_type(ss::input_stream<char>& in, const header& h) {

auto p = std::make_unique<iobuf_parser>(std::move(io));
auto raw = p.get();
return reflection::async_adl<T>{}.from(*raw).finally(
[p = std::move(p)] {});
return Codec::template decode<T>(*raw, h.version)
.finally([p = std::move(p)] {});
});
}

Expand Down
30 changes: 23 additions & 7 deletions src/v/rpc/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace rpc {

/// \brief most service implementations will be codegenerated
struct service {
template<typename Input, typename Output>
template<typename Input, typename Output, typename Codec>
struct execution_helper;

service() = default;
Expand All @@ -50,7 +50,7 @@ class rpc_internal_body_parsing_exception : public std::exception {
seastar::sstring _what;
};

template<typename Input, typename Output>
template<typename Input, typename Output, typename Codec>
struct service::execution_helper {
using input = Input;
using output = Output;
Expand All @@ -63,7 +63,7 @@ struct service::execution_helper {
Func&& f) {
return ctx.permanent_memory_reservation(ctx.get_header().payload_size)
.then([f = std::forward<Func>(f), method_id, &in, &ctx]() mutable {
return parse_type<Input>(in, ctx.get_header())
return parse_type<Input, Codec>(in, ctx.get_header())
.then_wrapped([f = std::forward<Func>(f),
&ctx](ss::future<Input> input_f) mutable {
if (input_f.failed()) {
Expand All @@ -74,13 +74,29 @@ struct service::execution_helper {
auto input = input_f.get0();
return f(std::move(input), ctx);
})
.then([method_id](Output out) mutable {
.then([method_id, &ctx](Output out) mutable {
const auto version = Codec::response_version(
ctx.get_header());
auto b = std::make_unique<netbuf>();
auto raw_b = b.get();
raw_b->set_service_method_id(method_id);
return reflection::async_adl<Output>{}
.to(raw_b->buffer(), std::move(out))
.then([b = std::move(b)] { return std::move(*b); });
raw_b->set_version(version);
return Codec::encode(
raw_b->buffer(), std::move(out), version)
.then([version, b = std::move(b)](
transport_version effective_version) {
/*
* this assertion is safe because the conditions under
* which this assertion would fail should have been
* verified in parse_type above.
*/
vassert(
effective_version == version,
"Unexpected encoding at effective {} != {}",
effective_version,
version);
return std::move(*b);
});
});
});
}
Expand Down
15 changes: 15 additions & 0 deletions src/v/rpc/test/echo_service.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,21 @@
"name": "throw_exception",
"input_type": "throw_req",
"output_type": "throw_resp"
},
{
"name": "echo_adl_only",
"input_type": "echo_req_adl_only",
"output_type": "echo_resp_adl_only"
},
{
"name": "echo_adl_serde",
"input_type": "echo_req_adl_serde",
"output_type": "echo_resp_adl_serde"
},
{
"name": "echo_serde_only",
"input_type": "echo_req_serde_only",
"output_type": "echo_resp_serde_only"
}
]
}
5 changes: 3 additions & 2 deletions src/v/rpc/test/netbuf_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ namespace rpc {
/// \brief expects the inputstream to be prefixed by an rpc::header
template<typename T>
ss::future<T> parse_framed(ss::input_stream<char>& in) {
return parse_header(in).then(
[&in](std::optional<header> o) { return parse_type<T>(in, o.value()); });
return parse_header(in).then([&in](std::optional<header> o) {
return parse_type<T, default_message_codec>(in, o.value());
});
}
} // namespace rpc

Expand Down
Loading

0 comments on commit 5808440

Please sign in to comment.