Skip to content

Commit

Permalink
Merge pull request #4389 from mmaslankaprv/do-not-query-topic-metadata
Browse files Browse the repository at this point in the history
c/dissemination: do not query topic metadata for ntp
  • Loading branch information
mmaslankaprv committed Apr 26, 2022
2 parents bd2f0e6 + 7f3f8fa commit ddebd78
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 64 deletions.
10 changes: 5 additions & 5 deletions src/v/cluster/metadata_dissemination_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -287,15 +287,15 @@ metadata_dissemination_service::dispatch_get_metadata_update(
void metadata_dissemination_service::collect_pending_updates() {
auto brokers = _members_table.local().all_broker_ids();
for (auto& ntp_leader : _requests) {
auto tp_md = _topics.local().get_topic_metadata(
model::topic_namespace_view(ntp_leader.ntp));
auto assignment = _topics.local().get_partition_assignment(
ntp_leader.ntp);

if (!tp_md) {
// Topic metadata is not there anymore, partition was removed
if (!assignment) {
// Partition was removed, skip dissemination
continue;
}
auto non_overlapping = calculate_non_overlapping_nodes(
get_partition_members(ntp_leader.ntp.tp.partition, *tp_md), brokers);
*assignment, brokers);

/**
* remove current node from non overlapping list, current node may be
Expand Down
38 changes: 14 additions & 24 deletions src/v/cluster/metadata_dissemination_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,29 @@

#include "cluster/metadata_dissemination_utils.h"

#include "cluster/types.h"
#include "likely.h"
#include "model/metadata.h"

#include <fmt/core.h>

namespace cluster {

std::vector<model::node_id> calculate_non_overlapping_nodes(
const std::vector<model::node_id>& partition_members,
const partition_assignment& partition_members,
const std::vector<model::node_id>& all_nodes) {
std::vector<model::node_id> non_overlapping;
non_overlapping.reserve(all_nodes.size() - partition_members.size());
non_overlapping.reserve(
all_nodes.size() - partition_members.replicas.size());
for (auto& n : all_nodes) {
const bool contains = std::find(
std::cbegin(partition_members),
std::cend(partition_members),
n)
!= std::cend(partition_members);
const bool contains = std::find_if(
partition_members.replicas.begin(),
partition_members.replicas.end(),
[n](const model::broker_shard& bs) {
return bs.node_id == n;
})
!= partition_members.replicas.end();

if (!contains) {
// This node is not a partition member
non_overlapping.push_back(n);
Expand All @@ -34,20 +40,4 @@ std::vector<model::node_id> calculate_non_overlapping_nodes(
return non_overlapping;
}

std::vector<model::node_id> get_partition_members(
model::partition_id pid, const model::topic_metadata& tp_md) {
std::vector<model::node_id> members;
if (unlikely((size_t)pid() >= tp_md.partitions.size())) {
throw std::invalid_argument(fmt::format(
"Topic {} does not contain partion {}", tp_md.tp_ns, pid));
}
auto& replicas = tp_md.partitions[pid()].replicas;
std::transform(
std::cbegin(replicas),
std::cend(replicas),
std::back_inserter(members),
[](model::broker_shard bs) { return bs.node_id; });

return members;
}
} // namespace cluster
} // namespace cluster
9 changes: 3 additions & 6 deletions src/v/cluster/metadata_dissemination_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#pragma once

#include "cluster/types.h"
#include "model/fundamental.h"
#include "model/metadata.h"

Expand All @@ -19,11 +20,7 @@ namespace cluster {
// Calculate vector of nodes that belongs to the cluster but are not partition
// replica set members
std::vector<model::node_id> calculate_non_overlapping_nodes(
const std::vector<model::node_id>& partition_members,
const partition_assignment& partition_members,
const std::vector<model::node_id>& all_nodes);

// Returns a vector of nodes that are members of partition replica set
std::vector<model::node_id> get_partition_members(
model::partition_id pid, const model::topic_metadata& tp_md);

} // namespace cluster
} // namespace cluster
46 changes: 17 additions & 29 deletions src/v/cluster/tests/metadata_dissemination_utils_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// by the Apache License, Version 2.0

#include "model/fundamental.h"
#include "random/generators.h"

#include <boost/test/tools/old/interface.hpp>

Expand All @@ -18,6 +19,19 @@

#include <boost/test/unit_test.hpp>

cluster::partition_assignment
make_assignment(const std::vector<model::node_id>& nodes) {
cluster::partition_assignment p_as{
.group = raft::group_id(1), .id = model::partition_id(1)};
p_as.replicas.reserve(nodes.size());
for (auto n : nodes) {
p_as.replicas.push_back(model::broker_shard{
.node_id = n, .shard = random_generators::get_int<uint32_t>(10)});
}

return p_as;
}

BOOST_AUTO_TEST_CASE(test_calculation_non_overlapping_nodes) {
std::vector<model::node_id> cluster_nodes{
model::node_id{0},
Expand All @@ -30,7 +44,7 @@ BOOST_AUTO_TEST_CASE(test_calculation_non_overlapping_nodes) {
model::node_id{0}, model::node_id{3}, model::node_id{4}};

auto non_overlapping_1 = cluster::calculate_non_overlapping_nodes(
single_node, cluster_nodes);
make_assignment(single_node), cluster_nodes);
std::vector<model::node_id> expected_1{
model::node_id{0},
model::node_id{1},
Expand All @@ -40,7 +54,7 @@ BOOST_AUTO_TEST_CASE(test_calculation_non_overlapping_nodes) {
BOOST_REQUIRE_EQUAL(non_overlapping_1, expected_1);

auto non_overlapping_2 = cluster::calculate_non_overlapping_nodes(
three_nodes, cluster_nodes);
make_assignment(three_nodes), cluster_nodes);
std::vector<model::node_id> expected_2{
model::node_id{1},
model::node_id{2},
Expand All @@ -49,33 +63,7 @@ BOOST_AUTO_TEST_CASE(test_calculation_non_overlapping_nodes) {
BOOST_REQUIRE_EQUAL(non_overlapping_2, expected_2);

auto non_overlapping_3 = cluster::calculate_non_overlapping_nodes(
cluster_nodes, cluster_nodes);
make_assignment(cluster_nodes), cluster_nodes);

BOOST_REQUIRE_EQUAL(non_overlapping_3.size(), 0);
};

BOOST_AUTO_TEST_CASE(test_get_partition_members) {
model::topic_metadata tp_md(
model::topic_namespace(model::ns("test-ns"), model::topic("test_tp")));

auto p0 = model::partition_metadata(model::partition_id(0));
p0.replicas = {
model::broker_shard{model::node_id{0}, 1},
model::broker_shard{model::node_id{1}, 1},
model::broker_shard{model::node_id{2}, 1},
};
auto p1 = model::partition_metadata(model::partition_id(1));
p1.replicas = {
model::broker_shard{model::node_id{3}, 1},
model::broker_shard{model::node_id{4}, 1},
model::broker_shard{model::node_id{5}, 1},
};
tp_md.partitions = {p0, p1};

auto members = cluster::get_partition_members(
model::partition_id{1}, tp_md);
std::vector<model::node_id> expected = {
model::node_id{3}, model::node_id{4}, model::node_id{5}};

BOOST_REQUIRE_EQUAL(members, expected);
};

0 comments on commit ddebd78

Please sign in to comment.