diff --git a/src/v/rpc/reconnect_transport.h b/src/v/rpc/reconnect_transport.h index 5c2228ccce39..80ff3cbbcfa0 100644 --- a/src/v/rpc/reconnect_transport.h +++ b/src/v/rpc/reconnect_transport.h @@ -32,9 +32,6 @@ class reconnect_transport { bool is_valid() const { return _transport.is_valid(); } - ss::future> reconnect(clock_type::time_point); - ss::future> reconnect(clock_type::duration); - rpc::transport& get() { return _transport; } /// safe client connect - attempts to reconnect if not connected @@ -50,6 +47,9 @@ class reconnect_transport { ss::future<> stop(); private: + ss::future> reconnect(clock_type::time_point); + ss::future> reconnect(clock_type::duration); + rpc::transport _transport; rpc::clock_type::time_point _stamp{rpc::clock_type::now()}; ss::semaphore _connected_sem{1}; diff --git a/src/v/rpc/simple_protocol.cc b/src/v/rpc/simple_protocol.cc index de4398a75ba2..261b214671a1 100644 --- a/src/v/rpc/simple_protocol.cc +++ b/src/v/rpc/simple_protocol.cc @@ -153,6 +153,7 @@ simple_protocol::dispatch_method_once(header h, net::server::resources rs) { ctx->res.conn->addr); rs.probe().method_not_found(); netbuf reply_buf; + reply_buf.set_version(ctx->get_header().version); reply_buf.set_status(rpc::status::method_not_found); return send_reply_skip_payload(ctx, std::move(reply_buf)) .then([ctx] { ctx->signal_body_parse(); }); @@ -163,10 +164,12 @@ simple_protocol::dispatch_method_once(header h, net::server::resources rs) { return m->handle(ctx->res.conn->input(), *ctx) .then_wrapped([ctx, m, l = ctx->res.hist().auto_measure(), rs]( ss::future fut) mutable { + bool error = true; netbuf reply_buf; try { reply_buf = fut.get0(); reply_buf.set_status(rpc::status::success); + error = false; } catch (const rpc_internal_body_parsing_exception& e) { // We have to distinguish between exceptions thrown by // the service handler and the one caused by the @@ -176,6 +179,8 @@ simple_protocol::dispatch_method_once(header h, net::server::resources rs) { ctx->pr.set_exception(e); return ss::now(); } catch (const ss::timed_out_error& e) { + rpclog.debug("Timing out request on timed_out_error " + "(shutting down)"); reply_buf.set_status(rpc::status::request_timeout); } catch (const ss::gate_closed_exception& e) { // gate_closed is typical during shutdown. Treat @@ -202,6 +207,20 @@ simple_protocol::dispatch_method_once(header h, net::server::resources rs) { rs.probe().service_error(); reply_buf.set_status(rpc::status::server_error); } + if (error) { + /* + * when an exception occurs while processing a request + * reply_buf is sent back to the client as a reply with + * a status/error code and no body. the version needs to + * also be set to match the request version to avoid the + * client complaining about an unexpected version. + * + * in the error free case the handler will provide a + * fully defined (with version) netbuf which replaces + * reply_buf (see try block above). + */ + reply_buf.set_version(ctx->get_header().version); + } return send_reply(ctx, std::move(reply_buf)) .finally([m, l = std::move(l)]() mutable { m->probes.latency_hist().record(std::move(l)); diff --git a/src/v/rpc/test/rpc_gen_cycling_test.cc b/src/v/rpc/test/rpc_gen_cycling_test.cc index b95c5b893ef3..6c6133554edf 100644 --- a/src/v/rpc/test/rpc_gen_cycling_test.cc +++ b/src/v/rpc/test/rpc_gen_cycling_test.cc @@ -889,8 +889,8 @@ FIXTURE_TEST(oc_ns_adl_serde_no_upgrade, rpc_integration_fixture) { start_server(); rpc::transport t(client_config()); - t.set_version(rpc::transport_version::v0); t.connect(model::no_timeout).get(); + t.set_version(rpc::transport_version::v0); // connect resets version=v1 auto stop = ss::defer([&t] { t.stop().get(); }); auto client = echo::echo_client_protocol(t); @@ -920,8 +920,8 @@ FIXTURE_TEST(oc_ns_adl_only_no_upgrade, rpc_integration_fixture) { start_server(); rpc::transport t(client_config()); - t.set_version(rpc::transport_version::v0); t.connect(model::no_timeout).get(); + t.set_version(rpc::transport_version::v0); // connect resets version=v1 auto stop = ss::defer([&t] { t.stop().get(); }); auto client = echo::echo_client_protocol(t); diff --git a/src/v/rpc/transport.cc b/src/v/rpc/transport.cc index 9d1000791664..70505c395a2b 100644 --- a/src/v/rpc/transport.cc +++ b/src/v/rpc/transport.cc @@ -83,6 +83,7 @@ void transport::reset_state() { _correlation_idx = 0; _last_seq = sequence_t{0}; _seq = sequence_t{0}; + _version = transport_version::v1; } ss::future<> diff --git a/src/v/rpc/transport.h b/src/v/rpc/transport.h index abbdf6d078cd..0a371beecaf3 100644 --- a/src/v/rpc/transport.h +++ b/src/v/rpc/transport.h @@ -112,6 +112,8 @@ class transport final : public net::base_transport { * version level used when dispatching requests. this value may change * during the lifetime of the transport. for example the version may be * upgraded if it is discovered that a server supports a newer version. + * + * reset to v1 in reset_state() to support reconnect_transport. */ transport_version _version{transport_version::v1}; @@ -170,7 +172,7 @@ ss::future>> parse_result( } if (protocol_violation) { vlog( - rpclog.warn, + rpclog.error, "Protocol violation: request version {} incompatible with " "reply version {}", req_ver,