From eacc07a5eb0551aba0399767358e61f4c77efa7b Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Mon, 1 Jul 2024 10:31:56 +0200 Subject: [PATCH] Track specifically if the chunked body writer is terminated and add better documentation --- src/body_writer.rs | 64 +++++++++++++++++++++++++++++++++++++++------- 1 file changed, 55 insertions(+), 9 deletions(-) diff --git a/src/body_writer.rs b/src/body_writer.rs index 5cc4316..6385139 100644 --- a/src/body_writer.rs +++ b/src/body_writer.rs @@ -111,15 +111,37 @@ where } } +/// A body writer that buffers internally and emits chunks as expected by the +/// `Transfer-Encoding: chunked` header specification. +/// +/// Each emittted chunk has a header that specifies the size of the chunk, +/// and the last chunk has size equal to zero, indicating the end of the request. +/// +/// The writer can be seeded with a buffer that is already pre-written. This is +/// typical if for example the request header is already written to the buffer. +/// The writer will in this case start appending a chunk to the end of the pre-written +/// buffer data leaving the pre-written data as-is. +/// +/// To minimize the number of write calls to the underlying connection the writer +/// works by pre-allocating the chunk header in the buffer. The written body data is +/// then appended after this pre-allocated header. Depending on the number of bytes +/// actually written to the current chunk before the writer is terminated (indicating +/// the end of the request body), the pre-allocated header may be too large. If this +/// is the case, then the chunk payload is moved into the pre-allocated header region +/// such that the header and payload can be written to the underlying connection in +/// a single write. +/// pub struct BufferingChunkedBodyWriter<'a, C: Write> { conn: C, buf: &'a mut [u8], /// The position where the allocated chunk header starts header_pos: usize, /// The size of the allocated header (the final header may be smaller) + /// This may be 0 if the pre-written bytes in `buf` is too large to fit a header. allocated_header: usize, /// The position of the data in the chunk pos: usize, + terminated: bool, } impl<'a, C> BufferingChunkedBodyWriter<'a, C> @@ -136,31 +158,35 @@ where header_pos: written, pos: written + allocated_header, allocated_header, + terminated: false, } } /// Terminate the request body by writing an empty chunk pub async fn terminate(&mut self) -> Result<(), C::Error> { - assert!(self.allocated_header > 0); + assert!(!self.terminated); if self.pos > self.header_pos + self.allocated_header { // There are bytes written in the current chunk self.finish_current_chunk(); + } - if self.header_pos + EMPTY_CHUNK.len() > self.buf.len() { - // There is not enough space to fit the empty chunk in the buffer - self.emit_buffered().await?; - } + if self.header_pos + EMPTY_CHUNK.len() > self.buf.len() { + // There is not enough space to fit the empty chunk in the buffer + self.emit_buffered().await?; } self.buf[self.header_pos..self.header_pos + EMPTY_CHUNK.len()].copy_from_slice(EMPTY_CHUNK); self.header_pos += EMPTY_CHUNK.len(); self.allocated_header = 0; self.pos = self.header_pos + self.allocated_header; - self.emit_buffered().await + self.emit_buffered().await?; + self.terminated = true; + Ok(()) } - /// Append to the buffer + /// Append data to the current chunk and return the number of bytes appended. + /// This returns 0 if there is no current chunk to append to. fn append_current_chunk(&mut self, buf: &[u8]) -> usize { let buffered = usize::min(buf.len(), self.buf.len().saturating_sub(NEWLINE.len() + self.pos)); if buffered > 0 { @@ -241,7 +267,8 @@ where self.finish_current_chunk(); self.emit_buffered().await.map_err(|e| e.kind())?; } else if self.header_pos > 0 { - // There are pre-written bytes in the buffer + // There are pre-written bytes in the buffer but no current chunk + // (the number of pre-written was so large that the space for a header could not be allocated) self.emit_buffered().await.map_err(|e| e.kind())?; } self.conn.flush().await.map_err(|e| e.kind()) @@ -293,6 +320,9 @@ mod tests { #[test] fn can_get_max_chunk_header_size() { + assert_eq!(0, get_max_chunk_header_size(0)); + assert_eq!(0, get_max_chunk_header_size(1)); + assert_eq!(0, get_max_chunk_header_size(2)); assert_eq!(0, get_max_chunk_header_size(3)); assert_eq!(3, get_max_chunk_header_size(0x00 + 2 + 2)); assert_eq!(3, get_max_chunk_header_size(0x01 + 2 + 2)); @@ -368,7 +398,7 @@ mod tests { } #[tokio::test] - async fn flush_when_entire_buffer_is_prewritten() { + async fn flush_empty_body_when_entire_buffer_is_prewritten() { // Given let mut conn = Vec::new(); let mut buf = [0; 10]; @@ -383,6 +413,22 @@ mod tests { assert_eq!(b"HELLOHELLO", conn.as_slice()); } + #[tokio::test] + async fn terminate_empty_body_when_entire_buffer_is_prewritten() { + // Given + let mut conn = Vec::new(); + let mut buf = [0; 10]; + buf.copy_from_slice(b"HELLOHELLO"); + + // When + let mut writer = BufferingChunkedBodyWriter::new_with_data(&mut conn, &mut buf, 10); + writer.terminate().await.unwrap(); + + // Then + print!("{:?}", conn.as_slice()); + assert_eq!(b"HELLOHELLO0\r\n\r\n", conn.as_slice()); + } + #[tokio::test] async fn flush_when_entire_buffer_is_nearly_prewritten() { // Given