Skip to content

Commit

Permalink
r/stm: added support for state machines without snapshot at offset cap
Browse files Browse the repository at this point in the history
To support fast partition movement in Redpanda the
`raft::state_machine_base` interface requires state machine implementer
to provide support for taking snapshot at arbitrary, already applied
offset. This requirement is easy to achieve for all existing state
machines as they do not require raft snapshots at all.

In future we may leverage the Raft protocol snapshot as they provide a
nice way to maintain the bounded log size without compromising
consistency.

This PR introduces changes to `state_machine_manager` and
`state_machine_base` interfaces allowing state machine implementer to
opt out from the requirement to provide snapshot at offset capability.
Without this feature the partition that the STM is build at will not be
fast movable but the `take_snapshot` implementation will be very simple
as it will require simply serializing the current in memory state of the
state machine.

Signed-off-by: Michał Maślanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Aug 27, 2024
1 parent 5af4e9f commit c17b5ec
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 27 deletions.
6 changes: 3 additions & 3 deletions src/v/cluster/log_eviction_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ log_eviction_stm::do_write_raft_snapshot(model::offset truncation_point) {
_log.debug,
"Requesting raft snapshot with final offset: {}",
truncation_point);
auto snapshot_data = co_await _raft->stm_manager()->take_snapshot(
auto snapshot_result = co_await _raft->stm_manager()->take_snapshot(
truncation_point);
// we need to check snapshot index again as it may already progressed after
// snapshot is taken by stm_manager
Expand All @@ -214,8 +214,8 @@ log_eviction_stm::do_write_raft_snapshot(model::offset truncation_point) {
truncation_point);
co_return;
}
co_await _raft->write_snapshot(
raft::write_snapshot_cfg(truncation_point, std::move(snapshot_data)));
co_await _raft->write_snapshot(raft::write_snapshot_cfg(
snapshot_result.last_included_offset, std::move(snapshot_result.data)));
}

kafka::offset log_eviction_stm::kafka_start_offset_override() {
Expand Down
8 changes: 5 additions & 3 deletions src/v/raft/recovery_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ recovery_stm::required_snapshot_type recovery_stm::get_required_snapshot_type(
*/
if (
follower_metadata.is_learner && _ptr->get_learner_start_offset()
&& follower_metadata.next_index < *_ptr->get_learner_start_offset()) {
&& follower_metadata.next_index < *_ptr->get_learner_start_offset()
&& _ptr->stm_manager()->supports_snapshot_at_offset()) {
// current snapshot moved beyond configured learner start offset, we can
// use current snapshot instead creating a new on demand one
if (*_ptr->get_learner_start_offset() <= _ptr->last_snapshot_index()) {
Expand Down Expand Up @@ -470,8 +471,9 @@ recovery_stm::take_on_demand_snapshot(model::offset last_included_offset) {
iobuf snapshot_data;

if (_ptr->stm_manager()) {
snapshot_data = co_await _ptr->stm_manager()->take_snapshot(
last_included_offset);
snapshot_data = (co_await _ptr->stm_manager()->take_snapshot(
last_included_offset))
.data;
}
auto cfg = _ptr->_configuration_manager.get(last_included_offset);
const auto term = _ptr->log()->get_term(last_included_offset);
Expand Down
32 changes: 27 additions & 5 deletions src/v/raft/state_machine_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
#include "raft/offset_monitor.h"

namespace raft {

using snapshot_at_offset_supported
= ss::bool_class<struct snapshot_at_offset_supported_tag>;
class consensus;
/**
* State machine interface. The class provides an interface that must be
Expand Down Expand Up @@ -56,13 +57,19 @@ class state_machine_base {
* returned in previous `state_machine_base::take_snapshot()` calls
*/
virtual ss::future<> apply_raft_snapshot(const iobuf&) = 0;

/**
* Returns a snapshot of an STM state with requested last included offset
* Returns a snapshot of an STM state with requested last included offset.
*/
virtual ss::future<iobuf>
take_snapshot(model::offset last_included_offset) = 0;
ss::future<iobuf> take_raft_snapshot(model::offset last_included_offset)

{
vassert(
supports_snapshot_at_offset()
|| last_included_offset == last_applied_offset(),
"if state machine do not support snapshots at offset it is only "
"allowed to call take snapshot with the last applied offset");
return take_snapshot(last_included_offset);
};
/**
* Last successfully applied offset
*/
Expand All @@ -82,7 +89,22 @@ class state_machine_base {
*/
virtual ss::future<> remove_local_state() = 0;

/**
* Returns true if a state machine supports taking a snapshot at any offset.
* It the state machine allows taking snapshot at any applied offset the
* partition will support fast reconfigurations.
*/
virtual snapshot_at_offset_supported supports_snapshot_at_offset() const {
return snapshot_at_offset_supported::yes;
}

protected:
/**
* Returns a snapshot of an STM state with requested last included offset.
* For implementer
*/
virtual ss::future<iobuf>
take_snapshot(model::offset last_included_offset) = 0;
/**
* Lifecycle is managed by state_machine_manager
*/
Expand Down
49 changes: 46 additions & 3 deletions src/v/raft/state_machine_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ state_machine_manager::state_machine_manager(
, _log(ctx_log(_raft->group(), _raft->ntp()))
, _apply_sg(apply_sg) {
for (auto& n_stm : stms) {
_supports_snapshot_at_offset
= _supports_snapshot_at_offset
&& n_stm.stm->supports_snapshot_at_offset();
_machines.try_emplace(
n_stm.name,
ss::make_lw_shared<state_machine_entry>(
Expand Down Expand Up @@ -482,8 +485,13 @@ ss::future<> state_machine_manager::background_apply_fiber(
entry->name);
}

ss::future<iobuf>
ss::future<state_machine_manager::snapshot_result>
state_machine_manager::take_snapshot(model::offset last_included_offset) {
vassert(
static_cast<bool>(_supports_snapshot_at_offset),
"Snapshot at arbitrary offset can only be taken if manager supports fast "
"reconfigurations");

vlog(
_log.debug,
"taking snapshot with last included offset: {}",
Expand All @@ -506,14 +514,49 @@ state_machine_manager::take_snapshot(model::offset last_included_offset) {
managed_snapshot snapshot;
co_await ss::coroutine::parallel_for_each(
_machines, [last_included_offset, &snapshot](auto entry_pair) {
return entry_pair.second->stm->take_snapshot(last_included_offset)
return entry_pair.second->stm
->take_raft_snapshot(last_included_offset)
.then([&snapshot, key = entry_pair.first](auto snapshot_part) {
snapshot.snapshot_map.try_emplace(
key, std::move(snapshot_part));
});
});

co_return state_machine_manager::snapshot_result{
serde::to_iobuf(std::move(snapshot)), last_included_offset};
}

ss::future<state_machine_manager::snapshot_result>
state_machine_manager::take_snapshot() {
if (last_applied() < _raft->start_offset()) {
throw std::logic_error(fmt::format(
"Can not take snapshot of a state from before raft start offset. "
"Requested offset: {}, start offset: {}",
last_applied(),
_raft->start_offset()));
}
auto holder = _gate.hold();
// wait for all STMs to be on the same page
co_await wait(last_applied(), model::no_timeout, _as);

auto u = co_await _apply_mutex.get_units();
// wait once again for all state machines to apply the same baches
co_await wait(last_applied(), model::no_timeout, _as);
// snapshot can only be taken after all background applies finished
auto units = co_await acquire_background_apply_mutexes();
auto snapshot_offset = last_applied();
managed_snapshot snapshot;
co_await ss::coroutine::parallel_for_each(
_machines, [snapshot_offset, &snapshot](auto entry_pair) {
return entry_pair.second->stm->take_raft_snapshot(snapshot_offset)
.then([&snapshot, key = entry_pair.first](auto snapshot_part) {
snapshot.snapshot_map.try_emplace(
key, std::move(snapshot_part));
});
});

co_return serde::to_iobuf(std::move(snapshot));
co_return state_machine_manager::snapshot_result{
serde::to_iobuf(std::move(snapshot)), snapshot_offset};
}

ss::future<> state_machine_manager::wait(
Expand Down
27 changes: 26 additions & 1 deletion src/v/raft/state_machine_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ concept StateMachineIterateFunc = requires(
*/
class state_machine_manager final {
public:
/**
* A result returned after taking a snapshot it contains a serde serialized
* snapshot data and last offset included into the snapshot.
*/
struct snapshot_result {
iobuf data;
model::offset last_included_offset;
};

// wait until at least offset is applied to all the state machines
ss::future<> wait(
model::offset,
Expand All @@ -75,13 +84,28 @@ class state_machine_manager final {
* state i.e last snapshot index is derived from last_applied_offset. In
* Redpanda we use different approach. Data eviction policy forces us to
* allow state machines to take snapshot at arbitrary offsets.
*
* IMPORTANT: This API is only supported if all state machines support
* taking snapshots at arbitrary offset.
*/
ss::future<iobuf> take_snapshot(model::offset);
ss::future<snapshot_result> take_snapshot(model::offset);

/**
* If any of the state machines in the manager doesn't support fast
* reconfigurations this is the only API that the user is allowed to call,
* the take snapshot with offset other than _last_applied_offset will fail.
*/
ss::future<snapshot_result> take_snapshot();

ss::future<> start();
ss::future<> stop();

model::offset last_applied() const { return model::prev_offset(_next); }

snapshot_at_offset_supported supports_snapshot_at_offset() const {
return _supports_snapshot_at_offset;
}

/**
* Returns a pointer to specific type of state machine.
*
Expand Down Expand Up @@ -192,6 +216,7 @@ class state_machine_manager final {
ss::gate _gate;
ss::abort_source _as;
ss::scheduling_group _apply_sg;
snapshot_at_offset_supported _supports_snapshot_at_offset{true};
};

/**
Expand Down
21 changes: 11 additions & 10 deletions src/v/raft/tests/persisted_stm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -433,16 +433,17 @@ struct persisted_stm_test_fixture : state_machine_fixture {
});

// take snapshots on all of the nodes
co_await parallel_for_each_node(
[snapshot_offset](raft_node_instance& n) {
return n.raft()
->stm_manager()
->take_snapshot(snapshot_offset)
.then([raft = n.raft(), snapshot_offset](iobuf snapshot_data) {
return raft->write_snapshot(raft::write_snapshot_cfg(
snapshot_offset, std::move(snapshot_data)));
});
});
co_await parallel_for_each_node([snapshot_offset](
raft_node_instance& n) {
return n.raft()
->stm_manager()
->take_snapshot(snapshot_offset)
.then([raft = n.raft(), snapshot_offset](
state_machine_manager::snapshot_result snapshot_result) {
return raft->write_snapshot(raft::write_snapshot_cfg(
snapshot_offset, std::move(snapshot_result.data)));
});
});
}

ss::future<> take_local_snapshot_on_every_node() {
Expand Down
62 changes: 60 additions & 2 deletions src/v/raft/tests/stm_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,10 @@ TEST_F_CORO(state_machine_fixture, test_recovery_from_snapshot) {
return n.raft()
->stm_manager()
->take_snapshot(snapshot_offset)
.then([raft = n.raft(), snapshot_offset](iobuf snapshot_data) {
.then([raft = n.raft(), snapshot_offset](
state_machine_manager::snapshot_result snapshot_result) {
return raft->write_snapshot(raft::write_snapshot_cfg(
snapshot_offset, std::move(snapshot_data)));
snapshot_offset, std::move(snapshot_result.data)));
});
});

Expand Down Expand Up @@ -555,3 +556,60 @@ TEST_F_CORO(state_machine_fixture, test_all_machines_throw) {
ASSERT_EQ_CORO(stm->state, expected);
}
}

class non_fast_movable_kv : public simple_kv {
public:
static constexpr std::string_view name = "other_persited_kv_stm";
explicit non_fast_movable_kv(raft_node_instance& rn)
: simple_kv(rn) {}

ss::future<iobuf> take_snapshot(model::offset last_included_offset) final {
vassert(
last_included_offset == last_applied_offset(),
"state machine do not support snapshot at arbitrary offset");
co_return serde::to_iobuf(state);
}

snapshot_at_offset_supported supports_snapshot_at_offset() const final {
return snapshot_at_offset_supported::no;
}
};

TEST_F_CORO(state_machine_fixture, test_opt_out_from_snapshot_at_offset) {
create_nodes();
std::vector<ss::shared_ptr<simple_kv>> stms;
for (auto& [id, node] : nodes()) {
raft::state_machine_manager_builder builder;
builder.create_stm<simple_kv>(*node);
builder.create_stm<non_fast_movable_kv>(*node);

co_await node->init_and_start(all_vnodes(), std::move(builder));
}

for (auto& [_, node] : nodes()) {
ASSERT_FALSE_CORO(
node->raft()->stm_manager()->supports_snapshot_at_offset());
}

auto expected = co_await build_random_state(1000);

// take snapshots on all of the nodes
absl::flat_hash_map<model::node_id, model::offset> offsets;
for (auto& [id, node] : nodes()) {
auto o = co_await node->raft()->stm_manager()->take_snapshot().then(
[raft = node->raft()](
state_machine_manager::snapshot_result snapshot_data) {
return raft
->write_snapshot(raft::write_snapshot_cfg(
snapshot_data.last_included_offset,
std::move(snapshot_data.data)))
.then([o = snapshot_data.last_included_offset] { return o; });
});
offsets[id] = o;
}

for (const auto& [id, n] : nodes()) {
ASSERT_EQ_CORO(
n->raft()->start_offset(), model::next_offset(offsets[id]));
}
}

0 comments on commit c17b5ec

Please sign in to comment.