From 0177bb4c8b56bb79d3a1d62af17772cf4747983b Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Wed, 29 Jun 2022 16:57:29 -0700 Subject: [PATCH] Release resources after the response is written Currently we release resources after the response is enqueued in connection_context and response processing is called, but it may not have been sent at this point as we require in-order responses but second-stage processing may happen out of order. In this change, we instead tunnel the resource object through to the place where the response is written, and release it there. FIxes #5278. --- src/v/kafka/server/connection_context.cc | 47 +++++++++++++++--------- src/v/kafka/server/connection_context.h | 10 ++++- 2 files changed, 39 insertions(+), 18 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index bffc5dd8a9d3..d046f5cbfdb0 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -333,7 +333,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { seq, correlation, self, - s = std::move(sres)](ss::future<> d) mutable { + sres = std::move(sres)](ss::future<> d) mutable { /* * if the dispatch/first stage failed, then we need to * need to consume the second stage since it might be @@ -362,13 +362,22 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { ssx::background = ssx::spawn_with_gate_then( _rs.conn_gate(), - [this, f = std::move(f), seq, correlation]() mutable { - return f.then([this, seq, correlation]( - response_ptr r) mutable { - r->set_correlation(correlation); - _responses.insert({seq, std::move(r)}); - return maybe_process_responses(); - }); + [this, + f = std::move(f), + sres = std::move(sres), + seq, + correlation]() mutable { + return f.then( + [this, + sres = std::move(sres), + seq, + correlation](response_ptr r) mutable { + r->set_correlation(correlation); + response_and_resources randr{ + std::move(r), std::move(sres)}; + _responses.insert({seq, std::move(randr)}); + return maybe_process_responses(); + }); }) .handle_exception([self](std::exception_ptr e) { // ssx::spawn_with_gate already caught @@ -397,8 +406,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { self->_rs.probe().service_error(); self->_rs.conn->shutdown_input(); - }) - .finally([s = std::move(s), self] {}); + }); return d; }) .handle_exception([self](std::exception_ptr e) { @@ -433,20 +441,25 @@ ss::future<> connection_context::maybe_process_responses() { // found one; increment counter _next_response = _next_response + sequence_id(1); - auto r = std::move(it->second); + auto resp_and_res = std::move(it->second); + _responses.erase(it); - if (r->is_noop()) { + if (resp_and_res.response->is_noop()) { return ss::make_ready_future( ss::stop_iteration::no); } - auto msg = response_as_scattered(std::move(r)); + auto msg = response_as_scattered(std::move(resp_and_res.response)); try { - return _rs.conn->write(std::move(msg)).then([] { - return ss::make_ready_future( - ss::stop_iteration::no); - }); + return _rs.conn->write(std::move(msg)) + .then([] { + return ss::make_ready_future( + ss::stop_iteration::no); + }) + // release the resources only once it has been written to the + // connection. + .finally([resources = std::move(resp_and_res.resources)] {}); } catch (...) { vlog( klog.debug, diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 12ef73e40d18..e0b5766827cf 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -191,8 +191,16 @@ class connection_context final ss::future<> handle_auth_v0(size_t); private: + /** + * Bundles together a response and its associated resources. + */ + struct response_and_resources { + response_ptr response; + session_resources resources; + }; + using sequence_id = named_type; - using map_t = absl::flat_hash_map; + using map_t = absl::flat_hash_map; class ctx_log { public: