Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(request-response): add modules for json and cbor messages #3952

Merged
merged 45 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
dd98771
3905 Serde support
dgarus May 16, 2023
55bc76a
fix failures
dgarus May 16, 2023
a5f3ed9
fix failures
dgarus May 16, 2023
5304fb8
fix failures
dgarus May 16, 2023
e9628ee
Fix format
dgarus May 16, 2023
4f8150d
Merge branch 'master' into 3905-serde_support
dgarus May 16, 2023
23b7722
Fix failure
dgarus May 16, 2023
11e0783
Cbor behavior
dgarus May 18, 2023
ade1842
Cbor behavior
dgarus May 18, 2023
771560c
Format
dgarus May 18, 2023
2a6693e
Fix test
dgarus May 18, 2023
d4bff4c
Fix review comments
dgarus May 19, 2023
92d9862
Fix tests running
dgarus May 19, 2023
a0b8ee5
Fix tests running
dgarus May 19, 2023
1129dee
Added features `json` and `cbor`
dgarus May 19, 2023
a8f9ee9
CHANGELOG
dgarus May 19, 2023
36eaf02
Update protocols/request-response/CHANGELOG.md
dgarus May 20, 2023
61f7d48
Update protocols/request-response/src/cbor.rs
dgarus May 20, 2023
d78c17b
Update protocols/request-response/src/cbor.rs
dgarus May 20, 2023
0ee31bd
Update protocols/request-response/src/cbor.rs
dgarus May 20, 2023
d4044a5
Update protocols/request-response/src/cbor.rs
dgarus May 20, 2023
416f271
fix review comments
dgarus May 20, 2023
9d20aaa
fix serde feature should be in the test scope
dgarus May 22, 2023
ed5da99
No default features
dgarus May 22, 2023
2d70235
swarm-derive should be optional
dgarus May 22, 2023
9c7d967
Codecs are `pub(crate)`
dgarus May 22, 2023
3d4b12b
Merge branch 'master' into 3905-serde_support
dgarus May 22, 2023
3839599
revert
dgarus May 22, 2023
4e95f78
Merge remote-tracking branch 'origin/3905-serde_support' into 3905-se…
dgarus May 22, 2023
15b0cb5
Hide codec implementation from public API
thomaseizinger May 22, 2023
30c9420
Merge branch 'master' into 3905-serde_support
dgarus May 23, 2023
c6a0327
Hide json::Codec
dgarus May 23, 2023
9d6d608
Remove `libp2p-swarm-derive` dep
dgarus May 23, 2023
5cdb636
Read/write without prefixed size
dgarus May 23, 2023
c177570
fix format
dgarus May 23, 2023
60f029c
fix format
dgarus May 23, 2023
0fb3e78
fix bug
dgarus May 23, 2023
12a1322
Deref for behaviors
dgarus May 23, 2023
4122c4e
fix format
dgarus May 23, 2023
74669a3
Rename constructors to better support type aliases
thomaseizinger May 23, 2023
2da4b43
move fn read_to_end to request_response crate
dgarus May 23, 2023
20b2c07
Merge branch 'master' into 3905-serde_support
dgarus May 23, 2023
207f15a
fix review comments
dgarus May 24, 2023
9047c7a
Merge branch 'master' into 3905-serde_support
dgarus May 24, 2023
8197278
fix review comments
dgarus May 24, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/file-sharing/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub(crate) async fn new(
transport,
ComposedBehaviour {
kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)),
request_response: request_response::Behaviour::new(
request_response: request_response::Behaviour::with_codec(
FileExchangeCodec(),
iter::once((
StreamProtocol::new("/file-exchange/1"),
Expand Down
2 changes: 1 addition & 1 deletion protocols/autonat/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ impl Behaviour {
let protocols = iter::once((DEFAULT_PROTOCOL_NAME, ProtocolSupport::Full));
let mut cfg = request_response::Config::default();
cfg.set_request_timeout(config.timeout);
let inner = request_response::Behaviour::new(AutoNatCodec, protocols, cfg);
let inner = request_response::Behaviour::with_codec(AutoNatCodec, protocols, cfg);
Self {
local_peer_id,
inner,
Expand Down
7 changes: 6 additions & 1 deletion protocols/request-response/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 0.25.0 - unreleased

- Add `request_response::json::Behaviour` and `request_response::cbor::Behaviour` building on top of the `serde` traits.
To conveniently construct these, we remove the `Codec` parameter from `Behaviour::new` and add `Behaviour::with_codec`.
See [PR 3952].

- Raise MSRV to 1.65.
See [PR 3715].
- Remove deprecated `RequestResponse` prefixed items. See [PR 3702].
Expand All @@ -8,10 +12,11 @@
These variants are no longer constructed.
See [PR 3605].

- Don't close connections if individual streams fail.
- Don't close connections if individual streams fail.
Log the error instead.
See [PR 3913].

[PR 3952]: https://github.com/libp2p/rust-libp2p/pull/3952
[PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3702]: https://github.com/libp2p/rust-libp2p/pull/3702
Expand Down
9 changes: 9 additions & 0 deletions protocols/request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@ libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
libp2p-identity = { workspace = true }
rand = "0.8"
serde = { version = "1.0", optional = true}
serde_json = { version = "1.0.96", optional = true }
serde_cbor = { version = "0.11.2", optional = true }
smallvec = "1.6.1"
void = "1.0.2"
log = "0.4.17"

[features]
json = ["dep:serde", "dep:serde_json", "libp2p-swarm/macros"]
cbor = ["dep:serde", "dep:serde_cbor", "libp2p-swarm/macros"]

[dev-dependencies]
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.10.0"
Expand All @@ -30,6 +37,8 @@ libp2p-tcp = { workspace = true, features = ["async-io"] }
libp2p-yamux = { workspace = true }
rand = "0.8"
libp2p-swarm-test = { workspace = true }
futures_ringbuf = "0.3.1"
serde = { version = "1.0", features = ["derive"]}

# Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling
Expand Down
214 changes: 214 additions & 0 deletions protocols/request-response/src/cbor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// Copyright 2023 Protocol Labs
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

/// A request-response behaviour using [`serde_cbor`] for serializing and deserializing the messages.
///
/// # Example
///
/// ```
/// # use libp2p_request_response::{cbor, ProtocolSupport, self as request_response};
/// # use libp2p_swarm::{StreamProtocol, SwarmBuilder};
/// #[derive(Debug, serde::Serialize, serde::Deserialize)]
/// struct GreetRequest {
/// name: String,
/// }
///
/// #[derive(Debug, serde::Serialize, serde::Deserialize)]
/// struct GreetResponse {
/// message: String,
/// }
///
/// let behaviour = cbor::Behaviour::<GreetRequest, GreetResponse>::new(
/// [(StreamProtocol::new("/my-cbor-protocol"), ProtocolSupport::Full)],
/// request_response::Config::default()
/// );
Comment on lines +26 to +41
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wonderful how simple this turned out to be. Thanks @dgarus and @thomaseizinger!

/// ```
pub type Behaviour<Req, Resp> = crate::Behaviour<codec::Codec<Req, Resp>>;

mod codec {
use async_trait::async_trait;
use futures::prelude::*;
use futures::{AsyncRead, AsyncWrite};
use libp2p_swarm::StreamProtocol;
use serde::{de::DeserializeOwned, Serialize};
use std::{io, marker::PhantomData};

/// Max request size in bytes
const REQUEST_SIZE_MAXIMUM: u64 = 1024 * 1024;
/// Max response size in bytes
const RESPONSE_SIZE_MAXIMUM: u64 = 10 * 1024 * 1024;

pub struct Codec<Req, Resp> {
phantom: PhantomData<(Req, Resp)>,
}

impl<Req, Resp> Default for Codec<Req, Resp> {
fn default() -> Self {
Codec {
phantom: PhantomData,
}
}
}

impl<Req, Resp> Clone for Codec<Req, Resp> {
fn clone(&self) -> Self {
Self::default()
}
}

#[async_trait]
impl<Req, Resp> crate::Codec for Codec<Req, Resp>
where
Req: Send + Serialize + DeserializeOwned,
Resp: Send + Serialize + DeserializeOwned,
{
type Protocol = StreamProtocol;
type Request = Req;
type Response = Resp;

async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Req>
where
T: AsyncRead + Unpin + Send,
{
let mut vec = Vec::new();

io.take(REQUEST_SIZE_MAXIMUM).read_to_end(&mut vec).await?;

serde_cbor::from_slice(vec.as_slice()).map_err(into_io_error)
}

async fn read_response<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Resp>
where
T: AsyncRead + Unpin + Send,
{
let mut vec = Vec::new();

io.take(RESPONSE_SIZE_MAXIMUM).read_to_end(&mut vec).await?;

serde_cbor::from_slice(vec.as_slice()).map_err(into_io_error)
}

async fn write_request<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
req: Self::Request,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
let data: Vec<u8> = serde_cbor::to_vec(&req).map_err(into_io_error)?;

io.write_all(data.as_ref()).await?;

Ok(())
}

async fn write_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
resp: Self::Response,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
let data: Vec<u8> = serde_cbor::to_vec(&resp).map_err(into_io_error).unwrap();

io.write_all(data.as_ref()).await?;

Ok(())
}
}

fn into_io_error(err: serde_cbor::Error) -> io::Error {
if err.is_syntax() || err.is_data() {
return io::Error::new(io::ErrorKind::InvalidData, err);
}

if err.is_eof() {
return io::Error::new(io::ErrorKind::UnexpectedEof, err);
}

io::Error::new(io::ErrorKind::Other, err)
}
}

#[cfg(test)]
mod tests {
use crate::cbor::codec::Codec;
use crate::Codec as _;
use futures::AsyncWriteExt;
use futures_ringbuf::Endpoint;
use libp2p_swarm::StreamProtocol;
use serde::{Deserialize, Serialize};

#[async_std::test]
async fn test_codec() {
let expected_request = TestRequest {
payload: "test_payload".to_string(),
};
let expected_response = TestResponse {
payload: "test_payload".to_string(),
};
let protocol = StreamProtocol::new("/test_cbor/1");
let mut codec = Codec::default();

let (mut a, mut b) = Endpoint::pair(124, 124);
codec
.write_request(&protocol, &mut a, expected_request.clone())
.await
.expect("Should write request");
a.close().await.unwrap();

let actual_request = codec
.read_request(&protocol, &mut b)
.await
.expect("Should read request");
b.close().await.unwrap();

assert_eq!(actual_request, expected_request);

let (mut a, mut b) = Endpoint::pair(124, 124);
codec
.write_response(&protocol, &mut a, expected_response.clone())
.await
.expect("Should write response");
a.close().await.unwrap();

let actual_response = codec
.read_response(&protocol, &mut b)
.await
.expect("Should read response");
b.close().await.unwrap();

assert_eq!(actual_response, expected_response);
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TestRequest {
payload: String,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TestResponse {
payload: String,
}
}
Loading