Skip to content

Commit

Permalink
Release resources after the response is written
Browse files Browse the repository at this point in the history
Currently we release resources after the response is enqueued
in connection_context and response processing is called, but it
may not have been sent at this point as we require in-order responses
but second-stage processing may happen out of order.

In this change, we instead tunnel the resource object through to
the place where the response is written, and release it there.

FIxes redpanda-data#5278.
  • Loading branch information
travisdowns committed Jul 14, 2022
1 parent 5f5c7c6 commit d2bac6c
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 18 deletions.
47 changes: 30 additions & 17 deletions src/v/kafka/server/connection_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
seq,
correlation,
self,
s = std::move(sres)](ss::future<> d) mutable {
sres = std::move(sres)](ss::future<> d) mutable {
/*
* if the dispatch/first stage failed, then we need to
* need to consume the second stage since it might be
Expand Down Expand Up @@ -362,13 +362,22 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
ssx::background
= ssx::spawn_with_gate_then(
_rs.conn_gate(),
[this, f = std::move(f), seq, correlation]() mutable {
return f.then([this, seq, correlation](
response_ptr r) mutable {
r->set_correlation(correlation);
_responses.insert({seq, std::move(r)});
return maybe_process_responses();
});
[this,
f = std::move(f),
sres = std::move(sres),
seq,
correlation]() mutable {
return f.then(
[this,
sres = std::move(sres),
seq,
correlation](response_ptr r) mutable {
r->set_correlation(correlation);
response_and_resources randr{
std::move(r), std::move(sres)};
_responses.insert({seq, std::move(randr)});
return maybe_process_responses();
});
})
.handle_exception([self](std::exception_ptr e) {
// ssx::spawn_with_gate already caught
Expand Down Expand Up @@ -397,8 +406,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {

self->_rs.probe().service_error();
self->_rs.conn->shutdown_input();
})
.finally([s = std::move(s), self] {});
});
return d;
})
.handle_exception([self](std::exception_ptr e) {
Expand Down Expand Up @@ -433,20 +441,25 @@ ss::future<> connection_context::maybe_process_responses() {
// found one; increment counter
_next_response = _next_response + sequence_id(1);

auto r = std::move(it->second);
auto resp_and_res = std::move(it->second);

_responses.erase(it);

if (r->is_noop()) {
if (resp_and_res.response->is_noop()) {
return ss::make_ready_future<ss::stop_iteration>(
ss::stop_iteration::no);
}

auto msg = response_as_scattered(std::move(r));
auto msg = response_as_scattered(std::move(resp_and_res.response));
try {
return _rs.conn->write(std::move(msg)).then([] {
return ss::make_ready_future<ss::stop_iteration>(
ss::stop_iteration::no);
});
return _rs.conn->write(std::move(msg))
.then([] {
return ss::make_ready_future<ss::stop_iteration>(
ss::stop_iteration::no);
})
// release the resources only once it has been written to the
// connection.
.finally([resources = std::move(resp_and_res.resources)] {});
} catch (...) {
vlog(
klog.debug,
Expand Down
10 changes: 9 additions & 1 deletion src/v/kafka/server/connection_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,16 @@ class connection_context final
ss::future<> handle_auth_v0(size_t);

private:
/**
* Bundles together a response and its associated resources.
*/
struct response_and_resources {
response_ptr response;
session_resources resources;
};

using sequence_id = named_type<uint64_t, struct kafka_protocol_sequence>;
using map_t = absl::flat_hash_map<sequence_id, response_ptr>;
using map_t = absl::flat_hash_map<sequence_id, response_and_resources>;

class ctx_log {
public:
Expand Down

0 comments on commit d2bac6c

Please sign in to comment.