Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: don't allow to open more than 256 unacknowledged streams #153

Merged
merged 39 commits into from
Jul 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
570d9d3
Add test for ACK backlog
thomaseizinger Feb 21, 2023
639687c
Don't allow opening more than 256 without receiving an ACK
thomaseizinger Feb 21, 2023
4e477e4
Add test for acknowledging streams
thomaseizinger May 10, 2023
0bd7e0e
Correctly track acknowledged state
thomaseizinger May 10, 2023
7c8f2f8
Update yamux/src/lib.rs
thomaseizinger May 22, 2023
8f67482
Update yamux/src/connection.rs
thomaseizinger May 22, 2023
2d92a77
Don't duplicate data
thomaseizinger May 23, 2023
e675e79
Merge remote-tracking branch 'origin/feat/256-backlog' into feat/256-…
thomaseizinger May 23, 2023
471edc7
Reduce diff
thomaseizinger May 23, 2023
39db682
Merge branch 'master' into feat/256-backlog
thomaseizinger May 24, 2023
fbcbff0
Fix bug where we count inbound and outbound streams for ack backlog
thomaseizinger May 24, 2023
d715980
Window size for inbound streams always starts with DEFAULT_CREDIT
thomaseizinger May 24, 2023
5793d27
Credit for outbound stream always starts with DEFAULT_CREDIT
thomaseizinger May 24, 2023
e1cbf7a
Inline constructor
thomaseizinger May 24, 2023
bc615d4
Introduce `is_outbound` function
thomaseizinger May 24, 2023
c07a5ad
Compute `is_outbound` based on `Mode`
thomaseizinger May 24, 2023
f377492
Track acknowledged as part of state
thomaseizinger May 24, 2023
0e201a4
Merge branch 'master' into feat/256-backlog
thomaseizinger Jun 28, 2023
f2f5bc8
Unify harness code
thomaseizinger Jun 28, 2023
3b35567
Port `concurrent_streams` test to poll-api
thomaseizinger Jun 28, 2023
384ade8
Replace custom channel with `futures_ringbuf`
thomaseizinger Jun 28, 2023
9c090d3
Remove `prop_send_recv`
thomaseizinger Jun 28, 2023
660e6c9
Rewrite `prop_send_recv_half_closed` to poll-api and move
thomaseizinger Jun 28, 2023
7c87447
Rewrite `prop_config_send_recv_single` to poll-api
thomaseizinger Jun 28, 2023
05eee85
Move test to `poll_api`
thomaseizinger Jun 28, 2023
efa7310
Rewrite deadlock test to use poll-api
thomaseizinger Jun 28, 2023
0f10d51
Move final test to poll_api
thomaseizinger Jun 28, 2023
0337b85
Merge branch 'feat/remove-control' into feat/256-backlog
thomaseizinger Jun 30, 2023
cb56b70
Fix errors after merge
thomaseizinger Jun 30, 2023
749fd9a
Move `MessageSender` to test-harness
thomaseizinger Jun 30, 2023
7ff430f
Move benchmarks to test-harness
thomaseizinger Jun 30, 2023
d3667e3
Add message multiplier to `MessageSender`
thomaseizinger Jun 30, 2023
358a728
Migrate benchmark away from `Control`
thomaseizinger Jun 30, 2023
d5d5972
Remove `Control` and `ControlledConnection`
thomaseizinger Jun 30, 2023
40f01c6
Fix formatting
thomaseizinger Jun 30, 2023
ab16660
Merge branch 'feat/remove-control' into feat/256-backlog
thomaseizinger Jun 30, 2023
8e78dbb
Merge branch 'master' into feat/256-backlog
thomaseizinger Jul 3, 2023
d6a4261
Update docs
thomaseizinger Jul 3, 2023
7b4fdba
Add docs
thomaseizinger Jul 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 256 additions & 0 deletions test-harness/tests/ack_backlog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
use futures::channel::oneshot;
use futures::future::BoxFuture;
use futures::future::FutureExt;
use futures::stream::FuturesUnordered;
use futures::{future, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, StreamExt};
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use test_harness::bind;
use tokio::net::TcpStream;
use tokio_util::compat::TokioAsyncReadCompatExt;
use yamux::{Config, Connection, ConnectionError, Mode, Stream};

#[tokio::test]
async fn honours_ack_backlog_of_256() {
let _ = env_logger::try_init();

let (tx, rx) = oneshot::channel();

let (listener, address) = bind(None).await.expect("bind");

let server = async {
let socket = listener.accept().await.expect("accept").0.compat();
let connection = Connection::new(socket, Config::default(), Mode::Server);

Server::new(connection, rx).await
};

let client = async {
let socket = TcpStream::connect(address).await.expect("connect").compat();
let connection = Connection::new(socket, Config::default(), Mode::Client);

Client::new(connection, tx).await
};

let (server_processed, client_processed) = future::try_join(server, client).await.unwrap();

assert_eq!(server_processed, 257);
assert_eq!(client_processed, 257);
}

enum Server<T> {
Idle {
connection: Connection<T>,
trigger: oneshot::Receiver<()>,
},
Accepting {
connection: Connection<T>,
worker_streams: FuturesUnordered<BoxFuture<'static, yamux::Result<()>>>,
streams_processed: usize,
},
Poisoned,
}

impl<T> Server<T> {
fn new(connection: Connection<T>, trigger: oneshot::Receiver<()>) -> Self {
Server::Idle {
connection,
trigger,
}
}
}

impl<T> Future for Server<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
type Output = yamux::Result<usize>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
let this = self.get_mut();

loop {
match mem::replace(this, Server::Poisoned) {
Server::Idle {
mut trigger,
connection,
} => match trigger.poll_unpin(cx) {
Poll::Ready(_) => {
*this = Server::Accepting {
connection,
worker_streams: Default::default(),
streams_processed: 0,
};
continue;
}
Poll::Pending => {
*this = Server::Idle {
trigger,
connection,
};
return Poll::Pending;
}
},
Server::Accepting {
mut connection,
mut streams_processed,
mut worker_streams,
} => {
match connection.poll_next_inbound(cx)? {
Poll::Ready(Some(stream)) => {
worker_streams.push(pong_ping(stream).boxed());
*this = Server::Accepting {
connection,
streams_processed,
worker_streams,
};
continue;
}
Poll::Ready(None) => {
return Poll::Ready(Ok(streams_processed));
}
Poll::Pending => {}
}

match worker_streams.poll_next_unpin(cx)? {
Poll::Ready(Some(())) => {
streams_processed += 1;
*this = Server::Accepting {
connection,
streams_processed,
worker_streams,
};
continue;
}
Poll::Ready(None) | Poll::Pending => {}
}

*this = Server::Accepting {
connection,
streams_processed,
worker_streams,
};
return Poll::Pending;
}
Server::Poisoned => unreachable!(),
}
}
}
}

struct Client<T> {
connection: Connection<T>,
worker_streams: FuturesUnordered<BoxFuture<'static, yamux::Result<()>>>,
trigger: Option<oneshot::Sender<()>>,
streams_processed: usize,
}

impl<T> Client<T> {
fn new(connection: Connection<T>, trigger: oneshot::Sender<()>) -> Self {
Self {
connection,
trigger: Some(trigger),
worker_streams: FuturesUnordered::default(),
streams_processed: 0,
}
}
}

impl<T> Future for Client<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
type Output = yamux::Result<usize>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

loop {
// First, try to open 256 streams
if this.worker_streams.len() < 256 && this.streams_processed == 0 {
match this.connection.poll_new_outbound(cx)? {
Poll::Ready(stream) => {
this.worker_streams.push(ping_pong(stream).boxed());
continue;
}
Poll::Pending => {
panic!("Should be able to open 256 streams without yielding")
}
}
}

if this.worker_streams.len() == 256 && this.streams_processed == 0 {
let poll_result = this.connection.poll_new_outbound(cx);

match (poll_result, this.trigger.take()) {
(Poll::Pending, Some(trigger)) => {
// This is what we want, our task gets parked because we have hit the limit.
// Tell the server to start processing streams and wait until we get woken.

trigger.send(()).unwrap();
return Poll::Pending;
}
(Poll::Ready(stream), None) => {
// We got woken because the server has started to acknowledge streams.
this.worker_streams.push(ping_pong(stream.unwrap()).boxed());
continue;
}
(Poll::Ready(_), Some(_)) => {
panic!("should not be able to open stream if server hasn't acknowledged existing streams")
}
(Poll::Pending, None) => {}
}
}

match this.worker_streams.poll_next_unpin(cx)? {
Poll::Ready(Some(())) => {
this.streams_processed += 1;
continue;
}
Poll::Ready(None) if this.streams_processed > 0 => {
return Poll::Ready(Ok(this.streams_processed));
}
Poll::Ready(None) | Poll::Pending => {}
}

// Allow the connection to make progress
match this.connection.poll_next_inbound(cx)? {
Poll::Ready(Some(_)) => {
panic!("server never opens stream")
}
Poll::Ready(None) => {
return Poll::Ready(Ok(this.streams_processed));
}
Poll::Pending => {}
}

return Poll::Pending;
}
}
}

async fn ping_pong(mut stream: Stream) -> Result<(), ConnectionError> {
let mut buffer = [0u8; 4];
stream.write_all(b"ping").await?;
stream.read_exact(&mut buffer).await?;

assert_eq!(&buffer, b"pong");

stream.close().await?;

Ok(())
}

async fn pong_ping(mut stream: Stream) -> Result<(), ConnectionError> {
let mut buffer = [0u8; 4];
stream.write_all(b"pong").await?;
stream.read_exact(&mut buffer).await?;

assert_eq!(&buffer, b"ping");

stream.close().await?;

Ok(())
}
Loading