Skip to content

Commit

Permalink
Merge pull request #4953 from VladLazar/issue-3081-unknown-compressio…
Browse files Browse the repository at this point in the history
…n-type

kafka: validate user provided 'create_topic' configuration literals
  • Loading branch information
Vlad Lazar committed May 31, 2022
2 parents dcb02ab + bac883a commit 4f010fa
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 2 deletions.
6 changes: 5 additions & 1 deletion src/v/kafka/server/handlers/create_topics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ using validators = make_validator_types<
partition_count_must_be_positive,
replication_factor_must_be_positive,
replication_factor_must_be_odd,
replicas_diversity>;
replicas_diversity,
compression_type_validator,
compaction_strategy_validator,
timestamp_type_validator,
cleanup_policy_validator>;

template<>
ss::future<response_ptr> create_topics_handler::handle(
Expand Down
64 changes: 64 additions & 0 deletions src/v/kafka/server/handlers/topics/validators.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,68 @@ struct unsupported_configuration_entries {
}
};

struct compression_type_validator_details {
using validated_type = model::compression;

static constexpr const char* error_message
= "Unsupported compression type ";
static constexpr const auto config_name = topic_property_compression;
};

struct compaction_strategy_validator_details {
using validated_type = model::compaction_strategy;

static constexpr const char* error_message
= "Unsupported compaction strategy ";
static constexpr const auto config_name
= topic_property_compaction_strategy;
};

struct timestamp_type_validator_details {
using validated_type = model::timestamp_type;

static constexpr const char* error_message = "Unsupported timestamp type ";
static constexpr const auto config_name = topic_property_timestamp_type;
};

struct cleanup_policy_validator_details {
using validated_type = model::cleanup_policy_bitflags;

static constexpr const char* error_message = "Unsupported cleanup policy ";
static constexpr const auto config_name = topic_property_cleanup_policy;
};

template<typename T>
struct configuration_value_validator {
static constexpr const char* error_message = T::error_message;
static constexpr error_code ec = error_code::invalid_config;

static bool is_valid(const creatable_topic& c) {
auto config_entries = config_map(c.configs);
auto end = config_entries.end();

auto iter = config_entries.find(T::config_name);

if (end == iter) {
return true;
}

try {
boost::lexical_cast<typename T::validated_type>(iter->second);
return true;
} catch (...) {
return false;
}
}
};

using compression_type_validator
= configuration_value_validator<compression_type_validator_details>;
using compaction_strategy_validator
= configuration_value_validator<compaction_strategy_validator_details>;
using timestamp_type_validator
= configuration_value_validator<timestamp_type_validator_details>;
using cleanup_policy_validator
= configuration_value_validator<cleanup_policy_validator_details>;

} // namespace kafka
14 changes: 13 additions & 1 deletion tests/rptest/tests/rpk_topic_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from ducktape.mark import parametrize
from ducktape.utils.util import wait_until
from rptest.services.cluster import cluster
import ducktape.errors

from rptest.tests.redpanda_test import RedpandaTest
from rptest.clients.rpk import RpkTool
from rptest.clients.rpk import RpkTool, RpkException
from rptest.services.rpk_consumer import RpkConsumer
from rptest.util import expect_exception

import time
import random
Expand All @@ -34,6 +36,16 @@ def test_create_topic(self):
backoff_sec=1,
err_msg="Topic never appeared.")

@cluster(num_nodes=1)
@parametrize(config_type="compression.type")
@parametrize(config_type="compaction.strategy")
@parametrize(config_type="message.timestamp.type")
@parametrize(config_type="cleanup.policy")
def test_create_topic_with_invalid_config(self, config_type):
with expect_exception(RpkException,
lambda e: "INVALID_CONFIG" in str(e)):
out = self._rpk.create_topic("topic", config={config_type: "foo"})

@cluster(num_nodes=4)
def test_produce(self):
topic = 'topic'
Expand Down

0 comments on commit 4f010fa

Please sign in to comment.