Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: reset transport version via reconnect_transport #5520

Merged
merged 5 commits into from
Jul 21, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/v/rpc/simple_protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ simple_protocol::dispatch_method_once(header h, net::server::resources rs) {
ctx->res.conn->addr);
rs.probe().method_not_found();
netbuf reply_buf;
reply_buf.set_version(ctx->get_header().version);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes me think it'd be nice to be able to add a vassert that we never send a v0 netbuf over the wire, e.g. in send_reply(). I don't think we can though since we'll still send v0 over the wire if talking to an old server (though it gets ignored regardless)

Another thought is that we could use some simple_reply_buf around the simple protocol instead of netbuf that is constructed with an initial version (even if it's just _version).

I don't feel strongly that either of these would be much better than what's there, but just brainstorming ways we can be sure our sent bytes are reasonable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can though since we'll still send v0 over the wire if talking to an old server (though it gets ignored regardless)

correct

Another thought is that we could use some simple_reply_buf around the simple protocol instead of netbuf that is constructed with an initial version (even if it's just _version).

i think the right move would be to add a constructor to netbuf so it can't be constructed without providing a version

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea that seems reasonable too. I was initially worried we'd end up with some lifecycle issues, e.g. not knowing what the right version is at time of construction, but I agree baking this into netbuf would feel better

reply_buf.set_status(rpc::status::method_not_found);
return send_reply_skip_payload(ctx, std::move(reply_buf))
.then([ctx] { ctx->signal_body_parse(); });
Expand All @@ -163,10 +164,12 @@ simple_protocol::dispatch_method_once(header h, net::server::resources rs) {
return m->handle(ctx->res.conn->input(), *ctx)
.then_wrapped([ctx, m, l = ctx->res.hist().auto_measure(), rs](
ss::future<netbuf> fut) mutable {
bool error = true;
netbuf reply_buf;
try {
reply_buf = fut.get0();
reply_buf.set_status(rpc::status::success);
error = false;
} catch (const rpc_internal_body_parsing_exception& e) {
// We have to distinguish between exceptions thrown by
// the service handler and the one caused by the
Expand Down Expand Up @@ -202,6 +205,20 @@ simple_protocol::dispatch_method_once(header h, net::server::resources rs) {
rs.probe().service_error();
reply_buf.set_status(rpc::status::server_error);
}
if (error) {
/*
* when an exception occurs while processing a request
* reply_buf is sent back to the client as a reply with
* a status/error code and no body. the version needs to
* also be set to match the request version to avoid the
* client complaining about an unexpected version.
*
* in the error free case the handler will provide a
* fully defined (with version) netbuf which replaces
* reply_buf (see try block above).
*/
reply_buf.set_version(ctx->get_header().version);
}
return send_reply(ctx, std::move(reply_buf))
.finally([m, l = std::move(l)]() mutable {
m->probes.latency_hist().record(std::move(l));
Expand Down