Base metadata size estimate on topic_table
Rather than use a hardcoded estimate, we assume metadata responses
will have a worst case proportional to the number of topics and
partitions in the system.
travisdowns committed Jul 14, 2022
1 parent 5429713 commit 2db08a3
Showing 1 changed file with 61 additions and 23 deletions.
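In rough terms (an editor's summary, not text from the commit itself), the
new estimator computes

    estimate = 2 * (10000
                    + sum over topics of (sizeof(metadata_response_topic)
                                          + topic_name_length
                                          + pcount * (sizeof(metadata_response_partition)
                                                      + 8 * rcount)))
               + default_memory_estimate(request_size)

where pcount and rcount are a topic's partition count and replication factor,
the 10000 bytes are a slush fund for everything outside the topic/partition
data, and the 8 bytes per replica are one 4-byte replica node id plus one
4-byte ISR node id.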
src/v/kafka/server/handlers/metadata.cc
@@ -20,6 +20,7 @@
#include "kafka/server/handlers/details/leader_epoch.h"
#include "kafka/server/handlers/details/security.h"
#include "kafka/server/handlers/topics/topic_utils.h"
#include "kafka/server/response.h"
#include "kafka/types.h"
#include "likely.h"
#include "model/metadata.h"
@@ -424,31 +425,68 @@ ss::future<response_ptr> metadata_handler::handle(
     co_return co_await ctx.respond(std::move(reply));
 }
 
-size_t metadata_memory_estimator(size_t, connection_context&) {
+size_t
+metadata_memory_estimator(size_t request_size, connection_context& conn_ctx) {
     // We cannot make a precise estimate of the size of a metadata response by
     // examining only the size of the request (nor even by examining the entire
     // request) since the response depends on the number of partitions in the
-    // cluster. Instead, we return a conservative estimate based on the soft
-    // limit of total cluster-wide partition counts, multiplied by an empirical
-    // "bytes per partition".
-    //
-    // The bytes per partition is roughly split evenly between the number of
-    // bytes needed to encode the response (about 100 bytes per partition,
-    // assuming 3 replicas) as well as the input structure to the encoder (i.e.,
-    // the in-memory representation of that same metadata).
-    //
-    // This estimate is additionally inexact in the sense that a higher
-    // replication factor would increase the per-partition size without bound.
-    // An additional inaccuracy is that we don't consider topics but lump all
-    // partitions together: this works for moderate to large partition counts,
-    // but it is likely that a cluster with (for example) many topics with only
-    // 1 partition each might produce a larger metadata response than this
-    // calculation accounts for.
-
-    // Empirical highwater memory allocated per partition while processing a
-    // metadata response.
-    constexpr size_t bytes_per_partition = 200;
-
-    return max_clusterwide_partitions * bytes_per_partition;
+    // cluster. Instead, we return a conservative estimate based on the current
+    // number of topics & partitions in the cluster.
+
+    // Essentially we need to estimate the size taken by a "maximum size"
+    // metadata_response_data response. The maximum size is when metadata for
+    // all topics is returned, which is also a common case in practice. This
+    // involves calculating the size for each topic's portion of the response,
+    // since the size varies both based on the number of partitions and the
+    // replica count.
+
+    // We start with a base estimate of 10K and then proceed to ignore
+    // everything other than the topic/partition part of the response, since
+    // that's what takes space in large responses and we assume the remaining
+    // part of the response (the broker list being the second largest part)
+    // will fit in this 10K slush fund.
+    size_t size_estimate = 10000;
+
+    auto& md = conn_ctx.server().metadata_cache().all_topics_metadata();
+
+    for (auto& [tp_ns, topic_metadata] : md) {
+        // metadata_response_topic
+        size_estimate += sizeof(kafka::metadata_response_topic);
+        size_estimate += tp_ns.tp().size();
+
+        using partition = kafka::metadata_response_partition;
+
+        // The base number of bytes needed to represent each partition,
+        // ignoring the variable part attributable to the replica count, is
+        // taken as the size of the partition response structure.
+        constexpr size_t bytes_per_partition = sizeof(partition);
+
+        // Then, we need the number of additional bytes per replica, per
+        // partition, associated with storing the replica list in
+        // metadata_response_partition::replicas/isr_nodes, which we take to
+        // be the size of the elements in those lists (4 bytes each).
+        constexpr size_t bytes_per_replica = sizeof(partition::replica_nodes[0])
+                                             + sizeof(partition::isr_nodes[0]);
+
+        // The actual partition and replica count for this topic.
+        int32_t pcount = topic_metadata.get_configuration().partition_count;
+        int32_t rcount = topic_metadata.get_configuration().replication_factor;
+
+        size_estimate += pcount
+                         * (bytes_per_partition + bytes_per_replica * rcount);
+    }
+
+    // Finally, we double the estimate, because the highwater mark for memory
+    // use comes when the in-memory structures (metadata_response_data and
+    // subobjects) exist on the heap and they are encoded into the response,
+    // which will also exist on the heap. The calculation above handles the
+    // first size, and the encoded response ends up being very similar in
+    // size, so we double the estimate to account for both.
+    size_estimate *= 2;
+
+    // We still add on the default_estimate to handle the size of the request
+    // itself and miscellaneous other processing (this is a small adjustment,
+    // generally ~8000 bytes).
+    return default_memory_estimate(request_size) + size_estimate;
 }
 } // namespace kafka
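To get a feel for the magnitudes involved, here is a minimal standalone
sketch of the same arithmetic. The struct sizes are hypothetical stand-ins
for sizeof(kafka::metadata_response_topic) and
sizeof(kafka::metadata_response_partition); none of the names or numbers
below come from the commit itself.

#include <cstddef>
#include <cstdint>
#include <iostream>

// Hypothetical stand-in sizes, chosen only for illustration.
constexpr std::size_t topic_struct_bytes = 96;
constexpr std::size_t partition_struct_bytes = 64;
// One 4-byte replica node id plus one 4-byte ISR node id per replica.
constexpr std::size_t bytes_per_replica = 4 + 4;

// Mirrors the commit's per-topic accumulation, simplified by assuming every
// topic has the same name length, partition count and replication factor.
std::size_t estimate(
  std::size_t n_topics,
  std::size_t name_len,
  std::int32_t pcount,
  std::int32_t rcount) {
    std::size_t size_estimate = 10000; // slush fund for brokers, headers, etc.
    size_estimate += n_topics
                     * (topic_struct_bytes + name_len
                        + pcount
                            * (partition_struct_bytes
                               + bytes_per_replica * rcount));
    // Double to cover both the in-memory structures and the encoded response.
    return size_estimate * 2;
}

int main() {
    // E.g. 1,000 topics with 16-byte names, 10 partitions each, RF=3:
    // 2 * (10000 + 1000 * (96 + 16 + 10 * (64 + 24))) = 2,004,000 bytes,
    // i.e. roughly 2 MB for a full-cluster metadata response.
    std::cout << estimate(1000, 16, 10, 3) << " bytes\n";
}

Note the contrast with the old approach: the flat
max_clusterwide_partitions * 200 bound was independent of what the cluster
actually contains, while the per-topic walk scales the reservation down (or
up) with the real topic table.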
