diff --git a/src/v/kafka/server/handlers/metadata.cc b/src/v/kafka/server/handlers/metadata.cc
index 7a6e674cc4987..e6bf79d1d467d 100644
--- a/src/v/kafka/server/handlers/metadata.cc
+++ b/src/v/kafka/server/handlers/metadata.cc
@@ -19,6 +19,7 @@
 #include "kafka/server/handlers/details/leader_epoch.h"
 #include "kafka/server/handlers/details/security.h"
 #include "kafka/server/handlers/topics/topic_utils.h"
+#include "kafka/server/response.h"
 #include "kafka/types.h"
 #include "likely.h"
 #include "model/metadata.h"
@@ -423,4 +424,68 @@ ss::future<response_ptr> metadata_handler::handle(
     co_return co_await ctx.respond(std::move(reply));
 }
 
+size_t
+metadata_memory_estimator(size_t request_size, connection_context& conn_ctx) {
+    // We cannot make a precise estimate of the size of a metadata response by
+    // examining only the size of the request (nor even by examining the entire
+    // request) since the response depends on the number of partitions in the
+    // cluster. Instead, we return a conservative estimate based on the current
+    // number of topics & partitions in the cluster.
+
+    // Essentially we need to estimate the size taken by a "maximum size"
+    // metadata_response_data response. The maximum size is when metadata for
+    // all topics is returned, which is also a common case in practice. This
+    // involves calculating the size for each topic's portion of the response,
+    // since the size varies both based on the number of partitions and the
+    // replica count.
+
+    // We start with a base estimate of 10K and then proceed to ignore
+    // everything other than the topic/partition part of the response, since
+    // that's what takes space in large responses and we assume the remaining
+    // part of the response (the broker list being the second largest part)
+    // will fit in this 10K slush fund.
+    size_t size_estimate = 10000;
+
+    auto& md = conn_ctx.server().metadata_cache().all_topics_metadata();
+
+    for (auto& [tp_ns, topic_metadata] : md) {
+        // metadata_response_topic
+        size_estimate += sizeof(kafka::metadata_response_topic);
+        size_estimate += tp_ns.tp().size();
+
+        using partition = kafka::metadata_response_partition;
+
+        // Base number of bytes needed to represent each partition, ignoring
+        // the variable part attributable to the replica count, which we take
+        // to be the size of the partition response structure.
+        constexpr size_t bytes_per_partition = sizeof(partition);
+
+        // Then, we need the number of additional bytes per replica, per
+        // partition, associated with storing the replica list in
+        // metadata_response_partition::replicas/isr_nodes, which we take to
+        // be the size of the elements in those lists (4 bytes each).
+        constexpr size_t bytes_per_replica = sizeof(partition::replica_nodes[0])
+                                             + sizeof(partition::isr_nodes[0]);
+
+        // The actual partition and replica count for this topic.
+        int32_t pcount = topic_metadata.get_configuration().partition_count;
+        int32_t rcount = topic_metadata.get_configuration().replication_factor;
+
+        size_estimate += pcount
+                         * (bytes_per_partition + bytes_per_replica * rcount);
+    }
+
+    // Finally, we double the estimate, because the high-water mark for memory
+    // use comes when the in-memory structures (metadata_response_data and
+    // subobjects) exist on the heap while they are encoded into the response,
+    // which will also exist on the heap. The calculation above handles the
+    // first size, and the encoded response ends up being very similar in size,
+    // so we double the estimate to account for both.
+    size_estimate *= 2;
+
+    // We still add on the default_estimate to handle the size of the request
+    // itself and miscellaneous other processing (this is a small adjustment,
+    // generally ~8000 bytes).
+    return default_memory_estimate(request_size) + size_estimate;
+}
 
 } // namespace kafka
diff --git a/src/v/kafka/server/handlers/metadata.h b/src/v/kafka/server/handlers/metadata.h
index 89445b193fd0f..24c71472e64af 100644
--- a/src/v/kafka/server/handlers/metadata.h
+++ b/src/v/kafka/server/handlers/metadata.h
@@ -14,6 +14,16 @@
 
 namespace kafka {
 
-using metadata_handler = handler;
+/**
+ * Estimate the size of a metadata request.
+ *
+ * Metadata requests are generally very small (a request for *all* metadata
+ * about a cluster is less than 30 bytes) but the response may be very large, so
+ * the default estimator is unsuitable. See the implementation for further
+ * notes.
+ */
+memory_estimate_fn metadata_memory_estimator;
+
+using metadata_handler = handler;
 
-}
+} // namespace kafka
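
For a rough sense of the numbers this formula produces, here is a standalone sketch of the same arithmetic, not the Redpanda implementation itself: the function name `estimate_metadata_memory`, the per-topic and per-partition struct sizes, and the default-estimate value are illustrative placeholders, since the real estimator takes the sizes from `sizeof` on the generated response types and walks the live metadata cache per topic.

```cpp
// Sketch of the estimator's arithmetic under assumed struct sizes.
#include <cstddef>
#include <cstdint>
#include <cstdio>

size_t estimate_metadata_memory(
  size_t topic_count,
  size_t avg_topic_name_len,
  int32_t partitions_per_topic,
  int32_t replication_factor,
  size_t default_estimate = 8192) { // patch notes the default is ~8000 bytes
    // Base "slush fund" covering everything outside the topic/partition
    // list (broker list, headers, etc.), as in the patch.
    size_t size_estimate = 10000;

    // Placeholder stand-ins for sizeof(metadata_response_topic) and
    // sizeof(metadata_response_partition); real values come from the
    // generated response types.
    constexpr size_t bytes_per_topic = 64;     // assumed
    constexpr size_t bytes_per_partition = 96; // assumed
    constexpr size_t bytes_per_replica = 8;    // 4-byte node id in replicas + isr

    // The real code loops over each topic because partition and replica
    // counts differ per topic; here every topic is assumed identical.
    for (size_t t = 0; t < topic_count; ++t) {
        size_estimate += bytes_per_topic + avg_topic_name_len;
        size_estimate += partitions_per_topic
                         * (bytes_per_partition
                            + bytes_per_replica * replication_factor);
    }

    // Double it: the decoded structures and the encoded response bytes are
    // both alive on the heap at the high-water mark.
    size_estimate *= 2;

    return default_estimate + size_estimate;
}

int main() {
    // e.g. 1000 topics, 16 partitions each, RF=3, ~20 byte topic names.
    std::printf(
      "estimate: %zu bytes\n", estimate_metadata_memory(1000, 20, 16, 3));
}
```

Under those assumed sizes, a cluster with 1000 topics of 16 partitions at replication factor 3 yields an estimate of roughly 4 MB, which illustrates why the fixed default estimate is unsuitable for metadata responses.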