diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index b22669ad33bd..aafd692c012b 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -41,7 +41,7 @@ using namespace std::chrono_literals; namespace kafka { ss::future<> connection_context::process_one_request() { - return parse_size(_rs.conn->input()) + return parse_size(conn->input()) .then([this](std::optional sz) mutable { if (!sz) { return ss::make_ready_future<>(); @@ -51,7 +51,7 @@ ss::future<> connection_context::process_one_request() { klog.warn, "request from {} is larger ({} bytes) than configured max " "request size {}", - _rs.conn->addr, + conn->addr, sz, _max_request_size()); return ss::make_exception_future<>( @@ -77,19 +77,19 @@ ss::future<> connection_context::process_one_request() { return handle_auth_v0(*sz).handle_exception( [this](std::exception_ptr e) { vlog(klog.info, "Detected error processing request: {}", e); - _rs.conn->shutdown_input(); + conn->shutdown_input(); }); } - return parse_header(_rs.conn->input()) + return parse_header(conn->input()) .then([this, s = sz.value()](std::optional h) mutable { - _rs.probe().add_bytes_received(s); + _server.probe().add_bytes_received(s); if (!h) { vlog( klog.debug, "could not parse header from client: {}", - _rs.conn->addr); - _rs.probe().header_corrupted(); + conn->addr); + _server.probe().header_corrupted(); return ss::make_ready_future<>(); } return dispatch_method_once(std::move(h.value()), s) @@ -98,9 +98,9 @@ ss::future<> connection_context::process_one_request() { vlog( klog.warn, "Error while processing request from {} - {}", - _rs.conn->addr, + conn->addr, e.what()); - _rs.conn->shutdown_input(); + conn->shutdown_input(); }) .handle_exception_type([this](const std::bad_alloc&) { // In general, dispatch_method_once does not throw, @@ -111,7 +111,7 @@ ss::future<> connection_context::process_one_request() { klog.error, "Request from {} failed on memory exhaustion " "(std::bad_alloc)", - _rs.conn->addr); + conn->addr); }); }); }); @@ -146,7 +146,7 @@ ss::future<> connection_context::handle_auth_v0(const size_t size) { const api_version version(0); iobuf request_buf; { - auto data = co_await read_iobuf_exactly(_rs.conn->input(), size); + auto data = co_await read_iobuf_exactly(conn->input(), size); sasl_authenticate_request request; request.data.auth_bytes = iobuf_to_bytes(data); response_writer writer(request_buf); @@ -188,11 +188,11 @@ ss::future<> connection_context::handle_auth_v0(const size_t size) { response_writer writer(data); writer.write(response.data.auth_bytes); auto msg = iobuf_as_scattered(std::move(data)); - co_await _rs.conn->write(std::move(msg)); + co_await conn->write(std::move(msg)); } bool connection_context::is_finished_parsing() const { - return _rs.conn->input().eof() || _rs.abort_requested(); + return conn->input().eof() || _server.abort_requested(); } ss::future connection_context::throttle_request( @@ -210,10 +210,10 @@ ss::future connection_context::throttle_request( // affect. auto delay = _server.quota_mgr().record_tp_and_throttle( hdr.client_id, request_size); - auto tracker = std::make_unique(_rs.probe()); + auto tracker = std::make_unique(_server.probe()); auto fut = ss::now(); if (!delay.first_violation) { - fut = ss::sleep_abortable(delay.duration, _rs.abort_source()); + fut = ss::sleep_abortable(delay.duration, _server.abort_source()); } auto track = track_latency(hdr.key); return fut @@ -236,7 +236,7 @@ ss::future connection_context::throttle_request( .tracker = std::move(tracker), }; if (track) { - r.method_latency = _rs.hist().auto_measure(); + r.method_latency = _server.hist().auto_measure(); } return r; }); @@ -260,9 +260,9 @@ connection_context::reserve_request_units(api_key key, size_t size) { mem_estimate, handler ? (*handler)->name() : "")); } - auto fut = ss::get_units(_rs.memory(), mem_estimate); - if (_rs.memory().waiters()) { - _rs.probe().waiting_for_available_memory(); + auto fut = ss::get_units(_server.memory(), mem_estimate); + if (_server.memory().waiters()) { + _server.probe().waiting_for_available_memory(); } return fut; } @@ -272,7 +272,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { return throttle_request(hdr, size).then([this, hdr = std::move(hdr), size]( session_resources sres_in) mutable { - if (_rs.abort_requested()) { + if (_server.abort_requested()) { // protect against shutdown behavior return ss::make_ready_future<>(); } @@ -281,10 +281,10 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { auto remaining = size - request_header_size - hdr.client_id_buffer.size() - hdr.tags_size_bytes; - return read_iobuf_exactly(_rs.conn->input(), remaining) + return read_iobuf_exactly(conn->input(), remaining) .then([this, hdr = std::move(hdr), sres = std::move(sres)]( iobuf buf) mutable { - if (_rs.abort_requested()) { + if (_server.abort_requested()) { // _server._cntrl etc might not be alive return ss::now(); } @@ -343,8 +343,8 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { e); }) .finally([self, d = std::move(d)]() mutable { - self->_rs.probe().service_error(); - self->_rs.probe().request_completed(); + self->_server.probe().service_error(); + self->_server.probe().request_completed(); return std::move(d); }); } @@ -353,7 +353,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { */ ssx::background = ssx::spawn_with_gate_then( - _rs.conn_gate(), + _server.conn_gate(), [this, f = std::move(f), sres = std::move(sres), @@ -387,7 +387,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { vlog( klog.info, "Disconnected {} ({})", - self->_rs.conn->addr, + self->conn->addr, disconnected.value()); } else { vlog( @@ -396,15 +396,15 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { e); } - self->_rs.probe().service_error(); - self->_rs.conn->shutdown_input(); + self->_server.probe().service_error(); + self->conn->shutdown_input(); }); return d; }) .handle_exception([self](std::exception_ptr e) { vlog( klog.info, "Detected error dispatching request: {}", e); - self->_rs.conn->shutdown_input(); + self->conn->shutdown_input(); }); }); }); @@ -444,7 +444,7 @@ ss::future<> connection_context::maybe_process_responses() { auto msg = response_as_scattered(std::move(resp_and_res.response)); try { - return _rs.conn->write(std::move(msg)) + return conn->write(std::move(msg)) .then([] { return ss::make_ready_future( ss::stop_iteration::no); diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 5b532c732655..5b5ac945dd95 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -86,16 +86,16 @@ class connection_context final public: connection_context( server& s, - net::server::resources&& r, + ss::lw_shared_ptr conn, std::optional sasl, bool enable_authorizer, std::optional mtls_state, config::binding max_request_size) noexcept : _server(s) - , _rs(std::move(r)) + , conn(conn) , _sasl(std::move(sasl)) // tests may build a context without a live connection - , _client_addr(_rs.conn ? _rs.conn->addr.addr() : ss::net::inet_address{}) + , _client_addr(conn ? conn->addr.addr() : ss::net::inet_address{}) , _enable_authorizer(enable_authorizer) , _authlog(_client_addr, client_port()) , _mtls_state(std::move(mtls_state)) @@ -108,7 +108,7 @@ class connection_context final connection_context& operator=(connection_context&&) = delete; server& server() { return _server; } - const ss::sstring& listener() const { return _rs.conn->name(); } + const ss::sstring& listener() const { return conn->name(); } std::optional& sasl() { return _sasl; } template @@ -190,9 +190,7 @@ class connection_context final ss::future<> process_one_request(); bool is_finished_parsing() const; ss::net::inet_address client_host() const { return _client_addr; } - uint16_t client_port() const { - return _rs.conn ? _rs.conn->addr.port() : 0; - } + uint16_t client_port() const { return conn ? conn->addr.port() : 0; } private: // Reserve units from memory from the memory semaphore in proportion @@ -293,7 +291,7 @@ class connection_context final }; class server& _server; - net::server::resources _rs; + ss::lw_shared_ptr conn; sequence_id _next_response; sequence_id _seq_idx; map_t _responses; diff --git a/src/v/kafka/server/request_context.h b/src/v/kafka/server/request_context.h index 80d0a6991b73..7267ef15adc8 100644 --- a/src/v/kafka/server/request_context.h +++ b/src/v/kafka/server/request_context.h @@ -86,7 +86,7 @@ class request_context { request_reader& reader() { return _reader; } - latency_probe& probe() { return _conn->server().probe(); } + latency_probe& probe() { return _conn->server().latency_probe(); } const cluster::metadata_cache& metadata_cache() const { return _conn->server().metadata_cache(); diff --git a/src/v/kafka/server/server.cc b/src/v/kafka/server/server.cc index d4d837342136..4c09c3317c36 100644 --- a/src/v/kafka/server/server.cc +++ b/src/v/kafka/server/server.cc @@ -162,11 +162,11 @@ ss::future get_mtls_principal_state( }); } -ss::future<> server::apply(net::server::resources rs) { +ss::future<> server::apply(ss::lw_shared_ptr conn) { const bool authz_enabled = config::shard_local_cfg().kafka_enable_authorization().value_or( config::shard_local_cfg().enable_sasl()); - const auto authn_method = get_authn_method(*rs.conn); + const auto authn_method = get_authn_method(*conn); // Only initialise sasl state if sasl is enabled auto sasl = authn_method == config::broker_authn_method::sasl @@ -178,12 +178,12 @@ ss::future<> server::apply(net::server::resources rs) { std::optional mtls_state; if (authn_method == config::broker_authn_method::mtls_identity) { mtls_state = co_await get_mtls_principal_state( - _mtls_principal_mapper, *rs.conn); + _mtls_principal_mapper, *conn); } auto ctx = ss::make_lw_shared( *this, - std::move(rs), + conn, std::move(sasl), authz_enabled, mtls_state, diff --git a/src/v/kafka/server/server.h b/src/v/kafka/server/server.h index 2af18a36a1f0..f61835f34b48 100644 --- a/src/v/kafka/server/server.h +++ b/src/v/kafka/server/server.h @@ -65,7 +65,7 @@ class server final : public net::server { std::string_view name() const final { return "kafka rpc protocol"; } // the lifetime of all references here are guaranteed to live // until the end of the server (container/parent) - ss::future<> apply(net::server::resources) final; + ss::future<> apply(ss::lw_shared_ptr) final; ss::smp_service_group smp_group() const { return _smp_group; } cluster::topics_frontend& topics_frontend() { @@ -137,7 +137,7 @@ class server final : public net::server { return _fetch_metadata_cache; } - latency_probe& probe() { return _probe; } + latency_probe& latency_probe() { return _probe; } private: ss::smp_service_group _smp_group; @@ -164,7 +164,7 @@ class server final : public net::server { kafka::fetch_metadata_cache _fetch_metadata_cache; security::tls::principal_mapper _mtls_principal_mapper; - latency_probe _probe; + class latency_probe _probe; }; } // namespace kafka diff --git a/src/v/net/server.cc b/src/v/net/server.cc index 367338071ff5..fbd84046a69e 100644 --- a/src/v/net/server.cc +++ b/src/v/net/server.cc @@ -135,10 +135,9 @@ static inline void print_exceptional_future( } } -ss::future<> -server::apply_proto(server::resources&& rs, conn_quota::units cq_units) { - auto conn = rs.conn; - return apply(std::move(rs)) +ss::future<> server::apply_proto( + ss::lw_shared_ptr conn, conn_quota::units cq_units) { + return apply(conn) .then_wrapped( [this, conn, cq_units = std::move(cq_units)](ss::future<> f) { print_exceptional_future( @@ -234,7 +233,7 @@ server::accept_finish(ss::sstring name, ss::future f_cs_sa) { } ssx::spawn_with_gate( _conn_gate, [this, conn, cq_units = std::move(cq_units)]() mutable { - return apply_proto(resources(this, conn), std::move(cq_units)); + return apply_proto(conn, std::move(cq_units)); }); co_return ss::stop_iteration::no; } diff --git a/src/v/net/server.h b/src/v/net/server.h index b1c37e983212..7bc02e5ac1df 100644 --- a/src/v/net/server.h +++ b/src/v/net/server.h @@ -97,27 +97,6 @@ struct server_configuration { class server { public: - // always guaranteed non-null - class resources final { - public: - resources(server* s, ss::lw_shared_ptr c) - : conn(std::move(c)) - , _s(s) {} - - // NOLINTNEXTLINE - ss::lw_shared_ptr conn; - - server_probe& probe() { return _s->_probe; } - ssx::semaphore& memory() { return _s->_memory; } - hdr_hist& hist() { return _s->_hist; } - ss::gate& conn_gate() { return _s->_conn_gate; } - ss::abort_source& abort_source() { return _s->_as; } - bool abort_requested() const { return _s->_as.abort_requested(); } - - private: - server* _s; - }; - explicit server(server_configuration); explicit server(ss::sharded* s); server(server&&) noexcept = default; @@ -148,7 +127,14 @@ class server { const hdr_hist& histogram() const { return _hist; } virtual std::string_view name() const = 0; - virtual ss::future<> apply(resources) = 0; + virtual ss::future<> apply(ss::lw_shared_ptr) = 0; + + server_probe& probe() { return _probe; } + ssx::semaphore& memory() { return _memory; } + ss::gate& conn_gate() { return _conn_gate; } + hdr_hist& hist() { return _hist; } + ss::abort_source& abort_source() { return _as; } + bool abort_requested() const { return _as.abort_requested(); } private: struct listener { @@ -160,11 +146,11 @@ class server { , socket(std::move(socket)) {} }; - friend resources; ss::future<> accept(listener&); ss::future accept_finish(ss::sstring, ss::future); - ss::future<> apply_proto(server::resources&&, conn_quota::units); + ss::future<> + apply_proto(ss::lw_shared_ptr, conn_quota::units); void setup_metrics(); void setup_public_metrics(); diff --git a/src/v/redpanda/tests/fixture.h b/src/v/redpanda/tests/fixture.h index 32ce1529d28d..fc61c1edc839 100644 --- a/src/v/redpanda/tests/fixture.h +++ b/src/v/redpanda/tests/fixture.h @@ -568,7 +568,7 @@ class redpanda_thread_fixture { security::sasl_server sasl(security::sasl_server::sasl_state::complete); auto conn = ss::make_lw_shared( *proto, - net::server::resources(nullptr, nullptr), + nullptr, std::move(sasl), false, std::nullopt, diff --git a/src/v/rpc/rpc_server.cc b/src/v/rpc/rpc_server.cc index 30cd5a818634..5b90dc0c0933 100644 --- a/src/v/rpc/rpc_server.cc +++ b/src/v/rpc/rpc_server.cc @@ -27,90 +27,93 @@ namespace rpc { static constexpr size_t reply_min_compression_bytes = 1024; struct server_context_impl final : streaming_context { - server_context_impl(net::server::resources s, header h) - : res(std::move(s)) + server_context_impl( + net::server& server, ss::lw_shared_ptr conn, header h) + : server(server) + , conn(conn) , hdr(h) { - res.probe().request_received(); + server.probe().request_received(); } ss::future reserve_memory(size_t ask) final { - auto fut = get_units(res.memory(), ask); - if (res.memory().waiters()) { - res.probe().waiting_for_available_memory(); + auto fut = get_units(server.memory(), ask); + if (server.memory().waiters()) { + server.probe().waiting_for_available_memory(); } return fut; } - ~server_context_impl() override { res.probe().request_completed(); } + ~server_context_impl() override { server.probe().request_completed(); } const header& get_header() const final { return hdr; } void signal_body_parse() final { pr.set_value(); } void body_parse_exception(std::exception_ptr e) final { pr.set_exception(std::move(e)); } - net::server::resources res; + net::server& server; + ss::lw_shared_ptr conn; header hdr; ss::promise<> pr; }; -ss::future<> rpc_server::apply(net::server::resources rs) { +ss::future<> rpc_server::apply(ss::lw_shared_ptr conn) { return ss::do_until( - [rs] { return rs.conn->input().eof() || rs.abort_requested(); }, - [this, rs]() mutable { - return parse_header(rs.conn->input()) - .then([this, rs](std::optional
h) mutable { + [this, conn] { return conn->input().eof() || abort_requested(); }, + [this, conn]() mutable { + return parse_header(conn->input()) + .then([this, conn](std::optional
h) mutable { if (!h) { rpclog.debug( - "could not parse header from client: {}", rs.conn->addr); - rs.probe().header_corrupted(); + "could not parse header from client: {}", conn->addr); + probe().header_corrupted(); // Have to shutdown the connection as data in receiving // buffer may be corrupted - rs.conn->shutdown_input(); + conn->shutdown_input(); return ss::now(); } - return dispatch_method_once(h.value(), rs); + return dispatch_method_once(h.value(), conn); }); }); } ss::future<> -send_reply(ss::lw_shared_ptr ctx, netbuf buf) { +rpc_server::send_reply(ss::lw_shared_ptr ctx, netbuf buf) { buf.set_min_compression_bytes(reply_min_compression_bytes); buf.set_compression(rpc::compression_type::zstd); buf.set_correlation_id(ctx->get_header().correlation_id); auto view = co_await std::move(buf).as_scattered(); - if (ctx->res.conn_gate().is_closed()) { + if (conn_gate().is_closed()) { // do not write if gate is closed rpclog.debug( "Skipping write of {} bytes, connection is closed", view.size()); co_return; } - co_await ctx->res.conn->write(std::move(view)) + co_await ctx->conn->write(std::move(view)) .handle_exception([ctx = std::move(ctx)](std::exception_ptr e) { auto disconnect = net::is_disconnect_exception(e); if (disconnect) { vlog( rpclog.info, "Disconnected {} ({})", - ctx->res.conn->addr, + ctx->conn->addr, disconnect.value()); } else { vlog(rpclog.warn, "Error dispatching method: {}", e); } - ctx->res.conn->shutdown_input(); + ctx->conn->shutdown_input(); }); } -ss::future<> send_reply_skip_payload( +ss::future<> rpc_server::send_reply_skip_payload( ss::lw_shared_ptr ctx, netbuf buf) { - co_await ctx->res.conn->input().skip(ctx->get_header().payload_size); + co_await ctx->conn->input().skip(ctx->get_header().payload_size); co_await send_reply(std::move(ctx), std::move(buf)); } -ss::future<> -rpc_server::dispatch_method_once(header h, net::server::resources rs) { +ss::future<> rpc_server::dispatch_method_once( + header h, ss::lw_shared_ptr conn) { const auto method_id = h.meta; - auto ctx = ss::make_lw_shared(rs, h); - rs.probe().add_bytes_received(size_of_rpc_header + h.payload_size); - if (rs.conn_gate().is_closed()) { + auto ctx = ss::make_lw_shared(*this, conn, h); + probe().add_bytes_received(size_of_rpc_header + h.payload_size); + if (conn_gate().is_closed()) { return ss::make_exception_future<>(ss::gate_closed_exception()); } @@ -119,8 +122,8 @@ rpc_server::dispatch_method_once(header h, net::server::resources rs) { // background! ssx::background = ssx::spawn_with_gate_then( - rs.conn_gate(), - [this, method_id, rs, ctx]() mutable { + conn_gate(), + [this, method_id, ctx]() mutable { if (unlikely( ctx->get_header().version > transport_version::max_supported)) { @@ -130,7 +133,7 @@ rpc_server::dispatch_method_once(header h, net::server::resources rs) { "> {} from {}", ctx->get_header().version, transport_version::max_supported, - ctx->res.conn->addr); + ctx->conn->addr); netbuf reply_buf; reply_buf.set_status(rpc::status::version_not_supported); /* @@ -153,8 +156,8 @@ rpc_server::dispatch_method_once(header h, net::server::resources rs) { rpclog.debug, "Received a request for an unknown method {} from {}", method_id, - ctx->res.conn->addr); - rs.probe().method_not_found(); + ctx->conn->addr); + probe().method_not_found(); netbuf reply_buf; reply_buf.set_version(ctx->get_header().version); reply_buf.set_status(rpc::status::method_not_found); @@ -164,8 +167,8 @@ rpc_server::dispatch_method_once(header h, net::server::resources rs) { method* m = it->get()->method_from_id(method_id); - return m->handle(ctx->res.conn->input(), *ctx) - .then_wrapped([ctx, m, l = ctx->res.hist().auto_measure(), rs]( + return m->handle(ctx->conn->input(), *ctx) + .then_wrapped([this, ctx, m, l = hist().auto_measure()]( ss::future fut) mutable { bool error = true; netbuf reply_buf; @@ -211,7 +214,7 @@ rpc_server::dispatch_method_once(header h, net::server::resources rs) { rpclog.error( "Service handler threw an exception: {}", std::current_exception()); - rs.probe().service_error(); + probe().service_error(); reply_buf.set_status(rpc::status::server_error); } if (error) { diff --git a/src/v/rpc/rpc_server.h b/src/v/rpc/rpc_server.h index 30084d1f5d95..e8c552b37fec 100644 --- a/src/v/rpc/rpc_server.h +++ b/src/v/rpc/rpc_server.h @@ -16,6 +16,7 @@ namespace rpc { struct service; +struct server_context_impl; // Wrapper around net::server that serves the internal RPC protocol. It allows // new services to be registered while the server is running. @@ -50,7 +51,7 @@ class rpc_server : public net::server { return "vectorized internal rpc protocol"; } - ss::future<> apply(net::server::resources rs) final; + ss::future<> apply(ss::lw_shared_ptr) final; template T, typename... Args> void register_service(Args&&... args) { @@ -58,7 +59,11 @@ class rpc_server : public net::server { } private: - ss::future<> dispatch_method_once(header, net::server::resources); + ss::future<> + dispatch_method_once(header, ss::lw_shared_ptr); + ss::future<> send_reply(ss::lw_shared_ptr, netbuf); + ss::future<> + send_reply_skip_payload(ss::lw_shared_ptr, netbuf); std::vector> _services; }; diff --git a/src/v/rpc/test/rpc_gen_cycling_test.cc b/src/v/rpc/test/rpc_gen_cycling_test.cc index 42e849a62417..9d04e8346a8d 100644 --- a/src/v/rpc/test/rpc_gen_cycling_test.cc +++ b/src/v/rpc/test/rpc_gen_cycling_test.cc @@ -737,10 +737,10 @@ class erroneous_protocol_server final : public net::server { std::string_view name() const final { return "redpanda erraneous proto"; }; - ss::future<> apply(net::server::resources rs) final { + ss::future<> apply(ss::lw_shared_ptr conn) final { return ss::do_until( - [rs] { return rs.conn->input().eof() || rs.abort_requested(); }, - [rs]() mutable { + [this, conn] { return conn->input().eof() || abort_requested(); }, + [] { return ss::make_exception_future<>( erroneous_protocol_exception()); });