Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimized partition metadata lookup #4443

Merged
merged 2 commits into from
May 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions src/v/cluster/controller_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,19 @@ bool has_node_local_replicas(
ss::future<result<std::vector<ntp_reconciliation_state>>>
controller_api::get_reconciliation_state(model::topic_namespace_view tp_ns) {
using ret_t = result<std::vector<ntp_reconciliation_state>>;
auto metadata = _topics.local().get_topic_metadata(tp_ns);
auto metadata = _topics.local().get_topic_metadata_ref(tp_ns);
if (!metadata) {
co_return ret_t(errc::topic_not_exists);
}
std::vector<model::ntp> ntps;
ntps.reserve(metadata->partitions.size());
ntps.reserve(metadata->get().get_assignments().size());

std::transform(
metadata->partitions.cbegin(),
metadata->partitions.cend(),
metadata->get().get_assignments().cbegin(),
metadata->get().get_assignments().cend(),
std::back_inserter(ntps),
[tp_ns](const model::partition_metadata& p_md) {
return model::ntp(tp_ns.ns, tp_ns.tp, p_md.id);
[tp_ns](const partition_assignment& p_as) {
return model::ntp(tp_ns.ns, tp_ns.tp, p_as.id);
});

co_return co_await get_reconciliation_state(std::move(ntps));
Expand Down Expand Up @@ -211,17 +211,17 @@ controller_api::get_reconciliation_state(
// high level APIs
ss::future<std::error_code> controller_api::wait_for_topic(
model::topic_namespace_view tp_ns, model::timeout_clock::time_point timeout) {
auto metadata = _topics.local().get_topic_metadata(tp_ns);
auto metadata = _topics.local().get_topic_metadata_ref(tp_ns);
if (!metadata) {
vlog(clusterlog.trace, "topic {} does not exists", tp_ns);
co_return make_error_code(errc::topic_not_exists);
}

absl::node_hash_map<model::node_id, std::vector<model::ntp>> requests;
// collect ntps per node
for (const auto& p_md : metadata->partitions) {
for (const auto& bs : p_md.replicas) {
requests[bs.node_id].emplace_back(tp_ns.ns, tp_ns.tp, p_md.id);
for (const auto& p_as : metadata->get().get_assignments()) {
for (const auto& bs : p_as.replicas) {
requests[bs.node_id].emplace_back(tp_ns.ns, tp_ns.tp, p_as.id);
}
}
bool ready = false;
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/health_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ health_manager::ensure_topic_replication(model::topic_namespace_view topic) {
co_return true;
}

for (const auto& partition : metadata->partitions) {
for (const auto& partition : metadata->get_assignments()) {
model::ntp ntp(topic.ns, topic.tp, partition.id);
if (!co_await ensure_partition_replication(std::move(ntp))) {
co_return false;
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/members_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ void members_backend::calculate_reallocations(update_meta& meta) {
if (!cfg.is_topic_replicable()) {
continue;
}
for (auto& pas : cfg.get_configuration().assignments) {
for (const auto& pas : cfg.get_assignments()) {
if (is_in_replica_set(pas.replicas, meta.update.id)) {
partition_reallocation reallocation(
model::ntp(tp_ns.ns, tp_ns.tp, pas.id),
Expand Down Expand Up @@ -284,7 +284,7 @@ void members_backend::calculate_reallocations_after_node_added(
meta.update.offset);
continue;
}
for (auto& p : metadata.get_configuration().assignments) {
for (const auto& p : metadata.get_assignments()) {
std::erase_if(to_move_from_node, [](const replicas_to_move& v) {
return v.left_to_move == 0;
});
Expand Down
41 changes: 28 additions & 13 deletions src/v/cluster/metadata_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "cluster/metadata_cache.h"

#include "cluster/fwd.h"
#include "cluster/health_monitor_frontend.h"
#include "cluster/members_table.h"
#include "cluster/partition_leaders_table.h"
Expand Down Expand Up @@ -62,17 +63,34 @@ metadata_cache::get_source_topic(model::topic_namespace_view tp) const {
return mt->second.get_source_topic();
}

std::optional<model::topic_metadata> metadata_cache::get_topic_metadata(
std::optional<cluster::topic_metadata>
metadata_cache::get_topic_metadata(model::topic_namespace_view tp) const {
return _topics_state.local().get_topic_metadata(tp);
}

std::optional<std::reference_wrapper<const cluster::topic_metadata>>
metadata_cache::get_topic_metadata_ref(model::topic_namespace_view tn) const {
return _topics_state.local().get_topic_metadata_ref(tn);
}

std::optional<model::topic_metadata> metadata_cache::get_model_topic_metadata(
model::topic_namespace_view tp, metadata_cache::with_leaders leaders) const {
auto md = _topics_state.local().get_topic_metadata(tp);
if (!md) {
return md;
return std::nullopt;
}

model::topic_metadata metadata(md->get_configuration().tp_ns);
metadata.partitions.reserve(md->get_assignments().size());
for (const auto& p_as : md->get_assignments()) {
metadata.partitions.push_back(p_as.create_partition_metadata());
}

if (leaders) {
fill_partition_leaders(_leaders.local(), *md);
fill_partition_leaders(_leaders.local(), metadata);
}

return md;
return metadata;
}
std::optional<topic_configuration>
metadata_cache::get_topic_cfg(model::topic_namespace_view tp) const {
Expand All @@ -84,15 +102,8 @@ metadata_cache::get_topic_timestamp_type(model::topic_namespace_view tp) const {
return _topics_state.local().get_topic_timestamp_type(tp);
}

std::vector<model::topic_metadata> metadata_cache::all_topics_metadata(
metadata_cache::with_leaders leaders) const {
auto all_md = _topics_state.local().all_topics_metadata();
if (leaders) {
for (auto& md : all_md) {
fill_partition_leaders(_leaders.local(), md);
}
}
return all_md;
const topic_table::underlying_t& metadata_cache::all_topics_metadata() const {
return _topics_state.local().all_topics_metadata();
}

std::optional<broker_ptr> metadata_cache::get_broker(model::node_id nid) const {
Expand Down Expand Up @@ -148,6 +159,10 @@ bool metadata_cache::contains(
return _topics_state.local().contains(tp, pid);
}

bool metadata_cache::contains(model::topic_namespace_view tp) const {
return _topics_state.local().contains(tp);
}

ss::future<model::node_id> metadata_cache::get_leader(
const model::ntp& ntp,
ss::lowres_clock::time_point tout,
Expand Down
24 changes: 20 additions & 4 deletions src/v/cluster/metadata_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "cluster/fwd.h"
#include "cluster/health_monitor_types.h"
#include "cluster/partition_leaders_table.h"
#include "cluster/topic_table.h"
#include "cluster/types.h"
#include "model/fundamental.h"
#include "model/metadata.h"
Expand Down Expand Up @@ -72,8 +73,24 @@ class metadata_cache {
///\brief Returns metadata of single topic.
///
/// If topic does not exists it returns an empty optional
std::optional<model::topic_metadata> get_topic_metadata(
std::optional<model::topic_metadata> get_model_topic_metadata(
model::topic_namespace_view, with_leaders = with_leaders::yes) const;
///\brief Returns metadata of single topic.
///
/// If topic does not exists it returns an empty optional
std::optional<cluster::topic_metadata>
get_topic_metadata(model::topic_namespace_view) const;

///\brief Returns reference to metadata of single topic.
///
/// If topic does not exists it returns an empty optional
///
/// IMPORTANT: remember not to use the reference when its lifetime has to
/// span across multiple scheduling point. Reference returning method is
/// provided not to copy metadata object when used by synchronous parts of
/// code
std::optional<std::reference_wrapper<const cluster::topic_metadata>>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice comment, thanks!

Copy link
Member

@dotnwat dotnwat May 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mmaslankaprv thanks for the comment.

It might be worthwhile to consider making this harder to use incorrectly. Perhaps by wrapping in a type that can't be copied or moved, so that it doesn't escape the scope as easily. The issue is that it really looks like it has value semantics at the call site:

    auto metadata = _topics.local().get_topic_metadata_ref(tp_ns);

The other issue with visually seeing value semantics, is that the call site might not have a scheduling point to worry about, but might do something to the container that rebalances it--like any mutation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, i will consider wrapping it in a noncopyable nonmovable type.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nbd. just thoughts in passing reading old PRs

get_topic_metadata_ref(model::topic_namespace_view) const;

///\brief Returns configuration of single topic.
///
Expand All @@ -87,9 +104,7 @@ class metadata_cache {
std::optional<model::timestamp_type>
get_topic_timestamp_type(model::topic_namespace_view) const;

/// Returns metadata of all topics.
std::vector<model::topic_metadata>
all_topics_metadata(with_leaders = with_leaders::yes) const;
const topic_table::underlying_t& all_topics_metadata() const;

/// Returns all brokers, returns copy as the content of broker can change
std::vector<broker_ptr> all_brokers() const;
Expand All @@ -105,6 +120,7 @@ class metadata_cache {
std::optional<broker_ptr> get_broker(model::node_id) const;

bool contains(model::topic_namespace_view, model::partition_id) const;
bool contains(model::topic_namespace_view) const;

bool contains(const model::ntp& ntp) const {
return contains(model::topic_namespace_view(ntp), ntp.tp.partition);
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/metrics_reporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ metrics_reporter::build_metrics_snapshot() {
}

snapshot.topic_count++;
snapshot.partition_count += md.get_configuration().cfg.partition_count;
snapshot.partition_count += md.get_configuration().partition_count;
}

snapshot.nodes.reserve(metrics_map.size());
Expand Down
9 changes: 3 additions & 6 deletions src/v/cluster/scheduling/leader_balancer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,7 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
if (!topic.second.is_topic_replicable()) {
continue;
}
for (const auto& partition :
topic.second.get_configuration().assignments) {
for (const auto& partition : topic.second.get_assignments()) {
if (partition.group == transfer->group) {
ntp = model::ntp(topic.first.ns, topic.first.tp, partition.id);
break;
Expand Down Expand Up @@ -438,8 +437,7 @@ leader_balancer::find_leader_shard(const model::ntp& ntp) {

// If the inital query was for a non_replicable ntp, the get_leader() call
// would have returned its source topic
for (const auto& partition :
config_it->second.get_configuration().assignments) {
for (const auto& partition : config_it->second.get_assignments()) {
if (partition.id != ntp.tp.partition) {
continue;
}
Expand Down Expand Up @@ -474,8 +472,7 @@ leader_balancer::index_type leader_balancer::build_index() {
if (!topic.second.is_topic_replicable()) {
continue;
}
for (const auto& partition :
topic.second.get_configuration().assignments) {
for (const auto& partition : topic.second.get_assignments()) {
if (partition.replicas.empty()) {
vlog(
clusterlog.warn,
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ service::fetch_metadata_and_cfg(const std::vector<topic_result>& res) {
md.reserve(res.size());
for (const auto& r : res) {
if (r.ec == errc::success) {
auto topic_md = _md_cache.local().get_topic_metadata(r.tp_ns);
auto topic_md = _md_cache.local().get_model_topic_metadata(r.tp_ns);
auto topic_cfg = _md_cache.local().get_topic_cfg(r.tp_ns);
if (topic_md && topic_cfg) {
md.push_back(std::move(topic_md.value()));
Expand Down
18 changes: 11 additions & 7 deletions src/v/cluster/tests/autocreate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ void validate_topic_metadata(cluster::metadata_cache& cache) {
for (auto& t_cfg : expected_topics) {
auto tp_md = cache.get_topic_metadata(t_cfg.tp_ns);
BOOST_REQUIRE_EQUAL(tp_md.has_value(), true);
BOOST_REQUIRE_EQUAL(tp_md->partitions.size(), t_cfg.partition_count);
BOOST_REQUIRE_EQUAL(
tp_md->get_assignments().size(), t_cfg.partition_count);
auto cfg = cache.get_topic_cfg(t_cfg.tp_ns);
BOOST_REQUIRE_EQUAL(cfg->tp_ns, t_cfg.tp_ns);
BOOST_REQUIRE_EQUAL(cfg->partition_count, t_cfg.partition_count);
Expand All @@ -54,11 +55,14 @@ void validate_topic_metadata(cluster::metadata_cache& cache) {

void wait_for_leaders(
cluster::partition_leaders_table& leader_table,
const model::topic_metadata md) {
const cluster::topic_metadata md) {
std::vector<ss::future<>> f;
f.reserve(md.partitions.size());
for (const auto& p : md.partitions) {
model::ntp ntp(md.tp_ns.ns, md.tp_ns.tp, p.id);
f.reserve(md.get_assignments().size());
for (const auto& p : md.get_assignments()) {
model::ntp ntp(
md.get_configuration().tp_ns.ns,
md.get_configuration().tp_ns.tp,
p.id);

f.push_back(leader_table
.wait_for_leader(
Expand Down Expand Up @@ -109,7 +113,7 @@ FIXTURE_TEST(create_single_topic_test_at_current_broker, cluster_test_fixture) {

BOOST_REQUIRE_EQUAL(md.has_value(), true);
wait_for_leaders(app->controller->get_partition_leaders().local(), *md);
BOOST_REQUIRE_EQUAL(md.value().tp_ns, r.tp_ns);
BOOST_REQUIRE_EQUAL(md->get_configuration().tp_ns, r.tp_ns);
}
}

Expand Down Expand Up @@ -151,7 +155,7 @@ FIXTURE_TEST(test_autocreate_on_non_leader, cluster_test_fixture) {
BOOST_REQUIRE_EQUAL(md.has_value(), true);
wait_for_leaders(
app_0->controller->get_partition_leaders().local(), *md);
BOOST_REQUIRE_EQUAL(md.value().tp_ns, r.tp_ns);
BOOST_REQUIRE_EQUAL(md->get_configuration().tp_ns, r.tp_ns);
}
// Make sure caches are the same
validate_topic_metadata(get_local_cache(n_1));
Expand Down
10 changes: 6 additions & 4 deletions src/v/cluster/tests/create_partitions_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,17 @@ FIXTURE_TEST(test_creating_partitions, rebalancing_tests_fixture) {
auto tp_2_md = topics.local().get_topic_metadata(make_tp_ns("test-2"));

// total 6 partitions
BOOST_REQUIRE_EQUAL(tp_1_md->partitions.size(), 6);
BOOST_REQUIRE_EQUAL(tp_1_md->get_assignments().size(), 6);
for (auto i = 0; i < 6; ++i) {
BOOST_REQUIRE_EQUAL(tp_1_md->partitions[i].id, model::partition_id(i));
BOOST_REQUIRE(
tp_1_md->get_assignments().contains(model::partition_id(i)));
}

// total 9 partitions
BOOST_REQUIRE_EQUAL(tp_2_md->partitions.size(), 9);
BOOST_REQUIRE_EQUAL(tp_2_md->get_assignments().size(), 9);
for (auto i = 0; i < 9; ++i) {
BOOST_REQUIRE_EQUAL(tp_2_md->partitions[i].id, model::partition_id(i));
BOOST_REQUIRE(
tp_2_md->get_assignments().contains(model::partition_id(i)));
}
}

Expand Down
15 changes: 9 additions & 6 deletions src/v/cluster/tests/metadata_dissemination_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,22 @@ wait_for_leaders_updates(int id, cluster::metadata_cache& cache) {
std::chrono::seconds(10),
[&cache, &leaders, id] {
leaders.clear();
auto tp_md = cache.get_topic_metadata(model::topic_namespace(
model::ns("default"), model::topic("test_1")));
const model::topic_namespace tn(
model::ns("default"), model::topic("test_1"));
auto tp_md = cache.get_topic_metadata(tn);

if (!tp_md) {
return false;
}
if (tp_md->partitions.size() != 3) {
if (tp_md->get_assignments().size() != 3) {
return false;
}
for (auto& p_md : tp_md->partitions) {
if (!p_md.leader_node) {
for (auto& p_md : tp_md->get_assignments()) {
auto leader_id = cache.get_leader_id(tn, p_md.id);
if (!leader_id) {
return false;
}
leaders.push_back(*p_md.leader_node);
leaders.push_back(*leader_id);
}
return true;
})
Expand Down
11 changes: 5 additions & 6 deletions src/v/cluster/tests/partition_moving_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,11 @@ class partition_assignment_test_fixture : public cluster_test_fixture {

std::vector<model::broker_shard>
get_replicas(int source_node, const model::ntp& ntp) {
auto md = controller(source_node)
->get_topics_state()
.local()
.get_topic_metadata(model::topic_namespace_view(ntp));

return md->partitions.begin()->replicas;
return controller(source_node)
->get_topics_state()
.local()
.get_partition_assignment(ntp)
->replicas;
}

void wait_for_replica_set_partitions(
Expand Down
9 changes: 4 additions & 5 deletions src/v/cluster/tests/rebalancing_tests_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class rebalancing_tests_fixture : public cluster_test_fixture {
.local()
.get_topic_metadata(model::topic_namespace_view(ntp));

return md->partitions.begin()->replicas;
return md->get_assignments().begin()->replicas;
}

void create_topic(cluster::topic_configuration cfg) {
Expand Down Expand Up @@ -204,10 +204,9 @@ class rebalancing_tests_fixture : public cluster_test_fixture {

void populate_all_topics_with_data() {
auto md = get_local_cache(model::node_id(0)).all_topics_metadata();
for (auto& topic_metadata : md) {
for (auto& p : topic_metadata.partitions) {
model::ntp ntp(
topic_metadata.tp_ns.ns, topic_metadata.tp_ns.tp, p.id);
for (auto& [tp_ns, topic_metadata] : md) {
for (auto& p : topic_metadata.get_assignments()) {
model::ntp ntp(tp_ns.ns, tp_ns.tp, p.id);
replicate_data(ntp, 10);
}
}
Expand Down
Loading