Use a better estimator for metadata requests
Currently we estimate that metadata requests take 8000 + rsize * 2 bytes
of memory to process, where rsize is the size of the request. Since
metadata requests are very small, this ends up being roughly 8000 bytes.

However, the response to a metadata request that returns information
about every partition and replica may easily be several MB in size.

To fix this for metadata requests specifically, we use a new, more
conservative estimator that derives an upper bound on the size from
the current topic and partition configuration.
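
For context, a minimal sketch of the request-size-based estimate being
replaced; the constant and the factor of two come from the description
above, and default_memory_estimate here is a stand-in consistent with that
description rather than the actual implementation:

#include <cstddef>

// Old behaviour as described above: memory ~= 8000 + 2 * request size.
// A metadata request is only a few tens of bytes, so this lands near
// 8 KB no matter how many partitions the response will describe.
size_t default_memory_estimate(size_t request_size) {
    return 8000 + request_size * 2;
}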
travisdowns committed Jul 14, 2022
1 parent ec67a5b commit 7827f4d
Showing 2 changed files with 77 additions and 2 deletions.
65 changes: 65 additions & 0 deletions src/v/kafka/server/handlers/metadata.cc
@@ -19,6 +19,7 @@
#include "kafka/server/handlers/details/leader_epoch.h"
#include "kafka/server/handlers/details/security.h"
#include "kafka/server/handlers/topics/topic_utils.h"
#include "kafka/server/response.h"
#include "kafka/types.h"
#include "likely.h"
#include "model/metadata.h"
@@ -423,4 +424,68 @@ ss::future<response_ptr> metadata_handler::handle(
co_return co_await ctx.respond(std::move(reply));
}

size_t
metadata_memory_estimator(size_t request_size, connection_context& conn_ctx) {
// We cannot make a precise estimate of the size of a metadata response by
// examining only the size of the request (nor even by examining the entire
// request) since the response depends on the number of partitions in the
// cluster. Instead, we return a conservative estimate based on the current
// number of topics & partitions in the cluster.

// Essentially we need to estimate the size taken by a "maximum size"
// metadata_response_data response. The maximum size is when metadata for
// all topics is returned, which is also a common case in practice. This
// involves calculating the size for each topic's portion of the response,
// since the size varies both based on the number of partitions and the
// replica count.

// We start with a base estimate of 10K and then proceed to ignore
// everything other than the topic/partition part of the response, since
// that's what takes space in large responses and we assume the remaining
// part of the response (the broker list being the second largest part) will
// fit in this 10K slush fund.
size_t size_estimate = 10000;

auto& md = conn_ctx.server().metadata_cache().all_topics_metadata();

for (auto& [tp_ns, topic_metadata] : md) {
// metadata_response_topic
size_estimate += sizeof(kafka::metadata_response_topic);
size_estimate += tp_ns.tp().size();

using partition = kafka::metadata_response_partition;

// The base number of bytes needed to represent each partition, ignoring
// the variable part attributable to the replica count, is taken to be the
// size of the partition response structure.
constexpr size_t bytes_per_partition = sizeof(partition);

// Then, we need the number of additional bytes per replica, per
// partition, associated with storing the replica list in
// metadata_response_partition::replicas/isr_nodes, which we take to
// be the size of the elements in those lists (4 bytes each).
constexpr size_t bytes_per_replica = sizeof(partition::replica_nodes[0])
+ sizeof(partition::isr_nodes[0]);

// The actual partition and replica count for this topic.
int32_t pcount = topic_metadata.get_configuration().partition_count;
int32_t rcount = topic_metadata.get_configuration().replication_factor;

size_estimate += pcount
* (bytes_per_partition + bytes_per_replica * rcount);
}

// Finally, we double the estimate, because the high-water mark for memory
// use comes when the in-memory structures (metadata_response_data and
// subobjects) exist on the heap while they are being encoded into the
// response, which will also exist on the heap. The calculation above
// handles the first size, and the encoded response ends up being very
// similar in size, so we double the estimate to account for both.
size_estimate *= 2;

// We still add on the default_estimate to handle the size of the request
// itself and miscellaneous other processing (this is a small adjustment,
// generally ~8000 bytes).
return default_memory_estimate(request_size) + size_estimate;
}
} // namespace kafka
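
As a back-of-envelope check on the sizes involved, here is a self-contained
sketch that plugs assumed structure sizes and an illustrative cluster (1,000
topics, 10 partitions each, replication factor 3) into the same formula; the
real numbers come from the actual sizeof values and the live cluster state,
so everything below is an assumption for illustration only:

#include <cstddef>
#include <cstdio>

int main() {
    // Assumed sizes; the real code uses sizeof(kafka::metadata_response_topic)
    // and sizeof(kafka::metadata_response_partition).
    constexpr size_t topic_struct_bytes = 120;
    constexpr size_t partition_struct_bytes = 100;
    constexpr size_t bytes_per_replica = 4 + 4; // replicas list + isr list
    constexpr size_t avg_topic_name_bytes = 20;

    // An illustrative cluster, not taken from the commit.
    constexpr size_t topics = 1000;
    constexpr size_t partitions_per_topic = 10;
    constexpr size_t replication_factor = 3;

    size_t estimate = 10000; // slush fund for brokers, headers, etc.
    estimate += topics
                * (topic_struct_bytes + avg_topic_name_bytes
                   + partitions_per_topic
                         * (partition_struct_bytes
                            + bytes_per_replica * replication_factor));
    estimate *= 2; // in-memory structures plus the encoded response

    // With these assumptions: about 2.8 MB, versus ~8 KB from the old
    // request-size-based estimate.
    std::printf("estimated memory: %zu bytes\n", estimate);
}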
14 changes: 12 additions & 2 deletions src/v/kafka/server/handlers/metadata.h
@@ -14,6 +14,16 @@

namespace kafka {

using metadata_handler = handler<metadata_api, 0, 7>;
/**
* Estimate the memory needed to process a metadata request.
*
* Metadata requests are generally very small (a request for *all* metadata
* about a cluster is less than 30 bytes) but the response may be very large, so
* the default estimator is unsuitable. See the implementation for further
* notes.
*/
memory_estimate_fn metadata_memory_estimator;

using metadata_handler = handler<metadata_api, 0, 7, metadata_memory_estimator>;

}
} // namespace kafka
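
The memory_estimate_fn metadata_memory_estimator; line declares the function
through a function-type alias; a minimal sketch of that C++ pattern, with
illustrative names and a placeholder body rather than the real Redpanda
definitions:

#include <cstddef>

struct connection_context; // stand-in forward declaration

// A function *type* alias: any estimator must match this signature.
using memory_estimate_fn = size_t(size_t request_size, connection_context& conn_ctx);

// Declaring a function through the alias, as the header above does.
memory_estimate_fn metadata_memory_estimator;

// The definition still uses ordinary function syntax.
size_t metadata_memory_estimator(size_t request_size, connection_context&) {
    return 8000 + request_size * 2; // placeholder body for illustration
}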
