From ff860d081b25cfa367992ae6528959f731acb9fe Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Tue, 5 Jul 2022 18:25:06 -0700 Subject: [PATCH 1/2] raft: remove tron Signed-off-by: Noah Watkins --- src/v/raft/CMakeLists.txt | 1 - src/v/raft/tron/CMakeLists.txt | 27 --- src/v/raft/tron/client.cc | 216 ------------------- src/v/raft/tron/gen.json | 16 -- src/v/raft/tron/logger.cc | 14 -- src/v/raft/tron/logger.h | 20 -- src/v/raft/tron/server.cc | 365 --------------------------------- src/v/raft/tron/service.h | 74 ------- src/v/raft/tron/types.h | 25 --- 9 files changed, 758 deletions(-) delete mode 100644 src/v/raft/tron/CMakeLists.txt delete mode 100644 src/v/raft/tron/client.cc delete mode 100644 src/v/raft/tron/gen.json delete mode 100644 src/v/raft/tron/logger.cc delete mode 100644 src/v/raft/tron/logger.h delete mode 100644 src/v/raft/tron/server.cc delete mode 100644 src/v/raft/tron/service.h delete mode 100644 src/v/raft/tron/types.h diff --git a/src/v/raft/CMakeLists.txt b/src/v/raft/CMakeLists.txt index ae39a8d15cb7..8adc724f6c72 100644 --- a/src/v/raft/CMakeLists.txt +++ b/src/v/raft/CMakeLists.txt @@ -41,7 +41,6 @@ v_cc_library( v::model ) add_subdirectory(tests) -add_subdirectory(tron) add_subdirectory(kvelldb) set_source_files_properties( diff --git a/src/v/raft/tron/CMakeLists.txt b/src/v/raft/tron/CMakeLists.txt deleted file mode 100644 index 54009ac02c3a..000000000000 --- a/src/v/raft/tron/CMakeLists.txt +++ /dev/null @@ -1,27 +0,0 @@ -rpcgen( - TARGET tron_gen - IN_FILE ${CMAKE_CURRENT_SOURCE_DIR}/gen.json - OUT_FILE ${CMAKE_CURRENT_BINARY_DIR}/trongen_service.h - INCLUDES ${CMAKE_BINARY_DIR}/src/v - ) -# server -add_executable(srvtron - server.cc - logger.cc - ) -target_link_libraries(srvtron PUBLIC tron_gen v::raft) -set_property(TARGET srvtron PROPERTY POSITION_INDEPENDENT_CODE ON) -if(CMAKE_BUILD_TYPE MATCHES Release) - include(CheckIPOSupported) - check_ipo_supported(RESULT ltosupported OUTPUT error) - if(ltosupported) - set_property(TARGET srvtron PROPERTY INTERPROCEDURAL_OPTIMIZATION ON) - endif() -endif() -# client -add_executable(tron - client.cc - logger.cc - ) -target_link_libraries(tron PUBLIC tron_gen v::raft) -set_property(TARGET tron PROPERTY POSITION_INDEPENDENT_CODE ON) diff --git a/src/v/raft/tron/client.cc b/src/v/raft/tron/client.cc deleted file mode 100644 index ff15e13a2acc..000000000000 --- a/src/v/raft/tron/client.cc +++ /dev/null @@ -1,216 +0,0 @@ -// Copyright 2020 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -#include "model/record_batch_reader.h" -#include "model/timeout_clock.h" -#include "raft/tron/logger.h" -#include "raft/tron/service.h" -#include "random/generators.h" -#include "reflection/adl.h" -#include "storage/record_batch_builder.h" -#include "syschecks/syschecks.h" -#include "utils/hdr_hist.h" -#include "vlog.h" - -#include -#include -#include -#include -#include - -#include - -auto& tronlog = raft::tron::tronlog; -namespace ch = std::chrono; - -void cli_opts(boost::program_options::options_description_easy_init o) { - namespace po = boost::program_options; - o("ip", - po::value()->default_value("127.0.0.1"), - "ip to connect to"); - o("port", po::value()->default_value(20776), "port for service"); - o("concurrency", - po::value()->default_value(1), - "number of concurrent requests per TCP connection"); - o("key-size", - po::value()->default_value(100), - "size in bytes"); - o("value-size", - po::value()->default_value(100), - "size in bytes"); - o("parallelism", - po::value()->default_value(1), - "number of TCP connections per core"); - o("ca-cert", - po::value()->default_value(""), - "CA root certificate"); -} - -struct load_gen_cfg { - std::size_t key_size; - std::size_t value_size; - std::size_t concurrency; - std::size_t parallelism; - rpc::transport_configuration client_cfg; - ss::sharded* hist; -}; - -inline std::ostream& operator<<(std::ostream& o, const load_gen_cfg& cfg) { - return o << "{'key_size':" << cfg.key_size - << ", 'value_size':" << cfg.value_size - << ", 'concurrency':" << cfg.concurrency - << ", 'parallelism':" << cfg.parallelism << "}"; -} - -// 1. creates cfg.parallelism number of TCP connections -// 2. launches cfg.concurrency * parallelism number of requests -class client_loadgen { -public: - using cli = rpc::client; - client_loadgen(load_gen_cfg cfg) - : _cfg(std::move(cfg)) - , _mem(ss::memory::stats().total_memory() * .9) { - vlog(tronlog.debug, "Mem for loadgen: {}", _mem.available_units()); - for (std::size_t i = 0; i < _cfg.parallelism; ++i) { - _clients.push_back(std::make_unique(_cfg.client_cfg)); - } - } - ss::future<> execute_loadgen() { - return ss::parallel_for_each( - _clients.begin(), _clients.end(), [this](auto& c) { - return ss::parallel_for_each( - boost::irange(std::size_t(0), _cfg.concurrency), - [this, &c](std::size_t) mutable { return execute_one(c); }); - }); - } - ss::future<> connect() { - return ss::parallel_for_each( - _clients.begin(), _clients.end(), [](auto& c) { - return c->connect(model::no_timeout); - }); - } - ss::future<> stop() { - return ss::parallel_for_each( - _clients.begin(), _clients.end(), [](auto& c) { return c->stop(); }); - } - -private: - ss::future<> execute_one(std::unique_ptr& c) { - auto mem_sz = _cfg.key_size + _cfg.value_size + 20; - return with_semaphore(_mem, mem_sz, [this, &c] { - return c - ->replicate( - gen_entry(), - rpc::client_opts(rpc::client_opts(rpc::no_timeout))) - .then_wrapped([m = _cfg.hist->local().auto_measure()](auto f) { - try { - (void)f.get0(); - } catch (...) { - vlog( - tronlog.info, - "Error sending payload:{}", - std::current_exception()); - } - }); - }); - } - - model::record_batch_reader gen_entry() { - ss::circular_buffer batches; - batches.reserve(1); - batches.push_back(data_batch()); - return model::make_memory_record_batch_reader(std::move(batches)); - } - model::record_batch data_batch() { - storage::record_batch_builder bldr( - model::record_batch_type::raft_data, _offset_index); - bldr.add_raw_kv(rand_iobuf(_cfg.key_size), rand_iobuf(_cfg.value_size)); - ++_offset_index; - return std::move(bldr).build(); - } - iobuf rand_iobuf(size_t n) const { - iobuf b; - auto data = random_generators::gen_alphanum_string(n); - b.append(data.data(), data.size()); - return b; - } - - model::offset _offset_index{0}; - load_gen_cfg _cfg; - ss::semaphore _mem; - std::vector> _clients; -}; - -inline load_gen_cfg cfg_from_opts_in_thread( - boost::program_options::variables_map& m, ss::sharded* h) { - rpc::transport_configuration client_cfg; - client_cfg.server_addr = net::unresolved_address( - m["ip"].as(), m["port"].as()); - auto ca_cert = m["ca-cert"].as(); - if (ca_cert != "") { - auto builder = ss::tls::credentials_builder(); - // FIXME - // builder.set_dh_level(tls::dh_params::level::MEDIUM); - vlog(tronlog.info, "Using {} as CA root certificate", ca_cert); - builder.set_x509_trust_file(ca_cert, ss::tls::x509_crt_format::PEM) - .get0(); - client_cfg.credentials - = builder.build_reloadable_certificate_credentials().get0(); - } - client_cfg.max_queued_bytes = ss::memory::stats().total_memory() * .8; - return load_gen_cfg{ - .key_size = m["key-size"].as(), - .value_size = m["value-size"].as(), - .concurrency = m["concurrency"].as(), - .parallelism = m["parallelism"].as(), - .client_cfg = std::move(client_cfg), - .hist = h}; -} - -inline hdr_hist aggregate_in_thread(ss::sharded& h) { - hdr_hist retval; - for (auto i = 0; i < ss::smp::count; ++i) { - h.invoke_on(i, [&retval](const hdr_hist& o) { retval += o; }).get(); - } - return retval; -} -void write_latency_in_thread(ss::sharded& hist) { - auto h = aggregate_in_thread(hist); - // write to file instead - std::cout << std::endl << h << std::endl; -} - -int main(int args, char** argv, char** env) { - syschecks::initialize_intrinsics(); - std::setvbuf(stdout, nullptr, _IOLBF, 1024); - ss::app_template app; - cli_opts(app.add_options()); - ss::sharded client; - ss::sharded hist; - return app.run(args, argv, [&] { - return ss::async([&] { - auto& cfg = app.configuration(); - vlog(tronlog.info, "constructing histogram"); - hist.start().get(); - auto hd = ss::defer([&hist] { hist.stop().get(); }); - const load_gen_cfg lcfg = cfg_from_opts_in_thread(cfg, &hist); - vlog(tronlog.info, "config:{}", lcfg); - vlog(tronlog.info, "constructing client"); - client.start(lcfg).get(); - auto cd = ss::defer([&client] { client.stop().get(); }); - vlog(tronlog.info, "connecting clients"); - client.invoke_on_all(&client_loadgen::connect).get(); - vlog(tronlog.info, "invoking loadgen"); - client.invoke_on_all(&client_loadgen::execute_loadgen).get(); - vlog(tronlog.info, "writing results"); - write_latency_in_thread(hist); - vlog(tronlog.info, "stopping"); - }); - }); -} diff --git a/src/v/raft/tron/gen.json b/src/v/raft/tron/gen.json deleted file mode 100644 index 9a48d66cd502..000000000000 --- a/src/v/raft/tron/gen.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "namespace": "raft::tron", - "service_name": "trongen", - "includes": ["raft/tron/types.h"], - "methods": [{ - "name": "stats", - "input_type": "stats_request", - "output_type": "stats_reply" - }, - { - "name": "replicate", - "input_type": "model::record_batch_reader", - "output_type": "put_reply" - } - ] -} diff --git a/src/v/raft/tron/logger.cc b/src/v/raft/tron/logger.cc deleted file mode 100644 index ad1f8056f244..000000000000 --- a/src/v/raft/tron/logger.cc +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright 2020 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -#include "raft/tron/logger.h" - -namespace raft::tron { -ss::logger tronlog{"tron"}; -} // namespace raft::tron diff --git a/src/v/raft/tron/logger.h b/src/v/raft/tron/logger.h deleted file mode 100644 index 7e9b0598e3d2..000000000000 --- a/src/v/raft/tron/logger.h +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2020 Redpanda Data, Inc. - * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md - * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 - */ - -#pragma once - -#include "seastarx.h" - -#include - -namespace raft::tron { -extern ss::logger tronlog; -} // namespace raft::tron diff --git a/src/v/raft/tron/server.cc b/src/v/raft/tron/server.cc deleted file mode 100644 index 6f87eeada058..000000000000 --- a/src/v/raft/tron/server.cc +++ /dev/null @@ -1,365 +0,0 @@ -// Copyright 2020 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -#include "net/server.h" - -#include "config/configuration.h" -#include "net/unresolved_address.h" -#include "platform/stop_signal.h" -#include "raft/consensus.h" -#include "raft/consensus_client_protocol.h" -#include "raft/heartbeat_manager.h" -#include "raft/logger.h" -#include "raft/rpc_client_protocol.h" -#include "raft/service.h" -#include "raft/tron/logger.h" -#include "raft/tron/service.h" -#include "raft/types.h" -#include "rpc/connection_cache.h" -#include "rpc/simple_protocol.h" -#include "storage/api.h" -#include "storage/logger.h" -#include "syschecks/syschecks.h" -#include "utils/hdr_hist.h" -#include "vlog.h" - -#include -#include -#include -#include -#include - -#include -#include -#include - -#include -#include -#include - -using namespace std::chrono_literals; // NOLINT - -auto& tronlog = raft::tron::tronlog; -namespace po = boost::program_options; // NOLINT - -void cli_opts(po::options_description_easy_init o) { - o("ip", - po::value()->default_value("127.0.0.1"), - "ip to listen to"); - o("workdir", - po::value()->default_value("."), - "work directory"); - o("peers", - po::value>()->multitoken(), - "--peers 1,127.0.0.1:11215 \n --peers 2,127.0.0.0.1:11216"); - o("port", po::value()->default_value(20776), "port for service"); - o("heartbeat-timeout-ms", - po::value()->default_value(100), - "raft heartbeat timeout in milliseconds"); - o("node-id", po::value(), "node-id required"); - o("key", - po::value()->default_value(""), - "key for TLS seccured connection"); - o("cert", - po::value()->default_value(""), - "cert for TLS seccured connection"); -} - -struct simple_shard_lookup { - ss::shard_id shard_for(raft::group_id g) { return g() % ss::smp::count; } - bool contains(raft::group_id g) { return true; } -}; - -class simple_group_manager { -public: - simple_group_manager( - model::node_id self, - ss::sstring directory, - std::chrono::milliseconds raft_heartbeat_interval, - ss::sharded& clients) - : _self(self) - , _consensus_client_protocol( - raft::make_rpc_client_protocol(self, clients)) - , _storage( - [directory]() { - return storage::kvstore_config( - 1_MiB, - config::mock_binding(10ms), - directory, - storage::debug_sanitize_files::yes); - }, - [directory]() { - return storage::log_config( - storage::log_config::storage_type::disk, - std::move(directory), - 1_GiB, - storage::debug_sanitize_files::yes); - }) - , _hbeats( - raft_heartbeat_interval, - _consensus_client_protocol, - self, - raft_heartbeat_interval * 20) - , _recovery_memory_quota([] { - return raft::recovery_memory_quota::configuration{ - .max_recovery_memory = config::mock_binding>( - std::nullopt), - .default_read_buffer_size = config::mock_binding(512_KiB), - }; - }) {} - - ss::lw_shared_ptr consensus_for(raft::group_id) { - return _consensus; - } - - ss::future<> start(raft::group_configuration init_cfg) { - return _storage.start() - .then([this, init_cfg = std::move(init_cfg)]() mutable { - return _storage.log_mgr() - .manage(storage::ntp_config( - _ntp, _storage.log_mgr().config().base_dir)) - .then( - [this, cfg = std::move(init_cfg)](storage::log log) mutable { - _consensus = ss::make_lw_shared( - _self, - raft::group_id(66), - std::move(cfg), - raft::timeout_jitter( - config::shard_local_cfg().raft_election_timeout_ms()), - log, - raft::scheduling_config( - seastar::default_scheduling_group(), - seastar::default_priority_class()), - std::chrono::seconds(1), - _consensus_client_protocol, - [this](raft::leadership_status st) { - if (!st.current_leader) { - vlog( - tronlog.info, - "No leader in group {}", - st.group); - return; - } - vlog( - tronlog.info, - "New leader {} elected in group {}", - st.current_leader.value(), - st.group); - }, - _storage, - std::nullopt, - _recovery_memory_quota); - return _consensus->start().then( - [this] { return _hbeats.register_group(_consensus); }); - }); - }) - .then([this] { return _hbeats.start(); }); - } - ss::future<> stop() { - return _consensus->stop() - .then([this] { return _hbeats.stop(); }) - .then([this] { return _storage.stop(); }); - } - -private: - model::node_id _self; - raft::consensus_client_protocol _consensus_client_protocol; - storage::api _storage; - raft::heartbeat_manager _hbeats; - raft::recovery_memory_quota _recovery_memory_quota; - model::ntp _ntp{ - model::ns("master_control_program"), - model::topic("tron"), - model::partition_id(ss::this_shard_id())}; - ss::lw_shared_ptr _consensus; -}; - -static std::pair -extract_peer(ss::sstring peer) { - std::vector parts; - parts.reserve(2); - boost::split(parts, peer, boost::is_any_of(",")); - if (parts.size() != 2) { - throw std::runtime_error(fmt::format("Could not parse peer:{}", peer)); - } - int32_t n = boost::lexical_cast(parts[0]); - rpc::transport_configuration cfg; - std::vector address_parts; - boost::split(parts, parts[1], boost::is_any_of(":")); - cfg.server_addr = net::unresolved_address( - address_parts[0], boost::lexical_cast(address_parts[1])); - return {model::node_id(n), cfg}; -} - -static void initialize_connection_cache_in_thread( - model::node_id self, - ss::sharded& cache, - std::vector opts) { - for (auto& i : opts) { - auto [node, cfg] = extract_peer(i); - for (ss::shard_id i = 0; i < ss::smp::count; ++i) { - auto shard = rpc::connection_cache::shard_for(self, i, node); - ss::smp::submit_to(shard, [&cache, shard, n = node, config = cfg] { - return cache.local().emplace( - n, - config, - rpc::make_exponential_backoff_policy( - std::chrono::seconds(1), std::chrono::seconds(60))); - }).get(); - } - } -} - -static model::broker broker_from_arg(ss::sstring peer) { - std::vector parts; - parts.reserve(2); - boost::split(parts, peer, boost::is_any_of(",")); - if (parts.size() != 2) { - throw std::runtime_error(fmt::format("Could not parse peer:{}", peer)); - } - int32_t id = boost::lexical_cast(parts[0]); - std::vector host_port; - host_port.reserve(2); - boost::split(host_port, parts[1], boost::is_any_of(":")); - if (host_port.size() != 2) { - throw std::runtime_error(fmt::format("Could not host:{}", parts[1])); - } - auto port = boost::lexical_cast(parts[0]); - return model::broker( - model::node_id(id), - net::unresolved_address(host_port[0], port), - net::unresolved_address(host_port[0], port), - std::nullopt, - model::broker_properties{.cores = ss::smp::count}); -} - -static raft::group_configuration -group_cfg_from_args(const po::variables_map& opts) { - std::vector brokers; - if (opts.find("peers") != opts.end()) { - auto peers = opts["peers"].as>(); - for (auto& arg : peers) { - brokers.push_back(broker_from_arg(arg)); - } - } - // add self - brokers.push_back(model::broker( - model::node_id(opts["node-id"].as()), - net::unresolved_address( - opts["ip"].as(), opts["port"].as()), - net::unresolved_address( - opts["ip"].as(), opts["port"].as()), - std::nullopt, - model::broker_properties{ - .cores = ss::smp::count, - })); - return raft::group_configuration(std::move(brokers), model::revision_id(0)); -} - -int main(int args, char** argv, char** env) { - syschecks::initialize_intrinsics(); - std::setvbuf(stdout, nullptr, _IOLBF, 1024); - ss::sharded serv; - ss::sharded connection_cache; - ss::sharded group_manager; - ss::app_template app; - cli_opts(app.add_options()); - return app.run(args, argv, [&] { - return ss::async([&] { -#ifndef NDEBUG - std::cout.setf(std::ios::unitbuf); -#endif - raft::raftlog.trace("ack"); - storage::stlog.trace("ack"); - stop_signal app_signal; - auto& cfg = app.configuration(); - connection_cache.start().get(); - auto ccd = ss::defer( - [&connection_cache] { connection_cache.stop().get(); }); - net::server_configuration scfg("tron_rpc"); - - scfg.max_service_memory_per_core - = ss::memory::stats().total_memory() * .7; - auto key = cfg["key"].as(); - auto cert = cfg["cert"].as(); - ss::shared_ptr credentials; - if (key != "" && cert != "") { - auto builder = ss::tls::credentials_builder(); - builder.set_dh_level(ss::tls::dh_params::level::MEDIUM); - builder - .set_x509_key_file(cert, key, ss::tls::x509_crt_format::PEM) - .get(); - credentials - = builder.build_reloadable_server_credentials().get0(); - } - scfg.addrs.emplace_back( - ss::socket_address( - ss::net::inet_address(cfg["ip"].as()), - cfg["port"].as()), - credentials); - auto self_id = cfg["node-id"].as(); - if (cfg.find("peers") != cfg.end()) { - initialize_connection_cache_in_thread( - model::node_id(self_id), - connection_cache, - cfg["peers"].as>()); - } - const ss::sstring workdir = ssx::sformat( - "{}/greetings-{}", - cfg["workdir"].as(), - cfg["node-id"].as()); - vlog(tronlog.info, "Work directory:{}", workdir); - - auto hbeat_interval = std::chrono::milliseconds( - cfg["heartbeat-timeout-ms"].as()); - // initialize group_manager - group_manager - .start( - model::node_id(cfg["node-id"].as()), - workdir, - hbeat_interval, - std::ref(connection_cache)) - .get(); - serv.start(scfg).get(); - auto dserv = ss::defer([&serv] { serv.stop().get(); }); - vlog(tronlog.info, "registering service on all cores"); - simple_shard_lookup shard_table; - serv - .invoke_on_all( - [&shard_table, &group_manager, hbeat_interval](net::server& s) { - auto proto = std::make_unique(); - proto->register_service>( - ss::default_scheduling_group(), - ss::default_smp_service_group(), - group_manager, - shard_table); - proto->register_service< - raft::service>( - ss::default_scheduling_group(), - ss::default_smp_service_group(), - group_manager, - shard_table, - hbeat_interval); - s.set_protocol(std::move(proto)); - }) - .get(); - vlog(tronlog.info, "Invoking rpc start on all cores"); - serv.invoke_on_all(&net::server::start).get(); - vlog(tronlog.info, "Starting group manager"); - group_manager - .invoke_on_all([&cfg](simple_group_manager& m) { - return m.start(group_cfg_from_args(cfg)); - }) - .get(); - app_signal.wait().get(); - }); - }); -} diff --git a/src/v/raft/tron/service.h b/src/v/raft/tron/service.h deleted file mode 100644 index 783370143f10..000000000000 --- a/src/v/raft/tron/service.h +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright 2020 Redpanda Data, Inc. - * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md - * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 - */ - -#pragma once - -#include "model/record_batch_reader.h" -#include "raft/service.h" -#include "raft/tron/logger.h" -#include "raft/tron/trongen_service.h" -#include "raft/tron/types.h" -#include "raft/types.h" -#include "seastarx.h" -#include "ssx/sformat.h" - -namespace raft::tron { -template -requires raft::RaftGroupManager && raft::ShardLookupManager< - ShardLookup> -struct service final : trongen_service { - service( - ss::scheduling_group sc, - ss::smp_service_group ssg, - ss::sharded& mngr, - ShardLookup& tbl) - : trongen_service(sc, ssg) - , _group_manager(mngr) - , _shard_table(tbl) {} - ss::future - stats(stats_request&&, rpc::streaming_context&) final { - return ss::make_ready_future(stats_reply{}); - } - ss::future - replicate(model::record_batch_reader&& r, rpc::streaming_context&) final { - auto shard = _shard_table.shard_for(raft::group_id(66)); - return with_scheduling_group( - get_scheduling_group(), [this, shard, r = std::move(r)]() mutable { - return _group_manager.invoke_on( - shard, - get_smp_service_group(), - [this, r = std::move(r)](ConsensusManager& m) mutable { - return m.consensus_for(group_id(66)) - ->replicate( - std::move(r), - raft::replicate_options( - raft::consistency_level::quorum_ack)) - .then_wrapped([](ss::future> f) { - put_reply ret; - try { - f.get(); - ret.success = true; - } catch (...) { - ret.failure_reason = ssx::sformat( - "{}", std::current_exception()); - tronlog.error( - "failed to replicate: {}", ret.failure_reason); - } - return ret; - }); - }); - }); - } - ss::sharded& _group_manager; - ShardLookup& _shard_table; -}; - -} // namespace raft::tron diff --git a/src/v/raft/tron/types.h b/src/v/raft/tron/types.h deleted file mode 100644 index 09a2a2166ae4..000000000000 --- a/src/v/raft/tron/types.h +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2020 Redpanda Data, Inc. - * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md - * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 - */ - -#pragma once - -#include "bytes/iobuf.h" -#include "model/async_adl_serde.h" -#include "raft/types.h" - -namespace raft::tron { -struct stats_request {}; -struct stats_reply {}; -struct put_reply { - bool success; - ss::sstring failure_reason; -}; -} // namespace raft::tron From 9d6c7b5da8e30db723ff98069a9f123d1ccc0f0e Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Tue, 5 Jul 2022 18:25:41 -0700 Subject: [PATCH 2/2] rpc: remove demo Signed-off-by: Noah Watkins --- src/v/rpc/CMakeLists.txt | 1 - src/v/rpc/demo/CMakeLists.txt | 33 ---- src/v/rpc/demo/client.cc | 291 ---------------------------------- src/v/rpc/demo/demo_utils.h | 83 ---------- src/v/rpc/demo/gen.json | 25 --- src/v/rpc/demo/server.cc | 116 -------------- src/v/rpc/demo/type_tests.cc | 24 --- src/v/rpc/demo/types.h | 86 ---------- 8 files changed, 659 deletions(-) delete mode 100644 src/v/rpc/demo/CMakeLists.txt delete mode 100644 src/v/rpc/demo/client.cc delete mode 100644 src/v/rpc/demo/demo_utils.h delete mode 100644 src/v/rpc/demo/gen.json delete mode 100644 src/v/rpc/demo/server.cc delete mode 100644 src/v/rpc/demo/type_tests.cc delete mode 100644 src/v/rpc/demo/types.h diff --git a/src/v/rpc/CMakeLists.txt b/src/v/rpc/CMakeLists.txt index 5f7bb440d097..3c68de0d0195 100644 --- a/src/v/rpc/CMakeLists.txt +++ b/src/v/rpc/CMakeLists.txt @@ -18,4 +18,3 @@ v_cc_library( v::net ) add_subdirectory(test) -add_subdirectory(demo) diff --git a/src/v/rpc/demo/CMakeLists.txt b/src/v/rpc/demo/CMakeLists.txt deleted file mode 100644 index 90d5514e27ba..000000000000 --- a/src/v/rpc/demo/CMakeLists.txt +++ /dev/null @@ -1,33 +0,0 @@ -include(rpcgen) -rpcgen( - TARGET demo_gen - IN_FILE ${CMAKE_CURRENT_SOURCE_DIR}/gen.json - OUT_FILE ${CMAKE_CURRENT_BINARY_DIR}/simple_service.h - INCLUDES ${CMAKE_BINARY_DIR}/src/v - ) - -add_executable(demo_server server.cc) -target_link_libraries(demo_server PUBLIC demo_gen v::model) -set_property(TARGET demo_server PROPERTY POSITION_INDEPENDENT_CODE ON) - -if(CMAKE_BUILD_TYPE MATCHES Release) - include(CheckIPOSupported) - check_ipo_supported(RESULT ltosupported OUTPUT error) - if(ltosupported) - set_property(TARGET demo_server PROPERTY INTERPROCEDURAL_OPTIMIZATION ON) - endif() -endif() - -# client -add_executable(demo_client client.cc) -target_link_libraries(demo_client PUBLIC demo_gen v::model) -set_property(TARGET demo_client PROPERTY POSITION_INDEPENDENT_CODE ON) - - -rp_test( - UNIT_TEST - BINARY_NAME roundtrip_demo_types - SOURCES type_tests.cc - LIBRARIES v::seastar_testing_main demo_gen v::model - ARGS "-- -c 1" -) diff --git a/src/v/rpc/demo/client.cc b/src/v/rpc/demo/client.cc deleted file mode 100644 index 4021e2f551ea..000000000000 --- a/src/v/rpc/demo/client.cc +++ /dev/null @@ -1,291 +0,0 @@ -// Copyright 2020 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -#include "model/timeout_clock.h" -#include "rpc/demo/demo_utils.h" -#include "rpc/demo/simple_service.h" -#include "rpc/types.h" -#include "syschecks/syschecks.h" -#include "utils/hdr_hist.h" -#include "vlog.h" - -#include -#include -#include -#include -#include - -#include - -namespace ch = std::chrono; -static ss::logger lgr{"demo client"}; - -void cli_opts(boost::program_options::options_description_easy_init o) { - namespace po = boost::program_options; - o("ip", - po::value()->default_value("127.0.0.1"), - "ip to connect to"); - o("port", po::value()->default_value(20776), "port for service"); - o("concurrency", - po::value()->default_value(1000), - "number of concurrent requests per TCP connection"); - o("data-size", - po::value()->default_value(1 << 20), - "1MB default data _per_ request"); - o("chunk-size", - po::value()->default_value(1 << 15), - "fragment data_size by this chunk size step (32KB default)"); - o("parallelism", - po::value()->default_value(2), - "number of TCP connections per core"); - o("test-case", - po::value()->default_value(1), - "1: large payload, 2: complex struct, 3: interspersed"); - o("ca-cert", - po::value()->default_value(""), - "CA root certificate"); -} - -struct load_gen_cfg { - std::size_t global_total_requests() const { - return core_total_requests() * ss::smp::count; - } - std::size_t global_size_test1() const { - return global_total_requests() * data_size; - } - std::size_t global_size_test2() const { - return global_total_requests() * sizeof(demo::complex_request); - } - std::size_t core_total_requests() const { - return parallelism * concurrency; - } - - std::size_t data_size; - std::size_t chunk_size; - std::size_t concurrency; - std::size_t parallelism; - std::size_t test_case; - rpc::transport_configuration client_cfg; - ss::sharded* hist; -}; - -inline std::ostream& operator<<(std::ostream& o, const load_gen_cfg& cfg) { - // make the output json-able so we can consume it in python for analysis - return o << "{'data_size':" << cfg.data_size - << ", 'chunk_size':" << cfg.chunk_size - << ", 'concurrency':" << cfg.concurrency - << ", 'parallelism':" << cfg.parallelism - << ", 'test_case':" << cfg.test_case - << ", 'max_queued_bytes_per_tcp':" - << cfg.client_cfg.max_queued_bytes - << ", 'global_test_1_data_size':" << cfg.global_size_test1() - << ", 'global_test_2_data_size':" << cfg.global_size_test2() - << ", 'global_requests':" << cfg.global_total_requests() - << ", 'cores':" << ss::smp::count << "}"; -} - -// 1. creates cfg.parallelism number of TCP connections -// 2. launches cfg.concurrency * parallelism number of requests -// 3. each request is cfg.data_size large -// 4. each cfg.data_size is split into cfg.chunk_size # of chunks -// 5. profit -class client_loadgen { -public: - using cli = rpc::client; - client_loadgen(load_gen_cfg cfg) - : _cfg(std::move(cfg)) - , _mem(ss::memory::stats().total_memory() * .9) { - lgr.debug("Mem for loadgen: {}", _mem.available_units()); - for (std::size_t i = 0; i < _cfg.parallelism; ++i) { - _clients.push_back(std::make_unique(_cfg.client_cfg)); - } - } - ss::future<> execute_loadgen() { - return ss::parallel_for_each( - _clients.begin(), _clients.end(), [this](auto& c) { - return ss::parallel_for_each( - boost::irange(std::size_t(0), _cfg.concurrency), - [this, &c](std::size_t) mutable { - return execute_one(c.get()); - }); - }); - } - ss::future<> connect() { - return ss::parallel_for_each( - _clients.begin(), _clients.end(), [](auto& c) { - return c->connect(model::no_timeout); - }); - } - ss::future<> stop() { - return ss::parallel_for_each( - _clients.begin(), _clients.end(), [](auto& c) { return c->stop(); }); - } - -private: - ss::future<> execute_one(cli* const c) { - if (_cfg.test_case < 1 || _cfg.test_case > 3) { - throw std::runtime_error(fmt::format( - "Unknown test:{}, bad config:{}", _cfg.test_case, _cfg)); - } - if (_cfg.test_case == 1) { - return get_units(_mem, _cfg.data_size) - .then([this, c](ss::semaphore_units<> u) { - return c - ->put( - demo::gen_simple_request(_cfg.data_size, _cfg.chunk_size), - rpc::client_opts(rpc::no_timeout)) - .then([m = _cfg.hist->local().auto_measure(), - u = std::move(u)](auto _) {}); - }); - } else if (_cfg.test_case == 2) { - return get_units(_mem, sizeof(demo::complex_request{})) - .then([this, &c](ss::semaphore_units<> u) { - return c - ->put_complex( - demo::complex_request{}, - rpc::client_opts(rpc::no_timeout)) - .then([m = _cfg.hist->local().auto_measure(), - u = std::move(u)](auto _) {}); - }); - } else if (_cfg.test_case == 3) { - return get_units(_mem, _cfg.data_size) - .then([this, &c](ss::semaphore_units<> u) { - auto r = demo::gen_interspersed_request( - _cfg.data_size, _cfg.chunk_size); - return c - ->put_interspersed( - std::move(r), rpc::client_opts(rpc::no_timeout)) - .then([m = _cfg.hist->local().auto_measure(), - u = std::move(u)](auto _) {}); - }); - } - __builtin_unreachable(); - } - - load_gen_cfg _cfg; - ss::semaphore _mem; - std::vector> _clients; -}; - -inline load_gen_cfg -cfg_from(boost::program_options::variables_map& m, ss::sharded* h) { - rpc::transport_configuration client_cfg; - client_cfg.server_addr = net::unresolved_address( - m["ip"].as(), m["port"].as()); - auto ca_cert = m["ca-cert"].as(); - if (ca_cert != "") { - auto builder = ss::tls::credentials_builder(); - // FIXME - // builder.set_dh_level(tls::dh_params::level::MEDIUM); - vlog(lgr.info, "Using {} as CA root certificate", ca_cert); - builder.set_x509_trust_file(ca_cert, ss::tls::x509_crt_format::PEM) - .get0(); - client_cfg.credentials - = builder.build_reloadable_certificate_credentials().get0(); - } - client_cfg.max_queued_bytes = ss::memory::stats().total_memory() * .8; - return load_gen_cfg{ - .data_size = m["data-size"].as(), - .chunk_size = m["chunk-size"].as(), - .concurrency = m["concurrency"].as(), - .parallelism = m["parallelism"].as(), - .test_case = m["test-case"].as(), - .client_cfg = std::move(client_cfg), - .hist = h}; -} - -class throughput { -public: - using time_t = std::chrono::steady_clock::time_point; - throughput(std::size_t total_requests) - : _total_requests(total_requests) - , _begin(now()) {} - double qps() const { - if (_end < _begin) { - throw std::runtime_error("call ::stop() first"); - } - double d = static_cast(duration_ms()); - d /= 1000.0; - return static_cast(_total_requests) / d; - } - void stop() { - if (_end > _begin) { - throw std::runtime_error("throughput time already stopped"); - } - _end = now(); - } - std::size_t duration_ms() const { - return ch::duration_cast(_end - _begin).count(); - } - -private: - time_t now() const { return std::chrono::steady_clock::now(); } - std::size_t _total_requests; - time_t _begin; - time_t _end; -}; - -inline std::ostream& operator<<(std::ostream& o, const throughput& t) { - return o << "{'qps':" << t.qps() << ",'duration_ms':" << t.duration_ms() - << "}"; -} - -inline hdr_hist aggregate_in_thread(ss::sharded& h) { - hdr_hist retval; - for (auto i = 0; i < ss::smp::count; ++i) { - h.invoke_on(i, [&retval](const hdr_hist& o) { retval += o; }).get(); - } - return retval; -} - -void write_configuration_in_thread( - const throughput& tp, const load_gen_cfg& cfg) { - std::ostringstream to; - to << "{'throughput':" << tp << ", 'config':" << cfg << "}"; - const ss::sstring s = to.str(); - force_write_ptr("test_config.json", s.data(), s.size()).get(); -} - -void write_latency_in_thread(ss::sharded& hist) { - auto h = aggregate_in_thread(hist); - write_histogram("clients.hdr", h).get(); -} - -int main(int args, char** argv, char** env) { - syschecks::initialize_intrinsics(); - std::setvbuf(stdout, nullptr, _IOLBF, 1024); - ss::app_template app; - cli_opts(app.add_options()); - ss::sharded client; - ss::sharded hist; - return app.run(args, argv, [&] { - auto& cfg = app.configuration(); - return ss::async([&] { - vlog(lgr.info, "constructing histogram"); - hist.start().get(); - auto hd = ss::defer([&hist] { hist.stop().get(); }); - const load_gen_cfg lcfg = cfg_from(cfg, &hist); - vlog(lgr.info, "config:{}", lcfg); - vlog(lgr.info, "constructing client"); - client.start(lcfg).get(); - auto cd = ss::defer([&client] { client.stop().get(); }); - vlog(lgr.info, "connecting clients"); - client.invoke_on_all(&client_loadgen::connect).get(); - auto tp = throughput(lcfg.global_total_requests()); - vlog(lgr.info, "invoking loadgen"); - client.invoke_on_all(&client_loadgen::execute_loadgen).get(); - tp.stop(); - vlog(lgr.info, "{}", tp); - vlog(lgr.info, "writing results"); - write_configuration_in_thread(tp, lcfg); - write_latency_in_thread(hist); - vlog(lgr.info, "stopping"); - }); - }); -} diff --git a/src/v/rpc/demo/demo_utils.h b/src/v/rpc/demo/demo_utils.h deleted file mode 100644 index 43edeec126fc..000000000000 --- a/src/v/rpc/demo/demo_utils.h +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright 2020 Redpanda Data, Inc. - * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md - * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 - */ - -#include "reflection/arity.h" -#include "reflection/for_each_field.h" -#include "rpc/demo/types.h" -#include "seastarx.h" -#include "ssx/sformat.h" -#include "utils/hdr_hist.h" - -#include -#include - -inline ss::future<> -force_write_ptr(ss::sstring filename, const char* ptr, std::size_t len) { - auto flags = ss::open_flags::rw | ss::open_flags::create - | ss::open_flags::truncate; - return open_file_dma(filename, flags) - .then( - [](ss::file f) { return ss::make_file_output_stream(std::move(f)); }) - .then([ptr, len](ss::output_stream o) mutable { - auto out = ss::make_lw_shared>(std::move(o)); - return out->write(ptr, len) - .then([out] { return out->flush(); }) - .then([out] { return out->close(); }) - .finally([out] {}); - }); -} - -inline ss::future<> -force_write_buffer(ss::sstring filename, ss::temporary_buffer b) { - const char* ptr = b.get(); - std::size_t len = b.size(); - return force_write_ptr(std::move(filename), ptr, len) - .then([b = std::move(b)] {}); -} - -inline ss::future<> write_histogram(ss::sstring filename, const hdr_hist& h) { - return force_write_buffer(std::move(filename), h.print_classic()); -} - -namespace demo { -inline iobuf rand_iobuf(std::size_t chunks, std::size_t chunk_size) { - iobuf b; - for (size_t i = 0; i < chunks; ++i) { - b.append(ss::temporary_buffer(chunk_size)); - } - return b; -} - -inline demo::simple_request -gen_simple_request(size_t data_size, size_t chunk_size) { - const std::size_t chunks = data_size / chunk_size; - return demo::simple_request{.data = rand_iobuf(chunks, chunk_size)}; -} - -inline interspersed_request -gen_interspersed_request(size_t data_size, size_t chunk_size) { - const std::size_t chunks = data_size / chunk_size / 8; - // clang-format off - return interspersed_request{ - .data = interspersed_request:: - payload{._one = i1{.y = rand_iobuf(chunks, chunk_size)}, - ._two = i2{.x = i1{.y = rand_iobuf(chunks, chunk_size)}, - .y = rand_iobuf(chunks, chunk_size)}, - ._three = i3{.x = i2{.x = i1{.y = rand_iobuf( - chunks, chunk_size)}, - .y = rand_iobuf(chunks, chunk_size)}, - .y = rand_iobuf(chunks, chunk_size)}}, - .x = rand_iobuf(chunks, chunk_size), - .y = rand_iobuf(chunks, chunk_size)}; - // clang-format on -} - -} // namespace demo diff --git a/src/v/rpc/demo/gen.json b/src/v/rpc/demo/gen.json deleted file mode 100644 index 3f5e2ef73228..000000000000 --- a/src/v/rpc/demo/gen.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "namespace": "demo", - "service_name": "simple", - "includes": [ - "rpc/demo/types.h" - ], - "methods": [ - { - "name": "put", - "input_type": "simple_request", - "output_type": "simple_reply" - }, - { - "name": "put_complex", - "input_type": "complex_request", - "output_type": "complex_reply" - }, - { - "name": "put_interspersed", - "input_type": "interspersed_request", - "output_type": "interspersed_reply" - } - - ] -} diff --git a/src/v/rpc/demo/server.cc b/src/v/rpc/demo/server.cc deleted file mode 100644 index c30a94ff8a78..000000000000 --- a/src/v/rpc/demo/server.cc +++ /dev/null @@ -1,116 +0,0 @@ -// Copyright 2020 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -#include "net/server.h" - -#include "rpc/demo/demo_utils.h" -#include "rpc/demo/simple_service.h" -#include "rpc/simple_protocol.h" -#include "syschecks/syschecks.h" -#include "vlog.h" - -#include -#include -#include -#include - -#include - -static ss::logger lgr{"demo server"}; - -struct service final : demo::simple_service { - using demo::simple_service::simple_service; - ss::future - put(demo::simple_request&&, rpc::streaming_context&) final { - return ss::make_ready_future( - demo::simple_reply{{}}); - } - ss::future - put_complex(demo::complex_request&&, rpc::streaming_context&) final { - return ss::make_ready_future( - demo::complex_reply{{}}); - } - ss::future put_interspersed( - demo::interspersed_request&&, rpc::streaming_context&) final { - return ss::make_ready_future( - demo::interspersed_reply{{}}); - } -}; - -void cli_opts(boost::program_options::options_description_easy_init o) { - namespace po = boost::program_options; - o("ip", - po::value()->default_value("127.0.0.1"), - "ip to connect to"); - o("port", po::value()->default_value(20776), "port for service"); - o("key", - po::value()->default_value(""), - "key for TLS seccured connection"); - o("cert", - po::value()->default_value(""), - "cert for TLS seccured connection"); -} - -int main(int args, char** argv, char** env) { - syschecks::initialize_intrinsics(); - std::setvbuf(stdout, nullptr, _IOLBF, 1024); - ss::sharded serv; - ss::app_template app; - cli_opts(app.add_options()); - return app.run_deprecated(args, argv, [&] { - ss::engine().at_exit([&serv] { - // clang-format off - return ss::do_with(hdr_hist{}, [&serv](hdr_hist& h) { - auto begin = boost::make_counting_iterator(uint32_t(0)); - auto end = boost::make_counting_iterator(uint32_t(ss::smp::count)); - return ss::do_for_each(begin, end, [&h, &serv](uint32_t i) { - return serv.invoke_on(i, [&h](const net::server& s) { - h += s.histogram(); - }); - }) .then([&h] { return write_histogram("server.hdr", h); }); - }).then([&serv] { return serv.stop(); }); - // clang-format on - }); - auto& cfg = app.configuration(); - return ss::async([&] { - net::server_configuration scfg("demo_rpc"); - auto key = cfg["key"].as(); - auto cert = cfg["cert"].as(); - ss::shared_ptr creds; - if (key != "" && cert != "") { - auto builder = ss::tls::credentials_builder(); - builder.set_dh_level(ss::tls::dh_params::level::MEDIUM); - builder - .set_x509_key_file(cert, key, ss::tls::x509_crt_format::PEM) - .get(); - creds = builder.build_reloadable_server_credentials().get0(); - } - scfg.addrs.emplace_back( - ss::socket_address(ss::ipv4_addr( - cfg["ip"].as(), cfg["port"].as())), - creds); - scfg.max_service_memory_per_core - = ss::memory::stats().total_memory() * .9 /*90%*/; - - serv.start(scfg).get(); - vlog(lgr.info, "registering service on all cores"); - serv - .invoke_on_all([](net::server& s) { - auto proto = std::make_unique(); - proto->register_service( - ss::default_scheduling_group(), - ss::default_smp_service_group()); - s.set_protocol(std::move(proto)); - }) - .get(); - vlog(lgr.info, "Invoking rpc start on all cores"); - serv.invoke_on_all(&net::server::start).get(); - }); - }); -} diff --git a/src/v/rpc/demo/type_tests.cc b/src/v/rpc/demo/type_tests.cc deleted file mode 100644 index 228559f39262..000000000000 --- a/src/v/rpc/demo/type_tests.cc +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2020 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -#include "reflection/adl.h" -#include "rpc/demo/demo_utils.h" -#include "rpc/demo/types.h" - -#include -#include - -using namespace demo; // NOLINT - -SEASTAR_THREAD_TEST_CASE(roundtrip_interspersed) { - auto b = reflection::to_iobuf(gen_interspersed_request(1 << 20, 1 << 15)); - BOOST_REQUIRE_EQUAL(b.size_bytes(), (1 << 20) + 80 /*80bytes overhead*/); - auto expected = reflection::adl{}.from(std::move(b)); - BOOST_REQUIRE_EQUAL(expected.data._three.y.size_bytes(), (1 << 20) / 8); -} diff --git a/src/v/rpc/demo/types.h b/src/v/rpc/demo/types.h deleted file mode 100644 index 66cf48d2786f..000000000000 --- a/src/v/rpc/demo/types.h +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2020 Redpanda Data, Inc. - * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md - * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 - */ - -#pragma once - -#include "bytes/iobuf.h" - -namespace demo { -struct simple_request { - iobuf data; -}; -struct simple_reply { - iobuf data; -}; - -struct c1 { - int32_t a{42}, b{1}, c{66}, d{-8}; -}; -struct c2 { - c1 _a; - int32_t b{1}, c{66}, d{-8}, e{0}; -}; -struct c3 { - c2 _a; - int32_t b{1}, c{66}, d{-8}, e{0}; -}; -struct c4 { - c3 _a; - int32_t b{1}, c{66}, d{-8}, e{0}; -}; -struct c5 { - c4 _a; - int32_t b{1}, c{66}, d{-8}, e{0}; -}; - -struct complex_request { - struct payload { - c1 _one; // 4 fields - c2 _two; // 8 fields - c3 _three; // 16 fields - c4 _four; // 32 fields - c5 _five; // 64 fields - }; - // 128 fields - payload data; -}; -struct complex_reply { - int32_t x{-1}; -}; - -struct i1 { - c1 x; - iobuf y; -}; -struct i2 { - i1 x; - iobuf y; -}; - -struct i3 { - i2 x; - iobuf y; -}; -struct interspersed_request { - struct payload { - i1 _one; - i2 _two; - i3 _three; - }; - payload data; - iobuf x; - iobuf y; -}; -struct interspersed_reply { - int32_t x{-1}; -}; - -} // namespace demo