From a35e8afcec974d45fec8dee8370264eb0655fd77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Tue, 17 Oct 2023 21:23:31 +0200 Subject: [PATCH 01/18] Clean up initial body copy --- src/response.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/response.rs b/src/response.rs index 8b10017..f491442 100644 --- a/src/response.rs +++ b/src/response.rs @@ -151,18 +151,14 @@ where ReaderHint::ToEnd }; - // Move the body part of the bytes in the header buffer to the beginning of the buffer - let header_buf = self.header_buf; - for i in 0..self.raw_body_read { - header_buf[i] = header_buf[self.header_len + i]; - } - // From now on, the header buffer is now the body buffer as all header bytes have been overwritten - let body_buf = header_buf; + // Move the body part of the bytes in the header buffer to the beginning of the buffer. + self.header_buf + .copy_within(self.header_len..self.header_len + self.raw_body_read, 0); ResponseBody { conn: self.conn, reader_hint, - body_buf, + body_buf: self.header_buf, raw_body_read: self.raw_body_read, } } From 77c8c552ca85d92646e68ff1defa490c66076ea2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Wed, 18 Oct 2023 08:43:22 +0200 Subject: [PATCH 02/18] Implement BufReader by reusing provided read buffer --- src/concat.rs | 88 ------------------------------------- src/lib.rs | 2 +- src/reader.rs | 112 +++++++++++++++++++++++++++++++++++++++++++++++ src/response.rs | 100 ++++++++++++++++++++++++++++++------------ tests/request.rs | 3 +- 5 files changed, 188 insertions(+), 117 deletions(-) delete mode 100644 src/concat.rs create mode 100644 src/reader.rs diff --git a/src/concat.rs b/src/concat.rs deleted file mode 100644 index 0d0853f..0000000 --- a/src/concat.rs +++ /dev/null @@ -1,88 +0,0 @@ -use embedded_io::{ErrorKind, ErrorType}; -use embedded_io_async::Read; - -pub struct ConcatReader -where - A: Read, - B: Read, -{ - first: A, - last: B, - first_exhausted: bool, -} - -impl ConcatReader -where - A: Read, - B: Read, -{ - pub const fn new(first: A, last: B) -> Self { - Self { - first, - last, - first_exhausted: false, - } - } -} - -pub enum ConcatReaderError -where - A: Read, - B: Read, -{ - First(A::Error), - Last(B::Error), -} - -impl core::fmt::Debug for ConcatReaderError -where - A: Read, - B: Read, -{ - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - match self { - Self::First(arg0) => f.debug_tuple("First").field(arg0).finish(), - Self::Last(arg0) => f.debug_tuple("Last").field(arg0).finish(), - } - } -} - -impl embedded_io::Error for ConcatReaderError -where - A: Read, - B: Read, -{ - fn kind(&self) -> ErrorKind { - match self { - ConcatReaderError::First(a) => a.kind(), - ConcatReaderError::Last(b) => b.kind(), - } - } -} - -impl ErrorType for ConcatReader -where - A: Read, - B: Read, -{ - type Error = ConcatReaderError; -} - -impl Read for ConcatReader -where - A: Read, - B: Read, -{ - async fn read(&mut self, buf: &mut [u8]) -> Result { - if !self.first_exhausted { - let len = self.first.read(buf).await.map_err(ConcatReaderError::First)?; - if len > 0 { - return Ok(len); - } - - self.first_exhausted = true; - } - - self.last.read(buf).await.map_err(ConcatReaderError::Last) - } -} diff --git a/src/lib.rs b/src/lib.rs index cf47a6d..2a13cce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,8 +11,8 @@ use embedded_io_async::ReadExactError; mod fmt; pub mod client; -mod concat; pub mod headers; +mod reader; pub mod request; pub mod response; diff --git a/src/reader.rs b/src/reader.rs new file mode 100644 index 0000000..5119a25 --- /dev/null +++ b/src/reader.rs @@ -0,0 +1,112 @@ +use embedded_io::{Error, ErrorKind, ErrorType}; +use embedded_io_async::{BufRead, Read, Write}; + +use crate::client::HttpConnection; + +struct ReadBuffer<'buf> { + buffer: &'buf mut [u8], + loaded: usize, +} + +impl<'buf> ReadBuffer<'buf> { + fn new(buffer: &'buf mut [u8], loaded: usize) -> Self { + Self { buffer, loaded } + } +} + +impl ReadBuffer<'_> { + fn is_empty(&self) -> bool { + self.loaded == 0 + } + + fn read(&mut self, buf: &mut [u8]) -> Result { + let amt = self.loaded.min(buf.len()); + buf[..amt].copy_from_slice(&self.buffer[0..amt]); + + self.consume(amt); + + Ok(amt) + } + + fn fill_buf(&mut self) -> Result<&[u8], ErrorKind> { + Ok(&self.buffer[..self.loaded]) + } + + fn consume(&mut self, amt: usize) -> usize { + let to_consume = amt.min(self.loaded); + + self.buffer.copy_within(to_consume..self.loaded, 0); + self.loaded -= to_consume; + + amt - to_consume + } +} + +pub struct BufferingReader<'buf, B> { + buffer: ReadBuffer<'buf>, + stream: B, +} + +impl<'buf, B> BufferingReader<'buf, B> { + pub fn new(buffer: &'buf mut [u8], loaded: usize, stream: B) -> Self { + Self { + buffer: ReadBuffer::new(buffer, loaded), + stream, + } + } +} + +impl ErrorType for BufferingReader<'_, &mut HttpConnection<'_, C>> +where + C: Read + Write, +{ + type Error = ErrorKind; +} + +impl Read for BufferingReader<'_, &mut HttpConnection<'_, C>> +where + C: Read + Write, +{ + async fn read(&mut self, buf: &mut [u8]) -> Result { + if !self.buffer.is_empty() { + let amt = self.buffer.read(buf)?; + return Ok(amt); + } + + self.stream.read(buf).await + } +} + +impl BufRead for BufferingReader<'_, &mut HttpConnection<'_, C>> +where + C: Read + Write, +{ + async fn fill_buf(&mut self) -> Result<&[u8], ErrorKind> { + // We need to consume the loaded bytes before we read mode. + if self.buffer.is_empty() { + // embedded-tls has its own internal buffer, let's prefer that if we can + #[cfg(feature = "embedded-tls")] + if let HttpConnection::Tls(ref mut tls) = self.stream { + return tls.fill_buf().await.map_err(|e| e.kind()); + } + + self.buffer.loaded = self.stream.read(&mut self.buffer.buffer).await?; + } + + self.buffer.fill_buf() + } + + fn consume(&mut self, amt: usize) { + // It's possible that the user requested more bytes to be consumed than loaded. Especially + // since it's also possible that nothing is loaded, after we consumed all and are using + // embedded-tls's buffering. + let unconsumed = self.buffer.consume(amt); + + if unconsumed > 0 { + #[cfg(feature = "embedded-tls")] + if let HttpConnection::Tls(tls) = &mut self.stream { + tls.consume(unconsumed); + } + } + } +} diff --git a/src/response.rs b/src/response.rs index f491442..c5e9793 100644 --- a/src/response.rs +++ b/src/response.rs @@ -2,8 +2,8 @@ use embedded_io::{Error as _, ErrorType}; use embedded_io_async::Read; use heapless::Vec; -use crate::concat::ConcatReader; use crate::headers::{ContentType, KeepAlive, TransferEncoding}; +use crate::reader::BufferingReader; use crate::request::Method; use crate::Error; @@ -206,8 +206,8 @@ impl<'buf, 'conn, C> ResponseBody<'buf, 'conn, C> where C: Read, { - pub fn reader(self) -> BodyReader> { - let raw_body = ConcatReader::new(&self.body_buf[..self.raw_body_read], self.conn); + pub fn reader(self) -> BodyReader> { + let raw_body = BufferingReader::new(self.body_buf, self.raw_body_read, self.conn); match self.reader_hint { ReaderHint::Empty => BodyReader::Empty, @@ -225,7 +225,11 @@ where } } -impl<'buf, 'conn, C: Read> ResponseBody<'buf, 'conn, C> { +impl<'buf, 'conn, C> ResponseBody<'buf, 'conn, C> +where + C: Read, + BufferingReader<'buf, &'conn mut C>: Read, +{ /// Read the entire body into the buffer originally provided [`Response::read()`]. /// This requires that this original buffer is large enough to contain the entire body. /// @@ -275,10 +279,7 @@ impl<'buf, 'conn, C: Read> ResponseBody<'buf, 'conn, C> { } /// A body reader -pub enum BodyReader -where - B: Read, -{ +pub enum BodyReader { Empty, FixedLength(FixedLengthBodyReader), Chunked(ChunkedBodyReader), @@ -351,7 +352,7 @@ where } /// Fixed length response body reader -pub struct FixedLengthBodyReader { +pub struct FixedLengthBodyReader { raw_body: B, remaining: usize, } @@ -377,10 +378,7 @@ impl Read for FixedLengthBodyReader { } /// Chunked response body reader -pub struct ChunkedBodyReader -where - B: Read, -{ +pub struct ChunkedBodyReader { raw_body: B, chunk_remaining: u32, empty_chunk_received: bool, @@ -597,11 +595,53 @@ impl From for Status { #[cfg(test)] mod tests { - use super::*; + use embedded_io::ErrorType; + use embedded_io_async::{Read, Write}; + + use crate::{ + client::HttpConnection, + request::Method, + response::{ChunkedBodyReader, Response}, + }; + + struct Buffer { + b: Vec, + } + impl Buffer { + fn from(b: &[u8]) -> Self { + Self { b: b.to_vec() } + } + } + + impl ErrorType for Buffer { + type Error = embedded_io::ErrorKind; + } + + impl Read for Buffer { + async fn read(&mut self, buf: &mut [u8]) -> Result { + let len = buf.len().min(self.b.len()); + buf[..len].copy_from_slice(&self.b[..len]); + self.b.drain(..len); + Ok(len) + } + } + + impl Write for Buffer { + async fn write(&mut self, buf: &[u8]) -> Result { + self.b.extend_from_slice(buf); + Ok(buf.len()) + } + + async fn flush(&mut self) -> Result<(), Self::Error> { + Ok(()) + } + } #[tokio::test] async fn can_read_with_content_length_with_same_buffer() { - let mut response = b"HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHELLO WORLD".as_slice(); + let mut response = HttpConnection::Plain(Buffer::from( + b"HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHELLO WORLD", + )); let mut response_buf = [0; 200]; let response = Response::read(&mut response, Method::GET, &mut response_buf) .await @@ -614,7 +654,9 @@ mod tests { #[tokio::test] async fn can_read_with_content_length_to_other_buffer() { - let mut response = b"HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHELLO WORLD".as_slice(); + let mut response = HttpConnection::Plain(Buffer::from( + b"HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHELLO WORLD", + )); let mut header_buf = [0; 200]; let response = Response::read(&mut response, Method::GET, &mut header_buf) .await @@ -628,7 +670,9 @@ mod tests { #[tokio::test] async fn can_discard_with_content_length() { - let mut response = b"HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHELLO WORLD".as_slice(); + let mut response = HttpConnection::Plain(Buffer::from( + b"HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHELLO WORLD", + )); let mut response_buf = [0; 200]; let response = Response::read(&mut response, Method::GET, &mut response_buf) .await @@ -639,8 +683,9 @@ mod tests { #[tokio::test] async fn can_read_with_chunked_encoding() { - let mut response = - b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nB\r\nHELLO WORLD\r\n0\r\n\r\n".as_slice(); + let mut response = HttpConnection::Plain(Buffer::from( + b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nB\r\nHELLO WORLD\r\n0\r\n\r\n", + )); let mut header_buf = [0; 200]; let response = Response::read(&mut response, Method::GET, &mut header_buf) .await @@ -654,8 +699,9 @@ mod tests { #[tokio::test] async fn can_discard_with_chunked_encoding() { - let mut response = - b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nB\r\nHELLO WORLD\r\n0\r\n\r\n".as_slice(); + let mut response = HttpConnection::Plain(Buffer::from( + b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nB\r\nHELLO WORLD\r\n0\r\n\r\n", + )); let mut header_buf = [0; 200]; let response = Response::read(&mut response, Method::GET, &mut header_buf) .await @@ -666,7 +712,7 @@ mod tests { #[tokio::test] async fn can_read_to_end_of_connection_with_same_buffer() { - let mut response = b"HTTP/1.1 200 OK\r\n\r\nHELLO WORLD".as_slice(); + let mut response = HttpConnection::Plain(Buffer::from(b"HTTP/1.1 200 OK\r\n\r\nHELLO WORLD")); let mut header_buf = [0; 200]; let response = Response::read(&mut response, Method::GET, &mut header_buf) .await @@ -679,7 +725,7 @@ mod tests { #[tokio::test] async fn can_read_to_end_of_connection_to_other_buffer() { - let mut response = b"HTTP/1.1 200 OK\r\n\r\nHELLO WORLD".as_slice(); + let mut response = HttpConnection::Plain(Buffer::from(b"HTTP/1.1 200 OK\r\n\r\nHELLO WORLD")); let mut header_buf = [0; 200]; let response = Response::read(&mut response, Method::GET, &mut header_buf) .await @@ -693,7 +739,7 @@ mod tests { #[tokio::test] async fn can_discard_to_end_of_connection() { - let mut response = b"HTTP/1.1 200 OK\r\n\r\nHELLO WORLD".as_slice(); + let mut response = HttpConnection::Plain(Buffer::from(b"HTTP/1.1 200 OK\r\n\r\nHELLO WORLD")); let mut header_buf = [0; 200]; let response = Response::read(&mut response, Method::GET, &mut header_buf) .await @@ -704,7 +750,7 @@ mod tests { #[tokio::test] async fn chunked_body_reader_can_read_with_large_buffer() { - let raw_body = "1\r\nX\r\n10\r\nYYYYYYYYYYYYYYYY\r\n0\r\n\r\n".as_bytes(); + let raw_body = HttpConnection::Plain(Buffer::from(b"1\r\nX\r\n10\r\nYYYYYYYYYYYYYYYY\r\n0\r\n\r\n")); let mut reader = ChunkedBodyReader { raw_body, chunk_remaining: 0, @@ -721,14 +767,14 @@ mod tests { #[tokio::test] async fn chunked_body_reader_can_read_with_tiny_buffer() { - let raw_body = "1\r\nX\r\n10\r\nYYYYYYYYYYYYYYYY\r\n0\r\n\r\n".as_bytes(); + let raw_body = HttpConnection::Plain(Buffer::from(b"1\r\nX\r\n10\r\nYYYYYYYYYYYYYYYY\r\n0\r\n\r\n")); let mut reader = ChunkedBodyReader { raw_body, chunk_remaining: 0, empty_chunk_received: false, }; - let mut body = Vec::::new(); + let mut body = heapless::Vec::::new(); for _ in 0..17 { let mut buf = [0; 1]; assert_eq!(1, reader.read(&mut buf).await.unwrap()); diff --git a/tests/request.rs b/tests/request.rs index 5c43e21..d9a504e 100644 --- a/tests/request.rs +++ b/tests/request.rs @@ -1,6 +1,7 @@ use embedded_io_adapters::tokio_1::FromTokio; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Server}; +use reqwless::client::HttpConnection; use reqwless::request::{Method, RequestBuilder}; use reqwless::{headers::ContentType, request::Request, response::Response}; use std::str::from_utf8; @@ -35,7 +36,7 @@ async fn test_request_response() { }); let stream = TcpStream::connect(addr).await.unwrap(); - let mut stream = FromTokio::new(stream); + let mut stream = HttpConnection::Plain(FromTokio::new(stream)); let request = Request::post("/") .body(b"PING".as_slice()) From 8f30992e84750681d044f2600de4a45c846df739 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Wed, 18 Oct 2023 10:24:31 +0200 Subject: [PATCH 03/18] Implement BufRead for BodyReader --- src/response.rs | 331 ++++++++++++++++++++++++++++++------------------ 1 file changed, 210 insertions(+), 121 deletions(-) diff --git a/src/response.rs b/src/response.rs index c5e9793..0e396d7 100644 --- a/src/response.rs +++ b/src/response.rs @@ -1,5 +1,5 @@ use embedded_io::{Error as _, ErrorType}; -use embedded_io_async::Read; +use embedded_io_async::{BufRead, Read}; use heapless::Vec; use crate::headers::{ContentType, KeepAlive, TransferEncoding}; @@ -217,8 +217,7 @@ where }), ReaderHint::Chunked => BodyReader::Chunked(ChunkedBodyReader { raw_body, - chunk_remaining: 0, - empty_chunk_received: false, + chunk_remaining: ChunkState::NoChunk, }), ReaderHint::ToEnd => BodyReader::ToEnd(raw_body), } @@ -228,7 +227,7 @@ where impl<'buf, 'conn, C> ResponseBody<'buf, 'conn, C> where C: Read, - BufferingReader<'buf, &'conn mut C>: Read, + BufferingReader<'buf, &'conn mut C>: BufRead + Read, { /// Read the entire body into the buffer originally provided [`Response::read()`]. /// This requires that this original buffer is large enough to contain the entire body. @@ -288,7 +287,7 @@ pub enum BodyReader { impl BodyReader where - B: Read, + B: BufRead + Read, { /// Read the entire body pub async fn read_to_end(&mut self, buf: &mut [u8]) -> Result { @@ -304,7 +303,7 @@ where let is_done = match self { BodyReader::Empty => true, BodyReader::FixedLength(reader) => reader.remaining == 0, - BodyReader::Chunked(reader) => reader.empty_chunk_received, + BodyReader::Chunked(reader) => reader.chunk_remaining == ChunkState::Empty, BodyReader::ToEnd(_) => true, }; @@ -318,28 +317,26 @@ where async fn discard(&mut self) -> Result { let mut body_len = 0; loop { - let mut trash = [0; 256]; - let len = self.read(&mut trash).await?; - if len == 0 { + let buf = self.fill_buf().await?; + if buf.is_empty() { break; } - body_len += len; + let buf_len = buf.len(); + body_len += buf_len; + self.consume(buf_len); } Ok(body_len) } } -impl ErrorType for BodyReader -where - B: Read, -{ +impl ErrorType for BodyReader { type Error = Error; } impl Read for BodyReader where - B: Read, + B: BufRead + Read, { async fn read(&mut self, buf: &mut [u8]) -> Result { match self { @@ -351,28 +348,107 @@ where } } +impl BufRead for BodyReader +where + B: BufRead + Read, +{ + async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> { + match self { + BodyReader::Empty => Ok(&[]), + BodyReader::FixedLength(reader) => reader.fill_buf().await, + BodyReader::Chunked(reader) => reader.fill_buf().await, + BodyReader::ToEnd(conn) => conn.fill_buf().await.map_err(|e| Error::Network(e.kind())), + } + } + + fn consume(&mut self, amt: usize) { + match self { + BodyReader::Empty => {} + BodyReader::FixedLength(reader) => reader.consume(amt), + BodyReader::Chunked(reader) => reader.consume(amt), + BodyReader::ToEnd(conn) => conn.consume(amt), + } + } +} + /// Fixed length response body reader pub struct FixedLengthBodyReader { raw_body: B, remaining: usize, } -impl ErrorType for FixedLengthBodyReader { +impl ErrorType for FixedLengthBodyReader { type Error = Error; } -impl Read for FixedLengthBodyReader { +impl Read for FixedLengthBodyReader +where + C: BufRead + Read, +{ async fn read(&mut self, buf: &mut [u8]) -> Result { + let loaded = self.fill_buf().await?; + let len = loaded.len().min(buf.len()); + + buf[..len].copy_from_slice(&loaded[..len]); + self.consume(len); + + Ok(len) + } +} + +impl BufRead for FixedLengthBodyReader +where + C: BufRead + Read, +{ + async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> { if self.remaining == 0 { - return Ok(0); + return Ok(&[]); } - let to_read = usize::min(self.remaining, buf.len()); - let len = self.raw_body.read(&mut buf[..to_read]).await.map_err(|e| e.kind())?; - if len > 0 { - self.remaining -= len; - Ok(len) + + let loaded = self + .raw_body + .fill_buf() + .await + .map_err(|e| Error::Network(e.kind())) + .map(|data| &data[..data.len().min(self.remaining)])?; + + if loaded.is_empty() { + return Err(Error::ConnectionClosed); + } + + Ok(loaded) + } + + fn consume(&mut self, amt: usize) { + let amt = amt.min(self.remaining); + self.remaining -= amt; + self.raw_body.consume(amt) + } +} + +#[derive(Clone, Copy, PartialEq, Eq)] +pub enum ChunkState { + NoChunk, + NotEmpty(u32), + Empty, +} + +impl ChunkState { + fn consume(&mut self, amt: usize) -> usize { + if let ChunkState::NotEmpty(remaining) = self { + let consumed = (amt as u32).min(*remaining); + *remaining -= consumed; + consumed as usize } else { - Err(Error::ConnectionClosed) + 0 + } + } + + fn len(self) -> usize { + if let ChunkState::NotEmpty(len) = self { + len as usize + } else { + 0 } } } @@ -380,11 +456,63 @@ impl Read for FixedLengthBodyReader { /// Chunked response body reader pub struct ChunkedBodyReader { raw_body: B, - chunk_remaining: u32, - empty_chunk_received: bool, + chunk_remaining: ChunkState, } -impl ChunkedBodyReader { +impl ChunkedBodyReader +where + C: BufRead + Read, +{ + async fn read_next_chunk_length(&mut self) -> Result<(), Error> { + let mut header_buf = [0; 8 + 2]; // 32 bit hex + \r + \n + let mut total_read = 0; + + 'read_size: loop { + let buf = self.raw_body.fill_buf().await.map_err(|e| e.kind())?; + for (i, byte) in buf.iter().enumerate() { + if *byte != b'\n' { + header_buf[total_read] = *byte; + total_read += 1; + + if total_read == header_buf.len() { + self.raw_body.consume(i + 1); + return Err(Error::Codec); + } + } else { + self.raw_body.consume(i + 1); + break 'read_size; + } + } + + let consumed = buf.len(); + self.raw_body.consume(consumed); + } + + if header_buf[total_read - 1] != b'\r' { + return Err(Error::Codec); + } + + let hex_digits = total_read - 1; + + // Prepend hex with zeros + let mut hex = [b'0'; 8]; + hex[8 - hex_digits..].copy_from_slice(&header_buf[..hex_digits]); + + let mut bytes = [0; 4]; + hex::decode_to_slice(hex, &mut bytes).map_err(|_| Error::Codec)?; + + let chunk_length = u32::from_be_bytes(bytes); + + debug!("Chunk length: {}", chunk_length); + + self.chunk_remaining = match chunk_length { + 0 => ChunkState::Empty, + other => ChunkState::NotEmpty(other), + }; + + Ok(()) + } + async fn read_chunk_end(&mut self) -> Result<(), Error> { // All chunks are terminated with a \r\n let mut newline_buf = [0; 2]; @@ -397,106 +525,65 @@ impl ChunkedBodyReader { } } -impl ErrorType for ChunkedBodyReader { +impl ErrorType for ChunkedBodyReader { type Error = Error; } -impl Read for ChunkedBodyReader { +impl Read for ChunkedBodyReader +where + C: BufRead + Read, +{ async fn read(&mut self, buf: &mut [u8]) -> Result { - if buf.is_empty() || self.empty_chunk_received { + if buf.is_empty() { return Ok(0); } - if self.chunk_remaining == 0 { - // The current chunk is currently empty, advance into a new chunk... - - let mut header_buf = [0; 8 + 2]; // 32 bit hex + \r + \n - let mut total_read = 0; - - // For now, limit the number of bytes that we can read to avoid reading into a header after the current - let mut max_read = 3; // Single hex digit + \r + \n - loop { - let read = self - .raw_body - .read(&mut header_buf[total_read..max_read]) - .await - .map_err(|e| e.kind())?; - if read == 0 { - return Err(Error::ConnectionClosed); - } - total_read += read; + // If we receive an empty buffer here, the body includes an empty chunk. + // `fill_buf` will return an Err if the connection is closed. + let loaded = self.fill_buf().await?; - // Decode the chunked header - let header_and_body = &header_buf[..total_read]; - if let Some(nl) = header_and_body.iter().position(|x| *x == b'\n') { - let header = &header_and_body[..nl + 1]; - if nl == 0 || header[nl - 1] != b'\r' { - return Err(Error::Codec); - } - let hex_digits = nl - 1; - // Prepend hex with zeros - let mut hex = [b'0'; 8]; - hex[8 - hex_digits..].copy_from_slice(&header[..hex_digits]); - let mut bytes = [0; 4]; - hex::decode_to_slice(hex, &mut bytes).map_err(|_| Error::Codec)?; - self.chunk_remaining = u32::from_be_bytes(bytes); - - if self.chunk_remaining == 0 { - self.empty_chunk_received = true; - } + let len = loaded.len().min(buf.len()); - // Return the excess body bytes read during the header, if any - let excess_body_read = header_and_body.len() - header.len(); - if excess_body_read > 0 { - if excess_body_read > self.chunk_remaining as usize { - // We have read chunk bytes that exceed the size of the chunk - return Err(Error::Codec); - } - - buf[..excess_body_read].copy_from_slice(&header_and_body[header.len()..]); - self.chunk_remaining -= excess_body_read as u32; - return Ok(excess_body_read); - } + buf[..len].copy_from_slice(&loaded[..len]); + self.consume(len); - break; - } + Ok(len) + } +} - if total_read >= 3 { - // At least three bytes were read and a \n was not found - // This means that the chunk length is at least double-digit hex - // which in turn means that it is impossible for another header to - // be present within the 10 bytes header buffer. - // 10 is the length of the max header "ffffffff\r\n". - // For example, 10\r\nXXXXXXYYYYYYYYYY is more than 10 bytes - // - 10\r\n is the header - // - XXXXXX are the excess body 6 bytes that we may read - // - YYYYYYYYYY are the remaining unread chunk bytes. - // However, for reading these excess bytes into the actual chunk payload, - // the user buffer must be large enough to actually contain the excess read bytes. - // A \n was not found, and we can read that + buf.len(). - max_read = core::cmp::min(total_read + 1 + buf.len(), 10); - } - } - } +impl BufRead for ChunkedBodyReader +where + C: BufRead + Read, +{ + async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> { + match self.chunk_remaining { + ChunkState::NoChunk => self.read_next_chunk_length().await?, - if self.empty_chunk_received { - self.read_chunk_end().await?; - Ok(0) - } else { - let max_len = usize::min(self.chunk_remaining as usize, buf.len()); - let len = self.raw_body.read(&mut buf[..max_len]).await.map_err(|e| e.kind())?; - if len == 0 { - return Err(Error::ConnectionClosed); + ChunkState::NotEmpty(0) => { + // The current chunk is currently empty, advance into a new chunk... + self.read_chunk_end().await?; + self.read_next_chunk_length().await?; } - self.chunk_remaining -= len as u32; + ChunkState::NotEmpty(_) => {} - if self.chunk_remaining == 0 { - self.read_chunk_end().await?; - } + ChunkState::Empty => return Ok(&[]), + } - Ok(len) + let remaining = self.chunk_remaining.len(); + + let buf = self.raw_body.fill_buf().await.map_err(|e| Error::Network(e.kind()))?; + if buf.is_empty() { + return Err(Error::ConnectionClosed); } + + let len = buf.len().min(remaining); + Ok(&buf[..len]) + } + + fn consume(&mut self, amt: usize) { + let consumed = self.chunk_remaining.consume(amt); + self.raw_body.consume(consumed); } } @@ -595,13 +682,14 @@ impl From for Status { #[cfg(test)] mod tests { - use embedded_io::ErrorType; + use embedded_io::{ErrorKind, ErrorType}; use embedded_io_async::{Read, Write}; use crate::{ client::HttpConnection, + reader::BufferingReader, request::Method, - response::{ChunkedBodyReader, Response}, + response::{ChunkState, ChunkedBodyReader, Response}, }; struct Buffer { @@ -614,7 +702,7 @@ mod tests { } impl ErrorType for Buffer { - type Error = embedded_io::ErrorKind; + type Error = ErrorKind; } impl Read for Buffer { @@ -622,6 +710,7 @@ mod tests { let len = buf.len().min(self.b.len()); buf[..len].copy_from_slice(&self.b[..len]); self.b.drain(..len); + Ok(len) } } @@ -750,11 +839,11 @@ mod tests { #[tokio::test] async fn chunked_body_reader_can_read_with_large_buffer() { - let raw_body = HttpConnection::Plain(Buffer::from(b"1\r\nX\r\n10\r\nYYYYYYYYYYYYYYYY\r\n0\r\n\r\n")); + let mut raw_body = HttpConnection::Plain(Buffer::from(b"1\r\nX\r\n10\r\nYYYYYYYYYYYYYYYY\r\n0\r\n\r\n")); + let mut read_buffer = [0; 128]; let mut reader = ChunkedBodyReader { - raw_body, - chunk_remaining: 0, - empty_chunk_received: false, + raw_body: BufferingReader::new(&mut read_buffer, 0, &mut raw_body), + chunk_remaining: ChunkState::NoChunk, }; let mut body = [0; 17]; @@ -767,11 +856,11 @@ mod tests { #[tokio::test] async fn chunked_body_reader_can_read_with_tiny_buffer() { - let raw_body = HttpConnection::Plain(Buffer::from(b"1\r\nX\r\n10\r\nYYYYYYYYYYYYYYYY\r\n0\r\n\r\n")); + let mut raw_body = HttpConnection::Plain(Buffer::from(b"1\r\nX\r\n10\r\nYYYYYYYYYYYYYYYY\r\n0\r\n\r\n")); + let mut read_buffer = [0; 128]; let mut reader = ChunkedBodyReader { - raw_body, - chunk_remaining: 0, - empty_chunk_received: false, + raw_body: BufferingReader::new(&mut read_buffer, 0, &mut raw_body), + chunk_remaining: ChunkState::NoChunk, }; let mut body = heapless::Vec::::new(); From 3ae5f10a25ac404754907195b0c51dd4f800145e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Fri, 20 Oct 2023 16:19:25 +0200 Subject: [PATCH 04/18] Add test case and fix build --- src/client.rs | 76 +++++++++++++++++++++++++++++++++---------------- tests/client.rs | 37 ++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 25 deletions(-) diff --git a/src/client.rs b/src/client.rs index ec23515..279ec8e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -136,7 +136,7 @@ where &'m mut self, method: Method, url: &'m str, - ) -> Result>, ()>, Error> { + ) -> Result, ()>, Error> { let url = Url::parse(url)?; let conn = self.connect(&url).await?; Ok(HttpRequestHandle { @@ -150,7 +150,7 @@ where pub async fn resource<'res>( &'res mut self, resource_url: &'res str, - ) -> Result>>, Error> { + ) -> Result>, Error> { let resource_url = Url::parse(resource_url)?; let conn = self.connect(&resource_url).await?; Ok(HttpResource { @@ -180,6 +180,23 @@ impl<'conn, T> HttpConnection<'conn, T> where T: Read + Write, { + /// Turn the request into a buffered request. + /// + /// This is only relevant if no TLS is used, as `embedded-tls` buffers internally and we reuse + /// its buffer for non-TLS connections. + pub fn into_buffered<'buf>(self, tx_buf: &'buf mut [u8]) -> HttpConnection<'buf, T> + where + 'conn: 'buf, + { + match self { + HttpConnection::Plain(conn) => { + HttpConnection::PlainBuffered(BufferedWrite::new(buffered_io_adapter::ConnErrorAdapter(conn), tx_buf)) + } + HttpConnection::PlainBuffered(conn) => HttpConnection::PlainBuffered(conn), + HttpConnection::Tls(tls) => HttpConnection::Tls(tls), + } + } + /// Send a request on an established connection. /// /// The request is sent in its raw form without any base path from the resource. @@ -257,7 +274,7 @@ where C: Read + Write, B: RequestBody, { - pub conn: C, + pub conn: HttpConnection<'m, C>, request: Option>, } @@ -270,12 +287,12 @@ where /// /// This is only relevant if no TLS is used, as `embedded-tls` buffers internally and we reuse /// its buffer for non-TLS connections. - pub fn into_buffered<'buf>( - self, - tx_buf: &'buf mut [u8], - ) -> HttpRequestHandle<'m, BufferedWrite<'buf, buffered_io_adapter::ConnErrorAdapter>, B> { + pub fn into_buffered<'buf>(self, tx_buf: &'buf mut [u8]) -> HttpRequestHandle<'buf, C, B> + where + 'm: 'buf, + { HttpRequestHandle { - conn: BufferedWrite::new(buffered_io_adapter::ConnErrorAdapter(self.conn), tx_buf), + conn: self.conn.into_buffered(tx_buf), request: self.request, } } @@ -285,7 +302,13 @@ where /// The response headers are stored in the provided rx_buf, which should be sized to contain at least the response headers. /// /// The response is returned. - pub async fn send<'buf, 'conn>(&'conn mut self, rx_buf: &'buf mut [u8]) -> Result, Error> { + pub async fn send<'buf, 'conn>( + &'conn mut self, + rx_buf: &'buf mut [u8], + ) -> Result>, Error> + where + 'conn: 'm, + { let request = self.request.take().ok_or(Error::AlreadySent)?.build(); request.write(&mut self.conn).await?; Response::read(&mut self.conn, request.method, rx_buf).await @@ -343,7 +366,7 @@ pub struct HttpResource<'res, C> where C: Read + Write, { - pub conn: C, + pub conn: HttpConnection<'res, C>, pub host: &'res str, pub base_path: &'res str, } @@ -356,12 +379,12 @@ where /// /// This is only relevant if no TLS is used, as `embedded-tls` buffers internally and we reuse /// its buffer for non-TLS connections. - pub fn into_buffered<'buf>( - self, - tx_buf: &'buf mut [u8], - ) -> HttpResource<'res, BufferedWrite<'buf, buffered_io_adapter::ConnErrorAdapter>> { + pub fn into_buffered<'buf>(self, tx_buf: &'buf mut [u8]) -> HttpResource<'buf, C> + where + 'res: 'buf, + { HttpResource { - conn: BufferedWrite::new(buffered_io_adapter::ConnErrorAdapter(self.conn), tx_buf), + conn: self.conn.into_buffered(tx_buf), host: self.host, base_path: self.base_path, } @@ -432,7 +455,7 @@ where &'conn mut self, mut request: Request<'res, B>, rx_buf: &'buf mut [u8], - ) -> Result, Error> { + ) -> Result>, Error> { request.base_path = Some(self.base_path); request.write(&mut self.conn).await?; Response::read(&mut self.conn, request.method, rx_buf).await @@ -444,7 +467,7 @@ where C: Read + Write, B: RequestBody, { - conn: &'conn mut C, + conn: &'conn mut HttpConnection<'res, C>, base_path: &'res str, request: DefaultRequestBuilder<'m, B>, } @@ -460,7 +483,10 @@ where /// The response headers are stored in the provided rx_buf, which should be sized to contain at least the response headers. /// /// The response is returned. - pub async fn send<'buf>(self, rx_buf: &'buf mut [u8]) -> Result, Error> { + pub async fn send<'buf>( + self, + rx_buf: &'buf mut [u8], + ) -> Result>, Error> { let conn = self.conn; let mut request = self.request.build(); request.base_path = Some(self.base_path); @@ -515,7 +541,7 @@ where } mod buffered_io_adapter { - use embedded_io::{Error as _, ErrorType, ReadExactError}; + use embedded_io::{Error as _, ErrorKind, ErrorType, ReadExactError}; use embedded_io_async::{Read, Write}; pub struct Error(embedded_io::ErrorKind); @@ -535,7 +561,7 @@ mod buffered_io_adapter { pub struct ConnErrorAdapter(pub C); impl ErrorType for ConnErrorAdapter { - type Error = Error; + type Error = ErrorKind; } impl Write for ConnErrorAdapter @@ -543,15 +569,15 @@ mod buffered_io_adapter { C: Write, { async fn write(&mut self, buf: &[u8]) -> Result { - self.0.write(buf).await.map_err(|e| Error(e.kind())) + self.0.write(buf).await.map_err(|e| e.kind()) } async fn flush(&mut self) -> Result<(), Self::Error> { - self.0.flush().await.map_err(|e| Error(e.kind())) + self.0.flush().await.map_err(|e| e.kind()) } async fn write_all(&mut self, buf: &[u8]) -> Result<(), Self::Error> { - self.0.write_all(buf).await.map_err(|e| Error(e.kind())) + self.0.write_all(buf).await.map_err(|e| e.kind()) } } @@ -560,13 +586,13 @@ mod buffered_io_adapter { C: Read, { async fn read(&mut self, buf: &mut [u8]) -> Result { - self.0.read(buf).await.map_err(|e| Error(e.kind())) + self.0.read(buf).await.map_err(|e| e.kind()) } async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), ReadExactError> { self.0.read_exact(buf).await.map_err(|e| match e { ReadExactError::UnexpectedEof => ReadExactError::UnexpectedEof, - ReadExactError::Other(e) => ReadExactError::Other(Error(e.kind())), + ReadExactError::Other(e) => ReadExactError::Other(e.kind()), }) } } diff --git a/tests/client.rs b/tests/client.rs index d284899..ef04564 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -207,6 +207,43 @@ async fn test_resource_drogue_cloud_sandbox() { } } +#[tokio::test] +async fn test_request_response_notls_buffered() { + setup(); + let addr = ([127, 0, 0, 1], 0).into(); + + let service = make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(echo)) }); + + let server = Server::bind(&addr).serve(service); + let addr = server.local_addr(); + + let (tx, rx) = oneshot::channel(); + let t = tokio::spawn(async move { + tokio::select! { + _ = server => {} + _ = rx => {} + } + }); + + let url = format!("http://127.0.0.1:{}", addr.port()); + let mut client = HttpClient::new(&TCP, &LOOPBACK_DNS); + let mut tx_buf = [0; 4096]; + let mut rx_buf = [0; 4096]; + let mut request = client + .request(Method::POST, &url) + .await + .unwrap() + .into_buffered(&mut tx_buf) + .body(b"PING".as_slice()) + .content_type(ContentType::TextPlain); + let response = request.send(&mut rx_buf).await.unwrap(); + let body = response.body().read_to_end().await; + assert_eq!(body.unwrap(), b"PING"); + + tx.send(()).unwrap(); + t.await.unwrap(); +} + fn load_certs(filename: &std::path::PathBuf) -> Vec { let certfile = std::fs::File::open(filename).expect("cannot open certificate file"); let mut reader = std::io::BufReader::new(certfile); From b990e32d7498b0a2480825abcb78357c3d9b898d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Fri, 20 Oct 2023 16:22:41 +0200 Subject: [PATCH 05/18] Remove ConnErrorAdapter --- src/client.rs | 66 +++------------------------------------------------ 1 file changed, 3 insertions(+), 63 deletions(-) diff --git a/src/client.rs b/src/client.rs index 279ec8e..d73244c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -121,7 +121,7 @@ where #[cfg(feature = "embedded-tls")] match self.tls.as_mut() { Some(tls) => Ok(HttpConnection::PlainBuffered(BufferedWrite::new( - buffered_io_adapter::ConnErrorAdapter(conn), + conn, tls.write_buffer, ))), None => Ok(HttpConnection::Plain(conn)), @@ -169,7 +169,7 @@ where { Plain(C), #[cfg(feature = "embedded-tls")] - PlainBuffered(BufferedWrite<'m, buffered_io_adapter::ConnErrorAdapter>), + PlainBuffered(BufferedWrite<'m, C>), #[cfg(feature = "embedded-tls")] Tls(embedded_tls::TlsConnection<'m, C, embedded_tls::Aes128GcmSha256>), #[cfg(not(feature = "embedded-tls"))] @@ -189,9 +189,7 @@ where 'conn: 'buf, { match self { - HttpConnection::Plain(conn) => { - HttpConnection::PlainBuffered(BufferedWrite::new(buffered_io_adapter::ConnErrorAdapter(conn), tx_buf)) - } + HttpConnection::Plain(conn) => HttpConnection::PlainBuffered(BufferedWrite::new(conn, tx_buf)), HttpConnection::PlainBuffered(conn) => HttpConnection::PlainBuffered(conn), HttpConnection::Tls(tls) => HttpConnection::Tls(tls), } @@ -539,61 +537,3 @@ where self.request.build() } } - -mod buffered_io_adapter { - use embedded_io::{Error as _, ErrorKind, ErrorType, ReadExactError}; - use embedded_io_async::{Read, Write}; - - pub struct Error(embedded_io::ErrorKind); - - impl core::fmt::Debug for Error { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - self.0.fmt(f) - } - } - - impl embedded_io_async::Error for Error { - fn kind(&self) -> embedded_io::ErrorKind { - self.0 - } - } - - pub struct ConnErrorAdapter(pub C); - - impl ErrorType for ConnErrorAdapter { - type Error = ErrorKind; - } - - impl Write for ConnErrorAdapter - where - C: Write, - { - async fn write(&mut self, buf: &[u8]) -> Result { - self.0.write(buf).await.map_err(|e| e.kind()) - } - - async fn flush(&mut self) -> Result<(), Self::Error> { - self.0.flush().await.map_err(|e| e.kind()) - } - - async fn write_all(&mut self, buf: &[u8]) -> Result<(), Self::Error> { - self.0.write_all(buf).await.map_err(|e| e.kind()) - } - } - - impl Read for ConnErrorAdapter - where - C: Read, - { - async fn read(&mut self, buf: &mut [u8]) -> Result { - self.0.read(buf).await.map_err(|e| e.kind()) - } - - async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), ReadExactError> { - self.0.read_exact(buf).await.map_err(|e| match e { - ReadExactError::UnexpectedEof => ReadExactError::UnexpectedEof, - ReadExactError::Other(e) => ReadExactError::Other(e.kind()), - }) - } - } -} From 6a00519790eb808cb7efce9fe70a7453db15f11b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Fri, 20 Oct 2023 16:46:14 +0200 Subject: [PATCH 06/18] Test and fix --no-default-features --- .github/workflows/ci.yaml | 4 +++- src/client.rs | 4 ---- src/reader.rs | 5 ++++- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 085721c..792ce69 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -36,4 +36,6 @@ jobs: run: cargo clippy - name: Test - run: cargo test + run: | + cargo test + cargo test --no-default-features diff --git a/src/client.rs b/src/client.rs index d73244c..c35fed0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -168,7 +168,6 @@ where C: Read + Write, { Plain(C), - #[cfg(feature = "embedded-tls")] PlainBuffered(BufferedWrite<'m, C>), #[cfg(feature = "embedded-tls")] Tls(embedded_tls::TlsConnection<'m, C, embedded_tls::Aes128GcmSha256>), @@ -225,7 +224,6 @@ where async fn read(&mut self, buf: &mut [u8]) -> Result { match self { Self::Plain(conn) => conn.read(buf).await.map_err(|e| e.kind()), - #[cfg(feature = "embedded-tls")] Self::PlainBuffered(conn) => conn.read(buf).await.map_err(|e| e.kind()), #[cfg(feature = "embedded-tls")] Self::Tls(conn) => conn.read(buf).await.map_err(|e| e.kind()), @@ -242,7 +240,6 @@ where async fn write(&mut self, buf: &[u8]) -> Result { match self { Self::Plain(conn) => conn.write(buf).await.map_err(|e| e.kind()), - #[cfg(feature = "embedded-tls")] Self::PlainBuffered(conn) => conn.write(buf).await.map_err(|e| e.kind()), #[cfg(feature = "embedded-tls")] Self::Tls(conn) => conn.write(buf).await.map_err(|e| e.kind()), @@ -254,7 +251,6 @@ where async fn flush(&mut self) -> Result<(), Self::Error> { match self { Self::Plain(conn) => conn.flush().await.map_err(|e| e.kind()), - #[cfg(feature = "embedded-tls")] Self::PlainBuffered(conn) => conn.flush().await.map_err(|e| e.kind()), #[cfg(feature = "embedded-tls")] Self::Tls(conn) => conn.flush().await.map_err(|e| e.kind()), diff --git a/src/reader.rs b/src/reader.rs index 5119a25..9459d96 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -1,6 +1,9 @@ -use embedded_io::{Error, ErrorKind, ErrorType}; +use embedded_io::{ErrorKind, ErrorType}; use embedded_io_async::{BufRead, Read, Write}; +#[cfg(feature = "embedded-tls")] +use embedded_io::Error; + use crate::client::HttpConnection; struct ReadBuffer<'buf> { From abba49760edf8f54e1a487cfbbd213bde4915ad7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Fri, 20 Oct 2023 17:17:55 +0200 Subject: [PATCH 07/18] Untangle lifetimes, use HttpConnection in Response --- src/client.rs | 124 ++++++++++++++++++++---------------------------- src/reader.rs | 20 +++++--- src/response.rs | 65 ++++++++++++++++++++----- 3 files changed, 117 insertions(+), 92 deletions(-) diff --git a/src/client.rs b/src/client.rs index c35fed0..751ab4c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -79,7 +79,10 @@ where } } - async fn connect<'m>(&'m mut self, url: &Url<'m>) -> Result>, Error> { + async fn connect<'conn>( + &'conn mut self, + url: &Url<'_>, + ) -> Result>, Error> { let host = url.host(); let port = url.port_or_default(); @@ -107,7 +110,7 @@ where if let TlsVerify::Psk { identity, psk } = tls.verify { config = config.with_psk(psk, &[identity]); } - let mut conn: embedded_tls::TlsConnection<'m, T::Connection<'m>, embedded_tls::Aes128GcmSha256> = + let mut conn: embedded_tls::TlsConnection<'conn, T::Connection<'conn>, embedded_tls::Aes128GcmSha256> = embedded_tls::TlsConnection::new(conn, tls.read_buffer, tls.write_buffer); conn.open::<_, embedded_tls::NoVerify>(TlsContext::new(&config, &mut rng)) .await?; @@ -132,11 +135,11 @@ where } /// Create a single http request. - pub async fn request<'m>( - &'m mut self, + pub async fn request<'conn>( + &'conn mut self, method: Method, - url: &'m str, - ) -> Result, ()>, Error> { + url: &'conn str, + ) -> Result, ()>, Error> { let url = Url::parse(url)?; let conn = self.connect(&url).await?; Ok(HttpRequestHandle { @@ -163,16 +166,16 @@ where /// Represents a HTTP connection that may be encrypted or unencrypted. #[allow(clippy::large_enum_variant)] -pub enum HttpConnection<'m, C> +pub enum HttpConnection<'conn, C> where C: Read + Write, { Plain(C), - PlainBuffered(BufferedWrite<'m, C>), + PlainBuffered(BufferedWrite<'conn, C>), #[cfg(feature = "embedded-tls")] - Tls(embedded_tls::TlsConnection<'m, C, embedded_tls::Aes128GcmSha256>), + Tls(embedded_tls::TlsConnection<'conn, C, embedded_tls::Aes128GcmSha256>), #[cfg(not(feature = "embedded-tls"))] - Tls((&'m mut (), core::convert::Infallible)), // Variant is impossible to create, but we need it to avoid "unused lifetime" warning + Tls((&'conn mut (), core::convert::Infallible)), // Variant is impossible to create, but we need it to avoid "unused lifetime" warning } impl<'conn, T> HttpConnection<'conn, T> @@ -201,10 +204,10 @@ where /// /// The response is returned. pub async fn send<'buf, B: RequestBody>( - &'conn mut self, + &'buf mut self, request: Request<'conn, B>, rx_buf: &'buf mut [u8], - ) -> Result>, Error> { + ) -> Result, Error> { request.write(self).await?; Response::read(self, request.method, rx_buf).await } @@ -263,16 +266,16 @@ where /// A HTTP request handle /// /// The underlying connection is closed when drop'ed. -pub struct HttpRequestHandle<'m, C, B> +pub struct HttpRequestHandle<'conn, C, B> where C: Read + Write, B: RequestBody, { - pub conn: HttpConnection<'m, C>, - request: Option>, + pub conn: HttpConnection<'conn, C>, + request: Option>, } -impl<'m, C, B> HttpRequestHandle<'m, C, B> +impl<'conn, C, B> HttpRequestHandle<'conn, C, B> where C: Read + Write, B: RequestBody, @@ -283,7 +286,7 @@ where /// its buffer for non-TLS connections. pub fn into_buffered<'buf>(self, tx_buf: &'buf mut [u8]) -> HttpRequestHandle<'buf, C, B> where - 'm: 'buf, + 'conn: 'buf, { HttpRequestHandle { conn: self.conn.into_buffered(tx_buf), @@ -296,13 +299,7 @@ where /// The response headers are stored in the provided rx_buf, which should be sized to contain at least the response headers. /// /// The response is returned. - pub async fn send<'buf, 'conn>( - &'conn mut self, - rx_buf: &'buf mut [u8], - ) -> Result>, Error> - where - 'conn: 'm, - { + pub async fn send<'buf>(&'buf mut self, rx_buf: &'buf mut [u8]) -> Result, Error> { let request = self.request.take().ok_or(Error::AlreadySent)?.build(); request.write(&mut self.conn).await?; Response::read(&mut self.conn, request.method, rx_buf).await @@ -384,14 +381,11 @@ where } } - pub fn request<'conn, 'm>( - &'conn mut self, + pub fn request<'req>( + &'req mut self, method: Method, - path: &'m str, - ) -> HttpResourceRequestBuilder<'conn, 'res, 'm, C, ()> - where - 'res: 'm, - { + path: &'req str, + ) -> HttpResourceRequestBuilder<'req, 'res, C, ()> { HttpResourceRequestBuilder { conn: &mut self.conn, request: Request::new(method, path).host(self.host), @@ -400,42 +394,27 @@ where } /// Create a new scoped GET http request. - pub fn get<'conn, 'm>(&'conn mut self, path: &'m str) -> HttpResourceRequestBuilder<'conn, 'res, 'm, C, ()> - where - 'res: 'm, - { + pub fn get<'req>(&'req mut self, path: &'req str) -> HttpResourceRequestBuilder<'req, 'res, C, ()> { self.request(Method::GET, path) } /// Create a new scoped POST http request. - pub fn post<'conn, 'm>(&'conn mut self, path: &'m str) -> HttpResourceRequestBuilder<'conn, 'res, 'm, C, ()> - where - 'res: 'm, - { + pub fn post<'req>(&'req mut self, path: &'req str) -> HttpResourceRequestBuilder<'req, 'res, C, ()> { self.request(Method::POST, path) } /// Create a new scoped PUT http request. - pub fn put<'conn, 'm>(&'conn mut self, path: &'m str) -> HttpResourceRequestBuilder<'conn, 'res, 'm, C, ()> - where - 'res: 'm, - { + pub fn put<'req>(&'req mut self, path: &'req str) -> HttpResourceRequestBuilder<'req, 'res, C, ()> { self.request(Method::PUT, path) } /// Create a new scoped DELETE http request. - pub fn delete<'conn, 'm>(&'conn mut self, path: &'m str) -> HttpResourceRequestBuilder<'conn, 'res, 'm, C, ()> - where - 'res: 'm, - { + pub fn delete<'req>(&'req mut self, path: &'req str) -> HttpResourceRequestBuilder<'req, 'res, C, ()> { self.request(Method::DELETE, path) } /// Create a new scoped HEAD http request. - pub fn head<'conn, 'm>(&'conn mut self, path: &'m str) -> HttpResourceRequestBuilder<'conn, 'res, 'm, C, ()> - where - 'res: 'm, - { + pub fn head<'req>(&'req mut self, path: &'req str) -> HttpResourceRequestBuilder<'req, 'res, C, ()> { self.request(Method::HEAD, path) } @@ -445,28 +424,28 @@ where /// The response headers are stored in the provided rx_buf, which should be sized to contain at least the response headers. /// /// The response is returned. - pub async fn send<'buf, 'conn, B: RequestBody>( - &'conn mut self, - mut request: Request<'res, B>, - rx_buf: &'buf mut [u8], - ) -> Result>, Error> { + pub async fn send<'req, B: RequestBody>( + &'req mut self, + mut request: Request<'req, B>, + rx_buf: &'req mut [u8], + ) -> Result, Error> { request.base_path = Some(self.base_path); request.write(&mut self.conn).await?; Response::read(&mut self.conn, request.method, rx_buf).await } } -pub struct HttpResourceRequestBuilder<'conn, 'res, 'm, C, B> +pub struct HttpResourceRequestBuilder<'req, 'conn, C, B> where C: Read + Write, B: RequestBody, { - conn: &'conn mut HttpConnection<'res, C>, - base_path: &'res str, - request: DefaultRequestBuilder<'m, B>, + conn: &'req mut HttpConnection<'conn, C>, + base_path: &'req str, + request: DefaultRequestBuilder<'req, B>, } -impl<'conn, 'res, 'm, C, B> HttpResourceRequestBuilder<'conn, 'res, 'm, C, B> +impl<'req, 'conn, C, B> HttpResourceRequestBuilder<'req, 'conn, C, B> where C: Read + Write, B: RequestBody, @@ -477,10 +456,11 @@ where /// The response headers are stored in the provided rx_buf, which should be sized to contain at least the response headers. /// /// The response is returned. - pub async fn send<'buf>( - self, - rx_buf: &'buf mut [u8], - ) -> Result>, Error> { + pub async fn send<'buf>(self, rx_buf: &'buf mut [u8]) -> Result, Error> + where + 'conn: 'req + 'buf, + 'req: 'buf, + { let conn = self.conn; let mut request = self.request.build(); request.base_path = Some(self.base_path); @@ -489,19 +469,19 @@ where } } -impl<'conn, 'res, 'm, C, B> RequestBuilder<'m, B> for HttpResourceRequestBuilder<'conn, 'res, 'm, C, B> +impl<'req, 'conn, C, B> RequestBuilder<'req, B> for HttpResourceRequestBuilder<'req, 'conn, C, B> where C: Read + Write, B: RequestBody, { - type WithBody = HttpResourceRequestBuilder<'conn, 'res, 'm, C, T>; + type WithBody = HttpResourceRequestBuilder<'req, 'conn, C, T>; - fn headers(mut self, headers: &'m [(&'m str, &'m str)]) -> Self { + fn headers(mut self, headers: &'req [(&'req str, &'req str)]) -> Self { self.request = self.request.headers(headers); self } - fn path(mut self, path: &'m str) -> Self { + fn path(mut self, path: &'req str) -> Self { self.request = self.request.path(path); self } @@ -514,7 +494,7 @@ where } } - fn host(mut self, host: &'m str) -> Self { + fn host(mut self, host: &'req str) -> Self { self.request = self.request.host(host); self } @@ -524,12 +504,12 @@ where self } - fn basic_auth(mut self, username: &'m str, password: &'m str) -> Self { + fn basic_auth(mut self, username: &'req str, password: &'req str) -> Self { self.request = self.request.basic_auth(username, password); self } - fn build(self) -> Request<'m, B> { + fn build(self) -> Request<'req, B> { self.request.build() } } diff --git a/src/reader.rs b/src/reader.rs index 9459d96..7cd8adb 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -45,13 +45,19 @@ impl ReadBuffer<'_> { } } -pub struct BufferingReader<'buf, B> { +pub struct BufferingReader<'buf, 'conn, B> +where + B: Read + Write, +{ buffer: ReadBuffer<'buf>, - stream: B, + stream: &'buf mut HttpConnection<'conn, B>, } -impl<'buf, B> BufferingReader<'buf, B> { - pub fn new(buffer: &'buf mut [u8], loaded: usize, stream: B) -> Self { +impl<'buf, 'conn, B> BufferingReader<'buf, 'conn, B> +where + B: Read + Write, +{ + pub fn new(buffer: &'buf mut [u8], loaded: usize, stream: &'buf mut HttpConnection<'conn, B>) -> Self { Self { buffer: ReadBuffer::new(buffer, loaded), stream, @@ -59,14 +65,14 @@ impl<'buf, B> BufferingReader<'buf, B> { } } -impl ErrorType for BufferingReader<'_, &mut HttpConnection<'_, C>> +impl ErrorType for BufferingReader<'_, '_, C> where C: Read + Write, { type Error = ErrorKind; } -impl Read for BufferingReader<'_, &mut HttpConnection<'_, C>> +impl Read for BufferingReader<'_, '_, C> where C: Read + Write, { @@ -80,7 +86,7 @@ where } } -impl BufRead for BufferingReader<'_, &mut HttpConnection<'_, C>> +impl BufRead for BufferingReader<'_, '_, C> where C: Read + Write, { diff --git a/src/response.rs b/src/response.rs index 0e396d7..9735e5e 100644 --- a/src/response.rs +++ b/src/response.rs @@ -1,20 +1,19 @@ use embedded_io::{Error as _, ErrorType}; -use embedded_io_async::{BufRead, Read}; +use embedded_io_async::{BufRead, Read, Write}; use heapless::Vec; +use crate::client::HttpConnection; use crate::headers::{ContentType, KeepAlive, TransferEncoding}; use crate::reader::BufferingReader; use crate::request::Method; use crate::Error; /// Type representing a parsed HTTP response. -#[derive(Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct Response<'buf, 'conn, C> where - C: Read, + C: Read + Write, { - conn: &'conn mut C, + conn: &'buf mut HttpConnection<'conn, C>, /// The method used to create the response. method: Method, /// The HTTP response status code. @@ -32,13 +31,54 @@ where raw_body_read: usize, } +#[cfg(feature = "defmt")] +impl defmt::Format for Response<'_, '_, C> +where + C: Read + Write, +{ + fn format(&self, fmt: defmt::Formatter) { + defmt::write!( + fmt, + "Response {{ method = {}, status = {}, content_type = {}, content_length = {}, transfer_encoding = {}, keep_alive = {}, header_buf = {:?}, header_len = {}, raw_body_read = {} }}", + self.method, + self.status, + self.content_type, + self.content_length, + self.transfer_encoding, + self.keep_alive, + self.header_buf, + self.header_len, + self.raw_body_read, + ) + } +} + +impl core::fmt::Debug for Response<'_, '_, C> +where + C: Read + Write, +{ + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("Response") + .field("method", &self.method) + .field("status", &self.status) + .field("content_type", &self.content_type) + .field("content_length", &self.content_length) + .field("transfer_encoding", &self.transfer_encoding) + .field("keep_alive", &self.keep_alive) + .field("header_buf", &self.header_buf) + .field("header_len", &self.header_len) + .field("raw_body_read", &self.raw_body_read) + .finish() + } +} + impl<'buf, 'conn, C> Response<'buf, 'conn, C> where - C: Read, + C: Read + Write, { // Read at least the headers from the connection. pub async fn read( - conn: &'conn mut C, + conn: &'buf mut HttpConnection<'conn, C>, method: Method, header_buf: &'buf mut [u8], ) -> Result, Error> { @@ -185,9 +225,9 @@ impl<'a> Iterator for HeaderIterator<'a> { /// in `body_buf`, and a reader to be used for reading the remaining body. pub struct ResponseBody<'buf, 'conn, C> where - C: Read, + C: Read + Write, { - conn: &'conn mut C, + conn: &'buf mut HttpConnection<'conn, C>, reader_hint: ReaderHint, /// The number of raw bytes read from the body and available in the beginning of `body_buf`. raw_body_read: usize, @@ -204,9 +244,9 @@ enum ReaderHint { impl<'buf, 'conn, C> ResponseBody<'buf, 'conn, C> where - C: Read, + C: Read + Write, { - pub fn reader(self) -> BodyReader> { + pub fn reader(self) -> BodyReader> { let raw_body = BufferingReader::new(self.body_buf, self.raw_body_read, self.conn); match self.reader_hint { @@ -226,8 +266,7 @@ where impl<'buf, 'conn, C> ResponseBody<'buf, 'conn, C> where - C: Read, - BufferingReader<'buf, &'conn mut C>: BufRead + Read, + C: Read + Write, { /// Read the entire body into the buffer originally provided [`Response::read()`]. /// This requires that this original buffer is large enough to contain the entire body. From c04044db27d733f5af3d79f8a0485e6ccfe55f94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Fri, 20 Oct 2023 17:19:39 +0200 Subject: [PATCH 08/18] Test that multiple requests can be sent with the same client --- tests/client.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/tests/client.rs b/tests/client.rs index ef04564..37964b1 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -52,15 +52,17 @@ async fn test_request_response_notls() { let url = format!("http://127.0.0.1:{}", addr.port()); let mut client = HttpClient::new(&TCP, &LOOPBACK_DNS); let mut rx_buf = [0; 4096]; - let mut request = client - .request(Method::POST, &url) - .await - .unwrap() - .body(b"PING".as_slice()) - .content_type(ContentType::TextPlain); - let response = request.send(&mut rx_buf).await.unwrap(); - let body = response.body().read_to_end().await; - assert_eq!(body.unwrap(), b"PING"); + for _ in 0..2 { + let mut request = client + .request(Method::POST, &url) + .await + .unwrap() + .body(b"PING".as_slice()) + .content_type(ContentType::TextPlain); + let response = request.send(&mut rx_buf).await.unwrap(); + let body = response.body().read_to_end().await; + assert_eq!(body.unwrap(), b"PING"); + } tx.send(()).unwrap(); t.await.unwrap(); From c953ae8745db328b283256c82396bafdca0fd412 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Fri, 20 Oct 2023 17:26:04 +0200 Subject: [PATCH 09/18] FixedLength: read directly to output --- src/response.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/response.rs b/src/response.rs index 9735e5e..f542b49 100644 --- a/src/response.rs +++ b/src/response.rs @@ -341,7 +341,12 @@ where let is_done = match self { BodyReader::Empty => true, - BodyReader::FixedLength(reader) => reader.remaining == 0, + BodyReader::FixedLength(reader) => { + if reader.remaining > 0 { + warn!("FixedLength: {} bytes remained", reader.remaining); + } + reader.remaining == 0 + } BodyReader::Chunked(reader) => reader.chunk_remaining == ChunkState::Empty, BodyReader::ToEnd(_) => true, }; @@ -425,13 +430,14 @@ where C: BufRead + Read, { async fn read(&mut self, buf: &mut [u8]) -> Result { - let loaded = self.fill_buf().await?; - let len = loaded.len().min(buf.len()); + if self.remaining == 0 { + return Ok(0); + } - buf[..len].copy_from_slice(&loaded[..len]); - self.consume(len); + let read = self.raw_body.read(buf).await.map_err(|e| Error::Network(e.kind()))?; + self.remaining -= read; - Ok(len) + Ok(read) } } From 581927e1bd26b93623e606b98a81f3f6674f2b89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Fri, 20 Oct 2023 17:29:28 +0200 Subject: [PATCH 10/18] ChunkedBodyReader: read directly into output buffer --- src/response.rs | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/src/response.rs b/src/response.rs index f542b49..8b2b436 100644 --- a/src/response.rs +++ b/src/response.rs @@ -579,18 +579,30 @@ where C: BufRead + Read, { async fn read(&mut self, buf: &mut [u8]) -> Result { - if buf.is_empty() { - return Ok(0); + match self.chunk_remaining { + ChunkState::NoChunk => self.read_next_chunk_length().await?, + + ChunkState::NotEmpty(0) => { + // The current chunk is currently empty, advance into a new chunk... + self.read_chunk_end().await?; + self.read_next_chunk_length().await?; + } + + ChunkState::NotEmpty(_) => {} + + ChunkState::Empty => return Ok(0), } - // If we receive an empty buffer here, the body includes an empty chunk. - // `fill_buf` will return an Err if the connection is closed. - let loaded = self.fill_buf().await?; + let remaining = self.chunk_remaining.len(); + let max_len = buf.len().min(remaining); - let len = loaded.len().min(buf.len()); + let len = self + .raw_body + .read(&mut buf[..max_len]) + .await + .map_err(|e| Error::Network(e.kind()))?; - buf[..len].copy_from_slice(&loaded[..len]); - self.consume(len); + self.chunk_remaining.consume(len); Ok(len) } From 66051ba9a56450003dccd94a29d02dfb3855ce0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Fri, 20 Oct 2023 17:46:52 +0200 Subject: [PATCH 11/18] Deduplicate chunk boundary logic --- src/response.rs | 47 ++++++++++++++++++----------------------------- 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/src/response.rs b/src/response.rs index 8b2b436..e82655c 100644 --- a/src/response.rs +++ b/src/response.rs @@ -568,17 +568,9 @@ where } Ok(()) } -} - -impl ErrorType for ChunkedBodyReader { - type Error = Error; -} -impl Read for ChunkedBodyReader -where - C: BufRead + Read, -{ - async fn read(&mut self, buf: &mut [u8]) -> Result { + /// Handles chunk boundary and returns the number of bytes in the current (or new) chunk. + async fn handle_chunk_boundary(&mut self) -> Result { match self.chunk_remaining { ChunkState::NoChunk => self.read_next_chunk_length().await?, @@ -593,7 +585,20 @@ where ChunkState::Empty => return Ok(0), } - let remaining = self.chunk_remaining.len(); + Ok(self.chunk_remaining.len()) + } +} + +impl ErrorType for ChunkedBodyReader { + type Error = Error; +} + +impl Read for ChunkedBodyReader +where + C: BufRead + Read, +{ + async fn read(&mut self, buf: &mut [u8]) -> Result { + let remaining = self.handle_chunk_boundary().await?; let max_len = buf.len().min(remaining); let len = self @@ -613,28 +618,12 @@ where C: BufRead + Read, { async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> { - match self.chunk_remaining { - ChunkState::NoChunk => self.read_next_chunk_length().await?, - - ChunkState::NotEmpty(0) => { - // The current chunk is currently empty, advance into a new chunk... - self.read_chunk_end().await?; - self.read_next_chunk_length().await?; - } - - ChunkState::NotEmpty(_) => {} - - ChunkState::Empty => return Ok(&[]), - } - - let remaining = self.chunk_remaining.len(); + let remaining = self.handle_chunk_boundary().await?; let buf = self.raw_body.fill_buf().await.map_err(|e| Error::Network(e.kind()))?; - if buf.is_empty() { - return Err(Error::ConnectionClosed); - } let len = buf.len().min(remaining); + Ok(&buf[..len]) } From 8cad89b04c1be8518ba85b4c3442e65727f9b8d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Fri, 20 Oct 2023 17:56:45 +0200 Subject: [PATCH 12/18] Fix typo --- src/response.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/response.rs b/src/response.rs index e82655c..20703ad 100644 --- a/src/response.rs +++ b/src/response.rs @@ -273,7 +273,7 @@ where /// /// This is not valid for chunked responses as it requires that the body bytes over-read /// while parsing the http response header would be available for the body reader. - /// For this case, of if the original buffer is not large enough, use + /// For this case, or if the original buffer is not large enough, use /// [`BodyReader::read_to_end()`] instead from the reader returned by [`ResponseBody::reader()`]. pub async fn read_to_end(self) -> Result<&'buf mut [u8], Error> { // We can only read responses with Content-Length header to end using the body_buf buffer, From 92a1c5711f7fb9b92245a5165dcbb480fbe9e9c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Fri, 20 Oct 2023 18:01:05 +0200 Subject: [PATCH 13/18] Hide ChunkState --- src/response.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/response.rs b/src/response.rs index 20703ad..c52dbf6 100644 --- a/src/response.rs +++ b/src/response.rs @@ -472,7 +472,7 @@ where } #[derive(Clone, Copy, PartialEq, Eq)] -pub enum ChunkState { +enum ChunkState { NoChunk, NotEmpty(u32), Empty, From f1185f763032b86d91fb8de8146454212c75d289 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Fri, 20 Oct 2023 18:05:52 +0200 Subject: [PATCH 14/18] Remove impl_trait_projections mention --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index 337e63b..fae0a17 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,5 @@ This enables `alloc` on `embedded-tls` which in turn enables RSA signature algor `reqwless` requires a feature from `nightly` to compile `embedded-io` with async support: * `async_fn_in_trait` -* `impl_trait_projections` This feature is complete, but is not yet merged to `stable`. From b1128cd461ada0d52d8987407cca98530f2067b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Sat, 21 Oct 2023 21:01:07 +0200 Subject: [PATCH 15/18] Clean up internals --- src/client.rs | 11 ++++--- src/reader.rs | 29 ++++++++-------- src/response.rs | 88 ++++++++++++++++++++++--------------------------- 3 files changed, 60 insertions(+), 68 deletions(-) diff --git a/src/client.rs b/src/client.rs index 751ab4c..7acaa86 100644 --- a/src/client.rs +++ b/src/client.rs @@ -207,7 +207,7 @@ where &'buf mut self, request: Request<'conn, B>, rx_buf: &'buf mut [u8], - ) -> Result, Error> { + ) -> Result>, Error> { request.write(self).await?; Response::read(self, request.method, rx_buf).await } @@ -299,7 +299,10 @@ where /// The response headers are stored in the provided rx_buf, which should be sized to contain at least the response headers. /// /// The response is returned. - pub async fn send<'buf>(&'buf mut self, rx_buf: &'buf mut [u8]) -> Result, Error> { + pub async fn send<'buf>( + &'buf mut self, + rx_buf: &'buf mut [u8], + ) -> Result>, Error> { let request = self.request.take().ok_or(Error::AlreadySent)?.build(); request.write(&mut self.conn).await?; Response::read(&mut self.conn, request.method, rx_buf).await @@ -428,7 +431,7 @@ where &'req mut self, mut request: Request<'req, B>, rx_buf: &'req mut [u8], - ) -> Result, Error> { + ) -> Result>, Error> { request.base_path = Some(self.base_path); request.write(&mut self.conn).await?; Response::read(&mut self.conn, request.method, rx_buf).await @@ -456,7 +459,7 @@ where /// The response headers are stored in the provided rx_buf, which should be sized to contain at least the response headers. /// /// The response is returned. - pub async fn send<'buf>(self, rx_buf: &'buf mut [u8]) -> Result, Error> + pub async fn send<'buf>(self, rx_buf: &'buf mut [u8]) -> Result>, Error> where 'conn: 'req + 'buf, 'req: 'buf, diff --git a/src/reader.rs b/src/reader.rs index 7cd8adb..62cfb83 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -1,9 +1,6 @@ -use embedded_io::{ErrorKind, ErrorType}; +use embedded_io::{Error, ErrorKind, ErrorType}; use embedded_io_async::{BufRead, Read, Write}; -#[cfg(feature = "embedded-tls")] -use embedded_io::Error; - use crate::client::HttpConnection; struct ReadBuffer<'buf> { @@ -45,19 +42,19 @@ impl ReadBuffer<'_> { } } -pub struct BufferingReader<'buf, 'conn, B> +pub struct BufferingReader<'buf, B> where - B: Read + Write, + B: Read, { buffer: ReadBuffer<'buf>, - stream: &'buf mut HttpConnection<'conn, B>, + stream: &'buf mut B, } -impl<'buf, 'conn, B> BufferingReader<'buf, 'conn, B> +impl<'buf, 'conn, B> BufferingReader<'buf, B> where - B: Read + Write, + B: Read, { - pub fn new(buffer: &'buf mut [u8], loaded: usize, stream: &'buf mut HttpConnection<'conn, B>) -> Self { + pub fn new(buffer: &'buf mut [u8], loaded: usize, stream: &'buf mut B) -> Self { Self { buffer: ReadBuffer::new(buffer, loaded), stream, @@ -65,16 +62,16 @@ where } } -impl ErrorType for BufferingReader<'_, '_, C> +impl ErrorType for BufferingReader<'_, C> where - C: Read + Write, + C: Read, { type Error = ErrorKind; } -impl Read for BufferingReader<'_, '_, C> +impl Read for BufferingReader<'_, C> where - C: Read + Write, + C: Read, { async fn read(&mut self, buf: &mut [u8]) -> Result { if !self.buffer.is_empty() { @@ -82,11 +79,11 @@ where return Ok(amt); } - self.stream.read(buf).await + self.stream.read(buf).await.map_err(|e| e.kind()) } } -impl BufRead for BufferingReader<'_, '_, C> +impl BufRead for BufferingReader<'_, HttpConnection<'_, C>> where C: Read + Write, { diff --git a/src/response.rs b/src/response.rs index c52dbf6..522b416 100644 --- a/src/response.rs +++ b/src/response.rs @@ -2,18 +2,17 @@ use embedded_io::{Error as _, ErrorType}; use embedded_io_async::{BufRead, Read, Write}; use heapless::Vec; -use crate::client::HttpConnection; use crate::headers::{ContentType, KeepAlive, TransferEncoding}; use crate::reader::BufferingReader; use crate::request::Method; use crate::Error; /// Type representing a parsed HTTP response. -pub struct Response<'buf, 'conn, C> +pub struct Response<'buf, C> where - C: Read + Write, + C: Read, { - conn: &'buf mut HttpConnection<'conn, C>, + conn: &'buf mut C, /// The method used to create the response. method: Method, /// The HTTP response status code. @@ -32,7 +31,7 @@ where } #[cfg(feature = "defmt")] -impl defmt::Format for Response<'_, '_, C> +impl defmt::Format for Response<'_, C> where C: Read + Write, { @@ -53,7 +52,7 @@ where } } -impl core::fmt::Debug for Response<'_, '_, C> +impl core::fmt::Debug for Response<'_, C> where C: Read + Write, { @@ -72,16 +71,12 @@ where } } -impl<'buf, 'conn, C> Response<'buf, 'conn, C> +impl<'buf, C> Response<'buf, C> where - C: Read + Write, + C: Read, { // Read at least the headers from the connection. - pub async fn read( - conn: &'buf mut HttpConnection<'conn, C>, - method: Method, - header_buf: &'buf mut [u8], - ) -> Result, Error> { + pub async fn read(conn: &'buf mut C, method: Method, header_buf: &'buf mut [u8]) -> Result { let mut header_len = 0; let mut pos = 0; while pos < header_buf.len() { @@ -179,7 +174,7 @@ where } /// Get the response body - pub fn body(self) -> ResponseBody<'buf, 'conn, C> { + pub fn body(self) -> ResponseBody<'buf, C> { let reader_hint = if self.method == Method::HEAD { // Head requests does not have a body so we return an empty reader ReaderHint::Empty @@ -223,11 +218,11 @@ impl<'a> Iterator for HeaderIterator<'a> { /// This type contains the original header buffer provided to `read_headers`, /// now renamed to `body_buf`, the number of read body bytes that are available /// in `body_buf`, and a reader to be used for reading the remaining body. -pub struct ResponseBody<'buf, 'conn, C> +pub struct ResponseBody<'buf, C> where - C: Read + Write, + C: Read, { - conn: &'buf mut HttpConnection<'conn, C>, + conn: &'buf mut C, reader_hint: ReaderHint, /// The number of raw bytes read from the body and available in the beginning of `body_buf`. raw_body_read: usize, @@ -242,11 +237,11 @@ enum ReaderHint { ToEnd, // https://www.rfc-editor.org/rfc/rfc7230#section-3.3.3 pt. 7: Until end of connection } -impl<'buf, 'conn, C> ResponseBody<'buf, 'conn, C> +impl<'buf, C> ResponseBody<'buf, C> where - C: Read + Write, + C: Read, { - pub fn reader(self) -> BodyReader> { + pub fn reader(self) -> BodyReader> { let raw_body = BufferingReader::new(self.body_buf, self.raw_body_read, self.conn); match self.reader_hint { @@ -264,9 +259,9 @@ where } } -impl<'buf, 'conn, C> ResponseBody<'buf, 'conn, C> +impl<'buf, C> ResponseBody<'buf, C> where - C: Read + Write, + C: Read, { /// Read the entire body into the buffer originally provided [`Response::read()`]. /// This requires that this original buffer is large enough to contain the entire body. @@ -326,7 +321,7 @@ pub enum BodyReader { impl BodyReader where - B: BufRead + Read, + B: Read, { /// Read the entire body pub async fn read_to_end(&mut self, buf: &mut [u8]) -> Result { @@ -360,14 +355,13 @@ where async fn discard(&mut self) -> Result { let mut body_len = 0; + let mut buf = [0; 128]; loop { - let buf = self.fill_buf().await?; - if buf.is_empty() { + let buf = self.read(&mut buf).await?; + if buf == 0 { break; } - let buf_len = buf.len(); - body_len += buf_len; - self.consume(buf_len); + body_len += buf; } Ok(body_len) @@ -380,7 +374,7 @@ impl ErrorType for BodyReader { impl Read for BodyReader where - B: BufRead + Read, + B: Read, { async fn read(&mut self, buf: &mut [u8]) -> Result { match self { @@ -427,7 +421,7 @@ impl ErrorType for FixedLengthBodyReader { impl Read for FixedLengthBodyReader where - C: BufRead + Read, + C: Read, { async fn read(&mut self, buf: &mut [u8]) -> Result { if self.remaining == 0 { @@ -506,31 +500,29 @@ pub struct ChunkedBodyReader { impl ChunkedBodyReader where - C: BufRead + Read, + C: Read, { async fn read_next_chunk_length(&mut self) -> Result<(), Error> { let mut header_buf = [0; 8 + 2]; // 32 bit hex + \r + \n let mut total_read = 0; 'read_size: loop { - let buf = self.raw_body.fill_buf().await.map_err(|e| e.kind())?; - for (i, byte) in buf.iter().enumerate() { - if *byte != b'\n' { - header_buf[total_read] = *byte; - total_read += 1; - - if total_read == header_buf.len() { - self.raw_body.consume(i + 1); - return Err(Error::Codec); - } - } else { - self.raw_body.consume(i + 1); - break 'read_size; + let mut byte = 0; + self.raw_body + .read_exact(core::slice::from_mut(&mut byte)) + .await + .map_err(|e| Error::from(e).kind())?; + + if byte != b'\n' { + header_buf[total_read] = byte; + total_read += 1; + + if total_read == header_buf.len() { + return Err(Error::Codec); } + } else { + break 'read_size; } - - let consumed = buf.len(); - self.raw_body.consume(consumed); } if header_buf[total_read - 1] != b'\r' { @@ -595,7 +587,7 @@ impl ErrorType for ChunkedBodyReader { impl Read for ChunkedBodyReader where - C: BufRead + Read, + C: Read, { async fn read(&mut self, buf: &mut [u8]) -> Result { let remaining = self.handle_chunk_boundary().await?; From 00d559d4d1c3682f63e15b75070e59cf965d4462 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Sat, 21 Oct 2023 21:10:49 +0200 Subject: [PATCH 16/18] Make Debug impls on Response usable --- src/client.rs | 27 +++++++++++++++++++++++++++ src/response.rs | 45 +++------------------------------------------ 2 files changed, 30 insertions(+), 42 deletions(-) diff --git a/src/client.rs b/src/client.rs index 7acaa86..2e81c30 100644 --- a/src/client.rs +++ b/src/client.rs @@ -178,6 +178,33 @@ where Tls((&'conn mut (), core::convert::Infallible)), // Variant is impossible to create, but we need it to avoid "unused lifetime" warning } +#[cfg(feature = "defmt")] +impl defmt::Format for HttpConnection<'_, C> +where + C: Read + Write, +{ + fn format(&self, fmt: defmt::Formatter) { + match self { + HttpConnection::Plain(_) => defmt::write!(fmt, "Plain"), + HttpConnection::PlainBuffered(_) => defmt::write!(fmt, "PlainBuffered"), + HttpConnection::Tls(_) => defmt::write!(fmt, "Tls"), + } + } +} + +impl core::fmt::Debug for HttpConnection<'_, C> +where + C: Read + Write, +{ + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + match self { + HttpConnection::Plain(_) => f.debug_tuple("Plain").finish(), + HttpConnection::PlainBuffered(_) => f.debug_tuple("PlainBuffered").finish(), + HttpConnection::Tls(_) => f.debug_tuple("Tls").finish(), + } + } +} + impl<'conn, T> HttpConnection<'conn, T> where T: Read + Write, diff --git a/src/response.rs b/src/response.rs index 522b416..d724534 100644 --- a/src/response.rs +++ b/src/response.rs @@ -1,5 +1,5 @@ use embedded_io::{Error as _, ErrorType}; -use embedded_io_async::{BufRead, Read, Write}; +use embedded_io_async::{BufRead, Read}; use heapless::Vec; use crate::headers::{ContentType, KeepAlive, TransferEncoding}; @@ -8,6 +8,8 @@ use crate::request::Method; use crate::Error; /// Type representing a parsed HTTP response. +#[derive(Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct Response<'buf, C> where C: Read, @@ -30,47 +32,6 @@ where raw_body_read: usize, } -#[cfg(feature = "defmt")] -impl defmt::Format for Response<'_, C> -where - C: Read + Write, -{ - fn format(&self, fmt: defmt::Formatter) { - defmt::write!( - fmt, - "Response {{ method = {}, status = {}, content_type = {}, content_length = {}, transfer_encoding = {}, keep_alive = {}, header_buf = {:?}, header_len = {}, raw_body_read = {} }}", - self.method, - self.status, - self.content_type, - self.content_length, - self.transfer_encoding, - self.keep_alive, - self.header_buf, - self.header_len, - self.raw_body_read, - ) - } -} - -impl core::fmt::Debug for Response<'_, C> -where - C: Read + Write, -{ - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - f.debug_struct("Response") - .field("method", &self.method) - .field("status", &self.status) - .field("content_type", &self.content_type) - .field("content_length", &self.content_length) - .field("transfer_encoding", &self.transfer_encoding) - .field("keep_alive", &self.keep_alive) - .field("header_buf", &self.header_buf) - .field("header_len", &self.header_len) - .field("raw_body_read", &self.raw_body_read) - .finish() - } -} - impl<'buf, C> Response<'buf, C> where C: Read, From 2cbf54d969c74bc9bdd755c1a74297136a23f5aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Sat, 21 Oct 2023 21:17:52 +0200 Subject: [PATCH 17/18] Add test that uses fill_buf to read response --- tests/client.rs | 51 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/tests/client.rs b/tests/client.rs index 37964b1..77cf375 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1,6 +1,7 @@ #![feature(async_fn_in_trait)] #![allow(incomplete_features)] use embedded_io_adapters::tokio_1::FromTokio; +use embedded_io_async::BufRead; use embedded_nal_async::{AddrType, IpAddr, Ipv4Addr}; use hyper::server::conn::Http; use hyper::service::{make_service_fn, service_fn}; @@ -106,6 +107,56 @@ async fn test_resource_notls() { t.await.unwrap(); } +#[tokio::test] +async fn test_resource_notls_bufread() { + setup(); + let addr = ([127, 0, 0, 1], 0).into(); + + let service = make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(echo)) }); + + let server = Server::bind(&addr).serve(service); + let addr = server.local_addr(); + + let (tx, rx) = oneshot::channel(); + let t = tokio::spawn(async move { + tokio::select! { + _ = server => {} + _ = rx => {} + } + }); + + let url = format!("http://127.0.0.1:{}", addr.port()); + let mut client = HttpClient::new(&TCP, &LOOPBACK_DNS); + let mut rx_buf = [0; 4096]; + let mut resource = client.resource(&url).await.unwrap(); + for _ in 0..2 { + let response = resource + .post("/") + .body(b"PING".as_slice()) + .content_type(ContentType::TextPlain) + .send(&mut rx_buf) + .await + .unwrap(); + let mut body_reader = response.body().reader(); + + let mut body = vec![]; + loop { + let buf = body_reader.fill_buf().await.unwrap(); + if buf.is_empty() { + break; + } + body.extend_from_slice(buf); + let buf_len = buf.len(); + body_reader.consume(buf_len); + } + + assert_eq!(body, b"PING"); + } + + tx.send(()).unwrap(); + t.await.unwrap(); +} + #[tokio::test] #[cfg(feature = "embedded-tls")] async fn test_resource_rustls() { From ab16446611eda254bc683de4cd42ed0641c2a2f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Sat, 21 Oct 2023 22:32:10 +0200 Subject: [PATCH 18/18] Simplify tests --- src/response.rs | 70 +++++++++---------------------------------------- 1 file changed, 13 insertions(+), 57 deletions(-) diff --git a/src/response.rs b/src/response.rs index d724534..3132865 100644 --- a/src/response.rs +++ b/src/response.rs @@ -681,55 +681,17 @@ impl From for Status { #[cfg(test)] mod tests { - use embedded_io::{ErrorKind, ErrorType}; - use embedded_io_async::{Read, Write}; + use embedded_io_async::Read; use crate::{ - client::HttpConnection, reader::BufferingReader, request::Method, response::{ChunkState, ChunkedBodyReader, Response}, }; - struct Buffer { - b: Vec, - } - impl Buffer { - fn from(b: &[u8]) -> Self { - Self { b: b.to_vec() } - } - } - - impl ErrorType for Buffer { - type Error = ErrorKind; - } - - impl Read for Buffer { - async fn read(&mut self, buf: &mut [u8]) -> Result { - let len = buf.len().min(self.b.len()); - buf[..len].copy_from_slice(&self.b[..len]); - self.b.drain(..len); - - Ok(len) - } - } - - impl Write for Buffer { - async fn write(&mut self, buf: &[u8]) -> Result { - self.b.extend_from_slice(buf); - Ok(buf.len()) - } - - async fn flush(&mut self) -> Result<(), Self::Error> { - Ok(()) - } - } - #[tokio::test] async fn can_read_with_content_length_with_same_buffer() { - let mut response = HttpConnection::Plain(Buffer::from( - b"HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHELLO WORLD", - )); + let mut response = b"HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHELLO WORLD".as_slice(); let mut response_buf = [0; 200]; let response = Response::read(&mut response, Method::GET, &mut response_buf) .await @@ -742,9 +704,7 @@ mod tests { #[tokio::test] async fn can_read_with_content_length_to_other_buffer() { - let mut response = HttpConnection::Plain(Buffer::from( - b"HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHELLO WORLD", - )); + let mut response = b"HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHELLO WORLD".as_slice(); let mut header_buf = [0; 200]; let response = Response::read(&mut response, Method::GET, &mut header_buf) .await @@ -758,9 +718,7 @@ mod tests { #[tokio::test] async fn can_discard_with_content_length() { - let mut response = HttpConnection::Plain(Buffer::from( - b"HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHELLO WORLD", - )); + let mut response = b"HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHELLO WORLD".as_slice(); let mut response_buf = [0; 200]; let response = Response::read(&mut response, Method::GET, &mut response_buf) .await @@ -771,9 +729,8 @@ mod tests { #[tokio::test] async fn can_read_with_chunked_encoding() { - let mut response = HttpConnection::Plain(Buffer::from( - b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nB\r\nHELLO WORLD\r\n0\r\n\r\n", - )); + let mut response = + b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nB\r\nHELLO WORLD\r\n0\r\n\r\n".as_slice(); let mut header_buf = [0; 200]; let response = Response::read(&mut response, Method::GET, &mut header_buf) .await @@ -787,9 +744,8 @@ mod tests { #[tokio::test] async fn can_discard_with_chunked_encoding() { - let mut response = HttpConnection::Plain(Buffer::from( - b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nB\r\nHELLO WORLD\r\n0\r\n\r\n", - )); + let mut response = + b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nB\r\nHELLO WORLD\r\n0\r\n\r\n".as_slice(); let mut header_buf = [0; 200]; let response = Response::read(&mut response, Method::GET, &mut header_buf) .await @@ -800,7 +756,7 @@ mod tests { #[tokio::test] async fn can_read_to_end_of_connection_with_same_buffer() { - let mut response = HttpConnection::Plain(Buffer::from(b"HTTP/1.1 200 OK\r\n\r\nHELLO WORLD")); + let mut response = b"HTTP/1.1 200 OK\r\n\r\nHELLO WORLD".as_slice(); let mut header_buf = [0; 200]; let response = Response::read(&mut response, Method::GET, &mut header_buf) .await @@ -813,7 +769,7 @@ mod tests { #[tokio::test] async fn can_read_to_end_of_connection_to_other_buffer() { - let mut response = HttpConnection::Plain(Buffer::from(b"HTTP/1.1 200 OK\r\n\r\nHELLO WORLD")); + let mut response = b"HTTP/1.1 200 OK\r\n\r\nHELLO WORLD".as_slice(); let mut header_buf = [0; 200]; let response = Response::read(&mut response, Method::GET, &mut header_buf) .await @@ -827,7 +783,7 @@ mod tests { #[tokio::test] async fn can_discard_to_end_of_connection() { - let mut response = HttpConnection::Plain(Buffer::from(b"HTTP/1.1 200 OK\r\n\r\nHELLO WORLD")); + let mut response = b"HTTP/1.1 200 OK\r\n\r\nHELLO WORLD".as_slice(); let mut header_buf = [0; 200]; let response = Response::read(&mut response, Method::GET, &mut header_buf) .await @@ -838,7 +794,7 @@ mod tests { #[tokio::test] async fn chunked_body_reader_can_read_with_large_buffer() { - let mut raw_body = HttpConnection::Plain(Buffer::from(b"1\r\nX\r\n10\r\nYYYYYYYYYYYYYYYY\r\n0\r\n\r\n")); + let mut raw_body = b"1\r\nX\r\n10\r\nYYYYYYYYYYYYYYYY\r\n0\r\n\r\n".as_slice(); let mut read_buffer = [0; 128]; let mut reader = ChunkedBodyReader { raw_body: BufferingReader::new(&mut read_buffer, 0, &mut raw_body), @@ -855,7 +811,7 @@ mod tests { #[tokio::test] async fn chunked_body_reader_can_read_with_tiny_buffer() { - let mut raw_body = HttpConnection::Plain(Buffer::from(b"1\r\nX\r\n10\r\nYYYYYYYYYYYYYYYY\r\n0\r\n\r\n")); + let mut raw_body = b"1\r\nX\r\n10\r\nYYYYYYYYYYYYYYYY\r\n0\r\n\r\n".as_slice(); let mut read_buffer = [0; 128]; let mut reader = ChunkedBodyReader { raw_body: BufferingReader::new(&mut read_buffer, 0, &mut raw_body),