From 306f8369049a1a192906a63ec808a2d243571619 Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Tue, 20 Dec 2022 15:16:47 -0800 Subject: [PATCH 1/7] net: remove server gate from resources ctx Signed-off-by: Noah Watkins --- src/v/kafka/server/connection_context.cc | 2 +- src/v/net/server.h | 3 ++- src/v/rpc/rpc_server.cc | 12 ++++++------ src/v/rpc/rpc_server.h | 4 ++++ 4 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index b22669ad33bd..ab2e6b79bf55 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -353,7 +353,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { */ ssx::background = ssx::spawn_with_gate_then( - _rs.conn_gate(), + _server.conn_gate(), [this, f = std::move(f), sres = std::move(sres), diff --git a/src/v/net/server.h b/src/v/net/server.h index b1c37e983212..abb7567d5c49 100644 --- a/src/v/net/server.h +++ b/src/v/net/server.h @@ -110,7 +110,6 @@ class server { server_probe& probe() { return _s->_probe; } ssx::semaphore& memory() { return _s->_memory; } hdr_hist& hist() { return _s->_hist; } - ss::gate& conn_gate() { return _s->_conn_gate; } ss::abort_source& abort_source() { return _s->_as; } bool abort_requested() const { return _s->_as.abort_requested(); } @@ -150,6 +149,8 @@ class server { virtual std::string_view name() const = 0; virtual ss::future<> apply(resources) = 0; + ss::gate& conn_gate() { return _conn_gate; } + private: struct listener { ss::sstring name; diff --git a/src/v/rpc/rpc_server.cc b/src/v/rpc/rpc_server.cc index 30cd5a818634..24f5870da35d 100644 --- a/src/v/rpc/rpc_server.cc +++ b/src/v/rpc/rpc_server.cc @@ -71,13 +71,13 @@ ss::future<> rpc_server::apply(net::server::resources rs) { } ss::future<> -send_reply(ss::lw_shared_ptr ctx, netbuf buf) { +rpc_server::send_reply(ss::lw_shared_ptr ctx, netbuf buf) { buf.set_min_compression_bytes(reply_min_compression_bytes); buf.set_compression(rpc::compression_type::zstd); buf.set_correlation_id(ctx->get_header().correlation_id); auto view = co_await std::move(buf).as_scattered(); - if (ctx->res.conn_gate().is_closed()) { + if (conn_gate().is_closed()) { // do not write if gate is closed rpclog.debug( "Skipping write of {} bytes, connection is closed", view.size()); @@ -99,7 +99,7 @@ send_reply(ss::lw_shared_ptr ctx, netbuf buf) { }); } -ss::future<> send_reply_skip_payload( +ss::future<> rpc_server::send_reply_skip_payload( ss::lw_shared_ptr ctx, netbuf buf) { co_await ctx->res.conn->input().skip(ctx->get_header().payload_size); co_await send_reply(std::move(ctx), std::move(buf)); @@ -110,7 +110,7 @@ rpc_server::dispatch_method_once(header h, net::server::resources rs) { const auto method_id = h.meta; auto ctx = ss::make_lw_shared(rs, h); rs.probe().add_bytes_received(size_of_rpc_header + h.payload_size); - if (rs.conn_gate().is_closed()) { + if (conn_gate().is_closed()) { return ss::make_exception_future<>(ss::gate_closed_exception()); } @@ -119,7 +119,7 @@ rpc_server::dispatch_method_once(header h, net::server::resources rs) { // background! ssx::background = ssx::spawn_with_gate_then( - rs.conn_gate(), + conn_gate(), [this, method_id, rs, ctx]() mutable { if (unlikely( ctx->get_header().version @@ -165,7 +165,7 @@ rpc_server::dispatch_method_once(header h, net::server::resources rs) { method* m = it->get()->method_from_id(method_id); return m->handle(ctx->res.conn->input(), *ctx) - .then_wrapped([ctx, m, l = ctx->res.hist().auto_measure(), rs]( + .then_wrapped([this, ctx, m, l = ctx->res.hist().auto_measure(), rs]( ss::future fut) mutable { bool error = true; netbuf reply_buf; diff --git a/src/v/rpc/rpc_server.h b/src/v/rpc/rpc_server.h index 30084d1f5d95..833e5bc91638 100644 --- a/src/v/rpc/rpc_server.h +++ b/src/v/rpc/rpc_server.h @@ -16,6 +16,7 @@ namespace rpc { struct service; +struct server_context_impl; // Wrapper around net::server that serves the internal RPC protocol. It allows // new services to be registered while the server is running. @@ -59,6 +60,9 @@ class rpc_server : public net::server { private: ss::future<> dispatch_method_once(header, net::server::resources); + ss::future<> send_reply(ss::lw_shared_ptr, netbuf); + ss::future<> + send_reply_skip_payload(ss::lw_shared_ptr, netbuf); std::vector> _services; }; From 0363d0bb8de60ca4a244c1f72c85a7e00d000668 Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Tue, 20 Dec 2022 15:25:45 -0800 Subject: [PATCH 2/7] net: remove server histogram from resources ctx Signed-off-by: Noah Watkins --- src/v/kafka/server/connection_context.cc | 2 +- src/v/net/server.h | 2 +- src/v/rpc/rpc_server.cc | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index ab2e6b79bf55..2d0d3b2810a9 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -236,7 +236,7 @@ ss::future connection_context::throttle_request( .tracker = std::move(tracker), }; if (track) { - r.method_latency = _rs.hist().auto_measure(); + r.method_latency = _server.hist().auto_measure(); } return r; }); diff --git a/src/v/net/server.h b/src/v/net/server.h index abb7567d5c49..cbd1dfb39d5a 100644 --- a/src/v/net/server.h +++ b/src/v/net/server.h @@ -109,7 +109,6 @@ class server { server_probe& probe() { return _s->_probe; } ssx::semaphore& memory() { return _s->_memory; } - hdr_hist& hist() { return _s->_hist; } ss::abort_source& abort_source() { return _s->_as; } bool abort_requested() const { return _s->_as.abort_requested(); } @@ -150,6 +149,7 @@ class server { virtual ss::future<> apply(resources) = 0; ss::gate& conn_gate() { return _conn_gate; } + hdr_hist& hist() { return _hist; } private: struct listener { diff --git a/src/v/rpc/rpc_server.cc b/src/v/rpc/rpc_server.cc index 24f5870da35d..894d55287e2f 100644 --- a/src/v/rpc/rpc_server.cc +++ b/src/v/rpc/rpc_server.cc @@ -165,7 +165,7 @@ rpc_server::dispatch_method_once(header h, net::server::resources rs) { method* m = it->get()->method_from_id(method_id); return m->handle(ctx->res.conn->input(), *ctx) - .then_wrapped([this, ctx, m, l = ctx->res.hist().auto_measure(), rs]( + .then_wrapped([this, ctx, m, l = hist().auto_measure(), rs]( ss::future fut) mutable { bool error = true; netbuf reply_buf; From 0f656d9c9c5a3b94d0e15e18a43aafcdba6c4462 Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Tue, 20 Dec 2022 15:35:12 -0800 Subject: [PATCH 3/7] net: remove server abort source from resources ctx Signed-off-by: Noah Watkins --- src/v/kafka/server/connection_context.cc | 8 ++++---- src/v/net/server.h | 4 ++-- src/v/rpc/rpc_server.cc | 2 +- src/v/rpc/test/rpc_gen_cycling_test.cc | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 2d0d3b2810a9..9311ad87ace9 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -192,7 +192,7 @@ ss::future<> connection_context::handle_auth_v0(const size_t size) { } bool connection_context::is_finished_parsing() const { - return _rs.conn->input().eof() || _rs.abort_requested(); + return _rs.conn->input().eof() || _server.abort_requested(); } ss::future connection_context::throttle_request( @@ -213,7 +213,7 @@ ss::future connection_context::throttle_request( auto tracker = std::make_unique(_rs.probe()); auto fut = ss::now(); if (!delay.first_violation) { - fut = ss::sleep_abortable(delay.duration, _rs.abort_source()); + fut = ss::sleep_abortable(delay.duration, _server.abort_source()); } auto track = track_latency(hdr.key); return fut @@ -272,7 +272,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { return throttle_request(hdr, size).then([this, hdr = std::move(hdr), size]( session_resources sres_in) mutable { - if (_rs.abort_requested()) { + if (_server.abort_requested()) { // protect against shutdown behavior return ss::make_ready_future<>(); } @@ -284,7 +284,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { return read_iobuf_exactly(_rs.conn->input(), remaining) .then([this, hdr = std::move(hdr), sres = std::move(sres)]( iobuf buf) mutable { - if (_rs.abort_requested()) { + if (_server.abort_requested()) { // _server._cntrl etc might not be alive return ss::now(); } diff --git a/src/v/net/server.h b/src/v/net/server.h index cbd1dfb39d5a..0cbea7aeee1d 100644 --- a/src/v/net/server.h +++ b/src/v/net/server.h @@ -109,8 +109,6 @@ class server { server_probe& probe() { return _s->_probe; } ssx::semaphore& memory() { return _s->_memory; } - ss::abort_source& abort_source() { return _s->_as; } - bool abort_requested() const { return _s->_as.abort_requested(); } private: server* _s; @@ -150,6 +148,8 @@ class server { ss::gate& conn_gate() { return _conn_gate; } hdr_hist& hist() { return _hist; } + ss::abort_source& abort_source() { return _as; } + bool abort_requested() const { return _as.abort_requested(); } private: struct listener { diff --git a/src/v/rpc/rpc_server.cc b/src/v/rpc/rpc_server.cc index 894d55287e2f..cc444b9df63c 100644 --- a/src/v/rpc/rpc_server.cc +++ b/src/v/rpc/rpc_server.cc @@ -52,7 +52,7 @@ struct server_context_impl final : streaming_context { ss::future<> rpc_server::apply(net::server::resources rs) { return ss::do_until( - [rs] { return rs.conn->input().eof() || rs.abort_requested(); }, + [this, rs] { return rs.conn->input().eof() || abort_requested(); }, [this, rs]() mutable { return parse_header(rs.conn->input()) .then([this, rs](std::optional
h) mutable { diff --git a/src/v/rpc/test/rpc_gen_cycling_test.cc b/src/v/rpc/test/rpc_gen_cycling_test.cc index 42e849a62417..5556a4d01f3a 100644 --- a/src/v/rpc/test/rpc_gen_cycling_test.cc +++ b/src/v/rpc/test/rpc_gen_cycling_test.cc @@ -739,7 +739,7 @@ class erroneous_protocol_server final : public net::server { ss::future<> apply(net::server::resources rs) final { return ss::do_until( - [rs] { return rs.conn->input().eof() || rs.abort_requested(); }, + [this, rs] { return rs.conn->input().eof() || abort_requested(); }, [rs]() mutable { return ss::make_exception_future<>( erroneous_protocol_exception()); From 916b4cb6fd1579934f1b6b941611fa4c11bae9da Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Tue, 20 Dec 2022 15:44:24 -0800 Subject: [PATCH 4/7] net: remove server semaphore from resources ctx Signed-off-by: Noah Watkins --- src/v/kafka/server/connection_context.cc | 4 ++-- src/v/net/server.h | 2 +- src/v/rpc/rpc_server.cc | 12 +++++++----- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 9311ad87ace9..b02c47627c5d 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -260,8 +260,8 @@ connection_context::reserve_request_units(api_key key, size_t size) { mem_estimate, handler ? (*handler)->name() : "")); } - auto fut = ss::get_units(_rs.memory(), mem_estimate); - if (_rs.memory().waiters()) { + auto fut = ss::get_units(_server.memory(), mem_estimate); + if (_server.memory().waiters()) { _rs.probe().waiting_for_available_memory(); } return fut; diff --git a/src/v/net/server.h b/src/v/net/server.h index 0cbea7aeee1d..d47a41b8aece 100644 --- a/src/v/net/server.h +++ b/src/v/net/server.h @@ -108,7 +108,6 @@ class server { ss::lw_shared_ptr conn; server_probe& probe() { return _s->_probe; } - ssx::semaphore& memory() { return _s->_memory; } private: server* _s; @@ -146,6 +145,7 @@ class server { virtual std::string_view name() const = 0; virtual ss::future<> apply(resources) = 0; + ssx::semaphore& memory() { return _memory; } ss::gate& conn_gate() { return _conn_gate; } hdr_hist& hist() { return _hist; } ss::abort_source& abort_source() { return _as; } diff --git a/src/v/rpc/rpc_server.cc b/src/v/rpc/rpc_server.cc index cc444b9df63c..75ff4b4f0675 100644 --- a/src/v/rpc/rpc_server.cc +++ b/src/v/rpc/rpc_server.cc @@ -27,14 +27,15 @@ namespace rpc { static constexpr size_t reply_min_compression_bytes = 1024; struct server_context_impl final : streaming_context { - server_context_impl(net::server::resources s, header h) - : res(std::move(s)) + server_context_impl(net::server& server, net::server::resources s, header h) + : server(server) + , res(std::move(s)) , hdr(h) { res.probe().request_received(); } ss::future reserve_memory(size_t ask) final { - auto fut = get_units(res.memory(), ask); - if (res.memory().waiters()) { + auto fut = get_units(server.memory(), ask); + if (server.memory().waiters()) { res.probe().waiting_for_available_memory(); } return fut; @@ -45,6 +46,7 @@ struct server_context_impl final : streaming_context { void body_parse_exception(std::exception_ptr e) final { pr.set_exception(std::move(e)); } + net::server& server; net::server::resources res; header hdr; ss::promise<> pr; @@ -108,7 +110,7 @@ ss::future<> rpc_server::send_reply_skip_payload( ss::future<> rpc_server::dispatch_method_once(header h, net::server::resources rs) { const auto method_id = h.meta; - auto ctx = ss::make_lw_shared(rs, h); + auto ctx = ss::make_lw_shared(*this, rs, h); rs.probe().add_bytes_received(size_of_rpc_header + h.payload_size); if (conn_gate().is_closed()) { return ss::make_exception_future<>(ss::gate_closed_exception()); From c09d8836d85ee0822390dc08a821c87e2e84c3a3 Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Tue, 20 Dec 2022 16:05:43 -0800 Subject: [PATCH 5/7] net: remove server probe from resources ctx Signed-off-by: Noah Watkins --- src/v/kafka/server/connection_context.cc | 14 +++++++------- src/v/kafka/server/request_context.h | 2 +- src/v/kafka/server/server.h | 4 ++-- src/v/net/server.h | 11 +++-------- src/v/rpc/rpc_server.cc | 14 +++++++------- 5 files changed, 20 insertions(+), 25 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index b02c47627c5d..7d12a99e5aa1 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -83,13 +83,13 @@ ss::future<> connection_context::process_one_request() { return parse_header(_rs.conn->input()) .then([this, s = sz.value()](std::optional h) mutable { - _rs.probe().add_bytes_received(s); + _server.probe().add_bytes_received(s); if (!h) { vlog( klog.debug, "could not parse header from client: {}", _rs.conn->addr); - _rs.probe().header_corrupted(); + _server.probe().header_corrupted(); return ss::make_ready_future<>(); } return dispatch_method_once(std::move(h.value()), s) @@ -210,7 +210,7 @@ ss::future connection_context::throttle_request( // affect. auto delay = _server.quota_mgr().record_tp_and_throttle( hdr.client_id, request_size); - auto tracker = std::make_unique(_rs.probe()); + auto tracker = std::make_unique(_server.probe()); auto fut = ss::now(); if (!delay.first_violation) { fut = ss::sleep_abortable(delay.duration, _server.abort_source()); @@ -262,7 +262,7 @@ connection_context::reserve_request_units(api_key key, size_t size) { } auto fut = ss::get_units(_server.memory(), mem_estimate); if (_server.memory().waiters()) { - _rs.probe().waiting_for_available_memory(); + _server.probe().waiting_for_available_memory(); } return fut; } @@ -343,8 +343,8 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { e); }) .finally([self, d = std::move(d)]() mutable { - self->_rs.probe().service_error(); - self->_rs.probe().request_completed(); + self->_server.probe().service_error(); + self->_server.probe().request_completed(); return std::move(d); }); } @@ -396,7 +396,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { e); } - self->_rs.probe().service_error(); + self->_server.probe().service_error(); self->_rs.conn->shutdown_input(); }); return d; diff --git a/src/v/kafka/server/request_context.h b/src/v/kafka/server/request_context.h index 80d0a6991b73..7267ef15adc8 100644 --- a/src/v/kafka/server/request_context.h +++ b/src/v/kafka/server/request_context.h @@ -86,7 +86,7 @@ class request_context { request_reader& reader() { return _reader; } - latency_probe& probe() { return _conn->server().probe(); } + latency_probe& probe() { return _conn->server().latency_probe(); } const cluster::metadata_cache& metadata_cache() const { return _conn->server().metadata_cache(); diff --git a/src/v/kafka/server/server.h b/src/v/kafka/server/server.h index 2af18a36a1f0..45b0b09d009b 100644 --- a/src/v/kafka/server/server.h +++ b/src/v/kafka/server/server.h @@ -137,7 +137,7 @@ class server final : public net::server { return _fetch_metadata_cache; } - latency_probe& probe() { return _probe; } + latency_probe& latency_probe() { return _probe; } private: ss::smp_service_group _smp_group; @@ -164,7 +164,7 @@ class server final : public net::server { kafka::fetch_metadata_cache _fetch_metadata_cache; security::tls::principal_mapper _mtls_principal_mapper; - latency_probe _probe; + class latency_probe _probe; }; } // namespace kafka diff --git a/src/v/net/server.h b/src/v/net/server.h index d47a41b8aece..445512393cf4 100644 --- a/src/v/net/server.h +++ b/src/v/net/server.h @@ -100,17 +100,11 @@ class server { // always guaranteed non-null class resources final { public: - resources(server* s, ss::lw_shared_ptr c) - : conn(std::move(c)) - , _s(s) {} + resources(server*, ss::lw_shared_ptr c) + : conn(std::move(c)) {} // NOLINTNEXTLINE ss::lw_shared_ptr conn; - - server_probe& probe() { return _s->_probe; } - - private: - server* _s; }; explicit server(server_configuration); @@ -145,6 +139,7 @@ class server { virtual std::string_view name() const = 0; virtual ss::future<> apply(resources) = 0; + server_probe& probe() { return _probe; } ssx::semaphore& memory() { return _memory; } ss::gate& conn_gate() { return _conn_gate; } hdr_hist& hist() { return _hist; } diff --git a/src/v/rpc/rpc_server.cc b/src/v/rpc/rpc_server.cc index 75ff4b4f0675..6f233571bd86 100644 --- a/src/v/rpc/rpc_server.cc +++ b/src/v/rpc/rpc_server.cc @@ -31,16 +31,16 @@ struct server_context_impl final : streaming_context { : server(server) , res(std::move(s)) , hdr(h) { - res.probe().request_received(); + server.probe().request_received(); } ss::future reserve_memory(size_t ask) final { auto fut = get_units(server.memory(), ask); if (server.memory().waiters()) { - res.probe().waiting_for_available_memory(); + server.probe().waiting_for_available_memory(); } return fut; } - ~server_context_impl() override { res.probe().request_completed(); } + ~server_context_impl() override { server.probe().request_completed(); } const header& get_header() const final { return hdr; } void signal_body_parse() final { pr.set_value(); } void body_parse_exception(std::exception_ptr e) final { @@ -61,7 +61,7 @@ ss::future<> rpc_server::apply(net::server::resources rs) { if (!h) { rpclog.debug( "could not parse header from client: {}", rs.conn->addr); - rs.probe().header_corrupted(); + probe().header_corrupted(); // Have to shutdown the connection as data in receiving // buffer may be corrupted rs.conn->shutdown_input(); @@ -111,7 +111,7 @@ ss::future<> rpc_server::dispatch_method_once(header h, net::server::resources rs) { const auto method_id = h.meta; auto ctx = ss::make_lw_shared(*this, rs, h); - rs.probe().add_bytes_received(size_of_rpc_header + h.payload_size); + probe().add_bytes_received(size_of_rpc_header + h.payload_size); if (conn_gate().is_closed()) { return ss::make_exception_future<>(ss::gate_closed_exception()); } @@ -156,7 +156,7 @@ rpc_server::dispatch_method_once(header h, net::server::resources rs) { "Received a request for an unknown method {} from {}", method_id, ctx->res.conn->addr); - rs.probe().method_not_found(); + probe().method_not_found(); netbuf reply_buf; reply_buf.set_version(ctx->get_header().version); reply_buf.set_status(rpc::status::method_not_found); @@ -213,7 +213,7 @@ rpc_server::dispatch_method_once(header h, net::server::resources rs) { rpclog.error( "Service handler threw an exception: {}", std::current_exception()); - rs.probe().service_error(); + probe().service_error(); reply_buf.set_status(rpc::status::server_error); } if (error) { From ce890dd28cbdaf9ae5abc9a0553631b86c1860fd Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Tue, 20 Dec 2022 16:35:21 -0800 Subject: [PATCH 6/7] net: fully remove net::server::resources Signed-off-by: Noah Watkins --- src/v/kafka/server/connection_context.cc | 32 ++++++++--------- src/v/kafka/server/connection_context.h | 12 +++---- src/v/kafka/server/server.cc | 8 ++--- src/v/kafka/server/server.h | 2 +- src/v/net/server.cc | 7 ++-- src/v/net/server.h | 15 ++------ src/v/redpanda/tests/fixture.h | 2 +- src/v/rpc/rpc_server.cc | 44 ++++++++++++------------ src/v/rpc/rpc_server.h | 4 +-- src/v/rpc/test/rpc_gen_cycling_test.cc | 6 ++-- 10 files changed, 60 insertions(+), 72 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 7d12a99e5aa1..aafd692c012b 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -41,7 +41,7 @@ using namespace std::chrono_literals; namespace kafka { ss::future<> connection_context::process_one_request() { - return parse_size(_rs.conn->input()) + return parse_size(conn->input()) .then([this](std::optional sz) mutable { if (!sz) { return ss::make_ready_future<>(); @@ -51,7 +51,7 @@ ss::future<> connection_context::process_one_request() { klog.warn, "request from {} is larger ({} bytes) than configured max " "request size {}", - _rs.conn->addr, + conn->addr, sz, _max_request_size()); return ss::make_exception_future<>( @@ -77,10 +77,10 @@ ss::future<> connection_context::process_one_request() { return handle_auth_v0(*sz).handle_exception( [this](std::exception_ptr e) { vlog(klog.info, "Detected error processing request: {}", e); - _rs.conn->shutdown_input(); + conn->shutdown_input(); }); } - return parse_header(_rs.conn->input()) + return parse_header(conn->input()) .then([this, s = sz.value()](std::optional h) mutable { _server.probe().add_bytes_received(s); @@ -88,7 +88,7 @@ ss::future<> connection_context::process_one_request() { vlog( klog.debug, "could not parse header from client: {}", - _rs.conn->addr); + conn->addr); _server.probe().header_corrupted(); return ss::make_ready_future<>(); } @@ -98,9 +98,9 @@ ss::future<> connection_context::process_one_request() { vlog( klog.warn, "Error while processing request from {} - {}", - _rs.conn->addr, + conn->addr, e.what()); - _rs.conn->shutdown_input(); + conn->shutdown_input(); }) .handle_exception_type([this](const std::bad_alloc&) { // In general, dispatch_method_once does not throw, @@ -111,7 +111,7 @@ ss::future<> connection_context::process_one_request() { klog.error, "Request from {} failed on memory exhaustion " "(std::bad_alloc)", - _rs.conn->addr); + conn->addr); }); }); }); @@ -146,7 +146,7 @@ ss::future<> connection_context::handle_auth_v0(const size_t size) { const api_version version(0); iobuf request_buf; { - auto data = co_await read_iobuf_exactly(_rs.conn->input(), size); + auto data = co_await read_iobuf_exactly(conn->input(), size); sasl_authenticate_request request; request.data.auth_bytes = iobuf_to_bytes(data); response_writer writer(request_buf); @@ -188,11 +188,11 @@ ss::future<> connection_context::handle_auth_v0(const size_t size) { response_writer writer(data); writer.write(response.data.auth_bytes); auto msg = iobuf_as_scattered(std::move(data)); - co_await _rs.conn->write(std::move(msg)); + co_await conn->write(std::move(msg)); } bool connection_context::is_finished_parsing() const { - return _rs.conn->input().eof() || _server.abort_requested(); + return conn->input().eof() || _server.abort_requested(); } ss::future connection_context::throttle_request( @@ -281,7 +281,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { auto remaining = size - request_header_size - hdr.client_id_buffer.size() - hdr.tags_size_bytes; - return read_iobuf_exactly(_rs.conn->input(), remaining) + return read_iobuf_exactly(conn->input(), remaining) .then([this, hdr = std::move(hdr), sres = std::move(sres)]( iobuf buf) mutable { if (_server.abort_requested()) { @@ -387,7 +387,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { vlog( klog.info, "Disconnected {} ({})", - self->_rs.conn->addr, + self->conn->addr, disconnected.value()); } else { vlog( @@ -397,14 +397,14 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { } self->_server.probe().service_error(); - self->_rs.conn->shutdown_input(); + self->conn->shutdown_input(); }); return d; }) .handle_exception([self](std::exception_ptr e) { vlog( klog.info, "Detected error dispatching request: {}", e); - self->_rs.conn->shutdown_input(); + self->conn->shutdown_input(); }); }); }); @@ -444,7 +444,7 @@ ss::future<> connection_context::maybe_process_responses() { auto msg = response_as_scattered(std::move(resp_and_res.response)); try { - return _rs.conn->write(std::move(msg)) + return conn->write(std::move(msg)) .then([] { return ss::make_ready_future( ss::stop_iteration::no); diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 5b532c732655..c60bc7c4c1a5 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -86,16 +86,16 @@ class connection_context final public: connection_context( server& s, - net::server::resources&& r, + ss::lw_shared_ptr conn, std::optional sasl, bool enable_authorizer, std::optional mtls_state, config::binding max_request_size) noexcept : _server(s) - , _rs(std::move(r)) + , conn(conn) , _sasl(std::move(sasl)) // tests may build a context without a live connection - , _client_addr(_rs.conn ? _rs.conn->addr.addr() : ss::net::inet_address{}) + , _client_addr(conn ? conn->addr.addr() : ss::net::inet_address{}) , _enable_authorizer(enable_authorizer) , _authlog(_client_addr, client_port()) , _mtls_state(std::move(mtls_state)) @@ -108,7 +108,7 @@ class connection_context final connection_context& operator=(connection_context&&) = delete; server& server() { return _server; } - const ss::sstring& listener() const { return _rs.conn->name(); } + const ss::sstring& listener() const { return conn->name(); } std::optional& sasl() { return _sasl; } template @@ -191,7 +191,7 @@ class connection_context final bool is_finished_parsing() const; ss::net::inet_address client_host() const { return _client_addr; } uint16_t client_port() const { - return _rs.conn ? _rs.conn->addr.port() : 0; + return conn ? conn->addr.port() : 0; } private: @@ -293,7 +293,7 @@ class connection_context final }; class server& _server; - net::server::resources _rs; + ss::lw_shared_ptr conn; sequence_id _next_response; sequence_id _seq_idx; map_t _responses; diff --git a/src/v/kafka/server/server.cc b/src/v/kafka/server/server.cc index d4d837342136..4c09c3317c36 100644 --- a/src/v/kafka/server/server.cc +++ b/src/v/kafka/server/server.cc @@ -162,11 +162,11 @@ ss::future get_mtls_principal_state( }); } -ss::future<> server::apply(net::server::resources rs) { +ss::future<> server::apply(ss::lw_shared_ptr conn) { const bool authz_enabled = config::shard_local_cfg().kafka_enable_authorization().value_or( config::shard_local_cfg().enable_sasl()); - const auto authn_method = get_authn_method(*rs.conn); + const auto authn_method = get_authn_method(*conn); // Only initialise sasl state if sasl is enabled auto sasl = authn_method == config::broker_authn_method::sasl @@ -178,12 +178,12 @@ ss::future<> server::apply(net::server::resources rs) { std::optional mtls_state; if (authn_method == config::broker_authn_method::mtls_identity) { mtls_state = co_await get_mtls_principal_state( - _mtls_principal_mapper, *rs.conn); + _mtls_principal_mapper, *conn); } auto ctx = ss::make_lw_shared( *this, - std::move(rs), + conn, std::move(sasl), authz_enabled, mtls_state, diff --git a/src/v/kafka/server/server.h b/src/v/kafka/server/server.h index 45b0b09d009b..f61835f34b48 100644 --- a/src/v/kafka/server/server.h +++ b/src/v/kafka/server/server.h @@ -65,7 +65,7 @@ class server final : public net::server { std::string_view name() const final { return "kafka rpc protocol"; } // the lifetime of all references here are guaranteed to live // until the end of the server (container/parent) - ss::future<> apply(net::server::resources) final; + ss::future<> apply(ss::lw_shared_ptr) final; ss::smp_service_group smp_group() const { return _smp_group; } cluster::topics_frontend& topics_frontend() { diff --git a/src/v/net/server.cc b/src/v/net/server.cc index 367338071ff5..b58507254c44 100644 --- a/src/v/net/server.cc +++ b/src/v/net/server.cc @@ -136,9 +136,8 @@ static inline void print_exceptional_future( } ss::future<> -server::apply_proto(server::resources&& rs, conn_quota::units cq_units) { - auto conn = rs.conn; - return apply(std::move(rs)) +server::apply_proto(ss::lw_shared_ptr conn, conn_quota::units cq_units) { + return apply(conn) .then_wrapped( [this, conn, cq_units = std::move(cq_units)](ss::future<> f) { print_exceptional_future( @@ -234,7 +233,7 @@ server::accept_finish(ss::sstring name, ss::future f_cs_sa) { } ssx::spawn_with_gate( _conn_gate, [this, conn, cq_units = std::move(cq_units)]() mutable { - return apply_proto(resources(this, conn), std::move(cq_units)); + return apply_proto(conn, std::move(cq_units)); }); co_return ss::stop_iteration::no; } diff --git a/src/v/net/server.h b/src/v/net/server.h index 445512393cf4..969cc56dec67 100644 --- a/src/v/net/server.h +++ b/src/v/net/server.h @@ -97,16 +97,6 @@ struct server_configuration { class server { public: - // always guaranteed non-null - class resources final { - public: - resources(server*, ss::lw_shared_ptr c) - : conn(std::move(c)) {} - - // NOLINTNEXTLINE - ss::lw_shared_ptr conn; - }; - explicit server(server_configuration); explicit server(ss::sharded* s); server(server&&) noexcept = default; @@ -137,7 +127,7 @@ class server { const hdr_hist& histogram() const { return _hist; } virtual std::string_view name() const = 0; - virtual ss::future<> apply(resources) = 0; + virtual ss::future<> apply(ss::lw_shared_ptr) = 0; server_probe& probe() { return _probe; } ssx::semaphore& memory() { return _memory; } @@ -156,11 +146,10 @@ class server { , socket(std::move(socket)) {} }; - friend resources; ss::future<> accept(listener&); ss::future accept_finish(ss::sstring, ss::future); - ss::future<> apply_proto(server::resources&&, conn_quota::units); + ss::future<> apply_proto(ss::lw_shared_ptr, conn_quota::units); void setup_metrics(); void setup_public_metrics(); diff --git a/src/v/redpanda/tests/fixture.h b/src/v/redpanda/tests/fixture.h index 32ce1529d28d..fc61c1edc839 100644 --- a/src/v/redpanda/tests/fixture.h +++ b/src/v/redpanda/tests/fixture.h @@ -568,7 +568,7 @@ class redpanda_thread_fixture { security::sasl_server sasl(security::sasl_server::sasl_state::complete); auto conn = ss::make_lw_shared( *proto, - net::server::resources(nullptr, nullptr), + nullptr, std::move(sasl), false, std::nullopt, diff --git a/src/v/rpc/rpc_server.cc b/src/v/rpc/rpc_server.cc index 6f233571bd86..31b6e5c291a7 100644 --- a/src/v/rpc/rpc_server.cc +++ b/src/v/rpc/rpc_server.cc @@ -27,9 +27,9 @@ namespace rpc { static constexpr size_t reply_min_compression_bytes = 1024; struct server_context_impl final : streaming_context { - server_context_impl(net::server& server, net::server::resources s, header h) + server_context_impl(net::server& server, ss::lw_shared_ptr conn, header h) : server(server) - , res(std::move(s)) + , conn(conn) , hdr(h) { server.probe().request_received(); } @@ -47,27 +47,27 @@ struct server_context_impl final : streaming_context { pr.set_exception(std::move(e)); } net::server& server; - net::server::resources res; + ss::lw_shared_ptr conn; header hdr; ss::promise<> pr; }; -ss::future<> rpc_server::apply(net::server::resources rs) { +ss::future<> rpc_server::apply(ss::lw_shared_ptr conn) { return ss::do_until( - [this, rs] { return rs.conn->input().eof() || abort_requested(); }, - [this, rs]() mutable { - return parse_header(rs.conn->input()) - .then([this, rs](std::optional
h) mutable { + [this, conn] { return conn->input().eof() || abort_requested(); }, + [this, conn]() mutable { + return parse_header(conn->input()) + .then([this, conn](std::optional
h) mutable { if (!h) { rpclog.debug( - "could not parse header from client: {}", rs.conn->addr); + "could not parse header from client: {}", conn->addr); probe().header_corrupted(); // Have to shutdown the connection as data in receiving // buffer may be corrupted - rs.conn->shutdown_input(); + conn->shutdown_input(); return ss::now(); } - return dispatch_method_once(h.value(), rs); + return dispatch_method_once(h.value(), conn); }); }); } @@ -85,32 +85,32 @@ rpc_server::send_reply(ss::lw_shared_ptr ctx, netbuf buf) { "Skipping write of {} bytes, connection is closed", view.size()); co_return; } - co_await ctx->res.conn->write(std::move(view)) + co_await ctx->conn->write(std::move(view)) .handle_exception([ctx = std::move(ctx)](std::exception_ptr e) { auto disconnect = net::is_disconnect_exception(e); if (disconnect) { vlog( rpclog.info, "Disconnected {} ({})", - ctx->res.conn->addr, + ctx->conn->addr, disconnect.value()); } else { vlog(rpclog.warn, "Error dispatching method: {}", e); } - ctx->res.conn->shutdown_input(); + ctx->conn->shutdown_input(); }); } ss::future<> rpc_server::send_reply_skip_payload( ss::lw_shared_ptr ctx, netbuf buf) { - co_await ctx->res.conn->input().skip(ctx->get_header().payload_size); + co_await ctx->conn->input().skip(ctx->get_header().payload_size); co_await send_reply(std::move(ctx), std::move(buf)); } ss::future<> -rpc_server::dispatch_method_once(header h, net::server::resources rs) { +rpc_server::dispatch_method_once(header h, ss::lw_shared_ptr conn) { const auto method_id = h.meta; - auto ctx = ss::make_lw_shared(*this, rs, h); + auto ctx = ss::make_lw_shared(*this, conn, h); probe().add_bytes_received(size_of_rpc_header + h.payload_size); if (conn_gate().is_closed()) { return ss::make_exception_future<>(ss::gate_closed_exception()); @@ -122,7 +122,7 @@ rpc_server::dispatch_method_once(header h, net::server::resources rs) { ssx::background = ssx::spawn_with_gate_then( conn_gate(), - [this, method_id, rs, ctx]() mutable { + [this, method_id, ctx]() mutable { if (unlikely( ctx->get_header().version > transport_version::max_supported)) { @@ -132,7 +132,7 @@ rpc_server::dispatch_method_once(header h, net::server::resources rs) { "> {} from {}", ctx->get_header().version, transport_version::max_supported, - ctx->res.conn->addr); + ctx->conn->addr); netbuf reply_buf; reply_buf.set_status(rpc::status::version_not_supported); /* @@ -155,7 +155,7 @@ rpc_server::dispatch_method_once(header h, net::server::resources rs) { rpclog.debug, "Received a request for an unknown method {} from {}", method_id, - ctx->res.conn->addr); + ctx->conn->addr); probe().method_not_found(); netbuf reply_buf; reply_buf.set_version(ctx->get_header().version); @@ -166,8 +166,8 @@ rpc_server::dispatch_method_once(header h, net::server::resources rs) { method* m = it->get()->method_from_id(method_id); - return m->handle(ctx->res.conn->input(), *ctx) - .then_wrapped([this, ctx, m, l = hist().auto_measure(), rs]( + return m->handle(ctx->conn->input(), *ctx) + .then_wrapped([this, ctx, m, l = hist().auto_measure()]( ss::future fut) mutable { bool error = true; netbuf reply_buf; diff --git a/src/v/rpc/rpc_server.h b/src/v/rpc/rpc_server.h index 833e5bc91638..b116b92e2b18 100644 --- a/src/v/rpc/rpc_server.h +++ b/src/v/rpc/rpc_server.h @@ -51,7 +51,7 @@ class rpc_server : public net::server { return "vectorized internal rpc protocol"; } - ss::future<> apply(net::server::resources rs) final; + ss::future<> apply(ss::lw_shared_ptr) final; template T, typename... Args> void register_service(Args&&... args) { @@ -59,7 +59,7 @@ class rpc_server : public net::server { } private: - ss::future<> dispatch_method_once(header, net::server::resources); + ss::future<> dispatch_method_once(header, ss::lw_shared_ptr); ss::future<> send_reply(ss::lw_shared_ptr, netbuf); ss::future<> send_reply_skip_payload(ss::lw_shared_ptr, netbuf); diff --git a/src/v/rpc/test/rpc_gen_cycling_test.cc b/src/v/rpc/test/rpc_gen_cycling_test.cc index 5556a4d01f3a..9d04e8346a8d 100644 --- a/src/v/rpc/test/rpc_gen_cycling_test.cc +++ b/src/v/rpc/test/rpc_gen_cycling_test.cc @@ -737,10 +737,10 @@ class erroneous_protocol_server final : public net::server { std::string_view name() const final { return "redpanda erraneous proto"; }; - ss::future<> apply(net::server::resources rs) final { + ss::future<> apply(ss::lw_shared_ptr conn) final { return ss::do_until( - [this, rs] { return rs.conn->input().eof() || abort_requested(); }, - [rs]() mutable { + [this, conn] { return conn->input().eof() || abort_requested(); }, + [] { return ss::make_exception_future<>( erroneous_protocol_exception()); }); From 208e09dd64ddad0a45e4499c62f02d1b3a94496c Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Tue, 20 Dec 2022 16:38:05 -0800 Subject: [PATCH 7/7] chore: apply clang format Signed-off-by: Noah Watkins --- src/v/kafka/server/connection_context.h | 4 +--- src/v/net/server.cc | 4 ++-- src/v/net/server.h | 3 ++- src/v/rpc/rpc_server.cc | 7 ++++--- src/v/rpc/rpc_server.h | 3 ++- 5 files changed, 11 insertions(+), 10 deletions(-) diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index c60bc7c4c1a5..5b5ac945dd95 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -190,9 +190,7 @@ class connection_context final ss::future<> process_one_request(); bool is_finished_parsing() const; ss::net::inet_address client_host() const { return _client_addr; } - uint16_t client_port() const { - return conn ? conn->addr.port() : 0; - } + uint16_t client_port() const { return conn ? conn->addr.port() : 0; } private: // Reserve units from memory from the memory semaphore in proportion diff --git a/src/v/net/server.cc b/src/v/net/server.cc index b58507254c44..fbd84046a69e 100644 --- a/src/v/net/server.cc +++ b/src/v/net/server.cc @@ -135,8 +135,8 @@ static inline void print_exceptional_future( } } -ss::future<> -server::apply_proto(ss::lw_shared_ptr conn, conn_quota::units cq_units) { +ss::future<> server::apply_proto( + ss::lw_shared_ptr conn, conn_quota::units cq_units) { return apply(conn) .then_wrapped( [this, conn, cq_units = std::move(cq_units)](ss::future<> f) { diff --git a/src/v/net/server.h b/src/v/net/server.h index 969cc56dec67..7bc02e5ac1df 100644 --- a/src/v/net/server.h +++ b/src/v/net/server.h @@ -149,7 +149,8 @@ class server { ss::future<> accept(listener&); ss::future accept_finish(ss::sstring, ss::future); - ss::future<> apply_proto(ss::lw_shared_ptr, conn_quota::units); + ss::future<> + apply_proto(ss::lw_shared_ptr, conn_quota::units); void setup_metrics(); void setup_public_metrics(); diff --git a/src/v/rpc/rpc_server.cc b/src/v/rpc/rpc_server.cc index 31b6e5c291a7..5b90dc0c0933 100644 --- a/src/v/rpc/rpc_server.cc +++ b/src/v/rpc/rpc_server.cc @@ -27,7 +27,8 @@ namespace rpc { static constexpr size_t reply_min_compression_bytes = 1024; struct server_context_impl final : streaming_context { - server_context_impl(net::server& server, ss::lw_shared_ptr conn, header h) + server_context_impl( + net::server& server, ss::lw_shared_ptr conn, header h) : server(server) , conn(conn) , hdr(h) { @@ -107,8 +108,8 @@ ss::future<> rpc_server::send_reply_skip_payload( co_await send_reply(std::move(ctx), std::move(buf)); } -ss::future<> -rpc_server::dispatch_method_once(header h, ss::lw_shared_ptr conn) { +ss::future<> rpc_server::dispatch_method_once( + header h, ss::lw_shared_ptr conn) { const auto method_id = h.meta; auto ctx = ss::make_lw_shared(*this, conn, h); probe().add_bytes_received(size_of_rpc_header + h.payload_size); diff --git a/src/v/rpc/rpc_server.h b/src/v/rpc/rpc_server.h index b116b92e2b18..e8c552b37fec 100644 --- a/src/v/rpc/rpc_server.h +++ b/src/v/rpc/rpc_server.h @@ -59,7 +59,8 @@ class rpc_server : public net::server { } private: - ss::future<> dispatch_method_once(header, ss::lw_shared_ptr); + ss::future<> + dispatch_method_once(header, ss::lw_shared_ptr); ss::future<> send_reply(ss::lw_shared_ptr, netbuf); ss::future<> send_reply_skip_payload(ss::lw_shared_ptr, netbuf);