diff --git a/src/v/rpc/CMakeLists.txt b/src/v/rpc/CMakeLists.txt index 5f7bb440d097..3c68de0d0195 100644 --- a/src/v/rpc/CMakeLists.txt +++ b/src/v/rpc/CMakeLists.txt @@ -18,4 +18,3 @@ v_cc_library( v::net ) add_subdirectory(test) -add_subdirectory(demo) diff --git a/src/v/rpc/demo/CMakeLists.txt b/src/v/rpc/demo/CMakeLists.txt deleted file mode 100644 index 90d5514e27ba..000000000000 --- a/src/v/rpc/demo/CMakeLists.txt +++ /dev/null @@ -1,33 +0,0 @@ -include(rpcgen) -rpcgen( - TARGET demo_gen - IN_FILE ${CMAKE_CURRENT_SOURCE_DIR}/gen.json - OUT_FILE ${CMAKE_CURRENT_BINARY_DIR}/simple_service.h - INCLUDES ${CMAKE_BINARY_DIR}/src/v - ) - -add_executable(demo_server server.cc) -target_link_libraries(demo_server PUBLIC demo_gen v::model) -set_property(TARGET demo_server PROPERTY POSITION_INDEPENDENT_CODE ON) - -if(CMAKE_BUILD_TYPE MATCHES Release) - include(CheckIPOSupported) - check_ipo_supported(RESULT ltosupported OUTPUT error) - if(ltosupported) - set_property(TARGET demo_server PROPERTY INTERPROCEDURAL_OPTIMIZATION ON) - endif() -endif() - -# client -add_executable(demo_client client.cc) -target_link_libraries(demo_client PUBLIC demo_gen v::model) -set_property(TARGET demo_client PROPERTY POSITION_INDEPENDENT_CODE ON) - - -rp_test( - UNIT_TEST - BINARY_NAME roundtrip_demo_types - SOURCES type_tests.cc - LIBRARIES v::seastar_testing_main demo_gen v::model - ARGS "-- -c 1" -) diff --git a/src/v/rpc/demo/client.cc b/src/v/rpc/demo/client.cc deleted file mode 100644 index 4021e2f551ea..000000000000 --- a/src/v/rpc/demo/client.cc +++ /dev/null @@ -1,291 +0,0 @@ -// Copyright 2020 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -#include "model/timeout_clock.h" -#include "rpc/demo/demo_utils.h" -#include "rpc/demo/simple_service.h" -#include "rpc/types.h" -#include "syschecks/syschecks.h" -#include "utils/hdr_hist.h" -#include "vlog.h" - -#include -#include -#include -#include -#include - -#include - -namespace ch = std::chrono; -static ss::logger lgr{"demo client"}; - -void cli_opts(boost::program_options::options_description_easy_init o) { - namespace po = boost::program_options; - o("ip", - po::value()->default_value("127.0.0.1"), - "ip to connect to"); - o("port", po::value()->default_value(20776), "port for service"); - o("concurrency", - po::value()->default_value(1000), - "number of concurrent requests per TCP connection"); - o("data-size", - po::value()->default_value(1 << 20), - "1MB default data _per_ request"); - o("chunk-size", - po::value()->default_value(1 << 15), - "fragment data_size by this chunk size step (32KB default)"); - o("parallelism", - po::value()->default_value(2), - "number of TCP connections per core"); - o("test-case", - po::value()->default_value(1), - "1: large payload, 2: complex struct, 3: interspersed"); - o("ca-cert", - po::value()->default_value(""), - "CA root certificate"); -} - -struct load_gen_cfg { - std::size_t global_total_requests() const { - return core_total_requests() * ss::smp::count; - } - std::size_t global_size_test1() const { - return global_total_requests() * data_size; - } - std::size_t global_size_test2() const { - return global_total_requests() * sizeof(demo::complex_request); - } - std::size_t core_total_requests() const { - return parallelism * concurrency; - } - - std::size_t data_size; - std::size_t chunk_size; - std::size_t concurrency; - std::size_t parallelism; - std::size_t test_case; - rpc::transport_configuration client_cfg; - ss::sharded* hist; -}; - -inline std::ostream& operator<<(std::ostream& o, const load_gen_cfg& cfg) { - // make the output json-able so we can consume it in python for analysis - return o << "{'data_size':" << cfg.data_size - << ", 'chunk_size':" << cfg.chunk_size - << ", 'concurrency':" << cfg.concurrency - << ", 'parallelism':" << cfg.parallelism - << ", 'test_case':" << cfg.test_case - << ", 'max_queued_bytes_per_tcp':" - << cfg.client_cfg.max_queued_bytes - << ", 'global_test_1_data_size':" << cfg.global_size_test1() - << ", 'global_test_2_data_size':" << cfg.global_size_test2() - << ", 'global_requests':" << cfg.global_total_requests() - << ", 'cores':" << ss::smp::count << "}"; -} - -// 1. creates cfg.parallelism number of TCP connections -// 2. launches cfg.concurrency * parallelism number of requests -// 3. each request is cfg.data_size large -// 4. each cfg.data_size is split into cfg.chunk_size # of chunks -// 5. profit -class client_loadgen { -public: - using cli = rpc::client; - client_loadgen(load_gen_cfg cfg) - : _cfg(std::move(cfg)) - , _mem(ss::memory::stats().total_memory() * .9) { - lgr.debug("Mem for loadgen: {}", _mem.available_units()); - for (std::size_t i = 0; i < _cfg.parallelism; ++i) { - _clients.push_back(std::make_unique(_cfg.client_cfg)); - } - } - ss::future<> execute_loadgen() { - return ss::parallel_for_each( - _clients.begin(), _clients.end(), [this](auto& c) { - return ss::parallel_for_each( - boost::irange(std::size_t(0), _cfg.concurrency), - [this, &c](std::size_t) mutable { - return execute_one(c.get()); - }); - }); - } - ss::future<> connect() { - return ss::parallel_for_each( - _clients.begin(), _clients.end(), [](auto& c) { - return c->connect(model::no_timeout); - }); - } - ss::future<> stop() { - return ss::parallel_for_each( - _clients.begin(), _clients.end(), [](auto& c) { return c->stop(); }); - } - -private: - ss::future<> execute_one(cli* const c) { - if (_cfg.test_case < 1 || _cfg.test_case > 3) { - throw std::runtime_error(fmt::format( - "Unknown test:{}, bad config:{}", _cfg.test_case, _cfg)); - } - if (_cfg.test_case == 1) { - return get_units(_mem, _cfg.data_size) - .then([this, c](ss::semaphore_units<> u) { - return c - ->put( - demo::gen_simple_request(_cfg.data_size, _cfg.chunk_size), - rpc::client_opts(rpc::no_timeout)) - .then([m = _cfg.hist->local().auto_measure(), - u = std::move(u)](auto _) {}); - }); - } else if (_cfg.test_case == 2) { - return get_units(_mem, sizeof(demo::complex_request{})) - .then([this, &c](ss::semaphore_units<> u) { - return c - ->put_complex( - demo::complex_request{}, - rpc::client_opts(rpc::no_timeout)) - .then([m = _cfg.hist->local().auto_measure(), - u = std::move(u)](auto _) {}); - }); - } else if (_cfg.test_case == 3) { - return get_units(_mem, _cfg.data_size) - .then([this, &c](ss::semaphore_units<> u) { - auto r = demo::gen_interspersed_request( - _cfg.data_size, _cfg.chunk_size); - return c - ->put_interspersed( - std::move(r), rpc::client_opts(rpc::no_timeout)) - .then([m = _cfg.hist->local().auto_measure(), - u = std::move(u)](auto _) {}); - }); - } - __builtin_unreachable(); - } - - load_gen_cfg _cfg; - ss::semaphore _mem; - std::vector> _clients; -}; - -inline load_gen_cfg -cfg_from(boost::program_options::variables_map& m, ss::sharded* h) { - rpc::transport_configuration client_cfg; - client_cfg.server_addr = net::unresolved_address( - m["ip"].as(), m["port"].as()); - auto ca_cert = m["ca-cert"].as(); - if (ca_cert != "") { - auto builder = ss::tls::credentials_builder(); - // FIXME - // builder.set_dh_level(tls::dh_params::level::MEDIUM); - vlog(lgr.info, "Using {} as CA root certificate", ca_cert); - builder.set_x509_trust_file(ca_cert, ss::tls::x509_crt_format::PEM) - .get0(); - client_cfg.credentials - = builder.build_reloadable_certificate_credentials().get0(); - } - client_cfg.max_queued_bytes = ss::memory::stats().total_memory() * .8; - return load_gen_cfg{ - .data_size = m["data-size"].as(), - .chunk_size = m["chunk-size"].as(), - .concurrency = m["concurrency"].as(), - .parallelism = m["parallelism"].as(), - .test_case = m["test-case"].as(), - .client_cfg = std::move(client_cfg), - .hist = h}; -} - -class throughput { -public: - using time_t = std::chrono::steady_clock::time_point; - throughput(std::size_t total_requests) - : _total_requests(total_requests) - , _begin(now()) {} - double qps() const { - if (_end < _begin) { - throw std::runtime_error("call ::stop() first"); - } - double d = static_cast(duration_ms()); - d /= 1000.0; - return static_cast(_total_requests) / d; - } - void stop() { - if (_end > _begin) { - throw std::runtime_error("throughput time already stopped"); - } - _end = now(); - } - std::size_t duration_ms() const { - return ch::duration_cast(_end - _begin).count(); - } - -private: - time_t now() const { return std::chrono::steady_clock::now(); } - std::size_t _total_requests; - time_t _begin; - time_t _end; -}; - -inline std::ostream& operator<<(std::ostream& o, const throughput& t) { - return o << "{'qps':" << t.qps() << ",'duration_ms':" << t.duration_ms() - << "}"; -} - -inline hdr_hist aggregate_in_thread(ss::sharded& h) { - hdr_hist retval; - for (auto i = 0; i < ss::smp::count; ++i) { - h.invoke_on(i, [&retval](const hdr_hist& o) { retval += o; }).get(); - } - return retval; -} - -void write_configuration_in_thread( - const throughput& tp, const load_gen_cfg& cfg) { - std::ostringstream to; - to << "{'throughput':" << tp << ", 'config':" << cfg << "}"; - const ss::sstring s = to.str(); - force_write_ptr("test_config.json", s.data(), s.size()).get(); -} - -void write_latency_in_thread(ss::sharded& hist) { - auto h = aggregate_in_thread(hist); - write_histogram("clients.hdr", h).get(); -} - -int main(int args, char** argv, char** env) { - syschecks::initialize_intrinsics(); - std::setvbuf(stdout, nullptr, _IOLBF, 1024); - ss::app_template app; - cli_opts(app.add_options()); - ss::sharded client; - ss::sharded hist; - return app.run(args, argv, [&] { - auto& cfg = app.configuration(); - return ss::async([&] { - vlog(lgr.info, "constructing histogram"); - hist.start().get(); - auto hd = ss::defer([&hist] { hist.stop().get(); }); - const load_gen_cfg lcfg = cfg_from(cfg, &hist); - vlog(lgr.info, "config:{}", lcfg); - vlog(lgr.info, "constructing client"); - client.start(lcfg).get(); - auto cd = ss::defer([&client] { client.stop().get(); }); - vlog(lgr.info, "connecting clients"); - client.invoke_on_all(&client_loadgen::connect).get(); - auto tp = throughput(lcfg.global_total_requests()); - vlog(lgr.info, "invoking loadgen"); - client.invoke_on_all(&client_loadgen::execute_loadgen).get(); - tp.stop(); - vlog(lgr.info, "{}", tp); - vlog(lgr.info, "writing results"); - write_configuration_in_thread(tp, lcfg); - write_latency_in_thread(hist); - vlog(lgr.info, "stopping"); - }); - }); -} diff --git a/src/v/rpc/demo/demo_utils.h b/src/v/rpc/demo/demo_utils.h deleted file mode 100644 index 43edeec126fc..000000000000 --- a/src/v/rpc/demo/demo_utils.h +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright 2020 Redpanda Data, Inc. - * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md - * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 - */ - -#include "reflection/arity.h" -#include "reflection/for_each_field.h" -#include "rpc/demo/types.h" -#include "seastarx.h" -#include "ssx/sformat.h" -#include "utils/hdr_hist.h" - -#include -#include - -inline ss::future<> -force_write_ptr(ss::sstring filename, const char* ptr, std::size_t len) { - auto flags = ss::open_flags::rw | ss::open_flags::create - | ss::open_flags::truncate; - return open_file_dma(filename, flags) - .then( - [](ss::file f) { return ss::make_file_output_stream(std::move(f)); }) - .then([ptr, len](ss::output_stream o) mutable { - auto out = ss::make_lw_shared>(std::move(o)); - return out->write(ptr, len) - .then([out] { return out->flush(); }) - .then([out] { return out->close(); }) - .finally([out] {}); - }); -} - -inline ss::future<> -force_write_buffer(ss::sstring filename, ss::temporary_buffer b) { - const char* ptr = b.get(); - std::size_t len = b.size(); - return force_write_ptr(std::move(filename), ptr, len) - .then([b = std::move(b)] {}); -} - -inline ss::future<> write_histogram(ss::sstring filename, const hdr_hist& h) { - return force_write_buffer(std::move(filename), h.print_classic()); -} - -namespace demo { -inline iobuf rand_iobuf(std::size_t chunks, std::size_t chunk_size) { - iobuf b; - for (size_t i = 0; i < chunks; ++i) { - b.append(ss::temporary_buffer(chunk_size)); - } - return b; -} - -inline demo::simple_request -gen_simple_request(size_t data_size, size_t chunk_size) { - const std::size_t chunks = data_size / chunk_size; - return demo::simple_request{.data = rand_iobuf(chunks, chunk_size)}; -} - -inline interspersed_request -gen_interspersed_request(size_t data_size, size_t chunk_size) { - const std::size_t chunks = data_size / chunk_size / 8; - // clang-format off - return interspersed_request{ - .data = interspersed_request:: - payload{._one = i1{.y = rand_iobuf(chunks, chunk_size)}, - ._two = i2{.x = i1{.y = rand_iobuf(chunks, chunk_size)}, - .y = rand_iobuf(chunks, chunk_size)}, - ._three = i3{.x = i2{.x = i1{.y = rand_iobuf( - chunks, chunk_size)}, - .y = rand_iobuf(chunks, chunk_size)}, - .y = rand_iobuf(chunks, chunk_size)}}, - .x = rand_iobuf(chunks, chunk_size), - .y = rand_iobuf(chunks, chunk_size)}; - // clang-format on -} - -} // namespace demo diff --git a/src/v/rpc/demo/gen.json b/src/v/rpc/demo/gen.json deleted file mode 100644 index 3f5e2ef73228..000000000000 --- a/src/v/rpc/demo/gen.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "namespace": "demo", - "service_name": "simple", - "includes": [ - "rpc/demo/types.h" - ], - "methods": [ - { - "name": "put", - "input_type": "simple_request", - "output_type": "simple_reply" - }, - { - "name": "put_complex", - "input_type": "complex_request", - "output_type": "complex_reply" - }, - { - "name": "put_interspersed", - "input_type": "interspersed_request", - "output_type": "interspersed_reply" - } - - ] -} diff --git a/src/v/rpc/demo/server.cc b/src/v/rpc/demo/server.cc deleted file mode 100644 index c30a94ff8a78..000000000000 --- a/src/v/rpc/demo/server.cc +++ /dev/null @@ -1,116 +0,0 @@ -// Copyright 2020 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -#include "net/server.h" - -#include "rpc/demo/demo_utils.h" -#include "rpc/demo/simple_service.h" -#include "rpc/simple_protocol.h" -#include "syschecks/syschecks.h" -#include "vlog.h" - -#include -#include -#include -#include - -#include - -static ss::logger lgr{"demo server"}; - -struct service final : demo::simple_service { - using demo::simple_service::simple_service; - ss::future - put(demo::simple_request&&, rpc::streaming_context&) final { - return ss::make_ready_future( - demo::simple_reply{{}}); - } - ss::future - put_complex(demo::complex_request&&, rpc::streaming_context&) final { - return ss::make_ready_future( - demo::complex_reply{{}}); - } - ss::future put_interspersed( - demo::interspersed_request&&, rpc::streaming_context&) final { - return ss::make_ready_future( - demo::interspersed_reply{{}}); - } -}; - -void cli_opts(boost::program_options::options_description_easy_init o) { - namespace po = boost::program_options; - o("ip", - po::value()->default_value("127.0.0.1"), - "ip to connect to"); - o("port", po::value()->default_value(20776), "port for service"); - o("key", - po::value()->default_value(""), - "key for TLS seccured connection"); - o("cert", - po::value()->default_value(""), - "cert for TLS seccured connection"); -} - -int main(int args, char** argv, char** env) { - syschecks::initialize_intrinsics(); - std::setvbuf(stdout, nullptr, _IOLBF, 1024); - ss::sharded serv; - ss::app_template app; - cli_opts(app.add_options()); - return app.run_deprecated(args, argv, [&] { - ss::engine().at_exit([&serv] { - // clang-format off - return ss::do_with(hdr_hist{}, [&serv](hdr_hist& h) { - auto begin = boost::make_counting_iterator(uint32_t(0)); - auto end = boost::make_counting_iterator(uint32_t(ss::smp::count)); - return ss::do_for_each(begin, end, [&h, &serv](uint32_t i) { - return serv.invoke_on(i, [&h](const net::server& s) { - h += s.histogram(); - }); - }) .then([&h] { return write_histogram("server.hdr", h); }); - }).then([&serv] { return serv.stop(); }); - // clang-format on - }); - auto& cfg = app.configuration(); - return ss::async([&] { - net::server_configuration scfg("demo_rpc"); - auto key = cfg["key"].as(); - auto cert = cfg["cert"].as(); - ss::shared_ptr creds; - if (key != "" && cert != "") { - auto builder = ss::tls::credentials_builder(); - builder.set_dh_level(ss::tls::dh_params::level::MEDIUM); - builder - .set_x509_key_file(cert, key, ss::tls::x509_crt_format::PEM) - .get(); - creds = builder.build_reloadable_server_credentials().get0(); - } - scfg.addrs.emplace_back( - ss::socket_address(ss::ipv4_addr( - cfg["ip"].as(), cfg["port"].as())), - creds); - scfg.max_service_memory_per_core - = ss::memory::stats().total_memory() * .9 /*90%*/; - - serv.start(scfg).get(); - vlog(lgr.info, "registering service on all cores"); - serv - .invoke_on_all([](net::server& s) { - auto proto = std::make_unique(); - proto->register_service( - ss::default_scheduling_group(), - ss::default_smp_service_group()); - s.set_protocol(std::move(proto)); - }) - .get(); - vlog(lgr.info, "Invoking rpc start on all cores"); - serv.invoke_on_all(&net::server::start).get(); - }); - }); -} diff --git a/src/v/rpc/demo/type_tests.cc b/src/v/rpc/demo/type_tests.cc deleted file mode 100644 index 228559f39262..000000000000 --- a/src/v/rpc/demo/type_tests.cc +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2020 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -#include "reflection/adl.h" -#include "rpc/demo/demo_utils.h" -#include "rpc/demo/types.h" - -#include -#include - -using namespace demo; // NOLINT - -SEASTAR_THREAD_TEST_CASE(roundtrip_interspersed) { - auto b = reflection::to_iobuf(gen_interspersed_request(1 << 20, 1 << 15)); - BOOST_REQUIRE_EQUAL(b.size_bytes(), (1 << 20) + 80 /*80bytes overhead*/); - auto expected = reflection::adl{}.from(std::move(b)); - BOOST_REQUIRE_EQUAL(expected.data._three.y.size_bytes(), (1 << 20) / 8); -} diff --git a/src/v/rpc/demo/types.h b/src/v/rpc/demo/types.h deleted file mode 100644 index 66cf48d2786f..000000000000 --- a/src/v/rpc/demo/types.h +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2020 Redpanda Data, Inc. - * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md - * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 - */ - -#pragma once - -#include "bytes/iobuf.h" - -namespace demo { -struct simple_request { - iobuf data; -}; -struct simple_reply { - iobuf data; -}; - -struct c1 { - int32_t a{42}, b{1}, c{66}, d{-8}; -}; -struct c2 { - c1 _a; - int32_t b{1}, c{66}, d{-8}, e{0}; -}; -struct c3 { - c2 _a; - int32_t b{1}, c{66}, d{-8}, e{0}; -}; -struct c4 { - c3 _a; - int32_t b{1}, c{66}, d{-8}, e{0}; -}; -struct c5 { - c4 _a; - int32_t b{1}, c{66}, d{-8}, e{0}; -}; - -struct complex_request { - struct payload { - c1 _one; // 4 fields - c2 _two; // 8 fields - c3 _three; // 16 fields - c4 _four; // 32 fields - c5 _five; // 64 fields - }; - // 128 fields - payload data; -}; -struct complex_reply { - int32_t x{-1}; -}; - -struct i1 { - c1 x; - iobuf y; -}; -struct i2 { - i1 x; - iobuf y; -}; - -struct i3 { - i2 x; - iobuf y; -}; -struct interspersed_request { - struct payload { - i1 _one; - i2 _two; - i3 _three; - }; - payload data; - iobuf x; - iobuf y; -}; -struct interspersed_reply { - int32_t x{-1}; -}; - -} // namespace demo