Skip to content

Commit

Permalink
c/types: introduce kafka offset types
Browse files Browse the repository at this point in the history
We're going to mix raft and kafka offset in the same class, since
both the offsets uses the same type it's easy to make an error and
treat one as it was another. Introducing kafka offset to rely on the
type system to prevent such errors.
  • Loading branch information
rystsov committed Jul 9, 2022
1 parent e693bea commit 9335075
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 0 deletions.
10 changes: 10 additions & 0 deletions src/v/cluster/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@

namespace cluster {

kafka_stages::kafka_stages(
ss::future<> enq, ss::future<result<kafka_result>> offset_future)
: request_enqueued(std::move(enq))
, replicate_finished(std::move(offset_future)) {}

kafka_stages::kafka_stages(raft::errc ec)
: request_enqueued(ss::now())
, replicate_finished(
ss::make_ready_future<result<kafka_result>>(make_error_code(ec))){};

bool topic_properties::is_compacted() const {
if (!cleanup_policy_bitflags) {
return false;
Expand Down
14 changes: 14 additions & 0 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,20 @@ inline std::error_code make_error_code(tx_errc e) noexcept {
return std::error_code(static_cast<int>(e), tx_error_category());
}

struct kafka_result {
kafka::offset last_offset;
};
struct kafka_stages {
kafka_stages(ss::future<>, ss::future<result<kafka_result>>);
explicit kafka_stages(raft::errc);
// after this future is ready, request in enqueued in raft and it will not
// be reorderd
ss::future<> request_enqueued;
// after this future is ready, request was successfully replicated with
// requested consistency level
ss::future<result<kafka_result>> replicate_finished;
};

struct try_abort_request
: serde::envelope<try_abort_request, serde::version<0>> {
model::partition_id tm;
Expand Down
6 changes: 6 additions & 0 deletions src/v/model/fundamental.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
#include <string_view>
#include <type_traits>

namespace kafka {

using offset = named_type<int64_t, struct kafka_offset_type>;

} // namespace kafka

namespace model {

// Named after Kafka cleanup.policy topic property
Expand Down

0 comments on commit 9335075

Please sign in to comment.