Skip to content

Commit

Permalink
r/client_protocol: do not use rvalue references in client protocol
Browse files Browse the repository at this point in the history
Signed-off-by: Michał Maślanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Aug 28, 2024
1 parent 989e09a commit a7be240
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 66 deletions.
22 changes: 9 additions & 13 deletions src/v/raft/buffered_protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ ss::future<result<Ret>> try_with_gate(ss::gate& gate, Func&& f) {

template<typename Req, typename Resp>
using client_f = ss::future<result<Resp>> (consensus_client_protocol::*)(
model::node_id, Req&&, rpc::client_opts);
model::node_id, Req, rpc::client_opts);

template<typename Req, typename Ret>
ss::future<result<Ret>> apply_with_gate(
Expand Down Expand Up @@ -84,7 +84,7 @@ buffered_protocol::buffered_protocol(
}

ss::future<result<vote_reply>> buffered_protocol::vote(
model::node_id target_node, vote_request&& req, rpc::client_opts opts) {
model::node_id target_node, vote_request req, rpc::client_opts opts) {
return apply_with_gate(
_gate,
_base_protocol,
Expand All @@ -96,7 +96,7 @@ ss::future<result<vote_reply>> buffered_protocol::vote(

ss::future<result<append_entries_reply>> buffered_protocol::append_entries(
model::node_id target_node,
append_entries_request&& req,
append_entries_request req,
rpc::client_opts opts) {
return try_with_gate(
_gate,
Expand All @@ -121,7 +121,7 @@ ss::future<result<append_entries_reply>> buffered_protocol::append_entries(
};

ss::future<result<heartbeat_reply>> buffered_protocol::heartbeat(
model::node_id target_node, heartbeat_request&& req, rpc::client_opts opts) {
model::node_id target_node, heartbeat_request req, rpc::client_opts opts) {
return apply_with_gate(
_gate,
_base_protocol,
Expand All @@ -132,9 +132,7 @@ ss::future<result<heartbeat_reply>> buffered_protocol::heartbeat(
}

ss::future<result<heartbeat_reply_v2>> buffered_protocol::heartbeat_v2(
model::node_id target_node,
heartbeat_request_v2&& req,
rpc::client_opts opts) {
model::node_id target_node, heartbeat_request_v2 req, rpc::client_opts opts) {
return apply_with_gate(
_gate,
_base_protocol,
Expand All @@ -146,7 +144,7 @@ ss::future<result<heartbeat_reply_v2>> buffered_protocol::heartbeat_v2(

ss::future<result<install_snapshot_reply>> buffered_protocol::install_snapshot(
model::node_id target_node,
install_snapshot_request&& req,
install_snapshot_request req,
rpc::client_opts opts) {
return apply_with_gate(
_gate,
Expand All @@ -158,9 +156,7 @@ ss::future<result<install_snapshot_reply>> buffered_protocol::install_snapshot(
}

ss::future<result<timeout_now_reply>> buffered_protocol::timeout_now(
model::node_id target_node,
timeout_now_request&& req,
rpc::client_opts opts) {
model::node_id target_node, timeout_now_request req, rpc::client_opts opts) {
return apply_with_gate(
_gate,
_base_protocol,
Expand All @@ -173,7 +169,7 @@ ss::future<result<timeout_now_reply>> buffered_protocol::timeout_now(
ss::future<result<transfer_leadership_reply>>
buffered_protocol::transfer_leadership(
model::node_id target_node,
transfer_leadership_request&& req,
transfer_leadership_request req,
rpc::client_opts opts) {
return apply_with_gate(
_gate,
Expand Down Expand Up @@ -313,7 +309,7 @@ bool append_entries_queue::is_idle() const {
}

ss::future<result<append_entries_reply>> append_entries_queue::append_entries(
append_entries_request&& r, rpc::client_opts opts) {
append_entries_request r, rpc::client_opts opts) {
// or wait until we can buffer next request, this will propagate back
// pressure to caller
auto sz = r.total_size();
Expand Down
16 changes: 8 additions & 8 deletions src/v/raft/buffered_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ struct append_entries_queue {
~append_entries_queue() noexcept = default;

ss::future<result<append_entries_reply>>
append_entries(append_entries_request&&, rpc::client_opts);
append_entries(append_entries_request, rpc::client_opts);

ss::future<> stop();

Expand Down Expand Up @@ -126,26 +126,26 @@ class buffered_protocol : public consensus_client_protocol::impl {
config::binding<size_t> max_buffered_bytes);

ss::future<result<vote_reply>>
vote(model::node_id, vote_request&&, rpc::client_opts) final;
vote(model::node_id, vote_request, rpc::client_opts) final;

ss::future<result<append_entries_reply>> append_entries(
model::node_id, append_entries_request&&, rpc::client_opts) final;
model::node_id, append_entries_request, rpc::client_opts) final;

ss::future<result<heartbeat_reply>>
heartbeat(model::node_id, heartbeat_request&&, rpc::client_opts) final;
heartbeat(model::node_id, heartbeat_request, rpc::client_opts) final;

ss::future<result<heartbeat_reply_v2>> heartbeat_v2(
model::node_id, heartbeat_request_v2&&, rpc::client_opts) final;
model::node_id, heartbeat_request_v2, rpc::client_opts) final;

ss::future<result<install_snapshot_reply>> install_snapshot(
model::node_id, install_snapshot_request&&, rpc::client_opts) final;
model::node_id, install_snapshot_request, rpc::client_opts) final;

ss::future<result<timeout_now_reply>>
timeout_now(model::node_id, timeout_now_request&&, rpc::client_opts) final;
timeout_now(model::node_id, timeout_now_request, rpc::client_opts) final;
ss::future<bool> ensure_disconnect(model::node_id) final;

ss::future<result<transfer_leadership_reply>> transfer_leadership(
model::node_id, transfer_leadership_request&&, rpc::client_opts) final;
model::node_id, transfer_leadership_request, rpc::client_opts) final;

ss::future<> reset_backoff(model::node_id n) final;

Expand Down
32 changes: 15 additions & 17 deletions src/v/raft/consensus_client_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,31 @@ class consensus_client_protocol final {
public:
struct impl {
virtual ss::future<result<vote_reply>>
vote(model::node_id, vote_request&&, rpc::client_opts) = 0;
vote(model::node_id, vote_request, rpc::client_opts) = 0;

virtual ss::future<result<append_entries_reply>> append_entries(
model::node_id, append_entries_request&&, rpc::client_opts)
model::node_id, append_entries_request, rpc::client_opts)
= 0;

virtual ss::future<result<heartbeat_reply>>
heartbeat(model::node_id, heartbeat_request&&, rpc::client_opts) = 0;
heartbeat(model::node_id, heartbeat_request, rpc::client_opts) = 0;
virtual ss::future<result<heartbeat_reply_v2>>
heartbeat_v2(model::node_id, heartbeat_request_v2&&, rpc::client_opts)
heartbeat_v2(model::node_id, heartbeat_request_v2, rpc::client_opts)
= 0;

virtual ss::future<result<install_snapshot_reply>> install_snapshot(
model::node_id, install_snapshot_request&&, rpc::client_opts)
model::node_id, install_snapshot_request, rpc::client_opts)
= 0;

virtual ss::future<result<timeout_now_reply>>
timeout_now(model::node_id, timeout_now_request&&, rpc::client_opts)
timeout_now(model::node_id, timeout_now_request, rpc::client_opts)
= 0;

virtual ss::future<bool> ensure_disconnect(model::node_id) = 0;

virtual ss::future<result<transfer_leadership_reply>>
transfer_leadership(
model::node_id, transfer_leadership_request&&, rpc::client_opts)
transfer_leadership(
model::node_id, transfer_leadership_request, rpc::client_opts)
= 0;

virtual ss::future<> reset_backoff(model::node_id) = 0;
Expand All @@ -64,42 +64,40 @@ class consensus_client_protocol final {
explicit consensus_client_protocol(ss::shared_ptr<impl> i)
: _impl(std::move(i)) {}
ss::future<result<vote_reply>>
vote(model::node_id target_node, vote_request&& r, rpc::client_opts opts) {
vote(model::node_id target_node, vote_request r, rpc::client_opts opts) {
return _impl->vote(target_node, std::move(r), std::move(opts));
}

ss::future<result<append_entries_reply>> append_entries(
model::node_id target_node,
append_entries_request&& r,
append_entries_request r,
rpc::client_opts opts) {
return _impl->append_entries(
target_node, std::move(r), std::move(opts));
}

ss::future<result<heartbeat_reply>> heartbeat(
model::node_id target_node,
heartbeat_request&& r,
rpc::client_opts opts) {
model::node_id target_node, heartbeat_request r, rpc::client_opts opts) {
return _impl->heartbeat(target_node, std::move(r), std::move(opts));
}
ss::future<result<heartbeat_reply_v2>> heartbeat_v2(
model::node_id target_node,
heartbeat_request_v2&& r,
heartbeat_request_v2 r,
rpc::client_opts opts) {
return _impl->heartbeat_v2(target_node, std::move(r), std::move(opts));
}

ss::future<result<install_snapshot_reply>> install_snapshot(
model::node_id target_node,
install_snapshot_request&& r,
install_snapshot_request r,
rpc::client_opts opts) {
return _impl->install_snapshot(
target_node, std::move(r), std::move(opts));
}

ss::future<result<timeout_now_reply>> timeout_now(
model::node_id target_node,
timeout_now_request&& r,
timeout_now_request r,
rpc::client_opts opts) {
return _impl->timeout_now(target_node, std::move(r), std::move(opts));
}
Expand All @@ -110,7 +108,7 @@ class consensus_client_protocol final {

ss::future<result<transfer_leadership_reply>> transfer_leadership(
model::node_id target_node,
transfer_leadership_request&& r,
transfer_leadership_request r,
rpc::client_opts opts) {
return _impl->transfer_leadership(
target_node, std::move(r), std::move(opts));
Expand Down
14 changes: 7 additions & 7 deletions src/v/raft/rpc_client_protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
namespace raft {

ss::future<result<vote_reply>> rpc_client_protocol::vote(
model::node_id n, vote_request&& r, rpc::client_opts opts) {
model::node_id n, vote_request r, rpc::client_opts opts) {
auto timeout = opts.timeout;
return _connection_cache.local().with_node_client<raftgen_client_protocol>(
_self,
Expand All @@ -37,7 +37,7 @@ ss::future<result<vote_reply>> rpc_client_protocol::vote(
}

ss::future<result<append_entries_reply>> rpc_client_protocol::append_entries(
model::node_id n, append_entries_request&& r, rpc::client_opts opts) {
model::node_id n, append_entries_request r, rpc::client_opts opts) {
auto timeout = opts.timeout;
return _connection_cache.local().with_node_client<raftgen_client_protocol>(
_self,
Expand All @@ -55,7 +55,7 @@ ss::future<result<append_entries_reply>> rpc_client_protocol::append_entries(
}

ss::future<result<heartbeat_reply>> rpc_client_protocol::heartbeat(
model::node_id n, heartbeat_request&& r, rpc::client_opts opts) {
model::node_id n, heartbeat_request r, rpc::client_opts opts) {
auto timeout = opts.timeout;
return _connection_cache.local().with_node_client<raftgen_client_protocol>(
_self,
Expand All @@ -69,7 +69,7 @@ ss::future<result<heartbeat_reply>> rpc_client_protocol::heartbeat(
});
}
ss::future<result<heartbeat_reply_v2>> rpc_client_protocol::heartbeat_v2(
model::node_id n, heartbeat_request_v2&& r, rpc::client_opts opts) {
model::node_id n, heartbeat_request_v2 r, rpc::client_opts opts) {
auto timeout = opts.timeout;
return _connection_cache.local().with_node_client<raftgen_client_protocol>(
_self,
Expand All @@ -85,7 +85,7 @@ ss::future<result<heartbeat_reply_v2>> rpc_client_protocol::heartbeat_v2(

ss::future<result<install_snapshot_reply>>
rpc_client_protocol::install_snapshot(
model::node_id n, install_snapshot_request&& r, rpc::client_opts opts) {
model::node_id n, install_snapshot_request r, rpc::client_opts opts) {
auto timeout = opts.timeout;
return _connection_cache.local().with_node_client<raftgen_client_protocol>(
_self,
Expand All @@ -100,7 +100,7 @@ rpc_client_protocol::install_snapshot(
}

ss::future<result<timeout_now_reply>> rpc_client_protocol::timeout_now(
model::node_id n, timeout_now_request&& r, rpc::client_opts opts) {
model::node_id n, timeout_now_request r, rpc::client_opts opts) {
auto timeout = opts.timeout;
return _connection_cache.local().with_node_client<raftgen_client_protocol>(
_self,
Expand Down Expand Up @@ -149,7 +149,7 @@ ss::future<bool> rpc_client_protocol::ensure_disconnect(model::node_id n) {

ss::future<result<transfer_leadership_reply>>
rpc_client_protocol::transfer_leadership(
model::node_id n, transfer_leadership_request&& r, rpc::client_opts opts) {
model::node_id n, transfer_leadership_request r, rpc::client_opts opts) {
auto timeout = opts.timeout;
return _connection_cache.local().with_node_client<raftgen_client_protocol>(
_self,
Expand Down
14 changes: 7 additions & 7 deletions src/v/raft/rpc_client_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,27 @@ class rpc_client_protocol final : public consensus_client_protocol::impl {
, _connection_cache(cache) {}

ss::future<result<vote_reply>>
vote(model::node_id, vote_request&&, rpc::client_opts) final;
vote(model::node_id, vote_request, rpc::client_opts) final;

ss::future<result<append_entries_reply>> append_entries(
model::node_id, append_entries_request&&, rpc::client_opts) final;
model::node_id, append_entries_request, rpc::client_opts) final;

ss::future<result<heartbeat_reply>>
heartbeat(model::node_id, heartbeat_request&&, rpc::client_opts) final;
heartbeat(model::node_id, heartbeat_request, rpc::client_opts) final;

ss::future<result<heartbeat_reply_v2>> heartbeat_v2(
model::node_id, heartbeat_request_v2&&, rpc::client_opts) final;
model::node_id, heartbeat_request_v2, rpc::client_opts) final;

ss::future<result<install_snapshot_reply>> install_snapshot(
model::node_id, install_snapshot_request&&, rpc::client_opts) final;
model::node_id, install_snapshot_request, rpc::client_opts) final;

ss::future<result<timeout_now_reply>>
timeout_now(model::node_id, timeout_now_request&&, rpc::client_opts) final;
timeout_now(model::node_id, timeout_now_request, rpc::client_opts) final;

ss::future<bool> ensure_disconnect(model::node_id) final;

ss::future<result<transfer_leadership_reply>> transfer_leadership(
model::node_id, transfer_leadership_request&&, rpc::client_opts) final;
model::node_id, transfer_leadership_request, rpc::client_opts) final;

ss::future<> reset_backoff(model::node_id n);

Expand Down
14 changes: 7 additions & 7 deletions src/v/raft/tests/raft_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,43 +293,43 @@ in_memory_test_protocol::dispatch(model::node_id id, ReqT req) {
}

ss::future<result<vote_reply>> in_memory_test_protocol::vote(
model::node_id id, vote_request&& req, rpc::client_opts) {
model::node_id id, vote_request req, rpc::client_opts) {
return dispatch<vote_request, vote_reply>(id, req);
};

ss::future<result<append_entries_reply>>
in_memory_test_protocol::append_entries(
model::node_id id, append_entries_request&& req, rpc::client_opts) {
model::node_id id, append_entries_request req, rpc::client_opts) {
return dispatch<append_entries_request, append_entries_reply>(
id, std::move(req));
};

ss::future<result<heartbeat_reply>> in_memory_test_protocol::heartbeat(
model::node_id id, heartbeat_request&& req, rpc::client_opts) {
model::node_id id, heartbeat_request req, rpc::client_opts) {
return dispatch<heartbeat_request, heartbeat_reply>(id, std::move(req));
}

ss::future<result<heartbeat_reply_v2>> in_memory_test_protocol::heartbeat_v2(
model::node_id id, heartbeat_request_v2&& req, rpc::client_opts) {
model::node_id id, heartbeat_request_v2 req, rpc::client_opts) {
return dispatch<heartbeat_request_v2, heartbeat_reply_v2>(
id, std::move(req));
}

ss::future<result<install_snapshot_reply>>
in_memory_test_protocol::install_snapshot(
model::node_id id, install_snapshot_request&& req, rpc::client_opts) {
model::node_id id, install_snapshot_request req, rpc::client_opts) {
return dispatch<install_snapshot_request, install_snapshot_reply>(
id, std::move(req));
}

ss::future<result<timeout_now_reply>> in_memory_test_protocol::timeout_now(
model::node_id id, timeout_now_request&& req, rpc::client_opts) {
model::node_id id, timeout_now_request req, rpc::client_opts) {
return dispatch<timeout_now_request, timeout_now_reply>(id, std::move(req));
}

ss::future<result<transfer_leadership_reply>>
in_memory_test_protocol::transfer_leadership(
model::node_id id, transfer_leadership_request&& req, rpc::client_opts) {
model::node_id id, transfer_leadership_request req, rpc::client_opts) {
return dispatch<transfer_leadership_request, transfer_leadership_reply>(
id, std::move(req));
}
Expand Down
Loading

0 comments on commit a7be240

Please sign in to comment.