diff --git a/src/v/cluster/controller_api.cc b/src/v/cluster/controller_api.cc index 83a0b5e0bfc3c..8fbef005e2d6e 100644 --- a/src/v/cluster/controller_api.cc +++ b/src/v/cluster/controller_api.cc @@ -63,7 +63,7 @@ bool has_node_local_replicas( ss::future>> controller_api::get_reconciliation_state(model::topic_namespace_view tp_ns) { using ret_t = result>; - auto metadata = _topics.local().get_topic_metadata(tp_ns); + auto metadata = _topics.local().get_topic_metadata_ref(tp_ns); if (!metadata) { co_return ret_t(errc::topic_not_exists); } @@ -71,8 +71,8 @@ controller_api::get_reconciliation_state(model::topic_namespace_view tp_ns) { ntps.reserve(metadata->get_assignments().size()); std::transform( - metadata->get_assignments().cbegin(), - metadata->get_assignments().cend(), + metadata->get().get_assignments().cbegin(), + metadata->get().get_assignments().cend(), std::back_inserter(ntps), [tp_ns](const partition_assignment& p_as) { return model::ntp(tp_ns.ns, tp_ns.tp, p_as.id); @@ -211,7 +211,7 @@ controller_api::get_reconciliation_state( // high level APIs ss::future controller_api::wait_for_topic( model::topic_namespace_view tp_ns, model::timeout_clock::time_point timeout) { - auto metadata = _topics.local().get_topic_metadata(tp_ns); + auto metadata = _topics.local().get_topic_metadata_ref(tp_ns); if (!metadata) { vlog(clusterlog.trace, "topic {} does not exists", tp_ns); co_return make_error_code(errc::topic_not_exists); @@ -219,7 +219,7 @@ ss::future controller_api::wait_for_topic( absl::node_hash_map> requests; // collect ntps per node - for (const auto& p_as : metadata->get_assignments()) { + for (const auto& p_as : metadata->get().get_assignments()) { for (const auto& bs : p_as.replicas) { requests[bs.node_id].emplace_back(tp_ns.ns, tp_ns.tp, p_as.id); } diff --git a/src/v/cluster/metadata_cache.cc b/src/v/cluster/metadata_cache.cc index 18b9ca39e2e1e..7a39ca4038ee7 100644 --- a/src/v/cluster/metadata_cache.cc +++ b/src/v/cluster/metadata_cache.cc @@ -68,6 +68,11 @@ metadata_cache::get_topic_metadata(model::topic_namespace_view tp) const { return _topics_state.local().get_topic_metadata(tp); } +std::optional> +metadata_cache::get_topic_metadata_ref(model::topic_namespace_view tn) const { + return _topics_state.local().get_topic_metadata_ref(tn); +} + std::optional metadata_cache::get_model_topic_metadata( model::topic_namespace_view tp, metadata_cache::with_leaders leaders) const { auto md = _topics_state.local().get_topic_metadata(tp); diff --git a/src/v/cluster/metadata_cache.h b/src/v/cluster/metadata_cache.h index 3875ff9a5e0f7..d25a7d2b31c36 100644 --- a/src/v/cluster/metadata_cache.h +++ b/src/v/cluster/metadata_cache.h @@ -81,6 +81,17 @@ class metadata_cache { std::optional get_topic_metadata(model::topic_namespace_view) const; + ///\brief Returns reference to metadata of single topic. + /// + /// If topic does not exists it returns an empty optional + /// + /// IMPORTANT: remember not to use the reference when its lifetime has to + /// span across multiple scheduling point. Reference returning method is + /// provided not to copy metadata object when used by synchronous parts of + /// code + std::optional> + get_topic_metadata_ref(model::topic_namespace_view) const; + ///\brief Returns configuration of single topic. /// /// If topic does not exists it returns an empty optional diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index eafc4ff5a3827..83ad4bde67144 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -502,6 +502,13 @@ topic_table::get_topic_metadata(model::topic_namespace_view tp) const { } return {}; } +std::optional> +topic_table::get_topic_metadata_ref(model::topic_namespace_view tp) const { + if (auto it = _topics.find(tp); it != _topics.end()) { + return it->second; + } + return {}; +} std::optional topic_table::get_topic_cfg(model::topic_namespace_view tp) const { diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h index f1fe4edbd2037..4cc63f87c2e30 100644 --- a/src/v/cluster/topic_table.h +++ b/src/v/cluster/topic_table.h @@ -118,6 +118,17 @@ class topic_table { std::optional get_topic_metadata(model::topic_namespace_view) const; + ///\brief Returns reference to metadata of single topic. + /// + /// If topic does not exists it returns an empty optional + /// + /// IMPORTANT: remember not to use the reference when its lifetime has to + /// span across multiple scheduling point. Reference returning method is + /// provided not to copy metadata object when used by synchronous parts of + /// code + std::optional> + get_topic_metadata_ref(model::topic_namespace_view) const; + ///\brief Returns configuration of single topic. /// /// If topic does not exists it returns an empty optional diff --git a/src/v/kafka/server/handlers/topics/topic_utils.cc b/src/v/kafka/server/handlers/topics/topic_utils.cc index 708408bdea546..2e651668596a6 100644 --- a/src/v/kafka/server/handlers/topics/topic_utils.cc +++ b/src/v/kafka/server/handlers/topics/topic_utils.cc @@ -42,13 +42,13 @@ ss::future> wait_for_leaders( continue; } // collect partitions - auto md = md_cache.get_topic_metadata(r.tp_ns); + auto md = md_cache.get_topic_metadata_ref(r.tp_ns); if (!md) { // topic already deleted continue; } // for each partition ask for leader - for (auto& pmd : md->get_assignments()) { + for (auto& pmd : md->get().get_assignments()) { futures.push_back(md_cache.get_leader( model::ntp(r.tp_ns.ns, r.tp_ns.tp, pmd.id), timeout)); }