Skip to content

Commit

Permalink
Expose RecvStream::poll_read
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-airoldie authored and djc committed Mar 19, 2024
1 parent a100fc7 commit a5b6fac
Showing 1 changed file with 31 additions and 6 deletions.
37 changes: 31 additions & 6 deletions quinn/src/recv_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,25 @@ impl RecvStream {
.await
}

fn poll_read(
/// Attempts to read from the stream into buf.
///
/// On success, returns Poll::Ready(Ok(num_bytes_read)) and places data in
/// the buf. If no data was read, it implies that EOF has been reached.
///
/// If no data is available for reading, the method returns Poll::Pending
/// and arranges for the current task (via cx.waker()) to receive a notification
/// when the stream becomes readable or is closed.
pub fn poll_read(
&mut self,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<Result<usize, ReadError>> {
let mut buf = ReadBuf::new(buf);
ready!(self.poll_read_buf(cx, &mut buf))?;
Poll::Ready(Ok(buf.filled().len()))
}

fn poll_read_buf(
&mut self,
cx: &mut Context,
buf: &mut ReadBuf<'_>,
Expand Down Expand Up @@ -196,7 +214,14 @@ impl RecvStream {
.await
}

/// Foundation of [`Self::read_chunk`]
/// Attempts to read a chunk from the stream.
///
/// On success, returns `Poll::Ready(Ok(Some(chunk)))`. If `Poll::Ready(Ok(None))`
/// is returned, it implies that EOF has been reached.
///
/// If no data is available for reading, the method returns `Poll::Pending`
/// and arranges for the current task (via cx.waker()) to receive a notification
/// when the stream becomes readable or is closed.
fn poll_read_chunk(
&mut self,
cx: &mut Context,
Expand Down Expand Up @@ -451,7 +476,7 @@ impl futures_io::AsyncRead for RecvStream {
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let mut buf = ReadBuf::new(buf);
ready!(RecvStream::poll_read(self.get_mut(), cx, &mut buf))?;
ready!(RecvStream::poll_read_buf(self.get_mut(), cx, &mut buf))?;
Poll::Ready(Ok(buf.filled().len()))
}
}
Expand All @@ -462,7 +487,7 @@ impl tokio::io::AsyncRead for RecvStream {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
ready!(Self::poll_read(self.get_mut(), cx, buf))?;
ready!(Self::poll_read_buf(self.get_mut(), cx, buf))?;
Poll::Ready(Ok(()))
}
}
Expand Down Expand Up @@ -550,7 +575,7 @@ impl<'a> Future for Read<'a> {

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
ready!(this.stream.poll_read(cx, &mut this.buf))?;
ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
match this.buf.filled().len() {
0 if this.buf.capacity() != 0 => Poll::Ready(Ok(None)),
n => Poll::Ready(Ok(Some(n))),
Expand All @@ -574,7 +599,7 @@ impl<'a> Future for ReadExact<'a> {
let total = this.buf.remaining();
let mut remaining = total;
while remaining > 0 {
ready!(this.stream.poll_read(cx, &mut this.buf))?;
ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
let new = this.buf.remaining();
if new == remaining {
let read = total - remaining;
Expand Down

0 comments on commit a5b6fac

Please sign in to comment.