From 93350756123f6883192a0fb6bb2b123f394e9a4b Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Mon, 4 Jul 2022 14:50:26 -0700 Subject: [PATCH] c/types: introduce kafka offset types We're going to mix raft and kafka offset in the same class, since both the offsets uses the same type it's easy to make an error and treat one as it was another. Introducing kafka offset to rely on the type system to prevent such errors. --- src/v/cluster/types.cc | 10 ++++++++++ src/v/cluster/types.h | 14 ++++++++++++++ src/v/model/fundamental.h | 6 ++++++ 3 files changed, 30 insertions(+) diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index 9769e8e07a19..474bfb6a2d9e 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -32,6 +32,16 @@ namespace cluster { +kafka_stages::kafka_stages( + ss::future<> enq, ss::future> offset_future) + : request_enqueued(std::move(enq)) + , replicate_finished(std::move(offset_future)) {} + +kafka_stages::kafka_stages(raft::errc ec) + : request_enqueued(ss::now()) + , replicate_finished( + ss::make_ready_future>(make_error_code(ec))){}; + bool topic_properties::is_compacted() const { if (!cleanup_policy_bitflags) { return false; diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index a5abd73d555f..b101b19a88bf 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -175,6 +175,20 @@ inline std::error_code make_error_code(tx_errc e) noexcept { return std::error_code(static_cast(e), tx_error_category()); } +struct kafka_result { + kafka::offset last_offset; +}; +struct kafka_stages { + kafka_stages(ss::future<>, ss::future>); + explicit kafka_stages(raft::errc); + // after this future is ready, request in enqueued in raft and it will not + // be reorderd + ss::future<> request_enqueued; + // after this future is ready, request was successfully replicated with + // requested consistency level + ss::future> replicate_finished; +}; + struct try_abort_request : serde::envelope> { model::partition_id tm; diff --git a/src/v/model/fundamental.h b/src/v/model/fundamental.h index 3ecfdf1cb688..86e92f865e14 100644 --- a/src/v/model/fundamental.h +++ b/src/v/model/fundamental.h @@ -29,6 +29,12 @@ #include #include +namespace kafka { + +using offset = named_type; + +} // namespace kafka + namespace model { // Named after Kafka cleanup.policy topic property