Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: reset transport version via reconnect_transport #5520

Merged
merged 5 commits into from
Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/v/rpc/reconnect_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ class reconnect_transport {

bool is_valid() const { return _transport.is_valid(); }

ss::future<result<rpc::transport*>> reconnect(clock_type::time_point);
ss::future<result<rpc::transport*>> reconnect(clock_type::duration);

rpc::transport& get() { return _transport; }

/// safe client connect - attempts to reconnect if not connected
Expand All @@ -50,6 +47,9 @@ class reconnect_transport {
ss::future<> stop();

private:
ss::future<result<rpc::transport*>> reconnect(clock_type::time_point);
ss::future<result<rpc::transport*>> reconnect(clock_type::duration);

rpc::transport _transport;
rpc::clock_type::time_point _stamp{rpc::clock_type::now()};
ss::semaphore _connected_sem{1};
Expand Down
19 changes: 19 additions & 0 deletions src/v/rpc/simple_protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ simple_protocol::dispatch_method_once(header h, net::server::resources rs) {
ctx->res.conn->addr);
rs.probe().method_not_found();
netbuf reply_buf;
reply_buf.set_version(ctx->get_header().version);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes me think it'd be nice to be able to add a vassert that we never send a v0 netbuf over the wire, e.g. in send_reply(). I don't think we can though since we'll still send v0 over the wire if talking to an old server (though it gets ignored regardless)

Another thought is that we could use some simple_reply_buf around the simple protocol instead of netbuf that is constructed with an initial version (even if it's just _version).

I don't feel strongly that either of these would be much better than what's there, but just brainstorming ways we can be sure our sent bytes are reasonable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can though since we'll still send v0 over the wire if talking to an old server (though it gets ignored regardless)

correct

Another thought is that we could use some simple_reply_buf around the simple protocol instead of netbuf that is constructed with an initial version (even if it's just _version).

i think the right move would be to add a constructor to netbuf so it can't be constructed without providing a version

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea that seems reasonable too. I was initially worried we'd end up with some lifecycle issues, e.g. not knowing what the right version is at time of construction, but I agree baking this into netbuf would feel better

reply_buf.set_status(rpc::status::method_not_found);
return send_reply_skip_payload(ctx, std::move(reply_buf))
.then([ctx] { ctx->signal_body_parse(); });
Expand All @@ -163,10 +164,12 @@ simple_protocol::dispatch_method_once(header h, net::server::resources rs) {
return m->handle(ctx->res.conn->input(), *ctx)
.then_wrapped([ctx, m, l = ctx->res.hist().auto_measure(), rs](
ss::future<netbuf> fut) mutable {
bool error = true;
netbuf reply_buf;
try {
reply_buf = fut.get0();
reply_buf.set_status(rpc::status::success);
error = false;
} catch (const rpc_internal_body_parsing_exception& e) {
// We have to distinguish between exceptions thrown by
// the service handler and the one caused by the
Expand All @@ -176,6 +179,8 @@ simple_protocol::dispatch_method_once(header h, net::server::resources rs) {
ctx->pr.set_exception(e);
return ss::now();
} catch (const ss::timed_out_error& e) {
rpclog.debug("Timing out request on timed_out_error "
"(shutting down)");
reply_buf.set_status(rpc::status::request_timeout);
} catch (const ss::gate_closed_exception& e) {
// gate_closed is typical during shutdown. Treat
Expand All @@ -202,6 +207,20 @@ simple_protocol::dispatch_method_once(header h, net::server::resources rs) {
rs.probe().service_error();
reply_buf.set_status(rpc::status::server_error);
}
if (error) {
/*
* when an exception occurs while processing a request
* reply_buf is sent back to the client as a reply with
* a status/error code and no body. the version needs to
* also be set to match the request version to avoid the
* client complaining about an unexpected version.
*
* in the error free case the handler will provide a
* fully defined (with version) netbuf which replaces
* reply_buf (see try block above).
*/
reply_buf.set_version(ctx->get_header().version);
}
return send_reply(ctx, std::move(reply_buf))
.finally([m, l = std::move(l)]() mutable {
m->probes.latency_hist().record(std::move(l));
Expand Down
4 changes: 2 additions & 2 deletions src/v/rpc/test/rpc_gen_cycling_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -889,8 +889,8 @@ FIXTURE_TEST(oc_ns_adl_serde_no_upgrade, rpc_integration_fixture) {
start_server();

rpc::transport t(client_config());
t.set_version(rpc::transport_version::v0);
t.connect(model::no_timeout).get();
t.set_version(rpc::transport_version::v0); // connect resets version=v1
auto stop = ss::defer([&t] { t.stop().get(); });
auto client = echo::echo_client_protocol(t);

Expand Down Expand Up @@ -920,8 +920,8 @@ FIXTURE_TEST(oc_ns_adl_only_no_upgrade, rpc_integration_fixture) {
start_server();

rpc::transport t(client_config());
t.set_version(rpc::transport_version::v0);
t.connect(model::no_timeout).get();
t.set_version(rpc::transport_version::v0); // connect resets version=v1
auto stop = ss::defer([&t] { t.stop().get(); });
auto client = echo::echo_client_protocol(t);

Expand Down
1 change: 1 addition & 0 deletions src/v/rpc/transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ void transport::reset_state() {
_correlation_idx = 0;
_last_seq = sequence_t{0};
_seq = sequence_t{0};
_version = transport_version::v1;
}

ss::future<>
Expand Down
4 changes: 3 additions & 1 deletion src/v/rpc/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ class transport final : public net::base_transport {
* version level used when dispatching requests. this value may change
* during the lifetime of the transport. for example the version may be
* upgraded if it is discovered that a server supports a newer version.
*
* reset to v1 in reset_state() to support reconnect_transport.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

*/
transport_version _version{transport_version::v1};

Expand Down Expand Up @@ -170,7 +172,7 @@ ss::future<result<rpc::client_context<T>>> parse_result(
}
if (protocol_violation) {
vlog(
rpclog.warn,
rpclog.error,
"Protocol violation: request version {} incompatible with "
"reply version {}",
req_ver,
Expand Down