From 69404a1b9d1cd2d6de1760394ed6188e2aad83c3 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 3 Jul 2023 15:28:24 +0100 Subject: [PATCH] feat: remove `Control` and `ControlledConnection` (#164) --- CHANGELOG.md | 6 + test-harness/Cargo.toml | 10 + {yamux => test-harness}/benches/concurrent.rs | 73 +--- test-harness/src/lib.rs | 222 +++++++++- test-harness/tests/concurrent.rs | 153 ------- test-harness/tests/poll_api.rs | 282 +++++++++---- test-harness/tests/tests.rs | 392 ------------------ yamux/Cargo.toml | 14 +- yamux/src/connection.rs | 2 +- yamux/src/control.rs | 246 ----------- yamux/src/lib.rs | 8 - 11 files changed, 451 insertions(+), 957 deletions(-) rename {yamux => test-harness}/benches/concurrent.rs (61%) delete mode 100644 test-harness/tests/concurrent.rs delete mode 100644 test-harness/tests/tests.rs delete mode 100644 yamux/src/control.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index f5cbde49..b9389a98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.12.0 - unreleased + +- Remove `Control` and `ControlledConnection`. + Users have to move to the `poll_` functions of `Connection`. + See [PR #164](https://github.com/libp2p/rust-yamux/pull/164). + # 0.11.1 - Avoid race condition between pending frames and closing stream. diff --git a/test-harness/Cargo.toml b/test-harness/Cargo.toml index 92fe8a2f..39b702d5 100644 --- a/test-harness/Cargo.toml +++ b/test-harness/Cargo.toml @@ -14,5 +14,15 @@ anyhow = "1" log = "0.4.17" [dev-dependencies] +criterion = "0.5" env_logger = "0.10" +futures = "0.3.4" +quickcheck = "1.0" +tokio = { version = "1.0", features = ["net", "rt-multi-thread", "macros", "time"] } +tokio-util = { version = "0.7", features = ["compat"] } constrained-connection = "0.1" +futures_ringbuf = "0.4.0" + +[[bench]] +name = "concurrent" +harness = false diff --git a/yamux/benches/concurrent.rs b/test-harness/benches/concurrent.rs similarity index 61% rename from yamux/benches/concurrent.rs rename to test-harness/benches/concurrent.rs index 5da8ce29..f7179cee 100644 --- a/yamux/benches/concurrent.rs +++ b/test-harness/benches/concurrent.rs @@ -10,10 +10,11 @@ use constrained_connection::{new_unconstrained_connection, samples, Endpoint}; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; -use futures::{channel::mpsc, future, io::AsyncReadExt, prelude::*}; +use std::iter; use std::sync::Arc; +use test_harness::{dev_null_server, MessageSender, MessageSenderStrategy, Msg}; use tokio::{runtime::Runtime, task}; -use yamux::{Config, Connection, Control, Mode}; +use yamux::{Config, Connection, Mode}; criterion_group!(benches, concurrent); criterion_main!(benches); @@ -86,62 +87,20 @@ async fn oneway( server: Endpoint, client: Endpoint, ) { - let msg_len = data.0.len(); - let (tx, rx) = mpsc::unbounded(); + let server = Connection::new(server, config(), Mode::Server); + let client = Connection::new(client, config(), Mode::Client); - let server = async move { - let mut connection = Connection::new(server, config(), Mode::Server); + task::spawn(dev_null_server(server)); - while let Some(Ok(mut stream)) = stream::poll_fn(|cx| connection.poll_next_inbound(cx)) - .next() - .await - { - let tx = tx.clone(); - - task::spawn(async move { - let mut n = 0; - let mut b = vec![0; msg_len]; - - // Receive `nmessages` messages. 
-                for _ in 0..nmessages {
-                    stream.read_exact(&mut b[..]).await.unwrap();
-                    n += b.len();
-                }
-
-                tx.unbounded_send(n).expect("unbounded_send");
-                stream.close().await.unwrap();
-            });
-        }
-    };
-    task::spawn(server);
-
-    let conn = Connection::new(client, config(), Mode::Client);
-    let (mut ctrl, conn) = Control::new(conn);
-
-    task::spawn(conn.for_each(|r| {
-        r.unwrap();
-        future::ready(())
-    }));
-
-    for _ in 0..nstreams {
-        let data = data.clone();
-        let mut ctrl = ctrl.clone();
-        task::spawn(async move {
-            let mut stream = ctrl.open_stream().await.unwrap();
-
-            // Send `nmessages` messages.
-            for _ in 0..nmessages {
-                stream.write_all(data.as_ref()).await.unwrap();
-            }
-
-            stream.close().await.unwrap();
-        });
-    }
-
-    let n = rx
+    let messages = iter::repeat(data)
+        .map(|b| Msg(b.0.to_vec()))
         .take(nstreams)
-        .fold(0, |acc, n| future::ready(acc + n))
-        .await;
-    assert_eq!(n, nstreams * nmessages * msg_len);
-    ctrl.close().await.expect("close");
+        .collect(); // `MessageSender` will use 1 stream per message.
+    let num_streams_used = MessageSender::new(client, messages, true)
+        .with_message_multiplier(nmessages as u64)
+        .with_strategy(MessageSenderStrategy::Send)
+        .await
+        .unwrap();
+
+    assert_eq!(num_streams_used, nstreams);
 }
diff --git a/test-harness/src/lib.rs b/test-harness/src/lib.rs
index 4c387b42..4c4820cc 100644
--- a/test-harness/src/lib.rs
+++ b/test-harness/src/lib.rs
@@ -10,7 +10,8 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
 use std::pin::Pin;
 use std::task::{Context, Poll};
 use std::{fmt, io, mem};
-use tokio::net::{TcpListener, TcpStream};
+use tokio::net::{TcpListener, TcpSocket, TcpStream};
+use tokio::task;
 use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
 use yamux::ConnectionError;
 use yamux::{Config, WindowUpdateMode};
@@ -19,8 +20,9 @@ use yamux::{Connection, Mode};
 pub async fn connected_peers(
     server_config: Config,
     client_config: Config,
+    buffer_sizes: Option<TcpBufferSizes>,
 ) -> io::Result<(Connection<Compat<TcpStream>>, Connection<Compat<TcpStream>>)> {
-    let (listener, addr) = bind().await?;
+    let (listener, addr) = bind(buffer_sizes).await?;
 
     let server = async {
         let (stream, _) = listener.accept().await?;
@@ -31,7 +33,7 @@ pub async fn connected_peers(
         ))
     };
     let client = async {
-        let stream = TcpStream::connect(addr).await?;
+        let stream = new_socket(buffer_sizes)?.connect(addr).await?;
         Ok(Connection::new(
             stream.compat(),
             client_config,
@@ -42,12 +44,27 @@ pub async fn connected_peers(
     futures::future::try_join(server, client).await
 }
 
-pub async fn bind() -> io::Result<(TcpListener, SocketAddr)> {
-    let i = Ipv4Addr::new(127, 0, 0, 1);
-    let s = SocketAddr::V4(SocketAddrV4::new(i, 0));
-    let l = TcpListener::bind(&s).await?;
-    let a = l.local_addr()?;
-    Ok((l, a))
+pub async fn bind(buffer_sizes: Option<TcpBufferSizes>) -> io::Result<(TcpListener, SocketAddr)> {
+    let socket = new_socket(buffer_sizes)?;
+    socket.bind(SocketAddr::V4(SocketAddrV4::new(
+        Ipv4Addr::new(127, 0, 0, 1),
+        0,
+    )))?;
+
+    let listener = socket.listen(1024)?;
+    let address = listener.local_addr()?;
+
+    Ok((listener, address))
+}
+
+fn new_socket(buffer_sizes: Option<TcpBufferSizes>) -> io::Result<TcpSocket> {
+    let socket = TcpSocket::new_v4()?;
+    if let Some(size) = buffer_sizes {
+        socket.set_send_buffer_size(size.send)?;
+        socket.set_recv_buffer_size(size.recv)?;
+    }
+
+    Ok(socket)
 }
 
 /// For each incoming stream of `c` echo back to the sender.
@@ -67,6 +84,145 @@ where
     .await
 }
 
+/// For each incoming stream of `c`, read to end but don't write back.
+pub async fn dev_null_server<T>(mut c: Connection<T>) -> Result<(), ConnectionError>
+where
+    T: AsyncRead + AsyncWrite + Unpin,
+{
+    stream::poll_fn(|cx| c.poll_next_inbound(cx))
+        .try_for_each_concurrent(None, |mut stream| async move {
+            let mut buf = [0u8; 1024];
+
+            while let Ok(n) = stream.read(&mut buf).await {
+                if n == 0 {
+                    break;
+                }
+            }
+
+            stream.close().await?;
+            Ok(())
+        })
+        .await
+}
+
+pub struct MessageSender<T> {
+    connection: Connection<T>,
+    pending_messages: Vec<Msg>,
+    worker_streams: FuturesUnordered<BoxFuture<'static, ()>>,
+    streams_processed: usize,
+    /// Whether to spawn a new task for each stream.
+    spawn_tasks: bool,
+    /// How many times to send each message on the stream
+    message_multiplier: u64,
+    strategy: MessageSenderStrategy,
+}
+
+#[derive(Copy, Clone)]
+pub enum MessageSenderStrategy {
+    SendRecv,
+    Send,
+}
+
+impl<T> MessageSender<T> {
+    pub fn new(connection: Connection<T>, messages: Vec<Msg>, spawn_tasks: bool) -> Self {
+        Self {
+            connection,
+            pending_messages: messages,
+            worker_streams: FuturesUnordered::default(),
+            streams_processed: 0,
+            spawn_tasks,
+            message_multiplier: 1,
+            strategy: MessageSenderStrategy::SendRecv,
+        }
+    }
+
+    pub fn with_message_multiplier(mut self, multiplier: u64) -> Self {
+        self.message_multiplier = multiplier;
+        self
+    }
+
+    pub fn with_strategy(mut self, strategy: MessageSenderStrategy) -> Self {
+        self.strategy = strategy;
+        self
+    }
+}
+
+impl<T> Future for MessageSender<T>
+where
+    T: AsyncRead + AsyncWrite + Unpin,
+{
+    type Output = yamux::Result<usize>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let this = self.get_mut();
+
+        loop {
+            if this.pending_messages.is_empty() && this.worker_streams.is_empty() {
+                futures::ready!(this.connection.poll_close(cx)?);
+
+                return Poll::Ready(Ok(this.streams_processed));
+            }
+
+            if let Some(message) = this.pending_messages.pop() {
+                match this.connection.poll_new_outbound(cx)? {
+                    Poll::Ready(mut stream) => {
+                        let multiplier = this.message_multiplier;
+                        let strategy = this.strategy;
+
+                        let future = async move {
+                            for _ in 0..multiplier {
+                                match strategy {
+                                    MessageSenderStrategy::SendRecv => {
+                                        send_recv_message(&mut stream, &message).await.unwrap()
+                                    }
+                                    MessageSenderStrategy::Send => {
+                                        stream.write_all(&message.0).await.unwrap()
+                                    }
+                                };
+                            }
+
+                            stream.close().await.unwrap();
+                        };
+
+                        let worker_stream_future = if this.spawn_tasks {
+                            async { task::spawn(future).await.unwrap() }.boxed()
+                        } else {
+                            future.boxed()
+                        };
+
+                        this.worker_streams.push(worker_stream_future);
+                        continue;
+                    }
+                    Poll::Pending => {
+                        this.pending_messages.push(message);
+                    }
+                }
+            }
+
+            match this.worker_streams.poll_next_unpin(cx) {
+                Poll::Ready(Some(())) => {
+                    this.streams_processed += 1;
+                    continue;
+                }
+                Poll::Ready(None) | Poll::Pending => {}
+            }
+
+            match this.connection.poll_next_inbound(cx)? {
+                Poll::Ready(Some(stream)) => {
+                    drop(stream);
+                    panic!("Did not expect remote to open a stream");
+                }
+                Poll::Ready(None) => {
+                    panic!("Did not expect remote to close the connection");
+                }
+                Poll::Pending => {}
+            }
+
+            return Poll::Pending;
+        }
+    }
+}
+
 /// For each incoming stream, do nothing.
 pub async fn noop_server(c: impl Stream<Item = Result<yamux::Stream, yamux::ConnectionError>>) {
     c.for_each(|maybe_stream| {
@@ -76,13 +232,39 @@ pub async fn noop_server(c: impl Stream<Item = Result<yamux::Stream, yamux::Con
     .await
 }
 
-pub async fn send_recv_message(stream: &mut yamux::Stream, Msg(msg): Msg) -> io::Result<()> {
+/// Send and receive buffer size for a TCP socket.
+#[derive(Clone, Debug, Copy)]
+pub struct TcpBufferSizes {
+    send: u32,
+    recv: u32,
+}
+
+impl Arbitrary for TcpBufferSizes {
+    fn arbitrary(g: &mut Gen) -> Self {
+        let send = if bool::arbitrary(g) {
+            16 * 1024
+        } else {
+            32 * 1024
+        };
+
+        // Have receive buffer size be some multiple of send buffer size.
+        let recv = if bool::arbitrary(g) {
+            send * 2
+        } else {
+            send * 4
+        };
+
+        TcpBufferSizes { send, recv }
+    }
+}
+
+pub async fn send_recv_message(stream: &mut yamux::Stream, Msg(msg): &Msg) -> io::Result<()> {
     let id = stream.id();
     let (mut reader, mut writer) = AsyncReadExt::split(stream);
 
     let len = msg.len();
     let write_fut = async {
-        writer.write_all(&msg).await.unwrap();
+        writer.write_all(msg).await.unwrap();
         log::debug!("C: {}: sent {} bytes", id, len);
     };
     let mut data = vec![0; msg.len()];
@@ -91,7 +273,23 @@ pub async fn send_recv_message(stream: &mut yamux::Stream, Msg(msg): Msg) -> io:
         log::debug!("C: {}: received {} bytes", id, data.len());
     };
     futures::future::join(write_fut, read_fut).await;
-    assert_eq!(data, msg);
+    assert_eq!(&data, msg);
+
+    Ok(())
+}
+
+/// Send all messages, using only a single stream.
+pub async fn send_on_single_stream(
+    mut stream: yamux::Stream,
+    iter: impl IntoIterator<Item = Msg>,
+) -> Result<(), ConnectionError> {
+    log::debug!("C: new stream: {}", stream);
+
+    for msg in iter {
+        send_recv_message(&mut stream, &msg).await?;
+    }
+
+    stream.close().await?;
 
     Ok(())
 }
diff --git a/test-harness/tests/concurrent.rs b/test-harness/tests/concurrent.rs
deleted file mode 100644
index 46501158..00000000
--- a/test-harness/tests/concurrent.rs
+++ /dev/null
@@ -1,153 +0,0 @@
-// Copyright (c) 2018-2019 Parity Technologies (UK) Ltd.
-//
-// Licensed under the Apache License, Version 2.0 or MIT license, at your option.
-//
-// A copy of the Apache License, Version 2.0 is included in the software as
-// LICENSE-APACHE and a copy of the MIT license is included in the software
-// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0
-// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license
-// at https://opensource.org/licenses/MIT.
-
-use futures::prelude::*;
-use futures::stream::FuturesUnordered;
-use quickcheck::{Arbitrary, Gen, QuickCheck};
-use std::{
-    io,
-    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
-};
-use test_harness::*;
-use tokio::net::{TcpListener, TcpStream};
-use tokio::{net::TcpSocket, runtime::Runtime, task};
-use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
-use yamux::{Config, Connection, ConnectionError, Control, Mode, WindowUpdateMode};
-
-const PAYLOAD_SIZE: usize = 128 * 1024;
-
-#[test]
-fn concurrent_streams() {
-    let _ = env_logger::try_init();
-
-    fn prop(tcp_buffer_sizes: Option<TcpBufferSizes>) {
-        let data = Msg(vec![0x42; PAYLOAD_SIZE]);
-        let n_streams = 1000;
-
-        Runtime::new().expect("new runtime").block_on(async move {
-            let (server, client) = connected_peers(tcp_buffer_sizes).await.unwrap();
-
-            task::spawn(echo_server(server));
-
-            let (mut ctrl, client) = Control::new(client);
-            task::spawn(noop_server(client));
-
-            let result = (0..n_streams)
-                .map(|_| {
-                    let data = data.clone();
-                    let mut ctrl = ctrl.clone();
-
-                    task::spawn(async move {
-                        let mut stream = ctrl.open_stream().await?;
-                        log::debug!("C: opened new stream {}", stream.id());
-
-                        send_recv_message(&mut stream, data).await?;
-                        stream.close().await?;
-
-                        Ok::<(), ConnectionError>(())
-                    })
-                })
-                .collect::<FuturesUnordered<_>>()
-                .try_collect::<Vec<_>>()
-                .await
-                .unwrap()
-                .into_iter()
-                .collect::<Result<Vec<()>, ConnectionError>>();
-
-            ctrl.close().await.expect("close connection");
-
-            assert_eq!(result.unwrap().len(), n_streams);
-        });
-    }
-
-    QuickCheck::new().tests(3).quickcheck(prop as fn(_) -> _)
-}
-
-/// Send and receive buffer size for a TCP socket.
-#[derive(Clone, Debug, Copy)]
-struct TcpBufferSizes {
-    send: u32,
-    recv: u32,
-}
-
-impl Arbitrary for TcpBufferSizes {
-    fn arbitrary(g: &mut Gen) -> Self {
-        let send = if bool::arbitrary(g) {
-            16 * 1024
-        } else {
-            32 * 1024
-        };
-
-        // Have receive buffer size be some multiple of send buffer size.
-        let recv = if bool::arbitrary(g) {
-            send * 2
-        } else {
-            send * 4
-        };
-
-        TcpBufferSizes { send, recv }
-    }
-}
-
-async fn connected_peers(
-    buffer_sizes: Option<TcpBufferSizes>,
-) -> io::Result<(Connection<Compat<TcpStream>>, Connection<Compat<TcpStream>>)> {
-    let (listener, addr) = bind(buffer_sizes).await?;
-
-    let server = async {
-        let (stream, _) = listener.accept().await?;
-        Ok(Connection::new(stream.compat(), config(), Mode::Server))
-    };
-    let client = async {
-        let stream = new_socket(buffer_sizes)?.connect(addr).await?;
-
-        Ok(Connection::new(stream.compat(), config(), Mode::Client))
-    };
-
-    futures::future::try_join(server, client).await
-}
-
-async fn bind(buffer_sizes: Option<TcpBufferSizes>) -> io::Result<(TcpListener, SocketAddr)> {
-    let socket = new_socket(buffer_sizes)?;
-    socket.bind(SocketAddr::V4(SocketAddrV4::new(
-        Ipv4Addr::new(127, 0, 0, 1),
-        0,
-    )))?;
-
-    let listener = socket.listen(1024)?;
-    let address = listener.local_addr()?;
-
-    Ok((listener, address))
-}
-
-fn new_socket(buffer_sizes: Option<TcpBufferSizes>) -> io::Result<TcpSocket> {
-    let socket = TcpSocket::new_v4()?;
-    if let Some(size) = buffer_sizes {
-        socket.set_send_buffer_size(size.send)?;
-        socket.set_recv_buffer_size(size.recv)?;
-    }
-
-    Ok(socket)
-}
-
-fn config() -> Config {
-    let mut server_cfg = Config::default();
-    // Use a large frame size to speed up the test.
-    server_cfg.set_split_send_size(PAYLOAD_SIZE);
-    // Use `WindowUpdateMode::OnRead` so window updates are sent by the
-    // `Stream`s and subject to backpressure from the stream command channel. Thus
-    // the `Connection` I/O loop will not need to send window updates
-    // directly as a result of reading a frame, which can otherwise
-    // lead to mutual write deadlocks if the socket send buffers are too small.
-    // With `OnRead` the socket send buffer can even be smaller than the size
-    // of a single frame for this test.
-    server_cfg.set_window_update_mode(WindowUpdateMode::OnRead);
-    server_cfg
-}
diff --git a/test-harness/tests/poll_api.rs b/test-harness/tests/poll_api.rs
index a3ec5759..37f0ad72 100644
--- a/test-harness/tests/poll_api.rs
+++ b/test-harness/tests/poll_api.rs
@@ -1,10 +1,12 @@
-use futures::future::BoxFuture;
-use futures::stream::FuturesUnordered;
-use futures::{AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt};
+use futures::executor::LocalPool;
+use futures::future::join;
+use futures::prelude::*;
+use futures::task::{Spawn, SpawnExt};
+use futures::{future, stream, AsyncReadExt, AsyncWriteExt, FutureExt, StreamExt};
 use quickcheck::QuickCheck;
-use std::future::Future;
-use std::pin::Pin;
-use std::task::{Context, Poll};
+use std::iter;
+use std::panic::panic_any;
+use std::pin::pin;
 use test_harness::*;
 use tokio::net::TcpStream;
 use tokio::runtime::Runtime;
@@ -20,7 +22,7 @@ fn prop_config_send_recv_multi() {
         Runtime::new().unwrap().block_on(async move {
             let num_messagses = msgs.len();
 
-            let (listener, address) = bind().await.expect("bind");
+            let (listener, address) = bind(None).await.expect("bind");
 
             let server = async {
                 let socket = listener.accept().await.expect("accept").0.compat();
@@ -33,7 +35,7 @@ fn prop_config_send_recv_multi() {
                 let socket = TcpStream::connect(address).await.expect("connect").compat();
                 let connection = Connection::new(socket, cfg2.0, Mode::Client);
 
-                MessageSender::new(connection, msgs).await
+                MessageSender::new(connection, msgs, false).await
            };
 
            let (server_processed, client_processed) =
@@ -47,6 +49,40 @@ fn prop_config_send_recv_multi() {
     QuickCheck::new().quickcheck(prop as fn(_, _, _) -> _)
 }
 
+#[test]
+fn concurrent_streams() {
+    let _ = env_logger::try_init();
+
+    fn prop(tcp_buffer_sizes: Option<TcpBufferSizes>) {
+        const PAYLOAD_SIZE: usize = 128 * 1024;
+
+        let data = Msg(vec![0x42; PAYLOAD_SIZE]);
+        let n_streams = 1000;
+
+        let mut cfg = Config::default();
+        cfg.set_split_send_size(PAYLOAD_SIZE); // Use a large frame size to speed up the test.
+
+        Runtime::new().expect("new runtime").block_on(async move {
+            let (server, client) = connected_peers(cfg.clone(), cfg, tcp_buffer_sizes)
+                .await
+                .unwrap();
+
+            task::spawn(echo_server(server));
+            let client = MessageSender::new(
+                client,
+                iter::repeat(data).take(n_streams).collect::<Vec<_>>(),
+                true,
+            );
+
+            let num_processed = client.await.unwrap();
+
+            assert_eq!(num_processed, n_streams);
+        });
+    }
+
+    QuickCheck::new().tests(3).quickcheck(prop as fn(_) -> _)
+}
+
 #[test]
 fn prop_max_streams() {
     fn prop(n: usize) -> Result<bool, ConnectionError> {
@@ -55,7 +91,7 @@ fn prop_max_streams() {
         cfg.set_max_num_streams(max_streams);
 
         Runtime::new().unwrap().block_on(async move {
-            let (server, client) = connected_peers(cfg.clone(), cfg).await?;
+            let (server, client) = connected_peers(cfg.clone(), cfg, None).await?;
 
             task::spawn(EchoServer::new(server));
 
@@ -71,78 +107,174 @@ fn prop_max_streams() {
     QuickCheck::new().tests(7).quickcheck(prop as fn(_) -> _)
 }
 
-struct MessageSender<T> {
-    connection: Connection<T>,
-    pending_messages: Vec<Msg>,
-    worker_streams: FuturesUnordered<BoxFuture<'static, ()>>,
-    streams_processed: usize,
-}
+#[test]
+fn prop_send_recv_half_closed() {
+    fn prop(msg: Msg) -> Result<(), ConnectionError> {
+        let msg_len = msg.0.len();
 
-impl<T> MessageSender<T> {
-    fn new(connection: Connection<T>, messages: Vec<Msg>) -> Self {
-        Self {
-            connection,
-            pending_messages: messages,
-            worker_streams: FuturesUnordered::default(),
-            streams_processed: 0,
-        }
+        Runtime::new().unwrap().block_on(async move {
+            let (mut server, mut client) =
+                connected_peers(Config::default(), Config::default(), None).await?;
+
+            // Server should be able to write on a stream shutdown by the client.
+            let server = async {
+                let mut server = stream::poll_fn(move |cx| server.poll_next_inbound(cx));
+
+                let mut first_stream = server.next().await.ok_or(ConnectionError::Closed)??;
+
+                task::spawn(noop_server(server));
+
+                let mut buf = vec![0; msg_len];
+                first_stream.read_exact(&mut buf).await?;
+                first_stream.write_all(&buf).await?;
+                first_stream.close().await?;
+
+                Result::<(), ConnectionError>::Ok(())
+            };
+
+            // Client should be able to read after shutting down the stream.
+            let client = async {
+                let mut stream = future::poll_fn(|cx| client.poll_new_outbound(cx))
+                    .await
+                    .unwrap();
+                task::spawn(noop_server(stream::poll_fn(move |cx| {
+                    client.poll_next_inbound(cx)
+                })));
+
+                stream.write_all(&msg.0).await?;
+                stream.close().await?;
+
+                assert!(stream.is_write_closed());
+                let mut buf = vec![0; msg_len];
+                stream.read_exact(&mut buf).await?;
+
+                assert_eq!(buf, msg.0);
+                assert_eq!(Some(0), stream.read(&mut buf).await.ok());
+                assert!(stream.is_closed());
+
+                Result::<(), ConnectionError>::Ok(())
+            };
+
+            futures::future::try_join(server, client).await?;
+
+            Ok(())
+        })
     }
+    QuickCheck::new().tests(7).quickcheck(prop as fn(_) -> _)
 }
 
-impl<T> Future for MessageSender<T>
-where
-    T: AsyncRead + AsyncWrite + Unpin,
-{
-    type Output = yamux::Result<usize>;
-
-    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        let this = self.get_mut();
-
-        loop {
-            if this.pending_messages.is_empty() && this.worker_streams.is_empty() {
-                futures::ready!(this.connection.poll_close(cx)?);
-
-                return Poll::Ready(Ok(this.streams_processed));
-            }
-
-            if let Some(message) = this.pending_messages.pop() {
-                match this.connection.poll_new_outbound(cx)? {
-                    Poll::Ready(mut stream) => {
-                        this.worker_streams.push(
-                            async move {
-                                send_recv_message(&mut stream, message).await.unwrap();
-                                stream.close().await.unwrap();
-                            }
-                            .boxed(),
-                        );
-                        continue;
-                    }
-                    Poll::Pending => {
-                        this.pending_messages.push(message);
-                    }
-                }
-            }
+#[test]
+fn prop_config_send_recv_single() {
+    fn prop(
+        mut msgs: Vec<Msg>,
+        TestConfig(cfg1): TestConfig,
+        TestConfig(cfg2): TestConfig,
+    ) -> Result<(), ConnectionError> {
+        msgs.insert(0, Msg(vec![1u8; yamux::DEFAULT_CREDIT as usize]));
 
-            match this.worker_streams.poll_next_unpin(cx) {
-                Poll::Ready(Some(())) => {
-                    this.streams_processed += 1;
-                    continue;
-                }
-                Poll::Ready(None) | Poll::Pending => {}
-            }
+        Runtime::new().unwrap().block_on(async move {
+            let (server, mut client) = connected_peers(cfg1, cfg2, None).await?;
+            let server = echo_server(server);
 
-            match this.connection.poll_next_inbound(cx)? {
-                Poll::Ready(Some(stream)) => {
-                    drop(stream);
-                    panic!("Did not expect remote to open a stream");
-                }
-                Poll::Ready(None) => {
-                    panic!("Did not expect remote to close the connection");
-                }
-                Poll::Pending => {}
-            }
+            let client = async {
+                let stream = future::poll_fn(|cx| client.poll_new_outbound(cx))
+                    .await
+                    .unwrap();
+                let client_task = noop_server(stream::poll_fn(|cx| client.poll_next_inbound(cx)));
+
+                future::select(pin!(client_task), pin!(send_on_single_stream(stream, msgs))).await;
+
+                future::poll_fn(|cx| client.poll_close(cx)).await.unwrap();
+
+                Ok(())
+            };
 
-            return Poll::Pending;
-        }
+            futures::future::try_join(server, client).await?;
+
+            Ok(())
+        })
     }
+    QuickCheck::new()
+        .tests(10)
+        .quickcheck(prop as fn(_, _, _) -> _)
+}
+
+/// This test simulates two endpoints of a Yamux connection which may be unable to
+/// write simultaneously but can make progress by reading. If both endpoints
+/// don't read in-between trying to finish their writes, a deadlock occurs.
+#[test]
+fn write_deadlock() {
+    let _ = env_logger::try_init();
+    let mut pool = LocalPool::new();
+
+    // We make the message to transmit large enough s.t. the "server"
+    // is forced to start writing (i.e. echoing) the bytes before
+    // having read the entire payload.
+    let msg = vec![1u8; 1024 * 1024];
+
+    // We choose a "connection capacity" that is artificially below
+    // the size of a receive window. If it were equal or greater,
+    // multiple concurrently writing streams would be needed to non-deterministically
+    // provoke the write deadlock. This is supposed to reflect the
+    // fact that the sum of receive windows of all open streams can easily
+    // be larger than the send capacity of the connection at any point in time.
+    // Using such a low capacity here therefore yields a more reproducible test.
+    let capacity = 1024;
+
+    // Create a bounded channel representing the underlying "connection".
+    // Each endpoint gets a name and a bounded capacity for its outbound
+    // channel (which is the other's inbound channel).
+    let (server_endpoint, client_endpoint) = futures_ringbuf::Endpoint::pair(capacity, capacity);
+
+    // Create and spawn a "server" that echoes every message back to the client.
+    let server = Connection::new(server_endpoint, Config::default(), Mode::Server);
+    pool.spawner()
+        .spawn_obj(
+            async move { echo_server(server).await.unwrap() }
+                .boxed()
+                .into(),
+        )
+        .unwrap();
+
+    // Create and spawn a "client" that sends messages expected to be echoed
+    // by the server.
+ let mut client = Connection::new(client_endpoint, Config::default(), Mode::Client); + + let stream = pool + .run_until(future::poll_fn(|cx| client.poll_new_outbound(cx))) + .unwrap(); + + // Continuously advance the Yamux connection of the client in a background task. + pool.spawner() + .spawn_obj( + noop_server(stream::poll_fn(move |cx| client.poll_next_inbound(cx))) + .boxed() + .into(), + ) + .unwrap(); + + // Send the message, expecting it to be echo'd. + pool.run_until( + pool.spawner() + .spawn_with_handle( + async move { + let (mut reader, mut writer) = AsyncReadExt::split(stream); + let mut b = vec![0; msg.len()]; + // Write & read concurrently, so that the client is able + // to start reading the echo'd bytes before it even finished + // sending them all. + let _ = join( + writer.write_all(msg.as_ref()).map_err(|e| panic_any(e)), + reader.read_exact(&mut b[..]).map_err(|e| panic_any(e)), + ) + .await; + let mut stream = reader.reunite(writer).unwrap(); + stream.close().await.unwrap(); + log::debug!("C: Stream {} done.", stream.id()); + assert_eq!(b, msg); + } + .boxed(), + ) + .unwrap(), + ); } diff --git a/test-harness/tests/tests.rs b/test-harness/tests/tests.rs deleted file mode 100644 index 5a4be48f..00000000 --- a/test-harness/tests/tests.rs +++ /dev/null @@ -1,392 +0,0 @@ -// Copyright (c) 2018-2019 Parity Technologies (UK) Ltd. -// -// Licensed under the Apache License, Version 2.0 or MIT license, at your option. -// -// A copy of the Apache License, Version 2.0 is included in the software as -// LICENSE-APACHE and a copy of the MIT license is included in the software -// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0 -// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license -// at https://opensource.org/licenses/MIT. 
-
-use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
-use futures::executor::LocalPool;
-use futures::future::join;
-use futures::io::AsyncReadExt;
-use futures::prelude::*;
-use futures::task::{Spawn, SpawnExt};
-use quickcheck::{QuickCheck, TestResult};
-use std::panic::panic_any;
-use std::pin::Pin;
-use std::sync::{Arc, Mutex};
-use std::task::{Context, Poll, Waker};
-use test_harness::*;
-use tokio::{runtime::Runtime, task};
-use yamux::{Config, Connection, ConnectionError, Control, Mode};
-
-#[test]
-fn prop_config_send_recv_single() {
-    fn prop(
-        mut msgs: Vec<Msg>,
-        TestConfig(cfg1): TestConfig,
-        TestConfig(cfg2): TestConfig,
-    ) -> Result<(), ConnectionError> {
-        msgs.insert(0, Msg(vec![1u8; yamux::DEFAULT_CREDIT as usize]));
-
-        Runtime::new().unwrap().block_on(async move {
-            let (server, client) = connected_peers(cfg1, cfg2).await?;
-
-            let server = echo_server(server);
-            let client = async {
-                let (control, client) = Control::new(client);
-                task::spawn(noop_server(client));
-                send_on_single_stream(control, msgs).await?;
-
-                Ok(())
-            };
-
-            futures::future::try_join(server, client).await?;
-
-            Ok(())
-        })
    }
-    QuickCheck::new()
-        .tests(10)
-        .quickcheck(prop as fn(_, _, _) -> _)
-}
-
-#[test]
-fn prop_send_recv() {
-    fn prop(msgs: Vec<Msg>) -> Result<TestResult, ConnectionError> {
-        if msgs.is_empty() {
-            return Ok(TestResult::discard());
-        }
-
-        Runtime::new().unwrap().block_on(async move {
-            let (server, client) = connected_peers(Config::default(), Config::default()).await?;
-
-            let server = echo_server(server);
-            let client = async {
-                let (control, client) = Control::new(client);
-                task::spawn(noop_server(client));
-                send_on_separate_streams(control, msgs).await?;
-
-                Ok(())
-            };
-
-            futures::future::try_join(server, client).await?;
-
-            Ok(TestResult::passed())
-        })
    }
-    QuickCheck::new().tests(1).quickcheck(prop as fn(_) -> _)
-}
-
-#[test]
-fn prop_send_recv_half_closed() {
-    fn prop(msg: Msg) -> Result<(), ConnectionError> {
-        let msg_len = msg.0.len();
-
-        Runtime::new().unwrap().block_on(async move {
-            let (mut server, client) =
-                connected_peers(Config::default(), Config::default()).await?;
-
-            // Server should be able to write on a stream shutdown by the client.
-            let server = async {
-                let mut server = stream::poll_fn(move |cx| server.poll_next_inbound(cx));
-
-                let mut first_stream = server.next().await.ok_or(ConnectionError::Closed)??;
-
-                task::spawn(noop_server(server));
-
-                let mut buf = vec![0; msg_len];
-                first_stream.read_exact(&mut buf).await?;
-                first_stream.write_all(&buf).await?;
-                first_stream.close().await?;
-
-                Result::<(), ConnectionError>::Ok(())
-            };
-
-            // Client should be able to read after shutting down the stream.
-            let client = async {
-                let (mut control, client) = Control::new(client);
-                task::spawn(noop_server(client));
-
-                let mut stream = control.open_stream().await?;
-                stream.write_all(&msg.0).await?;
-                stream.close().await?;
-
-                assert!(stream.is_write_closed());
-                let mut buf = vec![0; msg_len];
-                stream.read_exact(&mut buf).await?;
-
-                assert_eq!(buf, msg.0);
-                assert_eq!(Some(0), stream.read(&mut buf).await.ok());
-                assert!(stream.is_closed());
-
-                Result::<(), ConnectionError>::Ok(())
-            };
-
-            futures::future::try_join(server, client).await?;
-
-            Ok(())
-        })
    }
-    QuickCheck::new().tests(7).quickcheck(prop as fn(_) -> _)
-}
-
-/// This test simulates two endpoints of a Yamux connection which may be unable to
-/// write simultaneously but can make progress by reading. If both endpoints
-/// don't read in-between trying to finish their writes, a deadlock occurs.
-#[test]
-fn write_deadlock() {
-    let _ = env_logger::try_init();
-    let mut pool = LocalPool::new();
-
-    // We make the message to transmit large enough s.t. the "server"
-    // is forced to start writing (i.e. echoing) the bytes before
-    // having read the entire payload.
-    let msg = vec![1u8; 1024 * 1024];
-
-    // We choose a "connection capacity" that is artificially below
-    // the size of a receive window. If it were equal or greater,
-    // multiple concurrently writing streams would be needed to non-deterministically
-    // provoke the write deadlock. This is supposed to reflect the
-    // fact that the sum of receive windows of all open streams can easily
-    // be larger than the send capacity of the connection at any point in time.
-    // Using such a low capacity here therefore yields a more reproducible test.
-    let capacity = 1024;
-
-    // Create a bounded channel representing the underlying "connection".
-    // Each endpoint gets a name and a bounded capacity for its outbound
-    // channel (which is the other's inbound channel).
-    let (server_endpoint, client_endpoint) = bounded::channel(("S", capacity), ("C", capacity));
-
-    // Create and spawn a "server" that echoes every message back to the client.
-    let server = Connection::new(server_endpoint, Config::default(), Mode::Server);
-    pool.spawner()
-        .spawn_obj(
-            async move { echo_server(server).await.unwrap() }
-                .boxed()
-                .into(),
-        )
-        .unwrap();
-
-    // Create and spawn a "client" that sends messages expected to be echoed
-    // by the server.
-    let client = Connection::new(client_endpoint, Config::default(), Mode::Client);
-    let (mut ctrl, client) = Control::new(client);
-
-    // Continuously advance the Yamux connection of the client in a background task.
-    pool.spawner()
-        .spawn_obj(noop_server(client).boxed().into())
-        .unwrap();
-
-    // Send the message, expecting it to be echo'd.
-    pool.run_until(
-        pool.spawner()
-            .spawn_with_handle(
-                async move {
-                    let stream = ctrl.open_stream().await.unwrap();
-                    let (mut reader, mut writer) = AsyncReadExt::split(stream);
-                    let mut b = vec![0; msg.len()];
-                    // Write & read concurrently, so that the client is able
-                    // to start reading the echo'd bytes before it even finished
-                    // sending them all.
-                    let _ = join(
-                        writer.write_all(msg.as_ref()).map_err(|e| panic_any(e)),
-                        reader.read_exact(&mut b[..]).map_err(|e| panic_any(e)),
-                    )
-                    .await;
-                    let mut stream = reader.reunite(writer).unwrap();
-                    stream.close().await.unwrap();
-                    log::debug!("C: Stream {} done.", stream.id());
-                    assert_eq!(b, msg);
-                }
-                .boxed(),
-            )
-            .unwrap(),
-    );
-}
-
-/// Send all messages, opening a new stream for each one.
-async fn send_on_separate_streams(
-    mut control: Control,
-    iter: impl IntoIterator<Item = Msg>,
-) -> Result<(), ConnectionError> {
-    for msg in iter {
-        let mut stream = control.open_stream().await?;
-        log::debug!("C: new stream: {}", stream);
-
-        send_recv_message(&mut stream, msg).await?;
-        stream.close().await?;
-    }
-
-    log::debug!("C: closing connection");
-    control.close().await?;
-
-    Ok(())
-}
-
-/// Send all messages, using only a single stream.
-async fn send_on_single_stream(
-    mut control: Control,
-    iter: impl IntoIterator<Item = Msg>,
-) -> Result<(), ConnectionError> {
-    let mut stream = control.open_stream().await?;
-    log::debug!("C: new stream: {}", stream);
-
-    for msg in iter {
-        send_recv_message(&mut stream, msg).await?;
-    }
-
-    stream.close().await?;
-
-    log::debug!("C: closing connection");
-    control.close().await?;
-
-    Ok(())
-}
-
-/// This module implements a duplex connection via channels with bounded
-/// capacities. The channels used for the implementation are unbounded
-/// as they operate at the granularity of variably-sized chunks of bytes
-/// (`Vec<u8>`), whereas the capacity bounds (i.e. max. number of bytes
-/// in transit in one direction) are enforced separately.
-mod bounded {
-    use super::*;
-    use futures::ready;
-    use std::io::{Error, ErrorKind, Result};
-
-    pub struct Endpoint {
-        name: &'static str,
-        capacity: usize,
-        send: UnboundedSender<Vec<u8>>,
-        send_guard: Arc<Mutex<ChannelGuard>>,
-        recv: UnboundedReceiver<Vec<u8>>,
-        recv_buf: Vec<u8>,
-        recv_guard: Arc<Mutex<ChannelGuard>>,
-    }
-
-    /// A `ChannelGuard` is used to enforce the maximum number of
-    /// bytes "in transit" across all chunks of an unbounded channel.
-    #[derive(Default)]
-    struct ChannelGuard {
-        size: usize,
-        waker: Option<Waker>,
-    }
-
-    pub fn channel(
-        (name_a, capacity_a): (&'static str, usize),
-        (name_b, capacity_b): (&'static str, usize),
-    ) -> (Endpoint, Endpoint) {
-        let (a_to_b_sender, a_to_b_receiver) = unbounded();
-        let (b_to_a_sender, b_to_a_receiver) = unbounded();
-
-        let a_to_b_guard = Arc::new(Mutex::new(ChannelGuard::default()));
-        let b_to_a_guard = Arc::new(Mutex::new(ChannelGuard::default()));
-
-        let a = Endpoint {
-            name: name_a,
-            capacity: capacity_a,
-            send: a_to_b_sender,
-            send_guard: a_to_b_guard.clone(),
-            recv: b_to_a_receiver,
-            recv_buf: Vec::new(),
-            recv_guard: b_to_a_guard.clone(),
-        };
-
-        let b = Endpoint {
-            name: name_b,
-            capacity: capacity_b,
-            send: b_to_a_sender,
-            send_guard: b_to_a_guard,
-            recv: a_to_b_receiver,
-            recv_buf: Vec::new(),
-            recv_guard: a_to_b_guard,
-        };
-
-        (a, b)
-    }
-
-    impl AsyncRead for Endpoint {
-        fn poll_read(
-            mut self: Pin<&mut Self>,
-            cx: &mut Context<'_>,
-            buf: &mut [u8],
-        ) -> Poll<Result<usize>> {
-            if self.recv_buf.is_empty() {
-                match ready!(self.recv.poll_next_unpin(cx)) {
-                    Some(bytes) => {
-                        self.recv_buf = bytes;
-                    }
-                    None => return Poll::Ready(Ok(0)),
-                }
-            }
-
-            let n = std::cmp::min(buf.len(), self.recv_buf.len());
-            buf[0..n].copy_from_slice(&self.recv_buf[0..n]);
-            self.recv_buf = self.recv_buf.split_off(n);
-
-            let mut guard = self.recv_guard.lock().unwrap();
-            if let Some(waker) = guard.waker.take() {
-                log::debug!(
-                    "{}: read: notifying waker after read of {} bytes",
-                    self.name,
-                    n
-                );
-                waker.wake();
-            }
-            guard.size -= n;
-
-            log::debug!(
-                "{}: read: channel: {}/{}",
-                self.name,
-                guard.size,
-                self.capacity
-            );
-
-            Poll::Ready(Ok(n))
-        }
-    }
-
-    impl AsyncWrite for Endpoint {
-        fn poll_write(
-            self: Pin<&mut Self>,
-            cx: &mut Context<'_>,
-            buf: &[u8],
-        ) -> Poll<Result<usize>> {
-            debug_assert!(!buf.is_empty());
-            let mut guard = self.send_guard.lock().unwrap();
-            let n = std::cmp::min(self.capacity - guard.size, buf.len());
-            if n == 0 {
-                log::debug!("{}: write: channel full, registering waker", self.name);
-                guard.waker = Some(cx.waker().clone());
-                return Poll::Pending;
-            }
-
-            self.send
-                .unbounded_send(buf[0..n].to_vec())
-                .map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))?;
-
-            guard.size += n;
-            log::debug!(
-                "{}: write: channel: {}/{}",
-                self.name,
-                guard.size,
-                self.capacity
-            );
-
-            Poll::Ready(Ok(n))
-        }
-
-        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
-            ready!(self.send.poll_flush_unpin(cx)).unwrap();
-            Poll::Ready(Ok(()))
-        }
-
-        fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
-            ready!(self.send.poll_close_unpin(cx)).unwrap();
-            Poll::Ready(Ok(()))
-        }
-    }
-}
diff --git a/yamux/Cargo.toml b/yamux/Cargo.toml
index 09322d3c..431bd6c7 100644
--- a/yamux/Cargo.toml
+++ b/yamux/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "yamux"
-version = "0.11.1"
+version = "0.12.0"
 authors = ["Parity Technologies "]
 license = "Apache-2.0 OR MIT"
 description = "Multiplexer over reliable, ordered connections"
@@ -19,16 +19,4 @@ static_assertions = "1"
 pin-project = "1.1.0"
 
 [dev-dependencies]
-anyhow = "1"
-criterion = "0.5"
-env_logger = "0.10"
-futures = "0.3.4"
 quickcheck = "1.0"
-tokio = { version = "1.0", features = ["net", "rt-multi-thread", "macros", "time"] }
-tokio-util = { version = "0.7", features = ["compat"] }
-constrained-connection = "0.1"
-futures_ringbuf = "0.4.0"
-
-[[bench]]
-name = "concurrent"
-harness = false
diff --git a/yamux/src/connection.rs b/yamux/src/connection.rs
index 43f76226..5389e9e1 100644
--- a/yamux/src/connection.rs
+++ b/yamux/src/connection.rs
@@ -458,7 +458,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
 
         match self.stream_receivers.poll_next_unpin(cx) {
             Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => {
-                self.on_send_frame(frame.into());
+                self.on_send_frame(frame);
                 continue;
             }
             Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => {
diff --git a/yamux/src/control.rs b/yamux/src/control.rs
deleted file mode 100644
index 48c9aa86..00000000
--- a/yamux/src/control.rs
+++ /dev/null
@@ -1,246 +0,0 @@
-// Copyright (c) 2018-2019 Parity Technologies (UK) Ltd.
-//
-// Licensed under the Apache License, Version 2.0 or MIT license, at your option.
-//
-// A copy of the Apache License, Version 2.0 is included in the software as
-// LICENSE-APACHE and a copy of the MIT license is included in the software
-// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0
-// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license
-// at https://opensource.org/licenses/MIT.
-
-use crate::MAX_COMMAND_BACKLOG;
-use crate::{error::ConnectionError, Connection, Result, Stream};
-use futures::{
-    channel::{mpsc, oneshot},
-    prelude::*,
-};
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-/// A Yamux [`Connection`] controller.
-///
-/// This presents an alternative API for using a yamux [`Connection`].
-///
-/// A [`Control`] communicates with a [`ControlledConnection`] via a channel. This allows
-/// a [`Control`] to be cloned and shared between tasks and threads.
-#[derive(Clone, Debug)]
-pub struct Control {
-    /// Command channel to [`ControlledConnection`].
-    sender: mpsc::Sender<ControlCommand>,
-}
-
-impl Control {
-    pub fn new<T>(connection: Connection<T>) -> (Self, ControlledConnection<T>) {
-        let (sender, receiver) = mpsc::channel(MAX_COMMAND_BACKLOG);
-
-        let control = Control { sender };
-        let connection = ControlledConnection {
-            state: State::Idle(connection),
-            commands: receiver,
-        };
-
-        (control, connection)
-    }
-
-    /// Open a new stream to the remote.
-    pub async fn open_stream(&mut self) -> Result<Stream> {
-        let (tx, rx) = oneshot::channel();
-        self.sender.send(ControlCommand::OpenStream(tx)).await?;
-        rx.await?
-    }
-
-    /// Close the connection.
-    pub async fn close(&mut self) -> Result<()> {
-        let (tx, rx) = oneshot::channel();
-        if self
-            .sender
-            .send(ControlCommand::CloseConnection(tx))
-            .await
-            .is_err()
-        {
-            // The receiver is closed which means the connection is already closed.
-            return Ok(());
-        }
-        // A dropped `oneshot::Sender` means the `Connection` is gone,
-        // so we do not treat receive errors differently here.
-        let _ = rx.await;
-        Ok(())
-    }
-}
-
-/// Wraps a [`Connection`] which can be controlled with a [`Control`].
-pub struct ControlledConnection<T> {
-    state: State<T>,
-    commands: mpsc::Receiver<ControlCommand>,
-}
-
-impl<T> ControlledConnection<T>
-where
-    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
-{
-    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Stream>>> {
-        loop {
-            match std::mem::replace(&mut self.state, State::Poisoned) {
-                State::Idle(mut connection) => {
-                    match connection.poll_next_inbound(cx) {
-                        Poll::Ready(maybe_stream) => {
-                            self.state = State::Idle(connection);
-                            return Poll::Ready(maybe_stream);
-                        }
-                        Poll::Pending => {}
-                    }
-
-                    match self.commands.poll_next_unpin(cx) {
-                        Poll::Ready(Some(ControlCommand::OpenStream(reply))) => {
-                            self.state = State::OpeningNewStream { reply, connection };
-                            continue;
-                        }
-                        Poll::Ready(Some(ControlCommand::CloseConnection(reply))) => {
-                            self.commands.close();
-
-                            self.state = State::Closing {
-                                reply: Some(reply),
-                                inner: Closing::DrainingControlCommands { connection },
-                            };
-                            continue;
-                        }
-                        Poll::Ready(None) => {
-                            // Last `Control` sender was dropped, close the connection.
-                            self.state = State::Closing {
-                                reply: None,
-                                inner: Closing::ClosingConnection { connection },
-                            };
-                            continue;
-                        }
-                        Poll::Pending => {}
-                    }
-
-                    self.state = State::Idle(connection);
-                    return Poll::Pending;
-                }
-                State::OpeningNewStream {
-                    reply,
-                    mut connection,
-                } => match connection.poll_new_outbound(cx) {
-                    Poll::Ready(stream) => {
-                        let _ = reply.send(stream);
-
-                        self.state = State::Idle(connection);
-                        continue;
-                    }
-                    Poll::Pending => {
-                        self.state = State::OpeningNewStream { reply, connection };
-                        return Poll::Pending;
-                    }
-                },
-                State::Closing {
-                    reply,
-                    inner: Closing::DrainingControlCommands { connection },
-                } => match self.commands.poll_next_unpin(cx) {
-                    Poll::Ready(Some(ControlCommand::OpenStream(new_reply))) => {
-                        let _ = new_reply.send(Err(ConnectionError::Closed));
-
-                        self.state = State::Closing {
-                            reply,
-                            inner: Closing::DrainingControlCommands { connection },
-                        };
-                        continue;
-                    }
-                    Poll::Ready(Some(ControlCommand::CloseConnection(new_reply))) => {
-                        let _ = new_reply.send(());
-
-                        self.state = State::Closing {
-                            reply,
-                            inner: Closing::DrainingControlCommands { connection },
-                        };
-                        continue;
-                    }
-                    Poll::Ready(None) => {
-                        self.state = State::Closing {
-                            reply,
-                            inner: Closing::ClosingConnection { connection },
-                        };
-                        continue;
-                    }
-                    Poll::Pending => {
-                        self.state = State::Closing {
-                            reply,
-                            inner: Closing::DrainingControlCommands { connection },
-                        };
-                        return Poll::Pending;
-                    }
-                },
-                State::Closing {
-                    reply,
-                    inner: Closing::ClosingConnection { mut connection },
-                } => match connection.poll_close(cx) {
-                    Poll::Ready(Ok(())) | Poll::Ready(Err(ConnectionError::Closed)) => {
-                        if let Some(reply) = reply {
-                            let _ = reply.send(());
-                        }
-                        return Poll::Ready(None);
-                    }
-                    Poll::Ready(Err(other)) => {
-                        if let Some(reply) = reply {
-                            let _ = reply.send(());
-                        }
-                        return Poll::Ready(Some(Err(other)));
-                    }
-                    Poll::Pending => {
-                        self.state = State::Closing {
-                            reply,
-                            inner: Closing::ClosingConnection { connection },
-                        };
-                        return Poll::Pending;
-                    }
-                },
-                State::Poisoned => unreachable!(),
-            }
-        }
-    }
-}
-
-#[derive(Debug)]
-enum ControlCommand {
-    /// Open a new stream to the remote end.
-    OpenStream(oneshot::Sender<Result<Stream>>),
-    /// Close the whole connection.
-    CloseConnection(oneshot::Sender<()>),
-}
-
-/// The state of a [`ControlledConnection`].
-enum State<T> {
-    Idle(Connection<T>),
-    OpeningNewStream {
-        reply: oneshot::Sender<Result<Stream>>,
-        connection: Connection<T>,
-    },
-    Closing {
-        /// A channel to the [`Control`] in case the close was requested. `None` if we are closing because the last [`Control`] was dropped.
-        reply: Option<oneshot::Sender<()>>,
-        inner: Closing<T>,
-    },
-    Poisoned,
-}
-
-/// A sub-state of our larger state machine for a [`ControlledConnection`].
-///
-/// Closing a connection involves two steps:
-///
-/// 1. Draining and answering all remaining [`ControlCommand`]s.
-/// 1. Closing the underlying [`Connection`].
-enum Closing<T> {
-    DrainingControlCommands { connection: Connection<T> },
-    ClosingConnection { connection: Connection<T> },
-}
-
-impl<T> futures::Stream for ControlledConnection<T>
-where
-    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
-{
-    type Item = Result<Stream>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        self.get_mut().poll_next(cx)
-    }
-}
diff --git a/yamux/src/lib.rs b/yamux/src/lib.rs
index 040bd8d7..8de20344 100644
--- a/yamux/src/lib.rs
+++ b/yamux/src/lib.rs
@@ -25,7 +25,6 @@
 #![forbid(unsafe_code)]
 
 mod chunks;
-mod control;
 mod error;
 mod frame;
 
@@ -33,7 +32,6 @@ pub(crate) mod connection;
 mod tagged_stream;
 
 pub use crate::connection::{Connection, Mode, Packet, Stream};
-pub use crate::control::{Control, ControlledConnection};
 pub use crate::error::ConnectionError;
 pub use crate::frame::{
     header::{HeaderDecodeError, StreamId},
@@ -44,12 +42,6 @@ pub const DEFAULT_CREDIT: u32 = 256 * 1024; // as per yamux specification
 
 pub type Result<T> = std::result::Result<T, ConnectionError>;
 
-/// Arbitrary limit of our internal command channels.
-///
-/// Since each [`mpsc::Sender`] gets a guaranteed slot in a channel the
-/// actual upper bound is this value + number of clones.
-const MAX_COMMAND_BACKLOG: usize = 32;
-
 /// Default maximum number of bytes a Yamux data frame might carry as its
 /// payload when being sent. Larger payloads will be split.
 ///
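
Migration note: with `Control` and `ControlledConnection` removed, callers drive the `Connection` directly through `poll_new_outbound`, `poll_next_inbound` and `poll_close`, usually wrapped in `future::poll_fn`/`stream::poll_fn` as the updated tests above do. Below is a minimal sketch of that pattern, not part of the patch itself; it assumes the `futures` crate and `std::pin::pin!` (Rust 1.68+), and the `send_ping` name and `b"ping"` payload are illustrative only.

use futures::{future, stream, AsyncWriteExt, StreamExt};
use std::pin::pin;
use yamux::{Config, Connection, ConnectionError, Mode};

/// Opens one outbound stream, sends a message and closes the connection.
async fn send_ping<T>(socket: T) -> Result<(), ConnectionError>
where
    T: futures::AsyncRead + futures::AsyncWrite + Unpin,
{
    let mut connection = Connection::new(socket, Config::default(), Mode::Client);

    // Previously `Control::open_stream`.
    let mut stream = future::poll_fn(|cx| connection.poll_new_outbound(cx)).await?;

    // The connection only makes progress while one of its `poll_` functions
    // is called, so keep polling it (discarding inbound streams) concurrently
    // with our own stream I/O.
    let drive = stream::poll_fn(|cx| connection.poll_next_inbound(cx))
        .for_each(|inbound| async move { drop(inbound) });

    let exchange = async move {
        stream.write_all(b"ping").await?;
        stream.close().await?;
        Ok::<_, ConnectionError>(())
    };

    future::select(pin!(drive), pin!(exchange)).await;

    // Previously `Control::close`.
    future::poll_fn(|cx| connection.poll_close(cx)).await?;

    Ok(())
}

Racing the stream I/O against the polling loop with `future::select` mirrors what `prop_config_send_recv_single` above does with `noop_server`; in a real application the inbound streams would typically be handled on a separate task instead of dropped.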