Skip to content

Commit

Permalink
Merge pull request #4443 from mmaslankaprv/limit-metadata-copies
Browse files Browse the repository at this point in the history
Optimized partition metadata lookup
  • Loading branch information
mmaslankaprv committed May 9, 2022
2 parents d7595a6 + d32572c commit 97d2ea2
Show file tree
Hide file tree
Showing 32 changed files with 388 additions and 318 deletions.
20 changes: 10 additions & 10 deletions src/v/cluster/controller_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,19 @@ bool has_node_local_replicas(
ss::future<result<std::vector<ntp_reconciliation_state>>>
controller_api::get_reconciliation_state(model::topic_namespace_view tp_ns) {
using ret_t = result<std::vector<ntp_reconciliation_state>>;
auto metadata = _topics.local().get_topic_metadata(tp_ns);
auto metadata = _topics.local().get_topic_metadata_ref(tp_ns);
if (!metadata) {
co_return ret_t(errc::topic_not_exists);
}
std::vector<model::ntp> ntps;
ntps.reserve(metadata->partitions.size());
ntps.reserve(metadata->get().get_assignments().size());

std::transform(
metadata->partitions.cbegin(),
metadata->partitions.cend(),
metadata->get().get_assignments().cbegin(),
metadata->get().get_assignments().cend(),
std::back_inserter(ntps),
[tp_ns](const model::partition_metadata& p_md) {
return model::ntp(tp_ns.ns, tp_ns.tp, p_md.id);
[tp_ns](const partition_assignment& p_as) {
return model::ntp(tp_ns.ns, tp_ns.tp, p_as.id);
});

co_return co_await get_reconciliation_state(std::move(ntps));
Expand Down Expand Up @@ -211,17 +211,17 @@ controller_api::get_reconciliation_state(
// high level APIs
ss::future<std::error_code> controller_api::wait_for_topic(
model::topic_namespace_view tp_ns, model::timeout_clock::time_point timeout) {
auto metadata = _topics.local().get_topic_metadata(tp_ns);
auto metadata = _topics.local().get_topic_metadata_ref(tp_ns);
if (!metadata) {
vlog(clusterlog.trace, "topic {} does not exists", tp_ns);
co_return make_error_code(errc::topic_not_exists);
}

absl::node_hash_map<model::node_id, std::vector<model::ntp>> requests;
// collect ntps per node
for (const auto& p_md : metadata->partitions) {
for (const auto& bs : p_md.replicas) {
requests[bs.node_id].emplace_back(tp_ns.ns, tp_ns.tp, p_md.id);
for (const auto& p_as : metadata->get().get_assignments()) {
for (const auto& bs : p_as.replicas) {
requests[bs.node_id].emplace_back(tp_ns.ns, tp_ns.tp, p_as.id);
}
}
bool ready = false;
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/health_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ health_manager::ensure_topic_replication(model::topic_namespace_view topic) {
co_return true;
}

for (const auto& partition : metadata->partitions) {
for (const auto& partition : metadata->get_assignments()) {
model::ntp ntp(topic.ns, topic.tp, partition.id);
if (!co_await ensure_partition_replication(std::move(ntp))) {
co_return false;
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/members_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ void members_backend::calculate_reallocations(update_meta& meta) {
if (!cfg.is_topic_replicable()) {
continue;
}
for (auto& pas : cfg.get_configuration().assignments) {
for (const auto& pas : cfg.get_assignments()) {
if (is_in_replica_set(pas.replicas, meta.update.id)) {
partition_reallocation reallocation(
model::ntp(tp_ns.ns, tp_ns.tp, pas.id),
Expand Down Expand Up @@ -284,7 +284,7 @@ void members_backend::calculate_reallocations_after_node_added(
meta.update.offset);
continue;
}
for (auto& p : metadata.get_configuration().assignments) {
for (const auto& p : metadata.get_assignments()) {
std::erase_if(to_move_from_node, [](const replicas_to_move& v) {
return v.left_to_move == 0;
});
Expand Down
41 changes: 28 additions & 13 deletions src/v/cluster/metadata_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "cluster/metadata_cache.h"

#include "cluster/fwd.h"
#include "cluster/health_monitor_frontend.h"
#include "cluster/members_table.h"
#include "cluster/partition_leaders_table.h"
Expand Down Expand Up @@ -62,17 +63,34 @@ metadata_cache::get_source_topic(model::topic_namespace_view tp) const {
return mt->second.get_source_topic();
}

std::optional<model::topic_metadata> metadata_cache::get_topic_metadata(
/// Looks up the metadata of the topic identified by `tp` in the local topics
/// state and returns the lookup result by value (an empty optional is
/// whatever the underlying lookup reports for a missing topic).
std::optional<cluster::topic_metadata>
metadata_cache::get_topic_metadata(model::topic_namespace_view tp) const {
    auto&& topics = _topics_state.local();
    return topics.get_topic_metadata(tp);
}

/// Same lookup as the by-value variant, but the result wraps a const
/// reference to the topic metadata instead of a copy. The call is forwarded
/// to the local topics state.
std::optional<std::reference_wrapper<const cluster::topic_metadata>>
metadata_cache::get_topic_metadata_ref(model::topic_namespace_view tn) const {
    auto&& topics = _topics_state.local();
    return topics.get_topic_metadata_ref(tn);
}

std::optional<model::topic_metadata> metadata_cache::get_model_topic_metadata(
model::topic_namespace_view tp, metadata_cache::with_leaders leaders) const {
auto md = _topics_state.local().get_topic_metadata(tp);
if (!md) {
return md;
return std::nullopt;
}

model::topic_metadata metadata(md->get_configuration().tp_ns);
metadata.partitions.reserve(md->get_assignments().size());
for (const auto& p_as : md->get_assignments()) {
metadata.partitions.push_back(p_as.create_partition_metadata());
}

if (leaders) {
fill_partition_leaders(_leaders.local(), *md);
fill_partition_leaders(_leaders.local(), metadata);
}

return md;
return metadata;
}
std::optional<topic_configuration>
metadata_cache::get_topic_cfg(model::topic_namespace_view tp) const {
Expand All @@ -84,15 +102,8 @@ metadata_cache::get_topic_timestamp_type(model::topic_namespace_view tp) const {
return _topics_state.local().get_topic_timestamp_type(tp);
}

std::vector<model::topic_metadata> metadata_cache::all_topics_metadata(
metadata_cache::with_leaders leaders) const {
auto all_md = _topics_state.local().all_topics_metadata();
if (leaders) {
for (auto& md : all_md) {
fill_partition_leaders(_leaders.local(), md);
}
}
return all_md;
/// Exposes the metadata of all topics as a const reference to the container
/// returned by the local topics state. Nothing is copied here; note that a
/// caller binding the result to plain `auto` will copy the whole container,
/// so prefer `const auto&` at call sites.
const topic_table::underlying_t& metadata_cache::all_topics_metadata() const {
    auto&& topics = _topics_state.local();
    return topics.all_topics_metadata();
}

std::optional<broker_ptr> metadata_cache::get_broker(model::node_id nid) const {
Expand Down Expand Up @@ -148,6 +159,10 @@ bool metadata_cache::contains(
return _topics_state.local().contains(tp, pid);
}

/// Topic-level overload: reports whether the local topics state contains the
/// topic identified by `tp`. The answer is taken directly from the topics
/// state's own `contains` check.
bool metadata_cache::contains(model::topic_namespace_view tp) const {
    auto&& topics = _topics_state.local();
    return topics.contains(tp);
}

ss::future<model::node_id> metadata_cache::get_leader(
const model::ntp& ntp,
ss::lowres_clock::time_point tout,
Expand Down
24 changes: 20 additions & 4 deletions src/v/cluster/metadata_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "cluster/fwd.h"
#include "cluster/health_monitor_types.h"
#include "cluster/partition_leaders_table.h"
#include "cluster/topic_table.h"
#include "cluster/types.h"
#include "model/fundamental.h"
#include "model/metadata.h"
Expand Down Expand Up @@ -72,8 +73,24 @@ class metadata_cache {
///\brief Returns metadata of single topic.
///
/// If the topic does not exist, it returns an empty optional.
std::optional<model::topic_metadata> get_topic_metadata(
std::optional<model::topic_metadata> get_model_topic_metadata(
model::topic_namespace_view, with_leaders = with_leaders::yes) const;
///\brief Returns metadata of single topic.
///
/// If the topic does not exist, it returns an empty optional.
std::optional<cluster::topic_metadata>
get_topic_metadata(model::topic_namespace_view) const;

///\brief Returns reference to metadata of single topic.
///
/// If the topic does not exist, it returns an empty optional.
///
/// IMPORTANT: do not use the returned reference when its lifetime has to
/// span multiple scheduling points. This reference-returning method is
/// provided to avoid copying the metadata object in synchronous parts of
/// the code.
std::optional<std::reference_wrapper<const cluster::topic_metadata>>
get_topic_metadata_ref(model::topic_namespace_view) const;

///\brief Returns configuration of single topic.
///
Expand All @@ -87,9 +104,7 @@ class metadata_cache {
std::optional<model::timestamp_type>
get_topic_timestamp_type(model::topic_namespace_view) const;

/// Returns metadata of all topics.
std::vector<model::topic_metadata>
all_topics_metadata(with_leaders = with_leaders::yes) const;
const topic_table::underlying_t& all_topics_metadata() const;

/// Returns all brokers, returns copy as the content of broker can change
std::vector<broker_ptr> all_brokers() const;
Expand All @@ -105,6 +120,7 @@ class metadata_cache {
std::optional<broker_ptr> get_broker(model::node_id) const;

bool contains(model::topic_namespace_view, model::partition_id) const;
bool contains(model::topic_namespace_view) const;

bool contains(const model::ntp& ntp) const {
return contains(model::topic_namespace_view(ntp), ntp.tp.partition);
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/metrics_reporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ metrics_reporter::build_metrics_snapshot() {
}

snapshot.topic_count++;
snapshot.partition_count += md.get_configuration().cfg.partition_count;
snapshot.partition_count += md.get_configuration().partition_count;
}

snapshot.nodes.reserve(metrics_map.size());
Expand Down
9 changes: 3 additions & 6 deletions src/v/cluster/scheduling/leader_balancer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,7 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
if (!topic.second.is_topic_replicable()) {
continue;
}
for (const auto& partition :
topic.second.get_configuration().assignments) {
for (const auto& partition : topic.second.get_assignments()) {
if (partition.group == transfer->group) {
ntp = model::ntp(topic.first.ns, topic.first.tp, partition.id);
break;
Expand Down Expand Up @@ -438,8 +437,7 @@ leader_balancer::find_leader_shard(const model::ntp& ntp) {

// If the initial query was for a non_replicable ntp, the get_leader() call
// would have returned its source topic
for (const auto& partition :
config_it->second.get_configuration().assignments) {
for (const auto& partition : config_it->second.get_assignments()) {
if (partition.id != ntp.tp.partition) {
continue;
}
Expand Down Expand Up @@ -474,8 +472,7 @@ leader_balancer::index_type leader_balancer::build_index() {
if (!topic.second.is_topic_replicable()) {
continue;
}
for (const auto& partition :
topic.second.get_configuration().assignments) {
for (const auto& partition : topic.second.get_assignments()) {
if (partition.replicas.empty()) {
vlog(
clusterlog.warn,
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ service::fetch_metadata_and_cfg(const std::vector<topic_result>& res) {
md.reserve(res.size());
for (const auto& r : res) {
if (r.ec == errc::success) {
auto topic_md = _md_cache.local().get_topic_metadata(r.tp_ns);
auto topic_md = _md_cache.local().get_model_topic_metadata(r.tp_ns);
auto topic_cfg = _md_cache.local().get_topic_cfg(r.tp_ns);
if (topic_md && topic_cfg) {
md.push_back(std::move(topic_md.value()));
Expand Down
18 changes: 11 additions & 7 deletions src/v/cluster/tests/autocreate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ void validate_topic_metadata(cluster::metadata_cache& cache) {
for (auto& t_cfg : expected_topics) {
auto tp_md = cache.get_topic_metadata(t_cfg.tp_ns);
BOOST_REQUIRE_EQUAL(tp_md.has_value(), true);
BOOST_REQUIRE_EQUAL(tp_md->partitions.size(), t_cfg.partition_count);
BOOST_REQUIRE_EQUAL(
tp_md->get_assignments().size(), t_cfg.partition_count);
auto cfg = cache.get_topic_cfg(t_cfg.tp_ns);
BOOST_REQUIRE_EQUAL(cfg->tp_ns, t_cfg.tp_ns);
BOOST_REQUIRE_EQUAL(cfg->partition_count, t_cfg.partition_count);
Expand All @@ -54,11 +55,14 @@ void validate_topic_metadata(cluster::metadata_cache& cache) {

void wait_for_leaders(
cluster::partition_leaders_table& leader_table,
const model::topic_metadata md) {
const cluster::topic_metadata md) {
std::vector<ss::future<>> f;
f.reserve(md.partitions.size());
for (const auto& p : md.partitions) {
model::ntp ntp(md.tp_ns.ns, md.tp_ns.tp, p.id);
f.reserve(md.get_assignments().size());
for (const auto& p : md.get_assignments()) {
model::ntp ntp(
md.get_configuration().tp_ns.ns,
md.get_configuration().tp_ns.tp,
p.id);

f.push_back(leader_table
.wait_for_leader(
Expand Down Expand Up @@ -109,7 +113,7 @@ FIXTURE_TEST(create_single_topic_test_at_current_broker, cluster_test_fixture) {

BOOST_REQUIRE_EQUAL(md.has_value(), true);
wait_for_leaders(app->controller->get_partition_leaders().local(), *md);
BOOST_REQUIRE_EQUAL(md.value().tp_ns, r.tp_ns);
BOOST_REQUIRE_EQUAL(md->get_configuration().tp_ns, r.tp_ns);
}
}

Expand Down Expand Up @@ -151,7 +155,7 @@ FIXTURE_TEST(test_autocreate_on_non_leader, cluster_test_fixture) {
BOOST_REQUIRE_EQUAL(md.has_value(), true);
wait_for_leaders(
app_0->controller->get_partition_leaders().local(), *md);
BOOST_REQUIRE_EQUAL(md.value().tp_ns, r.tp_ns);
BOOST_REQUIRE_EQUAL(md->get_configuration().tp_ns, r.tp_ns);
}
// Make sure caches are the same
validate_topic_metadata(get_local_cache(n_1));
Expand Down
10 changes: 6 additions & 4 deletions src/v/cluster/tests/create_partitions_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,17 @@ FIXTURE_TEST(test_creating_partitions, rebalancing_tests_fixture) {
auto tp_2_md = topics.local().get_topic_metadata(make_tp_ns("test-2"));

// total 6 partitions
BOOST_REQUIRE_EQUAL(tp_1_md->partitions.size(), 6);
BOOST_REQUIRE_EQUAL(tp_1_md->get_assignments().size(), 6);
for (auto i = 0; i < 6; ++i) {
BOOST_REQUIRE_EQUAL(tp_1_md->partitions[i].id, model::partition_id(i));
BOOST_REQUIRE(
tp_1_md->get_assignments().contains(model::partition_id(i)));
}

// total 9 partitions
BOOST_REQUIRE_EQUAL(tp_2_md->partitions.size(), 9);
BOOST_REQUIRE_EQUAL(tp_2_md->get_assignments().size(), 9);
for (auto i = 0; i < 9; ++i) {
BOOST_REQUIRE_EQUAL(tp_2_md->partitions[i].id, model::partition_id(i));
BOOST_REQUIRE(
tp_2_md->get_assignments().contains(model::partition_id(i)));
}
}

Expand Down
15 changes: 9 additions & 6 deletions src/v/cluster/tests/metadata_dissemination_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,22 @@ wait_for_leaders_updates(int id, cluster::metadata_cache& cache) {
std::chrono::seconds(10),
[&cache, &leaders, id] {
leaders.clear();
auto tp_md = cache.get_topic_metadata(model::topic_namespace(
model::ns("default"), model::topic("test_1")));
const model::topic_namespace tn(
model::ns("default"), model::topic("test_1"));
auto tp_md = cache.get_topic_metadata(tn);

if (!tp_md) {
return false;
}
if (tp_md->partitions.size() != 3) {
if (tp_md->get_assignments().size() != 3) {
return false;
}
for (auto& p_md : tp_md->partitions) {
if (!p_md.leader_node) {
for (auto& p_md : tp_md->get_assignments()) {
auto leader_id = cache.get_leader_id(tn, p_md.id);
if (!leader_id) {
return false;
}
leaders.push_back(*p_md.leader_node);
leaders.push_back(*leader_id);
}
return true;
})
Expand Down
11 changes: 5 additions & 6 deletions src/v/cluster/tests/partition_moving_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,11 @@ class partition_assignment_test_fixture : public cluster_test_fixture {

std::vector<model::broker_shard>
get_replicas(int source_node, const model::ntp& ntp) {
auto md = controller(source_node)
->get_topics_state()
.local()
.get_topic_metadata(model::topic_namespace_view(ntp));

return md->partitions.begin()->replicas;
return controller(source_node)
->get_topics_state()
.local()
.get_partition_assignment(ntp)
->replicas;
}

void wait_for_replica_set_partitions(
Expand Down
9 changes: 4 additions & 5 deletions src/v/cluster/tests/rebalancing_tests_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class rebalancing_tests_fixture : public cluster_test_fixture {
.local()
.get_topic_metadata(model::topic_namespace_view(ntp));

return md->partitions.begin()->replicas;
return md->get_assignments().begin()->replicas;
}

void create_topic(cluster::topic_configuration cfg) {
Expand Down Expand Up @@ -204,10 +204,9 @@ class rebalancing_tests_fixture : public cluster_test_fixture {

void populate_all_topics_with_data() {
auto md = get_local_cache(model::node_id(0)).all_topics_metadata();
for (auto& topic_metadata : md) {
for (auto& p : topic_metadata.partitions) {
model::ntp ntp(
topic_metadata.tp_ns.ns, topic_metadata.tp_ns.tp, p.id);
for (auto& [tp_ns, topic_metadata] : md) {
for (auto& p : topic_metadata.get_assignments()) {
model::ntp ntp(tp_ns.ns, tp_ns.tp, p.id);
replicate_data(ntp, 10);
}
}
Expand Down
Loading

0 comments on commit 97d2ea2

Please sign in to comment.