rm_stm: shift offset translation to rm_stm
Switching the seq table to caching Kafka offsets (rather than log offsets) to avoid
out-of-range errors when translating offsets beyond the eviction point.
rystsov committed Jul 12, 2022
1 parent 63c5883 commit 4b42c7e
Showing 4 changed files with 118 additions and 62 deletions.
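Why cache Kafka offsets? The commit message is terse, so here is a minimal, standalone C++ sketch of the failure mode it fixes (toy code, not Redpanda's API: `toy_translator`, its fields, and all concrete numbers are invented for illustration). An offset translator only covers log offsets that are still retained; once the log prefix holding an offset is evicted, translating it throws an out-of-range error. Caching the already-translated Kafka offset at write time keeps idempotency dedup working even after eviction.

#include <cstdint>
#include <iostream>
#include <map>
#include <stdexcept>

// Hypothetical stand-in for the offset translator: it knows how many
// non-data (filtered) batches precede each retained log offset.
struct toy_translator {
    int64_t start;           // oldest log offset still translatable
    int64_t filtered_before; // non-data batches already evicted before `start`

    int64_t from_log_offset(int64_t log_offset) const {
        if (log_offset < start) {
            // mirrors the "out of range" error the commit message mentions
            throw std::out_of_range("offset is below the eviction point");
        }
        return log_offset - filtered_before; // toy "kafka" offset
    }
};

int main() {
    toy_translator tr{100, 7};

    // At replication time the log offset is still in range, so translate
    // once and cache the *kafka* offset keyed by the producer's sequence.
    std::map<int32_t, int64_t> seq_to_kafka_offset;
    const int64_t log_offset = 150;
    seq_to_kafka_offset[42] = tr.from_log_offset(log_offset);

    // The log prefix is later evicted and the translator forgets it.
    tr = toy_translator{200, 11};

    // A duplicate of seq 42 arrives: re-translating the old log offset
    // would throw, but the cached kafka offset still answers the dedup.
    try {
        tr.from_log_offset(log_offset);
    } catch (const std::out_of_range& e) {
        std::cout << "re-translation fails: " << e.what() << '\n';
    }
    std::cout << "cached kafka offset for seq 42: " << seq_to_kafka_offset[42]
              << '\n';
}

The same idea runs through the diff below: replication paths store `from_log_offset(...)` results in the seq table, and snapshot save/load translate once, skipping entries whose offsets fall outside the translatable range.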
10 changes: 4 additions & 6 deletions src/v/cluster/partition.cc
@@ -175,14 +175,12 @@ kafka_stages partition::replicate_in_stages(
         }
     }
 
-    ss::lw_shared_ptr<raft::replicate_stages> res;
     if (_rm_stm) {
-        res = _rm_stm->replicate_in_stages(bid, std::move(r), opts);
-    } else {
-        res = _raft->replicate_in_stages(std::move(r), opts);
+        return _rm_stm->replicate_in_stages(bid, std::move(r), opts);
     }
 
-    auto replicate_finished = res->replicate_finished.then(
+    auto res = _raft->replicate_in_stages(std::move(r), opts);
+    auto replicate_finished = res.replicate_finished.then(
       [this](result<raft::replicate_result> r) {
           if (!r) {
               return ret_t(r.error());
@@ -193,7 +191,7 @@ kafka_stages partition::replicate_in_stages(
           return ret_t(kafka_result{new_offset});
       });
     return kafka_stages(
-      std::move(res->request_enqueued), std::move(replicate_finished));
+      std::move(res.request_enqueued), std::move(replicate_finished));
 }
 
 ss::future<> partition::start() {
118 changes: 80 additions & 38 deletions src/v/cluster/rm_stm.cc
@@ -785,7 +785,7 @@ ss::future<tx_errc> rm_stm::do_abort_tx(
     co_return tx_errc::none;
 }
 
-raft::replicate_stages rm_stm::replicate_in_stages(
+kafka_stages rm_stm::replicate_in_stages(
   model::batch_identity bid,
   model::record_batch_reader r,
   raft::replicate_options opts) {
@@ -804,10 +804,10 @@ raft::replicate_stages rm_stm::replicate_in_stages(
             enqueued->set_value();
         }
     });
-    return raft::replicate_stages(std::move(f), std::move(replicate_finished));
+    return kafka_stages(std::move(f), std::move(replicate_finished));
 }
 
-ss::future<result<raft::replicate_result>> rm_stm::replicate(
+ss::future<result<kafka_result>> rm_stm::replicate(
   model::batch_identity bid,
   model::record_batch_reader r,
   raft::replicate_options opts) {
@@ -824,7 +824,7 @@ rm_stm::transfer_leadership(std::optional<model::node_id> target) {
     });
 }
 
-ss::future<result<raft::replicate_result>> rm_stm::do_replicate(
+ss::future<result<kafka_result>> rm_stm::do_replicate(
  model::batch_identity bid,
  model::record_batch_reader b,
  raft::replicate_options opts,
@@ -854,6 +854,11 @@ ss::future<> rm_stm::stop() {
     return raft::state_machine::stop();
 }
 
+ss::future<> rm_stm::start() {
+    _translator = _c->get_offset_translator_state();
+    return persisted_stm::start();
+}
+
 rm_stm::transaction_info::status_t
 rm_stm::get_tx_status(model::producer_identity pid) const {
     if (_mem_state.preparing.contains(pid)) {
@@ -947,7 +952,7 @@ bool rm_stm::check_seq(model::batch_identity bid) {
         return false;
     }
 
-    seq.update(bid.last_seq, model::offset{-1});
+    seq.update(bid.last_seq, kafka::offset{-1});
 
     seq.pid = bid.pid;
     seq.last_write_timestamp = last_write_timestamp;
@@ -957,7 +962,7 @@ bool rm_stm::check_seq(model::batch_identity bid) {
     return true;
 }
 
-std::optional<model::offset>
+std::optional<kafka::offset>
 rm_stm::known_seq(model::batch_identity bid) const {
     auto pid_seq = _log_state.seq_table.find(bid.pid);
     if (pid_seq == _log_state.seq_table.end()) {
@@ -982,7 +987,7 @@ std::optional<int32_t> rm_stm::tail_seq(model::producer_identity pid) const {
     return pid_seq->second.seq;
 }
 
-void rm_stm::set_seq(model::batch_identity bid, model::offset last_offset) {
+void rm_stm::set_seq(model::batch_identity bid, kafka::offset last_offset) {
     auto pid_seq = _log_state.seq_table.find(bid.pid);
     if (pid_seq != _log_state.seq_table.end()) {
         if (pid_seq->second.seq == bid.last_seq) {
@@ -995,14 +1000,14 @@ void rm_stm::reset_seq(model::batch_identity bid) {
     _log_state.seq_table.erase(bid.pid);
     auto& seq = _log_state.seq_table[bid.pid];
     seq.seq = bid.last_seq;
-    seq.last_offset = model::offset{-1};
+    seq.last_offset = kafka::offset{-1};
     seq.pid = bid.pid;
     seq.last_write_timestamp = model::timestamp::now().value();
     _oldest_session = std::min(
       _oldest_session, model::timestamp(seq.last_write_timestamp));
 }
 
-ss::future<result<raft::replicate_result>>
+ss::future<result<kafka_result>>
 rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) {
     if (!check_tx_permitted()) {
         co_return errc::generic_tx_error;
@@ -1056,7 +1061,7 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) {
     // this isn't the first attempt in the tx we should try dedupe
     auto cached_offset = known_seq(bid);
     if (cached_offset) {
-        if (cached_offset.value() < model::offset{0}) {
+        if (cached_offset.value() < kafka::offset{0}) {
             vlog(
               clusterlog.warn,
               "Status of the original attempt is unknown (still is "
@@ -1070,8 +1075,7 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) {
             // to propagate it to the app layer
             co_return errc::generic_tx_error;
         }
-        co_return raft::replicate_result{
-          .last_offset = cached_offset.value()};
+        co_return kafka_result{.last_offset = cached_offset.value()};
     }
 
     if (!check_seq(bid)) {
@@ -1107,26 +1111,28 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) {
     expiration_it->second.last_update = clock_type::now();
     expiration_it->second.is_expiration_requested = false;
 
-    auto replicated = r.value();
+    auto old_offset = r.value().last_offset;
+    auto new_offset = from_log_offset(old_offset);
 
-    set_seq(bid, replicated.last_offset);
+    set_seq(bid, new_offset);
 
-    auto last_offset = model::offset(replicated.last_offset());
     if (!_mem_state.tx_start.contains(bid.pid)) {
-        auto base_offset = model::offset(
-          last_offset() - (bid.record_count - 1));
+        auto base_offset = model::offset(old_offset() - (bid.record_count - 1));
         _mem_state.tx_start.emplace(bid.pid, base_offset);
         _mem_state.tx_starts.insert(base_offset);
         _mem_state.estimated.erase(bid.pid);
     }
-    co_return replicated;
+
+    co_return kafka_result{.last_offset = new_offset};
 }
 
-ss::future<result<raft::replicate_result>> rm_stm::replicate_seq(
+ss::future<result<kafka_result>> rm_stm::replicate_seq(
   model::batch_identity bid,
   model::record_batch_reader br,
   raft::replicate_options opts,
   ss::lw_shared_ptr<available_promise<>> enqueued) {
+    using ret_t = result<kafka_result>;
+
     if (!co_await sync(_sync_timeout)) {
         // it's ok not to set enqueued on early return because
         // the safety check in replicate_in_stages sets it automatically
@@ -1185,16 +1191,16 @@ ss::future<result<raft::replicate_result>> rm_stm::replicate_seq(
     // checking among the responded requests
     auto cached_offset = known_seq(bid);
     if (cached_offset) {
-        co_return raft::replicate_result{.last_offset = cached_offset.value()};
+        co_return kafka_result{.last_offset = cached_offset.value()};
     }
 
     // checking if the request is already being processed
     for (auto& inflight : session->cache) {
         if (inflight->last_seq == bid.last_seq && inflight->is_processing) {
             // found an inflight request, parking the current request
             // until the former is resolved
-            auto promise = ss::make_lw_shared<
-              available_promise<result<raft::replicate_result>>>();
+            auto promise
+              = ss::make_lw_shared<available_promise<result<kafka_result>>>();
             inflight->parked.push_back(promise);
             u.return_all();
             co_return co_await promise->get_future();
@@ -1279,20 +1285,26 @@ ss::future<result<raft::replicate_result>> rm_stm::replicate_seq(
     // we don't need session->lock because we never interleave
     // access to is_processing and offset with sync point (await)
     request->is_processing = false;
-    request->r = r;
+    if (r) {
+        auto old_offset = r.value().last_offset;
+        auto new_offset = from_log_offset(old_offset);
+        request->r = ret_t(kafka_result{new_offset});
+    } else {
+        request->r = ret_t(r.error());
+    }
     for (auto& pending : request->parked) {
-        pending->set_value(r);
+        pending->set_value(request->r);
    }
     request->parked.clear();
 
-    if (!r) {
+    if (!request->r) {
         // if r was failed at the consensus level (not because has_failed)
         // it should guarantee that all follow up replication requests fail
         // too but just in case stepping down to minimize the risk
         if (_c->is_leader() && _c->term() == synced_term) {
             co_await _c->step_down();
         }
-        co_return r;
+        co_return request->r;
     }
 
     // requests get into session->cache in seq order so when we iterate
@@ -1324,21 +1336,30 @@ ss::future<result<raft::replicate_result>> rm_stm::replicate_seq(
         _inflight_requests.erase(bid.pid);
     }
 
-    co_return r;
+    co_return request->r;
 }
 
-ss::future<result<raft::replicate_result>> rm_stm::replicate_msg(
+ss::future<result<kafka_result>> rm_stm::replicate_msg(
   model::record_batch_reader br,
   raft::replicate_options opts,
   ss::lw_shared_ptr<available_promise<>> enqueued) {
+    using ret_t = result<kafka_result>;
+
     if (!co_await sync(_sync_timeout)) {
         co_return errc::not_leader;
     }
 
     auto ss = _c->replicate_in_stages(_insync_term, std::move(br), opts);
     co_await std::move(ss.request_enqueued);
     enqueued->set_value();
-    co_return co_await std::move(ss.replicate_finished);
+    auto r = co_await std::move(ss.replicate_finished);
+
+    if (!r) {
+        co_return ret_t(r.error());
+    }
+    auto old_offset = r.value().last_offset;
+    auto new_offset = from_log_offset(old_offset);
+    co_return ret_t(kafka_result{new_offset});
 }
 
 model::offset rm_stm::last_stable_offset() {
@@ -1785,12 +1806,13 @@ ss::future<> rm_stm::apply_control(
 void rm_stm::apply_data(model::batch_identity bid, model::offset last_offset) {
     if (bid.has_idempotent()) {
         auto [seq_it, inserted] = _log_state.seq_table.try_emplace(bid.pid);
+        auto translated = from_log_offset(last_offset);
         if (inserted) {
             seq_it->second.pid = bid.pid;
             seq_it->second.seq = bid.last_seq;
-            seq_it->second.last_offset = last_offset;
+            seq_it->second.last_offset = translated;
         } else {
-            seq_it->second.update(bid.last_seq, last_offset);
+            seq_it->second.update(bid.last_seq, translated);
         }
         seq_it->second.last_write_timestamp = bid.first_timestamp.value();
         _oldest_session = std::min(_oldest_session, bid.first_timestamp);
@@ -1849,11 +1871,21 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
         seq_entry seq;
         seq.pid = seq_v1.pid;
         seq.seq = seq_v1.seq;
-        seq.last_offset = seq_v1.last_offset;
+        try {
+            seq.last_offset = from_log_offset(seq_v1.last_offset);
+        } catch (...) {
+            // ignoring outside the translation range errors
+            continue;
+        }
         seq.seq_cache.reserve(seq_v1.seq_cache.size());
         for (auto& item : seq_v1.seq_cache) {
-            seq.seq_cache.push_back(
-              seq_cache_entry{.seq = item.seq, .offset = item.offset});
+            try {
+                seq.seq_cache.push_back(seq_cache_entry{
+                  .seq = item.seq, .offset = from_log_offset(item.offset)});
+            } catch (...) {
+                // ignoring outside the translation range errors
+                continue;
+            }
         }
         seq.last_write_timestamp = seq_v1.last_write_timestamp;
         data.seqs.push_back(std::move(seq));
@@ -1870,7 +1902,7 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
         auto seq = seq_entry{
           .pid = seq_v0.pid,
           .seq = seq_v0.seq,
-          .last_offset = model::offset{-1},
+          .last_offset = kafka::offset{-1},
           .last_write_timestamp = seq_v0.last_write_timestamp};
         data.seqs.push_back(std::move(seq));
     }
@@ -1989,12 +2021,22 @@ ss::future<stm_snapshot> rm_stm::take_snapshot() {
         seq_entry_v1 seqs;
         seqs.pid = entry.pid;
         seqs.seq = entry.seq;
-        seqs.last_offset = entry.last_offset;
+        try {
+            seqs.last_offset = to_log_offset(entry.last_offset);
+        } catch (...) {
+            // ignoring outside the translation range errors
+            continue;
+        }
         seqs.last_write_timestamp = entry.last_write_timestamp;
         seqs.seq_cache.reserve(seqs.seq_cache.size());
         for (auto& item : entry.seq_cache) {
-            seqs.seq_cache.push_back(
-              seq_cache_entry_v1{.seq = item.seq, .offset = item.offset});
+            try {
+                seqs.seq_cache.push_back(seq_cache_entry_v1{
+                  .seq = item.seq, .offset = to_log_offset(item.offset)});
+            } catch (...) {
+                // ignoring outside the translation range errors
+                continue;
+            }
         }
         tx_ss.seqs.push_back(std::move(seqs));
     }