Skip to content

Commit

Permalink
Merge pull request #6769 from dotnwat/lambda-this-coro
Browse files Browse the repository at this point in the history
net: avoid `this` capture in lambda coroutine
  • Loading branch information
dotnwat committed Nov 1, 2022
2 parents 51ff604 + e5e7368 commit 8eca8bd
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 91 deletions.
2 changes: 1 addition & 1 deletion src/v/net/conn_quota.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

namespace net {

conn_quota::units::~units() {
conn_quota::units::~units() noexcept {
if (_quotas) {
(*_quotas).get().put(_addr);
}
Expand Down
13 changes: 7 additions & 6 deletions src/v/net/conn_quota.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,32 +52,33 @@ class conn_quota : public ss::peering_sharded_service<conn_quota> {
*/
class [[nodiscard]] units {
public:
units() {}
units() = default;

units(conn_quota& quotas, ss::net::inet_address const& addr)
: _quotas(std::ref(quotas))
, _addr(addr) {}

units(units const&) = delete;
units& operator=(units const&) = delete;
units(units&& rhs) noexcept
: _addr(std::move(rhs._addr)) {
: _addr(rhs._addr) {
_quotas = std::exchange(rhs._quotas, std::nullopt);
}
units& operator=(units&& rhs) {
units& operator=(units&& rhs) noexcept {
_quotas = std::exchange(rhs._quotas, std::nullopt);
_addr = std::move(rhs._addr);
_addr = rhs._addr;
return *this;
}

~units();
~units() noexcept;

/**
* A default-constructed `units` is not live, i.e.
* does not release anything on destruction. Once
* it is constructed with a reference to a `conn_quota`
* it becomes live.
*/
bool live() { return _quotas.has_value(); }
bool live() const { return _quotas.has_value(); }

private:
std::optional<std::reference_wrapper<conn_quota>> _quotas;
Expand Down
165 changes: 81 additions & 84 deletions src/v/net/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,94 +156,91 @@ static ss::future<> apply_proto(
ss::future<> server::accept(listener& s) {
return ss::repeat([this, &s]() mutable {
return s.socket.accept().then_wrapped(
[this, &s](ss::future<ss::accept_result> f_cs_sa) mutable
-> ss::future<ss::stop_iteration> {
if (_as.abort_requested()) {
f_cs_sa.ignore_ready_future();
co_return ss::stop_iteration::yes;
}
auto ar = f_cs_sa.get();
ar.connection.set_nodelay(true);
ar.connection.set_keepalive(true);

// `s` is a lambda reference argument to a coroutine:
// this is the last place we may refer to it before
// any scheduling points.
auto name = s.name;

conn_quota::units cq_units;
if (cfg.conn_quotas) {
cq_units = co_await cfg.conn_quotas->get().local().get(
ar.remote_address.addr());
if (!cq_units.live()) {
// Connection limit hit, drop this connection.
_probe.connection_rejected();
vlog(
rpc::rpclog.info,
"Connection limit reached, rejecting {}",
ar.remote_address.addr());
co_return ss::stop_iteration::no;
}
}

// Apply socket buffer size settings
if (cfg.tcp_recv_buf.has_value()) {
// Explicitly store in an int to decouple the
// config type from the set_sockopt type.
int recv_buf = cfg.tcp_recv_buf.value();
ar.connection.set_sockopt(
SOL_SOCKET, SO_RCVBUF, &recv_buf, sizeof(recv_buf));
}

if (cfg.tcp_send_buf.has_value()) {
int send_buf = cfg.tcp_send_buf.value();
ar.connection.set_sockopt(
SOL_SOCKET, SO_SNDBUF, &send_buf, sizeof(send_buf));
}

if (_connection_rates) {
try {
co_await _connection_rates->maybe_wait(
ar.remote_address.addr());
} catch (const std::exception& e) {
vlog(
rpc::rpclog.trace,
"Timeout while waiting free token for connection rate. "
"addr:{}",
ar.remote_address);
_probe.timeout_waiting_rate_limit();
co_return ss::stop_iteration::no;
}
}

auto conn = ss::make_lw_shared<net::connection>(
_connections,
name,
std::move(ar.connection),
ar.remote_address,
_probe,
cfg.stream_recv_buf);
vlog(
rpc::rpclog.trace,
"{} - Incoming connection from {} on \"{}\"",
_proto->name(),
ar.remote_address,
name);
if (_conn_gate.is_closed()) {
co_await conn->shutdown();
throw ss::gate_closed_exception();
}
ssx::spawn_with_gate(
_conn_gate,
[this, conn, cq_units = std::move(cq_units)]() mutable {
return apply_proto(
_proto.get(), resources(this, conn), std::move(cq_units));
});
co_return ss::stop_iteration::no;
[this, &s](ss::future<ss::accept_result> f_cs_sa) {
return accept_finish(s.name, std::move(f_cs_sa));
});
});
}

ss::future<ss::stop_iteration>
server::accept_finish(ss::sstring name, ss::future<ss::accept_result> f_cs_sa) {
if (_as.abort_requested()) {
f_cs_sa.ignore_ready_future();
co_return ss::stop_iteration::yes;
}
auto ar = f_cs_sa.get();
ar.connection.set_nodelay(true);
ar.connection.set_keepalive(true);

conn_quota::units cq_units;
if (cfg.conn_quotas) {
cq_units = co_await cfg.conn_quotas->get().local().get(
ar.remote_address.addr());
if (!cq_units.live()) {
// Connection limit hit, drop this connection.
_probe.connection_rejected();
vlog(
rpc::rpclog.info,
"Connection limit reached, rejecting {}",
ar.remote_address.addr());
co_return ss::stop_iteration::no;
}
}

// Apply socket buffer size settings
if (cfg.tcp_recv_buf.has_value()) {
// Explicitly store in an int to decouple the
// config type from the set_sockopt type.
int recv_buf = cfg.tcp_recv_buf.value();
ar.connection.set_sockopt(
SOL_SOCKET, SO_RCVBUF, &recv_buf, sizeof(recv_buf));
}

if (cfg.tcp_send_buf.has_value()) {