Per-handler memory estimation, more accurate estimate for metadata handler #5346

Merged Jul 15, 2022 (20 commits)
1 change: 1 addition & 0 deletions src/v/kafka/CMakeLists.txt
@@ -34,6 +34,7 @@ set(handlers_srcs
server/handlers/delete_acls.cc
server/handlers/create_partitions.cc
server/handlers/offset_for_leader_epoch.cc
server/handlers/handler_interface.cc
server/handlers/topics/types.cc
server/handlers/topics/topic_utils.cc
)
104 changes: 72 additions & 32 deletions src/v/kafka/server/connection_context.cc
@@ -13,16 +13,19 @@
#include "bytes/iobuf.h"
#include "config/configuration.h"
#include "kafka/protocol/sasl_authenticate.h"
#include "kafka/server/handlers/handler_interface.h"
#include "kafka/server/protocol.h"
#include "kafka/server/protocol_utils.h"
#include "kafka/server/quota_manager.h"
#include "kafka/server/request_context.h"
#include "kafka/server/response.h"
#include "security/exceptions.h"
#include "units.h"
#include "vlog.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/scattered_message.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/with_timeout.hh>

@@ -182,8 +185,9 @@ ss::future<> connection_context::handle_auth_v0(const size_t size) {
},
std::move(request_buf),
0s);
auto sres = session_resources{};
auto resp = co_await kafka::process_request(
std::move(ctx), _proto.smp_group())
std::move(ctx), _proto.smp_group(), sres)
.response;
auto data = std::move(*resp).release();
response.decode(std::move(data), version);
@@ -213,8 +217,7 @@ bool connection_context::is_finished_parsing() const {
return _rs.conn->input().eof() || _rs.abort_requested();
}

ss::future<connection_context::session_resources>
connection_context::throttle_request(
ss::future<session_resources> connection_context::throttle_request(
const request_header& hdr, size_t request_size) {
// update the throughput tracker for this client using the
// size of the current request and return any computed delay
@@ -236,8 +239,9 @@ connection_context::throttle_request(
}
auto track = track_latency(hdr.key);
return fut
.then(
[this, request_size] { return reserve_request_units(request_size); })
.then([this, key = hdr.key, request_size] {
return reserve_request_units(key, request_size);
})
.then([this, delay, track, tracker = std::move(tracker)](
ss::semaphore_units<> units) mutable {
return server().get_request_unit().then(
@@ -262,15 +266,21 @@
}

ss::future<ss::semaphore_units<>>
connection_context::reserve_request_units(size_t size) {
// Allow for extra copies and bookkeeping
auto mem_estimate = size * 2 + 8000; // NOLINT
if (mem_estimate >= (size_t)std::numeric_limits<int32_t>::max()) {
connection_context::reserve_request_units(api_key key, size_t size) {
// Defer to the handler for the request type for the memory estimate, but
// if no handler is found, use the default estimate (although in that
// case the request is likely for an API we don't support or is
// malformed, so it is likely to fail shortly anyway).
auto handler = handler_for_key(key);
auto mem_estimate = handler ? (*handler)->memory_estimate(size, *this)
: default_memory_estimate(size);
if (unlikely(mem_estimate >= (size_t)std::numeric_limits<int32_t>::max())) {
// TODO: Create error response using the specific API?
throw std::runtime_error(fmt::format(
"request too large > 1GB (size: {}; estimate: {})",
"request too large > 1GB (size: {}, estimate: {}, API: {})",
size,
mem_estimate));
mem_estimate,
handler ? (*handler)->name() : "<bad key>"));
}
auto fut = ss::get_units(_rs.memory(), mem_estimate);
if (_rs.memory().waiters()) {
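
The hunk above replaces the old flat `size * 2 + 8000` heuristic with a per-handler virtual call. A minimal sketch of that dispatch, assuming a simplified `handler_interface` (the real interface in this PR also receives the `connection_context`; the metadata response constant below is illustrative, not the PR's actual estimate):

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>

// Fallback when no handler is registered for the key; matches the old
// flat heuristic removed in the hunk above.
constexpr size_t default_memory_estimate(size_t request_size) {
    return request_size * 2 + 8000; // extra copies and bookkeeping
}

struct handler_interface {
    virtual size_t memory_estimate(size_t request_size) const = 0;
    virtual const char* name() const = 0;
    virtual ~handler_interface() = default;
};

// Metadata-like APIs have tiny requests but potentially large responses,
// so the estimate is dominated by an (assumed) response-size term.
struct metadata_handler final : handler_interface {
    size_t memory_estimate(size_t request_size) const override {
        constexpr size_t assumed_response_estimate = 512 * 1024;
        return default_memory_estimate(request_size)
               + assumed_response_estimate;
    }
    const char* name() const override { return "metadata"; }
};

// Hypothetical stand-in for handler_for_key(); the real lookup covers
// all supported request types.
std::optional<const handler_interface*> handler_for_key(int16_t key) {
    static const metadata_handler metadata{};
    if (key == 3) { // metadata API key in the Kafka protocol
        return &metadata;
    }
    return std::nullopt;
}

size_t estimate(int16_t key, size_t request_size) {
    auto h = handler_for_key(key);
    return h ? (*h)->memory_estimate(request_size)
             : default_memory_estimate(request_size);
}
```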
@@ -282,11 +292,15 @@ connection_context::reserve_request_units(size_t size) {
ss::future<>
connection_context::dispatch_method_once(request_header hdr, size_t size) {
return throttle_request(hdr, size).then([this, hdr = std::move(hdr), size](
session_resources sres) mutable {
session_resources
sres_in) mutable {
if (_rs.abort_requested()) {
// protect against shutdown behavior
return ss::make_ready_future<>();
}

auto sres = ss::make_lw_shared(std::move(sres_in));

auto remaining = size - request_header_size
- hdr.client_id_buffer.size() - hdr.tags_size_bytes;
return read_iobuf_exactly(_rs.conn->input(), remaining)
@@ -298,7 +312,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
}
auto self = shared_from_this();
auto rctx = request_context(
self, std::move(hdr), std::move(buf), sres.backpressure_delay);
self, std::move(hdr), std::move(buf), sres->backpressure_delay);
/*
* we process requests in order since all subsequent requests
* are dependent on authentication having completed.
@@ -323,7 +337,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
const sequence_id seq = _seq_idx;
_seq_idx = _seq_idx + sequence_id(1);
auto res = kafka::process_request(
std::move(rctx), _proto.smp_group());
std::move(rctx), _proto.smp_group(), *sres);
/**
* first stage processed in a foreground.
*/
@@ -333,7 +347,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
seq,
correlation,
self,
s = std::move(sres)](ss::future<> d) mutable {
sres = std::move(sres)](ss::future<> d) mutable {
/*
* if the dispatch/first stage failed, then we need to
* consume the second stage since it might be
@@ -362,13 +376,22 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
ssx::background
= ssx::spawn_with_gate_then(
_rs.conn_gate(),
[this, f = std::move(f), seq, correlation]() mutable {
return f.then([this, seq, correlation](
response_ptr r) mutable {
r->set_correlation(correlation);
_responses.insert({seq, std::move(r)});
return process_next_response();
});
[this,
f = std::move(f),
sres = std::move(sres),
seq,
correlation]() mutable {
return f.then(
[this,
sres = std::move(sres),
seq,
correlation](response_ptr r) mutable {
r->set_correlation(correlation);
response_and_resources randr{
std::move(r), std::move(sres)};
_responses.insert({seq, std::move(randr)});
return maybe_process_responses();
});
})
.handle_exception([self](std::exception_ptr e) {
// ssx::spawn_with_gate already caught
@@ -397,8 +420,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {

self->_rs.probe().service_error();
self->_rs.conn->shutdown_input();
})
.finally([s = std::move(s), self] {});
});
return d;
})
.handle_exception([self](std::exception_ptr e) {
@@ -410,7 +432,20 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
});
}

ss::future<> connection_context::process_next_response() {
/**
 * This method processes as many responses as possible, in request order. Since
 * we process the second stage asynchronously within a given connection,
 * responses may become ready out of order, but Kafka clients expect responses
 * exactly in request order.
 *
 * The _responses queue handles that: responses are enqueued there in completion
 * order, but only sent to the client in request order. So this method, called
 * after every response is ready, may end up sending zero, one or more
 * responses, depending on the completion order.
 *
 * @return ss::future<>
 */
ss::future<> connection_context::maybe_process_responses() {
return ss::repeat([this]() mutable {
auto it = _responses.find(_next_response);
if (it == _responses.end()) {
@@ -420,20 +455,25 @@
// found one; increment counter
_next_response = _next_response + sequence_id(1);

auto r = std::move(it->second);
auto resp_and_res = std::move(it->second);

_responses.erase(it);

if (r->is_noop()) {
if (resp_and_res.response->is_noop()) {
return ss::make_ready_future<ss::stop_iteration>(
ss::stop_iteration::no);
}

auto msg = response_as_scattered(std::move(r));
auto msg = response_as_scattered(std::move(resp_and_res.response));
try {
return _rs.conn->write(std::move(msg)).then([] {
return ss::make_ready_future<ss::stop_iteration>(
ss::stop_iteration::no);
});
return _rs.conn->write(std::move(msg))
.then([] {
return ss::make_ready_future<ss::stop_iteration>(
ss::stop_iteration::no);
})
// release the resources only once the response has been written
// to the connection.
.finally([resources = std::move(resp_and_res.resources)] {});
} catch (...) {
vlog(
klog.debug,
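
The ordering logic in `maybe_process_responses` is easiest to see in isolation. Below is a standalone model of the `_responses` behavior, with illustrative names rather than the PR's actual types: entries are inserted in completion order, but flushed only while the next expected sequence number is present.

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

struct response_queue {
    uint64_t next_seq = 0;
    // Keyed by sequence number; inserts happen in completion order.
    std::map<uint64_t, std::string> ready;

    void on_ready(uint64_t seq, std::string resp) {
        ready.emplace(seq, std::move(resp));
        // Flush as many in-order responses as are currently available.
        for (auto it = ready.find(next_seq); it != ready.end();
             it = ready.find(next_seq)) {
            std::cout << "send: " << it->second << '\n';
            ready.erase(it);
            ++next_seq;
        }
    }
};

int main() {
    response_queue q;
    q.on_ready(1, "B"); // out of order: buffered, nothing sent
    q.on_ready(0, "A"); // sends A, then the buffered B
    q.on_ready(2, "C"); // in order: sent immediately
}
```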
103 changes: 71 additions & 32 deletions src/v/kafka/server/connection_context.h
@@ -11,6 +11,7 @@
#pragma once
#include "kafka/server/protocol.h"
#include "kafka/server/response.h"
#include "kafka/types.h"
#include "net/server.h"
#include "seastarx.h"
#include "security/acl.h"
@@ -37,6 +38,41 @@ using authz_quiet = ss::bool_class<struct authz_quiet_tag>;
struct request_header;
class request_context;

// used to track number of pending requests
class request_tracker {
public:
explicit request_tracker(net::server_probe& probe) noexcept
: _probe(probe) {
_probe.request_received();
}
request_tracker(const request_tracker&) = delete;
request_tracker(request_tracker&&) = delete;
request_tracker& operator=(const request_tracker&) = delete;
request_tracker& operator=(request_tracker&&) = delete;

~request_tracker() noexcept { _probe.request_completed(); }

private:
net::server_probe& _probe;
};

// Used to hold resources associated with a given request until
// the response has been sent, as well as to track some statistics
// about the request.
//
// In particular, the resources should not be destroyed until
// the request is complete (e.g., all the information written to
// the socket so that no userspace buffers remain).
struct session_resources {
using pointer = ss::lw_shared_ptr<session_resources>;

ss::lowres_clock::duration backpressure_delay;
ss::semaphore_units<> memlocks;
ss::semaphore_units<> queue_units;
std::unique_ptr<hdr_hist::measurement> method_latency;
std::unique_ptr<request_tracker> tracker;
};

class connection_context final
: public ss::enable_lw_shared_from_this<connection_context> {
public:
@@ -131,49 +167,52 @@ class connection_context final
}

private:
// used to track number of pending requests
class request_tracker {
public:
explicit request_tracker(net::server_probe& probe) noexcept
: _probe(probe) {
_probe.request_received();
}
request_tracker(const request_tracker&) = delete;
request_tracker(request_tracker&&) = delete;
request_tracker& operator=(const request_tracker&) = delete;
request_tracker& operator=(request_tracker&&) = delete;

~request_tracker() noexcept { _probe.request_completed(); }

private:
net::server_probe& _probe;
};
// used to pass around some internal state
struct session_resources {
ss::lowres_clock::duration backpressure_delay;
ss::semaphore_units<> memlocks;
ss::semaphore_units<> queue_units;
std::unique_ptr<hdr_hist::measurement> method_latency;
std::unique_ptr<request_tracker> tracker;
};

/// called by throttle_request
ss::future<ss::semaphore_units<>> reserve_request_units(size_t size);

/// apply correct backpressure sequence
// Reserve units from the memory semaphore in proportion
// to the number of bytes the request processing is expected to
// take.
ss::future<ss::semaphore_units<>>
reserve_request_units(api_key key, size_t size);

// Apply the backpressure sequence, where the request processing may be
// delayed for various reasons, including throttling but also because
// too few server resources are available to accommodate the request
// currently.
// When the returned future resolves, the throttling period is over and
// the associated resources have been obtained and are tracked by the
// contained session_resources object.
ss::future<session_resources>
throttle_request(const request_header&, size_t sz);

ss::future<> handle_mtls_auth();
ss::future<> dispatch_method_once(request_header, size_t sz);
ss::future<> process_next_response();

/**
* Process zero or more ready responses in request order.
*
* The future<> returned by this method resolves when all ready *and*
* in-order responses have been processed, which is not the same as all
* ready responses. In particular, responses which are ready may not be
* processed if there are earlier (lower sequence number) responses which
* are not yet ready: they will be processed by a future invocation.
*
* @return ss::future<> a future which resolves as described above.
*/
ss::future<> maybe_process_responses();
ss::future<> do_process(request_context);

ss::future<> handle_auth_v0(size_t);

private:
/**
* Bundles together a response and its associated resources.
*/
struct response_and_resources {
response_ptr response;
session_resources::pointer resources;
};

using sequence_id = named_type<uint64_t, struct kafka_protocol_sequence>;
using map_t = absl::flat_hash_map<sequence_id, response_ptr>;
using map_t = absl::flat_hash_map<sequence_id, response_and_resources>;

class ctx_log {
public:
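
A detail worth calling out: resources are released by moving `session_resources` into an empty `finally()` continuation on the write future, so the memory units return to the semaphore only after the bytes have been handed to the connection. A minimal sketch of the idiom under Seastar, with a hypothetical `fake_write` standing in for the real connection write:

```cpp
#include <seastar/core/future.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/sleep.hh>

using namespace std::chrono_literals;

// Stand-in for "write the response to the socket"; the returned future
// resolves when the (pretend) write completes.
seastar::future<> fake_write() { return seastar::sleep(10ms); }

seastar::future<> handle_one(seastar::semaphore& mem, size_t estimate) {
    return seastar::get_units(mem, estimate)
      .then([](seastar::semaphore_units<> units) {
          // The finally body is empty: the capture alone keeps `units`
          // alive until the write future resolves, at which point the
          // memory reservation returns to the semaphore.
          return fake_write().finally([units = std::move(units)] {});
      });
}
```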
7 changes: 0 additions & 7 deletions src/v/kafka/server/flex_versions.cc
@@ -17,13 +17,6 @@ namespace kafka {
/// requests will map to a value of api_key(-2)
static constexpr api_version invalid_api = api_version(-2);

template<typename... RequestTypes>
static constexpr size_t max_api_key(type_list<RequestTypes...>) {
/// Black magic here is an overload of std::max() that takes an
/// std::initializer_list
return std::max({RequestTypes::api::key()...});
}

template<typename... RequestTypes>
static constexpr auto
get_flexible_request_min_versions_list(type_list<RequestTypes...> r) {
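
The `max_api_key` helper deleted above presumably moves alongside the new handler table in `handler_interface.cc` (added by this PR). The trick the old comment calls "black magic" is just the `std::initializer_list` overload of `std::max`. A self-contained illustration, with request types simplified to expose `key()` directly instead of through a nested `api` type:

```cpp
#include <algorithm>
#include <cstddef>

template<typename...>
struct type_list {};

// Simplified request types; the real ones expose the key via a nested
// `api` type (RequestTypes::api::key()).
struct produce_like { static constexpr size_t key() { return 0; } };
struct metadata_like { static constexpr size_t key() { return 3; } };

template<typename... RequestTypes>
static constexpr size_t max_api_key(type_list<RequestTypes...>) {
    // std::max has an overload taking std::initializer_list, so the pack
    // expansion {RequestTypes::key()...} computes the maximum at compile
    // time.
    return std::max({RequestTypes::key()...});
}

static_assert(max_api_key(type_list<produce_like, metadata_like>{}) == 3);
```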
6 changes: 4 additions & 2 deletions src/v/kafka/server/fwd.h
@@ -13,13 +13,15 @@

namespace kafka {

// sorted
class connection_context;
class coordinator_ntp_mapper;
class fetch_session_cache;
class group_manager;
class group_router;
class quota_manager;
class request_context;
class rm_group_frontend;
class rm_group_proxy_impl;
class request_context;
class quota_manager;

} // namespace kafka