From 7f3f8fac67d6045848ac7d8ee345091d0c4e9cbe Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 22 Apr 2022 20:06:20 +0200 Subject: [PATCH] c/dissemination: do not query topic metadata for ntp Improve calculation of not overlapping nodes in metadata dissemination service. Previously for each partition we queried the whole topic metadata object which contain all the partitions metadata. One of the partition info was retrieved and whole object was destroyed. Signed-off-by: Michal Maslanka --- .../cluster/metadata_dissemination_service.cc | 10 ++-- src/v/cluster/metadata_dissemination_utils.cc | 38 ++++++--------- src/v/cluster/metadata_dissemination_utils.h | 9 ++-- .../metadata_dissemination_utils_test.cc | 46 +++++++------------ 4 files changed, 39 insertions(+), 64 deletions(-) diff --git a/src/v/cluster/metadata_dissemination_service.cc b/src/v/cluster/metadata_dissemination_service.cc index 0abe7a72878a..f53ec9abf42c 100644 --- a/src/v/cluster/metadata_dissemination_service.cc +++ b/src/v/cluster/metadata_dissemination_service.cc @@ -287,15 +287,15 @@ metadata_dissemination_service::dispatch_get_metadata_update( void metadata_dissemination_service::collect_pending_updates() { auto brokers = _members_table.local().all_broker_ids(); for (auto& ntp_leader : _requests) { - auto tp_md = _topics.local().get_topic_metadata( - model::topic_namespace_view(ntp_leader.ntp)); + auto assignment = _topics.local().get_partition_assignment( + ntp_leader.ntp); - if (!tp_md) { - // Topic metadata is not there anymore, partition was removed + if (!assignment) { + // Partition was removed, skip dissemination continue; } auto non_overlapping = calculate_non_overlapping_nodes( - get_partition_members(ntp_leader.ntp.tp.partition, *tp_md), brokers); + *assignment, brokers); /** * remove current node from non overlapping list, current node may be diff --git a/src/v/cluster/metadata_dissemination_utils.cc b/src/v/cluster/metadata_dissemination_utils.cc index 2ad16d991ede..d5f43991b768 100644 --- a/src/v/cluster/metadata_dissemination_utils.cc +++ b/src/v/cluster/metadata_dissemination_utils.cc @@ -9,23 +9,29 @@ #include "cluster/metadata_dissemination_utils.h" +#include "cluster/types.h" #include "likely.h" +#include "model/metadata.h" #include namespace cluster { std::vector calculate_non_overlapping_nodes( - const std::vector& partition_members, + const partition_assignment& partition_members, const std::vector& all_nodes) { std::vector non_overlapping; - non_overlapping.reserve(all_nodes.size() - partition_members.size()); + non_overlapping.reserve( + all_nodes.size() - partition_members.replicas.size()); for (auto& n : all_nodes) { - const bool contains = std::find( - std::cbegin(partition_members), - std::cend(partition_members), - n) - != std::cend(partition_members); + const bool contains = std::find_if( + partition_members.replicas.begin(), + partition_members.replicas.end(), + [n](const model::broker_shard& bs) { + return bs.node_id == n; + }) + != partition_members.replicas.end(); + if (!contains) { // This node is not a partition member non_overlapping.push_back(n); @@ -34,20 +40,4 @@ std::vector calculate_non_overlapping_nodes( return non_overlapping; } -std::vector get_partition_members( - model::partition_id pid, const model::topic_metadata& tp_md) { - std::vector members; - if (unlikely((size_t)pid() >= tp_md.partitions.size())) { - throw std::invalid_argument(fmt::format( - "Topic {} does not contain partion {}", tp_md.tp_ns, pid)); - } - auto& replicas = tp_md.partitions[pid()].replicas; - std::transform( - std::cbegin(replicas), - std::cend(replicas), - std::back_inserter(members), - [](model::broker_shard bs) { return bs.node_id; }); - - return members; -} -} // namespace cluster \ No newline at end of file +} // namespace cluster diff --git a/src/v/cluster/metadata_dissemination_utils.h b/src/v/cluster/metadata_dissemination_utils.h index d88db4861112..f5e89f70fd48 100644 --- a/src/v/cluster/metadata_dissemination_utils.h +++ b/src/v/cluster/metadata_dissemination_utils.h @@ -11,6 +11,7 @@ #pragma once +#include "cluster/types.h" #include "model/fundamental.h" #include "model/metadata.h" @@ -19,11 +20,7 @@ namespace cluster { // Calculate vector of nodes that belongs to the cluster but are not partition // replica set members std::vector calculate_non_overlapping_nodes( - const std::vector& partition_members, + const partition_assignment& partition_members, const std::vector& all_nodes); -// Returns a vector of nodes that are members of partition replica set -std::vector get_partition_members( - model::partition_id pid, const model::topic_metadata& tp_md); - -} // namespace cluster \ No newline at end of file +} // namespace cluster diff --git a/src/v/cluster/tests/metadata_dissemination_utils_test.cc b/src/v/cluster/tests/metadata_dissemination_utils_test.cc index 45a0d50b7d01..cd075b54e745 100644 --- a/src/v/cluster/tests/metadata_dissemination_utils_test.cc +++ b/src/v/cluster/tests/metadata_dissemination_utils_test.cc @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0 #include "model/fundamental.h" +#include "random/generators.h" #include @@ -18,6 +19,19 @@ #include +cluster::partition_assignment +make_assignment(const std::vector& nodes) { + cluster::partition_assignment p_as{ + .group = raft::group_id(1), .id = model::partition_id(1)}; + p_as.replicas.reserve(nodes.size()); + for (auto n : nodes) { + p_as.replicas.push_back(model::broker_shard{ + .node_id = n, .shard = random_generators::get_int(10)}); + } + + return p_as; +} + BOOST_AUTO_TEST_CASE(test_calculation_non_overlapping_nodes) { std::vector cluster_nodes{ model::node_id{0}, @@ -30,7 +44,7 @@ BOOST_AUTO_TEST_CASE(test_calculation_non_overlapping_nodes) { model::node_id{0}, model::node_id{3}, model::node_id{4}}; auto non_overlapping_1 = cluster::calculate_non_overlapping_nodes( - single_node, cluster_nodes); + make_assignment(single_node), cluster_nodes); std::vector expected_1{ model::node_id{0}, model::node_id{1}, @@ -40,7 +54,7 @@ BOOST_AUTO_TEST_CASE(test_calculation_non_overlapping_nodes) { BOOST_REQUIRE_EQUAL(non_overlapping_1, expected_1); auto non_overlapping_2 = cluster::calculate_non_overlapping_nodes( - three_nodes, cluster_nodes); + make_assignment(three_nodes), cluster_nodes); std::vector expected_2{ model::node_id{1}, model::node_id{2}, @@ -49,33 +63,7 @@ BOOST_AUTO_TEST_CASE(test_calculation_non_overlapping_nodes) { BOOST_REQUIRE_EQUAL(non_overlapping_2, expected_2); auto non_overlapping_3 = cluster::calculate_non_overlapping_nodes( - cluster_nodes, cluster_nodes); + make_assignment(cluster_nodes), cluster_nodes); BOOST_REQUIRE_EQUAL(non_overlapping_3.size(), 0); }; - -BOOST_AUTO_TEST_CASE(test_get_partition_members) { - model::topic_metadata tp_md( - model::topic_namespace(model::ns("test-ns"), model::topic("test_tp"))); - - auto p0 = model::partition_metadata(model::partition_id(0)); - p0.replicas = { - model::broker_shard{model::node_id{0}, 1}, - model::broker_shard{model::node_id{1}, 1}, - model::broker_shard{model::node_id{2}, 1}, - }; - auto p1 = model::partition_metadata(model::partition_id(1)); - p1.replicas = { - model::broker_shard{model::node_id{3}, 1}, - model::broker_shard{model::node_id{4}, 1}, - model::broker_shard{model::node_id{5}, 1}, - }; - tp_md.partitions = {p0, p1}; - - auto members = cluster::get_partition_members( - model::partition_id{1}, tp_md); - std::vector expected = { - model::node_id{3}, model::node_id{4}, model::node_id{5}}; - - BOOST_REQUIRE_EQUAL(members, expected); -}; \ No newline at end of file