From 33270b9246b1fc3bfa1b1c89d37706e191c19c1e Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Thu, 5 May 2022 13:29:41 +0200 Subject: [PATCH] c/topic_table: introduced method returning reference to topic metadata Introduced method returning reference to topic metadata. The reference returning method allow us to avoid copying metadata object when it has to be accessed by synchronous parts of code. Important: Remember not to use the reference when its lifetime has to span across multiple scheduling point. Reference returning method is provided not to copy metadata object when used by synchronous parts of code Signed-off-by: Michal Maslanka --- src/v/cluster/controller_api.cc | 10 +++++----- src/v/cluster/metadata_cache.cc | 5 +++++ src/v/cluster/metadata_cache.h | 11 +++++++++++ src/v/cluster/topic_table.cc | 7 +++++++ src/v/cluster/topic_table.h | 11 +++++++++++ src/v/kafka/server/handlers/topics/topic_utils.cc | 4 ++-- 6 files changed, 41 insertions(+), 7 deletions(-) diff --git a/src/v/cluster/controller_api.cc b/src/v/cluster/controller_api.cc index 83a0b5e0bfc3c..8fbef005e2d6e 100644 --- a/src/v/cluster/controller_api.cc +++ b/src/v/cluster/controller_api.cc @@ -63,7 +63,7 @@ bool has_node_local_replicas( ss::future>> controller_api::get_reconciliation_state(model::topic_namespace_view tp_ns) { using ret_t = result>; - auto metadata = _topics.local().get_topic_metadata(tp_ns); + auto metadata = _topics.local().get_topic_metadata_ref(tp_ns); if (!metadata) { co_return ret_t(errc::topic_not_exists); } @@ -71,8 +71,8 @@ controller_api::get_reconciliation_state(model::topic_namespace_view tp_ns) { ntps.reserve(metadata->get_assignments().size()); std::transform( - metadata->get_assignments().cbegin(), - metadata->get_assignments().cend(), + metadata->get().get_assignments().cbegin(), + metadata->get().get_assignments().cend(), std::back_inserter(ntps), [tp_ns](const partition_assignment& p_as) { return model::ntp(tp_ns.ns, tp_ns.tp, p_as.id); @@ -211,7 +211,7 @@ controller_api::get_reconciliation_state( // high level APIs ss::future controller_api::wait_for_topic( model::topic_namespace_view tp_ns, model::timeout_clock::time_point timeout) { - auto metadata = _topics.local().get_topic_metadata(tp_ns); + auto metadata = _topics.local().get_topic_metadata_ref(tp_ns); if (!metadata) { vlog(clusterlog.trace, "topic {} does not exists", tp_ns); co_return make_error_code(errc::topic_not_exists); @@ -219,7 +219,7 @@ ss::future controller_api::wait_for_topic( absl::node_hash_map> requests; // collect ntps per node - for (const auto& p_as : metadata->get_assignments()) { + for (const auto& p_as : metadata->get().get_assignments()) { for (const auto& bs : p_as.replicas) { requests[bs.node_id].emplace_back(tp_ns.ns, tp_ns.tp, p_as.id); } diff --git a/src/v/cluster/metadata_cache.cc b/src/v/cluster/metadata_cache.cc index 18b9ca39e2e1e..7a39ca4038ee7 100644 --- a/src/v/cluster/metadata_cache.cc +++ b/src/v/cluster/metadata_cache.cc @@ -68,6 +68,11 @@ metadata_cache::get_topic_metadata(model::topic_namespace_view tp) const { return _topics_state.local().get_topic_metadata(tp); } +std::optional> +metadata_cache::get_topic_metadata_ref(model::topic_namespace_view tn) const { + return _topics_state.local().get_topic_metadata_ref(tn); +} + std::optional metadata_cache::get_model_topic_metadata( model::topic_namespace_view tp, metadata_cache::with_leaders leaders) const { auto md = _topics_state.local().get_topic_metadata(tp); diff --git a/src/v/cluster/metadata_cache.h b/src/v/cluster/metadata_cache.h index 3875ff9a5e0f7..d25a7d2b31c36 100644 --- a/src/v/cluster/metadata_cache.h +++ b/src/v/cluster/metadata_cache.h @@ -81,6 +81,17 @@ class metadata_cache { std::optional get_topic_metadata(model::topic_namespace_view) const; + ///\brief Returns reference to metadata of single topic. + /// + /// If topic does not exists it returns an empty optional + /// + /// IMPORTANT: remember not to use the reference when its lifetime has to + /// span across multiple scheduling point. Reference returning method is + /// provided not to copy metadata object when used by synchronous parts of + /// code + std::optional> + get_topic_metadata_ref(model::topic_namespace_view) const; + ///\brief Returns configuration of single topic. /// /// If topic does not exists it returns an empty optional diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index eafc4ff5a3827..83ad4bde67144 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -502,6 +502,13 @@ topic_table::get_topic_metadata(model::topic_namespace_view tp) const { } return {}; } +std::optional> +topic_table::get_topic_metadata_ref(model::topic_namespace_view tp) const { + if (auto it = _topics.find(tp); it != _topics.end()) { + return it->second; + } + return {}; +} std::optional topic_table::get_topic_cfg(model::topic_namespace_view tp) const { diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h index f1fe4edbd2037..4cc63f87c2e30 100644 --- a/src/v/cluster/topic_table.h +++ b/src/v/cluster/topic_table.h @@ -118,6 +118,17 @@ class topic_table { std::optional get_topic_metadata(model::topic_namespace_view) const; + ///\brief Returns reference to metadata of single topic. + /// + /// If topic does not exists it returns an empty optional + /// + /// IMPORTANT: remember not to use the reference when its lifetime has to + /// span across multiple scheduling point. Reference returning method is + /// provided not to copy metadata object when used by synchronous parts of + /// code + std::optional> + get_topic_metadata_ref(model::topic_namespace_view) const; + ///\brief Returns configuration of single topic. /// /// If topic does not exists it returns an empty optional diff --git a/src/v/kafka/server/handlers/topics/topic_utils.cc b/src/v/kafka/server/handlers/topics/topic_utils.cc index 708408bdea546..2e651668596a6 100644 --- a/src/v/kafka/server/handlers/topics/topic_utils.cc +++ b/src/v/kafka/server/handlers/topics/topic_utils.cc @@ -42,13 +42,13 @@ ss::future> wait_for_leaders( continue; } // collect partitions - auto md = md_cache.get_topic_metadata(r.tp_ns); + auto md = md_cache.get_topic_metadata_ref(r.tp_ns); if (!md) { // topic already deleted continue; } // for each partition ask for leader - for (auto& pmd : md->get_assignments()) { + for (auto& pmd : md->get().get_assignments()) { futures.push_back(md_cache.get_leader( model::ntp(r.tp_ns.ns, r.tp_ns.tp, pmd.id), timeout)); }