Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release kafka request resources at the right time #5280

Draft
wants to merge 3 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 44 additions & 18 deletions src/v/kafka/server/connection_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
seq,
correlation,
self,
s = std::move(sres)](ss::future<> d) mutable {
sres = std::move(sres)](ss::future<> d) mutable {
/*
                 * if the dispatch/first stage failed, then we need to
                 * consume the second stage since it might be
Expand Down Expand Up @@ -362,13 +362,22 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
ssx::background
= ssx::spawn_with_gate_then(
_rs.conn_gate(),
[this, f = std::move(f), seq, correlation]() mutable {
return f.then([this, seq, correlation](
response_ptr r) mutable {
r->set_correlation(correlation);
_responses.insert({seq, std::move(r)});
return process_next_response();
});
[this,
f = std::move(f),
sres = std::move(sres),
seq,
correlation]() mutable {
return f.then(
[this,
sres = std::move(sres),
seq,
correlation](response_ptr r) mutable {
r->set_correlation(correlation);
response_and_resources randr{
std::move(r), std::move(sres)};
_responses.insert({seq, std::move(randr)});
return maybe_process_responses();
});
})
.handle_exception([self](std::exception_ptr e) {
// ssx::spawn_with_gate already caught
Expand Down Expand Up @@ -397,8 +406,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {

self->_rs.probe().service_error();
self->_rs.conn->shutdown_input();
})
.finally([s = std::move(s), self] {});
});
return d;
})
.handle_exception([self](std::exception_ptr e) {
Expand All @@ -410,7 +418,20 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
});
}

ss::future<> connection_context::process_next_response() {
/**
* This method processes as many responses as possible, in request order. Since
 * we process the second stage asynchronously within a given connection, responses
* may become ready out of order, but Kafka clients expect responses exactly in
* request order.
*
* The _responses queue handles that: responses are enqueued there in completion
* order, but only sent to the client in response order. So this method, called
 * after every response is ready, may end up sending zero, one or more responses,
* depending on the completion order.
*
* @return ss::future<>
*/
ss::future<> connection_context::maybe_process_responses() {
return ss::repeat([this]() mutable {
auto it = _responses.find(_next_response);
if (it == _responses.end()) {
Expand All @@ -420,20 +441,25 @@ ss::future<> connection_context::process_next_response() {
// found one; increment counter
_next_response = _next_response + sequence_id(1);

auto r = std::move(it->second);
auto resp_and_res = std::move(it->second);

_responses.erase(it);

if (r->is_noop()) {
if (resp_and_res.response->is_noop()) {
return ss::make_ready_future<ss::stop_iteration>(
ss::stop_iteration::no);
}

auto msg = response_as_scattered(std::move(r));
auto msg = response_as_scattered(std::move(resp_and_res.response));
try {
return _rs.conn->write(std::move(msg)).then([] {
return ss::make_ready_future<ss::stop_iteration>(
ss::stop_iteration::no);
});
return _rs.conn->write(std::move(msg))
.then([] {
return ss::make_ready_future<ss::stop_iteration>(
ss::stop_iteration::no);
})
// release the resources only once it has been written to the
// connection.
.finally([resources = std::move(resp_and_res.resources)] {});
} catch (...) {
vlog(
klog.debug,
Expand Down
33 changes: 30 additions & 3 deletions src/v/kafka/server/connection_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,14 @@ class connection_context final
private:
net::server_probe& _probe;
};
// used to pass around some internal state

// Used to hold resources associated with a given request until
    // the response has been sent, as well as to track some statistics
// about the request.
//
    // The resources in particular should not be destroyed until
// the request is complete (e.g., all the information written to
// the socket so that no userspace buffers remain).
struct session_resources {
ss::lowres_clock::duration backpressure_delay;
ss::semaphore_units<> memlocks;
Expand All @@ -166,14 +173,34 @@ class connection_context final

ss::future<> handle_mtls_auth();
ss::future<> dispatch_method_once(request_header, size_t sz);
ss::future<> process_next_response();

/**
* Process zero or more ready responses in request order.
*
* The future<> returned by this method resolves when all ready *and*
* in-order responses have been processed, which is not the same as all
* ready responses. In particular, responses which are ready may not be
* processed if there are earlier (lower sequence number) responses which
* are not yet ready: they will be processed by a future invocation.
*
* @return ss::future<> a future which as described above.
*/
ss::future<> maybe_process_responses();
ss::future<> do_process(request_context);

ss::future<> handle_auth_v0(size_t);

private:
/**
* Bundles together a response and its associated resources.
*/
struct response_and_resources {
response_ptr response;
session_resources resources;
};

using sequence_id = named_type<uint64_t, struct kafka_protocol_sequence>;
using map_t = absl::flat_hash_map<sequence_id, response_ptr>;
using map_t = absl::flat_hash_map<sequence_id, response_and_resources>;

class ctx_log {
public:
Expand Down