-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow read_to_end with ChunkedEncoding #61
Merged
Merged
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
d0f9646
Improve tests
bugadani 72937f1
Reorganize response module
bugadani d96c0f5
Introduce TryBufRead, clean up
bugadani 45d9a47
unit test reproducing google panic
lulf 14fa5f2
Fix read_to_end with chunked encoding
bugadani 841af88
Remove outdated documentation
bugadani 3f68168
Hide ReadBuffer
bugadani b982905
Fix using the entire available buffer
bugadani 4f9853c
Return BufferTooSmall when reading to end
bugadani bf5f221
Reuse body readers
bugadani 9c2e257
Detect BufferTooSmall error for ToEnd bodies
bugadani File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,235 @@ | ||
use embedded_io_async::{BufRead, Error as _, ErrorType, Read}; | ||
|
||
use crate::{ | ||
reader::{BufferingReader, ReadBuffer}, | ||
Error, TryBufRead, | ||
}; | ||
|
||
/// Progress of the decoder within the chunked body.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum ChunkState {
    /// No chunk header has been read yet.
    NoChunk,
    /// Inside a chunk; the payload is the number of data bytes still unread.
    ///
    /// `NotEmpty(0)` means the data of the current chunk has been fully
    /// consumed but the next chunk header has not been read yet.
    NotEmpty(u32),
    /// The terminating zero-length chunk has been read.
    Empty,
}

impl ChunkState {
    /// Marks up to `amt` bytes of the current chunk as consumed.
    ///
    /// Returns the number of bytes actually consumed, which is capped at the
    /// number of bytes remaining in the chunk. Returns `0` when there is no
    /// chunk in progress.
    fn consume(&mut self, amt: usize) -> usize {
        match self {
            ChunkState::NotEmpty(remaining) => {
                // Clamp in `usize` first: casting `amt` straight to `u32` would
                // silently truncate requests larger than `u32::MAX`.
                let consumed = amt.min(*remaining as usize);
                // Lossless: `consumed <= *remaining`, which is a `u32`.
                *remaining -= consumed as u32;
                consumed
            }
            ChunkState::NoChunk | ChunkState::Empty => 0,
        }
    }

    /// Number of data bytes still unread in the current chunk (`0` when no
    /// chunk is in progress).
    fn len(self) -> usize {
        match self {
            ChunkState::NotEmpty(len) => len as usize,
            ChunkState::NoChunk | ChunkState::Empty => 0,
        }
    }
}
|
||
/// Chunked response body reader | ||
pub struct ChunkedBodyReader<B> { | ||
pub raw_body: B, | ||
chunk_remaining: ChunkState, | ||
} | ||
|
||
impl<C> ChunkedBodyReader<C> | ||
where | ||
C: Read, | ||
{ | ||
pub fn new(raw_body: C) -> Self { | ||
Self { | ||
raw_body, | ||
chunk_remaining: ChunkState::NoChunk, | ||
} | ||
} | ||
|
||
pub fn is_done(&self) -> bool { | ||
self.chunk_remaining == ChunkState::Empty | ||
} | ||
|
||
async fn read_next_chunk_length(&mut self) -> Result<(), Error> { | ||
let mut header_buf = [0; 8 + 2]; // 32 bit hex + \r + \n | ||
let mut total_read = 0; | ||
|
||
'read_size: loop { | ||
let mut byte = 0; | ||
self.raw_body | ||
.read_exact(core::slice::from_mut(&mut byte)) | ||
.await | ||
.map_err(|e| Error::from(e).kind())?; | ||
|
||
if byte != b'\n' { | ||
header_buf[total_read] = byte; | ||
total_read += 1; | ||
|
||
if total_read == header_buf.len() { | ||
return Err(Error::Codec); | ||
} | ||
} else { | ||
if total_read == 0 || header_buf[total_read - 1] != b'\r' { | ||
return Err(Error::Codec); | ||
} | ||
break 'read_size; | ||
} | ||
} | ||
|
||
let hex_digits = total_read - 1; | ||
|
||
// Prepend hex with zeros | ||
let mut hex = [b'0'; 8]; | ||
hex[8 - hex_digits..].copy_from_slice(&header_buf[..hex_digits]); | ||
|
||
let mut bytes = [0; 4]; | ||
hex::decode_to_slice(hex, &mut bytes).map_err(|_| Error::Codec)?; | ||
|
||
let chunk_length = u32::from_be_bytes(bytes); | ||
|
||
debug!("Chunk length: {}", chunk_length); | ||
|
||
self.chunk_remaining = match chunk_length { | ||
0 => ChunkState::Empty, | ||
other => ChunkState::NotEmpty(other), | ||
}; | ||
|
||
Ok(()) | ||
} | ||
|
||
async fn read_chunk_end(&mut self) -> Result<(), Error> { | ||
// All chunks are terminated with a \r\n | ||
let mut newline_buf = [0; 2]; | ||
self.raw_body.read_exact(&mut newline_buf).await?; | ||
|
||
if newline_buf != [b'\r', b'\n'] { | ||
return Err(Error::Codec); | ||
} | ||
Ok(()) | ||
} | ||
|
||
/// Handles chunk boundary and returns the number of bytes in the current (or new) chunk. | ||
async fn handle_chunk_boundary(&mut self) -> Result<usize, Error> { | ||
match self.chunk_remaining { | ||
ChunkState::NoChunk => self.read_next_chunk_length().await?, | ||
|
||
ChunkState::NotEmpty(0) => { | ||
// The current chunk is currently empty, advance into a new chunk... | ||
self.read_chunk_end().await?; | ||
self.read_next_chunk_length().await?; | ||
} | ||
|
||
ChunkState::NotEmpty(_) => {} | ||
|
||
ChunkState::Empty => return Ok(0), | ||
} | ||
|
||
if self.chunk_remaining == ChunkState::Empty { | ||
// Read final chunk termination | ||
self.read_chunk_end().await?; | ||
} | ||
|
||
Ok(self.chunk_remaining.len()) | ||
} | ||
} | ||
|
||
impl<'conn, 'buf, C> ChunkedBodyReader<BufferingReader<'conn, 'buf, C>> | ||
where | ||
C: Read + TryBufRead, | ||
{ | ||
pub(crate) async fn read_to_end(self) -> Result<&'buf mut [u8], Error> { | ||
let buffer = self.raw_body.buffer.buffer; | ||
|
||
// We reconstruct the reader to change the 'buf lifetime. | ||
let mut reader = ChunkedBodyReader { | ||
raw_body: BufferingReader { | ||
buffer: ReadBuffer { | ||
buffer: &mut buffer[..], | ||
loaded: self.raw_body.buffer.loaded, | ||
}, | ||
stream: self.raw_body.stream, | ||
}, | ||
chunk_remaining: self.chunk_remaining, | ||
}; | ||
|
||
let mut len = 0; | ||
while !reader.raw_body.buffer.buffer.is_empty() { | ||
// Read some | ||
let read = reader.fill_buf().await?.len(); | ||
len += read; | ||
|
||
// Make sure we don't erase the newly read data | ||
let was_loaded = reader.raw_body.buffer.loaded; | ||
let fake_loaded = read.min(was_loaded); | ||
reader.raw_body.buffer.loaded = fake_loaded; | ||
|
||
// Consume the returned buffer | ||
reader.consume(read); | ||
|
||
if reader.is_done() { | ||
// If we're done, we don't care about the rest of the housekeeping. | ||
break; | ||
} | ||
|
||
// How many bytes were actually consumed from the preloaded buffer? | ||
let consumed_from_buffer = fake_loaded - reader.raw_body.buffer.loaded; | ||
|
||
// ... move the buffer by that many bytes to avoid overwriting in the next iteration. | ||
reader.raw_body.buffer.loaded = was_loaded - consumed_from_buffer; | ||
reader.raw_body.buffer.buffer = &mut reader.raw_body.buffer.buffer[consumed_from_buffer..]; | ||
} | ||
|
||
if !reader.is_done() { | ||
return Err(Error::BufferTooSmall); | ||
} | ||
|
||
Ok(&mut buffer[..len]) | ||
} | ||
} | ||
|
||
impl<C> ErrorType for ChunkedBodyReader<C> { | ||
type Error = Error; | ||
} | ||
|
||
impl<C> Read for ChunkedBodyReader<C> | ||
where | ||
C: Read, | ||
{ | ||
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> { | ||
let remaining = self.handle_chunk_boundary().await?; | ||
let max_len = buf.len().min(remaining); | ||
|
||
let len = self | ||
.raw_body | ||
.read(&mut buf[..max_len]) | ||
.await | ||
.map_err(|e| Error::Network(e.kind()))?; | ||
|
||
self.chunk_remaining.consume(len); | ||
|
||
Ok(len) | ||
} | ||
} | ||
|
||
impl<C> BufRead for ChunkedBodyReader<C> | ||
where | ||
C: BufRead + Read, | ||
{ | ||
async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> { | ||
let remaining = self.handle_chunk_boundary().await?; | ||
|
||
let buf = self.raw_body.fill_buf().await.map_err(|e| Error::Network(e.kind()))?; | ||
|
||
let len = buf.len().min(remaining); | ||
|
||
Ok(&buf[..len]) | ||
} | ||
|
||
fn consume(&mut self, amt: usize) { | ||
let consumed = self.chunk_remaining.consume(amt); | ||
self.raw_body.consume(consumed); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rmja This came in handy here, too :)