Skip to content

Commit

Permalink
cluster: shift offset translation to partition
Browse files Browse the repository at this point in the history
Shifting offset translation down the abstraction well to eventually
reach rm_stm
  • Loading branch information
rystsov committed Jul 9, 2022
1 parent 9335075 commit e3d24d9
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 19 deletions.
42 changes: 32 additions & 10 deletions src/v/cluster/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,15 @@ partition::partition(
}
}

ss::future<result<raft::replicate_result>> partition::replicate(
ss::future<result<kafka_result>> partition::replicate(
model::record_batch_reader&& r, raft::replicate_options opts) {
return _raft->replicate(std::move(r), opts);
using ret_t = result<kafka_result>;
auto res = co_await _raft->replicate(std::move(r), opts);
if (!res) {
co_return ret_t(res.error());
}
co_return ret_t(kafka_result{
kafka::offset(_translator->from_log_offset(res.value().last_offset)())});
}

ss::shared_ptr<cluster::rm_stm> partition::rm_stm() {
Expand All @@ -126,26 +132,27 @@ ss::shared_ptr<cluster::rm_stm> partition::rm_stm() {
return _rm_stm;
}

raft::replicate_stages partition::replicate_in_stages(
kafka_stages partition::replicate_in_stages(
model::batch_identity bid,
model::record_batch_reader&& r,
raft::replicate_options opts) {
using ret_t = result<kafka_result>;
if (bid.is_transactional) {
if (!_is_tx_enabled) {
vlog(
clusterlog.error,
"Can't process a transactional request to {}. Transactional "
"processing isn't enabled.",
_raft->ntp());
return raft::replicate_stages(raft::errc::timeout);
return kafka_stages(raft::errc::timeout);
}

if (!_rm_stm) {
vlog(
clusterlog.error,
"Topic {} doesn't support transactional processing.",
_raft->ntp());
return raft::replicate_stages(raft::errc::timeout);
return kafka_stages(raft::errc::timeout);
}
}

Expand All @@ -156,31 +163,46 @@ raft::replicate_stages partition::replicate_in_stages(
"Can't process an idempotent request to {}. Idempotency isn't "
"enabled.",
_raft->ntp());
return raft::replicate_stages(raft::errc::timeout);
return kafka_stages(raft::errc::timeout);
}

if (!_rm_stm) {
vlog(
clusterlog.error,
"Topic {} doesn't support idempotency.",
_raft->ntp());
return raft::replicate_stages(raft::errc::timeout);
return kafka_stages(raft::errc::timeout);
}
}

ss::lw_shared_ptr<raft::replicate_stages> res;
if (_rm_stm) {
return _rm_stm->replicate_in_stages(bid, std::move(r), opts);
res = _rm_stm->replicate_in_stages(bid, std::move(r), opts);
} else {
return _raft->replicate_in_stages(std::move(r), opts);
res = _raft->replicate_in_stages(std::move(r), opts);
}

auto replicate_finished = res->replicate_finished.then(
[this](result<raft::replicate_result> r) {
if (!r) {
return ret_t(r.error());
}
auto old_offset = r.value().last_offset;
auto new_offset = kafka::offset(
_translator->from_log_offset(old_offset)());
return ret_t(kafka_result{new_offset});
});
return kafka_stages(
std::move(res->request_enqueued), std::move(replicate_finished));
}

ss::future<> partition::start() {
auto ntp = _raft->ntp();

_probe.setup_metrics(ntp);

auto f = _raft->start();
auto f = _raft->start().then(
[this] { _translator = _raft->get_offset_translator_state(); });

if (is_id_allocator_topic(ntp)) {
return f.then([this] { return _id_allocator_stm->start(); });
Expand Down
5 changes: 3 additions & 2 deletions src/v/cluster/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ class partition {
ss::future<> start();
ss::future<> stop();

ss::future<result<raft::replicate_result>>
ss::future<result<kafka_result>>
replicate(model::record_batch_reader&&, raft::replicate_options);

raft::replicate_stages replicate_in_stages(
kafka_stages replicate_in_stages(
model::batch_identity,
model::record_batch_reader&&,
raft::replicate_options);
Expand Down Expand Up @@ -293,6 +293,7 @@ class partition {
bool _is_tx_enabled{false};
bool _is_idempotence_enabled{false};
ss::lw_shared_ptr<cloud_storage::remote_partition> _cloud_storage_partition;
ss::lw_shared_ptr<const storage::offset_translator_state> _translator;

friend std::ostream& operator<<(std::ostream& o, const partition& x);
};
Expand Down
17 changes: 10 additions & 7 deletions src/v/kafka/server/replicated_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,11 @@ ss::future<result<model::offset>> replicated_partition::replicate(
model::record_batch_reader rdr, raft::replicate_options opts) {
using ret_t = result<model::offset>;
return _partition->replicate(std::move(rdr), opts)
.then([this](result<raft::replicate_result> r) {
.then([](result<cluster::kafka_result> r) {
if (!r) {
return ret_t(r.error());
}
return ret_t(_translator->from_log_offset(r.value().last_offset));
return ret_t(model::offset(r.value().last_offset()));
});
}

Expand All @@ -179,15 +179,18 @@ raft::replicate_stages replicated_partition::replicate(
raft::replicate_options opts) {
using ret_t = result<raft::replicate_result>;
auto res = _partition->replicate_in_stages(batch_id, std::move(rdr), opts);
res.replicate_finished = res.replicate_finished.then(
[this](result<raft::replicate_result> r) {

raft::replicate_stages out(raft::errc::success);
out.request_enqueued = std::move(res.request_enqueued);
out.replicate_finished = res.replicate_finished.then(
[](result<cluster::kafka_result> r) {
if (!r) {
return ret_t(r.error());
}
return ret_t(raft::replicate_result{
_translator->from_log_offset(r.value().last_offset)});
return ret_t(
raft::replicate_result{model::offset(r.value().last_offset())});
});
return res;
return out;
}

std::optional<model::offset> replicated_partition::get_leader_epoch_last_offset(
Expand Down

0 comments on commit e3d24d9

Please sign in to comment.