From 2db08a318ccd0d90d761276bcb61b906dba33d82 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Wed, 13 Jul 2022 23:33:17 -0700 Subject: [PATCH] Base metadata size estimate on topic_table Rather than use a hardcoded estimate, we assume metadata responses will have a worst case proportional to the amount of topics and partitions in the system. --- src/v/kafka/server/handlers/metadata.cc | 84 ++++++++++++++++++------- 1 file changed, 61 insertions(+), 23 deletions(-) diff --git a/src/v/kafka/server/handlers/metadata.cc b/src/v/kafka/server/handlers/metadata.cc index abf0ad13142e9..705fa01b2687a 100644 --- a/src/v/kafka/server/handlers/metadata.cc +++ b/src/v/kafka/server/handlers/metadata.cc @@ -20,6 +20,7 @@ #include "kafka/server/handlers/details/leader_epoch.h" #include "kafka/server/handlers/details/security.h" #include "kafka/server/handlers/topics/topic_utils.h" +#include "kafka/server/response.h" #include "kafka/types.h" #include "likely.h" #include "model/metadata.h" @@ -424,31 +425,68 @@ ss::future metadata_handler::handle( co_return co_await ctx.respond(std::move(reply)); } -size_t metadata_memory_estimator(size_t, connection_context&) { +size_t +metadata_memory_estimator(size_t request_size, connection_context& conn_ctx) { // We cannot make a precise estimate of the size of a metadata response by // examining only the size of the request (nor even by examining the entire // request) since the response depends on the number of partitions in the - // cluster. Instead, we return a conservative estimate based on the soft - // limit of total cluster-wide partition counts, mutiplied by an empirical - // "bytes per partition". - // - // The bytes per partition is roughly split evenly between the number of - // bytes needed to encode the response (about 100 bytes per partition, - // assuming 3 replicas) as well as the input structure to the encoder (i.e., - // the in-memory representation of that same metadata). 
- // - // This estimate is additionally inexact in the sense that more a higher - // replication factor would increase the per-partition size without bound. - // An additional inaccuracy is that we don't consider topics but lump all - // partitions together: this works for moderate to large partition counts, - // but it likely that a cluster with (for example) many topics with only 1 - // partition each might produce a larger metadata response than this - // calculation accounts for. - - // Empirical highwater memory allocated per partition while processing a - // metadata response. - constexpr size_t bytes_per_partition = 200; - - return max_clusterwide_partitions * bytes_per_partition; + // cluster. Instead, we return a conservative estimate based on the current + // number of topics & partitions in the cluster. + + // Essentially we need to estimate the size taken by a "maximum size" + // metadata_response_data response. The maximum size is when metadata for + // all topics is returned, which is also a common case in practice. This + // involves calculating the size for each topic's portion of the response, + // since the size varies both based on the number of partitions and the + // replica count. + + // We start with a base estimate of 10K and then proceed to ignore + // everything other than the topic/partition part of the response, since + // that's what takes space in large responses and we assume the remaining + // part of the response (the broker list being the second largest part) will + // fit in this 10K slush fund. 
+ size_t size_estimate = 10000; + + auto& md = conn_ctx.server().metadata_cache().all_topics_metadata(); + + for (auto& [tp_ns, topic_metadata] : md) { + // metadata_response_topic + size_estimate += sizeof(kafka::metadata_response_topic); + size_estimate += tp_ns.tp().size(); + + using partition = kafka::metadata_response_partition; + + // Base number of bytes needed to represent each partition, ignoring the + // variable part attributable to the replica count, we just take as the + // size of the partition response structure. + constexpr size_t bytes_per_partition = sizeof(partition); + + // Then, we need the number of additional bytes per replica, per + // partition, associated with storing the replica list in + // metadata_response_partition::replica_nodes/isr_nodes, which we take to + // be the size of the elements in those lists (4 bytes each). + constexpr size_t bytes_per_replica = sizeof(partition::replica_nodes[0]) + + sizeof(partition::isr_nodes[0]); + + // The actual partition and replica count for this topic. + int32_t pcount = topic_metadata.get_configuration().partition_count; + int32_t rcount = topic_metadata.get_configuration().replication_factor; + + size_estimate += pcount + * (bytes_per_partition + bytes_per_replica * rcount); + } + + // Finally, we double the estimate, because the highwater mark for memory + // use comes when the in-memory structures (metadata_response_data and + // subobjects) exist on the heap and they are encoded into the response, + // which will also exist on the heap. The calculation above handles the + // first size, and the encoded response ends up being very similar in size, + // so we double the estimate to account for both. + size_estimate *= 2; + + // We still add on the default_memory_estimate to handle the size of the request + // itself and miscellaneous other processing (this is a small adjustment, + // generally ~8000 bytes). + return default_memory_estimate(request_size) + size_estimate; } } // namespace kafka