From 75d50403b1af0122e46f3571c97154eb6dad2385 Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Wed, 20 Jul 2022 14:10:38 -0700 Subject: [PATCH 1/5] rpc: set reply version on error When an error occurs while processing a request (e.g. a handler throws or the server receives an unsupported api request) a default initialized netbuf is created to hold the error and is sent back to the client. Prior to this commit the header included v0 as the version for such error replies. Since the error may be returned to a client whose transport was upgraded to v2 a protocol violation error would be logged by the client. This commit matches the reply version to the request version for error replies under the reasonable assumption that (1) the rpc header is compatibile across transport versions and (2) the errors contain no decodable payloads. Signed-off-by: Noah Watkins --- src/v/rpc/simple_protocol.cc | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/v/rpc/simple_protocol.cc b/src/v/rpc/simple_protocol.cc index de4398a75ba2..a5f050934abf 100644 --- a/src/v/rpc/simple_protocol.cc +++ b/src/v/rpc/simple_protocol.cc @@ -153,6 +153,7 @@ simple_protocol::dispatch_method_once(header h, net::server::resources rs) { ctx->res.conn->addr); rs.probe().method_not_found(); netbuf reply_buf; + reply_buf.set_version(ctx->get_header().version); reply_buf.set_status(rpc::status::method_not_found); return send_reply_skip_payload(ctx, std::move(reply_buf)) .then([ctx] { ctx->signal_body_parse(); }); @@ -163,10 +164,12 @@ simple_protocol::dispatch_method_once(header h, net::server::resources rs) { return m->handle(ctx->res.conn->input(), *ctx) .then_wrapped([ctx, m, l = ctx->res.hist().auto_measure(), rs]( ss::future fut) mutable { + bool error = true; netbuf reply_buf; try { reply_buf = fut.get0(); reply_buf.set_status(rpc::status::success); + error = false; } catch (const rpc_internal_body_parsing_exception& e) { // We have to distinguish between exceptions thrown by // the service handler and the one caused by the @@ -202,6 +205,20 @@ simple_protocol::dispatch_method_once(header h, net::server::resources rs) { rs.probe().service_error(); reply_buf.set_status(rpc::status::server_error); } + if (error) { + /* + * when an exception occurs while processing a request + * reply_buf is sent back to the client as a reply with + * a status/error code and no body. the version needs to + * also be set to match the request version to avoid the + * client complaining about an unexpected version. + * + * in the error free case the handler will provide a + * fully defined (with version) netbuf which replaces + * reply_buf (see try block above). + */ + reply_buf.set_version(ctx->get_header().version); + } return send_reply(ctx, std::move(reply_buf)) .finally([m, l = std::move(l)]() mutable { m->probes.latency_hist().record(std::move(l)); From 2dcc1b99933b39df64ae6ffab4abc250d5be0aa4 Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Wed, 20 Jul 2022 14:12:01 -0700 Subject: [PATCH 2/5] rpc: disambiguate timeout errors from other exceptions Other exceptions log messages when returning timeout errors to the client. However, an actual timeout exception didn't log so an observer would have to deduce exactly what happened based on an omission in the logs. Signed-off-by: Noah Watkins --- src/v/rpc/simple_protocol.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/v/rpc/simple_protocol.cc b/src/v/rpc/simple_protocol.cc index a5f050934abf..261b214671a1 100644 --- a/src/v/rpc/simple_protocol.cc +++ b/src/v/rpc/simple_protocol.cc @@ -179,6 +179,8 @@ simple_protocol::dispatch_method_once(header h, net::server::resources rs) { ctx->pr.set_exception(e); return ss::now(); } catch (const ss::timed_out_error& e) { + rpclog.debug("Timing out request on timed_out_error " + "(shutting down)"); reply_buf.set_status(rpc::status::request_timeout); } catch (const ss::gate_closed_exception& e) { // gate_closed is typical during shutdown. Treat From dfe2e136bd34f87f92b1b5f74b68c2ab6c8bd8ae Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Tue, 19 Jul 2022 11:57:31 -0700 Subject: [PATCH 3/5] rpc: privatize internal apis Signed-off-by: Noah Watkins --- src/v/rpc/reconnect_transport.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/v/rpc/reconnect_transport.h b/src/v/rpc/reconnect_transport.h index 5c2228ccce39..80ff3cbbcfa0 100644 --- a/src/v/rpc/reconnect_transport.h +++ b/src/v/rpc/reconnect_transport.h @@ -32,9 +32,6 @@ class reconnect_transport { bool is_valid() const { return _transport.is_valid(); } - ss::future> reconnect(clock_type::time_point); - ss::future> reconnect(clock_type::duration); - rpc::transport& get() { return _transport; } /// safe client connect - attempts to reconnect if not connected @@ -50,6 +47,9 @@ class reconnect_transport { ss::future<> stop(); private: + ss::future> reconnect(clock_type::time_point); + ss::future> reconnect(clock_type::duration); + rpc::transport _transport; rpc::clock_type::time_point _stamp{rpc::clock_type::now()}; ss::semaphore _connected_sem{1}; From ddbf0cccdafa4ed3e68e4fc7ea8a69e0bee2c42c Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Tue, 19 Jul 2022 13:56:52 -0700 Subject: [PATCH 4/5] rpc: reset transport version on connect This is needed for the case in which a peer on the other end of a reconnect_transport is downgraded. unless we reset the version on reconnect then we'll get a policy violation if the peer responds with a lower version than the transport had been upgraded to. Signed-off-by: Noah Watkins --- src/v/rpc/test/rpc_gen_cycling_test.cc | 4 ++-- src/v/rpc/transport.cc | 1 + src/v/rpc/transport.h | 2 ++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/v/rpc/test/rpc_gen_cycling_test.cc b/src/v/rpc/test/rpc_gen_cycling_test.cc index b95c5b893ef3..6c6133554edf 100644 --- a/src/v/rpc/test/rpc_gen_cycling_test.cc +++ b/src/v/rpc/test/rpc_gen_cycling_test.cc @@ -889,8 +889,8 @@ FIXTURE_TEST(oc_ns_adl_serde_no_upgrade, rpc_integration_fixture) { start_server(); rpc::transport t(client_config()); - t.set_version(rpc::transport_version::v0); t.connect(model::no_timeout).get(); + t.set_version(rpc::transport_version::v0); // connect resets version=v1 auto stop = ss::defer([&t] { t.stop().get(); }); auto client = echo::echo_client_protocol(t); @@ -920,8 +920,8 @@ FIXTURE_TEST(oc_ns_adl_only_no_upgrade, rpc_integration_fixture) { start_server(); rpc::transport t(client_config()); - t.set_version(rpc::transport_version::v0); t.connect(model::no_timeout).get(); + t.set_version(rpc::transport_version::v0); // connect resets version=v1 auto stop = ss::defer([&t] { t.stop().get(); }); auto client = echo::echo_client_protocol(t); diff --git a/src/v/rpc/transport.cc b/src/v/rpc/transport.cc index 9d1000791664..70505c395a2b 100644 --- a/src/v/rpc/transport.cc +++ b/src/v/rpc/transport.cc @@ -83,6 +83,7 @@ void transport::reset_state() { _correlation_idx = 0; _last_seq = sequence_t{0}; _seq = sequence_t{0}; + _version = transport_version::v1; } ss::future<> diff --git a/src/v/rpc/transport.h b/src/v/rpc/transport.h index abbdf6d078cd..c59d94a64485 100644 --- a/src/v/rpc/transport.h +++ b/src/v/rpc/transport.h @@ -112,6 +112,8 @@ class transport final : public net::base_transport { * version level used when dispatching requests. this value may change * during the lifetime of the transport. for example the version may be * upgraded if it is discovered that a server supports a newer version. + * + * reset to v1 in reset_state() to support reconnect_transport. */ transport_version _version{transport_version::v1}; From f3c6b5f64e9e4378b74d187b2d9a0a7cfccf7ca2 Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Tue, 19 Jul 2022 13:57:07 -0700 Subject: [PATCH 5/5] rpc: log policy violations at error level Signed-off-by: Noah Watkins --- src/v/rpc/transport.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/v/rpc/transport.h b/src/v/rpc/transport.h index c59d94a64485..0a371beecaf3 100644 --- a/src/v/rpc/transport.h +++ b/src/v/rpc/transport.h @@ -172,7 +172,7 @@ ss::future>> parse_result( } if (protocol_violation) { vlog( - rpclog.warn, + rpclog.error, "Protocol violation: request version {} incompatible with " "reply version {}", req_ver,