diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index 9769e8e07a19..474bfb6a2d9e 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -32,6 +32,16 @@ namespace cluster { +kafka_stages::kafka_stages( + ss::future<> enq, ss::future> offset_future) + : request_enqueued(std::move(enq)) + , replicate_finished(std::move(offset_future)) {} + +kafka_stages::kafka_stages(raft::errc ec) + : request_enqueued(ss::now()) + , replicate_finished( + ss::make_ready_future>(make_error_code(ec))){}; + bool topic_properties::is_compacted() const { if (!cleanup_policy_bitflags) { return false; diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index a5abd73d555f..b101b19a88bf 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -175,6 +175,20 @@ inline std::error_code make_error_code(tx_errc e) noexcept { return std::error_code(static_cast(e), tx_error_category()); } +struct kafka_result { + kafka::offset last_offset; +}; +struct kafka_stages { + kafka_stages(ss::future<>, ss::future>); + explicit kafka_stages(raft::errc); + // after this future is ready, request in enqueued in raft and it will not + // be reorderd + ss::future<> request_enqueued; + // after this future is ready, request was successfully replicated with + // requested consistency level + ss::future> replicate_finished; +}; + struct try_abort_request : serde::envelope> { model::partition_id tm; diff --git a/src/v/model/fundamental.h b/src/v/model/fundamental.h index 3ecfdf1cb688..86e92f865e14 100644 --- a/src/v/model/fundamental.h +++ b/src/v/model/fundamental.h @@ -29,6 +29,12 @@ #include #include +namespace kafka { + +using offset = named_type; + +} // namespace kafka + namespace model { // Named after Kafka cleanup.policy topic property