From d32572cb0912e3d658001e07230c84b8df12da7e Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Thu, 5 May 2022 13:29:41 +0200 Subject: [PATCH] c/topic_table: introduced method returning reference to topic metadata Introduced method returning reference to topic metadata. The reference returning method allow us to avoid copying metadata object when it has to be accessed by synchronous parts of code. Important: Remember not to use the reference when its lifetime has to span across multiple scheduling point. Reference returning method is provided not to copy metadata object when used by synchronous parts of code Signed-off-by: Michal Maslanka --- src/v/cluster/controller_api.cc | 12 ++++++------ src/v/cluster/metadata_cache.cc | 5 +++++ src/v/cluster/metadata_cache.h | 11 +++++++++++ src/v/cluster/topic_table.cc | 7 +++++++ src/v/cluster/topic_table.h | 11 +++++++++++ src/v/kafka/server/handlers/topics/topic_utils.cc | 4 ++-- 6 files changed, 42 insertions(+), 8 deletions(-) diff --git a/src/v/cluster/controller_api.cc b/src/v/cluster/controller_api.cc index 83a0b5e0bfc3..70952ba13159 100644 --- a/src/v/cluster/controller_api.cc +++ b/src/v/cluster/controller_api.cc @@ -63,16 +63,16 @@ bool has_node_local_replicas( ss::future>> controller_api::get_reconciliation_state(model::topic_namespace_view tp_ns) { using ret_t = result>; - auto metadata = _topics.local().get_topic_metadata(tp_ns); + auto metadata = _topics.local().get_topic_metadata_ref(tp_ns); if (!metadata) { co_return ret_t(errc::topic_not_exists); } std::vector ntps; - ntps.reserve(metadata->get_assignments().size()); + ntps.reserve(metadata->get().get_assignments().size()); std::transform( - metadata->get_assignments().cbegin(), - metadata->get_assignments().cend(), + metadata->get().get_assignments().cbegin(), + metadata->get().get_assignments().cend(), std::back_inserter(ntps), [tp_ns](const partition_assignment& p_as) { return model::ntp(tp_ns.ns, tp_ns.tp, p_as.id); @@ -211,7 +211,7 @@ controller_api::get_reconciliation_state( // high level APIs ss::future controller_api::wait_for_topic( model::topic_namespace_view tp_ns, model::timeout_clock::time_point timeout) { - auto metadata = _topics.local().get_topic_metadata(tp_ns); + auto metadata = _topics.local().get_topic_metadata_ref(tp_ns); if (!metadata) { vlog(clusterlog.trace, "topic {} does not exists", tp_ns); co_return make_error_code(errc::topic_not_exists); @@ -219,7 +219,7 @@ ss::future controller_api::wait_for_topic( absl::node_hash_map> requests; // collect ntps per node - for (const auto& p_as : metadata->get_assignments()) { + for (const auto& p_as : metadata->get().get_assignments()) { for (const auto& bs : p_as.replicas) { requests[bs.node_id].emplace_back(tp_ns.ns, tp_ns.tp, p_as.id); } diff --git a/src/v/cluster/metadata_cache.cc b/src/v/cluster/metadata_cache.cc index 18b9ca39e2e1..7a39ca4038ee 100644 --- a/src/v/cluster/metadata_cache.cc +++ b/src/v/cluster/metadata_cache.cc @@ -68,6 +68,11 @@ metadata_cache::get_topic_metadata(model::topic_namespace_view tp) const { return _topics_state.local().get_topic_metadata(tp); } +std::optional> +metadata_cache::get_topic_metadata_ref(model::topic_namespace_view tn) const { + return _topics_state.local().get_topic_metadata_ref(tn); +} + std::optional metadata_cache::get_model_topic_metadata( model::topic_namespace_view tp, metadata_cache::with_leaders leaders) const { auto md = _topics_state.local().get_topic_metadata(tp); diff --git a/src/v/cluster/metadata_cache.h b/src/v/cluster/metadata_cache.h index 3875ff9a5e0f..d25a7d2b31c3 100644 --- a/src/v/cluster/metadata_cache.h +++ b/src/v/cluster/metadata_cache.h @@ -81,6 +81,17 @@ class metadata_cache { std::optional get_topic_metadata(model::topic_namespace_view) const; + ///\brief Returns reference to metadata of single topic. + /// + /// If topic does not exists it returns an empty optional + /// + /// IMPORTANT: remember not to use the reference when its lifetime has to + /// span across multiple scheduling point. Reference returning method is + /// provided not to copy metadata object when used by synchronous parts of + /// code + std::optional> + get_topic_metadata_ref(model::topic_namespace_view) const; + ///\brief Returns configuration of single topic. /// /// If topic does not exists it returns an empty optional diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index eafc4ff5a382..83ad4bde6714 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -502,6 +502,13 @@ topic_table::get_topic_metadata(model::topic_namespace_view tp) const { } return {}; } +std::optional> +topic_table::get_topic_metadata_ref(model::topic_namespace_view tp) const { + if (auto it = _topics.find(tp); it != _topics.end()) { + return it->second; + } + return {}; +} std::optional topic_table::get_topic_cfg(model::topic_namespace_view tp) const { diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h index f1fe4edbd203..4cc63f87c2e3 100644 --- a/src/v/cluster/topic_table.h +++ b/src/v/cluster/topic_table.h @@ -118,6 +118,17 @@ class topic_table { std::optional get_topic_metadata(model::topic_namespace_view) const; + ///\brief Returns reference to metadata of single topic. + /// + /// If topic does not exists it returns an empty optional + /// + /// IMPORTANT: remember not to use the reference when its lifetime has to + /// span across multiple scheduling point. Reference returning method is + /// provided not to copy metadata object when used by synchronous parts of + /// code + std::optional> + get_topic_metadata_ref(model::topic_namespace_view) const; + ///\brief Returns configuration of single topic. /// /// If topic does not exists it returns an empty optional diff --git a/src/v/kafka/server/handlers/topics/topic_utils.cc b/src/v/kafka/server/handlers/topics/topic_utils.cc index 708408bdea54..2e651668596a 100644 --- a/src/v/kafka/server/handlers/topics/topic_utils.cc +++ b/src/v/kafka/server/handlers/topics/topic_utils.cc @@ -42,13 +42,13 @@ ss::future> wait_for_leaders( continue; } // collect partitions - auto md = md_cache.get_topic_metadata(r.tp_ns); + auto md = md_cache.get_topic_metadata_ref(r.tp_ns); if (!md) { // topic already deleted continue; } // for each partition ask for leader - for (auto& pmd : md->get_assignments()) { + for (auto& pmd : md->get().get_assignments()) { futures.push_back(md_cache.get_leader( model::ntp(r.tp_ns.ns, r.tp_ns.tp, pmd.id), timeout)); }