Skip to content

Commit

Permalink
Merge pull request redpanda-data#7883 from dotnwat/net-server-resources
Browse files Browse the repository at this point in the history
net: eliminate indirection through `net::server:resources` type
  • Loading branch information
dotnwat committed Dec 21, 2022
2 parents cba5b3a + 208e09d commit 4e3ef8b
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 118 deletions.
60 changes: 30 additions & 30 deletions src/v/kafka/server/connection_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ using namespace std::chrono_literals;
namespace kafka {

ss::future<> connection_context::process_one_request() {
return parse_size(_rs.conn->input())
return parse_size(conn->input())
.then([this](std::optional<size_t> sz) mutable {
if (!sz) {
return ss::make_ready_future<>();
Expand All @@ -51,7 +51,7 @@ ss::future<> connection_context::process_one_request() {
klog.warn,
"request from {} is larger ({} bytes) than configured max "
"request size {}",
_rs.conn->addr,
conn->addr,
sz,
_max_request_size());
return ss::make_exception_future<>(
Expand All @@ -77,19 +77,19 @@ ss::future<> connection_context::process_one_request() {
return handle_auth_v0(*sz).handle_exception(
[this](std::exception_ptr e) {
vlog(klog.info, "Detected error processing request: {}", e);
_rs.conn->shutdown_input();
conn->shutdown_input();
});
}
return parse_header(_rs.conn->input())
return parse_header(conn->input())
.then([this,
s = sz.value()](std::optional<request_header> h) mutable {
_rs.probe().add_bytes_received(s);
_server.probe().add_bytes_received(s);
if (!h) {
vlog(
klog.debug,
"could not parse header from client: {}",
_rs.conn->addr);
_rs.probe().header_corrupted();
conn->addr);
_server.probe().header_corrupted();
return ss::make_ready_future<>();
}
return dispatch_method_once(std::move(h.value()), s)
Expand All @@ -98,9 +98,9 @@ ss::future<> connection_context::process_one_request() {
vlog(
klog.warn,
"Error while processing request from {} - {}",
_rs.conn->addr,
conn->addr,
e.what());
_rs.conn->shutdown_input();
conn->shutdown_input();
})
.handle_exception_type([this](const std::bad_alloc&) {
// In general, dispatch_method_once does not throw,
Expand All @@ -111,7 +111,7 @@ ss::future<> connection_context::process_one_request() {
klog.error,
"Request from {} failed on memory exhaustion "
"(std::bad_alloc)",
_rs.conn->addr);
conn->addr);
});
});
});
Expand Down Expand Up @@ -146,7 +146,7 @@ ss::future<> connection_context::handle_auth_v0(const size_t size) {
const api_version version(0);
iobuf request_buf;
{
auto data = co_await read_iobuf_exactly(_rs.conn->input(), size);
auto data = co_await read_iobuf_exactly(conn->input(), size);
sasl_authenticate_request request;
request.data.auth_bytes = iobuf_to_bytes(data);
response_writer writer(request_buf);
Expand Down Expand Up @@ -188,11 +188,11 @@ ss::future<> connection_context::handle_auth_v0(const size_t size) {
response_writer writer(data);
writer.write(response.data.auth_bytes);
auto msg = iobuf_as_scattered(std::move(data));
co_await _rs.conn->write(std::move(msg));
co_await conn->write(std::move(msg));
}

bool connection_context::is_finished_parsing() const {
return _rs.conn->input().eof() || _rs.abort_requested();
return conn->input().eof() || _server.abort_requested();
}

ss::future<session_resources> connection_context::throttle_request(
Expand All @@ -210,10 +210,10 @@ ss::future<session_resources> connection_context::throttle_request(
// affect.
auto delay = _server.quota_mgr().record_tp_and_throttle(
hdr.client_id, request_size);
auto tracker = std::make_unique<request_tracker>(_rs.probe());
auto tracker = std::make_unique<request_tracker>(_server.probe());
auto fut = ss::now();
if (!delay.first_violation) {
fut = ss::sleep_abortable(delay.duration, _rs.abort_source());
fut = ss::sleep_abortable(delay.duration, _server.abort_source());
}
auto track = track_latency(hdr.key);
return fut
Expand All @@ -236,7 +236,7 @@ ss::future<session_resources> connection_context::throttle_request(
.tracker = std::move(tracker),
};
if (track) {
r.method_latency = _rs.hist().auto_measure();
r.method_latency = _server.hist().auto_measure();
}
return r;
});
Expand All @@ -260,9 +260,9 @@ connection_context::reserve_request_units(api_key key, size_t size) {
mem_estimate,
handler ? (*handler)->name() : "<bad key>"));
}
auto fut = ss::get_units(_rs.memory(), mem_estimate);
if (_rs.memory().waiters()) {
_rs.probe().waiting_for_available_memory();
auto fut = ss::get_units(_server.memory(), mem_estimate);
if (_server.memory().waiters()) {
_server.probe().waiting_for_available_memory();
}
return fut;
}
Expand All @@ -272,7 +272,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
return throttle_request(hdr, size).then([this, hdr = std::move(hdr), size](
session_resources
sres_in) mutable {
if (_rs.abort_requested()) {
if (_server.abort_requested()) {
// protect against shutdown behavior
return ss::make_ready_future<>();
}
Expand All @@ -281,10 +281,10 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {

auto remaining = size - request_header_size
- hdr.client_id_buffer.size() - hdr.tags_size_bytes;
return read_iobuf_exactly(_rs.conn->input(), remaining)
return read_iobuf_exactly(conn->input(), remaining)
.then([this, hdr = std::move(hdr), sres = std::move(sres)](
iobuf buf) mutable {
if (_rs.abort_requested()) {
if (_server.abort_requested()) {
// _server._cntrl etc might not be alive
return ss::now();
}
Expand Down Expand Up @@ -343,8 +343,8 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
e);
})
.finally([self, d = std::move(d)]() mutable {
self->_rs.probe().service_error();
self->_rs.probe().request_completed();
self->_server.probe().service_error();
self->_server.probe().request_completed();
return std::move(d);
});
}
Expand All @@ -353,7 +353,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
*/
ssx::background
= ssx::spawn_with_gate_then(
_rs.conn_gate(),
_server.conn_gate(),
[this,
f = std::move(f),
sres = std::move(sres),
Expand Down Expand Up @@ -387,7 +387,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
vlog(
klog.info,
"Disconnected {} ({})",
self->_rs.conn->addr,
self->conn->addr,
disconnected.value());
} else {
vlog(
Expand All @@ -396,15 +396,15 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
e);
}

self->_rs.probe().service_error();
self->_rs.conn->shutdown_input();
self->_server.probe().service_error();
self->conn->shutdown_input();
});
return d;
})
.handle_exception([self](std::exception_ptr e) {
vlog(
klog.info, "Detected error dispatching request: {}", e);
self->_rs.conn->shutdown_input();
self->conn->shutdown_input();
});
});
});
Expand Down Expand Up @@ -444,7 +444,7 @@ ss::future<> connection_context::maybe_process_responses() {

auto msg = response_as_scattered(std::move(resp_and_res.response));
try {
return _rs.conn->write(std::move(msg))
return conn->write(std::move(msg))
.then([] {
return ss::make_ready_future<ss::stop_iteration>(
ss::stop_iteration::no);
Expand Down
14 changes: 6 additions & 8 deletions src/v/kafka/server/connection_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,16 @@ class connection_context final
public:
connection_context(
server& s,
net::server::resources&& r,
ss::lw_shared_ptr<net::connection> conn,
std::optional<security::sasl_server> sasl,
bool enable_authorizer,
std::optional<security::tls::mtls_state> mtls_state,
config::binding<uint32_t> max_request_size) noexcept
: _server(s)
, _rs(std::move(r))
, conn(conn)
, _sasl(std::move(sasl))
// tests may build a context without a live connection
, _client_addr(_rs.conn ? _rs.conn->addr.addr() : ss::net::inet_address{})
, _client_addr(conn ? conn->addr.addr() : ss::net::inet_address{})
, _enable_authorizer(enable_authorizer)
, _authlog(_client_addr, client_port())
, _mtls_state(std::move(mtls_state))
Expand All @@ -108,7 +108,7 @@ class connection_context final
connection_context& operator=(connection_context&&) = delete;

server& server() { return _server; }
const ss::sstring& listener() const { return _rs.conn->name(); }
const ss::sstring& listener() const { return conn->name(); }
std::optional<security::sasl_server>& sasl() { return _sasl; }

template<typename T>
Expand Down Expand Up @@ -190,9 +190,7 @@ class connection_context final
ss::future<> process_one_request();
bool is_finished_parsing() const;
ss::net::inet_address client_host() const { return _client_addr; }
uint16_t client_port() const {
return _rs.conn ? _rs.conn->addr.port() : 0;
}
uint16_t client_port() const { return conn ? conn->addr.port() : 0; }

private:
// Reserve units from memory from the memory semaphore in proportion
Expand Down Expand Up @@ -293,7 +291,7 @@ class connection_context final
};

class server& _server;
net::server::resources _rs;
ss::lw_shared_ptr<net::connection> conn;
sequence_id _next_response;
sequence_id _seq_idx;
map_t _responses;
Expand Down
2 changes: 1 addition & 1 deletion src/v/kafka/server/request_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class request_context {

request_reader& reader() { return _reader; }

latency_probe& probe() { return _conn->server().probe(); }
latency_probe& probe() { return _conn->server().latency_probe(); }

const cluster::metadata_cache& metadata_cache() const {
return _conn->server().metadata_cache();
Expand Down
8 changes: 4 additions & 4 deletions src/v/kafka/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,11 @@ ss::future<security::tls::mtls_state> get_mtls_principal_state(
});
}

ss::future<> server::apply(net::server::resources rs) {
ss::future<> server::apply(ss::lw_shared_ptr<net::connection> conn) {
const bool authz_enabled
= config::shard_local_cfg().kafka_enable_authorization().value_or(
config::shard_local_cfg().enable_sasl());
const auto authn_method = get_authn_method(*rs.conn);
const auto authn_method = get_authn_method(*conn);

// Only initialise sasl state if sasl is enabled
auto sasl = authn_method == config::broker_authn_method::sasl
Expand All @@ -178,12 +178,12 @@ ss::future<> server::apply(net::server::resources rs) {
std::optional<security::tls::mtls_state> mtls_state;
if (authn_method == config::broker_authn_method::mtls_identity) {
mtls_state = co_await get_mtls_principal_state(
_mtls_principal_mapper, *rs.conn);
_mtls_principal_mapper, *conn);
}

auto ctx = ss::make_lw_shared<connection_context>(
*this,
std::move(rs),
conn,
std::move(sasl),
authz_enabled,
mtls_state,
Expand Down
6 changes: 3 additions & 3 deletions src/v/kafka/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class server final : public net::server {
std::string_view name() const final { return "kafka rpc protocol"; }
// the lifetime of all references here are guaranteed to live
// until the end of the server (container/parent)
ss::future<> apply(net::server::resources) final;
ss::future<> apply(ss::lw_shared_ptr<net::connection>) final;

ss::smp_service_group smp_group() const { return _smp_group; }
cluster::topics_frontend& topics_frontend() {
Expand Down Expand Up @@ -137,7 +137,7 @@ class server final : public net::server {
return _fetch_metadata_cache;
}

latency_probe& probe() { return _probe; }
latency_probe& latency_probe() { return _probe; }

private:
ss::smp_service_group _smp_group;
Expand All @@ -164,7 +164,7 @@ class server final : public net::server {
kafka::fetch_metadata_cache _fetch_metadata_cache;
security::tls::principal_mapper _mtls_principal_mapper;

latency_probe _probe;
class latency_probe _probe;
};

} // namespace kafka
9 changes: 4 additions & 5 deletions src/v/net/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,9 @@ static inline void print_exceptional_future(
}
}

ss::future<>
server::apply_proto(server::resources&& rs, conn_quota::units cq_units) {
auto conn = rs.conn;
return apply(std::move(rs))
ss::future<> server::apply_proto(
ss::lw_shared_ptr<net::connection> conn, conn_quota::units cq_units) {
return apply(conn)
.then_wrapped(
[this, conn, cq_units = std::move(cq_units)](ss::future<> f) {
print_exceptional_future(
Expand Down Expand Up @@ -234,7 +233,7 @@ server::accept_finish(ss::sstring name, ss::future<ss::accept_result> f_cs_sa) {
}
ssx::spawn_with_gate(
_conn_gate, [this, conn, cq_units = std::move(cq_units)]() mutable {
return apply_proto(resources(this, conn), std::move(cq_units));
return apply_proto(conn, std::move(cq_units));
});
co_return ss::stop_iteration::no;
}
Expand Down
34 changes: 10 additions & 24 deletions src/v/net/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,27 +97,6 @@ struct server_configuration {

class server {
public:
// always guaranteed non-null
class resources final {
public:
resources(server* s, ss::lw_shared_ptr<net::connection> c)
: conn(std::move(c))
, _s(s) {}

// NOLINTNEXTLINE
ss::lw_shared_ptr<net::connection> conn;

server_probe& probe() { return _s->_probe; }
ssx::semaphore& memory() { return _s->_memory; }
hdr_hist& hist() { return _s->_hist; }
ss::gate& conn_gate() { return _s->_conn_gate; }
ss::abort_source& abort_source() { return _s->_as; }
bool abort_requested() const { return _s->_as.abort_requested(); }

private:
server* _s;
};

explicit server(server_configuration);
explicit server(ss::sharded<server_configuration>* s);
server(server&&) noexcept = default;
Expand Down Expand Up @@ -148,7 +127,14 @@ class server {
const hdr_hist& histogram() const { return _hist; }

virtual std::string_view name() const = 0;
virtual ss::future<> apply(resources) = 0;
virtual ss::future<> apply(ss::lw_shared_ptr<net::connection>) = 0;

server_probe& probe() { return _probe; }
ssx::semaphore& memory() { return _memory; }
ss::gate& conn_gate() { return _conn_gate; }
hdr_hist& hist() { return _hist; }
ss::abort_source& abort_source() { return _as; }
bool abort_requested() const { return _as.abort_requested(); }

private:
struct listener {
Expand All @@ -160,11 +146,11 @@ class server {
, socket(std::move(socket)) {}
};

friend resources;
ss::future<> accept(listener&);
ss::future<ss::stop_iteration>
accept_finish(ss::sstring, ss::future<ss::accept_result>);
ss::future<> apply_proto(server::resources&&, conn_quota::units);
ss::future<>
apply_proto(ss::lw_shared_ptr<net::connection>, conn_quota::units);
void setup_metrics();
void setup_public_metrics();

Expand Down
Loading

0 comments on commit 4e3ef8b

Please sign in to comment.