diff --git a/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc b/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc index 69ee32267bf1..327b67cd4c1a 100644 --- a/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc +++ b/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc @@ -96,6 +96,7 @@ TEST_P(EndToEndFixture, TestProduceConsumeFromCloud) { model::timestamp::min(), 1, log->stm_manager()->max_collectible_offset(), + std::nullopt, ss::default_priority_class(), as); partition->log()->housekeeping(housekeeping_conf).get(); @@ -516,6 +517,7 @@ TEST_P(CloudStorageEndToEndManualTest, TestTimequeryAfterArchivalGC) { model::timestamp::min(), 1, // max_bytes_in_log log->stm_manager()->max_collectible_offset(), + std::nullopt, ss::default_priority_class(), as); partition->log()->housekeeping(housekeeping_conf).get(); @@ -809,6 +811,7 @@ TEST_P(EndToEndFixture, TestCloudStorageTimequery) { model::timestamp::max(), 0, log->stm_manager()->max_collectible_offset(), + std::nullopt, ss::default_priority_class(), as); partition->log()->housekeeping(housekeeping_conf).get(); diff --git a/src/v/cloud_storage/tests/topic_manifest_test.cc b/src/v/cloud_storage/tests/topic_manifest_test.cc index d449bbfb5ce9..a0d57f2c865b 100644 --- a/src/v/cloud_storage/tests/topic_manifest_test.cc +++ b/src/v/cloud_storage/tests/topic_manifest_test.cc @@ -479,7 +479,7 @@ SEASTAR_THREAD_TEST_CASE(test_topic_manifest_serde_feature_table) { std::nullopt, std::nullopt, std::nullopt, - }; + std::nullopt}; auto random_initial_revision_id = tests::random_named_int(); diff --git a/src/v/cluster/archival/tests/async_data_uploader_test.cc b/src/v/cluster/archival/tests/async_data_uploader_test.cc index e1b30b8dee0d..0f2eb68a01e3 100644 --- a/src/v/cluster/archival/tests/async_data_uploader_test.cc +++ b/src/v/cluster/archival/tests/async_data_uploader_test.cc @@ -167,6 +167,7 @@ class async_data_uploader_fixture : public redpanda_thread_fixture { model::timestamp::min(), std::nullopt, max_collect_offset, + std::nullopt, ss::default_priority_class(), as); diff --git a/src/v/cluster/archival/tests/ntp_archiver_reupload_test.cc b/src/v/cluster/archival/tests/ntp_archiver_reupload_test.cc index 9c2bbb16b527..58378a46ce8f 100644 --- a/src/v/cluster/archival/tests/ntp_archiver_reupload_test.cc +++ b/src/v/cluster/archival/tests/ntp_archiver_reupload_test.cc @@ -235,6 +235,7 @@ struct reupload_fixture : public archiver_fixture { model::timestamp::max(), std::nullopt, max_collectible, + std::nullopt, ss::default_priority_class(), abort_source}) .get(); diff --git a/src/v/cluster/archival/tests/ntp_archiver_test.cc b/src/v/cluster/archival/tests/ntp_archiver_test.cc index 98be346711e1..e5837ec4b6b9 100644 --- a/src/v/cluster/archival/tests/ntp_archiver_test.cc +++ b/src/v/cluster/archival/tests/ntp_archiver_test.cc @@ -941,6 +941,7 @@ FIXTURE_TEST( model::timestamp::now(), std::nullopt, model::offset{999}, + std::nullopt, ss::default_priority_class(), as)) .get0(); diff --git a/src/v/cluster/metadata_cache.cc b/src/v/cluster/metadata_cache.cc index 94f2045fd0e5..2f27beeb415d 100644 --- a/src/v/cluster/metadata_cache.cc +++ b/src/v/cluster/metadata_cache.cc @@ -22,6 +22,7 @@ #include "model/namespace.h" #include "model/timestamp.h" #include "storage/types.h" +#include "utils/tristate.h" #include #include @@ -318,6 +319,11 @@ metadata_cache::get_default_record_value_subject_name_strategy() const { return pandaproxy::schema_registry::subject_name_strategy::topic_name; } +std::optional +metadata_cache::get_default_tombstone_retention_ms() const { + return config::shard_local_cfg().tombstone_retention_ms(); +} + topic_properties metadata_cache::get_default_properties() const { topic_properties tp; tp.compression = {get_default_compression()}; @@ -335,6 +341,7 @@ topic_properties metadata_cache::get_default_properties() const { get_default_retention_local_target_bytes()}; tp.retention_local_target_ms = tristate{ get_default_retention_local_target_ms()}; + tp.tombstone_retention_ms = get_default_tombstone_retention_ms(); return tp; } diff --git a/src/v/cluster/metadata_cache.h b/src/v/cluster/metadata_cache.h index fe425f428622..2c6562a54e19 100644 --- a/src/v/cluster/metadata_cache.h +++ b/src/v/cluster/metadata_cache.h @@ -202,6 +202,8 @@ class metadata_cache { bool get_default_record_value_schema_id_validation() const; pandaproxy::schema_registry::subject_name_strategy get_default_record_value_subject_name_strategy() const; + std::optional + get_default_tombstone_retention_ms() const; topic_properties get_default_properties() const; std::optional diff --git a/src/v/cluster/tests/manual_log_deletion_test.cc b/src/v/cluster/tests/manual_log_deletion_test.cc index 6de06350793e..d8d54e7f19c4 100644 --- a/src/v/cluster/tests/manual_log_deletion_test.cc +++ b/src/v/cluster/tests/manual_log_deletion_test.cc @@ -112,6 +112,7 @@ struct manual_deletion_fixture : public raft_test_fixture { retention_timestamp, 100_MiB, model::offset::max(), + std::nullopt, ss::default_priority_class(), as, storage::ntp_sanitizer_config{.sanitize_only = true})) diff --git a/src/v/cluster/tests/tx_compaction_utils.h b/src/v/cluster/tests/tx_compaction_utils.h index 89e040123732..a3f2aff37218 100644 --- a/src/v/cluster/tests/tx_compaction_utils.h +++ b/src/v/cluster/tests/tx_compaction_utils.h @@ -107,6 +107,7 @@ class tx_executor { model::timestamp::now().value() - ret_duration.count()), std::nullopt, log->stm_manager()->max_collectible_offset(), + std::nullopt, ss::default_priority_class(), dummy_as, }) @@ -228,6 +229,7 @@ class tx_executor { model::timestamp::min(), std::nullopt, model::offset::max(), + std::nullopt, ss::default_priority_class(), as); // Compacts until a single sealed segment remains, other than the diff --git a/src/v/cluster/topic_configuration.cc b/src/v/cluster/topic_configuration.cc index 109d559271d0..8bb09bbab961 100644 --- a/src/v/cluster/topic_configuration.cc +++ b/src/v/cluster/topic_configuration.cc @@ -55,7 +55,7 @@ storage::ntp_config topic_configuration::make_ntp_config( .write_caching = properties.write_caching, .flush_ms = properties.flush_ms, .flush_bytes = properties.flush_bytes, - }); + .tombstone_retention_ms = properties.tombstone_retention_ms}); } return { model::ntp(tp_ns.ns, tp_ns.tp, p_id), diff --git a/src/v/cluster/topic_properties.cc b/src/v/cluster/topic_properties.cc index ece5278fa0f7..17356d3602f9 100644 --- a/src/v/cluster/topic_properties.cc +++ b/src/v/cluster/topic_properties.cc @@ -39,7 +39,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) { "write_caching: {}, " "flush_ms: {}, " "flush_bytes: {}, " - "remote_label: {}}}", + "remote_label: {}, " + "tombstone_retention_ms: {}}}", properties.compression, properties.cleanup_policy_bitflags, properties.compaction_strategy, @@ -72,7 +73,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) { properties.write_caching, properties.flush_ms, properties.flush_bytes, - properties.remote_label); + properties.remote_label, + properties.tombstone_retention_ms); return o; } @@ -108,7 +110,8 @@ bool topic_properties::has_overrides() const { || initial_retention_local_target_bytes.is_engaged() || initial_retention_local_target_ms.is_engaged() || write_caching.has_value() || flush_ms.has_value() - || flush_bytes.has_value() || remote_label.has_value(); + || flush_bytes.has_value() || remote_label.has_value() + || tombstone_retention_ms.has_value(); } bool topic_properties::requires_remote_erase() const { @@ -141,6 +144,7 @@ topic_properties::get_ntp_cfg_overrides() const { ret.write_caching = write_caching; ret.flush_ms = flush_ms; ret.flush_bytes = flush_bytes; + ret.tombstone_retention_ms = tombstone_retention_ms; return ret; } @@ -228,6 +232,7 @@ adl::from(iobuf_parser& parser) { std::nullopt, std::nullopt, std::nullopt, + std::nullopt, std::nullopt}; } diff --git a/src/v/cluster/topic_properties.h b/src/v/cluster/topic_properties.h index 49d33060af08..a40530a49dbb 100644 --- a/src/v/cluster/topic_properties.h +++ b/src/v/cluster/topic_properties.h @@ -33,7 +33,7 @@ namespace cluster { */ struct topic_properties : serde:: - envelope, serde::compat_version<0>> { + envelope, serde::compat_version<0>> { topic_properties() noexcept = default; topic_properties( std::optional compression, @@ -71,7 +71,8 @@ struct topic_properties std::optional mpx_virtual_cluster_id, std::optional write_caching, std::optional flush_ms, - std::optional flush_bytes) + std::optional flush_bytes, + std::optional tombstone_retention_ms) : compression(compression) , cleanup_policy_bitflags(cleanup_policy_bitflags) , compaction_strategy(compaction_strategy) @@ -108,7 +109,8 @@ struct topic_properties , mpx_virtual_cluster_id(mpx_virtual_cluster_id) , write_caching(write_caching) , flush_ms(flush_ms) - , flush_bytes(flush_bytes) {} + , flush_bytes(flush_bytes) + , tombstone_retention_ms(tombstone_retention_ms) {} std::optional compression; std::optional cleanup_policy_bitflags; @@ -162,6 +164,7 @@ struct topic_properties std::optional write_caching; std::optional flush_ms; std::optional flush_bytes; + std::optional tombstone_retention_ms; // Label to be used when generating paths of remote objects (manifests, // segments, etc) of this topic. @@ -217,7 +220,8 @@ struct topic_properties flush_ms, flush_bytes, remote_label, - remote_topic_namespace_override); + remote_topic_namespace_override, + tombstone_retention_ms); } friend bool operator==(const topic_properties&, const topic_properties&) diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index 88859587dcdb..844e82bd4e54 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -63,6 +63,11 @@ topic_table::apply(create_topic_cmd cmd, model::offset offset) { schema_id_validation_validator::ec); } + if (!topic_multi_property_validation(cmd.value.cfg.properties)) { + return ss::make_ready_future( + errc::topic_invalid_config); + } + std::optional remote_revision = cmd.value.cfg.properties.remote_topic_properties ? std::make_optional( cmd.value.cfg.properties.remote_topic_properties->remote_revision) @@ -776,6 +781,22 @@ std::error_code topic_table::validate_force_reconfigurable_partitions( return result; } +bool topic_table::topic_multi_property_validation( + const topic_properties& properties) const { + // tombstone.retention.ms validation. Cannot be enabled alongside tiered + // storage. + if (properties.tombstone_retention_ms.has_value()) { + if ( + properties.shadow_indexing.has_value() + && properties.shadow_indexing.value() + != model::shadow_indexing_mode::disabled) { + return false; + } + } + + return true; +} + template void incremental_update( std::optional& property, property_update> override) { @@ -941,9 +962,11 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) { updated_properties.write_caching, overrides.write_caching); incremental_update(updated_properties.flush_ms, overrides.flush_ms); incremental_update(updated_properties.flush_bytes, overrides.flush_bytes); + incremental_update( + updated_properties.tombstone_retention_ms, + overrides.tombstone_retention_ms); auto& properties = tp->second.get_configuration().properties; - // no configuration change, no need to generate delta if (updated_properties == properties) { co_return errc::success; @@ -953,6 +976,10 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) { co_return schema_id_validation_validator::ec; } + if (!topic_multi_property_validation(updated_properties)) { + co_return make_error_code(errc::topic_invalid_config); + } + // Apply the changes properties = std::move(updated_properties); diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h index 2d427cf8b50a..655a95b4375a 100644 --- a/src/v/cluster/topic_table.h +++ b/src/v/cluster/topic_table.h @@ -656,6 +656,15 @@ class topic_table { std::error_code validate_force_reconfigurable_partition( const ntp_with_majority_loss&) const; + // Validation for the final property configuration from a + // update_topic_properties_cmd application. Allows user to perform + // validations that depend on more than one topic property. + // + // Returns true if the configured topic_properties is valid, and false + // otherwise. + bool + topic_multi_property_validation(const topic_properties& properties) const; + underlying_t _topics; lifecycle_markers_t _lifecycle_markers; disabled_partitions_t _disabled_partitions; diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index 4539353b3b65..45e0c40d7126 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -1176,7 +1176,8 @@ void adl::to( t.initial_retention_local_target_ms, t.write_caching, t.flush_ms, - t.flush_bytes); + t.flush_bytes, + t.tombstone_retention_ms); } cluster::incremental_topic_updates @@ -1311,6 +1312,14 @@ adl::from(iobuf_parser& in) { = adl>>{}.from(in); } + if ( + version <= cluster::incremental_topic_updates:: + version_with_tombstone_retention_ms) { + updates.tombstone_retention_ms = adl>>{} + .from(in); + } + return updates; } diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index b24f811caaf2..de3888127754 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -563,7 +563,7 @@ struct property_update> struct incremental_topic_updates : serde::envelope< incremental_topic_updates, - serde::version<6>, + serde::version<7>, serde::compat_version<0>> { static constexpr int8_t version_with_data_policy = -1; static constexpr int8_t version_with_shadow_indexing = -3; @@ -573,6 +573,7 @@ struct incremental_topic_updates static constexpr int8_t version_with_schema_id_validation = -6; static constexpr int8_t version_with_initial_retention = -7; static constexpr int8_t version_with_write_caching = -8; + static constexpr int8_t version_with_tombstone_retention_ms = -9; // negative version indicating different format: // -1 - topic_updates with data_policy // -2 - topic_updates without data_policy @@ -581,7 +582,9 @@ struct incremental_topic_updates // -6 - topic updates with schema id validation // -7 - topic updates with initial retention // -8 - write caching properties - static constexpr int8_t version = version_with_write_caching; + // -9 - tombstone_retention_ms + static constexpr int8_t version = version_with_tombstone_retention_ms; + property_update> compression; property_update> cleanup_policy_bitflags; @@ -622,6 +625,8 @@ struct incremental_topic_updates property_update> write_caching; property_update> flush_ms; property_update> flush_bytes; + property_update> + tombstone_retention_ms; auto serde_fields() { return std::tie( @@ -650,7 +655,8 @@ struct incremental_topic_updates initial_retention_local_target_ms, write_caching, flush_ms, - flush_bytes); + flush_bytes, + tombstone_retention_ms); } friend std::ostream& diff --git a/src/v/compat/cluster_compat.h b/src/v/compat/cluster_compat.h index 744c36a2ad35..8a1c40029a8f 100644 --- a/src/v/compat/cluster_compat.h +++ b/src/v/compat/cluster_compat.h @@ -358,6 +358,7 @@ struct compat_check { json_write(flush_ms); json_write(flush_bytes); json_write(remote_topic_namespace_override); + json_write(tombstone_retention_ms); } static cluster::topic_properties from_json(json::Value& rd) { @@ -394,6 +395,7 @@ struct compat_check { json_read(flush_ms); json_read(flush_bytes); json_read(remote_topic_namespace_override); + json_read(tombstone_retention_ms); return obj; } @@ -425,6 +427,7 @@ struct compat_check { obj.flush_bytes = std::nullopt; obj.flush_ms = std::nullopt; obj.remote_topic_namespace_override = std::nullopt; + obj.tombstone_retention_ms = std::nullopt; if (reply != obj) { throw compat_error(fmt::format( @@ -507,6 +510,7 @@ struct compat_check { obj.properties.write_caching = std::nullopt; obj.properties.flush_bytes = std::nullopt; obj.properties.flush_ms = std::nullopt; + obj.properties.tombstone_retention_ms = std::nullopt; obj.properties.mpx_virtual_cluster_id = std::nullopt; diff --git a/src/v/compat/cluster_generator.h b/src/v/compat/cluster_generator.h index c7f810bfd279..27f0c7bef27d 100644 --- a/src/v/compat/cluster_generator.h +++ b/src/v/compat/cluster_generator.h @@ -649,7 +649,8 @@ struct instance_generator { }), tests::random_optional([] { return tests::random_duration_ms(); }), tests::random_optional( - [] { return random_generators::get_int(); })}; + [] { return random_generators::get_int(); }), + tests::random_optional([] { return tests::random_duration_ms(); })}; } static std::vector limits() { return {}; } diff --git a/src/v/compat/cluster_json.h b/src/v/compat/cluster_json.h index b06e544c04a5..a2ceffa3658b 100644 --- a/src/v/compat/cluster_json.h +++ b/src/v/compat/cluster_json.h @@ -625,6 +625,7 @@ inline void rjson_serialize( write_exceptional_member_type(w, "write_caching", tps.write_caching); write_member(w, "flush_bytes", tps.flush_bytes); write_member(w, "flush_ms", tps.flush_ms); + write_member(w, "tombstone_retention_ms", tps.tombstone_retention_ms); w.EndObject(); } @@ -695,6 +696,7 @@ inline void read_value(json::Value const& rd, cluster::topic_properties& obj) { read_member(rd, "write_caching", obj.write_caching); read_member(rd, "flush_bytes", obj.flush_bytes); read_member(rd, "flush_ms", obj.flush_ms); + read_member(rd, "tombstone_retention_ms", obj.tombstone_retention_ms); } inline void rjson_serialize( diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index f2bae6bb7a54..9bf9235a1c79 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -783,7 +783,6 @@ configuration::configuration() .visibility = visibility::user}, {}, validate_connection_rate) - , transactional_id_expiration_ms( *this, "transactional_id_expiration_ms", @@ -844,6 +843,13 @@ configuration::configuration() "How often do we trigger background compaction", {.needs_restart = needs_restart::no, .visibility = visibility::user}, 10s) + , tombstone_retention_ms( + *this, + "tombstone_retention_ms", + "The retention time for tombstone records in a compacted topic", + {.needs_restart = needs_restart::no, .visibility = visibility::user}, + std::nullopt, + validate_tombstone_retention_ms) , log_disable_housekeeping_for_tests( *this, "log_disable_housekeeping_for_tests", @@ -1631,7 +1637,7 @@ configuration::configuration() *this, "cloud_storage_enabled", "Enable archival storage", - {.visibility = visibility::user}, + {.needs_restart = needs_restart::yes, .visibility = visibility::user}, false) , cloud_storage_enable_remote_read( *this, diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index ece2c7e94c26..70048d036bb8 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -178,6 +178,8 @@ struct configuration final : public config_store { // same as log.retention.ms in kafka retention_duration_property log_retention_ms; property log_compaction_interval_ms; + // same as delete.retention.ms in kafka + property> tombstone_retention_ms; property log_disable_housekeeping_for_tests; property log_compaction_use_sliding_window; // same as retention.size in kafka - TODO: size not implemented diff --git a/src/v/config/validators.cc b/src/v/config/validators.cc index 9ae4e95a3f13..2a049e3ed355 100644 --- a/src/v/config/validators.cc +++ b/src/v/config/validators.cc @@ -247,4 +247,18 @@ validate_cloud_storage_api_endpoint(const std::optional& os) { return std::nullopt; } +std::optional validate_tombstone_retention_ms( + const std::optional& ms) { + const auto& cloud_storage_enabled + = config::shard_local_cfg().cloud_storage_enabled; + if (cloud_storage_enabled() && ms.has_value()) { + return fmt::format( + "cannot set {} if {} is enabled at the cluster level", + config::shard_local_cfg().tombstone_retention_ms.name(), + cloud_storage_enabled.name()); + } + + return std::nullopt; +} + }; // namespace config diff --git a/src/v/config/validators.h b/src/v/config/validators.h index 7a1230f79399..ffec081e61da 100644 --- a/src/v/config/validators.h +++ b/src/v/config/validators.h @@ -56,4 +56,7 @@ validate_audit_excluded_topics(const std::vector&); std::optional validate_cloud_storage_api_endpoint(const std::optional& os); +std::optional validate_tombstone_retention_ms( + const std::optional& ms); + }; // namespace config diff --git a/src/v/kafka/server/handlers/alter_configs.cc b/src/v/kafka/server/handlers/alter_configs.cc index 4d415ebcb7e3..cd5174987a9f 100644 --- a/src/v/kafka/server/handlers/alter_configs.cc +++ b/src/v/kafka/server/handlers/alter_configs.cc @@ -77,7 +77,7 @@ create_topic_properties_update(alter_configs_resource& resource) { std::apply(apply_op(op_t::none), update.custom_properties.serde_fields()); static_assert( - std::tuple_size_v == 26, + std::tuple_size_v == 27, "If you added a property, please decide on it's default alter config " "policy, and handle the update in the loop below"); static_assert( @@ -276,6 +276,15 @@ create_topic_properties_update(alter_configs_resource& resource) { flush_bytes_validator{}); continue; } + if (cfg.name == topic_property_tombstone_retention_ms) { + parse_and_set_optional_duration( + update.properties.tombstone_retention_ms, + cfg.value, + kafka::config_resource_operation::set, + tombstone_retention_ms_validator{}, + true); + continue; + } } catch (const validation_error& e) { return make_error_alter_config_resource_response< @@ -315,7 +324,7 @@ alter_topic_configuration( } static ss::future> -alter_broker_configuartion(chunked_vector resources) { +alter_broker_configuration(chunked_vector resources) { return unsupported_broker_configuration< alter_configs_resource, alter_configs_resource_response>( @@ -357,7 +366,7 @@ ss::future alter_configs_handler::handle( futures.push_back(alter_topic_configuration( ctx, std::move(groupped.topic_changes), request.data.validate_only)); futures.push_back( - alter_broker_configuartion(std::move(groupped.broker_changes))); + alter_broker_configuration(std::move(groupped.broker_changes))); auto ret = co_await ss::when_all_succeed(futures.begin(), futures.end()); // include authorization errors diff --git a/src/v/kafka/server/handlers/configs/config_response_utils.cc b/src/v/kafka/server/handlers/configs/config_response_utils.cc index 00b119c31174..284b6b2da0ae 100644 --- a/src/v/kafka/server/handlers/configs/config_response_utils.cc +++ b/src/v/kafka/server/handlers/configs/config_response_utils.cc @@ -100,7 +100,7 @@ consteval describe_configs_type property_config_type() { std::is_same_v || std::is_same_v || std::is_same_v || - std::is_same_v || + std::is_same_v || std::is_same_v || std::is_same_v; @@ -377,7 +377,7 @@ static void add_topic_config( // Wrap overrides in an optional because add_topic_config expects // optional where S = tristate std::optional> override_value; - if (overrides.is_disabled() || overrides.has_optional_value()) { + if (overrides.is_engaged()) { override_value = std::make_optional(overrides); } @@ -634,6 +634,9 @@ config_response_container_t make_topic_configs( &describe_as_string); // Shadow indexing properties + // These are "sticky" properties and do not use the cluster default after + // topic creation. The override should always govern the topic config's + // value. add_topic_config_if_requested( config_keys, result, @@ -643,7 +646,7 @@ config_response_container_t make_topic_configs( topic_property_remote_read, topic_properties.shadow_indexing.has_value() ? std::make_optional( model::is_fetch_enabled(*topic_properties.shadow_indexing)) - : std::nullopt, + : std::make_optional(false), include_synonyms, maybe_make_documentation( include_documentation, @@ -660,7 +663,7 @@ config_response_container_t make_topic_configs( topic_property_remote_write, topic_properties.shadow_indexing.has_value() ? std::make_optional( model::is_archival_enabled(*topic_properties.shadow_indexing)) - : std::nullopt, + : std::make_optional(false), include_synonyms, maybe_make_documentation( include_documentation, @@ -741,6 +744,25 @@ config_response_container_t make_topic_configs( }); } + // Also considered a sticky property, override should always govern topic + // config's value. + add_topic_config_if_requested( + config_keys, + result, + topic_property_tombstone_retention_ms, + metadata_cache.get_default_tombstone_retention_ms().value_or( + std::chrono::milliseconds{-1}), + topic_property_tombstone_retention_ms, + topic_properties.tombstone_retention_ms.has_value() + ? topic_properties.tombstone_retention_ms + : std::make_optional(-1), + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().tombstone_retention_ms.desc()), + &describe_as_string, + /*hide_default_override=*/true); + constexpr std::string_view key_validation = "Enable validation of the schema id for keys on a record"; constexpr std::string_view val_validation diff --git a/src/v/kafka/server/handlers/configs/config_utils.h b/src/v/kafka/server/handlers/configs/config_utils.h index 7d538c2468db..53be5996c815 100644 --- a/src/v/kafka/server/handlers/configs/config_utils.h +++ b/src/v/kafka/server/handlers/configs/config_utils.h @@ -448,6 +448,24 @@ struct flush_bytes_validator { } }; +struct tombstone_retention_ms_validator { + std::optional operator()( + const ss::sstring&, + const std::optional& maybe_value) { + if (maybe_value.has_value()) { + const auto& value = maybe_value.value(); + if (value < 1ms || value > serde::max_serializable_ms) { + return fmt::format( + "tombstone.retention.ms value invalid, expected to be in " + "range " + "[1, {}]", + serde::max_serializable_ms); + } + } + return std::nullopt; + } +}; + template requires requires( model::topic_namespace_view tns, @@ -609,11 +627,18 @@ inline void parse_and_set_bool( } } -template +template>> +requires requires( + const tristate& value, const ss::sstring& str, Validator validator) { + { + validator(str, value) + } -> std::convertible_to>; +} void parse_and_set_tristate( cluster::property_update>& property, const std::optional& value, - config_resource_operation op) { + config_resource_operation op, + Validator validator = noop_validator>{}) { // remove property value if (op == config_resource_operation::remove) { property.op = cluster::incremental_update_operation::remove; @@ -628,6 +653,11 @@ void parse_and_set_tristate( property.value = tristate(std::make_optional(parsed)); } + auto v_error = validator(*value, property.value); + if (v_error) { + throw validation_error(*v_error); + } + property.op = cluster::incremental_update_operation::set; return; } diff --git a/src/v/kafka/server/handlers/create_topics.cc b/src/v/kafka/server/handlers/create_topics.cc index 073be27355a0..907e103ac3e0 100644 --- a/src/v/kafka/server/handlers/create_topics.cc +++ b/src/v/kafka/server/handlers/create_topics.cc @@ -67,7 +67,8 @@ static constexpr auto supported_configs = std::to_array( topic_property_initial_retention_local_target_ms, topic_property_write_caching, topic_property_flush_ms, - topic_property_flush_bytes}); + topic_property_flush_bytes, + topic_property_tombstone_retention_ms}); bool is_supported(std::string_view name) { return std::any_of( @@ -92,7 +93,8 @@ using validators = make_validator_types< subject_name_strategy_validator, replication_factor_must_be_greater_or_equal_to_minimum, vcluster_id_validator, - write_caching_configs_validator>; + write_caching_configs_validator, + tombstone_retention_ms_validator>; static void append_topic_configs(request_context& ctx, create_topics_response& response) { diff --git a/src/v/kafka/server/handlers/incremental_alter_configs.cc b/src/v/kafka/server/handlers/incremental_alter_configs.cc index 13e4ad1f9eda..8e76ae4f623e 100644 --- a/src/v/kafka/server/handlers/incremental_alter_configs.cc +++ b/src/v/kafka/server/handlers/incremental_alter_configs.cc @@ -300,6 +300,15 @@ create_topic_properties_update( flush_bytes_validator{}); continue; } + if (cfg.name == topic_property_tombstone_retention_ms) { + parse_and_set_optional_duration( + update.properties.tombstone_retention_ms, + cfg.value, + op, + tombstone_retention_ms_validator{}, + true); + continue; + } } catch (const validation_error& e) { vlog( @@ -355,6 +364,7 @@ inline std::string_view map_config_name(std::string_view input) { .match("log.message.timestamp.type", "log_message_timestamp_type") .match("log.compression.type", "log_compression_type") .match("log.roll.ms", "log_segment_ms") + .match("delete.retention.ms", "tombstone_retention_ms") .default_match(input); } diff --git a/src/v/kafka/server/handlers/topics/types.cc b/src/v/kafka/server/handlers/topics/types.cc index 0d7d61de5f99..7b3ed0bc7461 100644 --- a/src/v/kafka/server/handlers/topics/types.cc +++ b/src/v/kafka/server/handlers/topics/types.cc @@ -21,6 +21,7 @@ #include "model/timestamp.h" #include "pandaproxy/schema_registry/subject_name_strategy.h" #include "strings/string_switch.h" +#include "utils/tristate.h" #include @@ -105,46 +106,6 @@ get_string_value(const config_map_t& config, std::string_view key) { return std::nullopt; } -// Either parse configuration or return nullopt -static std::optional -get_bool_value(const config_map_t& config, std::string_view key) { - if (auto it = config.find(key); it != config.end()) { - return string_switch>(it->second) - .match("true", true) - .match("false", false) - .default_match(std::nullopt); - } - return std::nullopt; -} - -static model::shadow_indexing_mode -get_shadow_indexing_mode(const config_map_t& config) { - auto arch_enabled = get_bool_value(config, topic_property_remote_write); - auto si_enabled = get_bool_value(config, topic_property_remote_read); - - // If topic properties are missing, patch them with the cluster config. - if (!arch_enabled) { - arch_enabled - = config::shard_local_cfg().cloud_storage_enable_remote_write(); - } - - if (!si_enabled) { - si_enabled - = config::shard_local_cfg().cloud_storage_enable_remote_read(); - } - - model::shadow_indexing_mode mode = model::shadow_indexing_mode::disabled; - if (*arch_enabled) { - mode = model::shadow_indexing_mode::archival; - } - if (*si_enabled) { - mode = mode == model::shadow_indexing_mode::archival - ? model::shadow_indexing_mode::full - : model::shadow_indexing_mode::fetch; - } - return mode; -} - // Special case for options where Kafka allows -1 // In redpanda the mapping is following // @@ -162,7 +123,7 @@ get_tristate_value(const config_map_t& config, std::string_view key) { } // disabled case if (v <= 0) { - return tristate{}; + return tristate(disable_tristate); } return tristate(std::make_optional(*v)); } @@ -251,6 +212,17 @@ to_cluster_type(const creatable_topic& t) { cfg.properties.flush_bytes = get_config_value( config_entries, topic_property_flush_bytes); + cfg.properties.tombstone_retention_ms + = get_duration_value( + config_entries, topic_property_tombstone_retention_ms); + + // Attempt to set tombstone_retention_ms with the cluster default, if it + // does not have an assigned value. + if (!cfg.properties.tombstone_retention_ms.has_value()) { + cfg.properties.tombstone_retention_ms + = config::shard_local_cfg().tombstone_retention_ms(); + } + schema_id_validation_config_parser schema_id_validation_config_parser{ cfg.properties}; diff --git a/src/v/kafka/server/handlers/topics/types.h b/src/v/kafka/server/handlers/topics/types.h index d5aa76f2362e..37bc33b91a05 100644 --- a/src/v/kafka/server/handlers/topics/types.h +++ b/src/v/kafka/server/handlers/topics/types.h @@ -62,6 +62,8 @@ static constexpr std::string_view topic_property_write_caching static constexpr std::string_view topic_property_flush_ms = "flush.ms"; static constexpr std::string_view topic_property_flush_bytes = "flush.bytes"; +static constexpr std::string_view topic_property_tombstone_retention_ms + = "tombstone.retention.ms"; // Server side schema id validation static constexpr std::string_view topic_property_record_key_schema_id_validation @@ -144,4 +146,44 @@ std::vector report_topic_configs( const cluster::metadata_cache& metadata_cache, const cluster::topic_properties& topic_properties); +// Either parse configuration or return nullopt +inline std::optional +get_bool_value(const config_map_t& config, std::string_view key) { + if (auto it = config.find(key); it != config.end()) { + return string_switch>(it->second) + .match("true", true) + .match("false", false) + .default_match(std::nullopt); + } + return std::nullopt; +} + +inline model::shadow_indexing_mode +get_shadow_indexing_mode(const config_map_t& config) { + auto arch_enabled = get_bool_value(config, topic_property_remote_write); + auto si_enabled = get_bool_value(config, topic_property_remote_read); + + // If topic properties are missing, patch them with the cluster config. + if (!arch_enabled) { + arch_enabled + = config::shard_local_cfg().cloud_storage_enable_remote_write(); + } + + if (!si_enabled) { + si_enabled + = config::shard_local_cfg().cloud_storage_enable_remote_read(); + } + + model::shadow_indexing_mode mode = model::shadow_indexing_mode::disabled; + if (*arch_enabled) { + mode = model::shadow_indexing_mode::archival; + } + if (*si_enabled) { + mode = mode == model::shadow_indexing_mode::archival + ? model::shadow_indexing_mode::full + : model::shadow_indexing_mode::fetch; + } + return mode; +} + } // namespace kafka diff --git a/src/v/kafka/server/handlers/topics/validators.h b/src/v/kafka/server/handlers/topics/validators.h index a0cce4b0767a..51c8384b5681 100644 --- a/src/v/kafka/server/handlers/topics/validators.h +++ b/src/v/kafka/server/handlers/topics/validators.h @@ -14,6 +14,7 @@ #include "kafka/protocol/schemata/create_topics_request.h" #include "kafka/protocol/schemata/create_topics_response.h" #include "kafka/server/handlers/topics/types.h" +#include "model/fundamental.h" #include "model/metadata.h" #include "model/namespace.h" @@ -344,6 +345,32 @@ struct write_caching_configs_validator { } }; +struct tombstone_retention_ms_validator { + static constexpr const char* error_message + = "Unsupported tombstone_retention_ms configuration, cannot be enabled " + "at the same time as repanda.remote.read or redpanda.remote.write."; + static constexpr const auto config_name + = topic_property_tombstone_retention_ms; + static constexpr error_code ec = error_code::invalid_config; + + static bool is_valid(const creatable_topic& c) { + const auto config_entries = config_map(c.configs); + auto end = config_entries.end(); + bool tombstone_retention_ms + = (config_entries.find(topic_property_tombstone_retention_ms) != end); + + auto shadow_indexing_mode = get_shadow_indexing_mode(config_entries); + // Cannot set tombstone_retention_ms at the same time as any tiered + // storage properties. + if ( + tombstone_retention_ms + && shadow_indexing_mode != model::shadow_indexing_mode::disabled) { + return false; + } + return true; + } +}; + template struct configuration_value_validator { static constexpr const char* error_message = T::error_message; diff --git a/src/v/kafka/server/tests/alter_config_test.cc b/src/v/kafka/server/tests/alter_config_test.cc index 37475e00be5d..bfd1858a4c52 100644 --- a/src/v/kafka/server/tests/alter_config_test.cc +++ b/src/v/kafka/server/tests/alter_config_test.cc @@ -393,7 +393,8 @@ FIXTURE_TEST( "initial.retention.local.target.ms", "write.caching", "flush.ms", - "flush.bytes"}; + "flush.bytes", + "tombstone.retention.ms"}; // All properties_request auto all_describe_resp = describe_configs(test_tp); @@ -481,6 +482,7 @@ FIXTURE_TEST(test_alter_single_topic_config, alter_config_test_fixture) { properties.emplace("write.caching", "true"); properties.emplace("flush.ms", "225"); properties.emplace("flush.bytes", "32468"); + properties.emplace("tombstone.retention.ms", "5678"); auto resp = alter_configs( make_alter_topic_config_resource_cv(test_tp, properties)); @@ -496,6 +498,8 @@ FIXTURE_TEST(test_alter_single_topic_config, alter_config_test_fixture) { assert_property_value(test_tp, "cleanup.policy", "compact", describe_resp); assert_property_value( test_tp, "redpanda.remote.read", "true", describe_resp); + assert_property_value( + test_tp, "tombstone.retention.ms", "5678", describe_resp); } FIXTURE_TEST(test_alter_multiple_topics_config, alter_config_test_fixture) { @@ -512,6 +516,7 @@ FIXTURE_TEST(test_alter_multiple_topics_config, alter_config_test_fixture) { properties_1.emplace("write.caching", "true"); properties_1.emplace("flush.ms", "225"); properties_1.emplace("flush.bytes", "32468"); + properties_1.emplace("tombstone.retention.ms", "5678"); absl::flat_hash_map properties_2; properties_2.emplace("retention.bytes", "4096"); @@ -519,6 +524,7 @@ FIXTURE_TEST(test_alter_multiple_topics_config, alter_config_test_fixture) { properties_2.emplace("write.caching", "false"); properties_2.emplace("flush.ms", "100"); properties_2.emplace("flush.bytes", "990"); + properties_2.emplace("tombstone.retention.ms", "8765"); auto cv = make_alter_topic_config_resource_cv(topic_1, properties_1); cv.push_back(make_alter_topic_config_resource(topic_2, properties_2)); @@ -540,12 +546,16 @@ FIXTURE_TEST(test_alter_multiple_topics_config, alter_config_test_fixture) { assert_property_value(topic_1, "write.caching", "true", describe_resp_1); assert_property_value(topic_1, "flush.ms", "225", describe_resp_1); assert_property_value(topic_1, "flush.bytes", "32468", describe_resp_1); + assert_property_value( + topic_1, "tombstone.retention.ms", "5678", describe_resp_1); auto describe_resp_2 = describe_configs(topic_2); assert_property_value(topic_2, "retention.bytes", "4096", describe_resp_2); assert_property_value(topic_2, "write.caching", "false", describe_resp_2); assert_property_value(topic_2, "flush.ms", "100", describe_resp_2); assert_property_value(topic_2, "flush.bytes", "990", describe_resp_2); + assert_property_value( + topic_2, "tombstone.retention.ms", "8765", describe_resp_2); } FIXTURE_TEST( @@ -607,6 +617,7 @@ FIXTURE_TEST( properties.emplace("write.caching", "true"); properties.emplace("flush.ms", "225"); properties.emplace("flush.bytes", "32468"); + properties.emplace("tombstone.retention.ms", "5678"); auto resp = alter_configs( make_alter_topic_config_resource_cv(test_tp, properties)); @@ -622,6 +633,8 @@ FIXTURE_TEST( assert_property_value(test_tp, "write.caching", "true", describe_resp); assert_property_value(test_tp, "flush.ms", "225", describe_resp); assert_property_value(test_tp, "flush.bytes", "32468", describe_resp); + assert_property_value( + test_tp, "tombstone.retention.ms", "5678", describe_resp); /** * Set custom properties again, previous settings should be overriden @@ -632,6 +645,7 @@ FIXTURE_TEST( new_properties.emplace("write.caching", "false"); new_properties.emplace("flush.ms", "9999"); new_properties.emplace("flush.bytes", "8888"); + new_properties.emplace("tombstone.retention.ms", "7777"); alter_configs(make_alter_topic_config_resource_cv(test_tp, new_properties)); @@ -648,6 +662,8 @@ FIXTURE_TEST( assert_property_value(test_tp, "write.caching", "false", new_describe_resp); assert_property_value(test_tp, "flush.ms", "9999", new_describe_resp); assert_property_value(test_tp, "flush.bytes", "8888", new_describe_resp); + assert_property_value( + test_tp, "tombstone.retention.ms", "7777", new_describe_resp); } FIXTURE_TEST(test_incremental_alter_config, alter_config_test_fixture) { @@ -671,6 +687,9 @@ FIXTURE_TEST(test_incremental_alter_config, alter_config_test_fixture) { properties.emplace( "flush.bytes", std::make_pair("5678", kafka::config_resource_operation::set)); + properties.emplace( + "tombstone.retention.ms", + std::make_pair("5678", kafka::config_resource_operation::set)); auto resp = incremental_alter_configs( make_incremental_alter_topic_config_resource_cv(test_tp, properties)); @@ -686,6 +705,8 @@ FIXTURE_TEST(test_incremental_alter_config, alter_config_test_fixture) { assert_property_value(test_tp, "write.caching", "true", describe_resp); assert_property_value(test_tp, "flush.ms", "1234", describe_resp); assert_property_value(test_tp, "flush.bytes", "5678", describe_resp); + assert_property_value( + test_tp, "tombstone.retention.ms", "5678", describe_resp); /** * Set only few properties, only they should be updated @@ -713,6 +734,8 @@ FIXTURE_TEST(test_incremental_alter_config, alter_config_test_fixture) { assert_property_value(test_tp, "write.caching", "false", new_describe_resp); assert_property_value(test_tp, "flush.ms", "1234", new_describe_resp); assert_property_value(test_tp, "flush.bytes", "5678", new_describe_resp); + assert_property_value( + test_tp, "tombstone.retention.ms", "5678", new_describe_resp); } FIXTURE_TEST( @@ -758,6 +781,9 @@ FIXTURE_TEST(test_incremental_alter_config_remove, alter_config_test_fixture) { properties.emplace( "flush.bytes", std::make_pair("8888", kafka::config_resource_operation::set)); + properties.emplace( + "tombstone.retention.ms", + std::make_pair("7777", kafka::config_resource_operation::set)); auto resp = incremental_alter_configs( make_incremental_alter_topic_config_resource_cv(test_tp, properties)); @@ -773,6 +799,8 @@ FIXTURE_TEST(test_incremental_alter_config_remove, alter_config_test_fixture) { assert_property_value(test_tp, "write.caching", "true", describe_resp); assert_property_value(test_tp, "flush.ms", "9999", describe_resp); assert_property_value(test_tp, "flush.bytes", "8888", describe_resp); + assert_property_value( + test_tp, "tombstone.retention.ms", "7777", describe_resp); /** * Remove custom properties @@ -793,6 +821,9 @@ FIXTURE_TEST(test_incremental_alter_config_remove, alter_config_test_fixture) { new_properties.emplace( "flush.bytes", std::pair{std::nullopt, kafka::config_resource_operation::remove}); + new_properties.emplace( + "tombstone.retention.ms", + std::pair{std::nullopt, kafka::config_resource_operation::remove}); resp = incremental_alter_configs( make_incremental_alter_topic_config_resource_cv(test_tp, new_properties)); @@ -830,4 +861,11 @@ FIXTURE_TEST(test_incremental_alter_config_remove, alter_config_test_fixture) { .raft_replica_max_pending_flush_bytes() .value()), new_describe_resp); + assert_property_value( + test_tp, + "tombstone.retention.ms", + fmt::format( + "{}", + config::shard_local_cfg().tombstone_retention_ms().value_or(-1ms)), + new_describe_resp); } diff --git a/src/v/kafka/server/tests/group_tx_compaction_test.cc b/src/v/kafka/server/tests/group_tx_compaction_test.cc index 571102f3d5ae..a767c7d337cc 100644 --- a/src/v/kafka/server/tests/group_tx_compaction_test.cc +++ b/src/v/kafka/server/tests/group_tx_compaction_test.cc @@ -305,6 +305,7 @@ ss::future<> run_workload( model::timestamp::max(), std::nullopt, log->stm_manager()->max_collectible_offset(), + std::nullopt, ss::default_priority_class(), dummy_as, }) diff --git a/src/v/kafka/server/tests/produce_consume_utils.h b/src/v/kafka/server/tests/produce_consume_utils.h index 85ead9015f00..da2469f7f0d4 100644 --- a/src/v/kafka/server/tests/produce_consume_utils.h +++ b/src/v/kafka/server/tests/produce_consume_utils.h @@ -36,7 +36,8 @@ struct kv_t { size_t start, size_t num_records, std::optional val_start = std::nullopt, - size_t key_cardinality = 0) { + size_t key_cardinality = 0, + bool produce_tombstones = false) { size_t vstart = val_start.value_or(start); std::vector records; records.reserve(num_records); @@ -45,8 +46,10 @@ struct kv_t { if (key_cardinality > 0) { key = key % key_cardinality; } - records.emplace_back( - ssx::sformat("key{}", key), ssx::sformat("val{}", vstart + i)); + const auto value = produce_tombstones + ? "" + : ssx::sformat("val{}", vstart + i); + records.emplace_back(ssx::sformat("key{}", key), value); } return records; } diff --git a/src/v/model/record.h b/src/v/model/record.h index 1c16c79cb3fb..8a6cca51ab1b 100644 --- a/src/v/model/record.h +++ b/src/v/model/record.h @@ -227,7 +227,8 @@ class record { const iobuf& value() const { return _value; } iobuf release_value() { return std::exchange(_value, {}); } iobuf share_value() { return _value.share(0, _value.size_bytes()); } - bool has_value() const { return _val_size >= 0; } + bool has_value() const { return _val_size > 0; } + bool is_tombstone() const { return !has_value(); } const std::vector& headers() const { return _headers; } std::vector& headers() { return _headers; } @@ -360,7 +361,7 @@ class record_batch_attributes final { record_batch_attributes& operator|=(model::compression c) { // clang-format off _attributes |= - static_cast>(c) + static_cast>(c) & record_batch_attributes::compression_mask; // clang-format on return *this; diff --git a/src/v/raft/tests/append_entries_test.cc b/src/v/raft/tests/append_entries_test.cc index e07c12fc9fa1..d26f570c51ba 100644 --- a/src/v/raft/tests/append_entries_test.cc +++ b/src/v/raft/tests/append_entries_test.cc @@ -544,6 +544,7 @@ FIXTURE_TEST(test_collected_log_recovery, raft_test_fixture) { first_ts, 100_MiB, model::offset::max(), + std::nullopt, ss::default_priority_class(), as, storage::ntp_sanitizer_config{.sanitize_only = true})) diff --git a/src/v/redpanda/admin/server.cc b/src/v/redpanda/admin/server.cc index a9a2c9a552c6..1ba78cda380c 100644 --- a/src/v/redpanda/admin/server.cc +++ b/src/v/redpanda/admin/server.cc @@ -1698,6 +1698,24 @@ void config_multi_property_validation( errors[ss::sstring(name)] = ssx::sformat( "{} requires schema_registry to be enabled in redpanda.yaml", name); } + + // For simplicity's sake, cloud storage read/write permissions cannot be + // enabled at the same time as tombstone_retention_ms at the cluster level, + // to avoid the case in which topics are created with TS read/write + // permissions and bugs are encountered later with tombstone removal. + if ( + (updated_config.cloud_storage_enabled() + || updated_config.cloud_storage_enable_remote_read() + || updated_config.cloud_storage_enable_remote_write()) + && updated_config.tombstone_retention_ms().has_value()) { + errors["cloud_storage_enabled"] = ssx::sformat( + "cannot have {} set at the same time as any of {}, {}, {} at the " + "cluster level", + updated_config.tombstone_retention_ms.name(), + updated_config.cloud_storage_enabled.name(), + updated_config.cloud_storage_enable_remote_read.name(), + updated_config.cloud_storage_enable_remote_write.name()); + } } } // namespace diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc index 216f93872a01..8aadd0020747 100644 --- a/src/v/storage/disk_log_impl.cc +++ b/src/v/storage/disk_log_impl.cc @@ -645,13 +645,18 @@ ss::future disk_log_impl::sliding_window_compact( if (cfg.asrc) { cfg.asrc->check(); } + // A segment is considered "clean" if it has been fully indexed (all + // keys are de-duplicated) + const bool is_clean_compacted = seg->offsets().get_base_offset() + >= idx_start_offset; if (seg->offsets().get_base_offset() > map.max_offset()) { // The map was built from newest to oldest segments within this // sliding range. If we see a new segment whose offsets are all // higher than those indexed, it may be because the segment is // entirely comprised of non-data batches. Mark it as compacted so // we can progress through compactions. - seg->mark_as_finished_windowed_compaction(); + internal::mark_segment_as_finished_window_compaction( + seg, is_clean_compacted); vlog( gclog.debug, "[{}] treating segment as compacted, offsets fall above highest " @@ -663,8 +668,14 @@ ss::future disk_log_impl::sliding_window_compact( } if (!seg->may_have_compactible_records()) { // All data records are already compacted away. Skip to avoid a - // needless rewrite. - seg->mark_as_finished_windowed_compaction(); + // needless rewrite. But, still flush index in case we are clean + // compacted to persist clean_compact_timestamp + internal::mark_segment_as_finished_window_compaction( + seg, is_clean_compacted); + if (is_clean_compacted) { + co_await seg->index().flush(); + } + vlog( gclog.trace, "[{}] treating segment as compacted, either all non-data " @@ -673,6 +684,7 @@ ss::future disk_log_impl::sliding_window_compact( seg->filename()); continue; } + // TODO: implement a segment replacement strategy such that each term // tries to write only one segment (or more if the term had a large // amount of data), rather than replacing N segments with N segments. @@ -758,11 +770,15 @@ ss::future disk_log_impl::sliding_window_compact( seg->index().swap_index_state(std::move(new_idx)); seg->force_set_commit_offset_from_index(); seg->release_batch_cache_index(); + + // Mark the segment as completed window compaction, and possibly set the + // clean_compact_timestamp in it's index. + internal::mark_segment_as_finished_window_compaction( + seg, is_clean_compacted); + co_await seg->index().flush(); co_await ss::rename_file( cmp_idx_tmpname.string(), cmp_idx_name.string()); - - seg->mark_as_finished_windowed_compaction(); _probe->segment_compacted(); _probe->add_compaction_removed_bytes( ssize_t(size_before) - ssize_t(size_after)); @@ -774,7 +790,9 @@ ss::future disk_log_impl::sliding_window_compact( vlog( gclog.debug, "[{}] Final compacted segment {}", config().ntp(), seg); } + _last_compaction_window_start_offset = idx_start_offset; + co_return true; } diff --git a/src/v/storage/index_state.cc b/src/v/storage/index_state.cc index 408084fb43be..055e35845d28 100644 --- a/src/v/storage/index_state.cc +++ b/src/v/storage/index_state.cc @@ -159,8 +159,9 @@ std::ostream& operator<<(std::ostream& o, const index_state& s) { << ", non_data_timestamps:" << s.non_data_timestamps << ", broker_timestamp:" << s.broker_timestamp << ", num_compactible_records_appended:" - << s.num_compactible_records_appended << ", index(" - << s.relative_offset_index.size() << "," + << s.num_compactible_records_appended + << ", clean_compact_timestamp:" << s.clean_compact_timestamp + << ", index(" << s.relative_offset_index.size() << "," << s.relative_time_index.size() << "," << s.position_index.size() << ")}"; } @@ -182,6 +183,7 @@ void index_state::serde_write(iobuf& out) const { write(tmp, non_data_timestamps); write(tmp, broker_timestamp); write(tmp, num_compactible_records_appended); + write(tmp, clean_compact_timestamp); crc::crc32c crc; crc_extend_iobuf(crc, tmp); @@ -278,6 +280,11 @@ void read_nested( } else { st.num_compactible_records_appended = std::nullopt; } + if (hdr._version >= index_state::clean_compact_timestamp_version) { + read_nested(p, st.clean_compact_timestamp, 0U); + } else { + st.clean_compact_timestamp = std::nullopt; + } } index_state index_state::copy() const { return *this; } @@ -357,7 +364,8 @@ index_state::index_state(const index_state& o) noexcept , with_offset(o.with_offset) , non_data_timestamps(o.non_data_timestamps) , broker_timestamp(o.broker_timestamp) - , num_compactible_records_appended(o.num_compactible_records_appended) {} + , num_compactible_records_appended(o.num_compactible_records_appended) + , clean_compact_timestamp(o.clean_compact_timestamp) {} namespace serde_compat { uint64_t index_state_serde::checksum(const index_state& r) { diff --git a/src/v/storage/index_state.h b/src/v/storage/index_state.h index 8c0863dc64b8..728bda60db87 100644 --- a/src/v/storage/index_state.h +++ b/src/v/storage/index_state.h @@ -74,10 +74,11 @@ class offset_time_index { 1 byte - non_data_timestamps */ struct index_state - : serde::envelope, serde::compat_version<4>> { + : serde::envelope, serde::compat_version<4>> { static constexpr auto monotonic_timestamps_version = 5; static constexpr auto broker_timestamp_version = 6; static constexpr auto num_compactible_records_version = 7; + static constexpr auto clean_compact_timestamp_version = 8; static index_state make_empty_index(offset_delta_time with_offset); @@ -131,6 +132,12 @@ struct index_state // support this field, and we can't conclude anything. std::optional num_compactible_records_appended{0}; + // If set, the timestamp at which every record up to and including + // those in this segment were first compacted via sliding window. + // If not yet set, sliding window compaction has not yet been applied to + // every previous record in the log. + std::optional clean_compact_timestamp{std::nullopt}; + size_t size() const; bool empty() const; diff --git a/src/v/storage/log_manager.cc b/src/v/storage/log_manager.cc index 9f95ddeedb77..d656626f51ee 100644 --- a/src/v/storage/log_manager.cc +++ b/src/v/storage/log_manager.cc @@ -275,6 +275,7 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) { collection_threshold, _config.retention_bytes(), current_log.handle->stm_manager()->max_collectible_offset(), + current_log.handle->config().tombstone_retention_ms(), _config.compaction_priority, _abort_source, std::move(ntp_sanitizer_cfg), diff --git a/src/v/storage/ntp_config.h b/src/v/storage/ntp_config.h index 9348cf5d54f8..c288d759b9be 100644 --- a/src/v/storage/ntp_config.h +++ b/src/v/storage/ntp_config.h @@ -77,6 +77,10 @@ class ntp_config { std::optional flush_ms; std::optional flush_bytes; + // Should not be enabled at the same time as any other tiered storage + // properties. + std::optional tombstone_retention_ms; + friend std::ostream& operator<<(std::ostream&, const default_overrides&); }; @@ -288,6 +292,29 @@ class ntp_config { : cluster_default; } + std::optional tombstone_retention_ms() const { + if (_overrides) { + // Sanity check, tombstone deletion should not be enabled at the + // same time as tiered storage. + if ( + _overrides->shadow_indexing_mode.has_value() + && _overrides->shadow_indexing_mode.value() + != model::shadow_indexing_mode::disabled) { + std::cout << "SI enabled, Returning nullopt from " + "tombstone_retention_ms\n"; + return std::nullopt; + } + if (_overrides->tombstone_retention_ms.has_value()) { + std::cout << "Returning " + << _overrides->tombstone_retention_ms.value() + << "from tombstone_retention_ms\n"; + return _overrides->tombstone_retention_ms.value(); + } + } + std::cout << "Returning nullopt from tombstone_retention_ms\n"; + return std::nullopt; + } + std::optional cleanup_policy_override() const { return _overrides ? _overrides->cleanup_policy_bitflags : std::nullopt; diff --git a/src/v/storage/opfuzz/opfuzz.cc b/src/v/storage/opfuzz/opfuzz.cc index 5e8f9fb241af..e9772c34ab43 100644 --- a/src/v/storage/opfuzz/opfuzz.cc +++ b/src/v/storage/opfuzz/opfuzz.cc @@ -511,6 +511,7 @@ struct compact_op final : opfuzz::op { model::timestamp::max(), std::nullopt, model::offset::max(), + std::nullopt, ss::default_priority_class(), *(ctx._as), storage::ntp_sanitizer_config{.sanitize_only = true}); diff --git a/src/v/storage/segment.cc b/src/v/storage/segment.cc index 41a3d8783d17..9116322d75be 100644 --- a/src/v/storage/segment.cc +++ b/src/v/storage/segment.cc @@ -897,7 +897,7 @@ bool segment::may_have_compactible_records() const { // that there were no data records, so err on the side of caution. return true; } - return num_compactible_records.value() > 1; + return num_compactible_records.value() > 0; } } // namespace storage diff --git a/src/v/storage/segment_deduplication_utils.cc b/src/v/storage/segment_deduplication_utils.cc index ac110eb1b800..66e4cc60ed48 100644 --- a/src/v/storage/segment_deduplication_utils.cc +++ b/src/v/storage/segment_deduplication_utils.cc @@ -9,6 +9,7 @@ #include "storage/segment_deduplication_utils.h" +#include "model/timestamp.h" #include "storage/compacted_index_writer.h" #include "storage/compaction_reducers.h" #include "storage/index_state.h" @@ -164,9 +165,13 @@ ss::future deduplicate_segment( seg, cfg, probe, std::move(read_holder)); auto compaction_placeholder_enabled = feature_table.local().is_active( features::feature::compaction_placeholder_batch); + + const std::optional tombstone_delete_horizon + = internal::get_tombstone_delete_horizon(seg, cfg); auto copy_reducer = internal::copy_data_segment_reducer( [&map, segment_last_offset = seg->offsets().get_committed_offset(), + tombstone_delete_horizon = tombstone_delete_horizon, compaction_placeholder_enabled]( const model::record_batch& b, const model::record& r, @@ -185,6 +190,12 @@ ss::future deduplicate_segment( b.header()); return ss::make_ready_future(true); } + // Potentially deal with tombstone record removal + if (internal::should_remove_tombstone_record( + r, tombstone_delete_horizon)) { + return ss::make_ready_future(false); + } + return should_keep(map, b, r); }, &appender, @@ -198,6 +209,7 @@ ss::future deduplicate_segment( auto new_idx = co_await std::move(rdr).consume( std::move(copy_reducer), model::no_timeout); new_idx.broker_timestamp = seg->index().broker_timestamp(); + new_idx.clean_compact_timestamp = seg->index().clean_compact_timestamp(); co_return new_idx; } diff --git a/src/v/storage/segment_index.cc b/src/v/storage/segment_index.cc index fbb8109742a3..a0bbf1452272 100644 --- a/src/v/storage/segment_index.cc +++ b/src/v/storage/segment_index.cc @@ -46,7 +46,8 @@ segment_index::segment_index( size_t step, ss::sharded& feature_table, std::optional sanitizer_config, - std::optional broker_timestamp) + std::optional broker_timestamp, + std::optional clean_compact_timestamp) : _path(std::move(path)) , _step(step) , _feature_table(std::ref(feature_table)) @@ -55,6 +56,7 @@ segment_index::segment_index( , _sanitizer_config(std::move(sanitizer_config)) { _state.base_offset = base; _state.broker_timestamp = broker_timestamp; + _state.clean_compact_timestamp = clean_compact_timestamp; } segment_index::segment_index( @@ -86,10 +88,15 @@ ss::future segment_index::open() { } void segment_index::reset() { + // Persist the base offset and clean compaction timestamp through a reset. auto base = _state.base_offset; + auto clean_compact_timestamp = _state.clean_compact_timestamp; + _state = index_state::make_empty_index( storage::internal::should_apply_delta_time_offset(_feature_table)); + _state.base_offset = base; + _state.clean_compact_timestamp = clean_compact_timestamp; _acc = 0; } diff --git a/src/v/storage/segment_index.h b/src/v/storage/segment_index.h index 5526754b08fe..5dc5d3b8a848 100644 --- a/src/v/storage/segment_index.h +++ b/src/v/storage/segment_index.h @@ -129,7 +129,8 @@ class segment_index { size_t step, ss::sharded& feature_table, std::optional sanitizer_config, - std::optional broker_timestamp = std::nullopt); + std::optional broker_timestamp = std::nullopt, + std::optional clean_compact_timestamp = std::nullopt); ~segment_index() noexcept = default; segment_index(segment_index&&) noexcept = default; @@ -196,6 +197,24 @@ class segment_index { _state.broker_timestamp, _state.max_timestamp, _retention_timestamp); } + // Check if compacted timestamp has a value. + bool has_clean_compact_timestamp() const { + return _state.clean_compact_timestamp.has_value(); + } + + // Set the compacted timestamp, if it doesn't already have a value. + void maybe_set_clean_compact_timestamp(model::timestamp t) { + if (!_state.clean_compact_timestamp.has_value()) { + _state.clean_compact_timestamp = t; + _needs_persistence = true; + } + } + + // Get the compacted timestamp. + std::optional clean_compact_timestamp() const { + return _state.clean_compact_timestamp; + } + ss::future materialize_index(); ss::future<> flush(); ss::future<> truncate(model::offset, model::timestamp); diff --git a/src/v/storage/segment_utils.cc b/src/v/storage/segment_utils.cc index e16fa4f17eb8..8e16c44f3239 100644 --- a/src/v/storage/segment_utils.cc +++ b/src/v/storage/segment_utils.cc @@ -376,8 +376,13 @@ ss::future do_copy_segment_data( storage_resources& resources, offset_delta_time apply_offset, ss::sharded& feature_table) { - // preserve broker_timestamp from the segment's index + // preserve broker_timestamp and clean_compact_timestamp from the segment's + // index auto old_broker_timestamp = seg->index().broker_timestamp(); + auto old_clean_compact_timestamp = seg->index().clean_compact_timestamp(); + + const std::optional tombstone_delete_horizon + = get_tombstone_delete_horizon(seg, cfg); // find out which offsets will survive compaction auto idx_path = seg->reader().path().to_compacted_index(); @@ -411,10 +416,16 @@ ss::future do_copy_segment_data( seg->reader().filename(), tmpname); - auto should_keep = [compacted_list = std::move(compacted_offsets)]( + auto should_keep = [compacted_list = std::move(compacted_offsets), + tombstone_delete_horizon = tombstone_delete_horizon]( const model::record_batch& b, const model::record& r, bool) { + // Potentially deal with tombstone record removal + if (should_remove_tombstone_record(r, tombstone_delete_horizon)) { + return ss::make_ready_future(false); + } + const auto o = b.base_offset() + model::offset_delta(r.offset_delta()); return ss::make_ready_future(compacted_list.contains(o)); }; @@ -448,8 +459,10 @@ ss::future do_copy_segment_data( }); }); - // restore broker timestamp + // restore broker timestamp and clean compact timestamp new_index.broker_timestamp = old_broker_timestamp; + new_index.clean_compact_timestamp = old_clean_compact_timestamp; + co_return new_index; } @@ -853,13 +866,44 @@ make_concatenated_segment( }); return seg_it->index().broker_timestamp(); }(); + + // If both of the segments have a clean_compact_timestamp set, then the new + // index should use the maximum timestamp. If at least one segment doesn't + // have a clean_compact_timestamp, then the new index should not either. + auto new_clean_compact_timestamp = + [&]() -> std::optional { + // invariants: segments is not empty, but for completeness handle the + // empty case + if (unlikely(segments.empty())) { + return std::nullopt; + } + std::optional new_ts; + for (const auto& seg : segments) { + // If even one segment in the set does not have a + // clean_compact_timestamp, we must not mark the new index as having + // one. + if (!seg->index().has_clean_compact_timestamp()) { + return std::nullopt; + } + + auto clean_ts = seg->index().clean_compact_timestamp(); + if (!new_ts.has_value()) { + new_ts = clean_ts; + } else { + new_ts = std::max(new_ts.value(), clean_ts.value()); + } + } + return new_ts; + }(); + segment_index index( index_name, offsets.get_base_offset(), segment_index::default_data_buffer_step, feature_table, cfg.sanitizer_config, - new_broker_timestamp); + new_broker_timestamp, + new_clean_compact_timestamp); co_return std::make_tuple( ss::make_lw_shared( @@ -1086,4 +1130,40 @@ offset_delta_time should_apply_delta_time_offset( && feature_table.local().is_active(features::feature::node_isolation)}; } +void mark_segment_as_finished_window_compaction( + ss::lw_shared_ptr seg, bool set_clean_compact_timestamp) { + seg->mark_as_finished_windowed_compaction(); + if (set_clean_compact_timestamp) { + seg->index().maybe_set_clean_compact_timestamp(model::timestamp::now()); + } +} + +std::optional get_tombstone_delete_horizon( + ss::lw_shared_ptr seg, const compaction_config& cfg) { + std::optional tombstone_delete_horizon = {}; + const auto& tombstone_retention_ms_opt = cfg.tombstone_retention_ms; + if ( + seg->index().has_clean_compact_timestamp() + && tombstone_retention_ms_opt.has_value()) { + tombstone_delete_horizon = model::timestamp( + seg->index().clean_compact_timestamp()->value() + + cfg.tombstone_retention_ms->count()); + } + return tombstone_delete_horizon; +} + +bool should_remove_tombstone_record( + const model::record& r, + std::optional tombstone_delete_horizon) { + if (!r.is_tombstone()) { + return false; + } + + if (!tombstone_delete_horizon.has_value()) { + return false; + } + + return (model::timestamp::now() > tombstone_delete_horizon.value()); +} + } // namespace storage::internal diff --git a/src/v/storage/segment_utils.h b/src/v/storage/segment_utils.h index ba70d84169b7..7cf0fb4c8078 100644 --- a/src/v/storage/segment_utils.h +++ b/src/v/storage/segment_utils.h @@ -258,6 +258,41 @@ inline bool is_compactible(const model::record_batch& b) { offset_delta_time should_apply_delta_time_offset( ss::sharded& feature_table); +// Optionally returns the timestamp past which a tombstone may be removed. +// This returns a value iff the segment `s` has been marked as cleanly +// compacted, and the compaction_config has a value assigned for +// `tombstone_retention_ms`. In all other cases, `std::nullopt` is returned, +// indicating that tombstone records will not be removed if encountered. +std::optional get_tombstone_delete_horizon( + ss::lw_shared_ptr seg, const compaction_config& cfg); + +// Returns true iff the record `r` is a tombstone, and the timestamp returned by +// `now()` is past the `tombstone_delete_horizon` timestamp (in the case it has +// a value). Returns false in all other cases. +bool should_remove_tombstone_record( + const model::record& r, + std::optional tombstone_delete_horizon); + +// Mark a segment as completed window compaction, and whether it is "clean" (in +// which case the `clean_compact_timestamp` is set in the segment's index). +void mark_segment_as_finished_window_compaction( + ss::lw_shared_ptr seg, bool set_clean_compact_timestamp); + +// Optionally returns the timestamp past which a tombstone may be removed. +// This returns a value iff the segment `s` has been marked as cleanly +// compacted, and the compaction_config has a value assigned for +// `tombstone_retention_ms`. In all other cases, `std::nullopt` is returned, +// indicating that tombstone records will not be removed if encountered. +std::optional get_tombstone_delete_horizon( + ss::lw_shared_ptr seg, const compaction_config& cfg); + +// Returns true iff the record `r` is a tombstone, and the timestamp returned by +// `now()` is past the `tombstone_delete_horizon` timestamp (in the case it has +// a value). Returns false in all other cases. +bool should_remove_tombstone_record( + const model::record& r, + std::optional tombstone_delete_horizon); + template auto with_segment_reader_handle(segment_reader_handle handle, Func func) { static_assert( diff --git a/src/v/storage/tests/compaction_e2e_multinode_test.cc b/src/v/storage/tests/compaction_e2e_multinode_test.cc index 43964e0a57f7..1b84563d5c4a 100644 --- a/src/v/storage/tests/compaction_e2e_multinode_test.cc +++ b/src/v/storage/tests/compaction_e2e_multinode_test.cc @@ -77,6 +77,7 @@ FIXTURE_TEST(replicate_after_compaction, compaction_multinode_test) { model::timestamp::min(), std::nullopt, first_log->stm_manager()->max_collectible_offset(), + std::nullopt, ss::default_priority_class(), as); first_log->housekeeping(conf).get(); @@ -125,6 +126,7 @@ FIXTURE_TEST(replicate_after_compaction, compaction_multinode_test) { model::timestamp::min(), std::nullopt, new_log->stm_manager()->max_collectible_offset(), + std::nullopt, ss::default_priority_class(), as); new_log->housekeeping(conf2).get(); @@ -197,6 +199,7 @@ FIXTURE_TEST(compact_transactions_and_replicate, compaction_multinode_test) { model::timestamp::min(), std::nullopt, first_log->stm_manager()->max_collectible_offset(), + std::nullopt, ss::default_priority_class(), as); first_log->housekeeping(conf).get(); @@ -225,6 +228,7 @@ FIXTURE_TEST(compact_transactions_and_replicate, compaction_multinode_test) { model::timestamp::min(), std::nullopt, new_log->stm_manager()->max_collectible_offset(), + std::nullopt, ss::default_priority_class(), as); new_log->housekeeping(conf2).get(); diff --git a/src/v/storage/tests/compaction_e2e_test.cc b/src/v/storage/tests/compaction_e2e_test.cc index 79a1d84b3737..44472375256e 100644 --- a/src/v/storage/tests/compaction_e2e_test.cc +++ b/src/v/storage/tests/compaction_e2e_test.cc @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0 #include "base/vlog.h" +#include "gtest/gtest.h" #include "kafka/server/tests/produce_consume_utils.h" #include "model/namespace.h" #include "model/record_batch_types.h" @@ -21,9 +22,15 @@ #include "utils/directory_walker.h" #include +#include #include +#include +#include + +using namespace std::chrono_literals; + namespace { ss::logger cmp_testlog("cmp_testlog"); } // anonymous namespace @@ -81,6 +88,8 @@ class CompactionFixtureTest , public redpanda_thread_fixture , public seastar_test { public: + using map_t = absl::btree_map; + ss::future<> SetUpAsync() override { test_local_cfg.get("election_timeout_ms").set_value(100ms); cluster::topic_properties props; @@ -108,19 +117,27 @@ class CompactionFixtureTest size_t num_segments, size_t cardinality, size_t batches_per_segment, - size_t records_per_batch = 1) { + size_t records_per_batch = 1, + size_t starting_value = 0, + bool produce_tombstones = false, + map_t* latest_kv = nullptr) { tests::kafka_produce_transport producer(co_await make_kafka_client()); co_await producer.start(); // Generate some segments. - size_t val_count = 0; - absl::btree_map latest_kv; + size_t val_count = starting_value; for (size_t i = 0; i < num_segments; i++) { for (int r = 0; r < batches_per_segment; r++) { auto kvs = tests::kv_t::sequence( - val_count, records_per_batch, val_count, cardinality); - for (const auto& [k, v] : kvs) { - latest_kv[k] = v; + val_count, + records_per_batch, + val_count, + cardinality, + produce_tombstones); + if (latest_kv) { + for (const auto& [k, v] : kvs) { + latest_kv->insert_or_assign(k, v); + } } co_await producer.produce_to_partition( topic_name, model::partition_id(0), std::move(kvs)); @@ -130,6 +147,66 @@ class CompactionFixtureTest co_await log->force_roll(ss::default_priority_class()); } } + + ss::future<> generate_tombstones( + size_t num_segments, + size_t cardinality, + size_t batches_per_segment, + size_t records_per_batch = 1, + size_t starting_value = 0, + map_t* latest_kv = nullptr) { + return generate_data( + num_segments, + cardinality, + batches_per_segment, + records_per_batch, + starting_value, + true, + latest_kv); + } + + // Generates a random mixture of records (tombstones optionally included). + // Returns a map of the most recently produce k-v pair. + ss::future<> generate_random_assorted_data( + size_t num_segments, + size_t cardinality, + size_t batches_per_segment, + size_t records_per_batch = 1, + bool include_tombstones = false, + map_t* latest_kv = nullptr) { + tests::kafka_produce_transport producer(co_await make_kafka_client()); + co_await producer.start(); + + // Generate some segments. + for (size_t s = 0; s < num_segments; ++s) { + for (size_t b = 0; b < batches_per_segment; ++b) { + std::vector kvs; + kvs.reserve(records_per_batch); + for (size_t r = 0; r < records_per_batch; ++r) { + const auto random_int = random_generators::get_int( + cardinality); + const auto key = ssx::sformat("key{}", random_int); + const bool is_tombstone + = include_tombstones + ? random_generators::random_choice({false, true}) + : false; + auto val = is_tombstone ? "" + : ssx::sformat("val{}", random_int); + kvs.emplace_back(std::move(key), std::move(val)); + } + if (latest_kv) { + for (const auto& [k, v] : kvs) { + latest_kv->insert_or_assign(k, v); + } + } + co_await producer.produce_to_partition( + topic_name, model::partition_id(0), std::move(kvs)); + } + co_await log->flush(); + co_await log->force_roll(ss::default_priority_class()); + } + } + ss::future> check_records(size_t cardinality, size_t max_duplicates) { tests::kafka_consume_transport consumer(co_await make_kafka_client()); @@ -174,6 +251,7 @@ TEST_P(CompactionFixtureParamTest, TestDedupeOnePass) { auto& disk_log = dynamic_cast(*log); storage::compaction_config cfg( disk_log.segments().back()->offsets().get_base_offset(), + std::nullopt, ss::default_priority_class(), never_abort, std::nullopt, @@ -223,6 +301,414 @@ TEST_P(CompactionFixtureParamTest, TestDedupeOnePass) { INSTANTIATE_TEST_SUITE_P( DuplicatesPerKey, CompactionFixtureParamTest, ::testing::Values(1, 10, 100)); +TEST_F(CompactionFixtureTest, TestTombstones) { + auto duplicates_per_key = 2; + auto total_records = 10; + auto cardinality = total_records / duplicates_per_key; + auto num_segments = 5; + size_t record_batches_per_segment = total_records / num_segments; + + generate_data(num_segments, cardinality, record_batches_per_segment).get(); + // Generate a tombstone record for "key0". + generate_tombstones(1, 1, 1).get(); + + auto tombstone_retention_ms = 1000ms; + + // Helper function to perform sliding window compaction. + auto do_sliding_window_compact = [tombstone_retention_ms](auto& log) { + // Compact, allowing the map to grow as large as we need. + ss::abort_source never_abort; + storage::compaction_config cfg( + log->segments().back()->offsets().get_base_offset(), + tombstone_retention_ms, + ss::default_priority_class(), + never_abort, + std::nullopt, + std::nullopt, + nullptr, + nullptr); + auto& disk_log = dynamic_cast(*log); + disk_log.sliding_window_compact(cfg).get(); + }; + + auto num_tombstone_segments = 1; + auto total_segments = num_segments + num_tombstone_segments; + + auto log_segment_count_before = log->segment_count(); + // Sanity check we created the right number of segments. + // NOTE: ignore the active segment. + auto segment_count_before = log_segment_count_before - 1; + ASSERT_EQ(segment_count_before, total_segments); + + // Perform first round of sliding window compaction. + do_sliding_window_compact(log); + + // Another sanity check after compaction. + auto segment_count_after = log->segment_count() - 1; + ASSERT_EQ(total_segments, segment_count_after); + + auto summary_after = dir_summary().get(); + ASSERT_NO_FATAL_FAILURE(summary_after.check_clean(total_segments)); + + // The number of duplicates can't exceed the number of segments - 1: the + // latest closed segment should have no duplicates, and at worst, each + // preceding segment will have 1 duplicate (the last record). + { + auto consumed_kvs + = check_records(cardinality, total_segments - 1).get(); + ASSERT_NO_FATAL_FAILURE(); + } + + // Every segment sans the active segment should have a + // clean_compact_timestamp set, since we fully indexed all of them. + int num_clean_before = 0; + for (const auto& seg : log->segments()) { + if (seg->index().has_clean_compact_timestamp()) { + ++num_clean_before; + } + } + ASSERT_EQ(num_clean_before, total_segments); + + // Compacting again won't attempt again since the segments are marked as + // compacted. + auto segments_compacted = log->get_probe().get_segments_compacted(); + do_sliding_window_compact(log); + auto segments_compacted_again = log->get_probe().get_segments_compacted(); + ASSERT_EQ(segments_compacted, segments_compacted_again); + + // Check that the clean_compact_timestamps got persisted in the index_state. + int num_clean_again = 0; + for (const auto& seg : log->segments()) { + if (seg->index().has_clean_compact_timestamp()) { + ++num_clean_again; + } + } + ASSERT_EQ(num_clean_again, num_clean_before); + + // Consume again after restarting and ensure our assertions about + // duplicates are still valid. + restart(should_wipe::no); + + wait_for_leader(ntp).get(); + partition = app.partition_manager.local().get(ntp).get(); + log = partition->log().get(); + + ASSERT_EQ(log->segment_count(), log_segment_count_before); + + // Check that the clean_compact_timestamps got persisted in the index_state, + // even after a restart. + int num_clean_after = 0; + for (const auto& seg : log->segments()) { + if (seg->index().has_clean_compact_timestamp()) { + ++num_clean_after; + } + } + ASSERT_EQ(num_clean_after, num_clean_before); + + // Sleep for tombstone.retention.ms time, so that the next time we attempt + // to compact the tombstone record will be eligible for deletion. + ss::sleep(tombstone_retention_ms).get(); + + // Flush and force roll the log, so that sliding window compaction can + // occur. + log->flush().get(); + log->force_roll(ss::default_priority_class()).get(); + do_sliding_window_compact(log); + + { + tests::kafka_consume_transport consumer(make_kafka_client().get()); + consumer.start().get(); + auto consumed_kvs = consumer + .consume_from_partition( + topic_name, + model::partition_id(0), + model::offset(0)) + .get(); + + // Assert there is no tombstone record left. + auto find_tombstone_record = [](const auto& kv) { + return kv.val.empty(); + }; + + auto tombstone_it = std::find_if( + consumed_kvs.begin(), consumed_kvs.end(), find_tombstone_record); + + // The tombstone should have been removed after second round of + // compaction post tombstone.retention.ms. + ASSERT_EQ(tombstone_it, consumed_kvs.end()); + + // Assert the "key0" value which was used for a tombstone record is not + // present. Redundant, sanity check. + auto find_key0_value = [](const auto& kv) { return kv.key == "key0"; }; + + auto key0_it = std::find_if( + consumed_kvs.begin(), consumed_kvs.end(), find_key0_value); + + ASSERT_EQ(key0_it, consumed_kvs.end()); + } +} + +class CompactionFixtureTombstonesParamTest + : public CompactionFixtureTest + , public ::testing::WithParamInterface {}; + +TEST_P(CompactionFixtureTombstonesParamTest, TestTombstonesCompletelyEmptyLog) { + const auto num_segments = GetParam(); + + // Generate 1 tombstone records for each segment. + const auto num_tombstones = num_segments; + const auto batches_per_segment = 1; + const auto tombstones_per_batch = num_tombstones / batches_per_segment; + + generate_tombstones( + num_segments, num_tombstones, batches_per_segment, tombstones_per_batch) + .get(); + + auto tombstone_retention_ms = 1000ms; + + // Helper function to perform sliding window compaction. + auto do_sliding_window_compact = [tombstone_retention_ms](auto& log) { + // Compact, allowing the map to grow as large as we need. + ss::abort_source never_abort; + storage::compaction_config cfg( + log->segments().back()->offsets().get_base_offset(), + tombstone_retention_ms, + ss::default_priority_class(), + never_abort, + std::nullopt, + std::nullopt, + nullptr, + nullptr); + auto& disk_log = dynamic_cast(*log); + disk_log.sliding_window_compact(cfg).get(); + }; + + // Perform first round of sliding window compaction. + do_sliding_window_compact(log); + + { + tests::kafka_consume_transport consumer(make_kafka_client().get()); + consumer.start().get(); + auto consumed_kvs = consumer + .consume_from_partition( + topic_name, + model::partition_id(0), + model::offset(0)) + .get(); + + ASSERT_NO_FATAL_FAILURE(); + + // Sanity check the number of records consumed. + ASSERT_EQ(consumed_kvs.size(), num_tombstones); + + // Every record should be a tombstone without a value. + for (const auto& kv : consumed_kvs) { + ASSERT_EQ(kv.val, ""); + } + } + + // Sleep for tombstone.retention.ms time, so that the next time we attempt + // to compact the tombstone records will be eligible for deletion. + ss::sleep(tombstone_retention_ms).get(); + + // Generate one record, so that sliding window compaction can occur. + generate_data(1, 200, 1, 1, 100).get(); + do_sliding_window_compact(log); + + { + tests::kafka_consume_transport consumer(make_kafka_client().get()); + consumer.start().get(); + auto consumed_kvs = consumer + .consume_from_partition( + topic_name, + model::partition_id(0), + model::offset(0)) + .get(); + + // Assert there is no tombstone record left. + auto find_tombstone_record = [](const auto& kv) { + return kv.val.empty(); + }; + + auto tombstone_it = std::find_if( + consumed_kvs.begin(), consumed_kvs.end(), find_tombstone_record); + + // The tombstones should have been removed after second round of + // compaction post tombstone.retention.ms. + ASSERT_EQ(tombstone_it, consumed_kvs.end()); + + // Assert that there was only one k-v record left in the log (the one + // generated to allow for sliding window compaction to occur). + ASSERT_EQ(consumed_kvs.size(), 1); + ASSERT_EQ(consumed_kvs[0].key, "key100"); + ASSERT_EQ(consumed_kvs[0].val, "val100"); + } +} + +INSTANTIATE_TEST_SUITE_P( + NumSegments, + CompactionFixtureTombstonesParamTest, + ::testing::Values(1, 10, 100)); + +struct TombstonesRandomParamArgs { + static TombstonesRandomParamArgs create() { + static constexpr size_t max_segments = 100; + static constexpr size_t max_records = 1000; + static constexpr size_t max_cardinality = max_records; + static constexpr size_t max_batches_per_segment = 5; + return TombstonesRandomParamArgs{ + .num_segments = random_generators::get_int(size_t{1}, max_segments), + .total_records = random_generators::get_int(size_t{1}, max_records), + .cardinality = random_generators::get_int(size_t{1}, max_cardinality), + .batches_per_segment = random_generators::get_int( + size_t{1}, max_batches_per_segment)}; + } + + size_t num_segments; + size_t total_records; + size_t cardinality; + size_t batches_per_segment; +}; + +class CompactionFixtureTombstonesRandomParamTest + : public CompactionFixtureTest + , public ::testing::WithParamInterface< + std::tuple> {}; + +TEST_P( + CompactionFixtureTombstonesRandomParamTest, + TestTombstonesRandomDistribution) { + const auto [data_args, wait_for_retention_ms] = GetParam(); + const auto [num_segments, total_records, cardinality, batches_per_segment] + = data_args; + + const auto num_batches = num_segments * batches_per_segment; + // May not divide evenly. + const auto records_per_batch = std::max( + size_t{1}, total_records / num_batches); + + map_t latest_kv_map; + generate_random_assorted_data( + num_segments, + cardinality, + batches_per_segment, + records_per_batch, + true, + &latest_kv_map) + .get(); + + auto num_tombstones_produced = std::accumulate( + latest_kv_map.begin(), + latest_kv_map.end(), + 0, + [](size_t acc, const auto& p) { return acc + size_t{p.second.empty()}; }); + + auto num_records_produced = latest_kv_map.size() - num_tombstones_produced; + + auto tombstone_retention_ms = wait_for_retention_ms ? 1000ms : 86400000ms; + + // Helper function to perform sliding window compaction. + auto do_sliding_window_compact = [tombstone_retention_ms](auto& log) { + // Compact, allowing the map to grow as large as we need. + ss::abort_source never_abort; + storage::compaction_config cfg( + log->segments().back()->offsets().get_base_offset(), + tombstone_retention_ms, + ss::default_priority_class(), + never_abort, + std::nullopt, + std::nullopt, + nullptr, + nullptr); + auto& disk_log = dynamic_cast(*log); + disk_log.sliding_window_compact(cfg).get(); + }; + + // Perform first round of sliding window compaction. + do_sliding_window_compact(log); + + { + tests::kafka_consume_transport consumer(make_kafka_client().get()); + consumer.start().get(); + auto consumed_kvs = consumer + .consume_from_partition( + topic_name, + model::partition_id(0), + model::offset(0)) + .get(); + ASSERT_NO_FATAL_FAILURE(); + + ASSERT_EQ(consumed_kvs.size(), latest_kv_map.size()); + + // Assert the key consumed is in the latest_kv_map. + for (const auto& kv : consumed_kvs) { + ASSERT_TRUE(latest_kv_map.contains(kv.key)); + ASSERT_EQ(kv.val, latest_kv_map[kv.key]); + } + } + + // Maybe sleep for tombstone.retention.ms time, so that the next time we + // attempt to compact the tombstone records will be eligible for deletion. + if (wait_for_retention_ms) { + ss::sleep(tombstone_retention_ms).get(); + } + + // Generate one record, so that sliding window compaction can occur. + const auto num_extra_records = 1; + generate_data( + 1, cardinality * 2, 1, 1, cardinality + 1, false, &latest_kv_map) + .get(); + do_sliding_window_compact(log); + + { + tests::kafka_consume_transport consumer(make_kafka_client().get()); + consumer.start().get(); + auto consumed_kvs = consumer + .consume_from_partition( + topic_name, + model::partition_id(0), + model::offset(0)) + .get(); + + // Assert the key consumed is in the latest_kv_map. + for (const auto& kv : consumed_kvs) { + ASSERT_TRUE(latest_kv_map.contains(kv.key)); + ASSERT_EQ(kv.val, latest_kv_map[kv.key]); + } + + if (wait_for_retention_ms) { + // Assert there is no tombstone record left. + auto find_tombstone_record = [](const auto& kv) { + return kv.val.empty(); + }; + + auto tombstone_it = std::find_if( + consumed_kvs.begin(), consumed_kvs.end(), find_tombstone_record); + + // The tombstones should have been removed after second round of + // compaction post tombstone.retention.ms. + ASSERT_EQ(tombstone_it, consumed_kvs.end()); + ASSERT_EQ( + consumed_kvs.size(), num_records_produced + num_extra_records); + } else { + ASSERT_EQ( + consumed_kvs.size(), + num_tombstones_produced + num_records_produced + + num_extra_records); + } + } +} + +INSTANTIATE_TEST_SUITE_P( + RandomDistribution, + CompactionFixtureTombstonesRandomParamTest, + ::testing::Combine( + ::testing::Values( + TombstonesRandomParamArgs::create(), + TombstonesRandomParamArgs::create(), + TombstonesRandomParamArgs::create()), + ::testing::Bool())); + // Test where the key space doesn't fit in the offset map, forcing multiple // compactions. TEST_F(CompactionFixtureTest, TestDedupeMultiPass) { @@ -239,6 +725,7 @@ TEST_F(CompactionFixtureTest, TestDedupeMultiPass) { auto& disk_log = dynamic_cast(*log); storage::compaction_config cfg( disk_log.segments().back()->offsets().get_base_offset(), + std::nullopt, ss::default_priority_class(), never_abort, std::nullopt, @@ -281,6 +768,7 @@ TEST_P(CompactionFixtureBatchSizeParamTest, TestRecompactWithNewData) { auto& disk_log = dynamic_cast(*log); storage::compaction_config cfg( disk_log.segments().back()->offsets().get_base_offset(), + std::nullopt, ss::default_priority_class(), never_abort, std::nullopt, @@ -300,6 +788,7 @@ TEST_P(CompactionFixtureBatchSizeParamTest, TestRecompactWithNewData) { generate_data(1, cardinality, records_per_segment).get(); storage::compaction_config new_cfg( disk_log.segments().back()->offsets().get_base_offset(), + std::nullopt, ss::default_priority_class(), never_abort, std::nullopt, @@ -348,6 +837,7 @@ TEST_F(CompactionFixtureTest, TestCompactWithNonDataBatches) { = disk_log.get_probe().get_segments_compacted(); storage::compaction_config new_cfg( disk_log.segments().back()->offsets().get_base_offset(), + std::nullopt, ss::default_priority_class(), never_abort, std::nullopt); @@ -435,6 +925,7 @@ TEST_P(CompactionFilledReaderTest, ReadFilledGaps) { // when reading. storage::compaction_config cfg( disk_log.segments().back()->offsets().get_base_offset(), + std::nullopt, ss::default_priority_class(), never_abort, std::nullopt, @@ -493,6 +984,7 @@ TEST_F(CompactionFixtureTest, TestReadFilledGapsWithTerms) { storage::compaction_config cfg( disk_log.segments().back()->offsets().get_base_offset(), + std::nullopt, ss::default_priority_class(), never_abort, std::nullopt, diff --git a/src/v/storage/tests/log_truncate_test.cc b/src/v/storage/tests/log_truncate_test.cc index b412ee3b87a4..9a0d8b9a632a 100644 --- a/src/v/storage/tests/log_truncate_test.cc +++ b/src/v/storage/tests/log_truncate_test.cc @@ -333,7 +333,7 @@ FIXTURE_TEST(test_truncate_last_single_record_batch, storage_test_fixture) { } FIXTURE_TEST( - test_truncate_whole_log_when_logs_are_garbadge_collected, + test_truncate_whole_log_when_logs_are_garbage_collected, storage_test_fixture) { auto cfg = default_log_config(test_dir); storage::log_manager mgr = make_log_manager(cfg); @@ -364,6 +364,7 @@ FIXTURE_TEST( ts, std::nullopt, model::offset::max(), + std::nullopt, ss::default_priority_class(), as)) .get0(); @@ -545,6 +546,7 @@ FIXTURE_TEST(test_concurrent_prefix_truncate_and_gc, storage_test_fixture) { ts, std::nullopt, model::offset::max(), + std::nullopt, ss::default_priority_class(), as)); @@ -591,7 +593,7 @@ FIXTURE_TEST(test_concurrent_truncate_and_compaction, storage_test_fixture) { // leaving room for further windowed compaction. ss::abort_source as; compaction_config compaction_cfg( - model::offset::max(), ss::default_priority_class(), as); + model::offset::max(), std::nullopt, ss::default_priority_class(), as); auto& disk_log = *dynamic_cast(log.get()); disk_log.adjacent_merge_compact(compaction_cfg).get(); disk_log.adjacent_merge_compact(compaction_cfg).get(); @@ -608,7 +610,12 @@ FIXTURE_TEST(test_concurrent_truncate_and_compaction, storage_test_fixture) { auto ts = now(); auto sleep_ms1 = random_generators::get_int(0, 100); housekeeping_config housekeeping_cfg( - ts, std::nullopt, model::offset::max(), ss::default_priority_class(), as); + ts, + std::nullopt, + model::offset::max(), + std::nullopt, + ss::default_priority_class(), + as); auto f1 = ss::sleep(sleep_ms1 * 1ms).then([&] { return log->housekeeping(housekeeping_cfg); }); diff --git a/src/v/storage/tests/segment_deduplication_test.cc b/src/v/storage/tests/segment_deduplication_test.cc index c56de043ad6d..59a80d3264b1 100644 --- a/src/v/storage/tests/segment_deduplication_test.cc +++ b/src/v/storage/tests/segment_deduplication_test.cc @@ -76,7 +76,10 @@ TEST(FindSlidingRangeTest, TestCollectSegments) { for (int start = 0; start < 30; start += 5) { for (int end = start; end < 30; end += 5) { compaction_config cfg( - model::offset{end}, ss::default_priority_class(), never_abort); + model::offset{end}, + std::nullopt, + ss::default_priority_class(), + never_abort); auto segs = disk_log.find_sliding_range(cfg, model::offset{start}); if (end - start < 10) { // If the compactible range isn't a full segment, we can't @@ -98,7 +101,10 @@ TEST(FindSlidingRangeTest, TestCollectExcludesPrevious) { auto cleanup = ss::defer([&] { b.stop().get(); }); auto& disk_log = b.get_disk_log_impl(); compaction_config cfg( - model::offset{30}, ss::default_priority_class(), never_abort); + model::offset{30}, + std::nullopt, + ss::default_priority_class(), + never_abort); auto segs = disk_log.find_sliding_range(cfg); ASSERT_EQ(3, segs.size()); ASSERT_EQ(segs.front()->offsets().get_base_offset(), model::offset{0}); @@ -116,8 +122,6 @@ TEST(FindSlidingRangeTest, TestCollectExcludesPrevious) { ASSERT_EQ(segs.front()->offsets().get_base_offset(), model::offset{0}); } -// Even though segments with one record would be skipped over during -// compaction, that shouldn't be reflected by the sliding range. TEST(FindSlidingRangeTest, TestCollectOneRecordSegments) { storage::disk_log_builder b; build_segments( @@ -129,14 +133,16 @@ TEST(FindSlidingRangeTest, TestCollectOneRecordSegments) { auto cleanup = ss::defer([&] { b.stop().get(); }); auto& disk_log = b.get_disk_log_impl(); compaction_config cfg( - model::offset{30}, ss::default_priority_class(), never_abort); + model::offset{30}, + std::nullopt, + ss::default_priority_class(), + never_abort); auto segs = disk_log.find_sliding_range(cfg); - // Even though these segments don't have compactible records, they should - // be collected. E.g., they should still be self compacted to rebuild - // indexes if necessary, etc. ASSERT_EQ(5, segs.size()); + + // These segments are considered to have compactible records. for (const auto& seg : segs) { - ASSERT_FALSE(seg->may_have_compactible_records()); + ASSERT_TRUE(seg->may_have_compactible_records()); } // Add some segments with multiple records. They should be eligible for @@ -149,11 +155,8 @@ TEST(FindSlidingRangeTest, TestCollectOneRecordSegments) { /*mark_compacted=*/false); segs = disk_log.find_sliding_range(cfg); ASSERT_EQ(8, segs.size()); - int i = 0; for (const auto& seg : segs) { - bool should_have_records = i >= 5; - ASSERT_EQ(should_have_records, seg->may_have_compactible_records()); - i++; + ASSERT_TRUE(seg->may_have_compactible_records()); } } @@ -164,7 +167,10 @@ TEST(BuildOffsetMap, TestBuildSimpleMap) { auto& disk_log = b.get_disk_log_impl(); auto& segs = disk_log.segments(); compaction_config cfg( - model::offset{30}, ss::default_priority_class(), never_abort); + model::offset{30}, + std::nullopt, + ss::default_priority_class(), + never_abort); probe pb; feature_table.start().get(); @@ -237,7 +243,10 @@ TEST(BuildOffsetMap, TestBuildMapWithMissingCompactedIndex) { auto& disk_log = b.get_disk_log_impl(); auto& segs = disk_log.segments(); compaction_config cfg( - model::offset{30}, ss::default_priority_class(), never_abort); + model::offset{30}, + std::nullopt, + ss::default_priority_class(), + never_abort); for (const auto& s : segs) { auto idx_path = s->path().to_compacted_index(); ASSERT_FALSE(ss::file_exists(idx_path.string()).get()); @@ -279,7 +288,10 @@ TEST(DeduplicateSegmentsTest, TestBadReader) { // Build an offset map for our log. compaction_config cfg( - model::offset{0}, ss::default_priority_class(), never_abort); + model::offset{0}, + std::nullopt, + ss::default_priority_class(), + never_abort); simple_key_offset_map all_segs_map(50); auto map_start_offset = build_offset_map( cfg, diff --git a/src/v/storage/tests/storage_e2e_fixture_test.cc b/src/v/storage/tests/storage_e2e_fixture_test.cc index 6a5c5c61944c..6c3a8c38ba17 100644 --- a/src/v/storage/tests/storage_e2e_fixture_test.cc +++ b/src/v/storage/tests/storage_e2e_fixture_test.cc @@ -108,6 +108,7 @@ FIXTURE_TEST(test_concurrent_log_eviction_and_append, storage_e2e_fixture) { /*gc_upper=*/model::timestamp::max(), /*max_bytes_in_log=*/1, /*max_collect_offset=*/model::offset::min(), + /*tombstone_retention_ms=*/std::nullopt, ss::default_priority_class(), as); diff --git a/src/v/storage/tests/storage_e2e_test.cc b/src/v/storage/tests/storage_e2e_test.cc index 84cb07e5f2d4..209daa010f59 100644 --- a/src/v/storage/tests/storage_e2e_test.cc +++ b/src/v/storage/tests/storage_e2e_test.cc @@ -674,6 +674,7 @@ FIXTURE_TEST(test_time_based_eviction, storage_test_fixture) { model::to_timestamp(broker_t0 - (2 * broker_ts_sep)), std::nullopt, model::offset::min(), // should prevent compaction + std::nullopt, ss::default_priority_class(), as); auto before = disk_log->offsets(); @@ -688,6 +689,7 @@ FIXTURE_TEST(test_time_based_eviction, storage_test_fixture) { model::to_timestamp(timestamp), std::nullopt, model::offset::max(), + std::nullopt, ss::default_priority_class(), as); }; @@ -761,6 +763,7 @@ FIXTURE_TEST(test_size_based_eviction, storage_test_fixture) { model::timestamp::min(), total_size + first_size, model::offset::max(), + std::nullopt, ss::default_priority_class(), as); compact_and_prefix_truncate(*disk_log, ccfg_no_compact); @@ -790,6 +793,7 @@ FIXTURE_TEST(test_size_based_eviction, storage_test_fixture) { model::timestamp::min(), max_size, model::offset::max(), + std::nullopt, ss::default_priority_class(), as); compact_and_prefix_truncate(*disk_log, ccfg); @@ -860,6 +864,7 @@ FIXTURE_TEST(test_eviction_notification, storage_test_fixture) { gc_ts, std::nullopt, model::offset::max(), + std::nullopt, ss::default_priority_class(), as); @@ -969,6 +974,7 @@ FIXTURE_TEST(write_concurrently_with_gc, storage_test_fixture) { model::timestamp::min(), 1000, model::offset::max(), + std::nullopt, ss::default_priority_class(), as); return log->housekeeping(ccfg); @@ -1150,6 +1156,7 @@ FIXTURE_TEST(test_compation_preserve_state, storage_test_fixture) { model::timestamp::min(), 1, model::offset::max(), + std::nullopt, ss::default_priority_class(), as); auto log = mgr.manage(std::move(ntp_cfg)).get0(); @@ -1311,6 +1318,7 @@ FIXTURE_TEST(compacted_log_truncation, storage_test_fixture) { model::timestamp::min(), std::nullopt, model::offset::max(), + std::nullopt, ss::default_priority_class(), as); log->flush().get0(); @@ -1376,6 +1384,7 @@ FIXTURE_TEST( model::timestamp::min(), std::nullopt, model::offset::max(), + std::nullopt, ss::default_priority_class(), as); log->flush().get0(); @@ -1562,6 +1571,7 @@ FIXTURE_TEST(partition_size_while_cleanup, storage_test_fixture) { model::timestamp::min(), 50_KiB, model::offset::max(), + std::nullopt, ss::default_priority_class(), as); @@ -1681,6 +1691,7 @@ FIXTURE_TEST(adjacent_segment_compaction, storage_test_fixture) { model::timestamp::min(), std::nullopt, model::offset::max(), + std::nullopt, ss::default_priority_class(), as); @@ -1749,6 +1760,7 @@ FIXTURE_TEST(adjacent_segment_compaction_terms, storage_test_fixture) { model::timestamp::min(), std::nullopt, model::offset::max(), + std::nullopt, ss::default_priority_class(), as); @@ -1823,6 +1835,7 @@ FIXTURE_TEST(max_adjacent_segment_compaction, storage_test_fixture) { model::timestamp::min(), std::nullopt, model::offset::max(), + std::nullopt, ss::default_priority_class(), as); @@ -2020,6 +2033,7 @@ FIXTURE_TEST(compaction_backlog_calculation, storage_test_fixture) { model::timestamp::min(), std::nullopt, model::offset::max(), + std::nullopt, ss::default_priority_class(), as); /** @@ -2354,6 +2368,7 @@ FIXTURE_TEST(changing_cleanup_policy_back_and_forth, storage_test_fixture) { model::timestamp::min(), std::nullopt, model::offset::max(), + std::nullopt, ss::default_priority_class(), as); @@ -2595,6 +2610,7 @@ FIXTURE_TEST(test_compacting_batches_of_different_types, storage_test_fixture) { model::timestamp::min(), std::nullopt, model::offset::max(), + std::nullopt, ss::default_priority_class(), as); auto before_compaction = compact_in_memory(log); @@ -2836,6 +2852,7 @@ FIXTURE_TEST(write_truncate_compact, storage_test_fixture) { model::timestamp::min(), std::nullopt, model::offset::max(), + std::nullopt, ss::default_priority_class(), as)) .handle_exception_type( @@ -2927,6 +2944,7 @@ FIXTURE_TEST(compaction_truncation_corner_cases, storage_test_fixture) { model::timestamp::min(), std::nullopt, model::offset::max(), + std::nullopt, ss::default_priority_class(), as)) .get(); @@ -3060,6 +3078,7 @@ FIXTURE_TEST(test_max_compact_offset, storage_test_fixture) { model::timestamp::max(), // no time-based deletion std::nullopt, max_compact_offset, + std::nullopt, ss::default_priority_class(), as); log->housekeeping(ccfg).get0(); @@ -3124,6 +3143,7 @@ FIXTURE_TEST(test_self_compaction_while_reader_is_open, storage_test_fixture) { model::timestamp::max(), // no time-based deletion std::nullopt, model::offset::max(), + std::nullopt, ss::default_priority_class(), as); auto& segment = *(disk_log->segments().begin()); @@ -3178,6 +3198,7 @@ FIXTURE_TEST(test_simple_compaction_rebuild_index, storage_test_fixture) { model::timestamp::min(), std::nullopt, model::offset::max(), + std::nullopt, ss::default_priority_class(), as); @@ -3196,6 +3217,8 @@ struct compact_test_args { long msg_per_segment; long segments; long expected_compacted_segments; + long num_expected_gaps; + bool keys_use_segment_id; }; static void @@ -3218,36 +3241,42 @@ do_compact_test(const compact_test_args args, storage_test_fixture& f) { auto log = mgr.manage(std::move(ntp_cfg)).get0(); auto disk_log = log; - auto append_batch = - [](ss::shared_ptr log, model::term_id term) { - iobuf key = bytes_to_iobuf(bytes("key")); - iobuf value = random_generators::make_iobuf(100); + auto append_batch = []( + ss::shared_ptr log, + model::term_id term, + std::optional segment_id = std::nullopt) { + const auto key_str = ssx::sformat("key{}", segment_id.value_or(-1)); + iobuf key = bytes_to_iobuf(bytes(key_str.data())); + iobuf value = random_generators::make_iobuf(100); - storage::record_batch_builder builder( - model::record_batch_type::raft_data, model::offset(0)); + storage::record_batch_builder builder( + model::record_batch_type::raft_data, model::offset(0)); - builder.add_raw_kv(key.copy(), value.copy()); + builder.add_raw_kv(key.copy(), value.copy()); - auto batch = std::move(builder).build(); + auto batch = std::move(builder).build(); - batch.set_term(term); - batch.header().first_timestamp = model::timestamp::now(); - auto reader = model::make_memory_record_batch_reader( - {std::move(batch)}); - storage::log_append_config cfg{ - .should_fsync = storage::log_append_config::fsync::no, - .io_priority = ss::default_priority_class(), - .timeout = model::no_timeout, - }; + batch.set_term(term); + batch.header().first_timestamp = model::timestamp::now(); + auto reader = model::make_memory_record_batch_reader( + {std::move(batch)}); + storage::log_append_config cfg{ + .should_fsync = storage::log_append_config::fsync::no, + .io_priority = ss::default_priority_class(), + .timeout = model::no_timeout, + }; - std::move(reader) - .for_each_ref(log->make_appender(cfg), cfg.timeout) - .get(); - }; + std::move(reader) + .for_each_ref(log->make_appender(cfg), cfg.timeout) + .get(); + }; for (int s = 0; s < args.segments; s++) { + std::optional key = args.keys_use_segment_id + ? s + : std::optional{}; for (int i = 0; i < args.msg_per_segment; i++) { - append_batch(log, model::term_id(0)); + append_batch(log, model::term_id(0), key); } disk_log->force_roll(ss::default_priority_class()).get0(); } @@ -3266,6 +3295,7 @@ do_compact_test(const compact_test_args args, storage_test_fixture& f) { model::timestamp::max(), // no time-based deletion std::nullopt, model::offset(args.max_compact_offs), + std::nullopt, ss::default_priority_class(), as); log->housekeeping(ccfg).get0(); @@ -3275,9 +3305,11 @@ do_compact_test(const compact_test_args args, storage_test_fixture& f) { BOOST_REQUIRE_EQUAL( final_stats.committed_offset, args.segments * args.msg_per_segment); - // we used the same key for all messages, so we should have one huge gap at - // the beginning of each compacted segment - BOOST_REQUIRE_EQUAL(final_gaps.num_gaps, args.expected_compacted_segments); + // If we used keys with segment IDs for records in each segment, we + // should have one huge gap at the beginning of each compacted segment. + // If we used the same key for each record, we should only expect one gap + // after compaction runs across the entire window of segments. + BOOST_REQUIRE_EQUAL(final_gaps.num_gaps, args.num_expected_gaps); BOOST_REQUIRE_EQUAL(final_gaps.first_gap_start, model::offset(0)); // If adjacent segment compaction worked in order from oldest to newest, we @@ -3303,7 +3335,9 @@ FIXTURE_TEST(test_max_compact_offset_mid_segment, storage_test_fixture) { .num_compactable_msg = 100, .msg_per_segment = 100, .segments = 3, - .expected_compacted_segments = 1}, + .expected_compacted_segments = 1, + .num_expected_gaps = 1, + .keys_use_segment_id = false}, *this); } @@ -3316,7 +3350,25 @@ FIXTURE_TEST(test_max_compact_offset_unset, storage_test_fixture) { .num_compactable_msg = 200, .msg_per_segment = 100, .segments = 3, - .expected_compacted_segments = 3}, + .expected_compacted_segments = 3, + .num_expected_gaps = 1, + .keys_use_segment_id = false}, + *this); +} + +FIXTURE_TEST( + test_max_compact_offset_unset_use_segment_ids, storage_test_fixture) { + // Use segment IDs for keys, thereby preventing compaction from reducing + // down to just one record in the last segment (each segment will have 1, + // unique record) + do_compact_test( + {.max_compact_offs = model::offset::max(), + .num_compactable_msg = 200, + .msg_per_segment = 100, + .segments = 3, + .expected_compacted_segments = 3, + .num_expected_gaps = 3, + .keys_use_segment_id = true}, *this); } @@ -3515,6 +3567,7 @@ FIXTURE_TEST(test_bytes_eviction_overrides, storage_test_fixture) { model::timestamp::min(), cfg.retention_bytes(), model::offset::max(), + std::nullopt, ss::default_priority_class(), as)); @@ -3699,6 +3752,7 @@ FIXTURE_TEST(test_skipping_compaction_below_start_offset, log_builder_fixture) { model::timestamp::max(), 1, model::offset::max(), + std::nullopt, ss::default_priority_class(), abs}; @@ -4154,6 +4208,7 @@ FIXTURE_TEST(test_offset_range_size_compacted, storage_test_fixture) { model::timestamp::min(), std::nullopt, log->offsets().committed_offset, + std::nullopt, ss::default_priority_class(), as); log->housekeeping(h_cfg).get(); @@ -4291,10 +4346,10 @@ FIXTURE_TEST(test_offset_range_size_compacted, storage_test_fixture) { FIXTURE_TEST(test_offset_range_size2_compacted, storage_test_fixture) { #ifdef NDEBUG - size_t num_test_cases = 5000; + size_t num_test_cases = 1000; size_t num_segments = 300; #else - size_t num_test_cases = 500; + size_t num_test_cases = 100; size_t num_segments = 30; #endif // This test generates 300 segments and creates a record batch map. @@ -4357,6 +4412,7 @@ FIXTURE_TEST(test_offset_range_size2_compacted, storage_test_fixture) { model::timestamp::min(), std::nullopt, log->offsets().committed_offset, + std::nullopt, ss::default_priority_class(), as); log->housekeeping(h_cfg).get(); diff --git a/src/v/storage/tests/utils/disk_log_builder.cc b/src/v/storage/tests/utils/disk_log_builder.cc index 1c6b31d28efe..80802bc4de9d 100644 --- a/src/v/storage/tests/utils/disk_log_builder.cc +++ b/src/v/storage/tests/utils/disk_log_builder.cc @@ -119,7 +119,8 @@ ss::future<> disk_log_builder::truncate(model::offset o) { ss::future<> disk_log_builder::gc( model::timestamp collection_upper_bound, - std::optional max_partition_retention_size) { + std::optional max_partition_retention_size, + std::optional tombstone_retention_ms) { ss::abort_source as; auto eviction_future = get_log()->monitor_eviction(as); @@ -128,6 +129,7 @@ ss::future<> disk_log_builder::gc( collection_upper_bound, max_partition_retention_size, model::offset::max(), + tombstone_retention_ms, ss::default_priority_class(), _abort_source)) .get(); diff --git a/src/v/storage/tests/utils/disk_log_builder.h b/src/v/storage/tests/utils/disk_log_builder.h index 3261fd2a432d..40ff91da593c 100644 --- a/src/v/storage/tests/utils/disk_log_builder.h +++ b/src/v/storage/tests/utils/disk_log_builder.h @@ -304,7 +304,9 @@ class disk_log_builder { ss::future<> truncate(model::offset); ss::future<> gc( model::timestamp collection_upper_bound, - std::optional max_partition_retention_size); + std::optional max_partition_retention_size, + std::optional tombstone_retention_ms + = std::nullopt); ss::future disk_usage( model::timestamp collection_upper_bound, std::optional max_partition_retention_size); diff --git a/src/v/storage/types.cc b/src/v/storage/types.cc index e4d2c0e8345f..62ca0ef953c2 100644 --- a/src/v/storage/types.cc +++ b/src/v/storage/types.cc @@ -197,9 +197,11 @@ std::ostream& operator<<(std::ostream& o, const compaction_config& c) { fmt::print( o, "{{max_collectible_offset:{}, " - "should_sanitize:{}}}", + "should_sanitize:{}, " + "tombstone_retention_ms:{}}}", c.max_collectible_offset, - c.sanitizer_config); + c.sanitizer_config, + c.tombstone_retention_ms); return o; } diff --git a/src/v/storage/types.h b/src/v/storage/types.h index bc69914704f5..0824f957abd4 100644 --- a/src/v/storage/types.h +++ b/src/v/storage/types.h @@ -465,6 +465,7 @@ struct gc_config { struct compaction_config { compaction_config( model::offset max_collect_offset, + std::optional tombstone_ret_ms, ss::io_priority_class p, ss::abort_source& as, std::optional san_cfg = std::nullopt, @@ -472,6 +473,7 @@ struct compaction_config { hash_key_offset_map* key_map = nullptr, scoped_file_tracker::set_t* to_clean = nullptr) : max_collectible_offset(max_collect_offset) + , tombstone_retention_ms(tombstone_ret_ms) , iopc(p) , sanitizer_config(std::move(san_cfg)) , key_offset_map_max_keys(max_keys) @@ -482,6 +484,18 @@ struct compaction_config { // Cannot delete or compact past this offset (i.e. for unresolved txn // records): that is, only offsets <= this may be compacted. model::offset max_collectible_offset; + + // The retention time for tombstones. Tombstone removal occurs only for + // "clean" compacted segments past the tombstone deletion horizon timestamp, + // which is a segment's clean_compact_timestamp + tombstone_retention_ms. + // This means tombstones are only removed after the second time they are + // "seen" by compaction. + // + // Tombstone removal is only supported for topics with remote writes + // disabled. As a result, this field will only have a value for compaction + // ran on non-archival topics. + std::optional tombstone_retention_ms; + // priority for all IO in compaction ss::io_priority_class iopc; // use proxy fileops with assertions and/or failure injection @@ -514,12 +528,19 @@ struct housekeeping_config { model::timestamp upper, std::optional max_bytes_in_log, model::offset max_collect_offset, + std::optional tombstone_retention_ms, ss::io_priority_class p, ss::abort_source& as, std::optional san_cfg = std::nullopt, hash_key_offset_map* key_map = nullptr) : compact( - max_collect_offset, p, as, std::move(san_cfg), std::nullopt, key_map) + max_collect_offset, + tombstone_retention_ms, + p, + as, + std::move(san_cfg), + std::nullopt, + key_map) , gc(upper, max_bytes_in_log) {} compaction_config compact; diff --git a/src/v/utils/tristate.h b/src/v/utils/tristate.h index 516ca3fe6791..99d8fb27fa19 100644 --- a/src/v/utils/tristate.h +++ b/src/v/utils/tristate.h @@ -121,7 +121,7 @@ class tristate { fmt::print(o, "{{{}}}", t.value()); return o; } - return o << "{}"; + return o << "{{nullopt}}"; }; std::optional& get_optional() { diff --git a/tests/rptest/clients/kafka_cli_tools.py b/tests/rptest/clients/kafka_cli_tools.py index 8ec3a6254719..0b86d8fa8d6a 100644 --- a/tests/rptest/clients/kafka_cli_tools.py +++ b/tests/rptest/clients/kafka_cli_tools.py @@ -242,7 +242,10 @@ def describe_topic(self, topic: str): assert configs is not None, "didn't find Configs: section" def maybe_int(key: str, value: str): - if key in ["retention_ms", "retention_bytes", 'segment_bytes']: + if key in [ + "retention_ms", "retention_bytes", 'segment_bytes', + 'tombstone_retention_ms' + ]: return int(value) return value diff --git a/tests/rptest/clients/types.py b/tests/rptest/clients/types.py index fabe49c6031e..15c9523f57e0 100644 --- a/tests/rptest/clients/types.py +++ b/tests/rptest/clients/types.py @@ -39,6 +39,7 @@ class TopicSpec: PROPERTY_WRITE_CACHING = "write.caching" PROPERTY_FLUSH_MS = "flush.ms" PROPERTY_FLUSH_BYTES = "flush.bytes" + PROPERTY_TOMBSTONE_RETENTION_MS = "tombstone.retention.ms" class CompressionTypes(str, Enum): """ @@ -120,7 +121,8 @@ def __init__( | None = None, initial_retention_local_target_bytes: int | None = None, initial_retention_local_target_ms: int | None = None, - virtual_cluster_id: str | None = None): + virtual_cluster_id: str | None = None, + tombstone_retention_ms: int | None = None): self.name = name or f"topic-{self._random_topic_suffix()}" self.partition_count = partition_count self.replication_factor = replication_factor @@ -146,6 +148,7 @@ def __init__( self.initial_retention_local_target_bytes = initial_retention_local_target_bytes self.initial_retention_local_target_ms = initial_retention_local_target_ms self.virtual_cluster_id = virtual_cluster_id + self.tombstone_retention_ms = tombstone_retention_ms def __str__(self): return self.name diff --git a/tests/rptest/tests/alter_topic_configuration_test.py b/tests/rptest/tests/alter_topic_configuration_test.py index e72fcd729dd7..d615211e5e04 100644 --- a/tests/rptest/tests/alter_topic_configuration_test.py +++ b/tests/rptest/tests/alter_topic_configuration_test.py @@ -46,6 +46,8 @@ def __init__(self, test_context): @parametrize(property=TopicSpec.PROPERTY_RETENTION_TIME, value=360000) @parametrize(property=TopicSpec.PROPERTY_TIMESTAMP_TYPE, value="LogAppendTime") + @parametrize(property=TopicSpec.PROPERTY_TOMBSTONE_RETENTION_MS, + value=123456789) def test_altering_topic_configuration(self, property, value): topic = self.topics[0].name self.client().alter_topic_configs(topic, {property: value}) @@ -66,7 +68,8 @@ def test_alter_config_does_not_change_replication_factor(self): kcl.raw_alter_topic_config( 1, topic, { TopicSpec.PROPERTY_RETENTION_TIME: 360000, - TopicSpec.PROPERTY_TIMESTAMP_TYPE: "LogAppendTime" + TopicSpec.PROPERTY_TIMESTAMP_TYPE: "LogAppendTime", + TopicSpec.PROPERTY_TOMBSTONE_RETENTION_MS: 1234567890 }) kafka_tools = KafkaCliTools(self.redpanda) spec = kafka_tools.describe_topic(topic) @@ -74,6 +77,7 @@ def test_alter_config_does_not_change_replication_factor(self): assert spec.replication_factor == 3 assert spec.retention_ms == 360000 assert spec.message_timestamp_type == "LogAppendTime" + assert spec.tombstone_retention_ms == 1234567890 @cluster(num_nodes=3) def test_altering_multiple_topic_configurations(self): @@ -83,13 +87,15 @@ def test_altering_multiple_topic_configurations(self): topic, { TopicSpec.PROPERTY_SEGMENT_SIZE: 1024 * 1024, TopicSpec.PROPERTY_RETENTION_TIME: 360000, - TopicSpec.PROPERTY_TIMESTAMP_TYPE: "LogAppendTime" + TopicSpec.PROPERTY_TIMESTAMP_TYPE: "LogAppendTime", + TopicSpec.PROPERTY_TOMBSTONE_RETENTION_MS: 1234567890 }) spec = kafka_tools.describe_topic(topic) assert spec.segment_bytes == 1024 * 1024 assert spec.retention_ms == 360000 assert spec.message_timestamp_type == "LogAppendTime" + assert spec.tombstone_retention_ms == 1234567890 def random_string(self, size): return ''.join( diff --git a/tests/rptest/tests/describe_topics_test.py b/tests/rptest/tests/describe_topics_test.py index 67a1405a4ad0..b161d35a6ccb 100644 --- a/tests/rptest/tests/describe_topics_test.py +++ b/tests/rptest/tests/describe_topics_test.py @@ -12,6 +12,7 @@ from rptest.services.cluster import cluster from rptest.tests.redpanda_test import RedpandaTest +from rptest.clients.rpk import RpkTool from rptest.clients.types import TopicSpec from rptest.clients.kafka_cli_tools import KafkaCliTools from rptest.clients.kcl import KCL @@ -246,6 +247,13 @@ def test_describe_topics_with_documentation_and_types(self): "Max not flushed bytes per partition. If configured threshold is reached " "log will automatically be flushed even though it wasn't explicitly " "requested"), + "tombstone.retention.ms": + ConfigProperty( + config_type="LONG", + value="-1", + doc_string= + "The retention time for tombstone records in a compacted topic", + source_type="DYNAMIC_TOPIC_CONFIG"), } tp_spec = TopicSpec() @@ -319,3 +327,56 @@ def test_describe_topics_with_documentation_and_types(self): prop = properties[name] assert doc_string == prop.doc_string assert len(empty_line) == 0 + + @cluster(num_nodes=3) + def test_describe_topics_sticky_properties(self): + """Sticky properties are those defined as using the cluster config + default value at topic creation time, but still indicate SOURCE + as `DEFAULT_CONFIG` instead of `DYNAMIC_TOPIC_CONFIG`. Furthermore, + they do not fall back on the cluster default at any point. + Currently, this trait is found in `redpanda.remote.read` and + `redpanda.remote.write`. + Historical context: https://github.com/redpanda-data/redpanda/issues/7451 + """ + rpk = RpkTool(self.redpanda) + + rpk.cluster_config_set('cloud_storage_enable_remote_read', False) + + # Create topic using the default value of `cloud_storage_enable_remote_read=false` + tp_spec = TopicSpec() + self.client().create_topic([tp_spec]) + + describe_topic_output = rpk.describe_topic_configs(tp_spec.name) + remote_read_output = describe_topic_output.get('redpanda.remote.read', + None) + assert remote_read_output is not None + + value, source = remote_read_output + + # Value is false due to cluster default at topic creation time. + assert value == 'false' + + # Source is `DEFAULT_CONFIG` thanks to our internal override. + assert source == 'DEFAULT_CONFIG' + + # Now, set the cluster default to true. + rpk.cluster_config_set('cloud_storage_enable_remote_read', True) + assert rpk.cluster_config_get( + 'cloud_storage_enable_remote_read') == 'true' + + # Alter the topic config with --delete. + rpk.delete_topic_config(tp_spec.name, 'redpanda.remote.read') + + # Describe the topic again. + describe_topic_output = rpk.describe_topic_configs(tp_spec.name) + remote_read_output = describe_topic_output.get('redpanda.remote.read', + None) + assert remote_read_output is not None + + value, source = remote_read_output + + # Assert that the reported value is still false, and we don't fall back to the cluster default. + assert value == 'false' + + # Source type is no longer DEFAULT_CONFIG. + assert source == 'DYNAMIC_TOPIC_CONFIG' diff --git a/tests/rptest/tests/tombstone_retention_ms_test.py b/tests/rptest/tests/tombstone_retention_ms_test.py new file mode 100644 index 000000000000..d75dc6c0ca26 --- /dev/null +++ b/tests/rptest/tests/tombstone_retention_ms_test.py @@ -0,0 +1,96 @@ +import typing + +from ducktape.mark import matrix, defaults +from ducktape.utils.util import wait_until + +from rptest.clients.rpk import RpkTool, RpkException +from rptest.clients.types import TopicSpec +from rptest.services.cluster import cluster +from rptest.services.kafka_cli_consumer import KafkaCliConsumer + +from rptest.services.verifiable_consumer import VerifiableConsumer +from rptest.services.verifiable_producer import VerifiableProducer +from rptest.tests.redpanda_test import RedpandaTest +from rptest.util import expect_exception + + +class TombstoneRetentionMsTest(RedpandaTest): + def __init__(self, ctx): + self.ctx = ctx + super().__init__(ctx) + + self.rpk = RpkTool(self.redpanda) + + @cluster(num_nodes=1) + def test_get_cluster_config(self): + # Default value + cluster_default = self.rpk.cluster_config_get('tombstone_retention_ms') + assert cluster_default == 'null' + + @cluster(num_nodes=1) + def test_set_cluster_config(self): + self.rpk.cluster_config_set('tombstone_retention_ms', 1234567890) + cluster_prop = self.rpk.cluster_config_get('tombstone_retention_ms') + assert cluster_prop == "1234567890" + + topic_name = "tapioca" + topic = TopicSpec(name=topic_name) + self.rpk.create_topic(topic_name, partitions=1) + topic_desc = self.rpk.describe_topic_configs(topic_name) + assert topic_desc['tombstone.retention.ms'][0] == '1234567890' + + @cluster(num_nodes=1) + def test_alter_topic_config(self): + topic_name = "tapioca" + topic = TopicSpec(name=topic_name) + self.rpk.create_topic(topic_name, partitions=1) + topic_desc = self.rpk.describe_topic_configs(topic_name) + + # Default value + assert topic_desc['tombstone.retention.ms'][0] == '-1' + + self.rpk.alter_topic_config(topic_name, 'tombstone.retention.ms', + 1234567890) + + topic_desc = self.rpk.describe_topic_configs(topic_name) + assert topic_desc['tombstone.retention.ms'][0] == '1234567890' + + @cluster(num_nodes=1) + @matrix(tombstone_retention_ms=[None, 1000], + cloud_storage_remote_read=[False, True], + cloud_storage_remote_write=[False, True]) + def test_create_topic_configurations(self, tombstone_retention_ms, + cloud_storage_remote_read, + cloud_storage_remote_write): + topic_name = "tapioca" + #We cannot enable tombstone_retention_ms if any cloud storage properties are also enabled. + is_valid = False if any([ + cloud_storage_remote_read, cloud_storage_remote_write + ]) and tombstone_retention_ms is not None else True + self.rpk.cluster_config_set('cloud_storage_enable_remote_read', + cloud_storage_remote_read) + self.rpk.cluster_config_set('cloud_storage_enable_remote_write', + cloud_storage_remote_write) + + topic = TopicSpec(name=topic_name) + config = {} + if tombstone_retention_ms is not None: + config = {'tombstone.retention.ms': tombstone_retention_ms} + else: + tombstone_retention_ms = -1 + + if is_valid: + # Expect a successful topic creation. + self.rpk.create_topic(topic_name, partitions=1, config=config) + topic_desc = self.rpk.describe_topic_configs(topic_name) + print(topic_desc['tombstone.retention.ms']) + assert topic_desc['tombstone.retention.ms'][0] == str( + tombstone_retention_ms) + assert is_valid == True + else: + # Expect a failure due to misconfiguration. + with expect_exception( + RpkException, lambda e: + 'Unsupported tombstone_retention_ms configuration, cannot be enabled at the same time as repanda.remote.read or redpanda.remote.write.' + in str(e)): + self.rpk.create_topic(topic_name, partitions=1, config=config) diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index baa35db8c97e..03ea91127388 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -127,6 +127,10 @@ def read_topic_properties_serde(rdr: Reader, version): 'remote_label': rdr.read_optional(read_remote_label_serde), 'topic_namespace_override': rdr.read_optional(read_topic_namespace) } + if version >= 10: + topic_properties |= { + 'tombstone_retention_ms': rdr.read_optional(Reader.read_int64) + } return topic_properties @@ -142,7 +146,7 @@ def read_topic_config(rdr: Reader, version): 'replication_factor': rdr.read_int16(), 'properties': - rdr.read_envelope(read_topic_properties_serde, max_version=9), + rdr.read_envelope(read_topic_properties_serde, max_version=10), } if version < 1: # see https://github.com/redpanda-data/redpanda/pull/6613