Skip to content

Commit

Permalink
kafka: merge remote.readreplica and bucket
Browse files Browse the repository at this point in the history
The change is only UX, feature implementation and internal types don't
change.
  • Loading branch information
Elena Anyusheva committed Jul 13, 2022
1 parent c004cb8 commit b024531
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 105 deletions.
9 changes: 3 additions & 6 deletions src/v/kafka/server/handlers/create_topics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

namespace kafka {

static constexpr std::array<std::string_view, 12> supported_configs{
static constexpr std::array<std::string_view, 11> supported_configs{
topic_property_compression,
topic_property_cleanup_policy,
topic_property_timestamp_type,
Expand All @@ -44,8 +44,7 @@ static constexpr std::array<std::string_view, 12> supported_configs{
topic_property_recovery,
topic_property_remote_write,
topic_property_remote_read,
topic_property_read_replica,
topic_property_read_replica_bucket};
topic_property_read_replica};

bool is_supported(std::string_view name) {
return std::any_of(
Expand All @@ -65,9 +64,7 @@ using validators = make_validator_types<
compaction_strategy_validator,
timestamp_type_validator,
cleanup_policy_validator,
remote_read_and_write_are_not_supported_for_read_replica,
s3_bucket_is_required_for_read_replica,
s3_bucket_is_supported_only_for_read_replica>;
remote_read_and_write_are_not_supported_for_read_replica>;

static std::vector<creatable_topic_configs>
properties_to_result_configs(config_map_t config_map) {
Expand Down
13 changes: 5 additions & 8 deletions src/v/kafka/server/handlers/topics/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,11 @@ to_cluster_type(const creatable_topic& t) {
cfg.properties.recovery = get_bool_value(
config_entries, topic_property_recovery);
cfg.properties.shadow_indexing = get_shadow_indexing_mode(config_entries);
cfg.properties.read_replica = get_bool_value(
config_entries, topic_property_read_replica);
cfg.properties.read_replica_bucket = get_string_value(
config_entries, topic_property_read_replica_bucket);
config_entries, topic_property_read_replica);
if (cfg.properties.read_replica_bucket.has_value()) {
cfg.properties.read_replica = true;
}
/// Final topic_property not decoded here is \ref remote_topic_properties,
/// is more of an implementation detail no need to ever show user

Expand Down Expand Up @@ -246,12 +247,8 @@ config_map_t from_cluster_type(const cluster::topic_properties& properties) {
break;
}
}
if (properties.read_replica) {
config_entries[topic_property_read_replica] = from_config_type(
*properties.read_replica);
}
if (properties.read_replica_bucket) {
config_entries[topic_property_read_replica_bucket] = from_config_type(
config_entries[topic_property_read_replica] = from_config_type(
*properties.read_replica_bucket);
}
/// Final topic_property not encoded here is \ref remote_topic_properties,
Expand Down
2 changes: 0 additions & 2 deletions src/v/kafka/server/handlers/topics/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ static constexpr std::string_view topic_property_remote_read
= "redpanda.remote.read";
static constexpr std::string_view topic_property_read_replica
= "redpanda.remote.readreplica";
static constexpr std::string_view topic_property_read_replica_bucket
= "redpanda.remote.readreplica.bucket";

// Data-policy property
static constexpr std::string_view topic_property_data_policy_function_name
Expand Down
41 changes: 0 additions & 41 deletions src/v/kafka/server/handlers/topics/validators.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,47 +163,6 @@ struct remote_read_and_write_are_not_supported_for_read_replica {
}
};

struct s3_bucket_is_required_for_read_replica {
static constexpr error_code ec = error_code::invalid_config;
static constexpr const char* error_message
= "s3 bucket should be provided for read replica topic";

static bool is_valid(const creatable_topic& c) {
auto config_entries = config_map(c.configs);
auto end = config_entries.end();
bool is_read_replica
= (config_entries.find(topic_property_read_replica) != end);
bool s3_bucket_provided
= (config_entries.find(topic_property_read_replica_bucket) != end);

if (is_read_replica && !s3_bucket_provided) {
return false;
}
return true;
}
};

struct s3_bucket_is_supported_only_for_read_replica {
static constexpr error_code ec = error_code::invalid_config;
static constexpr const char* error_message
= "s3 bucket is supported only when redpanda.remote.readreplica is "
"enabled";

static bool is_valid(const creatable_topic& c) {
auto config_entries = config_map(c.configs);
auto end = config_entries.end();
bool is_read_replica
= (config_entries.find(topic_property_read_replica) != end);
bool s3_bucket_provided
= (config_entries.find(topic_property_read_replica_bucket) != end);

if (!is_read_replica && s3_bucket_provided) {
return false;
}
return true;
}
};

struct compression_type_validator_details {
using validated_type = model::compression;

Expand Down
50 changes: 2 additions & 48 deletions src/v/kafka/server/tests/create_topics_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -401,64 +401,18 @@ FIXTURE_TEST(read_replica, create_topic_fixture) {
std::nullopt,
std::nullopt,
std::map<ss::sstring, ss::sstring>{
{"redpanda.remote.readreplica", "true"},
{"redpanda.remote.readreplica.bucket", "panda-bucket"}});
{"redpanda.remote.readreplica", "panda-bucket"}});

test_create_read_replica_topic(make_req({topic}), 32, 10);
}

FIXTURE_TEST(s3bucket_is_missing, create_topic_fixture) {
auto topic = make_topic(
"topic1",
std::nullopt,
std::nullopt,
std::map<ss::sstring, ss::sstring>{
{"redpanda.remote.readreplica", "true"}});

auto req = make_req({topic});

auto client = make_kafka_client().get0();
client.connect().get();
auto resp = client.dispatch(req, kafka::api_version(2)).get0();

BOOST_CHECK(
resp.data.topics[0].error_code == kafka::error_code::invalid_config);
BOOST_CHECK(
resp.data.topics[0].error_message
== "s3 bucket should be provided for read replica topic");
BOOST_CHECK(resp.data.topics[0].name == "topic1");
}

FIXTURE_TEST(s3bucket_but_not_read_replica, create_topic_fixture) {
auto topic = make_topic(
"topic1",
std::nullopt,
std::nullopt,
std::map<ss::sstring, ss::sstring>{
{"redpanda.remote.readreplica.bucket", "panda-bucket"}});

auto req = make_req({topic});

auto client = make_kafka_client().get0();
client.connect().get();
auto resp = client.dispatch(req, kafka::api_version(2)).get0();

BOOST_CHECK(
resp.data.topics[0].error_code == kafka::error_code::invalid_config);
BOOST_CHECK(
resp.data.topics[0].error_message
== "s3 bucket is supported only when redpanda.remote.readreplica is "
"enabled");
BOOST_CHECK(resp.data.topics[0].name == "topic1");
}

FIXTURE_TEST(read_replica_and_remote_write, create_topic_fixture) {
auto topic = make_topic(
"topic1",
std::nullopt,
std::nullopt,
std::map<ss::sstring, ss::sstring>{
{"redpanda.remote.readreplica", "true"},
{"redpanda.remote.readreplica", "panda-bucket"},
{"redpanda.remote.write", "true"}});

auto req = make_req({topic});
Expand Down

0 comments on commit b024531

Please sign in to comment.