Skip to content

Commit

Permalink
c/topic_table: introduced method returning reference to topic metadata
Browse files Browse the repository at this point in the history
Introduced method returning reference to topic metadata. The reference
returning method allow us to avoid copying metadata object when it has
to be accessed by synchronous parts of code.

Important:
Remember not to use the reference when its lifetime has to
span across multiple scheduling point. Reference returning method is
provided not to copy metadata object when used by synchronous parts of
code

Signed-off-by: Michal Maslanka <michal@vectorized.io>
  • Loading branch information
mmaslankaprv committed May 5, 2022
1 parent abc00f0 commit d32572c
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 8 deletions.
12 changes: 6 additions & 6 deletions src/v/cluster/controller_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,16 @@ bool has_node_local_replicas(
ss::future<result<std::vector<ntp_reconciliation_state>>>
controller_api::get_reconciliation_state(model::topic_namespace_view tp_ns) {
using ret_t = result<std::vector<ntp_reconciliation_state>>;
auto metadata = _topics.local().get_topic_metadata(tp_ns);
auto metadata = _topics.local().get_topic_metadata_ref(tp_ns);
if (!metadata) {
co_return ret_t(errc::topic_not_exists);
}
std::vector<model::ntp> ntps;
ntps.reserve(metadata->get_assignments().size());
ntps.reserve(metadata->get().get_assignments().size());

std::transform(
metadata->get_assignments().cbegin(),
metadata->get_assignments().cend(),
metadata->get().get_assignments().cbegin(),
metadata->get().get_assignments().cend(),
std::back_inserter(ntps),
[tp_ns](const partition_assignment& p_as) {
return model::ntp(tp_ns.ns, tp_ns.tp, p_as.id);
Expand Down Expand Up @@ -211,15 +211,15 @@ controller_api::get_reconciliation_state(
// high level APIs
ss::future<std::error_code> controller_api::wait_for_topic(
model::topic_namespace_view tp_ns, model::timeout_clock::time_point timeout) {
auto metadata = _topics.local().get_topic_metadata(tp_ns);
auto metadata = _topics.local().get_topic_metadata_ref(tp_ns);
if (!metadata) {
vlog(clusterlog.trace, "topic {} does not exists", tp_ns);
co_return make_error_code(errc::topic_not_exists);
}

absl::node_hash_map<model::node_id, std::vector<model::ntp>> requests;
// collect ntps per node
for (const auto& p_as : metadata->get_assignments()) {
for (const auto& p_as : metadata->get().get_assignments()) {
for (const auto& bs : p_as.replicas) {
requests[bs.node_id].emplace_back(tp_ns.ns, tp_ns.tp, p_as.id);
}
Expand Down
5 changes: 5 additions & 0 deletions src/v/cluster/metadata_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ metadata_cache::get_topic_metadata(model::topic_namespace_view tp) const {
return _topics_state.local().get_topic_metadata(tp);
}

std::optional<std::reference_wrapper<const cluster::topic_metadata>>
metadata_cache::get_topic_metadata_ref(model::topic_namespace_view tn) const {
return _topics_state.local().get_topic_metadata_ref(tn);
}

std::optional<model::topic_metadata> metadata_cache::get_model_topic_metadata(
model::topic_namespace_view tp, metadata_cache::with_leaders leaders) const {
auto md = _topics_state.local().get_topic_metadata(tp);
Expand Down
11 changes: 11 additions & 0 deletions src/v/cluster/metadata_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ class metadata_cache {
std::optional<cluster::topic_metadata>
get_topic_metadata(model::topic_namespace_view) const;

///\brief Returns reference to metadata of single topic.
///
/// If topic does not exists it returns an empty optional
///
/// IMPORTANT: remember not to use the reference when its lifetime has to
/// span across multiple scheduling point. Reference returning method is
/// provided not to copy metadata object when used by synchronous parts of
/// code
std::optional<std::reference_wrapper<const cluster::topic_metadata>>
get_topic_metadata_ref(model::topic_namespace_view) const;

///\brief Returns configuration of single topic.
///
/// If topic does not exists it returns an empty optional
Expand Down
7 changes: 7 additions & 0 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,13 @@ topic_table::get_topic_metadata(model::topic_namespace_view tp) const {
}
return {};
}
std::optional<std::reference_wrapper<const topic_metadata>>
topic_table::get_topic_metadata_ref(model::topic_namespace_view tp) const {
if (auto it = _topics.find(tp); it != _topics.end()) {
return it->second;
}
return {};
}

std::optional<topic_configuration>
topic_table::get_topic_cfg(model::topic_namespace_view tp) const {
Expand Down
11 changes: 11 additions & 0 deletions src/v/cluster/topic_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,17 @@ class topic_table {
std::optional<topic_metadata>
get_topic_metadata(model::topic_namespace_view) const;

///\brief Returns reference to metadata of single topic.
///
/// If topic does not exists it returns an empty optional
///
/// IMPORTANT: remember not to use the reference when its lifetime has to
/// span across multiple scheduling point. Reference returning method is
/// provided not to copy metadata object when used by synchronous parts of
/// code
std::optional<std::reference_wrapper<const topic_metadata>>
get_topic_metadata_ref(model::topic_namespace_view) const;

///\brief Returns configuration of single topic.
///
/// If topic does not exists it returns an empty optional
Expand Down
4 changes: 2 additions & 2 deletions src/v/kafka/server/handlers/topics/topic_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ ss::future<std::vector<model::node_id>> wait_for_leaders(
continue;
}
// collect partitions
auto md = md_cache.get_topic_metadata(r.tp_ns);
auto md = md_cache.get_topic_metadata_ref(r.tp_ns);
if (!md) {
// topic already deleted
continue;
}
// for each partition ask for leader
for (auto& pmd : md->get_assignments()) {
for (auto& pmd : md->get().get_assignments()) {
futures.push_back(md_cache.get_leader(
model::ntp(r.tp_ns.ns, r.tp_ns.tp, pmd.id), timeout));
}
Expand Down

0 comments on commit d32572c

Please sign in to comment.