Skip to content

Commit

Permalink
Merge pull request #5211 from graphcareful/unhandled-net-exception
Browse files Browse the repository at this point in the history
Catch unhandeled exceptions within `net::server`
  • Loading branch information
Rob Blafford committed Jun 24, 2022
2 parents f5aec8b + f2e8c55 commit 68b545b
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 6 deletions.
5 changes: 5 additions & 0 deletions src/v/net/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ std::optional<ss::sstring> is_disconnect_exception(std::exception_ptr e) {
// Happens on unclean client disconnect, typically wrapping
// an out_of_range
return "parse error";
} catch (...) {
// Global catch-all prevents stranded/non-handled exceptional futures.
// In all other non-explicity handled cases, the exception will not be
// related to disconnect issues, therefore fallthrough to return nullopt
// is acceptable.
}

return std::nullopt;
Expand Down
55 changes: 55 additions & 0 deletions src/v/rpc/test/rpc_gen_cycling_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -598,3 +598,58 @@ FIXTURE_TEST(version_not_supported, rpc_integration_fixture) {

t.stop().get();
}

class erroneous_protocol_exception : public std::exception {};

class erroneous_protocol final : public net::server::protocol {
public:
template<std::derived_from<rpc::service> T, typename... Args>
void register_service(Args&&... args) {
_services.push_back(std::make_unique<T>(std::forward<Args>(args)...));
}

const char* name() const final { return "redpanda erraneous proto"; };

ss::future<> apply(net::server::resources rs) final {
return ss::do_until(
[rs] { return rs.conn->input().eof() || rs.abort_requested(); },
[this, rs]() mutable {
return ss::make_exception_future<>(
erroneous_protocol_exception());
});
}

private:
std::vector<std::unique_ptr<rpc::service>> _services;
};

class erroneous_service_fixture
: public rpc_fixture_swappable_proto<erroneous_protocol> {
public:
erroneous_service_fixture()
: rpc_fixture_swappable_proto(redpanda_rpc_port) {}

void register_services() {
register_service<movistar>();
register_service<echo_impl>();
}

static constexpr uint16_t redpanda_rpc_port = 32147;
};

FIXTURE_TEST(unhandled_throw_in_proto_apply, erroneous_service_fixture) {
configure_server();
register_services();
start_server();

rpc::transport t(client_config());
t.connect(model::no_timeout).get();
auto client = echo::echo_client_protocol(t);

/// Server should not crash at this line
client
.echo(
echo::echo_req{.str = "testing..."}, rpc::client_opts(rpc::no_timeout))
.get();
t.stop().get();
}
16 changes: 10 additions & 6 deletions src/v/rpc/test/rpc_integration_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,13 @@ class rpc_base_integration_fixture {
virtual void check_server() = 0;
};

class rpc_simple_integration_fixture : public rpc_base_integration_fixture {
template<std::derived_from<net::server::protocol> T>
class rpc_fixture_swappable_proto : public rpc_base_integration_fixture {
public:
explicit rpc_simple_integration_fixture(uint16_t port)
explicit rpc_fixture_swappable_proto(uint16_t port)
: rpc_base_integration_fixture(port) {}

~rpc_simple_integration_fixture() override { stop_server(); }
~rpc_fixture_swappable_proto() override { stop_server(); }

void start_server() override {
check_server();
Expand Down Expand Up @@ -98,13 +99,13 @@ class rpc_simple_integration_fixture : public rpc_base_integration_fixture {
scfg.max_service_memory_per_core = static_cast<int64_t>(
ss::memory::stats().total_memory() / 10);
_server = std::make_unique<net::server>(std::move(scfg));
_proto = std::make_unique<rpc::simple_protocol>();
_proto = std::make_unique<T>();
}

template<typename Service, typename... Args>
void register_service(Args&&... args) {
check_server();
_proto->register_service<Service>(
_proto->template register_service<Service>(
_sg, _ssg, std::forward<Args>(args)...);
}

Expand All @@ -115,10 +116,13 @@ class rpc_simple_integration_fixture : public rpc_base_integration_fixture {
}
}

std::unique_ptr<rpc::simple_protocol> _proto;
std::unique_ptr<T> _proto;
std::unique_ptr<net::server> _server;
};

using rpc_simple_integration_fixture
= rpc_fixture_swappable_proto<rpc::simple_protocol>;

class rpc_sharded_integration_fixture : public rpc_base_integration_fixture {
public:
explicit rpc_sharded_integration_fixture(uint16_t port)
Expand Down

0 comments on commit 68b545b

Please sign in to comment.