Skip to content

Commit

Permalink
wip: integrate binary data into payloads
Browse files Browse the repository at this point in the history
Previously, the non-binary part of a message and the binary payloads in
a message were represented separately: the non-binary portion was
represented by a serde_json::Value, and could be converted to an
arbitrary data structure.  That data structure would not include the
binary data or any indication that there is any binary data at all.  The
binary data would be provided in a Vec<Vec<u8>>.

There were a few problems with this:

1. The original code only supported cases where the payload was a flat
   array with some binary payloads in the root of the array, or a flat
   object where the root of the object was a binary payload.  Objects
   with more complicated structure and binary data embedded in various
   places in the structure were not supported.
2. Adding support for the above turned out to not be possible in a
   useful way, because the ordering of the Vec<Vec<u8>> matters, and it
   could never be clear where exactly in the possibly-complex structure
   each binary payload belonged.
3. One of the useful features of the socket.io protocol is that it lets
   users efficiently transmit binary data in addition to textual/numeric
   data, and have that handled transparently by the protocol, with
   either end of the connection believing that they just sent or
   received a single mixed textual/numeric/binary payload.  Separating
   the non-binary from the binary negates that benefit.

This introduces a new type, PayloadValue, that behaves similarly to
serde_json::Value.  The main difference is that it has a Binary variant,
which holds a numeric index and a Vec<u8>.  This allows us to include
the binary data where the sender of that data intended it to be.

There is currently one wrinkle: serde_json does not appear to
consistently handle binary data; when serializing a struct with Vec<u8>,
I believe it will serialize it as an array of numbers, rather than
recognize that it's binary data.  For now, I've included a Binary struct
that wraps a Vec<u8>, which can be included as the type of a binary
member, instead of using a Vec<u8> directly.  Hopefully I'll be able to
figure out a better way to do this.

Unfinished tasks:

* Testing: I have no idea if this even works yet.  All I've done is get
  it to compile.
* Benchmarking: I've tried to ensure that I don't copy data any more
  than the existing library does, but it's possible I've introduced some
  performance regressions, so I'll have to look into that.
* Documentation: the documentation still references the old way of doing
  things and needs to be updated.

Closes Totodore#276.
  • Loading branch information
kelnos committed Mar 7, 2024
1 parent c243f6c commit ed1293d
Show file tree
Hide file tree
Showing 8 changed files with 374 additions and 333 deletions.
59 changes: 26 additions & 33 deletions socketioxide/src/ack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,22 @@ use futures::{
Future, Stream,
};
use serde::de::DeserializeOwned;
use serde_json::Value;
use tokio::{sync::oneshot::Receiver, time::Timeout};

use crate::{adapter::Adapter, errors::AckError, extract::SocketRef, packet::Packet, SocketError};
use crate::{
adapter::Adapter, errors::AckError, extract::SocketRef, packet::Packet,
payload_value::PayloadValue, SocketError,
};

/// An acknowledgement sent by the client.
/// It contains the data sent by the client and the binary payloads if there are any.
#[derive(Debug)]
pub struct AckResponse<T> {
/// The data returned by the client
pub data: T,
/// Optional binary payloads.
/// If there is no binary payload, the `Vec` will be empty
pub binary: Vec<Vec<u8>>,
}

pub(crate) type AckResult<T = Value> = Result<AckResponse<T>, AckError<()>>;
pub(crate) type AckResult<T = PayloadValue> = Result<AckResponse<T>, AckError<()>>;

pin_project_lite::pin_project! {
/// A [`Future`] of [`AckResponse`] received from the client with its corresponding [`Sid`].
Expand Down Expand Up @@ -127,12 +126,12 @@ pin_project_lite::pin_project! {
pub enum AckInnerStream {
Stream {
#[pin]
rxs: FuturesUnordered<AckResultWithId<Value>>,
rxs: FuturesUnordered<AckResultWithId<PayloadValue>>,
},

Fut {
#[pin]
rx: AckResultWithId<Value>,
rx: AckResultWithId<PayloadValue>,
polled: bool,
},
}
Expand Down Expand Up @@ -171,7 +170,7 @@ impl AckInnerStream {

/// Creates a new [`AckInnerStream`] from a [`oneshot::Receiver`](tokio) corresponding to the acknowledgement
/// of a single socket.
pub fn send(rx: Receiver<AckResult<Value>>, duration: Duration, id: Sid) -> Self {
pub fn send(rx: Receiver<AckResult<PayloadValue>>, duration: Duration, id: Sid) -> Self {
AckInnerStream::Fut {
polled: false,
rx: AckResultWithId {
Expand All @@ -183,7 +182,7 @@ impl AckInnerStream {
}

impl Stream for AckInnerStream {
type Item = (Sid, AckResult<Value>);
type Item = (Sid, AckResult<PayloadValue>);

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use InnerProj::*;
Expand Down Expand Up @@ -221,7 +220,7 @@ impl FusedStream for AckInnerStream {
}

impl Future for AckInnerStream {
type Output = AckResult<Value>;
type Output = AckResult<PayloadValue>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().poll_next(cx) {
Expand Down Expand Up @@ -295,13 +294,11 @@ impl<T> From<AckInnerStream> for AckStream<T> {
}
}

fn map_ack_response<T: DeserializeOwned>(ack: AckResult<Value>) -> AckResult<T> {
fn map_ack_response<T: DeserializeOwned>(ack: AckResult<PayloadValue>) -> AckResult<T> {
ack.and_then(|v| {
serde_json::from_value(v.data)
.map(|data| AckResponse {
data,
binary: v.binary,
})
v.data
.into_data::<T>()
.map(|data| AckResponse { data })
.map_err(|e| e.into())
})
}
Expand All @@ -328,12 +325,12 @@ mod test {
async fn broadcast_ack() {
let socket = create_socket();
let socket2 = create_socket();
let mut packet = Packet::event("/", "test", "test".into());
let mut packet = Packet::event("/", "test", PayloadValue::from_data("test").unwrap());
packet.inner.set_ack_id(1);
let socks = vec![socket.clone().into(), socket2.clone().into()];
let stream: AckStream<String> = AckInnerStream::broadcast(packet, socks, None).into();

let res_packet = Packet::ack("test", "test".into(), 1);
let res_packet = Packet::ack("test", PayloadValue::from_data("test").unwrap(), 1);
socket.recv(res_packet.inner.clone()).unwrap();
socket2.recv(res_packet.inner).unwrap();

Expand All @@ -351,8 +348,7 @@ mod test {
let stream: AckStream<String> =
AckInnerStream::send(rx, Duration::from_secs(1), sid).into();
tx.send(Ok(AckResponse {
data: Value::String("test".into()),
binary: vec![],
data: PayloadValue::String("test".into()),
}))
.unwrap();

Expand All @@ -372,8 +368,7 @@ mod test {
let stream: AckStream<String> =
AckInnerStream::send(rx, Duration::from_secs(1), sid).into();
tx.send(Ok(AckResponse {
data: Value::String("test".into()),
binary: vec![],
data: PayloadValue::String("test".into()),
}))
.unwrap();

Expand All @@ -384,12 +379,12 @@ mod test {
async fn broadcast_ack_with_deserialize_error() {
let socket = create_socket();
let socket2 = create_socket();
let mut packet = Packet::event("/", "test", "test".into());
let mut packet = Packet::event("/", "test", PayloadValue::from_data("test").unwrap());
packet.inner.set_ack_id(1);
let socks = vec![socket.clone().into(), socket2.clone().into()];
let stream: AckStream<String> = AckInnerStream::broadcast(packet, socks, None).into();

let res_packet = Packet::ack("test", 132.into(), 1);
let res_packet = Packet::ack("test", PayloadValue::from_data(132).unwrap(), 1);
socket.recv(res_packet.inner.clone()).unwrap();
socket2.recv(res_packet.inner).unwrap();

Expand All @@ -413,8 +408,7 @@ mod test {
let stream: AckStream<String> =
AckInnerStream::send(rx, Duration::from_secs(1), sid).into();
tx.send(Ok(AckResponse {
data: Value::Bool(true),
binary: vec![],
data: PayloadValue::Bool(true),
}))
.unwrap();
assert_eq!(stream.size_hint().0, 1);
Expand All @@ -436,8 +430,7 @@ mod test {
let stream: AckStream<String> =
AckInnerStream::send(rx, Duration::from_secs(1), sid).into();
tx.send(Ok(AckResponse {
data: Value::Bool(true),
binary: vec![],
data: PayloadValue::Bool(true),
}))
.unwrap();

Expand All @@ -448,12 +441,12 @@ mod test {
async fn broadcast_ack_with_closed_socket() {
let socket = create_socket();
let socket2 = create_socket();
let mut packet = Packet::event("/", "test", "test".into());
let mut packet = Packet::event("/", "test", PayloadValue::from_data("test").unwrap());
packet.inner.set_ack_id(1);
let socks = vec![socket.clone().into(), socket2.clone().into()];
let stream: AckStream<String> = AckInnerStream::broadcast(packet, socks, None).into();

let res_packet = Packet::ack("test", "test".into(), 1);
let res_packet = Packet::ack("test", PayloadValue::from_data("test").unwrap(), 1);
socket.clone().recv(res_packet.inner.clone()).unwrap();

futures::pin_mut!(stream);
Expand Down Expand Up @@ -503,14 +496,14 @@ mod test {
async fn broadcast_ack_with_timeout() {
let socket = create_socket();
let socket2 = create_socket();
let mut packet = Packet::event("/", "test", "test".into());
let mut packet = Packet::event("/", "test", PayloadValue::from_data("test").unwrap());
packet.inner.set_ack_id(1);
let socks = vec![socket.clone().into(), socket2.clone().into()];
let stream: AckStream<String> =
AckInnerStream::broadcast(packet, socks, Some(Duration::from_millis(10))).into();

socket
.recv(Packet::ack("test", "test".into(), 1).inner)
.recv(Packet::ack("test", PayloadValue::from_data("test").unwrap(), 1).inner)
.unwrap();

futures::pin_mut!(stream);
Expand Down
10 changes: 7 additions & 3 deletions socketioxide/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ pub enum Error {
#[error("invalid packet type")]
InvalidPacketType,

#[error("invalid binary payload count")]
InvalidPayloadCount,

#[error("invalid event name")]
InvalidEventName,

Expand Down Expand Up @@ -165,9 +168,10 @@ impl From<&Error> for Option<EIoDisconnectReason> {
use EIoDisconnectReason::*;
match value {
Error::SocketGone(_) => Some(TransportClose),
Error::Serialize(_) | Error::InvalidPacketType | Error::InvalidEventName => {
Some(PacketParsingError)
}
Error::Serialize(_)
| Error::InvalidPacketType
| Error::InvalidEventName
| Error::InvalidPayloadCount => Some(PacketParsingError),
Error::Adapter(_) | Error::InvalidNamespace => None,
}
}
Expand Down
Loading

0 comments on commit ed1293d

Please sign in to comment.