Skip to content

Commit

Permalink
kafka/create_topics: validate config literals
Browse files Browse the repository at this point in the history
Some of the topic creation configurations take string literals as
values. Prior to this patch creating a topic with an unexpected string
literal value failed without returning any error message due to an
unhandled exception thrown in the parsing of the literal.

This patch introduces validators which ensure that the string literal
is a valid configuration value and return a meaningful error otherwise.

Fixes #3081
  • Loading branch information
Vlad Lazar committed May 26, 2022
1 parent 5bb252b commit 0343702
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 1 deletion.
6 changes: 5 additions & 1 deletion src/v/kafka/server/handlers/create_topics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ using validators = make_validator_types<
partition_count_must_be_positive,
replication_factor_must_be_positive,
replication_factor_must_be_odd,
replicas_diversity>;
replicas_diversity,
compression_type_validator,
compaction_strategy_validator,
timestamp_type_validator,
cleanup_policy_validator>;

template<>
ss::future<response_ptr> create_topics_handler::handle(
Expand Down
64 changes: 64 additions & 0 deletions src/v/kafka/server/handlers/topics/validators.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,68 @@ struct unsupported_configuration_entries {
}
};

struct compression_type_validator_details {
using validated_type = model::compression;

static constexpr const char* error_message
= "Unsupported compression type ";
static constexpr const auto config_name = topic_property_compression;
};

struct compaction_strategy_validator_details {
using validated_type = model::compaction_strategy;

static constexpr const char* error_message
= "Unsupported compaction strategy ";
static constexpr const auto config_name
= topic_property_compaction_strategy;
};

struct timestamp_type_validator_details {
using validated_type = model::timestamp_type;

static constexpr const char* error_message = "Unsupported timestamp type ";
static constexpr const auto config_name = topic_property_timestamp_type;
};

struct cleanup_policy_validator_details {
using validated_type = model::cleanup_policy_bitflags;

static constexpr const char* error_message = "Unsupported cleanup policy ";
static constexpr const auto config_name = topic_property_cleanup_policy;
};

template<typename T>
struct configuration_value_validator {
static constexpr const char* error_message = T::error_message;
static constexpr error_code ec = error_code::invalid_config;

static bool is_valid(const creatable_topic& c) {
auto config_entries = config_map(c.configs);
auto end = config_entries.end();

auto iter = config_entries.find(T::config_name);

if (end == iter) {
return true;
}

try {
boost::lexical_cast<typename T::validated_type>(iter->second);
return true;
} catch (...) {
return false;
}
}
};

using compression_type_validator
= configuration_value_validator<compression_type_validator_details>;
using compaction_strategy_validator
= configuration_value_validator<compaction_strategy_validator_details>;
using timestamp_type_validator
= configuration_value_validator<timestamp_type_validator_details>;
using cleanup_policy_validator
= configuration_value_validator<cleanup_policy_validator_details>;

} // namespace kafka

0 comments on commit 0343702

Please sign in to comment.