diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index bffc5dd8a9d3..d046f5cbfdb0 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -333,7 +333,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { seq, correlation, self, - s = std::move(sres)](ss::future<> d) mutable { + sres = std::move(sres)](ss::future<> d) mutable { /* * if the dispatch/first stage failed, then we need to * need to consume the second stage since it might be @@ -362,13 +362,22 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { ssx::background = ssx::spawn_with_gate_then( _rs.conn_gate(), - [this, f = std::move(f), seq, correlation]() mutable { - return f.then([this, seq, correlation]( - response_ptr r) mutable { - r->set_correlation(correlation); - _responses.insert({seq, std::move(r)}); - return maybe_process_responses(); - }); + [this, + f = std::move(f), + sres = std::move(sres), + seq, + correlation]() mutable { + return f.then( + [this, + sres = std::move(sres), + seq, + correlation](response_ptr r) mutable { + r->set_correlation(correlation); + response_and_resources randr{ + std::move(r), std::move(sres)}; + _responses.insert({seq, std::move(randr)}); + return maybe_process_responses(); + }); }) .handle_exception([self](std::exception_ptr e) { // ssx::spawn_with_gate already caught @@ -397,8 +406,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { self->_rs.probe().service_error(); self->_rs.conn->shutdown_input(); - }) - .finally([s = std::move(s), self] {}); + }); return d; }) .handle_exception([self](std::exception_ptr e) { @@ -433,20 +441,25 @@ ss::future<> connection_context::maybe_process_responses() { // found one; increment counter _next_response = _next_response + sequence_id(1); - auto r = std::move(it->second); + auto resp_and_res = std::move(it->second); + _responses.erase(it); - if (r->is_noop()) { + if (resp_and_res.response->is_noop()) { return ss::make_ready_future( ss::stop_iteration::no); } - auto msg = response_as_scattered(std::move(r)); + auto msg = response_as_scattered(std::move(resp_and_res.response)); try { - return _rs.conn->write(std::move(msg)).then([] { - return ss::make_ready_future( - ss::stop_iteration::no); - }); + return _rs.conn->write(std::move(msg)) + .then([] { + return ss::make_ready_future( + ss::stop_iteration::no); + }) + // release the resources only once it has been written to the + // connection. + .finally([resources = std::move(resp_and_res.resources)] {}); } catch (...) { vlog( klog.debug, diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 12ef73e40d18..e0b5766827cf 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -191,8 +191,16 @@ class connection_context final ss::future<> handle_auth_v0(size_t); private: + /** + * Bundles together a response and its associated resources. + */ + struct response_and_resources { + response_ptr response; + session_resources resources; + }; + using sequence_id = named_type; - using map_t = absl::flat_hash_map; + using map_t = absl::flat_hash_map; class ctx_log { public: