diff --git a/src/v/kafka/server/handlers/create_topics.cc b/src/v/kafka/server/handlers/create_topics.cc index 67b6ee2e667f0..e088df7371a57 100644 --- a/src/v/kafka/server/handlers/create_topics.cc +++ b/src/v/kafka/server/handlers/create_topics.cc @@ -33,7 +33,7 @@ namespace kafka { -static constexpr std::array supported_configs{ +static constexpr std::array supported_configs{ topic_property_compression, topic_property_cleanup_policy, topic_property_timestamp_type, @@ -44,8 +44,7 @@ static constexpr std::array supported_configs{ topic_property_recovery, topic_property_remote_write, topic_property_remote_read, - topic_property_read_replica, - topic_property_read_replica_bucket}; + topic_property_read_replica}; bool is_supported(std::string_view name) { return std::any_of( @@ -65,9 +64,7 @@ using validators = make_validator_types< compaction_strategy_validator, timestamp_type_validator, cleanup_policy_validator, - remote_read_and_write_are_not_supported_for_read_replica, - s3_bucket_is_required_for_read_replica, - s3_bucket_is_supported_only_for_read_replica>; + remote_read_and_write_are_not_supported_for_read_replica>; static std::vector properties_to_result_configs(config_map_t config_map) { diff --git a/src/v/kafka/server/handlers/topics/types.cc b/src/v/kafka/server/handlers/topics/types.cc index 8eda38c9996b6..80594c920795c 100644 --- a/src/v/kafka/server/handlers/topics/types.cc +++ b/src/v/kafka/server/handlers/topics/types.cc @@ -152,10 +152,11 @@ to_cluster_type(const creatable_topic& t) { cfg.properties.recovery = get_bool_value( config_entries, topic_property_recovery); cfg.properties.shadow_indexing = get_shadow_indexing_mode(config_entries); - cfg.properties.read_replica = get_bool_value( - config_entries, topic_property_read_replica); cfg.properties.read_replica_bucket = get_string_value( - config_entries, topic_property_read_replica_bucket); + config_entries, topic_property_read_replica); + if (cfg.properties.read_replica_bucket.has_value()) { + cfg.properties.read_replica = true; + } /// Final topic_property not decoded here is \ref remote_topic_properties, /// is more of an implementation detail no need to ever show user @@ -246,12 +247,8 @@ config_map_t from_cluster_type(const cluster::topic_properties& properties) { break; } } - if (properties.read_replica) { - config_entries[topic_property_read_replica] = from_config_type( - *properties.read_replica); - } if (properties.read_replica_bucket) { - config_entries[topic_property_read_replica_bucket] = from_config_type( + config_entries[topic_property_read_replica] = from_config_type( *properties.read_replica_bucket); } /// Final topic_property not encoded here is \ref remote_topic_properties, diff --git a/src/v/kafka/server/handlers/topics/types.h b/src/v/kafka/server/handlers/topics/types.h index b2a66303ebe43..0c57d1bd458a7 100644 --- a/src/v/kafka/server/handlers/topics/types.h +++ b/src/v/kafka/server/handlers/topics/types.h @@ -53,8 +53,6 @@ static constexpr std::string_view topic_property_remote_read = "redpanda.remote.read"; static constexpr std::string_view topic_property_read_replica = "redpanda.remote.readreplica"; -static constexpr std::string_view topic_property_read_replica_bucket - = "redpanda.remote.readreplica.bucket"; // Data-policy property static constexpr std::string_view topic_property_data_policy_function_name diff --git a/src/v/kafka/server/handlers/topics/validators.h b/src/v/kafka/server/handlers/topics/validators.h index cb346517beff0..12a92e67a89bb 100644 --- a/src/v/kafka/server/handlers/topics/validators.h +++ b/src/v/kafka/server/handlers/topics/validators.h @@ -163,47 +163,6 @@ struct remote_read_and_write_are_not_supported_for_read_replica { } }; -struct s3_bucket_is_required_for_read_replica { - static constexpr error_code ec = error_code::invalid_config; - static constexpr const char* error_message - = "s3 bucket should be provided for read replica topic"; - - static bool is_valid(const creatable_topic& c) { - auto config_entries = config_map(c.configs); - auto end = config_entries.end(); - bool is_read_replica - = (config_entries.find(topic_property_read_replica) != end); - bool s3_bucket_provided - = (config_entries.find(topic_property_read_replica_bucket) != end); - - if (is_read_replica && !s3_bucket_provided) { - return false; - } - return true; - } -}; - -struct s3_bucket_is_supported_only_for_read_replica { - static constexpr error_code ec = error_code::invalid_config; - static constexpr const char* error_message - = "s3 bucket is supported only when redpanda.remote.readreplica is " - "enabled"; - - static bool is_valid(const creatable_topic& c) { - auto config_entries = config_map(c.configs); - auto end = config_entries.end(); - bool is_read_replica - = (config_entries.find(topic_property_read_replica) != end); - bool s3_bucket_provided - = (config_entries.find(topic_property_read_replica_bucket) != end); - - if (!is_read_replica && s3_bucket_provided) { - return false; - } - return true; - } -}; - struct compression_type_validator_details { using validated_type = model::compression; diff --git a/src/v/kafka/server/tests/create_topics_test.cc b/src/v/kafka/server/tests/create_topics_test.cc index afb0532b41eec..2d7707767fe8a 100644 --- a/src/v/kafka/server/tests/create_topics_test.cc +++ b/src/v/kafka/server/tests/create_topics_test.cc @@ -401,64 +401,18 @@ FIXTURE_TEST(read_replica, create_topic_fixture) { std::nullopt, std::nullopt, std::map{ - {"redpanda.remote.readreplica", "true"}, - {"redpanda.remote.readreplica.bucket", "panda-bucket"}}); + {"redpanda.remote.readreplica", "panda-bucket"}}); test_create_read_replica_topic(make_req({topic}), 32, 10); } -FIXTURE_TEST(s3bucket_is_missing, create_topic_fixture) { - auto topic = make_topic( - "topic1", - std::nullopt, - std::nullopt, - std::map{ - {"redpanda.remote.readreplica", "true"}}); - - auto req = make_req({topic}); - - auto client = make_kafka_client().get0(); - client.connect().get(); - auto resp = client.dispatch(req, kafka::api_version(2)).get0(); - - BOOST_CHECK( - resp.data.topics[0].error_code == kafka::error_code::invalid_config); - BOOST_CHECK( - resp.data.topics[0].error_message - == "s3 bucket should be provided for read replica topic"); - BOOST_CHECK(resp.data.topics[0].name == "topic1"); -} - -FIXTURE_TEST(s3bucket_but_not_read_replica, create_topic_fixture) { - auto topic = make_topic( - "topic1", - std::nullopt, - std::nullopt, - std::map{ - {"redpanda.remote.readreplica.bucket", "panda-bucket"}}); - - auto req = make_req({topic}); - - auto client = make_kafka_client().get0(); - client.connect().get(); - auto resp = client.dispatch(req, kafka::api_version(2)).get0(); - - BOOST_CHECK( - resp.data.topics[0].error_code == kafka::error_code::invalid_config); - BOOST_CHECK( - resp.data.topics[0].error_message - == "s3 bucket is supported only when redpanda.remote.readreplica is " - "enabled"); - BOOST_CHECK(resp.data.topics[0].name == "topic1"); -} - FIXTURE_TEST(read_replica_and_remote_write, create_topic_fixture) { auto topic = make_topic( "topic1", std::nullopt, std::nullopt, std::map{ - {"redpanda.remote.readreplica", "true"}, + {"redpanda.remote.readreplica", "panda-bucket"}, {"redpanda.remote.write", "true"}}); auto req = make_req({topic});