Skip to content

Commit

Permalink
Merge pull request #5743 from dotnwat/fix-rpc-encoder-mismatch
Browse files Browse the repository at this point in the history
rpc: fix request/response codec mismatch
  • Loading branch information
dotnwat committed Aug 1, 2022
2 parents 2393184 + 3114c52 commit 9b41f3e
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/v/coproc/script_dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ ss::future<result<bool>> script_dispatcher::heartbeat(int8_t connect_attempts) {
auto size = co_await _sdb.invoke_on(
script_database_main_shard,
[](script_database& sdb) { return sdb.size(); });
if (heartbeat.value().data() == static_cast<long>(size)) {
if (heartbeat.value().data.size == static_cast<long>(size)) {
co_return true;
}
/// There is a discrepency between the number of registered coprocs
Expand Down
5 changes: 3 additions & 2 deletions src/v/coproc/tests/utils/supervisor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,11 @@ ss::future<state_size_t>
supervisor::heartbeat(empty_request&&, rpc::streaming_context&) {
if (!*_delay_heartbeat.local()) {
auto unique_scripts = co_await registered_scripts();
co_return state_size_t(unique_scripts.size());
co_return state_size_t{
.size = static_cast<int64_t>(unique_scripts.size())};
}
vlog(coproclog.warn, "Simulating node restart... heartbeat request");
co_return state_size_t(-1);
co_return state_size_t{.size = -1};
}

} // namespace coproc
12 changes: 9 additions & 3 deletions src/v/coproc/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,15 @@ struct enable_copros_reply {
std::vector<data> acks;
};

/// Stub for what should be 0 parameter method, 'disable_all_coprocessors'
using empty_request = named_type<int8_t, struct empty_req_tag>;
using state_size_t = named_type<int64_t, struct state_size_tag>;
struct empty_request {
using rpc_serde_exempt = std::true_type;
int8_t empty;
};

struct state_size_t {
using rpc_serde_exempt = std::true_type;
int64_t size;
};

/// \brief deregistration request, remove all topics registered to a coprocessor
/// with id 'script_id'.
Expand Down
19 changes: 19 additions & 0 deletions src/v/rpc/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,25 @@ struct service::execution_helper {
using input = Input;
using output = Output;

/*
* ensure that request and response types are compatible. this solves a
* particular class of issue where someone marks for example the request as
* exempt but forgets to mark the response. in this scenario it is then
* possible for the assertion regarding unexpected encodings to fire.
*/
static_assert(is_rpc_adl_exempt<Input> == is_rpc_adl_exempt<Output>);
static_assert(is_rpc_serde_exempt<Input> == is_rpc_serde_exempt<Output>);

/*
* provided that input/output are no exempt from serde support, require that
* they are both serde envelopes. this prevents a class of situation in
* which a request/response is a natively supported type, but not actually
* an envelope, such as a request being a raw integer or a named_type. we
* want our rpc's to be versioned.
*/
static_assert(is_rpc_serde_exempt<Input> || serde::is_envelope<Input>);
static_assert(is_rpc_serde_exempt<Output> || serde::is_envelope<Output>);

template<typename Func>
static ss::future<netbuf> exec(
ss::input_stream<char>& in,
Expand Down
4 changes: 2 additions & 2 deletions src/v/rpc/test/rpc_gen_cycling_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ struct echo_impl final : echo::echo_service_base<Codec> {

ss::future<echo::throw_resp>
throw_exception(echo::throw_req&& req, rpc::streaming_context&) final {
switch (req) {
switch (req.type) {
case echo::failure_type::exceptional_future:
return ss::make_exception_future<echo::throw_resp>(
std::runtime_error("gentle crash"));
Expand Down Expand Up @@ -406,7 +406,7 @@ FIXTURE_TEST(server_exception_test, rpc_integration_fixture) {
client.connect(model::no_timeout).get();
auto ret = client
.throw_exception(
echo::failure_type::exceptional_future,
{.type = echo::failure_type::exceptional_future},
rpc::client_opts(model::no_timeout))
.get0();

Expand Down
5 changes: 4 additions & 1 deletion src/v/rpc/test/rpc_gen_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ struct cnt_resp {

enum class failure_type { throw_exception, exceptional_future, none };

using throw_req = failure_type;
struct throw_req {
using rpc_serde_exempt = std::true_type;
failure_type type;
};

struct throw_resp {
using rpc_serde_exempt = std::true_type;
Expand Down

0 comments on commit 9b41f3e

Please sign in to comment.