Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] storage: support tombstone deletion from local storage #23071

Draft
wants to merge 24 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e1b3d79
`config`: mark `cloud_storage_enabled()` as `needs_restart::yes`
WillemKauf Sep 5, 2024
9a8c91b
`utils`: change `tristate` print format
WillemKauf Sep 5, 2024
f20c248
`kafka`: use `tristate::is_engaged()` in `add_topic_config()`
WillemKauf Sep 5, 2024
3f1da1b
`kafka`: make disabled `tristate` construction site explicit
WillemKauf Sep 5, 2024
9d6d9df
`kafka`: fix typo in function name
WillemKauf Sep 3, 2024
3d5f17f
`kafka`: fix `make_topic_configs()` for some "sticky" properties
WillemKauf Sep 6, 2024
738ceaa
`storage`: adjust `may_have_compactible_records()` condition
WillemKauf Aug 27, 2024
6dad83e
`storage`: add `tombstone_retention_ms` to `compaction_config`
WillemKauf Sep 6, 2024
1874ee5
`model`: add `is_tombstone()` and modify `has_value()`
WillemKauf Aug 27, 2024
70b7cd8
`storage`: add `index_state::clean_compact_timestamp`
WillemKauf Aug 23, 2024
1b949f6
`storage`: add getters/setters for `_state.clean_compact_timestamp`
WillemKauf Aug 23, 2024
25d6fc5
`storage`: add `mark_segment_as_finished_window_compaction()`
WillemKauf Aug 27, 2024
e6aab07
`storage`: add `should_remove_tombstone_record()`
WillemKauf Aug 28, 2024
26e19d3
`storage`: add `get_tombstone_delete_horizon()`
WillemKauf Aug 29, 2024
8854d59
`storage`: use tombstone removal utilities in `do_copy_segment_data()`
WillemKauf Aug 27, 2024
2ce58d3
`storage`: use tombstone removal utilities in `deduplicate_segment()`
WillemKauf Aug 27, 2024
fa30e27
`kafka`: add `produce_tombstones` flag to `produce_consume_utils`
WillemKauf Aug 27, 2024
754e608
`storage`: add tombstone related tests to `compaction_e2e_test`
WillemKauf Aug 27, 2024
394e231
`storage`: adjust number of test cases in `test_offset_range_size2_co…
WillemKauf Aug 28, 2024
ae0183d
`config`: add `tombstone_retention_ms`
WillemKauf Aug 27, 2024
e192a42
`cluster`: add `tombstone_retention_ms` to `topic_configuration`
WillemKauf Sep 6, 2024
92eb4a7
`cluster`: add `topic_multi_property_validation()`
WillemKauf Sep 4, 2024
ae53e99
`kafka`: add `tombstone_retention_ms` to `topic` config handler
WillemKauf Aug 28, 2024
a3788da
`rptest`: add `tombstone_retention_ms_test.py`
WillemKauf Aug 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/v/cloud_storage/tests/cloud_storage_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ TEST_P(EndToEndFixture, TestProduceConsumeFromCloud) {
model::timestamp::min(),
1,
log->stm_manager()->max_collectible_offset(),
std::nullopt,
ss::default_priority_class(),
as);
partition->log()->housekeeping(housekeeping_conf).get();
Expand Down Expand Up @@ -516,6 +517,7 @@ TEST_P(CloudStorageEndToEndManualTest, TestTimequeryAfterArchivalGC) {
model::timestamp::min(),
1, // max_bytes_in_log
log->stm_manager()->max_collectible_offset(),
std::nullopt,
ss::default_priority_class(),
as);
partition->log()->housekeeping(housekeeping_conf).get();
Expand Down Expand Up @@ -809,6 +811,7 @@ TEST_P(EndToEndFixture, TestCloudStorageTimequery) {
model::timestamp::max(),
0,
log->stm_manager()->max_collectible_offset(),
std::nullopt,
ss::default_priority_class(),
as);
partition->log()->housekeeping(housekeeping_conf).get();
Expand Down
2 changes: 1 addition & 1 deletion src/v/cloud_storage/tests/topic_manifest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ SEASTAR_THREAD_TEST_CASE(test_topic_manifest_serde_feature_table) {
std::nullopt,
std::nullopt,
std::nullopt,
};
std::nullopt};

auto random_initial_revision_id
= tests::random_named_int<model::initial_revision_id>();
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/archival/tests/async_data_uploader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ class async_data_uploader_fixture : public redpanda_thread_fixture {
model::timestamp::min(),
std::nullopt,
max_collect_offset,
std::nullopt,
ss::default_priority_class(),
as);

Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/archival/tests/ntp_archiver_reupload_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ struct reupload_fixture : public archiver_fixture {
model::timestamp::max(),
std::nullopt,
max_collectible,
std::nullopt,
ss::default_priority_class(),
abort_source})
.get();
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/archival/tests/ntp_archiver_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,7 @@ FIXTURE_TEST(
model::timestamp::now(),
std::nullopt,
model::offset{999},
std::nullopt,
ss::default_priority_class(),
as))
.get0();
Expand Down
7 changes: 7 additions & 0 deletions src/v/cluster/metadata_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "model/namespace.h"
#include "model/timestamp.h"
#include "storage/types.h"
#include "utils/tristate.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/sharded.hh>
Expand Down Expand Up @@ -318,6 +319,11 @@ metadata_cache::get_default_record_value_subject_name_strategy() const {
return pandaproxy::schema_registry::subject_name_strategy::topic_name;
}

/// Returns the value of the `tombstone_retention_ms` property read from the
/// shard-local configuration. This is the value `get_default_properties()`
/// assigns to `topic_properties::tombstone_retention_ms`.
std::optional<std::chrono::milliseconds>
metadata_cache::get_default_tombstone_retention_ms() const {
    const auto& shard_cfg = config::shard_local_cfg();
    return shard_cfg.tombstone_retention_ms();
}

topic_properties metadata_cache::get_default_properties() const {
topic_properties tp;
tp.compression = {get_default_compression()};
Expand All @@ -335,6 +341,7 @@ topic_properties metadata_cache::get_default_properties() const {
get_default_retention_local_target_bytes()};
tp.retention_local_target_ms = tristate<std::chrono::milliseconds>{
get_default_retention_local_target_ms()};
tp.tombstone_retention_ms = get_default_tombstone_retention_ms();

return tp;
}
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/metadata_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ class metadata_cache {
bool get_default_record_value_schema_id_validation() const;
pandaproxy::schema_registry::subject_name_strategy
get_default_record_value_subject_name_strategy() const;
std::optional<std::chrono::milliseconds>
get_default_tombstone_retention_ms() const;

topic_properties get_default_properties() const;
std::optional<partition_assignment>
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/tests/manual_log_deletion_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ struct manual_deletion_fixture : public raft_test_fixture {
retention_timestamp,
100_MiB,
model::offset::max(),
std::nullopt,
ss::default_priority_class(),
as,
storage::ntp_sanitizer_config{.sanitize_only = true}))
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/tests/tx_compaction_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class tx_executor {
model::timestamp::now().value() - ret_duration.count()),
std::nullopt,
log->stm_manager()->max_collectible_offset(),
std::nullopt,
ss::default_priority_class(),
dummy_as,
})
Expand Down Expand Up @@ -228,6 +229,7 @@ class tx_executor {
model::timestamp::min(),
std::nullopt,
model::offset::max(),
std::nullopt,
ss::default_priority_class(),
as);
// Compacts until a single sealed segment remains, other than the
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/topic_configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ storage::ntp_config topic_configuration::make_ntp_config(
.write_caching = properties.write_caching,
.flush_ms = properties.flush_ms,
.flush_bytes = properties.flush_bytes,
});
.tombstone_retention_ms = properties.tombstone_retention_ms});
}
return {
model::ntp(tp_ns.ns, tp_ns.tp, p_id),
Expand Down
11 changes: 8 additions & 3 deletions src/v/cluster/topic_properties.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
"write_caching: {}, "
"flush_ms: {}, "
"flush_bytes: {}, "
"remote_label: {}}}",
"remote_label: {}, "
"tombstone_retention_ms: {}}}",
properties.compression,
properties.cleanup_policy_bitflags,
properties.compaction_strategy,
Expand Down Expand Up @@ -72,7 +73,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
properties.write_caching,
properties.flush_ms,
properties.flush_bytes,
properties.remote_label);
properties.remote_label,
properties.tombstone_retention_ms);

return o;
}
Expand Down Expand Up @@ -108,7 +110,8 @@ bool topic_properties::has_overrides() const {
|| initial_retention_local_target_bytes.is_engaged()
|| initial_retention_local_target_ms.is_engaged()
|| write_caching.has_value() || flush_ms.has_value()
|| flush_bytes.has_value() || remote_label.has_value();
|| flush_bytes.has_value() || remote_label.has_value()
|| tombstone_retention_ms.has_value();
}

bool topic_properties::requires_remote_erase() const {
Expand Down Expand Up @@ -141,6 +144,7 @@ topic_properties::get_ntp_cfg_overrides() const {
ret.write_caching = write_caching;
ret.flush_ms = flush_ms;
ret.flush_bytes = flush_bytes;
ret.tombstone_retention_ms = tombstone_retention_ms;
return ret;
}

Expand Down Expand Up @@ -228,6 +232,7 @@ adl<cluster::topic_properties>::from(iobuf_parser& parser) {
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt};
}

Expand Down
12 changes: 8 additions & 4 deletions src/v/cluster/topic_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace cluster {
*/
struct topic_properties
: serde::
envelope<topic_properties, serde::version<9>, serde::compat_version<0>> {
envelope<topic_properties, serde::version<10>, serde::compat_version<0>> {
topic_properties() noexcept = default;
topic_properties(
std::optional<model::compression> compression,
Expand Down Expand Up @@ -71,7 +71,8 @@ struct topic_properties
std::optional<model::vcluster_id> mpx_virtual_cluster_id,
std::optional<model::write_caching_mode> write_caching,
std::optional<std::chrono::milliseconds> flush_ms,
std::optional<size_t> flush_bytes)
std::optional<size_t> flush_bytes,
std::optional<std::chrono::milliseconds> tombstone_retention_ms)
: compression(compression)
, cleanup_policy_bitflags(cleanup_policy_bitflags)
, compaction_strategy(compaction_strategy)
Expand Down Expand Up @@ -108,7 +109,8 @@ struct topic_properties
, mpx_virtual_cluster_id(mpx_virtual_cluster_id)
, write_caching(write_caching)
, flush_ms(flush_ms)
, flush_bytes(flush_bytes) {}
, flush_bytes(flush_bytes)
, tombstone_retention_ms(tombstone_retention_ms) {}

std::optional<model::compression> compression;
std::optional<model::cleanup_policy_bitflags> cleanup_policy_bitflags;
Expand Down Expand Up @@ -162,6 +164,7 @@ struct topic_properties
std::optional<model::write_caching_mode> write_caching;
std::optional<std::chrono::milliseconds> flush_ms;
std::optional<size_t> flush_bytes;
std::optional<std::chrono::milliseconds> tombstone_retention_ms;

// Label to be used when generating paths of remote objects (manifests,
// segments, etc) of this topic.
Expand Down Expand Up @@ -217,7 +220,8 @@ struct topic_properties
flush_ms,
flush_bytes,
remote_label,
remote_topic_namespace_override);
remote_topic_namespace_override,
tombstone_retention_ms);
}

friend bool operator==(const topic_properties&, const topic_properties&)
Expand Down
29 changes: 28 additions & 1 deletion src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ topic_table::apply(create_topic_cmd cmd, model::offset offset) {
schema_id_validation_validator::ec);
}

if (!topic_multi_property_validation(cmd.value.cfg.properties)) {
return ss::make_ready_future<std::error_code>(
errc::topic_invalid_config);
}

std::optional<model::initial_revision_id> remote_revision
= cmd.value.cfg.properties.remote_topic_properties ? std::make_optional(
cmd.value.cfg.properties.remote_topic_properties->remote_revision)
Expand Down Expand Up @@ -776,6 +781,22 @@ std::error_code topic_table::validate_force_reconfigurable_partitions(
return result;
}

/// Checks constraints that span more than one topic property.
///
/// Currently enforces a single rule: `tombstone_retention_ms` may not be set
/// while `shadow_indexing` is set to any mode other than `disabled`.
///
/// \param properties the candidate topic properties to validate
/// \return true if the properties are mutually consistent, false otherwise
bool topic_table::topic_multi_property_validation(
  const topic_properties& properties) const {
    const bool tombstone_retention_set
      = properties.tombstone_retention_ms.has_value();

    const auto& si_mode = properties.shadow_indexing;
    const bool tiered_storage_enabled
      = si_mode.has_value()
        && si_mode.value() != model::shadow_indexing_mode::disabled;

    // tombstone.retention.ms validation. Cannot be enabled alongside tiered
    // storage.
    return !(tombstone_retention_set && tiered_storage_enabled);
}

template<typename T>
void incremental_update(
std::optional<T>& property, property_update<std::optional<T>> override) {
Expand Down Expand Up @@ -941,9 +962,11 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) {
updated_properties.write_caching, overrides.write_caching);
incremental_update(updated_properties.flush_ms, overrides.flush_ms);
incremental_update(updated_properties.flush_bytes, overrides.flush_bytes);
incremental_update(
updated_properties.tombstone_retention_ms,
overrides.tombstone_retention_ms);

auto& properties = tp->second.get_configuration().properties;

// no configuration change, no need to generate delta
if (updated_properties == properties) {
co_return errc::success;
Expand All @@ -953,6 +976,10 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) {
co_return schema_id_validation_validator::ec;
}

if (!topic_multi_property_validation(updated_properties)) {
co_return make_error_code(errc::topic_invalid_config);
}

// Apply the changes
properties = std::move(updated_properties);

Expand Down
9 changes: 9 additions & 0 deletions src/v/cluster/topic_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,15 @@ class topic_table {
std::error_code validate_force_reconfigurable_partition(
const ntp_with_majority_loss&) const;

// Cross-property validation of a topic's properties.
//
// Applied to the properties carried by a create_topic_cmd, and to the final
// (merged) property configuration produced by applying an
// update_topic_properties_cmd, before those properties are committed.
// Intended for validations that depend on more than one topic property.
//
// Returns true if the configured topic_properties is valid, and false
// otherwise.
bool
topic_multi_property_validation(const topic_properties& properties) const;

underlying_t _topics;
lifecycle_markers_t _lifecycle_markers;
disabled_partitions_t _disabled_partitions;
Expand Down
11 changes: 10 additions & 1 deletion src/v/cluster/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1176,7 +1176,8 @@ void adl<cluster::incremental_topic_updates>::to(
t.initial_retention_local_target_ms,
t.write_caching,
t.flush_ms,
t.flush_bytes);
t.flush_bytes,
t.tombstone_retention_ms);
}

cluster::incremental_topic_updates
Expand Down Expand Up @@ -1311,6 +1312,14 @@ adl<cluster::incremental_topic_updates>::from(iobuf_parser& in) {
= adl<cluster::property_update<std::optional<size_t>>>{}.from(in);
}

if (
version <= cluster::incremental_topic_updates::
version_with_tombstone_retention_ms) {
updates.tombstone_retention_ms = adl<cluster::property_update<
std::optional<std::chrono::milliseconds>>>{}
.from(in);
}

return updates;
}

Expand Down
12 changes: 9 additions & 3 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ struct property_update<tristate<T>>
struct incremental_topic_updates
: serde::envelope<
incremental_topic_updates,
serde::version<6>,
serde::version<7>,
serde::compat_version<0>> {
static constexpr int8_t version_with_data_policy = -1;
static constexpr int8_t version_with_shadow_indexing = -3;
Expand All @@ -573,6 +573,7 @@ struct incremental_topic_updates
static constexpr int8_t version_with_schema_id_validation = -6;
static constexpr int8_t version_with_initial_retention = -7;
static constexpr int8_t version_with_write_caching = -8;
static constexpr int8_t version_with_tombstone_retention_ms = -9;
// negative version indicating different format:
// -1 - topic_updates with data_policy
// -2 - topic_updates without data_policy
Expand All @@ -581,7 +582,9 @@ struct incremental_topic_updates
// -6 - topic updates with schema id validation
// -7 - topic updates with initial retention
// -8 - write caching properties
static constexpr int8_t version = version_with_write_caching;
// -9 - tombstone_retention_ms
static constexpr int8_t version = version_with_tombstone_retention_ms;

property_update<std::optional<model::compression>> compression;
property_update<std::optional<model::cleanup_policy_bitflags>>
cleanup_policy_bitflags;
Expand Down Expand Up @@ -622,6 +625,8 @@ struct incremental_topic_updates
property_update<std::optional<model::write_caching_mode>> write_caching;
property_update<std::optional<std::chrono::milliseconds>> flush_ms;
property_update<std::optional<size_t>> flush_bytes;
property_update<std::optional<std::chrono::milliseconds>>
tombstone_retention_ms;

auto serde_fields() {
return std::tie(
Expand Down Expand Up @@ -650,7 +655,8 @@ struct incremental_topic_updates
initial_retention_local_target_ms,
write_caching,
flush_ms,
flush_bytes);
flush_bytes,
tombstone_retention_ms);
}

friend std::ostream&
Expand Down
4 changes: 4 additions & 0 deletions src/v/compat/cluster_compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ struct compat_check<cluster::topic_properties> {
json_write(flush_ms);
json_write(flush_bytes);
json_write(remote_topic_namespace_override);
json_write(tombstone_retention_ms);
}

static cluster::topic_properties from_json(json::Value& rd) {
Expand Down Expand Up @@ -394,6 +395,7 @@ struct compat_check<cluster::topic_properties> {
json_read(flush_ms);
json_read(flush_bytes);
json_read(remote_topic_namespace_override);
json_read(tombstone_retention_ms);
return obj;
}

Expand Down Expand Up @@ -425,6 +427,7 @@ struct compat_check<cluster::topic_properties> {
obj.flush_bytes = std::nullopt;
obj.flush_ms = std::nullopt;
obj.remote_topic_namespace_override = std::nullopt;
obj.tombstone_retention_ms = std::nullopt;

if (reply != obj) {
throw compat_error(fmt::format(
Expand Down Expand Up @@ -507,6 +510,7 @@ struct compat_check<cluster::topic_configuration> {
obj.properties.write_caching = std::nullopt;
obj.properties.flush_bytes = std::nullopt;
obj.properties.flush_ms = std::nullopt;
obj.properties.tombstone_retention_ms = std::nullopt;

obj.properties.mpx_virtual_cluster_id = std::nullopt;

Expand Down
Loading