From 7827f4dcf140825676db6b6c7b9839edee609491 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Sun, 10 Jul 2022 22:27:57 -0700 Subject: [PATCH] Use a better estimator for metadata requests Currently we estimate that metadata requests take 8000 + rsize * 2 bytes of memory to process, where rsize is the size of the request. Since metadata requests are very small, this end up being roughly 8000 bytes. However, metadata requests which return information about every partition and replica may easily be several MBs in size. To fix this for metadata requests specifically, we use a new more conservative estimate which uses the current topic and partition configuration to give an upper bound on the size. --- src/v/kafka/server/handlers/metadata.cc | 65 +++++++++++++++++++++++++ src/v/kafka/server/handlers/metadata.h | 14 +++++- 2 files changed, 77 insertions(+), 2 deletions(-) diff --git a/src/v/kafka/server/handlers/metadata.cc b/src/v/kafka/server/handlers/metadata.cc index 7a6e674cc4987..e6bf79d1d467d 100644 --- a/src/v/kafka/server/handlers/metadata.cc +++ b/src/v/kafka/server/handlers/metadata.cc @@ -19,6 +19,7 @@ #include "kafka/server/handlers/details/leader_epoch.h" #include "kafka/server/handlers/details/security.h" #include "kafka/server/handlers/topics/topic_utils.h" +#include "kafka/server/response.h" #include "kafka/types.h" #include "likely.h" #include "model/metadata.h" @@ -423,4 +424,68 @@ ss::future metadata_handler::handle( co_return co_await ctx.respond(std::move(reply)); } +size_t +metadata_memory_estimator(size_t request_size, connection_context& conn_ctx) { + // We cannot make a precise estimate of the size of a metadata response by + // examining only the size of the request (nor even by examining the entire + // request) since the response depends on the number of partitions in the + // cluster. Instead, we return a conservative estimate based on the current + // number of topics & partitions in the cluster. + + // Essentially we need to estimate the size taken by a "maximum size" + // metadata_response_data response. The maximum size is when metadata for + // all topics is returned, which is also a common case in practice. This + // involves calculating the size for each topic's portion of the response, + // since the size varies both based on the number of partitions and the + // replica count. + + // We start with a base estimate of 10K and then proceed to ignore + // everything other than the topic/partition part of the response, since + // that's what takes space in large responses and we assume the remaining + // part of the response (the broker list being the second largest part) will + // fit in this 10000k slush fund. + size_t size_estimate = 10000; + + auto& md = conn_ctx.server().metadata_cache().all_topics_metadata(); + + for (auto& [tp_ns, topic_metadata] : md) { + // metadata_response_topic + size_estimate += sizeof(kafka::metadata_response_topic); + size_estimate += tp_ns.tp().size(); + + using partition = kafka::metadata_response_partition; + + // Base number of bytes needed to represent each partition, ignoring the + // variable part attributable to the replica count, we just take as the + // size of the partition response structure. + constexpr size_t bytes_per_partition = sizeof(partition); + + // Then, we need the number of additional bytes per replica, per + // partition, associated with storing the replica list in + // metadata_response_partition::replicas/isr_nodes, which we take to + // be the size of the elements in those lists (4 bytes each). + constexpr size_t bytes_per_replica = sizeof(partition::replica_nodes[0]) + + sizeof(partition::isr_nodes[0]); + + // The actual partition and replica count for this topic. + int32_t pcount = topic_metadata.get_configuration().partition_count; + int32_t rcount = topic_metadata.get_configuration().replication_factor; + + size_estimate += pcount + * (bytes_per_partition + bytes_per_replica * rcount); + } + + // Finally, we double the estimate, because the highwater mark for memory + // use comes when the in-memory structures (metadata_response_data and + // subobjects) exist on the heap and they are encoded into the reponse, + // which will also exist on the heap. The calculation above handles the + // first size, and the encoded response ends up being very similar in size, + // so we double the estimate to account for both. + size_estimate *= 2; + + // We still add on the default_estimate to handle the size of the request + // itself and miscellaneous other procesing (this is a small adjustment, + // generally ~8000 bytes). + return default_memory_estimate(request_size) + size_estimate; +} } // namespace kafka diff --git a/src/v/kafka/server/handlers/metadata.h b/src/v/kafka/server/handlers/metadata.h index 89445b193fd0f..24c71472e64af 100644 --- a/src/v/kafka/server/handlers/metadata.h +++ b/src/v/kafka/server/handlers/metadata.h @@ -14,6 +14,16 @@ namespace kafka { -using metadata_handler = handler; +/** + * Estimate the size of a metadata request. + * + * Metadata requests are generally very small (a request for *all* metadata + * about a cluster is less than 30 bytes) but the response may be very large, so + * the default estimator is unsuitable. See the implementation for further + * notes. + */ +memory_estimate_fn metadata_memory_estimator; + +using metadata_handler = handler; -} +} // namespace kafka