diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 34f49de60646..3b8a62f24898 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -169,7 +169,7 @@ read_result::memory_units_t::~memory_units_t() noexcept { * is assumed that a batch size has already been consumed from kafka * memory semaphore for it. */ -read_result::memory_units_t reserve_memory_units( +static read_result::memory_units_t reserve_memory_units( ssx::semaphore& memory_sem, ssx::semaphore& memory_fetch_sem, const size_t max_bytes, @@ -357,6 +357,8 @@ static ss::future do_read_from_ntp( co_return result; } +namespace testing { + ss::future read_from_ntp( cluster::partition_manager& cluster_pm, const replica_selector& replica_selector, @@ -378,6 +380,17 @@ ss::future read_from_ntp( memory_fetch_sem); } +read_result::memory_units_t reserve_memory_units( + ssx::semaphore& memory_sem, + ssx::semaphore& memory_fetch_sem, + const size_t max_bytes, + const bool obligatory_batch_read) { + return kafka::reserve_memory_units( + memory_sem, memory_fetch_sem, max_bytes, obligatory_batch_read); +} + +} // namespace testing + static void fill_fetch_responses( op_context& octx, std::vector results, diff --git a/src/v/kafka/server/handlers/fetch.h b/src/v/kafka/server/handlers/fetch.h index bab4e1a421d4..0b14470abbad 100644 --- a/src/v/kafka/server/handlers/fetch.h +++ b/src/v/kafka/server/handlers/fetch.h @@ -367,6 +367,11 @@ struct fetch_plan { } }; +/* + * Unit Tests Exposure + */ +namespace testing { + ss::future read_from_ntp( cluster::partition_manager&, const replica_selector&, @@ -378,13 +383,18 @@ ss::future read_from_ntp( ssx::semaphore& memory_sem, ssx::semaphore& memory_fetch_sem); -namespace testing { /** * Create a fetch plan with the simple fetch planner. * * Exposed for testing/benchmarking only. */ kafka::fetch_plan make_simple_fetch_plan(op_context& octx); -} // namespace testing +read_result::memory_units_t reserve_memory_units( + ssx::semaphore& memory_sem, + ssx::semaphore& memory_fetch_sem, + const size_t max_bytes, + const bool obligatory_batch_read); + +} // namespace testing } // namespace kafka diff --git a/src/v/kafka/server/tests/CMakeLists.txt b/src/v/kafka/server/tests/CMakeLists.txt index a7bdbb59bea7..257e3ab29a0e 100644 --- a/src/v/kafka/server/tests/CMakeLists.txt +++ b/src/v/kafka/server/tests/CMakeLists.txt @@ -9,6 +9,7 @@ rp_test( handler_interface_test.cc quota_managers_test.cc validator_tests.cc + fetch_unit_test.cc DEFINITIONS BOOST_TEST_DYN_LINK LIBRARIES Boost::unit_test_framework v::kafka v::coproc LABELS kafka diff --git a/src/v/kafka/server/tests/fetch_test.cc b/src/v/kafka/server/tests/fetch_test.cc index 3a4e5371e8b3..0ca145c6aff5 100644 --- a/src/v/kafka/server/tests/fetch_test.cc +++ b/src/v/kafka/server/tests/fetch_test.cc @@ -173,7 +173,7 @@ FIXTURE_TEST(read_from_ntp_max_bytes, redpanda_thread_fixture) { .invoke_on( shard, [&octx, ktp, config](cluster::partition_manager& pm) { - return kafka::read_from_ntp( + return kafka::testing::read_from_ntp( pm, octx.rctx.server().local().get_replica_selector(), ktp, diff --git a/src/v/kafka/server/tests/fetch_unit_test.cc b/src/v/kafka/server/tests/fetch_unit_test.cc new file mode 100644 index 000000000000..34ca55eed0a9 --- /dev/null +++ b/src/v/kafka/server/tests/fetch_unit_test.cc @@ -0,0 +1,147 @@ +// Copyright 2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "kafka/server/handlers/fetch.h" +#include "seastarx.h" +#include "ssx/semaphore.h" + +#include +#include + +struct reserve_mem_units_test_result { + size_t kafka, fetch; + explicit reserve_mem_units_test_result(size_t size) + : kafka(size) + , fetch(size) {} + reserve_mem_units_test_result(size_t kafka_, size_t fetch_) + : kafka(kafka_) + , fetch(fetch_) {} + friend bool operator==( + const reserve_mem_units_test_result&, + const reserve_mem_units_test_result&) + = default; + friend std::ostream& + operator<<(std::ostream& s, const reserve_mem_units_test_result& v) { + return s << "{kafka: " << v.kafka << ", fetch: " << v.fetch << "}"; + } +}; + +BOOST_AUTO_TEST_CASE(reserve_memory_units_test) { + using namespace kafka; + using namespace std::chrono_literals; + using r = reserve_mem_units_test_result; + + // reserve memory units, return how many memory units have been reserved + // from each memory semaphore + ssx::semaphore memory_sem{100_MiB, "test_memory_sem"}; + ssx::semaphore memory_fetch_sem{50_MiB, "test_memory_fetch_sem"}; + const auto test_case = + [&memory_sem, &memory_fetch_sem]( + size_t max_bytes, + bool obligatory_batch_read) -> reserve_mem_units_test_result { + auto mu = kafka::testing::reserve_memory_units( + memory_sem, memory_fetch_sem, max_bytes, obligatory_batch_read); + return {mu.kafka.count(), mu.fetch.count()}; + }; + + static constexpr size_t batch_size = 1_MiB; + + // below are test prerequisites, tests are done based on these assumptions + // if these are not valid, the test needs a change + size_t kafka_mem = memory_sem.available_units(); + size_t fetch_mem = memory_fetch_sem.available_units(); + BOOST_TEST(fetch_mem > batch_size * 3); + BOOST_TEST_REQUIRE(kafka_mem > fetch_mem); + BOOST_TEST_REQUIRE(batch_size > 100); + + // *** plenty of memory cases + // kafka_mem > fetch_mem > batch_size + // Reserved memory is limited by the fetch memory semaphore + BOOST_TEST(test_case(batch_size / 100, false) == r(batch_size)); + BOOST_TEST(test_case(batch_size / 100, true) == r(batch_size)); + BOOST_TEST(test_case(batch_size, false) == r(batch_size)); + BOOST_TEST(test_case(batch_size, true) == r(batch_size)); + BOOST_TEST(test_case(batch_size * 3, false) == r(batch_size * 3)); + BOOST_TEST(test_case(batch_size * 3, true) == r(batch_size * 3)); + BOOST_TEST(test_case(fetch_mem, false) == r(fetch_mem)); + BOOST_TEST(test_case(fetch_mem, true) == r(fetch_mem)); + BOOST_TEST(test_case(fetch_mem + 1, false) == r(fetch_mem)); + BOOST_TEST(test_case(fetch_mem + 1, true) == r(fetch_mem)); + BOOST_TEST(test_case(kafka_mem, false) == r(fetch_mem)); + BOOST_TEST(test_case(kafka_mem, true) == r(fetch_mem)); + + // *** still a lot of mem but kafka mem somewhat used: + // fetch_mem > kafka_mem > batch_size (fetch_mem - kafka_mem < batch_size) + // Obligatory reads to not come into play yet because we still have more + // memory than a single batch, but the amount of memory reserved is limited + // by the smaller semaphore, which is kafka_mem in this case + auto memsemunits = ss::consume_units( + memory_sem, kafka_mem - fetch_mem + 1000); + kafka_mem = memory_sem.available_units(); + BOOST_TEST_REQUIRE(kafka_mem < fetch_mem); + BOOST_TEST_REQUIRE(kafka_mem > batch_size + 1000); + + BOOST_TEST(test_case(batch_size, false) == r(batch_size)); + BOOST_TEST(test_case(batch_size, true) == r(batch_size)); + BOOST_TEST(test_case(kafka_mem - 100, false) == r(kafka_mem - 100)); + BOOST_TEST(test_case(kafka_mem - 100, true) == r(kafka_mem - 100)); + BOOST_TEST(test_case(kafka_mem + 100, false) == r(kafka_mem)); + BOOST_TEST(test_case(kafka_mem + 100, true) == r(kafka_mem)); + BOOST_TEST(test_case(fetch_mem + 100, false) == r(kafka_mem)); + BOOST_TEST(test_case(fetch_mem + 100, true) == r(kafka_mem)); + + memsemunits.return_all(); + kafka_mem = memory_sem.available_units(); + + // *** low on fetch memory tests + // kafka_mem > batch_size > fetch_mem + // Under this condition, unless obligatory_batch_read, we cannot reserve + // memory as it's not enough for at least a single batch. + // If obligatory_batch_read, the reserved amount will always be a single + // batch. + memsemunits = ss::consume_units( + memory_fetch_sem, fetch_mem - batch_size + 1000); + fetch_mem = memory_fetch_sem.available_units(); + BOOST_TEST_REQUIRE(kafka_mem > batch_size); + BOOST_TEST_REQUIRE(fetch_mem < batch_size); + + BOOST_TEST(test_case(fetch_mem - 100, false) == r(0)); + BOOST_TEST(test_case(fetch_mem - 100, true) == r(batch_size)); + BOOST_TEST(test_case(batch_size - 100, false) == r(0)); + BOOST_TEST(test_case(batch_size - 100, true) == r(batch_size)); + BOOST_TEST(test_case(kafka_mem - 100, false) == r(0)); + BOOST_TEST(test_case(kafka_mem - 100, true) == r(batch_size)); + BOOST_TEST(test_case(kafka_mem + 100, false) == r(0)); + BOOST_TEST(test_case(kafka_mem + 100, true) == r(batch_size)); + + memsemunits.return_all(); + fetch_mem = memory_fetch_sem.available_units(); + + // *** low on kafka memory tests + // fetch_mem > batch_size > kafka_mem + // Essentially the same behaviour as in low fetch memory cases + memsemunits = ss::consume_units(memory_sem, kafka_mem - batch_size + 1000); + kafka_mem = memory_sem.available_units(); + BOOST_TEST_REQUIRE(kafka_mem < batch_size); + BOOST_TEST_REQUIRE(fetch_mem > batch_size); + + BOOST_TEST(test_case(kafka_mem - 100, false) == r(0)); + BOOST_TEST(test_case(kafka_mem - 100, true) == r(batch_size)); + BOOST_TEST(test_case(batch_size - 100, false) == r(0)); + BOOST_TEST(test_case(batch_size - 100, true) == r(batch_size)); + BOOST_TEST(test_case(batch_size + 100, false) == r(0)); + BOOST_TEST(test_case(batch_size + 100, true) == r(batch_size)); + BOOST_TEST(test_case(fetch_mem - 100, false) == r(0)); + BOOST_TEST(test_case(fetch_mem - 100, true) == r(batch_size)); + BOOST_TEST(test_case(fetch_mem + 100, false) == r(0)); + BOOST_TEST(test_case(fetch_mem + 100, true) == r(batch_size)); + + memsemunits.return_all(); + kafka_mem = memory_sem.available_units(); +}