Skip to content

Commit

Permalink
feat(kad): migrate to quick-protobuf-codec crate
Browse files Browse the repository at this point in the history
Resolves #4487.

Pull-Request: #4501.
  • Loading branch information
0xcrust authored and thomaseizinger committed Sep 21, 2023
1 parent 03eba50 commit ce8d48f
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 75 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ libp2p-floodsub = { version = "0.43.0", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.45.1", path = "protocols/gossipsub" }
libp2p-identify = { version = "0.43.0", path = "protocols/identify" }
libp2p-identity = { version = "0.2.3" }
libp2p-kad = { version = "0.44.4", path = "protocols/kad" }
libp2p-kad = { version = "0.44.5", path = "protocols/kad" }
libp2p-mdns = { version = "0.44.0", path = "protocols/mdns" }
libp2p-memory-connection-limits = { version = "0.1.0", path = "misc/memory-connection-limits" }
libp2p-metrics = { version = "0.13.1", path = "misc/metrics" }
Expand Down
6 changes: 6 additions & 0 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 0.44.5 - unreleased
- Migrate to `quick-protobuf-codec` crate for codec logic.
See [PR 4501].

[PR 4501]: https://github.com/libp2p/rust-libp2p/pull/4501

## 0.44.4

- Implement common traits on `RoutingUpdate`.
Expand Down
3 changes: 2 additions & 1 deletion protocols/kad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-kad"
edition = "2021"
rust-version = { workspace = true }
description = "Kademlia protocol for libp2p"
version = "0.44.4"
version = "0.44.5"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand All @@ -21,6 +21,7 @@ log = "0.4"
libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
quick-protobuf = "0.8"
quick-protobuf-codec = { workspace = true }
libp2p-identity = { workspace = true }
rand = "0.8"
sha2 = "0.10.7"
Expand Down
141 changes: 69 additions & 72 deletions protocols/kad/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,17 @@

use crate::proto;
use crate::record_priv::{self, Record};
use asynchronous_codec::Framed;
use asynchronous_codec::{Decoder, Encoder, Framed};
use bytes::BytesMut;
use codec::UviBytes;
use futures::prelude::*;
use instant::Instant;
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use libp2p_core::Multiaddr;
use libp2p_identity::PeerId;
use libp2p_swarm::StreamProtocol;
use quick_protobuf::{BytesReader, Writer};
use std::marker::PhantomData;
use std::{convert::TryFrom, time::Duration};
use std::{io, iter};
use unsigned_varint::codec;

/// The protocol name used for negotiating with multistream-select.
pub(crate) const DEFAULT_PROTO_NAME: StreamProtocol = StreamProtocol::new("/ipfs/kad/1.0.0");
Expand Down Expand Up @@ -179,6 +177,42 @@ impl UpgradeInfo for KademliaProtocolConfig {
}
}

/// Codec for Kademlia inbound and outbound message framing.
pub struct Codec<A, B> {
codec: quick_protobuf_codec::Codec<proto::Message>,
__phantom: PhantomData<(A, B)>,
}
impl<A, B> Codec<A, B> {
fn new(max_packet_size: usize) -> Self {
Codec {
codec: quick_protobuf_codec::Codec::new(max_packet_size),
__phantom: PhantomData,
}
}
}

impl<A: Into<proto::Message>, B> Encoder for Codec<A, B> {
type Error = io::Error;
type Item = A;

fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
Ok(self.codec.encode(item.into(), dst)?)
}
}
impl<A, B: TryFrom<proto::Message, Error = io::Error>> Decoder for Codec<A, B> {
type Error = io::Error;
type Item = B;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
self.codec.decode(src)?.map(B::try_from).transpose()
}
}

/// Sink of responses and stream of requests.
pub(crate) type KadInStreamSink<S> = Framed<S, Codec<KadResponseMsg, KadRequestMsg>>;
/// Sink of requests and stream of responses.
pub(crate) type KadOutStreamSink<S> = Framed<S, Codec<KadRequestMsg, KadResponseMsg>>;

impl<C> InboundUpgrade<C> for KademliaProtocolConfig
where
C: AsyncRead + AsyncWrite + Unpin,
Expand All @@ -188,32 +222,9 @@ where
type Error = io::Error;

fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future {
use quick_protobuf::{MessageRead, MessageWrite};

let mut codec = UviBytes::default();
codec.set_max_len(self.max_packet_size);

future::ok(
Framed::new(incoming, codec)
.err_into()
.with::<_, _, fn(_) -> _, _>(|response| {
let proto_struct = resp_msg_to_proto(response);
let mut buf = Vec::with_capacity(proto_struct.get_size());
let mut writer = Writer::new(&mut buf);
proto_struct
.write_message(&mut writer)
.expect("Encoding to succeed");
future::ready(Ok(io::Cursor::new(buf)))
})
.and_then::<_, fn(_) -> _>(|bytes| {
let mut reader = BytesReader::from_bytes(&bytes);
let request = match proto::Message::from_reader(&mut reader, &bytes) {
Ok(r) => r,
Err(err) => return future::ready(Err(err.into())),
};
future::ready(proto_to_req_msg(request))
}),
)
let codec = Codec::new(self.max_packet_size);

future::ok(Framed::new(incoming, codec))
}
}

Expand All @@ -226,51 +237,12 @@ where
type Error = io::Error;

fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future {
use quick_protobuf::{MessageRead, MessageWrite};

let mut codec = UviBytes::default();
codec.set_max_len(self.max_packet_size);

future::ok(
Framed::new(incoming, codec)
.err_into()
.with::<_, _, fn(_) -> _, _>(|request| {
let proto_struct = req_msg_to_proto(request);
let mut buf = Vec::with_capacity(proto_struct.get_size());
let mut writer = Writer::new(&mut buf);
proto_struct
.write_message(&mut writer)
.expect("Encoding to succeed");
future::ready(Ok(io::Cursor::new(buf)))
})
.and_then::<_, fn(_) -> _>(|bytes| {
let mut reader = BytesReader::from_bytes(&bytes);
let response = match proto::Message::from_reader(&mut reader, &bytes) {
Ok(r) => r,
Err(err) => return future::ready(Err(err.into())),
};
future::ready(proto_to_resp_msg(response))
}),
)
let codec = Codec::new(self.max_packet_size);

future::ok(Framed::new(incoming, codec))
}
}

/// Sink of responses and stream of requests.
pub(crate) type KadInStreamSink<S> = KadStreamSink<S, KadResponseMsg, KadRequestMsg>;
/// Sink of requests and stream of responses.
pub(crate) type KadOutStreamSink<S> = KadStreamSink<S, KadRequestMsg, KadResponseMsg>;
pub(crate) type KadStreamSink<S, A, B> = stream::AndThen<
sink::With<
stream::ErrInto<Framed<S, UviBytes<io::Cursor<Vec<u8>>>>, io::Error>,
io::Cursor<Vec<u8>>,
A,
future::Ready<Result<io::Cursor<Vec<u8>>, io::Error>>,
fn(A) -> future::Ready<Result<io::Cursor<Vec<u8>>, io::Error>>,
>,
future::Ready<Result<B, io::Error>>,
fn(BytesMut) -> future::Ready<Result<B, io::Error>>,
>;

/// Request that we can send to a peer or that we received from a peer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KadRequestMsg {
Expand Down Expand Up @@ -346,6 +318,31 @@ pub enum KadResponseMsg {
},
}

impl From<KadRequestMsg> for proto::Message {
fn from(kad_msg: KadRequestMsg) -> Self {
req_msg_to_proto(kad_msg)
}
}
impl From<KadResponseMsg> for proto::Message {
fn from(kad_msg: KadResponseMsg) -> Self {
resp_msg_to_proto(kad_msg)
}
}
impl TryFrom<proto::Message> for KadRequestMsg {
type Error = io::Error;

fn try_from(message: proto::Message) -> Result<Self, Self::Error> {
proto_to_req_msg(message)
}
}
impl TryFrom<proto::Message> for KadResponseMsg {
type Error = io::Error;

fn try_from(message: proto::Message) -> Result<Self, Self::Error> {
proto_to_resp_msg(message)
}
}

/// Converts a `KadRequestMsg` into the corresponding protobuf message for sending.
fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message {
match kad_msg {
Expand Down

0 comments on commit ce8d48f

Please sign in to comment.