diff --git a/src/protocol/frame/mod.rs b/src/protocol/frame/mod.rs index f9094750..3c45dd92 100644 --- a/src/protocol/frame/mod.rs +++ b/src/protocol/frame/mod.rs @@ -66,9 +66,13 @@ impl FrameSocket where Stream: Write, { - /// Add a frame to the end of the output buffer. - pub fn queue_frame(&mut self, frame: Frame) { - self.codec.queue_frame(frame); + /// Write a frame to stream. + /// + /// This function guarantees that the frame is queued regardless of any errors. + /// There is no need to resend the frame. In order to handle WouldBlock or Incomplete, + /// call write_pending() afterwards. + pub fn write_frame(&mut self, frame: Frame) -> Result<()> { + self.codec.write_frame(&mut self.stream, frame) } /// Complete pending write, if any. @@ -94,10 +98,6 @@ impl FrameCodec { Self { in_buffer: ReadBuffer::new(), out_buffer: Vec::new(), header: None } } - pub(super) fn has_unsent(&self) -> bool { - !self.out_buffer.is_empty() - } - /// Create a new frame codec from partially read data. pub(super) fn from_partially_read(part: Vec) -> Self { Self { @@ -165,12 +165,15 @@ impl FrameCodec { Ok(Some(frame)) } - /// Add a frame to the end of the output buffer. - pub(super) fn queue_frame(&mut self, frame: Frame) + /// Write a frame to the provided stream. + pub(super) fn write_frame(&mut self, stream: &mut Stream, frame: Frame) -> Result<()> + where + Stream: Write, { trace!("writing frame {}", frame); self.out_buffer.reserve(frame.len()); frame.format(&mut self.out_buffer).expect("Bug: can't write to vector"); + self.write_pending(stream) } /// Complete pending write, if any. @@ -238,28 +241,10 @@ mod tests { let mut sock = FrameSocket::new(Vec::new()); let frame = Frame::ping(vec![0x04, 0x05]); - sock.queue_frame(frame); - sock.write_pending().unwrap(); - - let frame = Frame::pong(vec![0x01]); - sock.queue_frame(frame); - sock.write_pending().unwrap(); - - let (buf, _) = sock.into_inner(); - assert_eq!(buf, vec![0x89, 0x02, 0x04, 0x05, 0x8a, 0x01, 0x01]); - } - - #[test] - fn queue_frames() { - let mut sock = FrameSocket::new(Vec::new()); - - let frame = Frame::ping(vec![0x04, 0x05]); - sock.queue_frame(frame); + sock.write_frame(frame).unwrap(); let frame = Frame::pong(vec![0x01]); - sock.queue_frame(frame); - - sock.write_pending().unwrap(); + sock.write_frame(frame).unwrap(); let (buf, _) = sock.into_inner(); assert_eq!(buf, vec![0x89, 0x02, 0x04, 0x05, 0x8a, 0x01, 0x01]); diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index dfc0d485..a3fa9c23 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -190,41 +190,7 @@ impl WebSocket { /// (consider these fatal except for WouldBlock). /// - [Error::Capacity] if your message size is bigger than the configured max message size. pub fn write_message(&mut self, message: Message) -> Result<()> { - self.context.queue_message(&mut self.socket, message)?; - self.context.write_pending(&mut self.socket) - } - - - /// Add a message to the send queue. Differs from write_message in that this method does not - /// flush queued messages to the stream. - /// - /// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping - /// requests. A Pong reply will jump the queue because the - /// [websocket RFC](https://tools.ietf.org/html/rfc6455#section-5.5.2) specifies it should be sent - /// as soon as is practical. - /// - /// Note that upon receiving a ping message, tungstenite cues a pong reply automatically. - /// When you call either `read_message`, `write_message` or `write_pending` next it will try to send - /// that pong out if the underlying connection can take more data. This means you should not - /// respond to ping frames manually. - /// - /// You can however send pong frames manually in order to indicate a unidirectional heartbeat - /// as described in [RFC 6455](https://tools.ietf.org/html/rfc6455#section-5.5.3). Note that - /// if `read_message` returns a ping, you should call `write_pending` until it doesn't return - /// WouldBlock before passing a pong to `write_message`, otherwise the response to the - /// ping will not be sent, but rather replaced by your custom pong message. - /// - /// ## Errors - /// - If the WebSocket's send queue is full, `SendQueueFull` will be returned - /// along with the passed message. Otherwise, the message is queued and Ok(()) is returned. - /// - If the connection is closed and should be dropped, this will return [Error::ConnectionClosed]. - /// - If you try again after [Error::ConnectionClosed] was returned either from here or from `read_message`, - /// [Error::AlreadyClosed] will be returned. This indicates a program error on your part. - /// - [Error::Io] is returned if the underlying connection returns an error - /// (consider these fatal except for WouldBlock). - /// - [Error::Capacity] if your message size is bigger than the configured max message size. - pub fn queue_message(&mut self, message: Message) -> Result<()> { - self.context.queue_message(&mut self.socket, message) + self.context.write_message(&mut self.socket, message) } /// Flush the pending send queue. @@ -348,7 +314,7 @@ impl WebSocketContext { } } - /// Queue up a message without flushing to the stream. + /// Send a message to the provided stream, if possible. /// /// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping /// and Close requests. If the WebSocket's send queue is full, `SendQueueFull` will be returned @@ -356,7 +322,7 @@ impl WebSocketContext { /// /// Note that only the last pong frame is stored to be sent, and only the /// most recent pong frame is sent if multiple pong frames are queued. - pub fn queue_message(&mut self, stream: &mut Stream, message: Message) -> Result<()> + pub fn write_message(&mut self, stream: &mut Stream, message: Message) -> Result<()> where Stream: Read + Write, { @@ -394,7 +360,7 @@ impl WebSocketContext { }; self.send_queue.push_back(frame); - Ok(()) + self.write_pending(stream) } /// Flush the pending send queue. @@ -402,22 +368,20 @@ impl WebSocketContext { where Stream: Read + Write, { + // First, make sure we have no pending frame sending. + self.frame.write_pending(stream)?; + // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in // response, unless it already received a Close frame. It SHOULD // respond with Pong frame as soon as is practical. (RFC 6455) if let Some(pong) = self.pong.take() { trace!("Sending pong reply"); - self.queue_one_frame(pong); + self.send_one_frame(stream, pong)?; } - // If we have any frames queued, add them to the outgoing buffer + // If we have any unsent frames, send them. trace!("Frames still in queue: {}", self.send_queue.len()); while let Some(data) = self.send_queue.pop_front() { - self.queue_one_frame(data); - } - - // if the outgoing buffer isn't empty then try write some data - if self.frame.has_unsent() { - self.frame.write_pending(stream)?; + self.send_one_frame(stream, data)?; } // If we get to this point, the send queue is empty and the underlying socket is still @@ -624,8 +588,10 @@ impl WebSocketContext { } } - /// Add a single frame to the end of the output buffer. - fn queue_one_frame(&mut self, mut frame: Frame) + /// Send a single pending frame. + fn send_one_frame(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()> + where + Stream: Read + Write, { match self.role { Role::Server => {} @@ -637,7 +603,7 @@ impl WebSocketContext { } trace!("Sending frame: {:?}", frame); - self.frame.queue_frame(frame); + self.frame.write_frame(stream, frame).check_connection_reset(self.state) } }