Delta per ntp v1 #16

Closed
wants to merge 6 commits into from
210 changes: 107 additions & 103 deletions src/v/cluster/controller_backend.cc
@@ -59,24 +59,27 @@ ss::future<> controller_backend::start() {
return ss::now();
}

ss::future<> controller_backend::fetch_deltas() {
return _topics.local()
.wait_for_changes(_as.local())
.then([this](deltas_t deltas) {
return ss::with_semaphore(
_topics_sem, 1, [this, deltas = std::move(deltas)]() mutable {
for (auto& d : deltas) {
auto ntp = d.ntp;
_topic_deltas[ntp].push_back(std::move(d));
}
});
});
}

void controller_backend::start_topics_reconciliation_loop() {
(void)ss::with_gate(_gate, [this] {
return ss::do_until(
[this] { return _as.local().abort_requested(); },
[this] {
return _topics.local()
.wait_for_changes(_as.local())
.then([this](std::vector<topic_table::delta> deltas) {
return ss::with_semaphore(
_topics_sem,
1,
[this, deltas = std::move(deltas)]() mutable {
for (auto& d : deltas) {
_topic_deltas.emplace_back(std::move(d));
}
return reconcile_topics();
});
})
return fetch_deltas()
.then([this] { return reconcile_topics(); })
.handle_exception([](const std::exception_ptr& e) {
vlog(
clusterlog.error,
@@ -89,30 +92,61 @@ void controller_backend::start_topics_reconciliation_loop() {

void controller_backend::housekeeping() {
if (!_topic_deltas.empty() && _topics_sem.available_units() > 0) {
(void)ss::with_gate(_gate, [this] {
return ss::with_semaphore(
_topics_sem, 1, [this] { return reconcile_topics(); });
});
(void)ss::with_gate(_gate, [this] { return reconcile_topics(); });
}
}

ss::future<> controller_backend::reconcile_ntp(deltas_t& deltas) {
return ss::do_with(
bool{false},
deltas.cbegin(),
[this, &deltas](bool& stop, deltas_t::const_iterator& it) {
return ss::do_until(
[&stop, &it, &deltas] {
return stop || it == deltas.cend();
},
[this, &it, &stop] {
return execute_partitition_op(*it).then(
[&it, &stop](std::error_code ec) {
if (ec) {
stop = true;
return;
}
it++;
});
})
.then([&deltas, &it] {
// remove finished tasks
deltas.erase(deltas.cbegin(), it);
});
});
}

// caller must hold _topics_sem lock
ss::future<> controller_backend::reconcile_topics() {
using meta_t = task_meta<topic_table::delta>;
// _topic_deltas are chronologically ordered, we cannot reorder applying
// them, hence we have to use `ss::do_for_each`
return ss::do_for_each(
std::begin(_topic_deltas),
std::end(_topic_deltas),
[this](meta_t& task) { return do_reconcile_topic(task); })
.then([this] {
// remove finished tasks
auto it = std::stable_partition(
std::begin(_topic_deltas),
std::end(_topic_deltas),
[](meta_t& task) { return task.finished; });
_topic_deltas.erase(_topic_deltas.begin(), it);
});
return ss::with_semaphore(_topics_sem, 1, [this] {
if (_topic_deltas.empty()) {
return ss::now();
}
// reconcile NTPs in parallel
return ss::parallel_for_each(
_topic_deltas.begin(),
_topic_deltas.end(),
[this](underlying_t::value_type& ntp_deltas) {
return reconcile_ntp(ntp_deltas.second);
})
.then([this] {
// cleanup empty NTP keys
for (auto it = _topic_deltas.cbegin();
it != _topic_deltas.cend();) {
if (it->second.empty()) {
_topic_deltas.erase(it++);
} else {
++it;
}
}
});
});
}

bool has_local_replicas(
@@ -146,101 +180,68 @@ std::vector<model::broker> create_brokers_set(
return brokers;
}

controller_backend::results_t join_results(
controller_backend::results_t prev_results,
controller_backend::results_t new_results) {
std::move(
new_results.begin(), new_results.end(), std::back_inserter(prev_results));
return prev_results;
}

ss::future<>
controller_backend::do_reconcile_topic(task_meta<topic_table::delta>& task) {
ss::future<std::error_code>
controller_backend::execute_partitition_op(const topic_table::delta& delta) {
/**
* Revision is derived from delta offset, i.e. offset of a command that
* the delta is derived from. The offset is always monotonically
* increasing together with cluster state evolution, hence it is perfect
* as a source of revision_id
*/
vlog(clusterlog.trace, "Executing operation: {}", delta);
model::revision_id rev(delta.offset());
// new partitions
auto f = ssx::parallel_transform(
task.delta.partitions.additions,
[this, o = task.delta.offset](topic_table::delta::partition p) {
// only create partitions for this backend
// partitions created on current shard at this node
if (!has_local_replicas(_self, p.second.replicas)) {
return ss::make_ready_future<std::error_code>(errc::success);
}
return create_partition(
model::ntp(p.first.ns, p.first.tp, p.second.id),
p.second.group,
o,
create_brokers_set(p.second.replicas, _members_table.local()));
});

// delete partitions
f = f.then([this, &task](results_t results) {
return ssx::parallel_transform(
task.delta.partitions.deletions,
[this](const topic_table::delta::partition& p) {
return delete_partition(
model::ntp(p.first.ns, p.first.tp, p.second.id));
})
.then([results = std::move(results)](results_t new_results) mutable {
return join_results(std::move(results), std::move(new_results));
});
});

// partition updates
f = f.then([this, &task](results_t results) {
return ssx::parallel_transform(
task.delta.partitions.updates,
[this, o = task.delta.offset](
const topic_table::delta::partition& p) {
return process_partition_update(p, o);
})
.then([results = std::move(results)](results_t new_res) mutable {
return join_results(std::move(results), std::move(new_res));
});
});

return f.then([&task](results_t all_results) {
auto success = std::all_of(
std::cbegin(all_results),
std::cend(all_results),
[](std::error_code ec) { return !ec; });
// only mark task as finished if all operations are successfull
if (success) {
task.finished = true;
// only create partitions for this backend
// partitions created on current shard at this node
auto f = ss::make_ready_future<std::error_code>(errc::success);
switch (delta.type) {
case topic_table::delta::op_type::add:
if (!has_local_replicas(_self, delta.p_as.replicas)) {
return ss::make_ready_future<std::error_code>(errc::success);
}
});
return create_partition(
delta.ntp,
delta.p_as.group,
rev,
create_brokers_set(delta.p_as.replicas, _members_table.local()));
case topic_table::delta::op_type::del:
return delete_partition(delta.ntp, rev);
case topic_table::delta::op_type::update:
return process_partition_update(delta.ntp, delta.p_as, rev);
}
}

ss::future<std::error_code> controller_backend::process_partition_update(
const topic_table::delta::partition& p, model::offset offset) {
model::ntp ntp(p.first.ns, p.first.tp, p.second.id);
model::ntp ntp, const partition_assignment& p_as, model::revision_id rev) {
// if there is no local replica in replica set but,
// partition with requested ntp exists on this broker core
// it has to be removed after new configuration is stable
auto partition = _partition_manager.local().get(ntp);
if (!has_local_replicas(_self, p.second.replicas)) {
if (!has_local_replicas(_self, p_as.replicas)) {
// we do not have local replicas and partition does not
// exists, it is ok
if (!partition) {
return ss::make_ready_future<std::error_code>(errc::success);
}
// this partition has to be removed eventually

return update_partition_replica_set(ntp, p.second.replicas)
.then([this, ntp](std::error_code ec) {
return update_partition_replica_set(ntp, p_as.replicas)
.then([this, ntp, rev](std::error_code ec) {
if (!ec) {
return delete_partition(ntp);
return delete_partition(ntp, rev);
}
return ss::make_ready_future<std::error_code>(ec);
});
}

// partition already exists, update configuration
if (partition) {
return update_partition_replica_set(ntp, p.second.replicas);
return update_partition_replica_set(ntp, p_as.replicas);
}
// create partition with empty configuration. Configuration
// will be populated during node recovery
return create_partition(ntp, p.second.group, offset, {});
return create_partition(ntp, p_as.group, rev, {});
}

bool is_configuration_up_to_date(
Expand Down Expand Up @@ -307,7 +308,7 @@ ss::future<> controller_backend::add_to_shard_table(
ss::future<std::error_code> controller_backend::create_partition(
model::ntp ntp,
raft::group_id group_id,
model::offset offset,
model::revision_id rev,
std::vector<model::broker> members) {
auto cfg = _topics.local().get_topic_cfg(model::topic_namespace_view(ntp));

@@ -321,10 +322,9 @@ ss::future<std::error_code> controller_backend::create_partition(
if (likely(_partition_manager.local().get(ntp).get() == nullptr)) {
// we use offset as an ntp_id as it is always increasing and it
// increases while ntp is being created again
auto ntp_id = storage::ntp_config::ntp_id(offset());
f = _partition_manager.local()
.manage(
cfg->make_ntp_config(_data_directory, ntp.tp.partition, ntp_id),
cfg->make_ntp_config(_data_directory, ntp.tp.partition, rev),
group_id,
std::move(members))
.discard_result();
@@ -340,11 +340,15 @@ ss::future<std::error_code> controller_backend::create_partition(
}

ss::future<std::error_code>
controller_backend::delete_partition(model::ntp ntp) {
controller_backend::delete_partition(model::ntp ntp, model::revision_id rev) {
auto part = _partition_manager.local().get(ntp);
if (unlikely(part.get() == nullptr)) {
return ss::make_ready_future<std::error_code>(errc::success);
}
// partition was already recreated with greater rev, do nothing
if (unlikely(part->get_revision_id() > rev)) {
return ss::make_ready_future<std::error_code>(errc::success);
}
auto group_id = part->group();

return _shard_table
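To make the reconciliation flow above easier to follow: deltas still arrive as one chronologically ordered list, but they are now bucketed per NTP, applied strictly in order within each bucket, processing stops at the first failing operation, and only the successfully applied prefix is dropped; independent NTPs are reconciled in parallel. Below is a minimal, non-Seastar sketch of that bookkeeping; the delta type and apply_op() are simplified stand-ins for topic_table::delta and execute_partitition_op(), and std::map stands in for absl::flat_hash_map.

// Minimal, non-Seastar sketch of the per-NTP bookkeeping introduced in this
// PR. `delta` and apply_op() are simplified stand-ins for topic_table::delta
// and execute_partitition_op(); std::map stands in for absl::flat_hash_map.
#include <map>
#include <string>
#include <system_error>
#include <utility>
#include <vector>

struct delta {
    std::string ntp; // stand-in for model::ntp
    long offset;     // the real code derives revision_id from this
    std::string op;  // "add", "del" or "update"
};

using deltas_t = std::vector<delta>;
using underlying_t = std::map<std::string, deltas_t>;

// stand-in for execute_partitition_op(); a non-zero code means "retry later"
std::error_code apply_op(const delta& d) {
    if (d.op == "update") {
        // pretend updates are currently blocked, e.g. waiting for recovery
        return std::make_error_code(std::errc::resource_unavailable_try_again);
    }
    return {};
}

// fetch_deltas(): bucket the chronologically ordered flat list per NTP,
// preserving order inside each bucket
void fetch_deltas(underlying_t& topic_deltas, std::vector<delta> incoming) {
    for (auto& d : incoming) {
        topic_deltas[d.ntp].push_back(std::move(d));
    }
}

// reconcile_ntp(): apply one NTP's deltas strictly in order, stop at the
// first failure, and drop only the prefix that succeeded
void reconcile_ntp(deltas_t& deltas) {
    auto it = deltas.cbegin();
    while (it != deltas.cend() && !apply_op(*it)) {
        ++it;
    }
    deltas.erase(deltas.cbegin(), it); // failed delta and successors stay queued
}

// reconcile_topics(): NTPs are independent of each other; the real code runs
// them with ss::parallel_for_each, then drops NTP keys whose queues drained
void reconcile_topics(underlying_t& topic_deltas) {
    for (auto& kv : topic_deltas) {
        reconcile_ntp(kv.second);
    }
    for (auto it = topic_deltas.begin(); it != topic_deltas.end();) {
        if (it->second.empty()) {
            it = topic_deltas.erase(it);
        } else {
            ++it;
        }
    }
}

int main() {
    underlying_t topic_deltas;
    fetch_deltas(
      topic_deltas,
      {{"kafka/a/0", 10, "add"},
       {"kafka/a/0", 12, "update"}, // blocks, so it stays queued
       {"kafka/b/0", 11, "add"}});  // independent NTP, drains fully
    reconcile_topics(topic_deltas);
    // topic_deltas now holds only "kafka/a/0" with its "update" delta queued
}

The futures-based versions in controller_backend.cc layer the _topics_sem unit and gate handling on top of this same shape.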
28 changes: 14 additions & 14 deletions src/v/cluster/controller_backend.h
@@ -30,26 +30,26 @@ class controller_backend {
ss::future<> start();

private:
template<typename T>
struct task_meta {
explicit task_meta(T t)
: delta(std::move(t)) {}

bool finished = false;
T delta;
};

using deltas_t = std::vector<topic_table::delta>;
using underlying_t = absl::flat_hash_map<model::ntp, deltas_t>;
// Topics
void start_topics_reconciliation_loop();
ss::future<> reconcile_topics();
ss::future<> do_reconcile_topic(task_meta<topic_table::delta>&);
ss::future<std::error_code>
execute_partitition_op(const topic_table::delta&);
ss::future<std::error_code> create_partition(
model::ntp, raft::group_id, model::offset, std::vector<model::broker>);
model::ntp,
raft::group_id,
model::revision_id,
std::vector<model::broker>);
ss::future<> add_to_shard_table(model::ntp, raft::group_id, ss::shard_id);
ss::future<std::error_code> process_partition_update(
const topic_table::delta::partition&, model::offset);
model::ntp, const partition_assignment&, model::revision_id);
ss::future<> fetch_deltas();
ss::future<> reconcile_ntp(deltas_t&);

ss::future<std::error_code> delete_partition(model::ntp);
ss::future<std::error_code>
delete_partition(model::ntp, model::revision_id);
ss::future<std::error_code> update_partition_replica_set(
const model::ntp&, const std::vector<model::broker_shard>&);

@@ -63,7 +63,7 @@ class controller_backend {
model::node_id _self;
ss::sstring _data_directory;
ss::sharded<ss::abort_source>& _as;
std::vector<task_meta<topic_table::delta>> _topic_deltas;
underlying_t _topic_deltas;
ss::timer<> _housekeeping_timer;
ss::semaphore _topics_sem{1};
ss::gate _gate;
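One detail of the header change worth noting: reconcile_topics() and fetch_deltas() now acquire the single-unit _topics_sem themselves, so housekeeping() simply schedules reconcile_topics() under the gate instead of taking the semaphore around it. As a rough, non-Seastar analogy (a std::mutex stands in for the one-unit semaphore; the real code is futures-based and per-shard):

// Rough, non-Seastar analogy for the locking change: every helper that touches
// the shared delta map takes the one-unit lock itself, so callers such as
// housekeeping() no longer wrap it.
#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

struct backend {
    std::mutex topics_mtx;                                // ~ ss::semaphore _topics_sem{1}
    std::map<std::string, std::vector<int>> topic_deltas; // ~ underlying_t _topic_deltas

    void fetch_deltas(std::vector<std::pair<std::string, int>> incoming) {
        std::lock_guard<std::mutex> g(topics_mtx); // lock taken inside the helper
        for (auto& [ntp, d] : incoming) {
            topic_deltas[ntp].push_back(d);
        }
    }

    void reconcile_topics() {
        std::lock_guard<std::mutex> g(topics_mtx); // ditto: no caller-side locking
        // ... apply queued deltas per NTP ...
    }

    void housekeeping() {
        // the real code first checks for pending deltas and available semaphore
        // units, then schedules reconcile_topics() under the gate
        reconcile_topics();
    }
};

int main() {
    backend b;
    b.fetch_deltas({{"kafka/a/0", 1}, {"kafka/a/0", 2}});
    b.housekeeping();
}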
8 changes: 7 additions & 1 deletion src/v/cluster/errc.h
@@ -22,7 +22,9 @@ enum class errc {
topic_not_exists,
invalid_topic_name,
partition_not_exists,
not_leader
not_leader,
partition_already_exists,
waiting_for_recovery,
};
struct errc_category final : public std::error_category {
const char* name() const noexcept final { return "cluster::errc"; }
@@ -69,6 +71,10 @@ struct errc_category final : public std::error_category {
return "Invalid topic name";
case errc::partition_not_exists:
return "Requested partition does not exists";
case errc::partition_already_exists:
return "Requested partition already exists";
case errc::waiting_for_recovery:
return "Waiting for partition to recover";
default:
return "cluster::errc::unknown";
}
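The two new errc values plug into the existing cluster::errc error category, so callers can return and compare them as ordinary std::error_code values, as execute_partitition_op() and the reconcile loop do. Below is a self-contained illustration of that std::error_code plumbing with the same message strings; the demo namespace and helper names are illustrative stand-ins, not the actual codebase wiring.

// Self-contained illustration of the std::error_code plumbing that the new
// errc values rely on; `demo` and its helpers are illustrative stand-ins.
#include <iostream>
#include <string>
#include <system_error>

namespace demo {

enum class errc {
    success = 0,
    partition_already_exists,
    waiting_for_recovery,
};

struct errc_category final : std::error_category {
    const char* name() const noexcept final { return "demo::errc"; }
    std::string message(int c) const final {
        switch (static_cast<errc>(c)) {
        case errc::success:
            return "Success";
        case errc::partition_already_exists:
            return "Requested partition already exists";
        case errc::waiting_for_recovery:
            return "Waiting for partition to recover";
        default:
            return "demo::errc::unknown";
        }
    }
};

inline const std::error_category& error_category() {
    static errc_category cat;
    return cat;
}

// found via ADL when an errc value is converted to std::error_code
inline std::error_code make_error_code(errc e) noexcept {
    return {static_cast<int>(e), error_category()};
}

} // namespace demo

// allows direct comparison of std::error_code against demo::errc values
namespace std {
template<>
struct is_error_code_enum<demo::errc> : true_type {};
} // namespace std

int main() {
    std::error_code ec = demo::errc::waiting_for_recovery;
    if (ec == demo::errc::waiting_for_recovery) {
        std::cout << ec.category().name() << ": " << ec.message() << "\n";
    }
}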
4 changes: 4 additions & 0 deletions src/v/cluster/partition.h
@@ -102,6 +102,10 @@ class partition {

partition_probe& probe() { return _probe; }

model::revision_id get_revision_id() const {
return _raft->log_config().get_revision();
}

private:
friend partition_manager;

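The partition.h accessor above is what makes the stale-delete guard in delete_partition() possible: the revision recorded in the log config at creation time is compared against the revision carried by the delete operation, so a delete generated before a partition was re-created cannot tear down the newer incarnation. A small sketch of that guard, using simplified stand-in types rather than the real model::revision_id and cluster::partition classes:

// Sketch of the stale-delete guard: a delete carries the revision of the
// command that produced it and is ignored when the live partition was already
// re-created at a higher revision.
#include <cassert>
#include <memory>

using revision_id = long;

struct partition {
    revision_id rev; // real code: _raft->log_config().get_revision()
    revision_id get_revision_id() const { return rev; }
};

void delete_partition(std::shared_ptr<partition>& part, revision_id delete_rev) {
    if (!part) {
        return; // nothing to delete
    }
    // partition was already recreated with greater rev, do nothing
    if (part->get_revision_id() > delete_rev) {
        return;
    }
    part.reset(); // stand-in for the real shard-table/partition-manager removal
}

int main() {
    auto p = std::make_shared<partition>(partition{7});
    delete_partition(p, 3); // stale delete issued before the re-creation at rev 7
    assert(p != nullptr);   // the newer incarnation survives
    delete_partition(p, 9); // delete issued at a later revision
    assert(p == nullptr);
}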