diff --git a/socketioxide/src/ack.rs b/socketioxide/src/ack.rs index 02f1cc98..52757013 100644 --- a/socketioxide/src/ack.rs +++ b/socketioxide/src/ack.rs @@ -18,10 +18,12 @@ use futures::{ Future, Stream, }; use serde::de::DeserializeOwned; -use serde_json::Value; use tokio::{sync::oneshot::Receiver, time::Timeout}; -use crate::{adapter::Adapter, errors::AckError, extract::SocketRef, packet::Packet, SocketError}; +use crate::{ + adapter::Adapter, errors::AckError, extract::SocketRef, packet::Packet, + payload_value::PayloadValue, SocketError, +}; /// An acknowledgement sent by the client. /// It contains the data sent by the client and the binary payloads if there are any. @@ -29,12 +31,9 @@ use crate::{adapter::Adapter, errors::AckError, extract::SocketRef, packet::Pack pub struct AckResponse { /// The data returned by the client pub data: T, - /// Optional binary payloads. - /// If there is no binary payload, the `Vec` will be empty - pub binary: Vec>, } -pub(crate) type AckResult = Result, AckError<()>>; +pub(crate) type AckResult = Result, AckError<()>>; pin_project_lite::pin_project! { /// A [`Future`] of [`AckResponse`] received from the client with its corresponding [`Sid`]. @@ -127,12 +126,12 @@ pin_project_lite::pin_project! { pub enum AckInnerStream { Stream { #[pin] - rxs: FuturesUnordered>, + rxs: FuturesUnordered>, }, Fut { #[pin] - rx: AckResultWithId, + rx: AckResultWithId, polled: bool, }, } @@ -171,7 +170,7 @@ impl AckInnerStream { /// Creates a new [`AckInnerStream`] from a [`oneshot::Receiver`](tokio) corresponding to the acknowledgement /// of a single socket. - pub fn send(rx: Receiver>, duration: Duration, id: Sid) -> Self { + pub fn send(rx: Receiver>, duration: Duration, id: Sid) -> Self { AckInnerStream::Fut { polled: false, rx: AckResultWithId { @@ -183,7 +182,7 @@ impl AckInnerStream { } impl Stream for AckInnerStream { - type Item = (Sid, AckResult); + type Item = (Sid, AckResult); fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { use InnerProj::*; @@ -221,7 +220,7 @@ impl FusedStream for AckInnerStream { } impl Future for AckInnerStream { - type Output = AckResult; + type Output = AckResult; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.as_mut().poll_next(cx) { @@ -295,13 +294,11 @@ impl From for AckStream { } } -fn map_ack_response(ack: AckResult) -> AckResult { +fn map_ack_response(ack: AckResult) -> AckResult { ack.and_then(|v| { - serde_json::from_value(v.data) - .map(|data| AckResponse { - data, - binary: v.binary, - }) + v.data + .into_data::() + .map(|data| AckResponse { data }) .map_err(|e| e.into()) }) } @@ -328,12 +325,12 @@ mod test { async fn broadcast_ack() { let socket = create_socket(); let socket2 = create_socket(); - let mut packet = Packet::event("/", "test", "test".into()); + let mut packet = Packet::event("/", "test", PayloadValue::from_data("test").unwrap()); packet.inner.set_ack_id(1); let socks = vec![socket.clone().into(), socket2.clone().into()]; let stream: AckStream = AckInnerStream::broadcast(packet, socks, None).into(); - let res_packet = Packet::ack("test", "test".into(), 1); + let res_packet = Packet::ack("test", PayloadValue::from_data("test").unwrap(), 1); socket.recv(res_packet.inner.clone()).unwrap(); socket2.recv(res_packet.inner).unwrap(); @@ -351,8 +348,7 @@ mod test { let stream: AckStream = AckInnerStream::send(rx, Duration::from_secs(1), sid).into(); tx.send(Ok(AckResponse { - data: Value::String("test".into()), - binary: vec![], + data: PayloadValue::String("test".into()), })) .unwrap(); @@ -372,8 +368,7 @@ mod test { let stream: AckStream = AckInnerStream::send(rx, Duration::from_secs(1), sid).into(); tx.send(Ok(AckResponse { - data: Value::String("test".into()), - binary: vec![], + data: PayloadValue::String("test".into()), })) .unwrap(); @@ -384,12 +379,12 @@ mod test { async fn broadcast_ack_with_deserialize_error() { let socket = create_socket(); let socket2 = create_socket(); - let mut packet = Packet::event("/", "test", "test".into()); + let mut packet = Packet::event("/", "test", PayloadValue::from_data("test").unwrap()); packet.inner.set_ack_id(1); let socks = vec![socket.clone().into(), socket2.clone().into()]; let stream: AckStream = AckInnerStream::broadcast(packet, socks, None).into(); - let res_packet = Packet::ack("test", 132.into(), 1); + let res_packet = Packet::ack("test", PayloadValue::from_data(132).unwrap(), 1); socket.recv(res_packet.inner.clone()).unwrap(); socket2.recv(res_packet.inner).unwrap(); @@ -413,8 +408,7 @@ mod test { let stream: AckStream = AckInnerStream::send(rx, Duration::from_secs(1), sid).into(); tx.send(Ok(AckResponse { - data: Value::Bool(true), - binary: vec![], + data: PayloadValue::Bool(true), })) .unwrap(); assert_eq!(stream.size_hint().0, 1); @@ -436,8 +430,7 @@ mod test { let stream: AckStream = AckInnerStream::send(rx, Duration::from_secs(1), sid).into(); tx.send(Ok(AckResponse { - data: Value::Bool(true), - binary: vec![], + data: PayloadValue::Bool(true), })) .unwrap(); @@ -448,12 +441,12 @@ mod test { async fn broadcast_ack_with_closed_socket() { let socket = create_socket(); let socket2 = create_socket(); - let mut packet = Packet::event("/", "test", "test".into()); + let mut packet = Packet::event("/", "test", PayloadValue::from_data("test").unwrap()); packet.inner.set_ack_id(1); let socks = vec![socket.clone().into(), socket2.clone().into()]; let stream: AckStream = AckInnerStream::broadcast(packet, socks, None).into(); - let res_packet = Packet::ack("test", "test".into(), 1); + let res_packet = Packet::ack("test", PayloadValue::from_data("test").unwrap(), 1); socket.clone().recv(res_packet.inner.clone()).unwrap(); futures::pin_mut!(stream); @@ -503,14 +496,14 @@ mod test { async fn broadcast_ack_with_timeout() { let socket = create_socket(); let socket2 = create_socket(); - let mut packet = Packet::event("/", "test", "test".into()); + let mut packet = Packet::event("/", "test", PayloadValue::from_data("test").unwrap()); packet.inner.set_ack_id(1); let socks = vec![socket.clone().into(), socket2.clone().into()]; let stream: AckStream = AckInnerStream::broadcast(packet, socks, Some(Duration::from_millis(10))).into(); socket - .recv(Packet::ack("test", "test".into(), 1).inner) + .recv(Packet::ack("test", PayloadValue::from_data("test").unwrap(), 1).inner) .unwrap(); futures::pin_mut!(stream); diff --git a/socketioxide/src/errors.rs b/socketioxide/src/errors.rs index f7294789..2466e096 100644 --- a/socketioxide/src/errors.rs +++ b/socketioxide/src/errors.rs @@ -11,6 +11,9 @@ pub enum Error { #[error("invalid packet type")] InvalidPacketType, + #[error("invalid binary payload count")] + InvalidPayloadCount, + #[error("invalid event name")] InvalidEventName, @@ -165,9 +168,10 @@ impl From<&Error> for Option { use EIoDisconnectReason::*; match value { Error::SocketGone(_) => Some(TransportClose), - Error::Serialize(_) | Error::InvalidPacketType | Error::InvalidEventName => { - Some(PacketParsingError) - } + Error::Serialize(_) + | Error::InvalidPacketType + | Error::InvalidEventName + | Error::InvalidPayloadCount => Some(PacketParsingError), Error::Adapter(_) | Error::InvalidNamespace => None, } } diff --git a/socketioxide/src/handler/extract.rs b/socketioxide/src/handler/extract.rs index 7ee9c16e..594b4bc6 100644 --- a/socketioxide/src/handler/extract.rs +++ b/socketioxide/src/handler/extract.rs @@ -25,6 +25,7 @@ //! # use socketioxide::handler::{FromConnectParts, FromMessageParts}; //! # use socketioxide::adapter::Adapter; //! # use socketioxide::socket::Socket; +//! # use socketioxide::PayloadValue; //! # use std::sync::Arc; //! # use std::convert::Infallible; //! # use socketioxide::SocketIo; @@ -61,8 +62,7 @@ //! //! fn from_message_parts( //! s: &Arc>, -//! _: &mut serde_json::Value, -//! _: &mut Vec>, +//! _: &mut PayloadValue, //! _: &Option, //! ) -> Result { //! // In a real app it would be better to parse the query params with a crate like `url` @@ -89,6 +89,7 @@ use super::message::FromMessageParts; use super::FromDisconnectParts; use super::{connect::FromConnectParts, message::FromMessage}; use crate::errors::{DisconnectError, SendError}; +use crate::payload_value::PayloadValue; use crate::socket::DisconnectReason; use crate::{ adapter::{Adapter, LocalAdapter}, @@ -96,16 +97,15 @@ use crate::{ socket::Socket, }; use serde::{de::DeserializeOwned, Serialize}; -use serde_json::Value; #[cfg(feature = "state")] #[cfg_attr(docsrs, doc(cfg(feature = "state")))] pub use state_extract::*; /// Utility function to unwrap an array with a single element -fn upwrap_array(v: &mut Value) { +fn upwrap_array(v: &mut PayloadValue) { match v { - Value::Array(vec) if vec.len() == 1 => { + PayloadValue::Array(vec) if vec.len() == 1 => { *v = vec.pop().unwrap(); } _ => (), @@ -137,12 +137,11 @@ where type Error = serde_json::Error; fn from_message_parts( _: &Arc>, - v: &mut serde_json::Value, - _: &mut Vec>, + v: &mut PayloadValue, _: &Option, ) -> Result { upwrap_array(v); - serde_json::from_value(v.clone()).map(Data) + v.clone().into_data::().map(Data) } } @@ -171,12 +170,11 @@ where type Error = Infallible; fn from_message_parts( _: &Arc>, - v: &mut serde_json::Value, - _: &mut Vec>, + v: &mut PayloadValue, _: &Option, ) -> Result { upwrap_array(v); - Ok(TryData(serde_json::from_value(v.clone()))) + Ok(TryData(v.clone().into_data::())) } } /// An Extractor that returns a reference to a [`Socket`]. @@ -193,8 +191,7 @@ impl FromMessageParts for SocketRef { type Error = Infallible; fn from_message_parts( s: &Arc>, - _: &mut serde_json::Value, - _: &mut Vec>, + _: &mut PayloadValue, _: &Option, ) -> Result { Ok(SocketRef(s.clone())) @@ -244,11 +241,10 @@ impl FromMessage for Bin { type Error = Infallible; fn from_message( _: Arc>, - _: serde_json::Value, - bin: Vec>, + mut v: PayloadValue, _: Option, ) -> Result { - Ok(Bin(bin)) + Ok(Bin(v.extract_binary_payloads())) } } @@ -256,7 +252,6 @@ impl FromMessage for Bin { /// If the client sent a normal message without expecting an ack, the ack callback will do nothing. #[derive(Debug)] pub struct AckSender { - binary: Vec>, socket: Arc>, ack_id: Option, } @@ -264,8 +259,7 @@ impl FromMessageParts for AckSender { type Error = Infallible; fn from_message_parts( s: &Arc>, - _: &mut serde_json::Value, - _: &mut Vec>, + _: &mut PayloadValue, ack_id: &Option, ) -> Result { Ok(Self::new(s.clone(), *ack_id)) @@ -273,24 +267,16 @@ impl FromMessageParts for AckSender { } impl AckSender { pub(crate) fn new(socket: Arc>, ack_id: Option) -> Self { - Self { - binary: vec![], - socket, - ack_id, - } - } - - /// Add binary data to the ack response. - pub fn bin(mut self, bin: Vec>) -> Self { - self.binary = bin; - self + Self { socket, ack_id } } /// Send the ack response to the client. - pub fn send(self, data: T) -> Result<(), SendError> { + pub fn send(self, data: T) -> Result<(), SendError> { use crate::socket::PermitIteratorExt; if let Some(ack_id) = self.ack_id { - let permits = match self.socket.reserve(1 + self.binary.len()) { + let data = PayloadValue::from_data(data)?; + let payload_count = data.count_payloads(); + let permits = match self.socket.reserve(1 + payload_count) { Ok(permits) => permits, Err(e) => { #[cfg(feature = "tracing")] @@ -299,11 +285,10 @@ impl AckSender { } }; let ns = self.socket.ns(); - let data = serde_json::to_value(data)?; - let packet = if self.binary.is_empty() { + let packet = if payload_count == 0 { Packet::ack(ns, data, ack_id) } else { - Packet::bin_ack(ns, data, self.binary, ack_id) + Packet::bin_ack(ns, data, ack_id) }; permits.emit(packet); Ok(()) @@ -323,8 +308,7 @@ impl FromMessageParts for crate::ProtocolVersion { type Error = Infallible; fn from_message_parts( s: &Arc>, - _: &mut serde_json::Value, - _: &mut Vec>, + _: &mut PayloadValue, _: &Option, ) -> Result { Ok(s.protocol()) @@ -347,8 +331,7 @@ impl FromMessageParts for crate::TransportType { type Error = Infallible; fn from_message_parts( s: &Arc>, - _: &mut serde_json::Value, - _: &mut Vec>, + _: &mut PayloadValue, _: &Option, ) -> Result { Ok(s.transport_type()) @@ -442,8 +425,7 @@ mod state_extract { type Error = StateNotFound; fn from_message_parts( _: &Arc>, - _: &mut serde_json::Value, - _: &mut Vec>, + _: &mut PayloadValue, _: &Option, ) -> Result { get_state::().map(State).ok_or(StateNotFound) diff --git a/socketioxide/src/handler/message.rs b/socketioxide/src/handler/message.rs index a4501e14..8f37092b 100644 --- a/socketioxide/src/handler/message.rs +++ b/socketioxide/src/handler/message.rs @@ -60,7 +60,7 @@ //! # use serde_json::Error; //! # use socketioxide::extract::*; //! // async named event handler -//! async fn on_event(s: SocketRef, Data(data): Data, ack: AckSender) { +//! async fn on_event(s: SocketRef, Data(data): Data, ack: AckSender) { //! tokio::time::sleep(std::time::Duration::from_secs(1)).await; //! ack.send("Here is my acknowledgment!").ok(); //! } @@ -74,9 +74,9 @@ use std::sync::Arc; use futures::Future; -use serde_json::Value; use crate::adapter::Adapter; +use crate::payload_value::PayloadValue; use crate::socket::Socket; use super::MakeErasedHandler; @@ -85,7 +85,7 @@ use super::MakeErasedHandler; pub(crate) type BoxedMessageHandler = Box>; pub(crate) trait ErasedMessageHandler: Send + Sync + 'static { - fn call(&self, s: Arc>, v: Value, p: Vec>, ack_id: Option); + fn call(&self, s: Arc>, v: PayloadValue, ack_id: Option); } /// Define a handler for the connect event. @@ -101,7 +101,7 @@ pub(crate) trait ErasedMessageHandler: Send + Sync + 'static { )] pub trait MessageHandler: Send + Sync + 'static { /// Call the handler with the given arguments - fn call(&self, s: Arc>, v: Value, p: Vec>, ack_id: Option); + fn call(&self, s: Arc>, v: PayloadValue, ack_id: Option); #[doc(hidden)] fn phantom(&self) -> std::marker::PhantomData { @@ -127,8 +127,8 @@ where A: Adapter, { #[inline(always)] - fn call(&self, s: Arc>, v: Value, p: Vec>, ack_id: Option) { - self.handler.call(s, v, p, ack_id); + fn call(&self, s: Arc>, v: PayloadValue, ack_id: Option) { + self.handler.call(s, v, ack_id); } } @@ -164,8 +164,7 @@ pub trait FromMessageParts: Sized { /// If it fails, the handler is not called. fn from_message_parts( s: &Arc>, - v: &mut Value, - p: &mut Vec>, + v: &mut PayloadValue, ack_id: &Option, ) -> Result; } @@ -189,8 +188,7 @@ pub trait FromMessage: Sized { /// If it fails, the handler is not called fn from_message( s: Arc>, - v: Value, - p: Vec>, + v: PayloadValue, ack_id: Option, ) -> Result; } @@ -204,11 +202,10 @@ where type Error = T::Error; fn from_message( s: Arc>, - mut v: Value, - mut p: Vec>, + mut v: PayloadValue, ack_id: Option, ) -> Result { - Self::from_message_parts(&s, &mut v, &mut p, &ack_id) + Self::from_message_parts(&s, &mut v, &ack_id) } } @@ -219,7 +216,7 @@ where Fut: Future + Send + 'static, A: Adapter, { - fn call(&self, _: Arc>, _: Value, _: Vec>, _: Option) { + fn call(&self, _: Arc>, _: PayloadValue, _: Option) { let fut = (self.clone())(); tokio::spawn(fut); } @@ -231,7 +228,7 @@ where F: FnOnce() + Send + Sync + Clone + 'static, A: Adapter, { - fn call(&self, _: Arc>, _: Value, _: Vec>, _: Option) { + fn call(&self, _: Arc>, _: PayloadValue, _: Option) { (self.clone())(); } } @@ -249,9 +246,9 @@ macro_rules! impl_async_handler { $( $ty: FromMessageParts + Send, )* $last: FromMessage + Send, { - fn call(&self, s: Arc>, mut v: Value, mut p: Vec>, ack_id: Option) { + fn call(&self, s: Arc>, mut v: PayloadValue, ack_id: Option) { $( - let $ty = match $ty::from_message_parts(&s, &mut v, &mut p, &ack_id) { + let $ty = match $ty::from_message_parts(&s, &mut v, &ack_id) { Ok(v) => v, Err(_e) => { #[cfg(feature = "tracing")] @@ -260,7 +257,7 @@ macro_rules! impl_async_handler { }, }; )* - let last = match $last::from_message(s, v, p, ack_id) { + let last = match $last::from_message(s, v, ack_id) { Ok(v) => v, Err(_e) => { #[cfg(feature = "tracing")] @@ -287,14 +284,14 @@ macro_rules! impl_handler { $( $ty: FromMessageParts + Send, )* $last: FromMessage + Send, { - fn call(&self, s: Arc>, mut v: Value, mut p: Vec>, ack_id: Option) { + fn call(&self, s: Arc>, mut v: PayloadValue, ack_id: Option) { $( - let $ty = match $ty::from_message_parts(&s, &mut v, &mut p, &ack_id) { + let $ty = match $ty::from_message_parts(&s, &mut v, &ack_id) { Ok(v) => v, Err(_) => return, }; )* - let last = match $last::from_message(s, v, p, ack_id) { + let last = match $last::from_message(s, v, ack_id) { Ok(v) => v, Err(_) => return, }; diff --git a/socketioxide/src/io.rs b/socketioxide/src/io.rs index 21dfe6e7..08712c81 100644 --- a/socketioxide/src/io.rs +++ b/socketioxide/src/io.rs @@ -535,34 +535,6 @@ impl SocketIo { self.get_default_op().timeout(timeout) } - /// Adds a binary payload to the message. - /// - /// Alias for `io.of("/").unwrap().bin(binary_payload)` - /// - /// ## Panics - /// If the **default namespace "/" is not found** this fn will panic! - /// - /// ## Example - /// ``` - /// # use socketioxide::{SocketIo, extract::SocketRef}; - /// # use serde_json::Value; - /// let (_, io) = SocketIo::new_svc(); - /// io.ns("/", |socket: SocketRef| { - /// println!("Socket connected on / namespace with id: {}", socket.id); - /// }); - /// - /// // Later in your code you can emit a test message on the root namespace in the room1 and room3 rooms, - /// // except for the room2 with a binary payload - /// io.to("room1") - /// .to("room3") - /// .except("room2") - /// .bin(vec![vec![1, 2, 3, 4]]) - /// .emit("test", ()); - #[inline] - pub fn bin(&self, binary: Vec>) -> BroadcastOperators { - self.get_default_op().bin(binary) - } - /// Emits a message to all sockets selected with the previous operators. /// /// Alias for `io.of("/").unwrap().emit(event, data)` diff --git a/socketioxide/src/operators.rs b/socketioxide/src/operators.rs index b3c35ec7..cfd5778a 100644 --- a/socketioxide/src/operators.rs +++ b/socketioxide/src/operators.rs @@ -15,6 +15,7 @@ use crate::ack::{AckInnerStream, AckStream}; use crate::adapter::LocalAdapter; use crate::errors::{BroadcastError, DisconnectError}; use crate::extract::SocketRef; +use crate::payload_value::PayloadValue; use crate::socket::Socket; use crate::SendError; use crate::{ @@ -103,13 +104,11 @@ impl RoomParam for Sid { /// Chainable operators to configure the message to be sent. pub struct ConfOperators<'a, A: Adapter = LocalAdapter> { - binary: Vec>, timeout: Option, socket: &'a Socket, } /// Chainable operators to select sockets to send a message to and to configure the message to be sent. pub struct BroadcastOperators { - binary: Vec>, timeout: Option, ns: Arc>, opts: BroadcastOptions, @@ -122,7 +121,6 @@ impl From> for BroadcastOperators { ..Default::default() }; Self { - binary: conf.binary, timeout: conf.timeout, ns: conf.socket.ns.clone(), opts, @@ -134,7 +132,6 @@ impl From> for BroadcastOperators { impl<'a, A: Adapter> ConfOperators<'a, A> { pub(crate) fn new(sender: &'a Socket) -> Self { Self { - binary: vec![], timeout: None, socket: sender, } @@ -283,23 +280,6 @@ impl<'a, A: Adapter> ConfOperators<'a, A> { self.timeout = Some(timeout); self } - - /// Adds a binary payload to the message. - /// #### Example - /// ``` - /// # use socketioxide::{SocketIo, extract::*}; - /// # use serde_json::Value; - /// let (_, io) = SocketIo::new_svc(); - /// io.ns("/", |socket: SocketRef| { - /// socket.on("test", |socket: SocketRef, Data::(data), Bin(bin)| async move { - /// // This will send the binary payload received to all sockets in this namespace with the test message - /// socket.bin(bin).emit("test", data); - /// }); - /// }); - pub fn bin(mut self, binary: Vec>) -> Self { - self.binary = binary; - self - } } // ==== impl ConfOperators consume fns ==== @@ -344,9 +324,10 @@ impl ConfOperators<'_, A> { mut self, event: impl Into>, data: T, - ) -> Result<(), SendError> { + ) -> Result<(), SendError> { use crate::socket::PermitIteratorExt; - let permits = match self.socket.reserve(1 + self.binary.len()) { + let data = PayloadValue::from_data(data)?; + let permits = match self.socket.reserve(1 + data.count_payloads()) { Ok(permits) => permits, Err(e) => { #[cfg(feature = "tracing")] @@ -415,8 +396,10 @@ impl ConfOperators<'_, A> { mut self, event: impl Into>, data: T, - ) -> Result, SendError> { - let permits = match self.socket.reserve(1 + self.binary.len()) { + ) -> Result, SendError> { + let data = PayloadValue::from_data(data)?; + let payload_count = data.count_payloads(); + let permits = match self.socket.reserve(1 + payload_count) { Ok(permits) => permits, Err(e) => { #[cfg(feature = "tracing")] @@ -475,12 +458,12 @@ impl ConfOperators<'_, A> { data: impl serde::Serialize, ) -> Result, serde_json::Error> { let ns = self.socket.ns.path.clone(); - let data = serde_json::to_value(data)?; - let packet = if self.binary.is_empty() { + let data = PayloadValue::from_data(data)?; + let payload_count = data.count_payloads(); + let packet = if payload_count == 0 { Packet::event(ns, event.into(), data) } else { - let binary = std::mem::take(&mut self.binary); - Packet::bin_event(ns, event.into(), data, binary) + Packet::bin_event(ns, event.into(), data) }; Ok(packet) } @@ -489,7 +472,6 @@ impl ConfOperators<'_, A> { impl BroadcastOperators { pub(crate) fn new(ns: Arc>) -> Self { Self { - binary: vec![], timeout: None, ns, opts: BroadcastOptions::default(), @@ -497,7 +479,6 @@ impl BroadcastOperators { } pub(crate) fn from_sock(ns: Arc>, sid: Sid) -> Self { Self { - binary: vec![], timeout: None, ns, opts: BroadcastOptions { @@ -655,23 +636,6 @@ impl BroadcastOperators { self.timeout = Some(timeout); self } - - /// Adds a binary payload to the message. - /// #### Example - /// ``` - /// # use socketioxide::{SocketIo, extract::*}; - /// # use serde_json::Value; - /// let (_, io) = SocketIo::new_svc(); - /// io.ns("/", |socket: SocketRef| { - /// socket.on("test", |socket: SocketRef, Data::(data), Bin(bin)| async move { - /// // This will send the binary payload received to all sockets in this namespace with the test message - /// socket.bin(bin).emit("test", data); - /// }); - /// }); - pub fn bin(mut self, binary: Vec>) -> Self { - self.binary = binary; - self - } } // ==== impl BroadcastOperators consume fns ==== @@ -886,12 +850,12 @@ impl BroadcastOperators { data: impl serde::Serialize, ) -> Result, serde_json::Error> { let ns = self.ns.path.clone(); - let data = serde_json::to_value(data)?; - let packet = if self.binary.is_empty() { + let data = PayloadValue::from_data(data)?; + let payload_count = data.count_payloads(); + let packet = if payload_count == 0 { Packet::event(ns, event.into(), data) } else { - let binary = std::mem::take(&mut self.binary); - Packet::bin_event(ns, event.into(), data, binary) + Packet::bin_event(ns, event.into(), data) }; Ok(packet) } diff --git a/socketioxide/src/packet.rs b/socketioxide/src/packet.rs index 8c8b45c3..cac4697f 100644 --- a/socketioxide/src/packet.rs +++ b/socketioxide/src/packet.rs @@ -3,9 +3,8 @@ //! It should not be used directly except when implementing the [`Adapter`](crate::adapter::Adapter) trait. use std::borrow::Cow; -use crate::ProtocolVersion; +use crate::{payload_value::PayloadValue, ProtocolVersion}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use serde_json::{json, Value}; use crate::errors::Error; use engineioxide::sid::Sid; @@ -78,7 +77,11 @@ impl<'a> Packet<'a> { } /// Create an event packet for the given namespace - pub fn event(ns: impl Into>, e: impl Into>, data: Value) -> Self { + pub fn event( + ns: impl Into>, + e: impl Into>, + data: PayloadValue, + ) -> Self { Self { inner: PacketData::Event(e.into(), data, None), ns: ns.into(), @@ -89,12 +92,9 @@ impl<'a> Packet<'a> { pub fn bin_event( ns: impl Into>, e: impl Into>, - data: Value, - bin: Vec>, + data: PayloadValue, ) -> Self { - debug_assert!(!bin.is_empty()); - - let packet = BinaryPacket::outgoing(data, bin); + let packet = BinaryPacket::outgoing(data); Self { inner: PacketData::BinaryEvent(e.into(), packet, None), ns: ns.into(), @@ -102,7 +102,7 @@ impl<'a> Packet<'a> { } /// Create an ack packet for the given namespace - pub fn ack(ns: &'a str, data: Value, ack: i64) -> Self { + pub fn ack(ns: &'a str, data: PayloadValue, ack: i64) -> Self { Self { inner: PacketData::EventAck(data, ack), ns: Cow::Borrowed(ns), @@ -110,9 +110,8 @@ impl<'a> Packet<'a> { } /// Create a binary ack packet for the given namespace - pub fn bin_ack(ns: &'a str, data: Value, bin: Vec>, ack: i64) -> Self { - debug_assert!(!bin.is_empty()); - let packet = BinaryPacket::outgoing(data, bin); + pub fn bin_ack(ns: &'a str, data: PayloadValue, ack: i64) -> Self { + let packet = BinaryPacket::outgoing(data); Self { inner: PacketData::BinaryAck(packet, ack), ns: Cow::Borrowed(ns), @@ -184,9 +183,9 @@ pub enum PacketData<'a> { /// Disconnect packet, used to disconnect from a namespace Disconnect, /// Event packet with optional ack id, to request an ack from the other side - Event(Cow<'a, str>, Value, Option), + Event(Cow<'a, str>, PayloadValue, Option), /// Event ack packet, to acknowledge an event - EventAck(Value, i64), + EventAck(PayloadValue, i64), /// Connect error packet, sent when the namespace is invalid ConnectError, /// Binary event packet with optional ack id, to request an ack from the other side @@ -199,11 +198,11 @@ pub enum PacketData<'a> { #[derive(Debug, Clone, PartialEq, Eq)] pub struct BinaryPacket { /// Data related to the packet - pub data: Value, - /// Binary payload - pub bin: Vec>, + pub(crate) data: PayloadValue, /// The number of expected payloads (used when receiving data) payload_count: usize, + /// A place to receive binary payloads until the packet is complete + payloads: Vec>, } impl<'a> PacketData<'a> { @@ -248,63 +247,88 @@ impl<'a> PacketData<'a> { } impl BinaryPacket { - /// Create a binary packet from incoming data and remove all placeholders and get the payload count - pub fn incoming(mut data: Value) -> Self { - let payload_count = match &mut data { - Value::Array(ref mut v) => { - let count = v.len(); - v.retain(|v| v.as_object().and_then(|o| o.get("_placeholder")).is_none()); - count - v.len() - } - val => { - if val - .as_object() - .and_then(|o| o.get("_placeholder")) - .is_some() - { - data = Value::Array(vec![]); - 1 - } else { - 0 - } - } - }; + /// Create a binary packet from incoming data + fn incoming(data: PayloadValue, payload_count: usize) -> Self { + let actual_payload_count = data.count_payloads(); + if payload_count != actual_payload_count { + #[cfg(feature = "tracing")] + tracing::warn!( + "Binary packet header claimed {} payloads, but found {} placeholders", + payload_count, + actual_payload_count + ); + } Self { data, - bin: Vec::new(), - payload_count, + payload_count: actual_payload_count, + payloads: vec![], } } - /// Create a binary packet from outgoing data and a payload - pub fn outgoing(data: Value, bin: Vec>) -> Self { - let mut data = match data { - Value::Array(v) => Value::Array(v), - d => Value::Array(vec![d]), + /// Create a binary packet from outgoing data + fn outgoing(data: PayloadValue) -> Self { + let data = match data { + arr @ PayloadValue::Array(_) => arr, + d => PayloadValue::Array(vec![d]), }; - let payload_count = bin.len(); - (0..payload_count).for_each(|i| { - data.as_array_mut().unwrap().push(json!({ - "_placeholder": true, - "num": i - })) - }); + let payload_count = data.count_payloads(); + Self { data, - bin, payload_count, + payloads: vec![], } } /// Add a payload to the binary packet, when all payloads are added, /// the packet is complete and can be further processed pub fn add_payload(&mut self, payload: Vec) { - self.bin.push(payload); + if self.is_complete() { + #[cfg(feature = "tracing")] + tracing::warn!("Attempt to add payload to already-complete binary packet"); + } else { + self.payloads.push(payload); + + if self.is_complete() { + integrate_payloads(&mut self.data, &mut self.payloads); + } + } } + /// Check if the binary packet is complete, it means that all payloads have been received pub fn is_complete(&self) -> bool { - self.payload_count == self.bin.len() + self.payload_count == self.payloads.len() + } + + pub(crate) fn extract_payloads(&mut self) -> Vec> { + self.data.extract_binary_payloads() + } +} + +fn integrate_payloads(data: &mut PayloadValue, payloads: &mut Vec>) { + match data { + PayloadValue::Binary(n, ref mut data) => { + if let Some(payload) = payloads.get_mut(*n) { + std::mem::swap(payload, data); + } else { + #[cfg(feature = "tracing")] + tracing::warn!( + "Binary packet structure included placeholder with num out of range" + ); + } + } + PayloadValue::Object(o) => { + for value in o.values_mut() { + integrate_payloads(value, payloads); + } + } + PayloadValue::Array(a) => { + for value in a.iter_mut() { + integrate_payloads(value, payloads); + } + } + _ => (), } } @@ -318,11 +342,11 @@ impl<'a> From> for String { Event(e, data, _) | BinaryEvent(e, BinaryPacket { data, .. }, _) => { // Expand the packet if it is an array with data -> ["event", ...data] let packet = match data { - Value::Array(ref mut v) if !v.is_empty() => { - v.insert(0, Value::String((*e).to_string())); + PayloadValue::Array(ref mut v) if !v.is_empty() => { + v.insert(0, PayloadValue::String((*e).to_string())); serde_json::to_string(&v) } - Value::Array(_) => serde_json::to_string::<(_, [(); 0])>(&(e, [])), + PayloadValue::Array(_) => serde_json::to_string::<(_, [(); 0])>(&(e, [])), _ => serde_json::to_string(&(e, data)), } .unwrap(); @@ -331,8 +355,8 @@ impl<'a> From> for String { EventAck(data, _) | BinaryAck(BinaryPacket { data, .. }, _) => { // Enforce that the packet is an array -> [data] let packet = match data { - Value::Array(_) => serde_json::to_string(&data), - Value::Null => Ok("[]".to_string()), + PayloadValue::Array(_) => serde_json::to_string(&data), + PayloadValue::Null => Ok("[]".to_string()), _ => serde_json::to_string(&[data]), } .unwrap(); @@ -409,11 +433,11 @@ impl<'a> From> for String { /// ```text /// ["", ...] /// ``` -fn deserialize_event_packet(data: &str) -> Result<(String, Value), Error> { +fn deserialize_event_packet(data: &str) -> Result<(String, PayloadValue), Error> { #[cfg(feature = "tracing")] tracing::debug!("Deserializing event packet: {:?}", data); - let packet = match serde_json::from_str::(data)? { - Value::Array(packet) => packet, + let packet = match serde_json::from_str::(data)? { + PayloadValue::Array(packet) => packet, _ => return Err(Error::InvalidEventName), }; @@ -423,7 +447,7 @@ fn deserialize_event_packet(data: &str) -> Result<(String, Value), Error> { .as_str() .ok_or(Error::InvalidEventName)? .to_string(); - let payload = Value::from_iter(packet.into_iter().skip(1)); + let payload = PayloadValue::from_iter(packet.into_iter().skip(1)); Ok((event, payload)) } @@ -457,12 +481,18 @@ impl<'a> TryFrom for Packet<'a> { .ok_or(Error::InvalidPacketType)?; // Move the cursor to skip the payload count if it is a binary packet - if index == b'5' || index == b'6' { + let payload_count = if index == b'5' || index == b'6' { while chars.get(i) != Some(&b'-') { i += 1; } i += 1; - } + + std::str::from_utf8(&chars[1..(i - 1)]) + .map_err(|_| Error::InvalidPayloadCount) + .and_then(|s| s.parse::().map_err(|_| Error::InvalidPayloadCount))? + } else { + 0 + }; let start_index = i; // Custom nsps will start with a slash @@ -509,12 +539,16 @@ impl<'a> TryFrom for Packet<'a> { } b'5' => { let (event, payload) = deserialize_event_packet(data)?; - PacketData::BinaryEvent(event.into(), BinaryPacket::incoming(payload), ack) + PacketData::BinaryEvent( + event.into(), + BinaryPacket::incoming(payload, payload_count), + ack, + ) } b'6' => { let packet = deserialize_packet(data)?.ok_or(Error::InvalidPacketType)?; PacketData::BinaryAck( - BinaryPacket::incoming(packet), + BinaryPacket::incoming(packet, payload_count), ack.ok_or(Error::InvalidPacketType)?, ) } @@ -537,6 +571,21 @@ mod test { use super::*; + fn packet_data_map(key: &'static str, value: &'static str) -> PayloadValue { + PayloadValue::Object( + [(key.to_string(), PayloadValue::String(value.to_string()))] + .into_iter() + .collect(), + ) + } + + fn wrapped_packet_data_map(key: &'static str, value: &'static str) -> PayloadValue { + PayloadValue::Array(vec![ + packet_data_map(key, value), + PayloadValue::Binary(0, vec![1]), + ]) + } + #[test] fn packet_decode_connect() { let sid = Sid::new(); @@ -598,7 +647,11 @@ mod test { let packet = Packet::try_from(payload).unwrap(); assert_eq!( - Packet::event("/", "event", json!([{"data": "value"}])), + Packet::event( + "/", + "event", + PayloadValue::from_data(json!([{"data": "value"}])).unwrap() + ), packet ); @@ -606,7 +659,11 @@ mod test { let payload = format!("21{}", json!(["event", { "data": "value" }])); let packet = Packet::try_from(payload).unwrap(); - let mut comparison_packet = Packet::event("/", "event", json!([{"data": "value"}])); + let mut comparison_packet = Packet::event( + "/", + "event", + PayloadValue::from_data(json!([{"data": "value"}])).unwrap(), + ); comparison_packet.inner.set_ack_id(1); assert_eq!(packet, comparison_packet); @@ -615,7 +672,11 @@ mod test { let packet = Packet::try_from(payload).unwrap(); assert_eq!( - Packet::event("/admin™", "event", json!([{"data": "value™"}])), + Packet::event( + "/admin™", + "event", + PayloadValue::from_data(json!([{"data": "value™"}])).unwrap() + ), packet ); @@ -624,7 +685,11 @@ mod test { let mut packet = Packet::try_from(payload).unwrap(); packet.inner.set_ack_id(1); - let mut comparison_packet = Packet::event("/admin™", "event", json!([{"data": "value™"}])); + let mut comparison_packet = Packet::event( + "/admin™", + "event", + PayloadValue::from_data(json!([{"data": "value™"}])).unwrap(), + ); comparison_packet.inner.set_ack_id(1); assert_eq!(packet, comparison_packet); @@ -633,21 +698,32 @@ mod test { #[test] fn packet_encode_event() { let payload = format!("2{}", json!(["event", { "data": "value™" }])); - let packet: String = Packet::event("/", "event", json!({ "data": "value™" })) - .try_into() - .unwrap(); + let packet: String = Packet::event( + "/", + "event", + PayloadValue::from_data(json!({ "data": "value™" })).unwrap(), + ) + .try_into() + .unwrap(); assert_eq!(packet, payload); // Encode empty data let payload = format!("2{}", json!(["event", []])); - let packet: String = Packet::event("/", "event", json!([])).try_into().unwrap(); + let packet: String = + Packet::event("/", "event", PayloadValue::from_data(json!([])).unwrap()) + .try_into() + .unwrap(); assert_eq!(packet, payload); // Encode with ack ID let payload = format!("21{}", json!(["event", { "data": "value™" }])); - let mut packet = Packet::event("/", "event", json!({ "data": "value™" })); + let mut packet = Packet::event( + "/", + "event", + PayloadValue::from_data(json!({ "data": "value™" })).unwrap(), + ); packet.inner.set_ack_id(1); let packet: String = packet.try_into().unwrap(); @@ -655,15 +731,23 @@ mod test { // Encode with NS let payload = format!("2/admin™,{}", json!(["event", { "data": "value™" }])); - let packet: String = Packet::event("/admin™", "event", json!({"data": "value™"})) - .try_into() - .unwrap(); + let packet: String = Packet::event( + "/admin™", + "event", + PayloadValue::from_data(json!({"data": "value™"})).unwrap(), + ) + .try_into() + .unwrap(); assert_eq!(packet, payload); // Encode with NS and ack ID let payload = format!("2/admin™,1{}", json!(["event", { "data": "value™" }])); - let mut packet = Packet::event("/admin™", "event", json!([{"data": "value™"}])); + let mut packet = Packet::event( + "/admin™", + "event", + PayloadValue::from_data(json!([{"data": "value™"}])).unwrap(), + ); packet.inner.set_ack_id(1); let packet: String = packet.try_into().unwrap(); assert_eq!(packet, payload); @@ -675,24 +759,40 @@ mod test { let payload = "354[\"data\"]".to_string(); let packet = Packet::try_from(payload).unwrap(); - assert_eq!(Packet::ack("/", json!(["data"]), 54), packet); + assert_eq!( + Packet::ack("/", PayloadValue::from_data(json!(["data"])).unwrap(), 54), + packet + ); let payload = "3/admin™,54[\"data\"]".to_string(); let packet = Packet::try_from(payload).unwrap(); - assert_eq!(Packet::ack("/admin™", json!(["data"]), 54), packet); + assert_eq!( + Packet::ack( + "/admin™", + PayloadValue::from_data(json!(["data"])).unwrap(), + 54 + ), + packet + ); } #[test] fn packet_encode_event_ack() { let payload = "354[\"data\"]".to_string(); - let packet: String = Packet::ack("/", json!("data"), 54).try_into().unwrap(); + let packet: String = Packet::ack("/", PayloadValue::from_data(json!("data")).unwrap(), 54) + .try_into() + .unwrap(); assert_eq!(packet, payload); let payload = "3/admin™,54[\"data\"]".to_string(); - let packet: String = Packet::ack("/admin™", json!("data"), 54) - .try_into() - .unwrap(); + let packet: String = Packet::ack( + "/admin™", + PayloadValue::from_data(json!("data")).unwrap(), + 54, + ) + .try_into() + .unwrap(); assert_eq!(packet, payload); } @@ -713,17 +813,29 @@ mod test { let json = json!(["event", { "data": "value™" }, { "_placeholder": true, "num": 0}]); let payload = format!("51-{}", json); - let packet: String = - Packet::bin_event("/", "event", json!({ "data": "value™" }), vec![vec![1]]) - .try_into() - .unwrap(); + let packet: String = Packet::bin_event( + "/", + "event", + PayloadValue::from_data( + json!([{ "data": "value™" }, { "_placeholder": true, "num": 0 }]), + ) + .unwrap(), + ) + .try_into() + .unwrap(); assert_eq!(packet, payload); // Encode with ack ID let payload = format!("51-254{}", json); - let mut packet = - Packet::bin_event("/", "event", json!({ "data": "value™" }), vec![vec![1]]); + let mut packet = Packet::bin_event( + "/", + "event", + PayloadValue::from_data( + json!([{ "data": "value™" }, { "_placeholder": true, "num": 0 }]), + ) + .unwrap(), + ); packet.inner.set_ack_id(254); let packet: String = packet.try_into().unwrap(); @@ -734,8 +846,10 @@ mod test { let packet: String = Packet::bin_event( "/admin™", "event", - json!([{"data": "value™"}]), - vec![vec![1]], + PayloadValue::from_data( + json!([{"data": "value™"}, { "_placeholder": true, "num": 0 }]), + ) + .unwrap(), ) .try_into() .unwrap(); @@ -747,8 +861,10 @@ mod test { let mut packet = Packet::bin_event( "/admin™", "event", - json!([{"data": "value™"}]), - vec![vec![1]], + PayloadValue::from_data( + json!([{"data": "value™"}, { "_placeholder": true, "num": 0 }]), + ) + .unwrap(), ); packet.inner.set_ack_id(254); let packet: String = packet.try_into().unwrap(); @@ -762,9 +878,9 @@ mod test { inner: PacketData::BinaryEvent( "event".into(), BinaryPacket { - bin: vec![vec![1]], - data: json!([{"data": "value™"}]), + data: wrapped_packet_data_map("data", "value™"), payload_count: 1, + payloads: vec![vec![]], }, ack, ), @@ -816,18 +932,31 @@ mod test { let json = json!([{ "data": "value™" }, { "_placeholder": true, "num": 0}]); let payload = format!("61-54{}", json); - let packet: String = Packet::bin_ack("/", json!({ "data": "value™" }), vec![vec![1]], 54) - .try_into() - .unwrap(); + let packet: String = Packet::bin_ack( + "/", + PayloadValue::from_data( + json!([{ "data": "value™" }, { "_placeholder": true, "num": 0 }]), + ) + .unwrap(), + 54, + ) + .try_into() + .unwrap(); assert_eq!(packet, payload); // Encode with NS let payload = format!("61-/admin™,54{}", json); - let packet: String = - Packet::bin_ack("/admin™", json!({ "data": "value™" }), vec![vec![1]], 54) - .try_into() - .unwrap(); + let packet: String = Packet::bin_ack( + "/admin™", + PayloadValue::from_data( + json!([{ "data": "value™" }, { "_placeholder": true, "num": 0 }]), + ) + .unwrap(), + 54, + ) + .try_into() + .unwrap(); assert_eq!(packet, payload); } @@ -838,9 +967,9 @@ mod test { let comparison_packet = |ack, ns: &'static str| Packet { inner: PacketData::BinaryAck( BinaryPacket { - bin: vec![vec![1]], - data: json!([{"data": "value™"}]), + data: wrapped_packet_data_map("data", "value™"), payload_count: 1, + payloads: vec![vec![]], }, ack, ), @@ -886,30 +1015,45 @@ mod test { let packet = Packet::disconnect("/admin"); assert_eq!(packet.get_size_hint(), 8); - let packet = Packet::event("/", "event", json!({ "data": "value™" })); + let packet = Packet::event( + "/", + "event", + PayloadValue::from_data(json!({ "data": "value™" })).unwrap(), + ); assert_eq!(packet.get_size_hint(), 1); - let packet = Packet::event("/admin", "event", json!({ "data": "value™" })); + let packet = Packet::event( + "/admin", + "event", + PayloadValue::from_data(json!({ "data": "value™" })).unwrap(), + ); assert_eq!(packet.get_size_hint(), 8); - let packet = Packet::ack("/", json!("data"), 54); + let packet = Packet::ack("/", PayloadValue::from_data(json!("data")).unwrap(), 54); assert_eq!(packet.get_size_hint(), 3); - let packet = Packet::ack("/admin", json!("data"), 54); + let packet = Packet::ack( + "/admin", + PayloadValue::from_data(json!("data")).unwrap(), + 54, + ); assert_eq!(packet.get_size_hint(), 10); - let packet = Packet::bin_event("/", "event", json!({ "data": "value™" }), vec![vec![1]]); + let packet = Packet::bin_event( + "/", + "event", + PayloadValue::from_data(json!({ "data": "value™" })).unwrap(), + ); assert_eq!(packet.get_size_hint(), 3); let packet = Packet::bin_event( "/admin", "event", - json!({ "data": "value™" }), - vec![vec![1]], + PayloadValue::from_data(json!({ "data": "value™" })).unwrap(), ); assert_eq!(packet.get_size_hint(), 10); - let packet = Packet::bin_ack("/", json!("data"), vec![vec![1]], 54); + let packet = Packet::bin_ack("/", PayloadValue::from_data(json!("data")).unwrap(), 54); assert_eq!(packet.get_size_hint(), 5); } } diff --git a/socketioxide/src/socket.rs b/socketioxide/src/socket.rs index eac68f6d..ed9ebbfe 100644 --- a/socketioxide/src/socket.rs +++ b/socketioxide/src/socket.rs @@ -14,7 +14,6 @@ use std::{ use engineioxide::socket::{DisconnectReason as EIoDisconnectReason, Permit, PermitIterator}; use serde::{de::DeserializeOwned, Serialize}; -use serde_json::Value; use tokio::sync::oneshot::{self, Receiver}; #[cfg(feature = "extensions")] @@ -31,6 +30,7 @@ use crate::{ ns::Namespace, operators::{BroadcastOperators, ConfOperators, RoomParam}, packet::{BinaryPacket, Packet, PacketData}, + payload_value::PayloadValue, AckError, SocketIoConfig, }; use crate::{ @@ -107,9 +107,9 @@ pub(crate) trait PermitIteratorExt<'a>: fn emit(mut self, mut packet: Packet<'_>) { debug_assert!(self.len() > 0, "No permits available to send the message"); - let bin_payloads = match packet.inner { + let bin_payloads = match &mut packet.inner { PacketData::BinaryEvent(_, ref mut bin, _) | PacketData::BinaryAck(ref mut bin, _) => { - Some(std::mem::take(&mut bin.bin)) + Some(bin.extract_payloads()) } _ => None, }; @@ -138,7 +138,7 @@ pub struct Socket { pub(crate) ns: Arc>, message_handlers: RwLock, BoxedMessageHandler>>, disconnect_handler: Mutex>>, - ack_message: Mutex>>>, + ack_message: Mutex>>>, ack_counter: AtomicI64, /// The socket id pub id: Sid, @@ -323,7 +323,7 @@ impl Socket { }; let ns = self.ns(); - let data = serde_json::to_value(data)?; + let data = PayloadValue::from_data(data)?; permits.emit(Packet::event(ns, event.into(), data)); Ok(()) } @@ -392,7 +392,7 @@ impl Socket { return Err(e.with_value(data).into()); } }; - let data = serde_json::to_value(data)?; + let data = PayloadValue::from_data(data)?; let packet = Packet::event(self.ns(), event.into(), data); let rx = self.send_with_ack_permit(packet, permits); let stream = AckInnerStream::send(rx, self.config.ack_timeout, self.id); @@ -569,23 +569,6 @@ impl Socket { ConfOperators::new(self).timeout(timeout) } - /// Adds a binary payload to the message. - /// # Example - /// ``` - /// # use socketioxide::{SocketIo, extract::*}; - /// # use serde_json::Value; - /// # use std::sync::Arc; - /// let (_, io) = SocketIo::new_svc(); - /// io.ns("/", |socket: SocketRef| { - /// socket.on("test", |socket: SocketRef, Data::(data), Bin(bin)| async move { - /// // This will send the binary payload received to all clients in this namespace with the test message - /// socket.bin(bin).emit("test", data); - /// }); - /// }); - pub fn bin(&self, binary: Vec>) -> ConfOperators<'_, A> { - ConfOperators::new(self).bin(binary) - } - /// Broadcasts to all clients without any filtering (except the current socket). /// # Example /// ``` @@ -647,7 +630,7 @@ impl Socket { &self, mut packet: Packet<'_>, permits: PermitIterator<'_>, - ) -> Receiver> { + ) -> Receiver> { let (tx, rx) = oneshot::channel(); let ack = self.ack_counter.fetch_add(1, Ordering::SeqCst) + 1; @@ -657,7 +640,10 @@ impl Socket { rx } - pub(crate) fn send_with_ack(&self, mut packet: Packet<'_>) -> Receiver> { + pub(crate) fn send_with_ack( + &self, + mut packet: Packet<'_>, + ) -> Receiver> { let (tx, rx) = oneshot::channel(); let ack = self.ack_counter.fetch_add(1, Ordering::SeqCst) + 1; @@ -735,9 +721,14 @@ impl Socket { self.esocket.protocol.into() } - fn recv_event(self: Arc, e: &str, data: Value, ack: Option) -> Result<(), Error> { + fn recv_event( + self: Arc, + e: &str, + data: PayloadValue, + ack: Option, + ) -> Result<(), Error> { if let Some(handler) = self.message_handlers.read().unwrap().get(e) { - handler.call(self.clone(), data, vec![], ack); + handler.call(self.clone(), data, ack); } Ok(()) } @@ -749,17 +740,14 @@ impl Socket { ack: Option, ) -> Result<(), Error> { if let Some(handler) = self.message_handlers.read().unwrap().get(e) { - handler.call(self.clone(), packet.data, packet.bin, ack); + handler.call(self.clone(), packet.data, ack); } Ok(()) } - fn recv_ack(self: Arc, data: Value, ack: i64) -> Result<(), Error> { + fn recv_ack(self: Arc, data: PayloadValue, ack: i64) -> Result<(), Error> { if let Some(tx) = self.ack_message.lock().unwrap().remove(&ack) { - let res = AckResponse { - data, - binary: vec![], - }; + let res = AckResponse { data }; tx.send(Ok(res)).ok(); } Ok(()) @@ -767,10 +755,7 @@ impl Socket { fn recv_bin_ack(self: Arc, packet: BinaryPacket, ack: i64) -> Result<(), Error> { if let Some(tx) = self.ack_message.lock().unwrap().remove(&ack) { - let res = AckResponse { - data: packet.data, - binary: packet.bin, - }; + let res = AckResponse { data: packet.data }; tx.send(Ok(res)).ok(); } Ok(()) @@ -818,11 +803,11 @@ mod test { // Saturate the channel for _ in 0..200 { socket - .send(Packet::event("test", "test", Value::Null)) + .send(Packet::event("test", "test", PayloadValue::Null)) .unwrap(); } - let ack = socket.emit_with_ack::<_, Value>("test", Value::Null); + let ack = socket.emit_with_ack::<_, PayloadValue>("test", PayloadValue::Null); assert!(matches!( ack, Err(SendError::Socket(SocketError::InternalChannelFull(_)))