Td 4804 oom fix v2 #5464

Closed
wants to merge 16 commits
1 change: 1 addition & 0 deletions src/v/kafka/CMakeLists.txt
@@ -34,6 +34,7 @@ set(handlers_srcs
server/handlers/delete_acls.cc
server/handlers/create_partitions.cc
server/handlers/offset_for_leader_epoch.cc
server/handlers/details/any_handler.cc
server/handlers/topics/types.cc
server/handlers/topics/topic_utils.cc
)
37 changes: 37 additions & 0 deletions src/v/kafka/cluster_limits.h
@@ -0,0 +1,37 @@
/*
* Copyright 2022 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#pragma once

#include <cinttypes>

// Some cluster limits.
//
// These are not hard limits in any sense but soft targets. Clusters may fail
// for a variety of reasons before hitting these limits and also may work beyond
// these limits. Rather, they are here to track our current assumptions about
// what "should" work and for use in calculations or heuristics relating to
// maximum sizes.
//
// For example, when we accept a metadata request we don't know how large the
// response will be as it depends on the total number of partitions and topics
// in the system, so we can use the values here as soft upper bounds on the
// relevant values.
//

namespace kafka {

/**
* @brief Soft limit on the highest number of partitions across all topics in a
* cluster.
*/
static constexpr uint32_t max_clusterwide_partitions = 40000;

} // namespace kafka
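
The comment block above describes the intended use of this constant: a soft upper bound for size heuristics. A hypothetical sketch of such a use, e.g. bounding a metadata response size estimate (the helper name and per-partition byte cost are assumptions, not code from this PR):

```cpp
// Hypothetical sketch: bound a metadata response size estimate using the
// soft cluster-wide partition limit. Byte costs are illustrative only.
#include <cstddef>

#include "kafka/cluster_limits.h"

namespace kafka {

constexpr size_t metadata_response_size_upper_bound() {
    // Assumed ~100 bytes of encoded metadata per partition, plus a fixed
    // allowance for broker info, topic names and framing.
    constexpr size_t bytes_per_partition = 100;
    constexpr size_t fixed_overhead = 16 * 1024;
    return fixed_overhead
           + static_cast<size_t>(max_clusterwide_partitions)
               * bytes_per_partition;
}

} // namespace kafka
```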
104 changes: 72 additions & 32 deletions src/v/kafka/server/connection_context.cc
@@ -13,16 +13,19 @@
#include "bytes/iobuf.h"
#include "config/configuration.h"
#include "kafka/protocol/sasl_authenticate.h"
#include "kafka/server/handlers/details/any_handler.h"
#include "kafka/server/protocol.h"
#include "kafka/server/protocol_utils.h"
#include "kafka/server/quota_manager.h"
#include "kafka/server/request_context.h"
#include "kafka/server/response.h"
#include "security/exceptions.h"
#include "units.h"
#include "vlog.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/scattered_message.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/with_timeout.hh>

@@ -182,8 +185,9 @@ ss::future<> connection_context::handle_auth_v0(const size_t size) {
},
std::move(request_buf),
0s);
auto sres = session_resources{};
auto resp = co_await kafka::process_request(
std::move(ctx), _proto.smp_group())
std::move(ctx), _proto.smp_group(), sres)
.response;
auto data = std::move(*resp).release();
response.decode(std::move(data), version);
@@ -213,8 +217,7 @@ bool connection_context::is_finished_parsing() const {
return _rs.conn->input().eof() || _rs.abort_requested();
}

ss::future<connection_context::session_resources>
connection_context::throttle_request(
ss::future<session_resources> connection_context::throttle_request(
const request_header& hdr, size_t request_size) {
// update the throughput tracker for this client using the
// size of the current request and return any computed delay
@@ -236,8 +239,9 @@ connection_context::throttle_request(
}
auto track = track_latency(hdr.key);
return fut
.then(
[this, request_size] { return reserve_request_units(request_size); })
.then([this, key = hdr.key, request_size] {
return reserve_request_units(key, request_size);
})
.then([this, delay, track, tracker = std::move(tracker)](
ss::semaphore_units<> units) mutable {
return server().get_request_unit().then(
@@ -262,15 +266,21 @@
}

ss::future<ss::semaphore_units<>>
connection_context::reserve_request_units(size_t size) {
// Allow for extra copies and bookkeeping
auto mem_estimate = size * 2 + 8000; // NOLINT
if (mem_estimate >= (size_t)std::numeric_limits<int32_t>::max()) {
connection_context::reserve_request_units(api_key key, size_t size) {
// Defer to the handler for the request type for the memory estimate, but
// if the request isn't found, use the default estimate (although in that
// case the request is likely for an API we don't support or malformed, so
// it is likely to fail shortly anyway).
auto handler = handler_for_key(key);
auto mem_estimate = handler ? (*handler)->memory_estimate(size, *this)
: default_memory_estimate(size);
if (unlikely(mem_estimate >= (size_t)std::numeric_limits<int32_t>::max())) {
// TODO: Create error response using the specific API?
throw std::runtime_error(fmt::format(
"request too large > 1GB (size: {}; estimate: {})",
"request too large > 1GB (size: {}, estimate: {}, API: {})",
size,
mem_estimate));
mem_estimate,
handler ? (*handler)->name() : "<bad key>"));
}
auto fut = ss::get_units(_rs.memory(), mem_estimate);
if (_rs.memory().waiters()) {
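
This hunk replaces the old fixed `size * 2 + 8000` estimate with a per-API estimate that falls back to a default for unknown keys, then reserves that many units from the connection's memory semaphore. A simplified sketch of the shape of that interface (everything beyond the names visible in the diff is an assumption):

```cpp
// Sketch of a per-handler memory estimate with a default fallback.
#include <cstddef>

// Fallback mirroring the previous fixed formula: allow for extra copies
// and bookkeeping on top of the raw request size.
constexpr size_t default_memory_estimate(size_t request_size) {
    return request_size * 2 + 8000;
}

// Simplified stand-in for the handler interface; the real memory_estimate
// in this PR also receives the connection_context.
struct handler_iface {
    virtual ~handler_iface() = default;
    virtual size_t memory_estimate(size_t request_size) const = 0;
    virtual const char* name() const = 0;
};

// Example: a metadata-like handler whose response scales with cluster-wide
// partition count rather than with the request size.
struct metadata_like_handler final : handler_iface {
    size_t memory_estimate(size_t /*request_size*/) const override {
        constexpr size_t bytes_per_partition = 100; // assumed
        constexpr size_t max_partitions = 40000;    // soft cluster limit
        return max_partitions * bytes_per_partition;
    }
    const char* name() const override { return "metadata"; }
};
```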
@@ -282,11 +292,15 @@ connection_context::reserve_request_units(size_t size) {
ss::future<>
connection_context::dispatch_method_once(request_header hdr, size_t size) {
return throttle_request(hdr, size).then([this, hdr = std::move(hdr), size](
session_resources sres) mutable {
session_resources
sres_in) mutable {
if (_rs.abort_requested()) {
// protect against shutdown behavior
return ss::make_ready_future<>();
}

auto sres = ss::make_lw_shared(std::move(sres_in));

auto remaining = size - request_header_size
- hdr.client_id_buffer.size() - hdr.tags_size_bytes;
return read_iobuf_exactly(_rs.conn->input(), remaining)
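
The key change in this hunk is wrapping the per-request `session_resources` in `ss::make_lw_shared`, so the first stage, the background second stage, and the response writer can all share ownership. A toy, self-contained illustration of that lifetime pattern (placeholder stages, not PR code):

```cpp
// Toy illustration: hold per-request resources in a lw_shared_ptr so they
// are released only when the last continuation referencing them finishes.
#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>

struct session_resources_demo {
    // stand-in for semaphore units, latency trackers, etc.
};

seastar::future<> handle_request_demo(session_resources_demo res_in) {
    auto res = seastar::make_lw_shared(std::move(res_in));
    return seastar::make_ready_future<>() // first stage (placeholder)
      .then([res] {
          return seastar::make_ready_future<>() // second stage (placeholder)
            .finally([res] {
                // last reference dropped here, so the resources outlive
                // every stage and are freed only after the response is done
            });
      });
}
```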
@@ -298,7 +312,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
}
auto self = shared_from_this();
auto rctx = request_context(
self, std::move(hdr), std::move(buf), sres.backpressure_delay);
self, std::move(hdr), std::move(buf), sres->backpressure_delay);
/*
* we process requests in order since all subsequent requests
* are dependent on authentication having completed.
@@ -323,7 +337,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
const sequence_id seq = _seq_idx;
_seq_idx = _seq_idx + sequence_id(1);
auto res = kafka::process_request(
std::move(rctx), _proto.smp_group());
std::move(rctx), _proto.smp_group(), *sres);
/**
* first stage processed in a foreground.
*/
@@ -333,7 +347,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
seq,
correlation,
self,
s = std::move(sres)](ss::future<> d) mutable {
sres = std::move(sres)](ss::future<> d) mutable {
/*
* if the dispatch/first stage failed, then we need to
* consume the second stage since it might be
@@ -362,13 +376,22 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
ssx::background
= ssx::spawn_with_gate_then(
_rs.conn_gate(),
[this, f = std::move(f), seq, correlation]() mutable {
return f.then([this, seq, correlation](
response_ptr r) mutable {
r->set_correlation(correlation);
_responses.insert({seq, std::move(r)});
return process_next_response();
});
[this,
f = std::move(f),
sres = std::move(sres),
seq,
correlation]() mutable {
return f.then(
[this,
sres = std::move(sres),
seq,
correlation](response_ptr r) mutable {
r->set_correlation(correlation);
response_and_resources randr{
std::move(r), std::move(sres)};
_responses.insert({seq, std::move(randr)});
return maybe_process_responses();
});
})
.handle_exception([self](std::exception_ptr e) {
// ssx::spawn_with_gate already caught
@@ -397,8 +420,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {

self->_rs.probe().service_error();
self->_rs.conn->shutdown_input();
})
.finally([s = std::move(s), self] {});
});
return d;
})
.handle_exception([self](std::exception_ptr e) {
@@ -410,7 +432,20 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
});
}

ss::future<> connection_context::process_next_response() {
/**
* This method processes as many responses as possible, in request order. Since
* we process the second stage asynchronously within a given connection,
* responses may become ready out of order, but Kafka clients expect responses
* exactly in request order.
*
* The _responses queue handles that: responses are enqueued there in completion
* order, but only sent to the client in request order. So this method, called
* after every response becomes ready, may end up sending zero, one or more
* responses, depending on the completion order.
*
* @return ss::future<>
*/
ss::future<> connection_context::maybe_process_responses() {
return ss::repeat([this]() mutable {
auto it = _responses.find(_next_response);
if (it == _responses.end()) {
@@ -420,20 +455,25 @@ ss::future<> connection_context::process_next_response() {
// found one; increment counter
_next_response = _next_response + sequence_id(1);

auto r = std::move(it->second);
auto resp_and_res = std::move(it->second);

_responses.erase(it);

if (r->is_noop()) {
if (resp_and_res.response->is_noop()) {
return ss::make_ready_future<ss::stop_iteration>(
ss::stop_iteration::no);
}

auto msg = response_as_scattered(std::move(r));
auto msg = response_as_scattered(std::move(resp_and_res.response));
try {
return _rs.conn->write(std::move(msg)).then([] {
return ss::make_ready_future<ss::stop_iteration>(
ss::stop_iteration::no);
});
return _rs.conn->write(std::move(msg))
.then([] {
return ss::make_ready_future<ss::stop_iteration>(
ss::stop_iteration::no);
})
// release the resources only once it has been written to the
// connection.
.finally([resources = std::move(resp_and_res.resources)] {});
} catch (...) {
vlog(
klog.debug,
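
For reference, the ordering scheme described in the `maybe_process_responses` comment can be captured in a few lines: a map keyed by sequence number acts as a reorder buffer, and a drain loop emits every consecutive ready response. A standalone sketch with simplified types (`std::string` standing in for a response, no Seastar I/O):

```cpp
// Standalone sketch of the response reorder buffer: completions arrive in
// any order, but responses are handed out strictly in request order.
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

using sequence_id = std::uint64_t;

struct reorder_buffer {
    sequence_id next_to_send = 0;
    std::map<sequence_id, std::string> pending;

    // Called whenever any response completes; returns the batch that is now
    // safe to write to the client, in request order.
    std::vector<std::string> on_ready(sequence_id seq, std::string resp) {
        pending.emplace(seq, std::move(resp));
        std::vector<std::string> to_send;
        for (auto it = pending.find(next_to_send); it != pending.end();
             it = pending.find(next_to_send)) {
            to_send.push_back(std::move(it->second));
            pending.erase(it);
            ++next_to_send;
        }
        return to_send;
    }
};
```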