Skip to content

Commit

Permalink
Merge pull request #5520 from dotnwat/fix-5506
Browse files Browse the repository at this point in the history
rpc: reset transport version via reconnect_transport
  • Loading branch information
dotnwat committed Jul 21, 2022
2 parents a80aca9 + f3c6b5f commit b2403ff
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 6 deletions.
6 changes: 3 additions & 3 deletions src/v/rpc/reconnect_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ class reconnect_transport {

bool is_valid() const { return _transport.is_valid(); }

ss::future<result<rpc::transport*>> reconnect(clock_type::time_point);
ss::future<result<rpc::transport*>> reconnect(clock_type::duration);

rpc::transport& get() { return _transport; }

/// safe client connect - attempts to reconnect if not connected
Expand All @@ -50,6 +47,9 @@ class reconnect_transport {
ss::future<> stop();

private:
ss::future<result<rpc::transport*>> reconnect(clock_type::time_point);
ss::future<result<rpc::transport*>> reconnect(clock_type::duration);

rpc::transport _transport;
rpc::clock_type::time_point _stamp{rpc::clock_type::now()};
ss::semaphore _connected_sem{1};
Expand Down
19 changes: 19 additions & 0 deletions src/v/rpc/simple_protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ simple_protocol::dispatch_method_once(header h, net::server::resources rs) {
ctx->res.conn->addr);
rs.probe().method_not_found();
netbuf reply_buf;
reply_buf.set_version(ctx->get_header().version);
reply_buf.set_status(rpc::status::method_not_found);
return send_reply_skip_payload(ctx, std::move(reply_buf))
.then([ctx] { ctx->signal_body_parse(); });
Expand All @@ -163,10 +164,12 @@ simple_protocol::dispatch_method_once(header h, net::server::resources rs) {
return m->handle(ctx->res.conn->input(), *ctx)
.then_wrapped([ctx, m, l = ctx->res.hist().auto_measure(), rs](
ss::future<netbuf> fut) mutable {
bool error = true;
netbuf reply_buf;
try {
reply_buf = fut.get0();
reply_buf.set_status(rpc::status::success);
error = false;
} catch (const rpc_internal_body_parsing_exception& e) {
// We have to distinguish between exceptions thrown by
// the service handler and the one caused by the
Expand All @@ -176,6 +179,8 @@ simple_protocol::dispatch_method_once(header h, net::server::resources rs) {
ctx->pr.set_exception(e);
return ss::now();
} catch (const ss::timed_out_error& e) {
rpclog.debug("Timing out request on timed_out_error "
"(shutting down)");
reply_buf.set_status(rpc::status::request_timeout);
} catch (const ss::gate_closed_exception& e) {
// gate_closed is typical during shutdown. Treat
Expand All @@ -202,6 +207,20 @@ simple_protocol::dispatch_method_once(header h, net::server::resources rs) {
rs.probe().service_error();
reply_buf.set_status(rpc::status::server_error);
}
if (error) {
/*
* when an exception occurs while processing a request
* reply_buf is sent back to the client as a reply with
* a status/error code and no body. the version needs to
* also be set to match the request version to avoid the
* client complaining about an unexpected version.
*
* in the error free case the handler will provide a
* fully defined (with version) netbuf which replaces
* reply_buf (see try block above).
*/
reply_buf.set_version(ctx->get_header().version);
}
return send_reply(ctx, std::move(reply_buf))
.finally([m, l = std::move(l)]() mutable {
m->probes.latency_hist().record(std::move(l));
Expand Down
4 changes: 2 additions & 2 deletions src/v/rpc/test/rpc_gen_cycling_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -889,8 +889,8 @@ FIXTURE_TEST(oc_ns_adl_serde_no_upgrade, rpc_integration_fixture) {
start_server();

rpc::transport t(client_config());
t.set_version(rpc::transport_version::v0);
t.connect(model::no_timeout).get();
t.set_version(rpc::transport_version::v0); // connect resets version=v1
auto stop = ss::defer([&t] { t.stop().get(); });
auto client = echo::echo_client_protocol(t);

Expand Down Expand Up @@ -920,8 +920,8 @@ FIXTURE_TEST(oc_ns_adl_only_no_upgrade, rpc_integration_fixture) {
start_server();

rpc::transport t(client_config());
t.set_version(rpc::transport_version::v0);
t.connect(model::no_timeout).get();
t.set_version(rpc::transport_version::v0); // connect resets version=v1
auto stop = ss::defer([&t] { t.stop().get(); });
auto client = echo::echo_client_protocol(t);

Expand Down
1 change: 1 addition & 0 deletions src/v/rpc/transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ void transport::reset_state() {
_correlation_idx = 0;
_last_seq = sequence_t{0};
_seq = sequence_t{0};
_version = transport_version::v1;
}

ss::future<>
Expand Down
4 changes: 3 additions & 1 deletion src/v/rpc/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ class transport final : public net::base_transport {
* version level used when dispatching requests. this value may change
* during the lifetime of the transport. for example the version may be
* upgraded if it is discovered that a server supports a newer version.
*
* reset to v1 in reset_state() to support reconnect_transport.
*/
transport_version _version{transport_version::v1};

Expand Down Expand Up @@ -170,7 +172,7 @@ ss::future<result<rpc::client_context<T>>> parse_result(
}
if (protocol_violation) {
vlog(
rpclog.warn,
rpclog.error,
"Protocol violation: request version {} incompatible with "
"reply version {}",
req_ver,
Expand Down

0 comments on commit b2403ff

Please sign in to comment.