b/src/v/cluster/types.h @@ -22,9 +22,9 @@ #include "model/timeout_clock.h" #include "raft/types.h" #include "security/acl.h" -#include "security/credential_store.h" #include "security/license.h" #include "security/scram_credential.h" +#include "security/types.h" #include "serde/envelope.h" #include "serde/serde.h" #include "storage/ntp_config.h" @@ -42,7 +42,6 @@ namespace cluster { using consensus_ptr = ss::lw_shared_ptr; -using broker_ptr = ss::lw_shared_ptr; // A cluster version is a logical protocol version describing the content // of the raft0 on disk structures, and available features. These are @@ -52,7 +51,10 @@ using cluster_version = named_type; constexpr cluster_version invalid_version = cluster_version{-1}; struct allocate_id_request - : serde::envelope> { + : serde::envelope< + allocate_id_request, + serde::version<0>, + serde::compat_version<0>> { model::timeout_clock::duration timeout; allocate_id_request() noexcept = default; @@ -74,7 +76,8 @@ struct allocate_id_request }; struct allocate_id_reply - : serde::envelope> { + : serde:: + envelope, serde::compat_version<0>> { int64_t id; errc ec; @@ -124,55 +127,17 @@ enum class tx_errc { request_rejected, invalid_producer_id_mapping, invalid_txn_state, - invalid_producer_epoch + invalid_producer_epoch, + tx_not_found, + tx_id_not_found }; + +std::ostream& operator<<(std::ostream&, const tx_errc&); + struct tx_errc_category final : public std::error_category { const char* name() const noexcept final { return "cluster::tx_errc"; } - std::string message(int c) const final { - switch (static_cast(c)) { - case tx_errc::none: - return "None"; - case tx_errc::leader_not_found: - return "Leader not found"; - case tx_errc::shard_not_found: - return "Shard not found"; - case tx_errc::partition_not_found: - return "Partition not found"; - case tx_errc::stm_not_found: - return "Stm not found"; - case tx_errc::partition_not_exists: - return "Partition not exists"; - case tx_errc::pid_not_found: - return "Pid not 
found"; - case tx_errc::timeout: - return "Timeout"; - case tx_errc::conflict: - return "Conflict"; - case tx_errc::fenced: - return "Fenced"; - case tx_errc::stale: - return "Stale"; - case tx_errc::not_coordinator: - return "Not coordinator"; - case tx_errc::coordinator_not_available: - return "Coordinator not available"; - case tx_errc::preparing_rebalance: - return "Preparing rebalance"; - case tx_errc::rebalance_in_progress: - return "Rebalance in progress"; - case tx_errc::coordinator_load_in_progress: - return "Coordinator load in progress"; - case tx_errc::unknown_server_error: - return "Unknown server error"; - case tx_errc::request_rejected: - return "Request rejected"; - case tx_errc::invalid_producer_epoch: - return "Invalid producer epoch"; - default: - return "cluster::tx_errc::unknown"; - } - } + std::string message(int) const final; }; inline const std::error_category& tx_error_category() noexcept { static tx_errc_category e; @@ -211,11 +176,12 @@ enum class partition_removal_mode : uint8_t { }; struct try_abort_request - : serde::envelope> { + : serde:: + envelope, serde::compat_version<0>> { model::partition_id tm; model::producer_identity pid; model::tx_seq tx_seq; - model::timeout_clock::duration timeout; + model::timeout_clock::duration timeout{}; try_abort_request() noexcept = default; @@ -238,7 +204,9 @@ struct try_abort_request auto serde_fields() { return std::tie(tm, pid, tx_seq, timeout); } }; -struct try_abort_reply : serde::envelope> { +struct try_abort_reply + : serde:: + envelope, serde::compat_version<0>> { using committed_type = ss::bool_class; using aborted_type = ss::bool_class; @@ -273,10 +241,13 @@ struct try_abort_reply : serde::envelope> { }; struct init_tm_tx_request - : serde::envelope> { - kafka::transactional_id tx_id; - std::chrono::milliseconds transaction_timeout_ms; - model::timeout_clock::duration timeout; + : serde::envelope< + init_tm_tx_request, + serde::version<0>, + serde::compat_version<0>> { + 
kafka::transactional_id tx_id{}; + std::chrono::milliseconds transaction_timeout_ms{}; + model::timeout_clock::duration timeout{}; init_tm_tx_request() noexcept = default; @@ -299,7 +270,9 @@ struct init_tm_tx_request } }; -struct init_tm_tx_reply : serde::envelope> { +struct init_tm_tx_reply + : serde:: + envelope, serde::compat_version<0>> { // partition_not_exists, not_leader, topic_not_exists model::producer_identity pid; tx_errc ec; @@ -360,11 +333,140 @@ struct end_tx_request { struct end_tx_reply { tx_errc error_code{}; }; -struct begin_tx_request : serde::envelope> { +struct fetch_tx_request + : serde:: + envelope, serde::compat_version<0>> { + using rpc_adl_exempt = std::true_type; + + kafka::transactional_id tx_id{}; + model::term_id term{}; + + fetch_tx_request() noexcept = default; + + fetch_tx_request(kafka::transactional_id tx_id, model::term_id term) + : tx_id(tx_id) + , term(term) {} + + friend bool operator==(const fetch_tx_request&, const fetch_tx_request&) + = default; + + friend std::ostream& operator<<(std::ostream& o, const fetch_tx_request& r); + + auto serde_fields() { return std::tie(tx_id, term); } +}; + +struct fetch_tx_reply + : serde:: + envelope, serde::compat_version<0>> { + using rpc_adl_exempt = std::true_type; + + enum tx_status : int32_t { + ongoing, + preparing, + prepared, + aborting, + killed, + ready, + tombstone, + }; + + struct tx_partition + : serde:: + envelope, serde::compat_version<0>> { + using rpc_adl_exempt = std::true_type; + + model::ntp ntp; + model::term_id etag; + model::revision_id topic_revision; + + tx_partition() noexcept = default; + + tx_partition( + model::ntp ntp, + model::term_id etag, + model::revision_id topic_revision) + : ntp(ntp) + , etag(etag) + , topic_revision(topic_revision) {} + + friend bool operator==(const tx_partition&, const tx_partition&) + = default; + + friend std::ostream& operator<<(std::ostream& o, const tx_partition& r); + + auto serde_fields() { return std::tie(ntp, etag, 
topic_revision); } + }; + + struct tx_group + : serde::envelope, serde::compat_version<0>> { + using rpc_adl_exempt = std::true_type; + + kafka::group_id group_id; + model::term_id etag; + + tx_group() noexcept = default; + + tx_group(kafka::group_id group_id, model::term_id etag) + : group_id(std::move(group_id)) + , etag(etag) {} + + friend bool operator==(const tx_group&, const tx_group&) = default; + + friend std::ostream& operator<<(std::ostream& o, const tx_group& r); + + auto serde_fields() { return std::tie(group_id, etag); } + }; + + tx_errc ec{}; + model::producer_identity pid{}; + model::producer_identity last_pid{}; + model::tx_seq tx_seq{}; + std::chrono::milliseconds timeout_ms{}; + tx_status status{}; + std::vector partitions{}; + std::vector groups{}; + + fetch_tx_reply() noexcept = default; + + fetch_tx_reply(tx_errc ec) + : ec(ec) {} + + fetch_tx_reply( + tx_errc ec, + model::producer_identity pid, + model::producer_identity last_pid, + model::tx_seq tx_seq, + std::chrono::milliseconds timeout_ms, + tx_status status, + std::vector partitions, + std::vector groups) + : ec(ec) + , pid(pid) + , last_pid(last_pid) + , tx_seq(tx_seq) + , timeout_ms(timeout_ms) + , status(status) + , partitions(std::move(partitions)) + , groups(std::move(groups)) {} + + friend bool operator==(const fetch_tx_reply&, const fetch_tx_reply&) + = default; + + friend std::ostream& operator<<(std::ostream& o, const fetch_tx_reply& r); + + auto serde_fields() { + return std::tie( + ec, pid, last_pid, tx_seq, timeout_ms, status, partitions, groups); + } +}; + +struct begin_tx_request + : serde:: + envelope, serde::compat_version<0>> { model::ntp ntp; model::producer_identity pid; model::tx_seq tx_seq; - std::chrono::milliseconds transaction_timeout_ms; + std::chrono::milliseconds transaction_timeout_ms{}; begin_tx_request() noexcept = default; @@ -388,13 +490,26 @@ struct begin_tx_request : serde::envelope> { } }; -struct begin_tx_reply : serde::envelope> { +struct begin_tx_reply + : serde:: + envelope, 
serde::compat_version<0>> { model::ntp ntp; model::term_id etag; tx_errc ec; + model::revision_id topic_revision = model::revision_id(-1); begin_tx_reply() noexcept = default; + begin_tx_reply( + model::ntp ntp, + model::term_id etag, + tx_errc ec, + model::revision_id topic_revision) + : ntp(std::move(ntp)) + , etag(etag) + , ec(ec) + , topic_revision(topic_revision) {} + begin_tx_reply(model::ntp ntp, model::term_id etag, tx_errc ec) : ntp(std::move(ntp)) , etag(etag) @@ -409,17 +524,20 @@ struct begin_tx_reply : serde::envelope> { friend std::ostream& operator<<(std::ostream& o, const begin_tx_reply& r); - auto serde_fields() { return std::tie(ntp, etag, ec); } + auto serde_fields() { return std::tie(ntp, etag, ec, topic_revision); } }; struct prepare_tx_request - : serde::envelope> { + : serde::envelope< + prepare_tx_request, + serde::version<0>, + serde::compat_version<0>> { model::ntp ntp; model::term_id etag; model::partition_id tm; model::producer_identity pid; model::tx_seq tx_seq; - model::timeout_clock::duration timeout; + model::timeout_clock::duration timeout{}; prepare_tx_request() noexcept = default; @@ -448,8 +566,10 @@ struct prepare_tx_request } }; -struct prepare_tx_reply : serde::envelope> { - tx_errc ec; +struct prepare_tx_reply + : serde:: + envelope, serde::compat_version<0>> { + tx_errc ec{}; prepare_tx_reply() noexcept = default; @@ -465,11 +585,12 @@ struct prepare_tx_reply : serde::envelope> { }; struct commit_tx_request - : serde::envelope> { + : serde:: + envelope, serde::compat_version<0>> { model::ntp ntp; model::producer_identity pid; model::tx_seq tx_seq; - model::timeout_clock::duration timeout; + model::timeout_clock::duration timeout{}; commit_tx_request() noexcept = default; @@ -492,8 +613,10 @@ struct commit_tx_request operator<<(std::ostream& o, const commit_tx_request& r); }; -struct commit_tx_reply : serde::envelope> { - tx_errc ec; +struct commit_tx_reply + : serde:: + envelope, serde::compat_version<0>> { + tx_errc ec{}; 
commit_tx_reply() noexcept = default; @@ -508,11 +631,13 @@ struct commit_tx_reply : serde::envelope> { friend std::ostream& operator<<(std::ostream& o, const commit_tx_reply& r); }; -struct abort_tx_request : serde::envelope> { +struct abort_tx_request + : serde:: + envelope, serde::compat_version<0>> { model::ntp ntp; model::producer_identity pid; model::tx_seq tx_seq; - model::timeout_clock::duration timeout; + model::timeout_clock::duration timeout{}; abort_tx_request() noexcept = default; @@ -534,8 +659,10 @@ struct abort_tx_request : serde::envelope> { friend std::ostream& operator<<(std::ostream& o, const abort_tx_request& r); }; -struct abort_tx_reply : serde::envelope> { - tx_errc ec; +struct abort_tx_reply + : serde:: + envelope, serde::compat_version<0>> { + tx_errc ec{}; abort_tx_reply() noexcept = default; @@ -551,12 +678,15 @@ struct abort_tx_reply : serde::envelope> { }; struct begin_group_tx_request - : serde::envelope> { + : serde::envelope< + begin_group_tx_request, + serde::version<0>, + serde::compat_version<0>> { model::ntp ntp; kafka::group_id group_id; model::producer_identity pid; model::tx_seq tx_seq; - model::timeout_clock::duration timeout; + model::timeout_clock::duration timeout{}; begin_group_tx_request() noexcept = default; @@ -597,9 +727,12 @@ struct begin_group_tx_request }; struct begin_group_tx_reply - : serde::envelope> { + : serde::envelope< + begin_group_tx_reply, + serde::version<0>, + serde::compat_version<0>> { model::term_id etag; - tx_errc ec; + tx_errc ec{}; begin_group_tx_reply() noexcept = default; @@ -621,13 +754,16 @@ struct begin_group_tx_reply }; struct prepare_group_tx_request - : serde::envelope> { + : serde::envelope< + prepare_group_tx_request, + serde::version<0>, + serde::compat_version<0>> { model::ntp ntp; kafka::group_id group_id; model::term_id etag; model::producer_identity pid; model::tx_seq tx_seq; - model::timeout_clock::duration timeout; + model::timeout_clock::duration timeout{}; 
prepare_group_tx_request() noexcept = default; @@ -671,8 +807,11 @@ struct prepare_group_tx_request }; struct prepare_group_tx_reply - : serde::envelope> { - tx_errc ec; + : serde::envelope< + prepare_group_tx_reply, + serde::version<0>, + serde::compat_version<0>> { + tx_errc ec{}; prepare_group_tx_reply() noexcept = default; @@ -690,12 +829,15 @@ struct prepare_group_tx_reply }; struct commit_group_tx_request - : serde::envelope> { + : serde::envelope< + commit_group_tx_request, + serde::version<0>, + serde::compat_version<0>> { model::ntp ntp; model::producer_identity pid; model::tx_seq tx_seq; kafka::group_id group_id; - model::timeout_clock::duration timeout; + model::timeout_clock::duration timeout{}; commit_group_tx_request() noexcept = default; @@ -736,8 +878,11 @@ struct commit_group_tx_request }; struct commit_group_tx_reply - : serde::envelope> { - tx_errc ec; + : serde::envelope< + commit_group_tx_reply, + serde::version<0>, + serde::compat_version<0>> { + tx_errc ec{}; commit_group_tx_reply() noexcept = default; @@ -755,12 +900,15 @@ struct commit_group_tx_reply }; struct abort_group_tx_request - : serde::envelope> { + : serde::envelope< + abort_group_tx_request, + serde::version<0>, + serde::compat_version<0>> { model::ntp ntp; kafka::group_id group_id; model::producer_identity pid; model::tx_seq tx_seq; - model::timeout_clock::duration timeout; + model::timeout_clock::duration timeout{}; abort_group_tx_request() noexcept = default; @@ -801,8 +949,11 @@ struct abort_group_tx_request }; struct abort_group_tx_reply - : serde::envelope> { - tx_errc ec; + : serde::envelope< + abort_group_tx_reply, + serde::version<0>, + serde::compat_version<0>> { + tx_errc ec{}; abort_group_tx_reply() noexcept = default; @@ -824,7 +975,8 @@ struct abort_group_tx_reply /// - Always specifies node_id /// (remove this RPC two versions after join_node_request was /// added to replace it) -struct join_request : serde::envelope> { +struct join_request + : serde::envelope, 
serde::compat_version<0>> { join_request() noexcept = default; explicit join_request(model::broker b) @@ -842,7 +994,8 @@ struct join_request : serde::envelope> { auto serde_fields() { return std::tie(node); } }; -struct join_reply : serde::envelope> { +struct join_reply + : serde::envelope, serde::compat_version<0>> { bool success; join_reply() noexcept = default; @@ -866,7 +1019,8 @@ struct join_reply : serde::envelope> { /// node_id (https://github.com/redpanda-data/redpanda/issues/2793) /// in future. struct join_node_request - : serde::envelope> { + : serde:: + envelope, serde::compat_version<0>> { join_node_request() noexcept = default; explicit join_node_request( @@ -905,7 +1059,9 @@ struct join_node_request auto serde_fields() { return std::tie(logical_version, node_uuid, node); } }; -struct join_node_reply : serde::envelope> { +struct join_node_reply + : serde:: + envelope, serde::compat_version<0>> { bool success{false}; model::node_id id{model::unassigned_node_id}; @@ -927,7 +1083,10 @@ struct join_node_reply : serde::envelope> { }; struct configuration_update_request - : serde::envelope> { + : serde::envelope< + configuration_update_request, + serde::version<0>, + serde::compat_version<0>> { configuration_update_request() noexcept = default; explicit configuration_update_request(model::broker b, model::node_id tid) : node(std::move(b)) @@ -947,7 +1106,10 @@ struct configuration_update_request }; struct configuration_update_reply - : serde::envelope> { + : serde::envelope< + configuration_update_reply, + serde::version<0>, + serde::compat_version<0>> { configuration_update_reply() noexcept = default; explicit configuration_update_reply(bool success) : success(success) {} @@ -967,7 +1129,10 @@ struct configuration_update_reply /// Partition assignment describes an assignment of all replicas for single NTP. /// The replicas are hold in vector of broker_shard. 
struct partition_assignment - : serde::envelope> { + : serde::envelope< + partition_assignment, + serde::version<0>, + serde::compat_version<0>> { partition_assignment() noexcept = default; partition_assignment( raft::group_id group, @@ -996,7 +1161,10 @@ struct partition_assignment }; struct remote_topic_properties - : serde::envelope> { + : serde::envelope< + remote_topic_properties, + serde::version<0>, + serde::compat_version<0>> { remote_topic_properties() = default; remote_topic_properties( model::initial_revision_id remote_revision, @@ -1028,7 +1196,7 @@ struct remote_topic_properties */ struct topic_properties : serde:: - envelope, serde::compat_version<0>> { + envelope, serde::compat_version<0>> { topic_properties() noexcept = default; topic_properties( std::optional compression, @@ -1046,7 +1214,8 @@ struct topic_properties std::optional batch_max_bytes, tristate retention_local_target_bytes, tristate retention_local_target_ms, - bool remote_delete) + bool remote_delete, + tristate segment_ms) : compression(compression) , cleanup_policy_bitflags(cleanup_policy_bitflags) , compaction_strategy(compaction_strategy) @@ -1062,7 +1231,8 @@ struct topic_properties , batch_max_bytes(batch_max_bytes) , retention_local_target_bytes(retention_local_target_bytes) , retention_local_target_ms(retention_local_target_ms) - , remote_delete(remote_delete) {} + , remote_delete(remote_delete) + , segment_ms(segment_ms) {} std::optional compression; std::optional cleanup_policy_bitflags; @@ -1081,12 +1251,13 @@ struct topic_properties tristate retention_local_target_ms{std::nullopt}; // Remote deletes are enabled by default in new tiered storage topics, - // disabled by default in legacy topics during upgrade (the legacy path - // is handled during adl/serde decode). + // disabled by default in legacy topics during upgrade. // This is intentionally not an optional: all topics have a concrete value // one way or another. There is no "use the cluster default". 
bool remote_delete{storage::ntp_config::default_remote_delete}; + tristate segment_ms{std::nullopt}; + bool is_compacted() const; bool has_overrides() const; @@ -1110,7 +1281,8 @@ struct topic_properties batch_max_bytes, retention_local_target_bytes, retention_local_target_ms, - remote_delete); + remote_delete, + segment_ms); } friend bool operator==(const topic_properties&, const topic_properties&) @@ -1135,7 +1307,10 @@ incremental_update_operation_as_string(incremental_update_operation op) { template struct property_update - : serde::envelope, serde::version<0>> { + : serde::envelope< + property_update, + serde::version<0>, + serde::compat_version<0>> { property_update() = default; property_update(T v, incremental_update_operation op) : value(std::move(v)) @@ -1162,7 +1337,10 @@ struct property_update template struct property_update> - : serde::envelope>, serde::version<0>> { + : serde::envelope< + property_update>, + serde::version<0>, + serde::compat_version<0>> { property_update() : value(std::nullopt){}; @@ -1192,19 +1370,19 @@ struct property_update> struct incremental_topic_updates : serde::envelope< incremental_topic_updates, - serde::version<2>, + serde::version<3>, serde::compat_version<0>> { static constexpr int8_t version_with_data_policy = -1; static constexpr int8_t version_with_shadow_indexing = -3; static constexpr int8_t version_with_batch_max_bytes_and_local_retention = -4; + static constexpr int8_t version_with_segment_ms = -5; // negative version indicating different format: // -1 - topic_updates with data_policy // -2 - topic_updates without data_policy // -3 - topic_updates with shadow_indexing // -4 - topic update with batch_max_bytes and retention.local.target - static constexpr int8_t version - = version_with_batch_max_bytes_and_local_retention; + static constexpr int8_t version = version_with_segment_ms; property_update> compression; property_update> cleanup_policy_bitflags; @@ -1221,6 +1399,7 @@ struct incremental_topic_updates 
retention_local_target_ms; property_update remote_delete{ false, incremental_update_operation::none}; + property_update> segment_ms; auto serde_fields() { return std::tie( @@ -1235,7 +1414,8 @@ struct incremental_topic_updates batch_max_bytes, retention_local_target_bytes, retention_local_target_ms, - remote_delete); + remote_delete, + segment_ms); } friend std::ostream& @@ -1251,10 +1431,15 @@ using replication_factor std::istream& operator>>(std::istream& i, replication_factor& cs); +replication_factor parsing_replication_factor(const ss::sstring& value); + // This class contains updates for topic properties which are replicates not by // topic_frontend struct incremental_topic_custom_updates - : serde::envelope> { + : serde::envelope< + incremental_topic_custom_updates, + serde::version<1>, + serde::compat_version<0>> { // Data-policy property is replicated by data_policy_frontend and handled by // data_policy_manager. property_update> data_policy; @@ -1276,7 +1461,10 @@ struct incremental_topic_custom_updates * Struct representing single topic properties update */ struct topic_properties_update - : serde::envelope> { + : serde::envelope< + topic_properties_update, + serde::version<0>, + serde::compat_version<0>> { // We need version to indetify request with custom_properties static constexpr int32_t version = -1; topic_properties_update() noexcept = default; @@ -1369,13 +1557,6 @@ struct topic_configuration properties = read_nested(in, h._bytes_left_limit); if (h._version < 1) { - maybe_adjust_retention_policies( - properties.shadow_indexing, - properties.retention_bytes, - properties.retention_duration, - properties.retention_local_target_bytes, - properties.retention_local_target_ms); - // Legacy tiered storage topics do not delete data on // topic deletion. 
properties.remote_delete @@ -1424,7 +1605,10 @@ struct custom_assignable_topic_configuration { }; struct create_partitions_configuration - : serde::envelope> { + : serde::envelope< + create_partitions_configuration, + serde::version<0>, + serde::compat_version<0>> { using custom_assignment = std::vector; create_partitions_configuration() = default; @@ -1452,7 +1636,10 @@ struct create_partitions_configuration }; struct topic_configuration_assignment - : serde::envelope> { + : serde::envelope< + topic_configuration_assignment, + serde::version<0>, + serde::compat_version<0>> { topic_configuration_assignment() = default; topic_configuration_assignment( @@ -1474,8 +1661,10 @@ struct topic_configuration_assignment }; struct create_partitions_configuration_assignment - : serde:: - envelope> { + : serde::envelope< + create_partitions_configuration_assignment, + serde::version<0>, + serde::compat_version<0>> { create_partitions_configuration_assignment() = default; create_partitions_configuration_assignment( create_partitions_configuration cfg, @@ -1497,7 +1686,8 @@ struct create_partitions_configuration_assignment = default; }; -struct topic_result : serde::envelope> { +struct topic_result + : serde::envelope, serde::compat_version<0>> { topic_result() noexcept = default; explicit topic_result(model::topic_namespace t, errc ec = errc::success) : tp_ns(std::move(t)) @@ -1513,7 +1703,10 @@ struct topic_result : serde::envelope> { }; struct create_topics_request - : serde::envelope> { + : serde::envelope< + create_topics_request, + serde::version<0>, + serde::compat_version<0>> { std::vector topics; model::timeout_clock::duration timeout; @@ -1528,7 +1721,10 @@ struct create_topics_request }; struct create_topics_reply - : serde::envelope> { + : serde::envelope< + create_topics_reply, + serde::version<0>, + serde::compat_version<0>> { std::vector results; std::vector metadata; std::vector configs; @@ -1552,7 +1748,10 @@ struct create_topics_reply }; struct 
finish_partition_update_request - : serde::envelope> { + : serde::envelope< + finish_partition_update_request, + serde::version<0>, + serde::compat_version<0>> { model::ntp ntp; std::vector new_replica_set; @@ -1568,7 +1767,10 @@ struct finish_partition_update_request }; struct finish_partition_update_reply - : serde::envelope> { + : serde::envelope< + finish_partition_update_reply, + serde::version<0>, + serde::compat_version<0>> { cluster::errc result; friend bool operator==( @@ -1583,7 +1785,10 @@ struct finish_partition_update_reply }; struct update_topic_properties_request - : serde::envelope> { + : serde::envelope< + update_topic_properties_request, + serde::version<0>, + serde::compat_version<0>> { std::vector updates; friend std::ostream& @@ -1598,7 +1803,10 @@ struct update_topic_properties_request }; struct update_topic_properties_reply - : serde::envelope> { + : serde::envelope< + update_topic_properties_reply, + serde::version<0>, + serde::compat_version<0>> { std::vector results; friend std::ostream& @@ -1612,17 +1820,6 @@ struct update_topic_properties_reply auto serde_fields() { return std::tie(results); } }; -template -struct patch { - std::vector additions; - std::vector deletions; - std::vector updates; - - bool empty() const { - return additions.empty() && deletions.empty() && updates.empty(); - } -}; - // generic type used for various registration handles such as in ntp_callbacks.h using notification_id_type = named_type; constexpr notification_id_type notification_id_type_invalid{-1}; @@ -1708,7 +1905,10 @@ struct topic_table_delta { }; struct create_acls_cmd_data - : serde::envelope> { + : serde::envelope< + create_acls_cmd_data, + serde::version<0>, + serde::compat_version<0>> { static constexpr int8_t current_version = 1; std::vector bindings; @@ -1726,7 +1926,10 @@ struct create_acls_cmd_data }; struct create_acls_request - : serde::envelope> { + : serde::envelope< + create_acls_request, + serde::version<0>, + serde::compat_version<0>> { 
create_acls_cmd_data data; model::timeout_clock::duration timeout; @@ -1750,7 +1953,8 @@ struct create_acls_request }; struct create_acls_reply - : serde::envelope> { + : serde:: + envelope, serde::compat_version<0>> { std::vector results; friend bool operator==(const create_acls_reply&, const create_acls_reply&) @@ -1766,7 +1970,10 @@ struct create_acls_reply }; struct delete_acls_cmd_data - : serde::envelope> { + : serde::envelope< + delete_acls_cmd_data, + serde::version<0>, + serde::compat_version<0>> { static constexpr int8_t current_version = 1; std::vector filters; @@ -1785,7 +1992,10 @@ struct delete_acls_cmd_data // result for a single filter struct delete_acls_result - : serde::envelope> { + : serde::envelope< + delete_acls_result, + serde::version<0>, + serde::compat_version<0>> { errc error; std::vector bindings; @@ -1802,7 +2012,10 @@ struct delete_acls_result }; struct delete_acls_request - : serde::envelope> { + : serde::envelope< + delete_acls_request, + serde::version<0>, + serde::compat_version<0>> { delete_acls_cmd_data data; model::timeout_clock::duration timeout; @@ -1826,7 +2039,8 @@ struct delete_acls_request }; struct delete_acls_reply - : serde::envelope> { + : serde:: + envelope, serde::compat_version<0>> { std::vector results; friend bool operator==(const delete_acls_reply&, const delete_acls_reply&) @@ -1842,7 +2056,8 @@ struct delete_acls_reply }; struct backend_operation - : serde::envelope> { + : serde:: + envelope, serde::compat_version<0>> { ss::shard_id source_shard; partition_assignment p_as; topic_table_delta::op_type type; @@ -1855,7 +2070,10 @@ struct backend_operation }; struct create_data_policy_cmd_data - : serde::envelope> { + : serde::envelope< + create_data_policy_cmd_data, + serde::version<0>, + serde::compat_version<0>> { static constexpr int8_t current_version = 1; // In future dp will be vector auto serde_fields() { return std::tie(dp); } @@ -1868,7 +2086,10 @@ struct create_data_policy_cmd_data }; struct 
non_replicable_topic - : serde::envelope> { + : serde::envelope< + non_replicable_topic, + serde::version<0>, + serde::compat_version<0>> { static constexpr int8_t current_version = 1; model::topic_namespace source; model::topic_namespace name; @@ -1885,7 +2106,9 @@ struct non_replicable_topic using config_version = named_type; constexpr config_version config_version_unset = config_version{-1}; -struct config_status : serde::envelope> { +struct config_status + : serde:: + envelope, serde::compat_version<0>> { model::node_id node; config_version version{config_version_unset}; bool restart{false}; @@ -1901,7 +2124,10 @@ struct config_status : serde::envelope> { }; struct cluster_property_kv - : serde::envelope> { + : serde::envelope< + cluster_property_kv, + serde::version<0>, + serde::compat_version<0>> { cluster_property_kv() = default; cluster_property_kv(ss::sstring k, ss::sstring v) : key(std::move(k)) @@ -1919,7 +2145,10 @@ struct cluster_property_kv }; struct cluster_config_delta_cmd_data - : serde::envelope> { + : serde::envelope< + cluster_config_delta_cmd_data, + serde::version<0>, + serde::compat_version<0>> { static constexpr int8_t current_version = 0; std::vector upsert; std::vector remove; @@ -1936,7 +2165,10 @@ struct cluster_config_delta_cmd_data }; struct cluster_config_status_cmd_data - : serde::envelope> { + : serde::envelope< + cluster_config_status_cmd_data, + serde::version<0>, + serde::compat_version<0>> { static constexpr int8_t current_version = 0; friend bool operator==( @@ -1950,7 +2182,10 @@ struct cluster_config_status_cmd_data }; struct feature_update_action - : serde::envelope> { + : serde::envelope< + feature_update_action, + serde::version<0>, + serde::compat_version<0>> { static constexpr int8_t current_version = 1; enum class action_t : std::uint16_t { // Notify when a feature is done with preparing phase @@ -1978,7 +2213,10 @@ struct feature_update_action }; struct feature_update_cmd_data - : serde::envelope> { + : 
serde::envelope< + feature_update_cmd_data, + serde::version<0>, + serde::compat_version<0>> { // To avoid ambiguity on 'versions' here: `current_version` // is the encoding version of the struct, subsequent version // fields are the payload. @@ -2000,8 +2238,10 @@ struct feature_update_cmd_data using force_abort_update = ss::bool_class; struct cancel_moving_partition_replicas_cmd_data - : serde:: - envelope> { + : serde::envelope< + cancel_moving_partition_replicas_cmd_data, + serde::version<0>, + serde::compat_version<0>> { cancel_moving_partition_replicas_cmd_data() = default; explicit cancel_moving_partition_replicas_cmd_data(force_abort_update force) : force(force) {} @@ -2011,7 +2251,10 @@ struct cancel_moving_partition_replicas_cmd_data }; struct move_topic_replicas_data - : serde::envelope> { + : serde::envelope< + move_topic_replicas_data, + serde::version<0>, + serde::compat_version<0>> { move_topic_replicas_data() noexcept = default; explicit move_topic_replicas_data( model::partition_id partition, std::vector replicas) @@ -2028,7 +2271,10 @@ struct move_topic_replicas_data }; struct feature_update_license_update_cmd_data - : serde::envelope> { + : serde::envelope< + feature_update_license_update_cmd_data, + serde::version<0>, + serde::compat_version<0>> { using rpc_adl_exempt = std::true_type; // Struct encoding version @@ -2043,7 +2289,10 @@ struct feature_update_license_update_cmd_data }; struct user_and_credential - : serde::envelope> { + : serde::envelope< + user_and_credential, + serde::version<0>, + serde::compat_version<0>> { using rpc_adl_exempt = std::true_type; static constexpr int8_t current_version = 0; @@ -2063,17 +2312,29 @@ struct user_and_credential }; struct bootstrap_cluster_cmd_data - : serde::envelope> { + : serde::envelope< + bootstrap_cluster_cmd_data, + serde::version<1>, + serde::compat_version<0>> { using rpc_adl_exempt = std::true_type; friend bool operator==( const bootstrap_cluster_cmd_data&, const 
bootstrap_cluster_cmd_data&) = default; - auto serde_fields() { return std::tie(uuid, bootstrap_user_cred); } + auto serde_fields() { + return std::tie( + uuid, bootstrap_user_cred, node_ids_by_uuid, founding_version); + } model::cluster_uuid uuid; std::optional bootstrap_user_cred; + absl::flat_hash_map node_ids_by_uuid; + + // If this is set, fast-forward the feature_table to enable features + // from this version. Indicates the version of Redpanda of + // the node that generated the bootstrap record. + cluster_version founding_version{invalid_version}; }; enum class reconciliation_status : int8_t { @@ -2084,7 +2345,10 @@ enum class reconciliation_status : int8_t { std::ostream& operator<<(std::ostream&, const reconciliation_status&); class ntp_reconciliation_state - : public serde::envelope> { + : public serde::envelope< + ntp_reconciliation_state, + serde::version<0>, + serde::compat_version<0>> { public: ntp_reconciliation_state() noexcept = default; @@ -2129,7 +2393,10 @@ private: }; struct reconciliation_state_request - : serde::envelope> { + : serde::envelope< + reconciliation_state_request, + serde::version<0>, + serde::compat_version<0>> { std::vector ntps; friend bool operator==( @@ -2146,7 +2413,10 @@ struct reconciliation_state_request }; struct reconciliation_state_reply - : serde::envelope> { + : serde::envelope< + reconciliation_state_reply, + serde::version<0>, + serde::compat_version<0>> { std::vector results; friend bool operator==( @@ -2163,7 +2433,10 @@ struct reconciliation_state_reply }; struct decommission_node_request - : serde::envelope> { + : serde::envelope< + decommission_node_request, + serde::version<0>, + serde::compat_version<0>> { model::node_id id; friend bool operator==( @@ -2180,7 +2453,10 @@ struct decommission_node_request }; struct decommission_node_reply - : serde::envelope> { + : serde::envelope< + decommission_node_reply, + serde::version<0>, + serde::compat_version<0>> { errc error; friend bool @@ -2197,7 +2473,10 @@ 
struct decommission_node_reply }; struct recommission_node_request - : serde::envelope> { + : serde::envelope< + recommission_node_request, + serde::version<0>, + serde::compat_version<0>> { model::node_id id; friend bool operator==( @@ -2214,7 +2493,10 @@ struct recommission_node_request }; struct recommission_node_reply - : serde::envelope> { + : serde::envelope< + recommission_node_reply, + serde::version<0>, + serde::compat_version<0>> { errc error; friend bool @@ -2231,7 +2513,10 @@ struct recommission_node_reply }; struct finish_reallocation_request - : serde::envelope> { + : serde::envelope< + finish_reallocation_request, + serde::version<0>, + serde::compat_version<0>> { model::node_id id; friend bool operator==( @@ -2248,7 +2533,10 @@ struct finish_reallocation_request }; struct finish_reallocation_reply - : serde::envelope> { + : serde::envelope< + finish_reallocation_reply, + serde::version<0>, + serde::compat_version<0>> { errc error; friend bool operator==( @@ -2265,7 +2553,10 @@ struct finish_reallocation_reply }; struct set_maintenance_mode_request - : serde::envelope> { + : serde::envelope< + set_maintenance_mode_request, + serde::version<0>, + serde::compat_version<0>> { static constexpr int8_t current_version = 1; model::node_id id; bool enabled; @@ -2284,7 +2575,10 @@ struct set_maintenance_mode_request }; struct set_maintenance_mode_reply - : serde::envelope> { + : serde::envelope< + set_maintenance_mode_reply, + serde::version<0>, + serde::compat_version<0>> { static constexpr int8_t current_version = 1; errc error; @@ -2302,7 +2596,10 @@ struct set_maintenance_mode_reply }; struct config_status_request - : serde::envelope> { + : serde::envelope< + config_status_request, + serde::version<0>, + serde::compat_version<0>> { config_status status; friend std::ostream& @@ -2316,7 +2613,10 @@ struct config_status_request }; struct config_status_reply - : serde::envelope> { + : serde::envelope< + config_status_reply, + serde::version<0>, + 
serde::compat_version<0>> { errc error; friend std::ostream& operator<<(std::ostream&, const config_status_reply&); @@ -2329,7 +2629,10 @@ struct config_status_reply }; struct feature_action_request - : serde::envelope> { + : serde::envelope< + feature_action_request, + serde::version<0>, + serde::compat_version<0>> { feature_update_action action; friend bool @@ -2343,7 +2646,10 @@ struct feature_action_request }; struct feature_action_response - : serde::envelope> { + : serde::envelope< + feature_action_response, + serde::version<0>, + serde::compat_version<0>> { errc error; friend bool @@ -2360,7 +2666,10 @@ using feature_barrier_tag = named_type; struct feature_barrier_request - : serde::envelope> { + : serde::envelope< + feature_barrier_request, + serde::version<0>, + serde::compat_version<0>> { static constexpr int8_t current_version = 1; feature_barrier_tag tag; // Each cooperative barrier must use a unique tag model::node_id peer; @@ -2377,7 +2686,10 @@ struct feature_barrier_request }; struct feature_barrier_response - : serde::envelope> { + : serde::envelope< + feature_barrier_response, + serde::version<0>, + serde::compat_version<0>> { static constexpr int8_t current_version = 1; bool entered; // Has the respondent entered? bool complete; // Has the respondent exited? 
@@ -2393,7 +2705,10 @@ struct feature_barrier_response }; struct create_non_replicable_topics_request - : serde::envelope> { + : serde::envelope< + create_non_replicable_topics_request, + serde::version<0>, + serde::compat_version<0>> { static constexpr int8_t current_version = 1; std::vector topics; model::timeout_clock::duration timeout; @@ -2410,7 +2725,10 @@ struct create_non_replicable_topics_request }; struct create_non_replicable_topics_reply - : serde::envelope> { + : serde::envelope< + create_non_replicable_topics_reply, + serde::version<0>, + serde::compat_version<0>> { static constexpr int8_t current_version = 1; std::vector results; @@ -2426,7 +2744,10 @@ struct create_non_replicable_topics_reply }; struct config_update_request final - : serde::envelope> { + : serde::envelope< + config_update_request, + serde::version<0>, + serde::compat_version<0>> { std::vector upsert; std::vector remove; @@ -2441,7 +2762,10 @@ struct config_update_request final }; struct config_update_reply - : serde::envelope> { + : serde::envelope< + config_update_reply, + serde::version<0>, + serde::compat_version<0>> { errc error; cluster::config_version latest_version{config_version_unset}; @@ -2454,7 +2778,9 @@ struct config_update_reply auto serde_fields() { return std::tie(error, latest_version); } }; -struct hello_request final : serde::envelope> { +struct hello_request final + : serde:: + envelope, serde::compat_version<0>> { model::node_id peer; // milliseconds since epoch @@ -2468,7 +2794,8 @@ struct hello_request final : serde::envelope> { friend std::ostream& operator<<(std::ostream&, const hello_request&); }; -struct hello_reply : serde::envelope> { +struct hello_reply + : serde::envelope, serde::compat_version<0>> { errc error; friend bool operator==(const hello_reply&, const hello_reply&) = default; @@ -2548,7 +2875,10 @@ private: }; struct move_cancellation_result - : serde::envelope> { + : serde::envelope< + move_cancellation_result, + serde::version<0>, + 
serde::compat_version<0>> { move_cancellation_result() = default; move_cancellation_result(model::ntp ntp, cluster::errc ec) @@ -2572,7 +2902,10 @@ enum class partition_move_direction { to_node, from_node, all }; std::ostream& operator<<(std::ostream&, const partition_move_direction&); struct cancel_all_partition_movements_request - : serde::envelope> { + : serde::envelope< + cancel_all_partition_movements_request, + serde::version<0>, + serde::compat_version<0>> { cancel_all_partition_movements_request() = default; auto serde_fields() { return std::tie(); } @@ -2589,8 +2922,10 @@ struct cancel_all_partition_movements_request } }; struct cancel_node_partition_movements_request - : serde:: - envelope> { + : serde::envelope< + cancel_node_partition_movements_request, + serde::version<0>, + serde::compat_version<0>> { model::node_id node_id; partition_move_direction direction; @@ -2606,7 +2941,10 @@ struct cancel_node_partition_movements_request }; struct cancel_partition_movements_reply - : serde::envelope> { + : serde::envelope< + cancel_partition_movements_reply, + serde::version<0>, + serde::compat_version<0>> { friend bool operator==( const cancel_partition_movements_reply&, const cancel_partition_movements_reply&) @@ -2621,6 +2959,137 @@ struct cancel_partition_movements_reply std::vector partition_results; }; +struct revert_cancel_partition_move_cmd_data + : serde::envelope< + revert_cancel_partition_move_cmd_data, + serde::version<0>, + serde::version<0>> { + model::ntp ntp; + + auto serde_fields() { return std::tie(ntp); } + + friend bool operator==( + const revert_cancel_partition_move_cmd_data&, + const revert_cancel_partition_move_cmd_data&) + = default; +}; + +struct revert_cancel_partition_move_request + : serde::envelope< + revert_cancel_partition_move_request, + serde::version<0>, + serde::version<0>> { + using rpc_adl_exempt = std::true_type; + model::ntp ntp; + + auto serde_fields() { return std::tie(ntp); } + + friend bool operator==( + const 
revert_cancel_partition_move_request&, + const revert_cancel_partition_move_request&) + = default; +}; + +struct revert_cancel_partition_move_reply + : serde::envelope< + revert_cancel_partition_move_reply, + serde::version<0>, + serde::version<0>> { + using rpc_adl_exempt = std::true_type; + errc result; + + auto serde_fields() { return std::tie(result); } + + friend bool operator==( + const revert_cancel_partition_move_reply&, + const revert_cancel_partition_move_reply&) + = default; +}; + +/** + * Broker state transitions are coordinated centrally as opposite to + * configuration which change is requested by the described node itself. Broker + * state represents centrally managed node properties. The difference between + * broker state and configuration is that the configuration change is made on + * the node while state changes are managed by the cluster controller. + */ +class broker_state + : public serde:: + envelope, serde::compat_version<0>> { +public: + model::membership_state get_membership_state() const { + return _membership_state; + } + void set_membership_state(model::membership_state st) { + _membership_state = st; + } + + model::maintenance_state get_maintenance_state() const { + return _maintenance_state; + } + void set_maintenance_state(model::maintenance_state st) { + _maintenance_state = st; + } + friend bool operator==(const broker_state&, const broker_state&) = default; + + friend std::ostream& operator<<(std::ostream&, const broker_state&); + + auto serde_fields() { + return std::tie(_membership_state, _maintenance_state); + } + +private: + model::membership_state _membership_state = model::membership_state::active; + model::maintenance_state _maintenance_state + = model::maintenance_state::inactive; +}; + +/** + * Node metadata describes a cluster node with its state and configuration + */ +struct node_metadata { + model::broker broker; + broker_state state; + + friend bool operator==(const node_metadata&, const node_metadata&) + = default; 
+ friend std::ostream& operator<<(std::ostream&, const node_metadata&); +}; +/** + * Reconfiguration state indicates if ongoing reconfiguration is a result of + * partition movement, cancellation or forced cancellation + */ +enum class reconfiguration_state { in_progress, cancelled, force_cancelled }; + +std::ostream& operator<<(std::ostream&, reconfiguration_state); + +struct replica_bytes { + model::node_id node; + size_t bytes{0}; +}; + +struct partition_reconfiguration_state { + model::ntp ntp; + // assignments + std::vector previous_assignment; + std::vector current_assignment; + // state indicating if reconfiguration was cancelled or requested + reconfiguration_state state; + // amount of bytes already transferred to new replicas + std::vector already_transferred_bytes; + // current size of partition + size_t current_partition_size{0}; +}; + +struct node_decommission_progress { + // indicate if node decommissioning finished + bool finished = false; + // number of replicas left on decommissioned node + size_t replicas_left{0}; + // list of currently ongoing partition reconfigurations + std::vector current_reconfigurations; +}; + /* * Partition Allocation Domains is the way to make certain partition replicas * distributed evenly across the nodes of the cluster. When partition allocation diff --git a/src/v/compat/cluster_compat.h b/src/v/compat/cluster_compat.h index 685fa1765..bb0fa1a83 100644 --- a/src/v/compat/cluster_compat.h =======++++++ +++ b/src/v/features/feature_table_snapshot.h @@ -0,0 +1,73 @@ +/* + * Copyright 2022 Redpanda Data, Inc. 
+ * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "cluster/types.h" +#include "feature_state.h" +#include "model/fundamental.h" +#include "security/license.h" +#include "serde/serde.h" + +namespace features { + +class feature_table; + +/** + * Part of feature_table_snapshot that corresponds to the feature_state + * stored within feature_table. + * + * Unlike, feature_state, this class embeds the feature name directly for + * serialization, rather than encapsulating a reference to a feature_spec. + */ +struct feature_state_snapshot + : serde::envelope< + feature_state_snapshot, + serde::version<0>, + serde::compat_version<0>> { + ss::sstring name; + feature_state::state state; + + auto serde_fields() { return std::tie(name, state); } +}; + +/** + * Rather than adding serialization methods to feature_table that would + * include some subset of its fields, define a clean separate serialization + * containers for when encoding or decoding a snapshot. + */ +struct feature_table_snapshot + : serde::envelope< + feature_table_snapshot, + serde::version<1>, + serde::compat_version<0>> { + model::offset applied_offset; + cluster::cluster_version version{cluster::invalid_version}; + std::optional license; + std::vector states; + cluster::cluster_version original_version{cluster::invalid_version}; + + auto serde_fields() { + return std::tie( + applied_offset, version, states, license, original_version); + } + + /// Create a snapshot from a live feature table + static feature_table_snapshot from(const feature_table&); + + /// Replace the feature table's state with this snapshot + void apply(feature_table&) const; + + /// Key for storing the snapshot in the shard 0 kvstore. 
+ static bytes kvstore_key(); +}; + +} // namespace features diff --git a/src/v/features/fwd.h b/src/v/features/fwd.h index 69b8dc00e..3418b89b9 100644 --- a/src/v/features/fwd.h =======++++++ +++ b/src/v/security/license.h @@ -62,15 +62,17 @@ inline std::ostream& operator<<(std::ostream& os, license_type lt) { return os; } -struct license : serde::envelope> { +struct license + : serde::envelope, serde::compat_version<0>> { /// Expected encoded contents uint8_t format_version; license_type type; ss::sstring organization; std::chrono::seconds expiry; + ss::sstring checksum; auto serde_fields() { - return std::tie(format_version, type, organization, expiry); + return std::tie(format_version, type, organization, expiry, checksum); } /// true if todays date is greater then \ref expiry @@ -79,9 +81,13 @@ struct license : serde::envelope> { /// Seconds since epoch until license expiration std::chrono::seconds expires() const noexcept; + auto operator<=>(const license&) const = delete; + private: friend struct fmt::formatter; + friend bool operator==(const license& a, const license& b) = default; + friend std::ostream& operator<<(std::ostream& os, const license& lic); }; diff --git a/src/v/security/logger.h b/src/v/security/logger.h index 63bd7601b..cd0cfa2fb 100644 --- a/src/v/security/logger.h =======++++++ +++ b/src/v/storage/index_state.h @@ -12,15 +12,67 @@ #pragma once #include "bytes/iobuf.h" +#include "features/feature_table.h" #include "model/fundamental.h" #include "model/timestamp.h" #include "serde/envelope.h" #include "utils/fragmented_vector.h" +#include + #include #include namespace storage { + +using offset_delta_time = ss::bool_class; + +/* + * In order to be able to represent negative time deltas (required for + * out of order timestamps, the time delta stored in 'index_state' is + * offset by 2^31. The 'offset_time_index' class below deals with + * this translation. The range for time deltas is roughly from -596h to +596h. 
+ */ +class offset_time_index { +public: + static constexpr model::timestamp::type offset = 2147483648; // 2^31 + static constexpr model::timestamp::type delta_time_min = -offset; + static constexpr model::timestamp::type delta_time_max = offset - 1; + + offset_time_index(model::timestamp ts, offset_delta_time with_offset) + : _with_offset(with_offset) { + if (_with_offset == offset_delta_time::yes) { + _val = static_cast( + std::clamp(ts(), delta_time_min, delta_time_max) + offset); + } else { + _val = _val = static_cast(std::clamp( + ts(), + model::timestamp::type{std::numeric_limits::min()}, + model::timestamp::type{std::numeric_limits::max()})); + } + } + + uint32_t operator()() const { + if (_with_offset == offset_delta_time::yes) { + return _val - static_cast(offset); + } else { + return _val; + } + } + +private: + offset_time_index(uint32_t val, offset_delta_time with_offset) + : _with_offset(with_offset) + , _val(val) {} + + uint32_t raw_value() const { return _val; } + + offset_delta_time _with_offset; + uint32_t _val; + + friend struct index_state; +}; + /* Fileformat: 1 byte - version 4 bytes - size - does not include the version or size @@ -34,10 +86,18 @@ namespace storage { [] relative_offset_index [] relative_time_index [] position_index + 1 byte - batch_timestamps_are_monotonic + 1 byte - with_offset + 1 byte - non_data_timestamps */ struct index_state - : serde::envelope, serde::compat_version<4>> { + : serde::envelope, serde::compat_version<4>> { + static constexpr auto monotonic_timestamps_version = 5; + + static index_state make_empty_index(offset_delta_time with_offset); + index_state() = default; + index_state(index_state&&) noexcept = default; index_state& operator=(index_state&&) noexcept = default; index_state& operator=(const index_state&) = delete; @@ -61,22 +121,65 @@ struct index_state fragmented_vector relative_time_index; fragmented_vector position_index; + // flag indicating whether the maximum timestamp on the batches + // of this 
segment are monontonically increasing. + bool batch_timestamps_are_monotonic{true}; + + // flag indicating whether the relative time index has been offset + offset_delta_time with_offset{false}; + + // flag indicating whether this segment contains non user-data timestamps + bool non_data_timestamps{false}; + + size_t size() const { return relative_offset_index.size(); } + bool empty() const { return relative_offset_index.empty(); } - void - add_entry(uint32_t relative_offset, uint32_t relative_time, uint64_t pos) { + void add_entry( + uint32_t relative_offset, offset_time_index relative_time, uint64_t pos) { relative_offset_index.push_back(relative_offset); - relative_time_index.push_back(relative_time); + relative_time_index.push_back(relative_time.raw_value()); position_index.push_back(pos); } void pop_back() { relative_offset_index.pop_back(); relative_time_index.pop_back(); position_index.pop_back(); + if (empty()) { + non_data_timestamps = false; + } } - std::tuple get_entry(size_t i) { + std::tuple get_entry(size_t i) { return { - relative_offset_index[i], relative_time_index[i], position_index[i]}; + relative_offset_index[i], + offset_time_index{relative_time_index[i], with_offset}, + position_index[i]}; + } + + std::optional> + find_entry(model::timestamp ts) { + const auto idx = offset_time_index{ts, with_offset}; + + auto it = std::lower_bound( + std::begin(relative_time_index), + std::end(relative_time_index), + idx.raw_value(), + std::less{}); + if (it == relative_offset_index.end()) { + return std::nullopt; + } + + const auto dist = std::distance(relative_offset_index.begin(), it); + + // lower_bound will place us on the first batch in the index that has + // 'max_timestamp' greater than 'ts'. Since not every batch is indexed, + // it's not guaranteed* that 'ts' will be present in the batch + // (i.e. 'ts > first_timestamp'). For this reason, we go back one batch. 
+ // + // *In the case where lower_bound places on the first batch, we'll + // start the timequery from the beggining of the segment as the user + // data batch is always indexed. + return get_entry(dist > 0 ? dist - 1 : 0); } bool maybe_index( @@ -89,6 +192,10 @@ struct index_state model::timestamp last_timestamp, bool user_data); + void update_batch_timestamps_are_monotonic(bool pred) { + batch_timestamps_are_monotonic = batch_timestamps_are_monotonic && pred; + } + friend bool operator==(const index_state&, const index_state&) = default; friend std::ostream& operator<<(std::ostream&, const index_state&); @@ -97,8 +204,6 @@ struct index_state friend void read_nested(iobuf_parser&, index_state&, const size_t); private: - bool non_data_timestamps{false}; - index_state(const index_state& o) noexcept : bitflags(o.bitflags) , base_offset(o.base_offset) @@ -107,7 +212,9 @@ private: , max_timestamp(o.max_timestamp) , relative_offset_index(o.relative_offset_index.copy()) , relative_time_index(o.relative_time_index.copy()) - , position_index(o.position_index.copy()) {} + , position_index(o.position_index.copy()) + , batch_timestamps_are_monotonic(o.batch_timestamps_are_monotonic) + , with_offset(o.with_offset) {} }; } // namespace storage diff --git a/src/v/storage/kvstore.cc b/src/v/storage/kvstore.cc index 2e8133b3e..92a490d89 100644 --- a/src/v/storage/kvstore.cc =======++++++ +++