From 85c20cdd929a362e546671f7e6e5bd17b1d5e4a4 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Fri, 17 May 2024 15:09:51 +0200 Subject: [PATCH 01/16] Buffer writes before chunks are written to connection This commit: * Moves the responsibility for writing the request body from `Request` to `HttpConnection`. * Uses the buffer provided when calling `into_buffered()` to buffer writes before they are passed on to the `ChunkedBufferWriter` This fixes #71 --- src/client.rs | 164 +++++++++++++++++++++++++++++++++++++++++++++-- src/request.rs | 100 ++++++++++++++++------------- tests/request.rs | 14 ++-- 3 files changed, 219 insertions(+), 59 deletions(-) diff --git a/src/client.rs b/src/client.rs index 86db2ce..87d51f6 100644 --- a/src/client.rs +++ b/src/client.rs @@ -280,12 +280,68 @@ where /// The response is returned. pub async fn send<'req, 'buf, B: RequestBody>( &'req mut self, - request: Request<'conn, B>, + request: Request<'req, B>, rx_buf: &'buf mut [u8], ) -> Result>, Error> { - request.write(self).await?; + self.write_request(&request).await?; + self.flush().await?; Response::read(self, request.method, rx_buf).await } + + async fn write_request<'req, B: RequestBody>(&mut self, request: &Request<'req, B>) -> Result<(), Error> { + request.write_header(self).await?; + + if let Some(body) = request.body.as_ref() { + match body.len() { + Some(0) => { + // Empty body + } + Some(len) => { + trace!("Writing not-chunked body"); + let mut writer = FixedBodyWriter::new(self); + body.write(&mut writer).await.map_err(|e| e.kind())?; + + if writer.written() != len { + return Err(Error::IncorrectBodyWritten); + } + } + None => { + trace!("Writing chunked body"); + match self { + HttpConnection::Plain(c) => { + let mut writer = ChunkedBodyWriter::new(c); + body.write(&mut writer).await?; + writer.write_empty_chunk().await.map_err(|e| e.kind())?; + } + HttpConnection::PlainBuffered(buffered_conn) => { + // Flush the buffered connection so that we can bypass it and rent its buffer + buffered_conn.flush().await.map_err(|e| e.kind())?; + let (conn, buf) = buffered_conn.bypass_with_buf().unwrap(); + + // Construct a new buffered writer that buffers _before_ the chunked body writer + let mut writer = BufferedWrite::new(ChunkedBodyWriter::new(conn), buf); + body.write(&mut writer).await?; + + // Flush the buffered writer and write the empty chunk to the chunked body writer + writer.flush().await.map_err(|e| e.kind())?; + writer + .bypass() + .unwrap() + .write_empty_chunk() + .await + .map_err(|e| e.kind())?; + } + HttpConnection::Tls(c) => { + let mut writer = ChunkedBodyWriter::new(c); + body.write(&mut writer).await?; + writer.write_empty_chunk().await.map_err(|e| e.kind())?; + } + }; + } + } + } + Ok(()) + } } impl ErrorType for HttpConnection<'_, T> @@ -379,7 +435,8 @@ where rx_buf: &'buf mut [u8], ) -> Result>, Error> { let request = self.request.take().ok_or(Error::AlreadySent)?.build(); - request.write(&mut self.conn).await?; + self.conn.write_request(&request).await?; + self.conn.flush().await?; Response::read(&mut self.conn, request.method, rx_buf).await } } @@ -508,7 +565,8 @@ where rx_buf: &'buf mut [u8], ) -> Result>, Error> { request.base_path = Some(self.base_path); - request.write(&mut self.conn).await?; + self.conn.write_request(&request).await?; + self.conn.flush().await?; Response::read(&mut self.conn, request.method, rx_buf).await } } @@ -541,7 +599,8 @@ where let conn = self.conn; let mut request = self.request.build(); request.base_path = Some(self.base_path); - request.write(conn).await?; + conn.write_request(&request).await?; + conn.flush().await?; Response::read(conn, request.method, rx_buf).await } } @@ -590,3 +649,98 @@ where self.request.build() } } + +#[cfg(test)] +mod tests { + use core::convert::Infallible; + + use super::*; + + #[derive(Default)] + struct VecBuffer(Vec); + + impl ErrorType for VecBuffer { + type Error = Infallible; + } + + impl Read for VecBuffer { + async fn read(&mut self, buf: &mut [u8]) -> Result { + unreachable!() + } + } + + impl Write for VecBuffer { + async fn write(&mut self, buf: &[u8]) -> Result { + self.0.extend_from_slice(buf); + Ok(buf.len()) + } + } + + #[tokio::test] + async fn with_empty_body() { + let mut buffer = VecBuffer::default(); + let mut conn = HttpConnection::Plain(&mut buffer); + + let request = Request::new(Method::POST, "/").body([].as_slice()).build(); + conn.write_request(&request).await.unwrap(); + + assert_eq!(b"POST / HTTP/1.1\r\nContent-Length: 0\r\n\r\n", buffer.0.as_slice()); + } + + #[tokio::test] + async fn with_known_body() { + let mut buffer = VecBuffer::default(); + let mut conn = HttpConnection::Plain(&mut buffer); + + let request = Request::new(Method::POST, "/").body(b"BODY".as_slice()).build(); + conn.write_request(&request).await.unwrap(); + + assert_eq!(b"POST / HTTP/1.1\r\nContent-Length: 4\r\n\r\nBODY", buffer.0.as_slice()); + } + + struct ChunkedBody(&'static [&'static [u8]]); + + impl RequestBody for ChunkedBody { + fn len(&self) -> Option { + None // Unknown length: triggers chunked body + } + + async fn write(&self, writer: &mut W) -> Result<(), W::Error> { + for chunk in self.0 { + writer.write_all(chunk).await?; + } + Ok(()) + } + } + + #[tokio::test] + async fn with_unknown_body_unbuffered() { + let mut buffer = VecBuffer::default(); + let mut conn = HttpConnection::Plain(&mut buffer); + + static CHUNKS: [&'static [u8]; 2] = [b"PART1", b"PART2"]; + let request = Request::new(Method::POST, "/").body(ChunkedBody(&CHUNKS)).build(); + conn.write_request(&request).await.unwrap(); + + assert_eq!( + b"POST / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n5\r\nPART1\r\n5\r\nPART2\r\n0\r\n\r\n", + buffer.0.as_slice() + ); + } + + #[tokio::test] + async fn with_unknown_body_buffered() { + let mut buffer = VecBuffer::default(); + let mut tx_buf = [0; 1024]; + let mut conn = HttpConnection::Plain(&mut buffer).into_buffered(&mut tx_buf); + + static CHUNKS: [&'static [u8]; 2] = [b"PART1", b"PART2"]; + let request = Request::new(Method::POST, "/").body(ChunkedBody(&CHUNKS)).build(); + conn.write_request(&request).await.unwrap(); + + assert_eq!( + b"POST / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\na\r\nPART1PART2\r\n0\r\n\r\n", + buffer.0.as_slice() + ); + } +} diff --git a/src/request.rs b/src/request.rs index 416211e..ae1b41f 100644 --- a/src/request.rs +++ b/src/request.rs @@ -106,8 +106,8 @@ impl<'req, B> Request<'req, B> where B: RequestBody, { - /// Write request to the I/O stream - pub async fn write(&self, c: &mut C) -> Result<(), Error> + /// Write request header to the I/O stream + pub async fn write_header(&self, c: &mut C) -> Result<(), Error> where C: Write, { @@ -161,31 +161,6 @@ where } write_str(c, "\r\n").await?; trace!("Header written"); - if let Some(body) = self.body.as_ref() { - match body.len() { - Some(0) => { - // Empty body - } - Some(len) => { - trace!("Writing not-chunked body"); - let mut writer = FixedBodyWriter(c, 0); - body.write(&mut writer).await.map_err(to_errorkind)?; - - if writer.1 != len { - return Err(Error::IncorrectBodyWritten); - } - } - None => { - trace!("Writing chunked body"); - let mut writer = ChunkedBodyWriter(c, 0); - body.write(&mut writer).await?; - - write_str(c, "0\r\n\r\n").await?; - } - } - } - - c.flush().await.map_err(|e| e.kind())?; Ok(()) } } @@ -337,16 +312,29 @@ where } } -pub struct FixedBodyWriter<'a, C: Write>(&'a mut C, usize); +pub struct FixedBodyWriter(C, usize); -impl ErrorType for FixedBodyWriter<'_, C> +impl FixedBodyWriter +where + C: Write, +{ + pub fn new(conn: C) -> Self { + Self(conn, 0) + } + + pub fn written(&self) -> usize { + self.1 + } +} + +impl ErrorType for FixedBodyWriter where C: Write, { type Error = C::Error; } -impl Write for FixedBodyWriter<'_, C> +impl Write for FixedBodyWriter where C: Write, { @@ -367,9 +355,22 @@ where } } -pub struct ChunkedBodyWriter<'a, C: Write>(&'a mut C, usize); +pub struct ChunkedBodyWriter(C, usize); -impl ErrorType for ChunkedBodyWriter<'_, C> +impl ChunkedBodyWriter +where + C: Write, +{ + pub fn new(conn: C) -> Self { + Self(conn, 0) + } + + pub async fn write_empty_chunk(&mut self) -> Result<(), C::Error> { + self.0.write_all(b"0\r\n\r\n").await + } +} + +impl ErrorType for ChunkedBodyWriter where C: Write, { @@ -380,7 +381,7 @@ fn to_errorkind(e: E) -> embedded_io::ErrorKind { e.kind() } -impl Write for ChunkedBodyWriter<'_, C> +impl Write for ChunkedBodyWriter where C: Write, { @@ -392,6 +393,13 @@ where async fn write_all(&mut self, buf: &[u8]) -> Result<(), Self::Error> { // Write chunk header let len = buf.len(); + + // Do not write an empty chunk as that will terminate the body + // Use `ChunkedBodyWriter.write_empty_chunk` instead if this is intended + if len == 0 { + return Ok(()); + } + let mut hex = [0; 2 * size_of::()]; hex::encode_to_slice(len.to_be_bytes(), &mut hex).unwrap(); let leading_zeros = hex.iter().position(|x| *x != b'0').unwrap_or_default(); @@ -423,7 +431,7 @@ mod tests { Request::new(Method::GET, "/") .basic_auth("username", "password") .build() - .write(&mut buffer) + .write_header(&mut buffer) .await .unwrap(); @@ -439,7 +447,7 @@ mod tests { Request::new(Method::POST, "/") .body([].as_slice()) .build() - .write(&mut buffer) + .write_header(&mut buffer) .await .unwrap(); @@ -447,43 +455,43 @@ mod tests { } #[tokio::test] - async fn with_known_body() { + async fn with_known_body_adds_content_length_header() { let mut buffer = Vec::new(); Request::new(Method::POST, "/") .body(b"BODY".as_slice()) .build() - .write(&mut buffer) + .write_header(&mut buffer) .await .unwrap(); - assert_eq!(b"POST / HTTP/1.1\r\nContent-Length: 4\r\n\r\nBODY", buffer.as_slice()); + assert_eq!(b"POST / HTTP/1.1\r\nContent-Length: 4\r\n\r\n", buffer.as_slice()); } - struct ChunkedBody<'a>(&'a [u8]); + struct ChunkedBody; - impl RequestBody for ChunkedBody<'_> { + impl RequestBody for ChunkedBody { fn len(&self) -> Option { None // Unknown length: triggers chunked body } - async fn write(&self, writer: &mut W) -> Result<(), W::Error> { - writer.write_all(self.0).await + async fn write(&self, _writer: &mut W) -> Result<(), W::Error> { + unreachable!() } } #[tokio::test] - async fn with_unknown_body() { + async fn with_unknown_body_adds_transfer_encoding_header() { let mut buffer = Vec::new(); Request::new(Method::POST, "/") - .body(ChunkedBody(b"BODY".as_slice())) + .body(ChunkedBody) .build() - .write(&mut buffer) + .write_header(&mut buffer) .await .unwrap(); assert_eq!( - b"POST / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nBODY\r\n0\r\n\r\n", + b"POST / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n", buffer.as_slice() ); } diff --git a/tests/request.rs b/tests/request.rs index 111ccc5..fa5fea9 100644 --- a/tests/request.rs +++ b/tests/request.rs @@ -2,9 +2,9 @@ use embedded_io_adapters::tokio_1::FromTokio; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Server}; use reqwless::client::HttpConnection; -use reqwless::request::{Method, RequestBuilder}; +use reqwless::request::RequestBuilder; use reqwless::Error; -use reqwless::{headers::ContentType, request::Request, response::Response}; +use reqwless::{headers::ContentType, request::Request}; use std::str::from_utf8; use std::sync::Once; use tokio::net::TcpStream; @@ -48,9 +48,8 @@ async fn test_request_response() { .content_type(ContentType::TextPlain) .build(); - request.write(&mut stream).await.unwrap(); let mut rx_buf = [0; 4096]; - let response = Response::read(&mut stream, Method::POST, &mut rx_buf).await.unwrap(); + let response = stream.send(request, &mut rx_buf).await.unwrap(); let body = response.body().read_to_end().await; assert_eq!(body.unwrap(), b"PING"); @@ -70,7 +69,7 @@ async fn write_without_base_path() { let request = Request::get("/hello").build(); let mut buf = Vec::new(); - request.write(&mut buf).await.unwrap(); + request.write_header(&mut buf).await.unwrap(); assert!(from_utf8(&buf).unwrap().starts_with("GET /hello HTTP/1.1")); } @@ -82,16 +81,15 @@ async fn google_panic() { let addr = SocketAddr::from((google_ip, 80)); let conn = tokio::net::TcpStream::connect(addr).await.unwrap(); - let mut conn = TokioStream(FromTokio::new(conn)); + let mut conn = HttpConnection::Plain(TokioStream(FromTokio::new(conn))); let request = Request::get("/") .host("www.google.com") .content_type(ContentType::TextPlain) .build(); - request.write(&mut conn).await.unwrap(); let mut rx_buf = [0; 8 * 1024]; - let resp = Response::read(&mut conn, Method::GET, &mut rx_buf).await.unwrap(); + let resp = conn.send(request, &mut rx_buf).await.unwrap(); let result = resp.body().read_to_end().await; match result { From 609cf4b0f5931c306c01a964dcb283662da0560b Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Fri, 17 May 2024 15:12:31 +0200 Subject: [PATCH 02/16] This requires buffered-io 0.5.1 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index c127c9d..1d2a9b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ keywords = ["embedded", "async", "http", "no_std"] exclude = [".github"] [dependencies] -buffered-io = { version = "0.5" } +buffered-io = { version = "0.5.1" } embedded-io = { version = "0.6" } embedded-io-async = { version = "0.6" } embedded-nal-async = "0.7.0" From b66eb131b3b0f390f378ee3a01faa22a310e2b24 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Fri, 17 May 2024 15:14:59 +0200 Subject: [PATCH 03/16] Fix clippy warning --- src/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/reader.rs b/src/reader.rs index bb22d13..6d1b8c6 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -98,7 +98,7 @@ where unreachable!() } - self.buffer.loaded = self.stream.read(&mut self.buffer.buffer).await.map_err(|e| e.kind())?; + self.buffer.loaded = self.stream.read(self.buffer.buffer).await.map_err(|e| e.kind())?; } self.buffer.fill_buf() From ab074c6f33be527cb4e6f8a62228d1d6d68ecbf0 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Fri, 17 May 2024 15:23:38 +0200 Subject: [PATCH 04/16] Fix compilation without tls --- src/client.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index 87d51f6..91f717c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -331,11 +331,14 @@ where .await .map_err(|e| e.kind())?; } + #[cfg(any(feature = "embedded-tls", feature = "esp-mbedtls"))] HttpConnection::Tls(c) => { let mut writer = ChunkedBodyWriter::new(c); body.write(&mut writer).await?; writer.write_empty_chunk().await.map_err(|e| e.kind())?; } + #[cfg(all(not(feature = "embedded-tls"), not(feature = "esp-mbedtls")))] + HttpConnection::Tls(_) => unreachable!(), }; } } @@ -664,7 +667,7 @@ mod tests { } impl Read for VecBuffer { - async fn read(&mut self, buf: &mut [u8]) -> Result { + async fn read(&mut self, _buf: &mut [u8]) -> Result { unreachable!() } } From f80172a6092fa78e98fdc4a9874468d7e6c44a97 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Fri, 17 May 2024 16:15:19 +0200 Subject: [PATCH 05/16] Change ChunkedWriterStrategy to minimize connection writes --- src/client.rs | 4 ++-- src/request.rs | 25 +++++++++++++++++-------- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/src/client.rs b/src/client.rs index 91f717c..0321ec5 100644 --- a/src/client.rs +++ b/src/client.rs @@ -311,7 +311,7 @@ where HttpConnection::Plain(c) => { let mut writer = ChunkedBodyWriter::new(c); body.write(&mut writer).await?; - writer.write_empty_chunk().await.map_err(|e| e.kind())?; + writer.write_termination().await.map_err(|e| e.kind())?; } HttpConnection::PlainBuffered(buffered_conn) => { // Flush the buffered connection so that we can bypass it and rent its buffer @@ -335,7 +335,7 @@ where HttpConnection::Tls(c) => { let mut writer = ChunkedBodyWriter::new(c); body.write(&mut writer).await?; - writer.write_empty_chunk().await.map_err(|e| e.kind())?; + writer.write_termination().await.map_err(|e| e.kind())?; } #[cfg(all(not(feature = "embedded-tls"), not(feature = "esp-mbedtls")))] HttpConnection::Tls(_) => unreachable!(), diff --git a/src/request.rs b/src/request.rs index ae1b41f..fcac185 100644 --- a/src/request.rs +++ b/src/request.rs @@ -5,7 +5,7 @@ use core::fmt::Write as _; use core::mem::size_of; use embedded_io::{Error as _, ErrorType}; use embedded_io_async::Write; -use heapless::String; +use heapless::{String, Vec}; /// A read only HTTP request type pub struct Request<'req, B> @@ -365,8 +365,8 @@ where Self(conn, 0) } - pub async fn write_empty_chunk(&mut self) -> Result<(), C::Error> { - self.0.write_all(b"0\r\n\r\n").await + pub async fn write_termination(&mut self) -> Result<(), C::Error> { + self.0.write_all(b"\r\n0\r\n\r\n").await } } @@ -404,15 +404,23 @@ where hex::encode_to_slice(len.to_be_bytes(), &mut hex).unwrap(); let leading_zeros = hex.iter().position(|x| *x != b'0').unwrap_or_default(); let (_, hex) = hex.split_at(leading_zeros); - self.0.write_all(hex).await.map_err(to_errorkind)?; - self.0.write_all(b"\r\n").await.map_err(to_errorkind)?; - // Write chunk + let mut header = Vec::::new(); + if self.1 > 0 { + // Write newline terminating ongoing chunk + // We do it here instead of after writing the chunk + // to minimize the number of writes to the underlying connection + header.extend_from_slice(b"\r\n").unwrap(); + } + header.extend_from_slice(hex).unwrap(); + header.extend_from_slice(b"\r\n").unwrap(); + + self.0.write_all(&header).await.map_err(to_errorkind)?; + + // Write chunk payload self.0.write_all(buf).await.map_err(to_errorkind)?; self.1 += len; - // Write newline - self.0.write_all(b"\r\n").await.map_err(to_errorkind)?; Ok(()) } @@ -424,6 +432,7 @@ where #[cfg(test)] mod tests { use super::*; + use std::vec::Vec; #[tokio::test] async fn basic_auth() { From 9d769715b34e1c0b84144fa6c966af8c59191a1b Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Fri, 17 May 2024 16:16:30 +0200 Subject: [PATCH 06/16] Fix rename --- src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index 0321ec5..b09a143 100644 --- a/src/client.rs +++ b/src/client.rs @@ -327,7 +327,7 @@ where writer .bypass() .unwrap() - .write_empty_chunk() + .write_termination() .await .map_err(|e| e.kind())?; } From c2d40f99f1a195d37ca51dc8e89c4e5f2b4c61eb Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Fri, 17 May 2024 16:20:19 +0200 Subject: [PATCH 07/16] Revert "Change ChunkedWriterStrategy to minimize connection writes" This reverts commit 87353be9b77b66d7a2e700cdd27c4bb9ea9ec419. --- src/client.rs | 4 ++-- src/request.rs | 25 ++++++++----------------- 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/src/client.rs b/src/client.rs index b09a143..ae635cc 100644 --- a/src/client.rs +++ b/src/client.rs @@ -311,7 +311,7 @@ where HttpConnection::Plain(c) => { let mut writer = ChunkedBodyWriter::new(c); body.write(&mut writer).await?; - writer.write_termination().await.map_err(|e| e.kind())?; + writer.write_empty_chunk().await.map_err(|e| e.kind())?; } HttpConnection::PlainBuffered(buffered_conn) => { // Flush the buffered connection so that we can bypass it and rent its buffer @@ -335,7 +335,7 @@ where HttpConnection::Tls(c) => { let mut writer = ChunkedBodyWriter::new(c); body.write(&mut writer).await?; - writer.write_termination().await.map_err(|e| e.kind())?; + writer.write_empty_chunk().await.map_err(|e| e.kind())?; } #[cfg(all(not(feature = "embedded-tls"), not(feature = "esp-mbedtls")))] HttpConnection::Tls(_) => unreachable!(), diff --git a/src/request.rs b/src/request.rs index fcac185..ae1b41f 100644 --- a/src/request.rs +++ b/src/request.rs @@ -5,7 +5,7 @@ use core::fmt::Write as _; use core::mem::size_of; use embedded_io::{Error as _, ErrorType}; use embedded_io_async::Write; -use heapless::{String, Vec}; +use heapless::String; /// A read only HTTP request type pub struct Request<'req, B> @@ -365,8 +365,8 @@ where Self(conn, 0) } - pub async fn write_termination(&mut self) -> Result<(), C::Error> { - self.0.write_all(b"\r\n0\r\n\r\n").await + pub async fn write_empty_chunk(&mut self) -> Result<(), C::Error> { + self.0.write_all(b"0\r\n\r\n").await } } @@ -404,23 +404,15 @@ where hex::encode_to_slice(len.to_be_bytes(), &mut hex).unwrap(); let leading_zeros = hex.iter().position(|x| *x != b'0').unwrap_or_default(); let (_, hex) = hex.split_at(leading_zeros); + self.0.write_all(hex).await.map_err(to_errorkind)?; + self.0.write_all(b"\r\n").await.map_err(to_errorkind)?; - let mut header = Vec::::new(); - if self.1 > 0 { - // Write newline terminating ongoing chunk - // We do it here instead of after writing the chunk - // to minimize the number of writes to the underlying connection - header.extend_from_slice(b"\r\n").unwrap(); - } - header.extend_from_slice(hex).unwrap(); - header.extend_from_slice(b"\r\n").unwrap(); - - self.0.write_all(&header).await.map_err(to_errorkind)?; - - // Write chunk payload + // Write chunk self.0.write_all(buf).await.map_err(to_errorkind)?; self.1 += len; + // Write newline + self.0.write_all(b"\r\n").await.map_err(to_errorkind)?; Ok(()) } @@ -432,7 +424,6 @@ where #[cfg(test)] mod tests { use super::*; - use std::vec::Vec; #[tokio::test] async fn basic_auth() { From c6d1f595ac969fd74a61c71b8137fdbcdea1247e Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Fri, 17 May 2024 16:20:31 +0200 Subject: [PATCH 08/16] Revert "Fix rename" This reverts commit 5b280152227a1264a86424fe3fd0eedb642a2e25. --- src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index ae635cc..91f717c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -327,7 +327,7 @@ where writer .bypass() .unwrap() - .write_termination() + .write_empty_chunk() .await .map_err(|e| e.kind())?; } From cbae22caf6215470102097a66d6afc0852b5ac00 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Fri, 17 May 2024 23:33:05 +0200 Subject: [PATCH 09/16] Add BufferedChunkedBodyWriter --- Cargo.toml | 6 +- src/client.rs | 24 ++----- src/request.rs | 176 +++++++++++++++++++++++++++++++++++++++++++------ 3 files changed, 166 insertions(+), 40 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1d2a9b1..83d32f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ keywords = ["embedded", "async", "http", "no_std"] exclude = [".github"] [dependencies] -buffered-io = { version = "0.5.1" } +buffered-io = { version = "0.5.2" } embedded-io = { version = "0.6" } embedded-io-async = { version = "0.6" } embedded-nal-async = "0.7.0" @@ -27,7 +27,9 @@ defmt = { version = "0.3", optional = true } embedded-tls = { version = "0.17", default-features = false, optional = true } rand_chacha = { version = "0.3", default-features = false } nourl = "0.1.1" -esp-mbedtls = { git = "https://github.com/esp-rs/esp-mbedtls.git", features = ["async"], optional = true } +esp-mbedtls = { git = "https://github.com/esp-rs/esp-mbedtls.git", features = [ + "async", +], optional = true } [dev-dependencies] hyper = { version = "0.14.23", features = ["full"] } diff --git a/src/client.rs b/src/client.rs index 91f717c..2b007c4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -311,31 +311,19 @@ where HttpConnection::Plain(c) => { let mut writer = ChunkedBodyWriter::new(c); body.write(&mut writer).await?; - writer.write_empty_chunk().await.map_err(|e| e.kind())?; + writer.terminate().await.map_err(|e| e.kind())?; } - HttpConnection::PlainBuffered(buffered_conn) => { - // Flush the buffered connection so that we can bypass it and rent its buffer - buffered_conn.flush().await.map_err(|e| e.kind())?; - let (conn, buf) = buffered_conn.bypass_with_buf().unwrap(); - - // Construct a new buffered writer that buffers _before_ the chunked body writer - let mut writer = BufferedWrite::new(ChunkedBodyWriter::new(conn), buf); + HttpConnection::PlainBuffered(buffered) => { + let (conn, buf, unwritten) = buffered.split(); + let mut writer = BufferedChunkedBodyWriter::new_with_data(conn, buf, unwritten); body.write(&mut writer).await?; - - // Flush the buffered writer and write the empty chunk to the chunked body writer - writer.flush().await.map_err(|e| e.kind())?; - writer - .bypass() - .unwrap() - .write_empty_chunk() - .await - .map_err(|e| e.kind())?; + writer.terminate().await.map_err(|e| e.kind())?; } #[cfg(any(feature = "embedded-tls", feature = "esp-mbedtls"))] HttpConnection::Tls(c) => { let mut writer = ChunkedBodyWriter::new(c); body.write(&mut writer).await?; - writer.write_empty_chunk().await.map_err(|e| e.kind())?; + writer.terminate().await.map_err(|e| e.kind())?; } #[cfg(all(not(feature = "embedded-tls"), not(feature = "esp-mbedtls")))] HttpConnection::Tls(_) => unreachable!(), diff --git a/src/request.rs b/src/request.rs index ae1b41f..68e4c8f 100644 --- a/src/request.rs +++ b/src/request.rs @@ -248,7 +248,7 @@ impl Method { } async fn write_str(c: &mut C, data: &str) -> Result<(), Error> { - c.write_all(data.as_bytes()).await.map_err(to_errorkind)?; + c.write_all(data.as_bytes()).await.map_err(|e| e.kind())?; Ok(()) } @@ -355,17 +355,35 @@ where } } -pub struct ChunkedBodyWriter(C, usize); +const fn hex_chars(number: usize) -> u32 { + if number == 0 { + 1 + } else { + (usize::BITS - number.leading_zeros()).div_ceil(4) + } +} + +fn write_chunked_header(buf: &mut [u8], chunk_len: usize) -> usize { + let mut hex = [0; 2 * size_of::()]; + hex::encode_to_slice(chunk_len.to_be_bytes(), &mut hex).unwrap(); + let leading_zeros = hex.iter().position(|x| *x != b'0').unwrap_or_default(); + let hex_chars = hex.len() - leading_zeros; + buf[..hex_chars].copy_from_slice(&hex[leading_zeros..]); + buf[hex_chars..hex_chars + 2].copy_from_slice(b"\r\n"); + hex_chars + 2 +} + +pub struct ChunkedBodyWriter(C); impl ChunkedBodyWriter where C: Write, { pub fn new(conn: C) -> Self { - Self(conn, 0) + Self(conn) } - pub async fn write_empty_chunk(&mut self) -> Result<(), C::Error> { + pub async fn terminate(&mut self) -> Result<(), C::Error> { self.0.write_all(b"0\r\n\r\n").await } } @@ -377,21 +395,16 @@ where type Error = embedded_io::ErrorKind; } -fn to_errorkind(e: E) -> embedded_io::ErrorKind { - e.kind() -} - impl Write for ChunkedBodyWriter where C: Write, { async fn write(&mut self, buf: &[u8]) -> Result { - self.write_all(buf).await.map_err(to_errorkind)?; + self.write_all(buf).await.map_err(|e| e.kind())?; Ok(buf.len()) } async fn write_all(&mut self, buf: &[u8]) -> Result<(), Self::Error> { - // Write chunk header let len = buf.len(); // Do not write an empty chunk as that will terminate the body @@ -400,19 +413,19 @@ where return Ok(()); } - let mut hex = [0; 2 * size_of::()]; - hex::encode_to_slice(len.to_be_bytes(), &mut hex).unwrap(); - let leading_zeros = hex.iter().position(|x| *x != b'0').unwrap_or_default(); - let (_, hex) = hex.split_at(leading_zeros); - self.0.write_all(hex).await.map_err(to_errorkind)?; - self.0.write_all(b"\r\n").await.map_err(to_errorkind)?; + // Write chunk header + let mut header_buf = [0; 2 * size_of::() + 2]; + let header_len = write_chunked_header(&mut header_buf, len); + self.0 + .write_all(&header_buf[..header_len]) + .await + .map_err(|e| e.kind())?; // Write chunk - self.0.write_all(buf).await.map_err(to_errorkind)?; - self.1 += len; + self.0.write_all(buf).await.map_err(|e| e.kind())?; - // Write newline - self.0.write_all(b"\r\n").await.map_err(to_errorkind)?; + // Write newline footer + self.0.write_all(b"\r\n").await.map_err(|e| e.kind())?; Ok(()) } @@ -421,10 +434,133 @@ where } } +pub struct BufferedChunkedBodyWriter<'a, C: Write> { + conn: C, + buf: &'a mut [u8], + header_pos: usize, + pos: usize, + max_header_size: usize, + max_footer_size: usize, +} + +impl<'a, C> BufferedChunkedBodyWriter<'a, C> +where + C: Write, +{ + pub fn new_with_data(conn: C, buf: &'a mut [u8], written: usize) -> Self { + let max_hex_chars = hex_chars(buf.len()); + let max_header_size = max_hex_chars as usize + 2; + let max_footer_size = 2; + assert!(buf.len() > max_header_size + max_footer_size); // There must be space for the chunk header and footer + Self { + conn, + buf, + header_pos: written, + pos: written + max_header_size, + max_header_size, + max_footer_size, + } + } + + pub async fn terminate(&mut self) -> Result<(), C::Error> { + if self.pos > self.header_pos + self.max_header_size { + self.finish_current_chunk(); + } + const EMPTY: &[u8; 5] = b"0\r\n\r\n"; + if self.header_pos + EMPTY.len() > self.buf.len() { + self.emit_finished_chunk().await?; + } + + self.buf[self.header_pos..self.header_pos + EMPTY.len()].copy_from_slice(EMPTY); + self.header_pos += EMPTY.len(); + self.pos = self.header_pos + self.max_header_size; + self.emit_finished_chunk().await + } + + fn append_current_chunk(&mut self, buf: &[u8]) -> usize { + let buffered = usize::min(buf.len(), self.buf.len() - self.max_footer_size - self.pos); + if buffered > 0 { + self.buf[self.pos..self.pos + buffered].copy_from_slice(&buf[..buffered]); + self.pos += buffered; + } + buffered + } + + fn finish_current_chunk(&mut self) { + // Write the header in the allocated position position + let chunk_len = self.pos - self.header_pos - self.max_header_size; + let header_buf = &mut self.buf[self.header_pos..self.header_pos + self.max_header_size]; + let header_len = write_chunked_header(header_buf, chunk_len); + + // Move the payload if the header length was not as large as it could possibly be + let spacing = self.max_header_size - header_len; + if spacing > 0 { + self.buf.copy_within( + self.header_pos + self.max_header_size..self.pos, + self.header_pos + header_len, + ); + self.pos -= spacing + } + + // Write newline footer after chunk payload + self.buf[self.pos..self.pos + 2].copy_from_slice(b"\r\n"); + self.pos += 2; + + self.header_pos = self.pos; + self.pos = self.header_pos + self.max_header_size; + } + + async fn emit_finished_chunk(&mut self) -> Result<(), C::Error> { + self.conn.write_all(&self.buf[..self.header_pos]).await?; + self.header_pos = 0; + self.pos = self.max_header_size; + Ok(()) + } +} + +impl ErrorType for BufferedChunkedBodyWriter<'_, C> +where + C: Write, +{ + type Error = embedded_io::ErrorKind; +} + +impl Write for BufferedChunkedBodyWriter<'_, C> +where + C: Write, +{ + async fn write(&mut self, buf: &[u8]) -> Result { + let written = self.append_current_chunk(buf); + if written < buf.len() { + self.finish_current_chunk(); + self.emit_finished_chunk().await.map_err(|e| e.kind())?; + } + Ok(written) + } + + async fn flush(&mut self) -> Result<(), Self::Error> { + if self.header_pos > 0 { + self.finish_current_chunk(); + self.emit_finished_chunk().await.map_err(|e| e.kind())?; + } + self.conn.flush().await.map_err(|e| e.kind()) + } +} + #[cfg(test)] mod tests { use super::*; + #[test] + fn hex_chars_values() { + assert_eq!(1, hex_chars(0)); + assert_eq!(1, hex_chars(1)); + assert_eq!(1, hex_chars(0xF)); + assert_eq!(2, hex_chars(0x10)); + assert_eq!(2, hex_chars(0xFF)); + assert_eq!(3, hex_chars(0x100)); + } + #[tokio::test] async fn basic_auth() { let mut buffer: Vec = Vec::new(); From 2df80cadf54edcb8a6416fd443b65a8913a99261 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Tue, 21 May 2024 07:53:34 +0200 Subject: [PATCH 10/16] Move body writers to separate file --- src/body_writer.rs | 254 +++++++++++++++++++++++++++++++++++++++++++++ src/client.rs | 5 +- src/lib.rs | 1 + src/request.rs | 250 +------------------------------------------- 4 files changed, 261 insertions(+), 249 deletions(-) create mode 100644 src/body_writer.rs diff --git a/src/body_writer.rs b/src/body_writer.rs new file mode 100644 index 0000000..65c6ccf --- /dev/null +++ b/src/body_writer.rs @@ -0,0 +1,254 @@ +use core::mem::size_of; + +use embedded_io::{Error as _, ErrorType}; +use embedded_io_async::Write; + +pub struct FixedBodyWriter(C, usize); + +impl FixedBodyWriter +where + C: Write, +{ + pub fn new(conn: C) -> Self { + Self(conn, 0) + } + + pub fn written(&self) -> usize { + self.1 + } +} + +impl ErrorType for FixedBodyWriter +where + C: Write, +{ + type Error = C::Error; +} + +impl Write for FixedBodyWriter +where + C: Write, +{ + async fn write(&mut self, buf: &[u8]) -> Result { + let written = self.0.write(buf).await?; + self.1 += written; + Ok(written) + } + + async fn write_all(&mut self, buf: &[u8]) -> Result<(), Self::Error> { + self.0.write_all(buf).await?; + self.1 += buf.len(); + Ok(()) + } + + async fn flush(&mut self) -> Result<(), Self::Error> { + self.0.flush().await + } +} + +pub struct ChunkedBodyWriter(C); + +impl ChunkedBodyWriter +where + C: Write, +{ + pub fn new(conn: C) -> Self { + Self(conn) + } + + pub async fn terminate(&mut self) -> Result<(), C::Error> { + self.0.write_all(b"0\r\n\r\n").await + } +} + +impl ErrorType for ChunkedBodyWriter +where + C: Write, +{ + type Error = embedded_io::ErrorKind; +} + +impl Write for ChunkedBodyWriter +where + C: Write, +{ + async fn write(&mut self, buf: &[u8]) -> Result { + self.write_all(buf).await.map_err(|e| e.kind())?; + Ok(buf.len()) + } + + async fn write_all(&mut self, buf: &[u8]) -> Result<(), Self::Error> { + let len = buf.len(); + + // Do not write an empty chunk as that will terminate the body + // Use `ChunkedBodyWriter.write_empty_chunk` instead if this is intended + if len == 0 { + return Ok(()); + } + + // Write chunk header + let mut header_buf = [0; 2 * size_of::() + 2]; + let header_len = write_chunked_header(&mut header_buf, len); + self.0 + .write_all(&header_buf[..header_len]) + .await + .map_err(|e| e.kind())?; + + // Write chunk + self.0.write_all(buf).await.map_err(|e| e.kind())?; + + // Write newline footer + self.0.write_all(b"\r\n").await.map_err(|e| e.kind())?; + Ok(()) + } + + async fn flush(&mut self) -> Result<(), Self::Error> { + self.0.flush().await.map_err(|e| e.kind()) + } +} + +pub struct BufferingChunkedBodyWriter<'a, C: Write> { + conn: C, + buf: &'a mut [u8], + header_pos: usize, + pos: usize, + max_header_size: usize, + max_footer_size: usize, +} + +impl<'a, C> BufferingChunkedBodyWriter<'a, C> +where + C: Write, +{ + pub fn new_with_data(conn: C, buf: &'a mut [u8], written: usize) -> Self { + let max_hex_chars = hex_chars(buf.len()); + let max_header_size = max_hex_chars as usize + 2; + let max_footer_size = 2; + assert!(buf.len() > max_header_size + max_footer_size); // There must be space for the chunk header and footer + Self { + conn, + buf, + header_pos: written, + pos: written + max_header_size, + max_header_size, + max_footer_size, + } + } + + pub async fn terminate(&mut self) -> Result<(), C::Error> { + if self.pos > self.header_pos + self.max_header_size { + self.finish_current_chunk(); + } + const EMPTY: &[u8; 5] = b"0\r\n\r\n"; + if self.header_pos + EMPTY.len() > self.buf.len() { + self.emit_finished_chunk().await?; + } + + self.buf[self.header_pos..self.header_pos + EMPTY.len()].copy_from_slice(EMPTY); + self.header_pos += EMPTY.len(); + self.pos = self.header_pos + self.max_header_size; + self.emit_finished_chunk().await + } + + fn append_current_chunk(&mut self, buf: &[u8]) -> usize { + let buffered = usize::min(buf.len(), self.buf.len() - self.max_footer_size - self.pos); + if buffered > 0 { + self.buf[self.pos..self.pos + buffered].copy_from_slice(&buf[..buffered]); + self.pos += buffered; + } + buffered + } + + fn finish_current_chunk(&mut self) { + // Write the header in the allocated position position + let chunk_len = self.pos - self.header_pos - self.max_header_size; + let header_buf = &mut self.buf[self.header_pos..self.header_pos + self.max_header_size]; + let header_len = write_chunked_header(header_buf, chunk_len); + + // Move the payload if the header length was not as large as it could possibly be + let spacing = self.max_header_size - header_len; + if spacing > 0 { + self.buf.copy_within( + self.header_pos + self.max_header_size..self.pos, + self.header_pos + header_len, + ); + self.pos -= spacing + } + + // Write newline footer after chunk payload + self.buf[self.pos..self.pos + 2].copy_from_slice(b"\r\n"); + self.pos += 2; + + self.header_pos = self.pos; + self.pos = self.header_pos + self.max_header_size; + } + + async fn emit_finished_chunk(&mut self) -> Result<(), C::Error> { + self.conn.write_all(&self.buf[..self.header_pos]).await?; + self.header_pos = 0; + self.pos = self.max_header_size; + Ok(()) + } +} + +impl ErrorType for BufferingChunkedBodyWriter<'_, C> +where + C: Write, +{ + type Error = embedded_io::ErrorKind; +} + +impl Write for BufferingChunkedBodyWriter<'_, C> +where + C: Write, +{ + async fn write(&mut self, buf: &[u8]) -> Result { + let written = self.append_current_chunk(buf); + if written < buf.len() { + self.finish_current_chunk(); + self.emit_finished_chunk().await.map_err(|e| e.kind())?; + } + Ok(written) + } + + async fn flush(&mut self) -> Result<(), Self::Error> { + if self.header_pos > 0 { + self.finish_current_chunk(); + self.emit_finished_chunk().await.map_err(|e| e.kind())?; + } + self.conn.flush().await.map_err(|e| e.kind()) + } +} + +const fn hex_chars(number: usize) -> u32 { + if number == 0 { + 1 + } else { + (usize::BITS - number.leading_zeros()).div_ceil(4) + } +} + +fn write_chunked_header(buf: &mut [u8], chunk_len: usize) -> usize { + let mut hex = [0; 2 * size_of::()]; + hex::encode_to_slice(chunk_len.to_be_bytes(), &mut hex).unwrap(); + let leading_zeros = hex.iter().position(|x| *x != b'0').unwrap_or_default(); + let hex_chars = hex.len() - leading_zeros; + buf[..hex_chars].copy_from_slice(&hex[leading_zeros..]); + buf[hex_chars..hex_chars + 2].copy_from_slice(b"\r\n"); + hex_chars + 2 +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn hex_chars_values() { + assert_eq!(1, hex_chars(0)); + assert_eq!(1, hex_chars(1)); + assert_eq!(1, hex_chars(0xF)); + assert_eq!(2, hex_chars(0x10)); + assert_eq!(2, hex_chars(0xFF)); + assert_eq!(3, hex_chars(0x100)); + } +} diff --git a/src/client.rs b/src/client.rs index 2b007c4..1be30bc 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,3 +1,6 @@ +use crate::body_writer::BufferingChunkedBodyWriter; +use crate::body_writer::ChunkedBodyWriter; +use crate::body_writer::FixedBodyWriter; /// Client using embedded-nal-async traits to establish connections and perform HTTP requests. /// use crate::headers::ContentType; @@ -315,7 +318,7 @@ where } HttpConnection::PlainBuffered(buffered) => { let (conn, buf, unwritten) = buffered.split(); - let mut writer = BufferedChunkedBodyWriter::new_with_data(conn, buf, unwritten); + let mut writer = BufferingChunkedBodyWriter::new_with_data(conn, buf, unwritten); body.write(&mut writer).await?; writer.terminate().await.map_err(|e| e.kind())?; } diff --git a/src/lib.rs b/src/lib.rs index e1f7740..f699415 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,7 @@ use embedded_io_async::ReadExactError; mod fmt; +mod body_writer; pub mod client; pub mod headers; mod reader; diff --git a/src/request.rs b/src/request.rs index 68e4c8f..a9238e5 100644 --- a/src/request.rs +++ b/src/request.rs @@ -1,9 +1,8 @@ -use crate::headers::ContentType; /// Low level API for encoding requests and decoding responses. +use crate::headers::ContentType; use crate::Error; use core::fmt::Write as _; -use core::mem::size_of; -use embedded_io::{Error as _, ErrorType}; +use embedded_io::Error as _; use embedded_io_async::Write; use heapless::String; @@ -312,255 +311,10 @@ where } } -pub struct FixedBodyWriter(C, usize); - -impl FixedBodyWriter -where - C: Write, -{ - pub fn new(conn: C) -> Self { - Self(conn, 0) - } - - pub fn written(&self) -> usize { - self.1 - } -} - -impl ErrorType for FixedBodyWriter -where - C: Write, -{ - type Error = C::Error; -} - -impl Write for FixedBodyWriter -where - C: Write, -{ - async fn write(&mut self, buf: &[u8]) -> Result { - let written = self.0.write(buf).await?; - self.1 += written; - Ok(written) - } - - async fn write_all(&mut self, buf: &[u8]) -> Result<(), Self::Error> { - self.0.write_all(buf).await?; - self.1 += buf.len(); - Ok(()) - } - - async fn flush(&mut self) -> Result<(), Self::Error> { - self.0.flush().await - } -} - -const fn hex_chars(number: usize) -> u32 { - if number == 0 { - 1 - } else { - (usize::BITS - number.leading_zeros()).div_ceil(4) - } -} - -fn write_chunked_header(buf: &mut [u8], chunk_len: usize) -> usize { - let mut hex = [0; 2 * size_of::()]; - hex::encode_to_slice(chunk_len.to_be_bytes(), &mut hex).unwrap(); - let leading_zeros = hex.iter().position(|x| *x != b'0').unwrap_or_default(); - let hex_chars = hex.len() - leading_zeros; - buf[..hex_chars].copy_from_slice(&hex[leading_zeros..]); - buf[hex_chars..hex_chars + 2].copy_from_slice(b"\r\n"); - hex_chars + 2 -} - -pub struct ChunkedBodyWriter(C); - -impl ChunkedBodyWriter -where - C: Write, -{ - pub fn new(conn: C) -> Self { - Self(conn) - } - - pub async fn terminate(&mut self) -> Result<(), C::Error> { - self.0.write_all(b"0\r\n\r\n").await - } -} - -impl ErrorType for ChunkedBodyWriter -where - C: Write, -{ - type Error = embedded_io::ErrorKind; -} - -impl Write for ChunkedBodyWriter -where - C: Write, -{ - async fn write(&mut self, buf: &[u8]) -> Result { - self.write_all(buf).await.map_err(|e| e.kind())?; - Ok(buf.len()) - } - - async fn write_all(&mut self, buf: &[u8]) -> Result<(), Self::Error> { - let len = buf.len(); - - // Do not write an empty chunk as that will terminate the body - // Use `ChunkedBodyWriter.write_empty_chunk` instead if this is intended - if len == 0 { - return Ok(()); - } - - // Write chunk header - let mut header_buf = [0; 2 * size_of::() + 2]; - let header_len = write_chunked_header(&mut header_buf, len); - self.0 - .write_all(&header_buf[..header_len]) - .await - .map_err(|e| e.kind())?; - - // Write chunk - self.0.write_all(buf).await.map_err(|e| e.kind())?; - - // Write newline footer - self.0.write_all(b"\r\n").await.map_err(|e| e.kind())?; - Ok(()) - } - - async fn flush(&mut self) -> Result<(), Self::Error> { - self.0.flush().await.map_err(|e| e.kind()) - } -} - -pub struct BufferedChunkedBodyWriter<'a, C: Write> { - conn: C, - buf: &'a mut [u8], - header_pos: usize, - pos: usize, - max_header_size: usize, - max_footer_size: usize, -} - -impl<'a, C> BufferedChunkedBodyWriter<'a, C> -where - C: Write, -{ - pub fn new_with_data(conn: C, buf: &'a mut [u8], written: usize) -> Self { - let max_hex_chars = hex_chars(buf.len()); - let max_header_size = max_hex_chars as usize + 2; - let max_footer_size = 2; - assert!(buf.len() > max_header_size + max_footer_size); // There must be space for the chunk header and footer - Self { - conn, - buf, - header_pos: written, - pos: written + max_header_size, - max_header_size, - max_footer_size, - } - } - - pub async fn terminate(&mut self) -> Result<(), C::Error> { - if self.pos > self.header_pos + self.max_header_size { - self.finish_current_chunk(); - } - const EMPTY: &[u8; 5] = b"0\r\n\r\n"; - if self.header_pos + EMPTY.len() > self.buf.len() { - self.emit_finished_chunk().await?; - } - - self.buf[self.header_pos..self.header_pos + EMPTY.len()].copy_from_slice(EMPTY); - self.header_pos += EMPTY.len(); - self.pos = self.header_pos + self.max_header_size; - self.emit_finished_chunk().await - } - - fn append_current_chunk(&mut self, buf: &[u8]) -> usize { - let buffered = usize::min(buf.len(), self.buf.len() - self.max_footer_size - self.pos); - if buffered > 0 { - self.buf[self.pos..self.pos + buffered].copy_from_slice(&buf[..buffered]); - self.pos += buffered; - } - buffered - } - - fn finish_current_chunk(&mut self) { - // Write the header in the allocated position position - let chunk_len = self.pos - self.header_pos - self.max_header_size; - let header_buf = &mut self.buf[self.header_pos..self.header_pos + self.max_header_size]; - let header_len = write_chunked_header(header_buf, chunk_len); - - // Move the payload if the header length was not as large as it could possibly be - let spacing = self.max_header_size - header_len; - if spacing > 0 { - self.buf.copy_within( - self.header_pos + self.max_header_size..self.pos, - self.header_pos + header_len, - ); - self.pos -= spacing - } - - // Write newline footer after chunk payload - self.buf[self.pos..self.pos + 2].copy_from_slice(b"\r\n"); - self.pos += 2; - - self.header_pos = self.pos; - self.pos = self.header_pos + self.max_header_size; - } - - async fn emit_finished_chunk(&mut self) -> Result<(), C::Error> { - self.conn.write_all(&self.buf[..self.header_pos]).await?; - self.header_pos = 0; - self.pos = self.max_header_size; - Ok(()) - } -} - -impl ErrorType for BufferedChunkedBodyWriter<'_, C> -where - C: Write, -{ - type Error = embedded_io::ErrorKind; -} - -impl Write for BufferedChunkedBodyWriter<'_, C> -where - C: Write, -{ - async fn write(&mut self, buf: &[u8]) -> Result { - let written = self.append_current_chunk(buf); - if written < buf.len() { - self.finish_current_chunk(); - self.emit_finished_chunk().await.map_err(|e| e.kind())?; - } - Ok(written) - } - - async fn flush(&mut self) -> Result<(), Self::Error> { - if self.header_pos > 0 { - self.finish_current_chunk(); - self.emit_finished_chunk().await.map_err(|e| e.kind())?; - } - self.conn.flush().await.map_err(|e| e.kind()) - } -} - #[cfg(test)] mod tests { use super::*; - #[test] - fn hex_chars_values() { - assert_eq!(1, hex_chars(0)); - assert_eq!(1, hex_chars(1)); - assert_eq!(1, hex_chars(0xF)); - assert_eq!(2, hex_chars(0x10)); - assert_eq!(2, hex_chars(0xFF)); - assert_eq!(3, hex_chars(0x100)); - } - #[tokio::test] async fn basic_auth() { let mut buffer: Vec = Vec::new(); From 5949cb83920993c5d7ab1f50ea48a1314ce0281e Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Tue, 21 May 2024 09:04:40 +0200 Subject: [PATCH 11/16] Add tests and optimize allocated header to match the remaining size of the buffer --- src/body_writer.rs | 167 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 131 insertions(+), 36 deletions(-) diff --git a/src/body_writer.rs b/src/body_writer.rs index 65c6ccf..e77d038 100644 --- a/src/body_writer.rs +++ b/src/body_writer.rs @@ -3,6 +3,9 @@ use core::mem::size_of; use embedded_io::{Error as _, ErrorType}; use embedded_io_async::Write; +const NEWLINE: &[u8; 2] = b"\r\n"; +const EMPTY_CHUNK: &[u8; 5] = b"0\r\n\r\n"; + pub struct FixedBodyWriter(C, usize); impl FixedBodyWriter @@ -56,8 +59,9 @@ where Self(conn) } + /// Terminate the request body by writing an empty chunk pub async fn terminate(&mut self) -> Result<(), C::Error> { - self.0.write_all(b"0\r\n\r\n").await + self.0.write_all(EMPTY_CHUNK).await } } @@ -98,7 +102,7 @@ where self.0.write_all(buf).await.map_err(|e| e.kind())?; // Write newline footer - self.0.write_all(b"\r\n").await.map_err(|e| e.kind())?; + self.0.write_all(NEWLINE).await.map_err(|e| e.kind())?; Ok(()) } @@ -110,10 +114,12 @@ where pub struct BufferingChunkedBodyWriter<'a, C: Write> { conn: C, buf: &'a mut [u8], + /// The position where the allocated chunk header starts header_pos: usize, + /// The size of the allocated header (the final header may be smaller) + allocated_header: usize, + /// The position of the data in the chunk pos: usize, - max_header_size: usize, - max_footer_size: usize, } impl<'a, C> BufferingChunkedBodyWriter<'a, C> @@ -121,37 +127,42 @@ where C: Write, { pub fn new_with_data(conn: C, buf: &'a mut [u8], written: usize) -> Self { - let max_hex_chars = hex_chars(buf.len()); - let max_header_size = max_hex_chars as usize + 2; - let max_footer_size = 2; - assert!(buf.len() > max_header_size + max_footer_size); // There must be space for the chunk header and footer + assert!(written <= buf.len()); + let allocated_header = get_max_chunk_header_size(buf.len() - written); + assert!(buf.len() > allocated_header + NEWLINE.len()); // There must be space for the chunk header and footer Self { conn, buf, header_pos: written, - pos: written + max_header_size, - max_header_size, - max_footer_size, + pos: written + allocated_header, + allocated_header, } } + /// Terminate the request body by writing an empty chunk pub async fn terminate(&mut self) -> Result<(), C::Error> { - if self.pos > self.header_pos + self.max_header_size { + assert!(self.allocated_header > 0); + + if self.pos > self.header_pos + self.allocated_header { + // There are bytes written in the current chunk self.finish_current_chunk(); - } - const EMPTY: &[u8; 5] = b"0\r\n\r\n"; - if self.header_pos + EMPTY.len() > self.buf.len() { - self.emit_finished_chunk().await?; + + if self.header_pos + EMPTY_CHUNK.len() > self.buf.len() { + // There is not enough space to fit the empty chunk in the buffer + self.emit_finished_chunk().await?; + } } - self.buf[self.header_pos..self.header_pos + EMPTY.len()].copy_from_slice(EMPTY); - self.header_pos += EMPTY.len(); - self.pos = self.header_pos + self.max_header_size; + self.buf[self.header_pos..self.header_pos + EMPTY_CHUNK.len()].copy_from_slice(EMPTY_CHUNK); + self.header_pos += EMPTY_CHUNK.len(); + self.allocated_header = 0; + self.pos = self.header_pos + self.allocated_header; self.emit_finished_chunk().await } + /// Append to the buffer fn append_current_chunk(&mut self, buf: &[u8]) -> usize { - let buffered = usize::min(buf.len(), self.buf.len() - self.max_footer_size - self.pos); + let buffered = usize::min(buf.len(), self.buf.len() - NEWLINE.len() - self.pos); if buffered > 0 { self.buf[self.pos..self.pos + buffered].copy_from_slice(&buf[..buffered]); self.pos += buffered; @@ -159,34 +170,37 @@ where buffered } + /// Finish the current chunk by writing the header fn finish_current_chunk(&mut self) { // Write the header in the allocated position position - let chunk_len = self.pos - self.header_pos - self.max_header_size; - let header_buf = &mut self.buf[self.header_pos..self.header_pos + self.max_header_size]; + let chunk_len = self.pos - self.header_pos - self.allocated_header; + let header_buf = &mut self.buf[self.header_pos..self.header_pos + self.allocated_header]; let header_len = write_chunked_header(header_buf, chunk_len); // Move the payload if the header length was not as large as it could possibly be - let spacing = self.max_header_size - header_len; + let spacing = self.allocated_header - header_len; if spacing > 0 { self.buf.copy_within( - self.header_pos + self.max_header_size..self.pos, + self.header_pos + self.allocated_header..self.pos, self.header_pos + header_len, ); self.pos -= spacing } // Write newline footer after chunk payload - self.buf[self.pos..self.pos + 2].copy_from_slice(b"\r\n"); + self.buf[self.pos..self.pos + NEWLINE.len()].copy_from_slice(NEWLINE); self.pos += 2; self.header_pos = self.pos; - self.pos = self.header_pos + self.max_header_size; + self.allocated_header = get_max_chunk_header_size(self.buf.len() - self.header_pos); + self.pos = self.header_pos + self.allocated_header; } async fn emit_finished_chunk(&mut self) -> Result<(), C::Error> { self.conn.write_all(&self.buf[..self.header_pos]).await?; self.header_pos = 0; - self.pos = self.max_header_size; + self.allocated_header = get_max_chunk_header_size(self.buf.len()); + self.pos = self.allocated_header; Ok(()) } } @@ -220,7 +234,7 @@ where } } -const fn hex_chars(number: usize) -> u32 { +const fn get_hex_chars(number: usize) -> u32 { if number == 0 { 1 } else { @@ -228,13 +242,22 @@ const fn hex_chars(number: usize) -> u32 { } } +const fn get_max_chunk_header_size(buffer_size: usize) -> usize { + if buffer_size >= NEWLINE.len() + NEWLINE.len() { + get_hex_chars(buffer_size - NEWLINE.len() - NEWLINE.len()) as usize + NEWLINE.len() + } else { + // Not enough space in buffer to fit a header + footer + 0 + } +} + fn write_chunked_header(buf: &mut [u8], chunk_len: usize) -> usize { let mut hex = [0; 2 * size_of::()]; hex::encode_to_slice(chunk_len.to_be_bytes(), &mut hex).unwrap(); let leading_zeros = hex.iter().position(|x| *x != b'0').unwrap_or_default(); let hex_chars = hex.len() - leading_zeros; buf[..hex_chars].copy_from_slice(&hex[leading_zeros..]); - buf[hex_chars..hex_chars + 2].copy_from_slice(b"\r\n"); + buf[hex_chars..hex_chars + NEWLINE.len()].copy_from_slice(NEWLINE); hex_chars + 2 } @@ -243,12 +266,84 @@ mod tests { use super::*; #[test] - fn hex_chars_values() { - assert_eq!(1, hex_chars(0)); - assert_eq!(1, hex_chars(1)); - assert_eq!(1, hex_chars(0xF)); - assert_eq!(2, hex_chars(0x10)); - assert_eq!(2, hex_chars(0xFF)); - assert_eq!(3, hex_chars(0x100)); + fn can_get_hex_chars() { + assert_eq!(1, get_hex_chars(0)); + assert_eq!(1, get_hex_chars(1)); + assert_eq!(1, get_hex_chars(0xF)); + assert_eq!(2, get_hex_chars(0x10)); + assert_eq!(2, get_hex_chars(0xFF)); + assert_eq!(3, get_hex_chars(0x100)); + } + + #[test] + fn can_get_max_chunk_header_size() { + assert_eq!(0, get_max_chunk_header_size(3)); + assert_eq!(3, get_max_chunk_header_size(0x00 + 2 + 2)); + assert_eq!(3, get_max_chunk_header_size(0x01 + 2 + 2)); + assert_eq!(3, get_max_chunk_header_size(0x0F + 2 + 2)); + assert_eq!(4, get_max_chunk_header_size(0x10 + 2 + 2)); + } + + #[tokio::test] + async fn preserves_already_written_bytes_in_the_buffer_without_any_chunks() { + // Given + let mut conn = Vec::new(); + let mut buf = [0; 1024]; + buf[..5].copy_from_slice(b"HELLO"); + + // When + let mut writer = BufferingChunkedBodyWriter::new_with_data(&mut conn, &mut buf, 5); + writer.terminate().await.unwrap(); + + // Then + assert_eq!(b"HELLO0\r\n\r\n", conn.as_slice()); + } + + #[tokio::test] + async fn preserves_already_written_bytes_in_the_buffer_with_chunks() { + // Given + let mut conn = Vec::new(); + let mut buf = [0; 1024]; + buf[..5].copy_from_slice(b"HELLO"); + + // When + let mut writer = BufferingChunkedBodyWriter::new_with_data(&mut conn, &mut buf, 5); + writer.write_all(b"BODY").await.unwrap(); + writer.terminate().await.unwrap(); + + // Then + assert_eq!(b"HELLO4\r\nBODY\r\n0\r\n\r\n", conn.as_slice()); + } + + #[tokio::test] + async fn current_chunk_is_emitted_before_empty_chunk_is_emitted() { + // Given + let mut conn = Vec::new(); + let mut buf = [0; 14]; + buf[..5].copy_from_slice(b"HELLO"); + + // When + let mut writer = BufferingChunkedBodyWriter::new_with_data(&mut conn, &mut buf, 5); + writer.write_all(b"BODY").await.unwrap(); // Can fit + writer.terminate().await.unwrap(); // Cannot fit + + // Then + assert_eq!(b"HELLO4\r\nBODY\r\n0\r\n\r\n", conn.as_slice()); + } + + #[tokio::test] + async fn write_emits_chunks() { + // Given + let mut conn = Vec::new(); + let mut buf = [0; 12]; + buf[..5].copy_from_slice(b"HELLO"); + + // When + let mut writer = BufferingChunkedBodyWriter::new_with_data(&mut conn, &mut buf, 5); + writer.write_all(b"BODY").await.unwrap(); // Only "BO" can fit first, then "DY" is written in a different chunk + writer.terminate().await.unwrap(); + + // Then + assert_eq!(b"HELLO2\r\nBO\r\n2\r\nDY\r\n0\r\n\r\n", conn.as_slice()); } } From 564b5c4ba2836b27f380eed4ec10dae53021fef0 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Tue, 21 May 2024 09:58:07 +0200 Subject: [PATCH 12/16] Fix test for any buffered chunk bytes in flush() --- src/body_writer.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/body_writer.rs b/src/body_writer.rs index e77d038..1ea7fdc 100644 --- a/src/body_writer.rs +++ b/src/body_writer.rs @@ -226,7 +226,8 @@ where } async fn flush(&mut self) -> Result<(), Self::Error> { - if self.header_pos > 0 { + if self.pos > self.header_pos + self.allocated_header { + // There are bytes written in the current chunk self.finish_current_chunk(); self.emit_finished_chunk().await.map_err(|e| e.kind())?; } From 6a58d14b5501afbd3672c750d4be7bf12c72ddb4 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Tue, 21 May 2024 09:58:37 +0200 Subject: [PATCH 13/16] Rename emit_finished_chunk to emit_finished_chunks --- src/body_writer.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/body_writer.rs b/src/body_writer.rs index 1ea7fdc..3c9a445 100644 --- a/src/body_writer.rs +++ b/src/body_writer.rs @@ -149,7 +149,7 @@ where if self.header_pos + EMPTY_CHUNK.len() > self.buf.len() { // There is not enough space to fit the empty chunk in the buffer - self.emit_finished_chunk().await?; + self.emit_finished_chunks().await?; } } @@ -157,7 +157,7 @@ where self.header_pos += EMPTY_CHUNK.len(); self.allocated_header = 0; self.pos = self.header_pos + self.allocated_header; - self.emit_finished_chunk().await + self.emit_finished_chunks().await } /// Append to the buffer @@ -196,7 +196,7 @@ where self.pos = self.header_pos + self.allocated_header; } - async fn emit_finished_chunk(&mut self) -> Result<(), C::Error> { + async fn emit_finished_chunks(&mut self) -> Result<(), C::Error> { self.conn.write_all(&self.buf[..self.header_pos]).await?; self.header_pos = 0; self.allocated_header = get_max_chunk_header_size(self.buf.len()); @@ -220,7 +220,7 @@ where let written = self.append_current_chunk(buf); if written < buf.len() { self.finish_current_chunk(); - self.emit_finished_chunk().await.map_err(|e| e.kind())?; + self.emit_finished_chunks().await.map_err(|e| e.kind())?; } Ok(written) } @@ -229,7 +229,7 @@ where if self.pos > self.header_pos + self.allocated_header { // There are bytes written in the current chunk self.finish_current_chunk(); - self.emit_finished_chunk().await.map_err(|e| e.kind())?; + self.emit_finished_chunks().await.map_err(|e| e.kind())?; } self.conn.flush().await.map_err(|e| e.kind()) } From ffdf4806da5e0b84e7a3d25f1028b5794ed6086e Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Tue, 21 May 2024 09:59:37 +0200 Subject: [PATCH 14/16] Ensure that we clear the BufferedWrite that we inherit --- Cargo.toml | 2 +- src/client.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 83d32f8..fa666c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ keywords = ["embedded", "async", "http", "no_std"] exclude = [".github"] [dependencies] -buffered-io = { version = "0.5.2" } +buffered-io = { version = "0.5.3" } embedded-io = { version = "0.6" } embedded-io-async = { version = "0.6" } embedded-nal-async = "0.7.0" diff --git a/src/client.rs b/src/client.rs index 1be30bc..163ab50 100644 --- a/src/client.rs +++ b/src/client.rs @@ -321,6 +321,7 @@ where let mut writer = BufferingChunkedBodyWriter::new_with_data(conn, buf, unwritten); body.write(&mut writer).await?; writer.terminate().await.map_err(|e| e.kind())?; + buffered.clear(); } #[cfg(any(feature = "embedded-tls", feature = "esp-mbedtls"))] HttpConnection::Tls(c) => { From 3c6cd6f28b9937fbe6d6c9c60b831fe49f97534b Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Tue, 21 May 2024 10:13:53 +0200 Subject: [PATCH 15/16] Fixes according to review - thanks @bugadani ! --- src/body_writer.rs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/body_writer.rs b/src/body_writer.rs index 3c9a445..6c535b7 100644 --- a/src/body_writer.rs +++ b/src/body_writer.rs @@ -235,17 +235,19 @@ where } } -const fn get_hex_chars(number: usize) -> u32 { +/// Get the number of hex characters for a number. +/// E.g. 0x0 => 1, 0x0F => 1, 0x10 => 2, 0x1234 => 4. +const fn get_num_hex_chars(number: usize) -> usize { if number == 0 { 1 } else { - (usize::BITS - number.leading_zeros()).div_ceil(4) + (usize::BITS - number.leading_zeros()).div_ceil(4) as usize } } const fn get_max_chunk_header_size(buffer_size: usize) -> usize { - if buffer_size >= NEWLINE.len() + NEWLINE.len() { - get_hex_chars(buffer_size - NEWLINE.len() - NEWLINE.len()) as usize + NEWLINE.len() + if let Some(hex_chars_and_payload_size) = buffer_size.checked_sub(2 * NEWLINE.len()) { + get_num_hex_chars(hex_chars_and_payload_size) + NEWLINE.len() } else { // Not enough space in buffer to fit a header + footer 0 @@ -268,12 +270,12 @@ mod tests { #[test] fn can_get_hex_chars() { - assert_eq!(1, get_hex_chars(0)); - assert_eq!(1, get_hex_chars(1)); - assert_eq!(1, get_hex_chars(0xF)); - assert_eq!(2, get_hex_chars(0x10)); - assert_eq!(2, get_hex_chars(0xFF)); - assert_eq!(3, get_hex_chars(0x100)); + assert_eq!(1, get_num_hex_chars(0)); + assert_eq!(1, get_num_hex_chars(1)); + assert_eq!(1, get_num_hex_chars(0xF)); + assert_eq!(2, get_num_hex_chars(0x10)); + assert_eq!(2, get_num_hex_chars(0xFF)); + assert_eq!(3, get_num_hex_chars(0x100)); } #[test] @@ -283,6 +285,8 @@ mod tests { assert_eq!(3, get_max_chunk_header_size(0x01 + 2 + 2)); assert_eq!(3, get_max_chunk_header_size(0x0F + 2 + 2)); assert_eq!(4, get_max_chunk_header_size(0x10 + 2 + 2)); + assert_eq!(4, get_max_chunk_header_size(0x11 + 2 + 2)); + assert_eq!(4, get_max_chunk_header_size(0x12 + 2 + 2)); } #[tokio::test] From a813563ce0760f827ff449cd566c7a03b1ff06e7 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Tue, 21 May 2024 10:22:11 +0200 Subject: [PATCH 16/16] Fix write_chunked_header to support zero chunk length --- src/body_writer.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/body_writer.rs b/src/body_writer.rs index 6c535b7..c7aea69 100644 --- a/src/body_writer.rs +++ b/src/body_writer.rs @@ -257,7 +257,7 @@ const fn get_max_chunk_header_size(buffer_size: usize) -> usize { fn write_chunked_header(buf: &mut [u8], chunk_len: usize) -> usize { let mut hex = [0; 2 * size_of::()]; hex::encode_to_slice(chunk_len.to_be_bytes(), &mut hex).unwrap(); - let leading_zeros = hex.iter().position(|x| *x != b'0').unwrap_or_default(); + let leading_zeros = hex.iter().position(|x| *x != b'0').unwrap_or(hex.len() - 1); let hex_chars = hex.len() - leading_zeros; buf[..hex_chars].copy_from_slice(&hex[leading_zeros..]); buf[hex_chars..hex_chars + NEWLINE.len()].copy_from_slice(NEWLINE); @@ -289,6 +289,23 @@ mod tests { assert_eq!(4, get_max_chunk_header_size(0x12 + 2 + 2)); } + #[test] + fn can_write_chunked_header() { + let mut buf = [0; 4]; + + let len = write_chunked_header(&mut buf, 0x00); + assert_eq!(b"0\r\n", &buf[..len]); + + let len = write_chunked_header(&mut buf, 0x01); + assert_eq!(b"1\r\n", &buf[..len]); + + let len = write_chunked_header(&mut buf, 0x0F); + assert_eq!(b"f\r\n", &buf[..len]); + + let len = write_chunked_header(&mut buf, 0x10); + assert_eq!(b"10\r\n", &buf[..len]); + } + #[tokio::test] async fn preserves_already_written_bytes_in_the_buffer_without_any_chunks() { // Given