From 489a8aeff04974b220c695242aa7c283aad68465 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Tue, 12 May 2020 12:31:05 +0200 Subject: [PATCH 01/16] Add the libp2p-request-response protocol. This crate provides a generic implementation for request/response protocols, whereby each request is sent on a new substream. --- Cargo.toml | 1 + core/src/upgrade.rs | 13 +- protocols/request-response/Cargo.toml | 24 + protocols/request-response/src/lib.rs | 622 +++++++++++++++++++++++ protocols/request-response/tests/ping.rs | 181 +++++++ swarm/src/lib.rs | 2 + swarm/src/protocols_handler.rs | 4 +- swarm/src/protocols_handler/one_shot.rs | 148 +++--- 8 files changed, 923 insertions(+), 72 deletions(-) create mode 100644 protocols/request-response/Cargo.toml create mode 100644 protocols/request-response/src/lib.rs create mode 100644 protocols/request-response/tests/ping.rs diff --git a/Cargo.toml b/Cargo.toml index 71dcabf41a4..837c97755bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,6 +107,7 @@ members = [ "protocols/noise", "protocols/ping", "protocols/plaintext", + "protocols/request-response", "protocols/secio", "swarm", "transports/dns", diff --git a/core/src/upgrade.rs b/core/src/upgrade.rs index dbe3a5b7c96..9798ae6c27a 100644 --- a/core/src/upgrade.rs +++ b/core/src/upgrade.rs @@ -87,12 +87,12 @@ pub use self::{ /// /// # Context /// -/// In situations where we provide a list of protocols that we support, the elements of that list are required to -/// implement the [`ProtocolName`] trait. +/// In situations where we provide a list of protocols that we support, +/// the elements of that list are required to implement the [`ProtocolName`] trait. /// -/// Libp2p will call the [`ProtocolName::protocol_name`] trait method on each element of that list, and transmit the -/// returned value on the network. If the remote accepts a given protocol, the element serves as the return value of -/// the function that performed the negotiation. +/// Libp2p will call [`ProtocolName::protocol_name`] on each element of that list, and transmit the +/// returned value on the network. If the remote accepts a given protocol, the element +/// serves as the return value of the function that performed the negotiation. /// /// # Example /// @@ -118,6 +118,9 @@ pub use self::{ /// pub trait ProtocolName { /// The protocol name as bytes. Transmitted on the network. + /// + /// **Note:** Valid protocol names must start with `/` and + /// not exceed 140 bytes in length. fn protocol_name(&self) -> &[u8]; } diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml new file mode 100644 index 00000000000..f702f808772 --- /dev/null +++ b/protocols/request-response/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "libp2p-request-response" +edition = "2018" +description = "Generic Request/Response Protocols" +version = "0.19.0" +authors = ["Parity Technologies "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +bytes = "0.5" +futures = "0.3.1" +libp2p-core = { version = "0.19.0", path = "../../core" } +libp2p-swarm = { version = "0.19.0", path = "../../swarm" } +smallvec = "1.4" + +[dev-dependencies] +async-std = "< 1.6" +libp2p-noise = { version = "0.19.0", path = "../noise" } +libp2p-tcp = { version = "0.19.0", path = "../../transports/tcp", features = ["async-std"] } +libp2p-yamux = { version = "0.19.0", path = "../../muxers/yamux" } +rand = "0.7" diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs new file mode 100644 index 00000000000..9d82abcb29a --- /dev/null +++ b/protocols/request-response/src/lib.rs @@ -0,0 +1,622 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Generic request/response protocols. +//! +//! ## General Usage +//! +//! [`RequestResponse`] is a `NetworkBehaviour` that implements a generic +//! request/response protocol whereby each request is sent over a new +//! substream on a connection. `RequestResponse` is generic over the +//! actual messages being sent, which are defined in terms of a +//! [`RequestResponseCodec`]. Creating a request/response protocol thus amounts +//! to providing an implementation of this trait that can be +//! given to [`RequestResponse::new`]. Further configuration is available +//! via the [`RequestResponseConfig`]. +//! +//! Requests are sent using [`RequestResponse::send_request`] and the +//! responses received as [`RequestResponseMessage::Response`] via +//! [`RequestResponseEvent::Message`]. +//! +//! Responses are sent using [`RequestResponse::send_response`] upon +//! receiving a [`RequestResponseMessage::Request`] via +//! [`RequestResponseEvent::Message`]. +//! +//! ## One-Way Protocols +//! +//! The implementation supports one-way protocols that do not +//! have responses. In these cases the [`RequestResponseCodec::Response`] can +//! be defined as `()` and [`RequestResponseCodec::read_response`] as well as +//! [`RequestResponseCodec::write_response`] given the obvious implementations. +//! Note that `RequestResponseMessage::Response` will still be emitted, +//! immediately after the request has been sent, since `RequestResponseCodec::read_response` +//! will not actually read anything from the given I/O stream. +//! [`RequestResponse::send_response`] need not be called for one-way protocols. + +use bytes::Bytes; +use futures::{ + future::BoxFuture, + prelude::*, + stream::FuturesUnordered +}; +use libp2p_core::{ + ConnectedPoint, + Multiaddr, + PeerId, + ProtocolName, + connection::ConnectionId, + upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, +}; +use libp2p_swarm::{ + DialPeerCondition, + NegotiatedSubstream, + NetworkBehaviour, + NetworkBehaviourAction, + NotifyHandler, + OneShotEvent, + OneShotHandler, + OneShotHandlerConfig, + OneShotOutboundInfo, + PollParameters, + SubstreamProtocol +}; +use smallvec::SmallVec; +use std::{ + collections::{VecDeque, HashMap}, + io, + iter, + time::Duration, + task::{Context, Poll} +}; + +/// An inbound request or response. +#[derive(Debug)] +pub enum RequestResponseMessage { + /// A request message. + Request { + /// The request message. + request: TRequest, + /// The sender of the request who is awaiting a response. + /// + /// See [`RequestResponse::send_response`]. + channel: ResponseChannel, + }, + /// A response message. + Response { + /// The ID of the request that produced this response. + /// + /// See [`RequestResponse::send_request`]. + request_id: RequestId, + /// The response message. + response: TResponse + }, +} + +/// The events emitted by a [`RequestResponse`] protocol. +#[derive(Debug)] +pub enum RequestResponseEvent { + /// An incoming message (request or response). + Message { + /// The peer who sent the message. + peer: PeerId, + /// The incoming message. + message: RequestResponseMessage + }, + /// An outbound request failed. + RequestFailure { + /// The peer to whom the request was sent. + peer: PeerId, + /// The (local) ID of the failed request. + request_id: RequestId, + /// The error that occurred. + error: RequestFailure + }, + /// Sending of a response to an incoming request failed. + /// + /// See [`RequestResponse::send_response`]. + ResponseFailure { + /// The error that occurred. + error: io::Error + }, +} + +/// Possible request failures. +#[derive(Debug)] +pub enum RequestFailure { + /// The request could not be sent because a dialing attempt failed. + DialFailure, + /// The request timed out before receiving a response. + Timeout, + /// The connection closed before a response was received. + /// + /// It is not known whether the request may have been + /// received (and processed) by the remote peer. + ConnectionClosed, +} + +/// A channel for sending a response to an inbound request. +/// +/// See [`RequestResponse::send_response`]. +#[derive(Debug)] +pub struct ResponseChannel(NegotiatedSubstream); + +/// The ID of an outgoing request. +/// +/// See [`RequestResponse::send_request`]. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct RequestId(u64); + +/// The configuration for an `RequestResponse`. +#[derive(Debug, Clone)] +pub struct RequestResponseConfig { + protocol: Bytes, + request_timeout: Duration, + connection_keep_alive: Duration, +} + +impl RequestResponseConfig { + /// Creates a new `RequestResponseConfig` for the given protocol. + pub fn new(protocol: impl ProtocolName) -> Self { + Self { + protocol: Bytes::copy_from_slice(protocol.protocol_name()), + connection_keep_alive: Duration::from_secs(10), + request_timeout: Duration::from_secs(10), + } + } + + /// Sets the keep-alive timeout of idle connections. + pub fn set_connection_keep_alive(&mut self, v: Duration) -> &mut Self { + self.connection_keep_alive = v; + self + } + + /// Sets the request timeout for outbound requests. + pub fn set_request_timeout(&mut self, v: Duration) -> &mut Self { + self.request_timeout = v; + self + } +} + +/// A request/response protocol for some message codec. +pub struct RequestResponse +where + TCodec: RequestResponseCodec, +{ + /// The next (local) request ID. + next_request_id: RequestId, + /// The protocol configuration. + config: RequestResponseConfig, + /// The protocol codec for reading and writing requests and responses. + codec: TCodec, + /// Pending futures for outgoing responses. + responding: FuturesUnordered>>, + /// Pending events to return from `poll`. + pending_events: VecDeque< + NetworkBehaviourAction< + RequestProtocol, + RequestResponseEvent>>, + /// The currently connected peers and their known, reachable addresses, if any. + connected: HashMap>, + /// Externally managed addresses via `add_address` and `remove_address`. + addresses: HashMap>, + /// Requests that have not yet been sent and are waiting for a connection + /// to be established. + pending_requests: HashMap; 10]>>, + /// Responses that have not yet been received. + pending_responses: HashMap, +} + +impl RequestResponse +where + TCodec: RequestResponseCodec + Clone, +{ + /// Creates a new `RequestResponse` protocol for the given codec and configuration. + pub fn new(cfg: RequestResponseConfig, codec: TCodec) -> Self { + RequestResponse { + next_request_id: RequestId(1), + config: cfg, + codec, + responding: FuturesUnordered::new(), + pending_events: VecDeque::new(), + connected: HashMap::new(), + pending_requests: HashMap::new(), + pending_responses: HashMap::new(), + addresses: HashMap::new(), + } + } + + /// Initiates sending a request. + /// + /// If the targeted peer is currently not connected, a dialing + /// attempt is initiated and the request is sent as soon as a + /// connection is established. + /// + /// > **Note**: In order for such a dialing attempt to succeed, + /// > the `RequestResonse` protocol must either be embedded + /// > in another `NetworkBehaviour` that provides peer and + /// > address discovery, or known addresses of peers must be + /// > managed via [`RequestResponse::add_address`] and + /// > [`RequestResponse::remove_address`]. + pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> RequestId { + let request_id = self.next_request_id(); + let request = RequestProtocol { + request_id, + codec: self.codec.clone(), + protocol: self.config.protocol.clone(), + request, + }; + + if let Some(request) = self.try_send_request(peer, request) { + self.pending_events.push_back(NetworkBehaviourAction::DialPeer { + peer_id: peer.clone(), + condition: DialPeerCondition::Disconnected, + }); + self.pending_requests.entry(peer.clone()).or_default().push(request); + } + + request_id + } + + /// Initiates sending a response to an inbound request. + /// + /// The provided `ResponseChannel` is obtained from a + /// [`RequestResponseMessage::Request`]. + pub fn send_response(&mut self, mut channel: ResponseChannel, response: TCodec::Response) + where + TCodec: RequestResponseCodec + Send + Sync + 'static, + TCodec::Response: Send + 'static, + { + let mut codec = self.codec.clone(); + self.responding.push(async move { + codec.write_response(&mut channel.0, response).await + }.boxed()); + } + + /// Adds a known address for a peer that can be used for + /// dialing attempts by the `Swarm`, i.e. is returned + /// by [`NetworkBehaviour::addresses_of_peer`]. + /// + /// Addresses added in this way are only removed by `remove_address`. + pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) { + self.addresses.entry(peer.clone()).or_default().push(address); + } + + /// Removes an address of a peer previously added via `add_address`. + pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) { + let mut last = false; + if let Some(addresses) = self.addresses.get_mut(peer) { + addresses.retain(|a| a != address); + last = addresses.is_empty(); + } + if last { + self.addresses.remove(peer); + } + } + + /// Checks whether a peer is currently connected. + pub fn is_connected(&self, peer: &PeerId) -> bool { + self.connected.contains_key(peer) + } + + /// Returns the next request ID. + fn next_request_id(&mut self) -> RequestId { + let request_id = self.next_request_id; + self.next_request_id.0 += 1; + request_id + } + + /// Tries to send a request by queueing an appropriate event to be + /// emitted to the `Swarm`. If the peer is not currently connected, + /// the given request is return unchanged. + fn try_send_request(&mut self, peer: &PeerId, request: RequestProtocol) + -> Option> + { + if let Some(connections) = self.connected.get(peer) { + let ix = (request.request_id.0 as usize) % connections.len(); + let conn = connections[ix].id; + self.pending_responses.insert(request.request_id, (peer.clone(), conn)); + self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: peer.clone(), + handler: NotifyHandler::One(conn), + event: request + }); + None + } else { + Some(request) + } + } +} + +impl NetworkBehaviour for RequestResponse +where + TCodec: RequestResponseCodec + Send + Sync + Clone + 'static, + TCodec::Response: Send, + TCodec::Request: Send, +{ + type ProtocolsHandler = OneShotHandler< + ResponseProtocol, + RequestProtocol, + RequestResponseMessage, + RequestId, + >; + + type OutEvent = RequestResponseEvent; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + let inbound = SubstreamProtocol::new(ResponseProtocol { + protocol: self.config.protocol.clone(), + codec: self.codec.clone(), + }).with_timeout(self.config.request_timeout); + OneShotHandler::new(inbound, OneShotHandlerConfig { + keep_alive_timeout: self.config.connection_keep_alive, + outbound_substream_timeout: self.config.request_timeout, + }) + } + + fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { + let mut addresses = Vec::new(); + if let Some(connections) = self.connected.get(peer) { + addresses.extend(connections.iter().filter_map(|c| c.address.clone())) + } + if let Some(more) = self.addresses.get(peer) { + addresses.extend(more.into_iter().cloned()); + } + addresses + } + + fn inject_connected(&mut self, peer: &PeerId) { + if let Some(pending) = self.pending_requests.remove(peer) { + for request in pending { + let request = self.try_send_request(peer, request); + assert!(request.is_none()); + } + } + } + + fn inject_connection_established(&mut self, peer: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) { + let address = match endpoint { + ConnectedPoint::Dialer { address } => Some(address.clone()), + ConnectedPoint::Listener { .. } => None + }; + let connections = self.connected.entry(peer.clone()).or_default(); + connections.push(Connection { id: *conn, address }) + } + + fn inject_connection_closed(&mut self, peer: &PeerId, conn: &ConnectionId, _: &ConnectedPoint) { + if let Some(connections) = self.connected.get_mut(peer) { + if let Some(pos) = connections.iter().position(|c| &c.id == conn) { + connections.remove(pos); + } + } + + // Any pending responses of requests sent over this connection + // must be considered failed. + let failed = self.pending_responses.iter() + .filter_map(|(r, (p, c))| + if conn == c { + Some((p.clone(), *r)) + } else { + None + }) + .collect::>(); + + for (peer, request_id) in failed { + self.pending_responses.remove(&request_id); + self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::RequestFailure { + peer, + request_id, + error: RequestFailure::ConnectionClosed + } + )); + } + } + + fn inject_disconnected(&mut self, peer: &PeerId) { + self.connected.remove(peer); + } + + fn inject_dial_failure(&mut self, peer: &PeerId) { + // If there are pending outgoing requests when a dial failure occurs, + // it is implied that we are not connected to the peer, since pending + // outgoing requests are drained when a connection is established and + // only created when a peer is not connected when a request is made. + // Thus these requests must be considered failed, even if there is + // another, concurrent dialing attempt ongoing. + if let Some(pending) = self.pending_requests.remove(peer) { + for request in pending { + self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::RequestFailure { + peer: peer.clone(), + request_id: request.request_id, + error: RequestFailure::DialFailure + } + )); + } + } + } + + fn inject_event( + &mut self, + peer: PeerId, + _conn: ConnectionId, + event: OneShotEvent, RequestId> + ) { + match event { + OneShotEvent::Success(message) => { + if let RequestResponseMessage::Response { request_id, .. } = &message { + self.pending_responses.remove(request_id); + } + self.pending_events.push_back( + NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::Message { peer, message })); + } + OneShotEvent::Timeout(request_id) => { + if let Some((peer, _conn)) = self.pending_responses.remove(&request_id) { + self.pending_events.push_back( + NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::RequestFailure { + peer, + request_id, + error: RequestFailure::Timeout, + })); + } + } + } + } + + fn poll(&mut self, cx: &mut Context, _: &mut impl PollParameters) + -> Poll, + RequestResponseEvent + >> + { + if let Some(ev) = self.pending_events.pop_front() { + return Poll::Ready(ev); + } + + while let Poll::Ready(Some(result)) = self.responding.poll_next_unpin(cx) { + if let Err(error) = result { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::ResponseFailure { error } + )); + } + } + + Poll::Pending + } +} + +/// Response substream upgrade protocol. +#[derive(Debug, Clone)] +pub struct ResponseProtocol { + protocol: Bytes, + codec: TCodec, +} + +impl UpgradeInfo for ResponseProtocol { + type Info = Bytes; + type InfoIter = iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(self.protocol.clone()) + } +} + +impl InboundUpgrade for ResponseProtocol +where + TCodec: RequestResponseCodec + Send + Sync + 'static, +{ + type Output = RequestResponseMessage; + type Error = io::Error; + type Future = BoxFuture<'static, Result>; + + fn upgrade_inbound(mut self, mut io: NegotiatedSubstream, _: Self::Info) -> Self::Future { + async move { + let request = self.codec.read_request(&mut io).await?; + Ok(RequestResponseMessage::Request { request, channel: ResponseChannel(io) }) + }.boxed() + } +} + +/// Request substream upgrade protocol. +#[derive(Debug, Clone)] +pub struct RequestProtocol +where + TCodec: RequestResponseCodec +{ + codec: TCodec, + protocol: Bytes, + request: TCodec::Request, + request_id: RequestId, +} + +impl OneShotOutboundInfo for RequestProtocol +where + TCodec: RequestResponseCodec +{ + fn info(&self) -> RequestId { + self.request_id + } +} + +impl UpgradeInfo for RequestProtocol +where + TCodec: RequestResponseCodec +{ + type Info = Bytes; + type InfoIter = iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(self.protocol.clone()) + } +} + +impl OutboundUpgrade for RequestProtocol +where + TCodec: RequestResponseCodec + Send + Sync + 'static, + TCodec::Request: Send + 'static, +{ + type Output = RequestResponseMessage; + type Error = io::Error; + type Future = BoxFuture<'static, Result>; + + fn upgrade_outbound(mut self, mut io: NegotiatedSubstream, _: Self::Info) -> Self::Future { + async move { + self.codec.write_request(&mut io, self.request).await?; + let response = self.codec.read_response(&mut io).await?; + Ok(RequestResponseMessage::Response { request_id: self.request_id, response }) + }.boxed() + } +} + +/// An `RequestResponseCodec` defines the request and response types +/// for a [`RequestResponse`] protocol and how they are encoded / decoded +/// to / from an I/O stream. +pub trait RequestResponseCodec { + type Request; + type Response; + + fn read_request<'a, T>(&mut self, io: &'a mut T) + -> BoxFuture<'a, Result> + where + T: AsyncRead + Unpin + Send; + + fn read_response<'a, T>(&mut self, io: &'a mut T) + -> BoxFuture<'a, Result> + where + T: AsyncRead + Unpin + Send; + + fn write_request<'a, T>(&mut self, io: &'a mut T, req: Self::Request) + -> BoxFuture<'a, Result<(), io::Error>> + where + T: AsyncWrite + Unpin + Send; + + fn write_response<'a, T>(&mut self, io: &'a mut T, res: Self::Response) + -> BoxFuture<'a, Result<(), io::Error>> + where + T: AsyncWrite + Unpin + Send; +} + +/// Internal information tracked for an established connection. +struct Connection { + id: ConnectionId, + address: Option, +} diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs new file mode 100644 index 00000000000..3ebebdd08ab --- /dev/null +++ b/protocols/request-response/tests/ping.rs @@ -0,0 +1,181 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. +//! Integration tests for the `Ping` network behaviour. + +use libp2p_core::{ + Multiaddr, + PeerId, + identity, + muxing::StreamMuxerBox, + transport::{Transport, boxed::Boxed}, + upgrade::{self, read_one, write_one} +}; +use libp2p_noise::{NoiseConfig, X25519Spec, Keypair}; +use libp2p_request_response::*; +use libp2p_swarm::Swarm; +use libp2p_tcp::TcpConfig; +use futures::{prelude::*, channel::mpsc, future::BoxFuture}; +use rand::{self, Rng}; +use std::io; + +/// Sends a ping and expects a pong response. +#[test] +fn ping() { + let num_pings: u8 = rand::thread_rng().gen_range(1, 100); + + let ping = Ping("ping".to_string().into_bytes()); + let pong = Pong("pong".to_string().into_bytes()); + + let cfg = RequestResponseConfig::new("/ping/1".to_string()); + + let (peer1_id, trans) = mk_transport(); + let ping_proto1 = RequestResponse::new(cfg.clone(), PingCodec()); + let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone()); + + let (peer2_id, trans) = mk_transport(); + let ping_proto2 = RequestResponse::new(cfg, PingCodec()); + let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone()); + + let (mut tx, mut rx) = mpsc::channel::(1); + + let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); + Swarm::listen_on(&mut swarm1, addr).unwrap(); + + let expected_ping = ping.clone(); + let expected_pong = pong.clone(); + + let peer1 = async move { + while let Some(_) = swarm1.next().now_or_never() {} + + let l = Swarm::listeners(&swarm1).next().unwrap(); + tx.send(l.clone()).await.unwrap(); + + loop { + match swarm1.next().await { + RequestResponseEvent::Message { + peer, + message: RequestResponseMessage::Request { request, channel } + } => { + assert_eq!(&request, &expected_ping); + assert_eq!(&peer, &peer2_id); + swarm1.send_response(channel, pong.clone()); + }, + e => panic!("Unexpected event: {:?}", e) + } + } + }; + + let peer2 = async move { + let mut count = 0; + let addr = rx.next().await.unwrap(); + swarm2.add_address(&peer1_id, addr.clone()); + let mut req_id = swarm2.send_request(&peer1_id, ping.clone()); + + loop { + match swarm2.next().await { + RequestResponseEvent::Message { + peer, + message: RequestResponseMessage::Response { request_id, response } + } => { + count += 1; + assert_eq!(&response, &expected_pong); + assert_eq!(&peer, &peer1_id); + assert_eq!(req_id, request_id); + if count >= num_pings { + return + } else { + req_id = swarm2.send_request(&peer1_id, ping.clone()); + } + }, + e => panic!("Unexpected event: {:?}", e) + } + } + }; + + async_std::task::spawn(Box::pin(peer1)); + let () = async_std::task::block_on(peer2); +} + +fn mk_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox), io::Error>) { + let id_keys = identity::Keypair::generate_ed25519(); + let peer_id = id_keys.public().into_peer_id(); + let noise_keys = Keypair::::new().into_authentic(&id_keys).unwrap(); + let transport = TcpConfig::new() + .nodelay(true) + .upgrade(upgrade::Version::V1) + .authenticate(NoiseConfig::xx(noise_keys).into_authenticated()) + .multiplex(libp2p_yamux::Config::default()) + .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer))) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + .boxed(); + (peer_id, transport) +} + +// Simple Ping-Pong Protocol + +#[derive(Clone)] +struct PingCodec(); +#[derive(Debug, Clone, PartialEq, Eq)] +struct Ping(Vec); +#[derive(Debug, Clone, PartialEq, Eq)] +struct Pong(Vec); + +impl RequestResponseCodec for PingCodec { + type Request = Ping; + type Response = Pong; + + fn read_request<'a, T>(&mut self, io: &'a mut T) -> BoxFuture<'a, Result> + where + T: AsyncRead + Unpin + Send + { + read_one(io, 1024) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) + .and_then(|data| future::ready(Ok(Ping(data)))) + .boxed() + } + + fn read_response<'a, T>(&mut self, io: &'a mut T) + -> BoxFuture<'a, Result> + where + T: AsyncRead + Unpin + Send + { + read_one(io, 1024) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) + .and_then(|data| future::ready(Ok(Pong(data)))) + .boxed() + } + + fn write_request<'a, T>(&mut self, io: &'a mut T, Ping(data): Ping) + -> BoxFuture<'a, Result<(), io::Error>> + where + T: AsyncWrite + Unpin + Send + { + write_one(io, data).boxed() + } + + fn write_response<'a, T>(&mut self, io: &'a mut T, Pong(data): Pong) + -> BoxFuture<'a, Result<(), io::Error>> + where + T: AsyncWrite + Unpin + Send + { + write_one(io, data).boxed() + } +} + diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index f848f64f9d3..38336a8fc04 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -76,8 +76,10 @@ pub use protocols_handler::{ ProtocolsHandlerEvent, ProtocolsHandlerSelect, ProtocolsHandlerUpgrErr, + OneShotEvent, OneShotHandler, OneShotHandlerConfig, + OneShotOutboundInfo, SubstreamProtocol }; diff --git a/swarm/src/protocols_handler.rs b/swarm/src/protocols_handler.rs index 95722ed9388..a90689a3bbd 100644 --- a/swarm/src/protocols_handler.rs +++ b/swarm/src/protocols_handler.rs @@ -63,7 +63,7 @@ pub use dummy::DummyProtocolsHandler; pub use map_in::MapInEvent; pub use map_out::MapOutEvent; pub use node_handler::{NodeHandlerWrapper, NodeHandlerWrapperBuilder, NodeHandlerWrapperError}; -pub use one_shot::{OneShotHandler, OneShotHandlerConfig}; +pub use one_shot::{OneShotHandler, OneShotHandlerConfig, OneShotEvent, OneShotOutboundInfo}; pub use select::{IntoProtocolsHandlerSelect, ProtocolsHandlerSelect}; /// A handler for a set of protocols used on a connection with a remote. @@ -236,7 +236,7 @@ pub struct SubstreamProtocol { } impl SubstreamProtocol { - /// Create a new `ListenProtocol` from the given upgrade. + /// Create a new `SubstreamProtocol` from the given upgrade. /// /// The default timeout for applying the given upgrade on a substream is /// 10 seconds. diff --git a/swarm/src/protocols_handler/one_shot.rs b/swarm/src/protocols_handler/one_shot.rs index 92d63088895..7575a1b6059 100644 --- a/swarm/src/protocols_handler/one_shot.rs +++ b/swarm/src/protocols_handler/one_shot.rs @@ -31,23 +31,20 @@ use smallvec::SmallVec; use std::{error, task::Context, task::Poll, time::Duration}; use wasm_timer::Instant; -/// Implementation of `ProtocolsHandler` that opens a new substream for each individual message. -/// -/// This struct is meant to be a helper for other implementations to use. +/// A `ProtocolsHandler` that opens a new substream for each request. // TODO: Debug -pub struct OneShotHandler +pub struct OneShotHandler where - TOutProto: OutboundUpgradeSend, + TOutbound: OutboundUpgradeSend, { /// The upgrade for inbound substreams. - listen_protocol: SubstreamProtocol, + listen_protocol: SubstreamProtocol, /// If `Some`, something bad happened and we should shut down the handler with an error. - pending_error: - Option::Error>>, + pending_error: Option<(ProtocolsHandlerUpgrErr<::Error>, TInfo)>, /// Queue of events to produce in `poll()`. - events_out: SmallVec<[TOutEvent; 4]>, + events_out: SmallVec<[TEvent; 4]>, /// Queue of outbound substreams to open. - dial_queue: SmallVec<[TOutProto; 4]>, + dial_queue: SmallVec<[TOutbound; 4]>, /// Current number of concurrent outbound substreams being opened. dial_negotiated: u32, /// Maximum number of concurrent outbound substreams being opened. Value is never modified. @@ -58,15 +55,14 @@ where config: OneShotHandlerConfig, } -impl - OneShotHandler +impl + OneShotHandler where - TOutProto: OutboundUpgradeSend, + TOutbound: OutboundUpgradeSend, { /// Creates a `OneShotHandler`. - #[inline] pub fn new( - listen_protocol: SubstreamProtocol, + listen_protocol: SubstreamProtocol, config: OneShotHandlerConfig, ) -> Self { OneShotHandler { @@ -77,12 +73,11 @@ where dial_negotiated: 0, max_dial_negotiated: 8, keep_alive: KeepAlive::Yes, - config + config, } } /// Returns the number of pending requests. - #[inline] pub fn pending_requests(&self) -> u32 { self.dial_negotiated + self.dial_queue.len() as u32 } @@ -91,8 +86,7 @@ where /// /// > **Note**: If you modify the protocol, modifications will only applies to future inbound /// > substreams, not the ones already being negotiated. - #[inline] - pub fn listen_protocol_ref(&self) -> &SubstreamProtocol { + pub fn listen_protocol_ref(&self) -> &SubstreamProtocol { &self.listen_protocol } @@ -100,26 +94,23 @@ where /// /// > **Note**: If you modify the protocol, modifications will only applies to future inbound /// > substreams, not the ones already being negotiated. - #[inline] - pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol { + pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol { &mut self.listen_protocol } /// Opens an outbound substream with `upgrade`. - #[inline] - pub fn send_request(&mut self, upgrade: TOutProto) { + pub fn send_request(&mut self, upgrade: TOutbound) { self.keep_alive = KeepAlive::Yes; self.dial_queue.push(upgrade); } } -impl Default - for OneShotHandler +impl Default + for OneShotHandler where - TOutProto: OutboundUpgradeSend, - TInProto: InboundUpgradeSend + Default, + TOutbound: OutboundUpgradeSend, + TInbound: InboundUpgradeSend + Default, { - #[inline] fn default() -> Self { OneShotHandler::new( SubstreamProtocol::new(Default::default()), @@ -128,45 +119,51 @@ where } } -impl ProtocolsHandler - for OneShotHandler +/// The events emitted by the [`OneShotHandler`]. +pub enum OneShotEvent { + /// An inbound or outbound upgrade succeeded. + Success(TEvent), + /// An outbound upgrade (i.e. request) timed out. + Timeout(TInfo), +} + +impl ProtocolsHandler + for OneShotHandler where - TInProto: InboundUpgradeSend + Send + 'static, - TOutProto: OutboundUpgradeSend, - TInProto::Output: Into, - TOutProto::Output: Into, - TOutProto::Error: error::Error + Send + 'static, - SubstreamProtocol: Clone, - TOutEvent: Send + 'static, + TInbound: InboundUpgradeSend + Send + 'static, + TOutbound: OutboundUpgradeSend + OneShotOutboundInfo, + TInbound::Output: Into, + TOutbound::Output: Into, + TOutbound::Error: error::Error + Send + 'static, + SubstreamProtocol: Clone, + TEvent: Send + 'static, + TInfo: Send + 'static, { - type InEvent = TOutProto; - type OutEvent = TOutEvent; + type InEvent = TOutbound; + type OutEvent = OneShotEvent; type Error = ProtocolsHandlerUpgrErr< ::Error, >; - type InboundProtocol = TInProto; - type OutboundProtocol = TOutProto; - type OutboundOpenInfo = (); + type InboundProtocol = TInbound; + type OutboundProtocol = TOutbound; + type OutboundOpenInfo = TInfo; - #[inline] fn listen_protocol(&self) -> SubstreamProtocol { self.listen_protocol.clone() } - #[inline] fn inject_fully_negotiated_inbound( &mut self, out: ::Output, ) { // If we're shutting down the connection for inactivity, reset the timeout. if !self.keep_alive.is_yes() { - self.keep_alive = KeepAlive::Until(Instant::now() + self.config.inactive_timeout); + self.keep_alive = KeepAlive::Until(Instant::now() + self.config.keep_alive_timeout); } self.events_out.push(out.into()); } - #[inline] fn inject_fully_negotiated_outbound( &mut self, out: ::Output, @@ -175,31 +172,28 @@ where self.dial_negotiated -= 1; if self.dial_negotiated == 0 && self.dial_queue.is_empty() { - self.keep_alive = KeepAlive::Until(Instant::now() + self.config.inactive_timeout); + self.keep_alive = KeepAlive::Until(Instant::now() + self.config.keep_alive_timeout); } self.events_out.push(out.into()); } - #[inline] fn inject_event(&mut self, event: Self::InEvent) { self.send_request(event); } - #[inline] fn inject_dial_upgrade_error( &mut self, - _: Self::OutboundOpenInfo, + info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr< ::Error, >, ) { if self.pending_error.is_none() { - self.pending_error = Some(error); + self.pending_error = Some((error, info)); } } - #[inline] fn connection_keep_alive(&self) -> KeepAlive { self.keep_alive } @@ -210,13 +204,19 @@ where ) -> Poll< ProtocolsHandlerEvent, > { - if let Some(err) = self.pending_error.take() { - return Poll::Ready(ProtocolsHandlerEvent::Close(err)); + if let Some((err, info)) = self.pending_error.take() { + if let ProtocolsHandlerUpgrErr::Timeout = &err { + return Poll::Ready(ProtocolsHandlerEvent::Custom( + OneShotEvent::Timeout(info) + )) + } else { + return Poll::Ready(ProtocolsHandlerEvent::Close(err)) + } } if !self.events_out.is_empty() { return Poll::Ready(ProtocolsHandlerEvent::Custom( - self.events_out.remove(0), + OneShotEvent::Success(self.events_out.remove(0)), )); } else { self.events_out.shrink_to_fit(); @@ -225,11 +225,13 @@ where if !self.dial_queue.is_empty() { if self.dial_negotiated < self.max_dial_negotiated { self.dial_negotiated += 1; + let upgrade = self.dial_queue.remove(0); + let info = upgrade.info(); return Poll::Ready( ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(self.dial_queue.remove(0)) - .with_timeout(self.config.substream_timeout), - info: (), + protocol: SubstreamProtocol::new(upgrade) + .with_timeout(self.config.outbound_substream_timeout), + info, }, ); } @@ -244,17 +246,33 @@ where /// Configuration parameters for the `OneShotHandler` #[derive(Debug)] pub struct OneShotHandlerConfig { - /// After the given duration has elapsed, an inactive connection will shutdown. - pub inactive_timeout: Duration, - /// Timeout duration for each newly opened outbound substream. - pub substream_timeout: Duration, + /// Keep-alive timeout for idle connections. + pub keep_alive_timeout: Duration, + /// Timeout for outbound substream upgrades. + pub outbound_substream_timeout: Duration, } impl Default for OneShotHandlerConfig { fn default() -> Self { - let inactive_timeout = Duration::from_secs(10); - let substream_timeout = Duration::from_secs(10); - OneShotHandlerConfig { inactive_timeout, substream_timeout } + OneShotHandlerConfig { + keep_alive_timeout: Duration::from_secs(10), + outbound_substream_timeout: Duration::from_secs(10), + } + } +} + +/// Information about an outgoing request that is returned +/// to the behaviour in case of a (non-fatal) request error. +/// +/// This trait must be implemented by the outbound upgrade +/// used with the `OneShotHandler`. +pub trait OneShotOutboundInfo { + fn info(&self) -> T; +} + +impl OneShotOutboundInfo<()> for T { + fn info(&self) -> () { + () } } From 521fe2d6221abb8a59b08a0ac6d228490a7973bf Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Sat, 30 May 2020 20:12:38 +0200 Subject: [PATCH 02/16] Fix OneShotHandler usage in floodsub. --- protocols/floodsub/src/layer.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 4bb6aa08cbf..12b44fda7d6 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -29,6 +29,7 @@ use libp2p_swarm::{ NetworkBehaviourAction, PollParameters, ProtocolsHandler, + OneShotEvent, OneShotHandler, NotifyHandler, DialPeerCondition, @@ -237,7 +238,7 @@ impl Floodsub { } impl NetworkBehaviour for Floodsub { - type ProtocolsHandler = OneShotHandler; + type ProtocolsHandler = OneShotHandler; type OutEvent = FloodsubEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { @@ -287,12 +288,12 @@ impl NetworkBehaviour for Floodsub { &mut self, propagation_source: PeerId, _connection: ConnectionId, - event: InnerMessage, + event: OneShotEvent, ) { - // We ignore successful sends event. + // We ignore successful sends or timeouts. let event = match event { - InnerMessage::Rx(event) => event, - InnerMessage::Sent => return, + OneShotEvent::Success(InnerMessage::Rx(event)) => event, + OneShotEvent::Success(InnerMessage::Sent) | OneShotEvent::Timeout(()) => return, }; // Update connected peers topics From 89ea70af66bda8a3834e773f5a8e7e595c1f7ecd Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Wed, 3 Jun 2020 17:58:07 +0200 Subject: [PATCH 03/16] Custom ProtocolsHandler and multiple protocols. 1. Implement a custom ProtocolsHandler instead of using the OneShotHandler for better control and error handling. In particular, all request/response sending/receiving is kept in the substreams upgrades and thus the background task of a connection. 2. Support multiple protocols (usually protocol versions) with a single `RequestResponse` instance, with configurable inbound/outbound support. --- protocols/request-response/Cargo.toml | 2 +- protocols/request-response/src/codec.rs | 64 +++ protocols/request-response/src/handler.rs | 326 +++++++++++++++ .../request-response/src/handler/protocol.rs | 161 ++++++++ protocols/request-response/src/lib.rs | 370 ++++++++---------- protocols/request-response/tests/ping.rs | 38 +- swarm/src/protocols_handler.rs | 10 +- swarm/src/protocols_handler/node_handler.rs | 17 +- 8 files changed, 765 insertions(+), 223 deletions(-) create mode 100644 protocols/request-response/src/codec.rs create mode 100644 protocols/request-response/src/handler.rs create mode 100644 protocols/request-response/src/handler/protocol.rs diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index f702f808772..81c4810c49b 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -10,11 +10,11 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -bytes = "0.5" futures = "0.3.1" libp2p-core = { version = "0.19.0", path = "../../core" } libp2p-swarm = { version = "0.19.0", path = "../../swarm" } smallvec = "1.4" +wasm-timer = "0.2" [dev-dependencies] async-std = "< 1.6" diff --git a/protocols/request-response/src/codec.rs b/protocols/request-response/src/codec.rs new file mode 100644 index 00000000000..0a8660c7541 --- /dev/null +++ b/protocols/request-response/src/codec.rs @@ -0,0 +1,64 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +pub use libp2p_core::ProtocolName; + +use futures::{prelude::*, future::BoxFuture}; +use std::io; + +/// A `RequestResponseCodec` defines the request and response types +/// for a [`RequestResponse`](crate::RequestResponse) protocol or +/// protocol family and how they are encoded / decoded on an I/O stream. +pub trait RequestResponseCodec { + /// The type of protocol(s) or protocol versions being negotiated. + type Protocol: ProtocolName + Send + Sync + Clone; + /// The type of inbound and outbound requests. + type Request: Send + Clone; + /// The type of inbound and outbound responses. + type Response: Send; + + /// Reads a request from the given I/O stream according to the + /// negotiated protocol. + fn read_request<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T) + -> BoxFuture<'a, Result> + where + T: AsyncRead + Unpin + Send; + + /// Reads a response from the given I/O stream according to the + /// negotiated protocol. + fn read_response<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T) + -> BoxFuture<'a, Result> + where + T: AsyncRead + Unpin + Send; + + /// Writes a request to the given I/O stream according to the + /// negotiated protocol. + fn write_request<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T, req: Self::Request) + -> BoxFuture<'a, Result<(), io::Error>> + where + T: AsyncWrite + Unpin + Send; + + /// Writes a response to the given I/O stream according to the + /// negotiated protocol. + fn write_response<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T, res: Self::Response) + -> BoxFuture<'a, Result<(), io::Error>> + where + T: AsyncWrite + Unpin + Send; +} diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs new file mode 100644 index 00000000000..1cba9388606 --- /dev/null +++ b/protocols/request-response/src/handler.rs @@ -0,0 +1,326 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +mod protocol; + +use crate::{EMPTY_QUEUE_SHRINK_THRESHOLD, RequestId}; +use crate::codec::RequestResponseCodec; + +pub use protocol::{RequestProtocol, ResponseProtocol, ProtocolSupport}; + +use futures::{ + channel::oneshot, + future::BoxFuture, + prelude::*, + stream::FuturesUnordered +}; +use libp2p_core::{ + upgrade::{UpgradeError, NegotiationError}, +}; +use libp2p_swarm::{ + SubstreamProtocol, + protocols_handler::{ + KeepAlive, + ProtocolsHandler, + ProtocolsHandlerEvent, + ProtocolsHandlerUpgrErr, + } +}; +use smallvec::SmallVec; +use std::{ + collections::VecDeque, + io, + time::Duration, + task::{Context, Poll} +}; +use wasm_timer::Instant; + +/// A connection handler of a `RequestResponse` protocol. +#[doc(hidden)] +pub struct RequestResponseHandler +where + TCodec: RequestResponseCodec, +{ + /// The supported inbound protocols. + inbound_protocols: SmallVec<[TCodec::Protocol; 2]>, + /// The request/response message codec. + codec: TCodec, + /// The keep-alive timeout of idle connections. A connection is considered + /// idle if there are no outbound substreams. + keep_alive_timeout: Duration, + /// The timeout for inbound and outbound substreams (i.e. request + /// and response processing). + substream_timeout: Duration, + /// The current connection keep-alive. + keep_alive: KeepAlive, + /// A pending fatal error that results in the connection being closed. + pending_error: Option>, + /// Queue of events to emit in `poll()`. + pending_events: VecDeque>, + /// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`. + outbound: VecDeque>, + /// Inbound upgrades waiting for the incoming request. + inbound: FuturesUnordered), + oneshot::Canceled + >>>, +} + +impl RequestResponseHandler +where + TCodec: RequestResponseCodec, +{ + pub(super) fn new( + inbound_protocols: SmallVec<[TCodec::Protocol; 2]>, + codec: TCodec, + keep_alive_timeout: Duration, + substream_timeout: Duration, + ) -> Self { + Self { + inbound_protocols, + codec, + keep_alive: KeepAlive::Yes, + keep_alive_timeout, + substream_timeout, + outbound: VecDeque::new(), + inbound: FuturesUnordered::new(), + pending_events: VecDeque::new(), + pending_error: None, + } + } +} + +/// The events emitted by the [`RequestResponseHandler`]. +#[doc(hidden)] +pub enum RequestResponseHandlerEvent +where + TCodec: RequestResponseCodec +{ + /// An inbound request. + Request { + request: TCodec::Request, + sender: oneshot::Sender + }, + /// An inbound response. + Response { + request_id: RequestId, + response: TCodec::Response + }, + /// An outbound upgrade (i.e. request) timed out. + OutboundTimeout(RequestId), + /// An outbound request failed to negotiate a mutually supported protocol. + OutboundUnsupportedProtocols(RequestId), + /// An inbound request timed out. + InboundTimeout, + /// An inbound request failed to negotiate a mutually supported protocol. + InboundUnsupportedProtocols, +} + +impl ProtocolsHandler for RequestResponseHandler +where + TCodec: RequestResponseCodec + Send + Sync + Clone + 'static, +{ + type InEvent = RequestProtocol; + type OutEvent = RequestResponseHandlerEvent; + type Error = ProtocolsHandlerUpgrErr; + type InboundProtocol = ResponseProtocol; + type OutboundProtocol = RequestProtocol; + type OutboundOpenInfo = RequestId; + + fn listen_protocol(&self) -> SubstreamProtocol { + // A channel for notifying the handler when the inbound + // upgrade received the request. + let (rq_send, rq_recv) = oneshot::channel(); + + // A channel for notifying the inbound upgrade when the + // response is sent. + let (rs_send, rs_recv) = oneshot::channel(); + + // By keeping all I/O inside the `ResponseProtocol` and thus the + // inbound substream upgrade via above channels, we ensure that it + // is all subject to the configured timeout without extra bookkeeping + // for inbound substreams as well as their timeouts and also make the + // implementation of inbound and outbound upgrades symmetric in + // this sense. + let proto = ResponseProtocol { + protocols: self.inbound_protocols.clone(), + codec: self.codec.clone(), + request_sender: rq_send, + response_receiver: rs_recv, + }; + + // The handler waits for the request to come in. It then emits + // `RequestResponseHandlerEvent::Request` together with a + // `ResponseChannel`. + self.inbound.push(rq_recv.map_ok(move |rq| (rq, rs_send)).boxed()); + + SubstreamProtocol::new(proto).with_timeout(self.substream_timeout) + } + + fn inject_fully_negotiated_inbound( + &mut self, + (): (), + ) { + // Nothing to do, as the response has already been sent + // as part of the upgrade. + } + + fn inject_fully_negotiated_outbound( + &mut self, + response: TCodec::Response, + request_id: RequestId, + ) { + self.pending_events.push_back( + RequestResponseHandlerEvent::Response { + request_id, response + }); + } + + fn inject_event(&mut self, request: Self::InEvent) { + self.keep_alive = KeepAlive::Yes; + self.outbound.push_back(request); + } + + fn inject_dial_upgrade_error( + &mut self, + info: RequestId, + error: ProtocolsHandlerUpgrErr, + ) { + match error { + ProtocolsHandlerUpgrErr::Timeout => { + self.pending_events.push_back( + RequestResponseHandlerEvent::OutboundTimeout(info)); + } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + // The remote merely doesn't support the protocol(s) we requested. + // This is no reason to close the connection, which may + // successfully communicate with other protocols already. + // An event is reported to permit user code to react to the fact that + // the remote peer does not support the requested protocol(s). + self.pending_events.push_back( + RequestResponseHandlerEvent::OutboundUnsupportedProtocols(info)); + } + _ => { + // Anything else is considered a fatal error or misbehaviour of + // the remote peer and results in closing the connection. + self.pending_error = Some(error); + } + } + } + + fn inject_listen_upgrade_error( + &mut self, + error: ProtocolsHandlerUpgrErr + ) { + match error { + ProtocolsHandlerUpgrErr::Timeout => { + self.pending_events.push_back( + RequestResponseHandlerEvent::InboundTimeout); + } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + // The local peer merely doesn't support the protocol(s) requested. + // This is no reason to close the connection, which may + // successfully communicate with other protocols already. + // An event is reported to permit user code to react to the fact that + // the local peer does not support the requested protocol(s). + self.pending_events.push_back( + RequestResponseHandlerEvent::InboundUnsupportedProtocols); + } + _ => { + // Anything else is considered a fatal error or misbehaviour of + // the remote peer and results in closing the connection. + self.pending_error = Some(error); + } + } + } + + fn connection_keep_alive(&self) -> KeepAlive { + self.keep_alive + } + + fn poll( + &mut self, + cx: &mut Context, + ) -> Poll< + ProtocolsHandlerEvent, RequestId, Self::OutEvent, Self::Error>, + > { + // Check for a pending (fatal) error. + if let Some(err) = self.pending_error.take() { + // The handler will not be polled again by the `Swarm`. + return Poll::Ready(ProtocolsHandlerEvent::Close(err)) + } + + // Drain pending events. + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(ProtocolsHandlerEvent::Custom(event)) + } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { + self.pending_events.shrink_to_fit(); + } + + // Check for inbound requests. + while let Poll::Ready(Some(result)) = self.inbound.poll_next_unpin(cx) { + match result { + Ok((rq, rs_sender)) => { + // We received an inbound request. + self.keep_alive = KeepAlive::Yes; + return Poll::Ready(ProtocolsHandlerEvent::Custom( + RequestResponseHandlerEvent::Request { + request: rq, sender: rs_sender + })) + } + Err(oneshot::Canceled) => { + // The inbound upgrade has errored or timed out reading + // or waiting for the request. The handler is informed + // via `inject_listen_upgrade_error`. + } + } + } + + // Emit outbound requests. + if let Some(request) = self.outbound.pop_front() { + let info = request.request_id; + return Poll::Ready( + ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(request) + .with_timeout(self.substream_timeout), + info, + }, + ) + } + + debug_assert!(self.outbound.is_empty()); + + if self.outbound.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { + self.outbound.shrink_to_fit(); + } + + if self.inbound.is_empty() { + // No new inbound or outbound requests. However, we may just have + // started the latest inbound or outbound upgrade(s), so make sure + // the keep-alive timeout is preceded by the substream timeout. + let until = Instant::now() + self.substream_timeout + self.keep_alive_timeout; + self.keep_alive = KeepAlive::Until(until); + } + + Poll::Pending + } +} + diff --git a/protocols/request-response/src/handler/protocol.rs b/protocols/request-response/src/handler/protocol.rs new file mode 100644 index 00000000000..b546b5496ee --- /dev/null +++ b/protocols/request-response/src/handler/protocol.rs @@ -0,0 +1,161 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! The definition of a request/response protocol via inbound +//! and outbound substream upgrades. The inbound upgrade +//! receives a request and sends a response, whereas the +//! outbound upgrade send a request and receives a response. + +use crate::RequestId; +use crate::codec::RequestResponseCodec; + +use futures::{ + channel::oneshot, + future::BoxFuture, + prelude::*, +}; +use libp2p_core::{ + upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, +}; +use libp2p_swarm::{ + NegotiatedSubstream, +}; +use smallvec::SmallVec; +use std::io; + +/// The level of support for a particular protocol. +#[derive(Debug, Clone)] +pub enum ProtocolSupport { + /// The protocol is only supported for inbound requests. + Inbound, + /// The protocol is only supported for outbound requests. + Outbound, + /// The protocol is supported for inbound and outbound requests. + Full +} + +impl ProtocolSupport { + /// Whether inbound requests are supported. + pub fn inbound(&self) -> bool { + match self { + ProtocolSupport::Inbound | ProtocolSupport::Full => true, + ProtocolSupport::Outbound => false, + } + } + + /// Whether outbound requests are supported. + pub fn outbound(&self) -> bool { + match self { + ProtocolSupport::Outbound | ProtocolSupport::Full => true, + ProtocolSupport::Inbound => false, + } + } +} + +/// Response substream upgrade protocol. +/// +/// Receives a request and sends a response. +#[derive(Debug)] +pub struct ResponseProtocol +where + TCodec: RequestResponseCodec +{ + pub(crate) codec: TCodec, + pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>, + pub(crate) request_sender: oneshot::Sender, + pub(crate) response_receiver: oneshot::Receiver +} + +impl UpgradeInfo for ResponseProtocol +where + TCodec: RequestResponseCodec +{ + type Info = TCodec::Protocol; + type InfoIter = smallvec::IntoIter<[Self::Info; 2]>; + + fn protocol_info(&self) -> Self::InfoIter { + self.protocols.clone().into_iter() + } +} + +impl InboundUpgrade for ResponseProtocol +where + TCodec: RequestResponseCodec + Send + Sync + 'static, +{ + type Output = (); + type Error = io::Error; + type Future = BoxFuture<'static, Result>; + + fn upgrade_inbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future { + async move { + let request = self.codec.read_request(&protocol, &mut io).await?; + if let Ok(()) = self.request_sender.send(request) { + if let Ok(response) = self.response_receiver.await { + self.codec.write_response(&protocol, &mut io, response).await?; + } + } + Ok(()) + }.boxed() + } +} + +/// Request substream upgrade protocol. +/// +/// Sends a request and receives a response. +#[derive(Debug, Clone)] +pub struct RequestProtocol +where + TCodec: RequestResponseCodec +{ + pub(crate) codec: TCodec, + pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>, + pub(crate) request_id: RequestId, + pub(crate) request: TCodec::Request, +} + +impl UpgradeInfo for RequestProtocol +where + TCodec: RequestResponseCodec +{ + type Info = TCodec::Protocol; + type InfoIter = smallvec::IntoIter<[Self::Info; 2]>; + + fn protocol_info(&self) -> Self::InfoIter { + self.protocols.clone().into_iter() + } +} + +impl OutboundUpgrade for RequestProtocol +where + TCodec: RequestResponseCodec + Send + Sync + 'static, +{ + type Output = TCodec::Response; + type Error = io::Error; + type Future = BoxFuture<'static, Result>; + + fn upgrade_outbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future { + async move { + self.codec.write_request(&protocol, &mut io, self.request).await?; + let response = self.codec.read_response(&protocol, &mut io).await?; + Ok(response) + }.boxed() + } +} + diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 9d82abcb29a..f55c1e2edc1 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -23,13 +23,13 @@ //! ## General Usage //! //! [`RequestResponse`] is a `NetworkBehaviour` that implements a generic -//! request/response protocol whereby each request is sent over a new -//! substream on a connection. `RequestResponse` is generic over the -//! actual messages being sent, which are defined in terms of a +//! request/response protocol or protocol family, whereby each request is +//! sent over a new substream on a connection. `RequestResponse` is generic +//! over the actual messages being sent, which are defined in terms of a //! [`RequestResponseCodec`]. Creating a request/response protocol thus amounts -//! to providing an implementation of this trait that can be -//! given to [`RequestResponse::new`]. Further configuration is available -//! via the [`RequestResponseConfig`]. +//! to providing an implementation of this trait which can then be +//! given to [`RequestResponse::new`]. Further configuration options are +//! available via the [`RequestResponseConfig`]. //! //! Requests are sent using [`RequestResponse::send_request`] and the //! responses received as [`RequestResponseMessage::Response`] via @@ -39,6 +39,13 @@ //! receiving a [`RequestResponseMessage::Request`] via //! [`RequestResponseEvent::Message`]. //! +//! ## Protocol Families +//! +//! A single [`RequestResponse`] instance can be used with an entire +//! protocol family that share the same request and response types. +//! For that purpose, [`RequestResponseCodec::Protocol`] is typically +//! instantiated with a sum type. +//! //! ## One-Way Protocols //! //! The implementation supports one-way protocols that do not @@ -49,39 +56,47 @@ //! immediately after the request has been sent, since `RequestResponseCodec::read_response` //! will not actually read anything from the given I/O stream. //! [`RequestResponse::send_response`] need not be called for one-way protocols. +//! +//! ## Limited Protocol Support +//! +//! It is possible to only support inbound or outbound requests for +//! a particular protocol. This is achieved by instantiating `RequestResponse` +//! with protocols using [`ProtocolSupport::Inbound`] or +//! [`ProtocolSupport::Outbound`]. Any subset of protocols of a protocol +//! family can be configured in this way. Such protocols will not be +//! advertised during inbound respectively outbound protocol negotiation +//! on the substreams. + +pub mod codec; +pub mod handler; + +pub use codec::{RequestResponseCodec, ProtocolName}; +pub use handler::ProtocolSupport; -use bytes::Bytes; use futures::{ - future::BoxFuture, - prelude::*, - stream::FuturesUnordered + channel::oneshot, +}; +use handler::{ + RequestProtocol, + RequestResponseHandler, + RequestResponseHandlerEvent, }; use libp2p_core::{ ConnectedPoint, Multiaddr, PeerId, - ProtocolName, connection::ConnectionId, - upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, }; use libp2p_swarm::{ DialPeerCondition, - NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, - OneShotEvent, - OneShotHandler, - OneShotHandlerConfig, - OneShotOutboundInfo, PollParameters, - SubstreamProtocol }; use smallvec::SmallVec; use std::{ collections::{VecDeque, HashMap}, - io, - iter, time::Duration, task::{Context, Poll} }; @@ -96,10 +111,10 @@ pub enum RequestResponseMessage { /// The sender of the request who is awaiting a response. /// /// See [`RequestResponse::send_response`]. - channel: ResponseChannel, + channel: ResponseChannel, }, /// A response message. - Response { + Response { /// The ID of the request that produced this response. /// /// See [`RequestResponse::send_request`]. @@ -120,74 +135,95 @@ pub enum RequestResponseEvent { message: RequestResponseMessage }, /// An outbound request failed. - RequestFailure { + OutboundFailure { /// The peer to whom the request was sent. peer: PeerId, /// The (local) ID of the failed request. request_id: RequestId, /// The error that occurred. - error: RequestFailure + error: OutboundFailure, }, - /// Sending of a response to an incoming request failed. - /// - /// See [`RequestResponse::send_response`]. - ResponseFailure { + /// An inbound request failed. + InboundFailure { + /// The peer from whom the request was received. + peer: PeerId, /// The error that occurred. - error: io::Error + error: InboundFailure, }, } -/// Possible request failures. +/// Possible failures occurring in the context of sending +/// an outbound request and receiving the response. #[derive(Debug)] -pub enum RequestFailure { +pub enum OutboundFailure { /// The request could not be sent because a dialing attempt failed. DialFailure, - /// The request timed out before receiving a response. + /// The request timed out before a response was received. + /// + /// It is not known whether the request may have been + /// received (and processed) by the remote peer. Timeout, /// The connection closed before a response was received. /// /// It is not known whether the request may have been /// received (and processed) by the remote peer. ConnectionClosed, + /// The remote supports none of the requested protocols. + UnsupportedProtocols, +} + +/// Possible failures occurring in the context of receiving an +/// inbound request and sending a response. +#[derive(Debug)] +pub enum InboundFailure { + /// The inbound request timed out, either while reading the + /// incoming request or before a response is sent, i.e. if + /// [`RequestResponse::send_response`] is not called in a + /// timely manner. + Timeout, + /// The local peer supports none of the requested protocols. + UnsupportedProtocols, } /// A channel for sending a response to an inbound request. /// /// See [`RequestResponse::send_response`]. #[derive(Debug)] -pub struct ResponseChannel(NegotiatedSubstream); +pub struct ResponseChannel { + peer: PeerId, + sender: oneshot::Sender, +} -/// The ID of an outgoing request. +/// The (local) ID of an outgoing request. /// /// See [`RequestResponse::send_request`]. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub struct RequestId(u64); -/// The configuration for an `RequestResponse`. +/// The configuration for a `RequestResponse` protocol. #[derive(Debug, Clone)] pub struct RequestResponseConfig { - protocol: Bytes, request_timeout: Duration, connection_keep_alive: Duration, } -impl RequestResponseConfig { - /// Creates a new `RequestResponseConfig` for the given protocol. - pub fn new(protocol: impl ProtocolName) -> Self { +impl Default for RequestResponseConfig { + fn default() -> Self { Self { - protocol: Bytes::copy_from_slice(protocol.protocol_name()), connection_keep_alive: Duration::from_secs(10), request_timeout: Duration::from_secs(10), } } +} +impl RequestResponseConfig { /// Sets the keep-alive timeout of idle connections. pub fn set_connection_keep_alive(&mut self, v: Duration) -> &mut Self { self.connection_keep_alive = v; self } - /// Sets the request timeout for outbound requests. + /// Sets the timeout for inbound and outbound requests. pub fn set_request_timeout(&mut self, v: Duration) -> &mut Self { self.request_timeout = v; self @@ -199,14 +235,16 @@ pub struct RequestResponse where TCodec: RequestResponseCodec, { + /// The supported inbound protocols. + inbound_protocols: SmallVec<[TCodec::Protocol; 2]>, + /// The supported outbound protocols. + outbound_protocols: SmallVec<[TCodec::Protocol; 2]>, /// The next (local) request ID. next_request_id: RequestId, /// The protocol configuration. config: RequestResponseConfig, /// The protocol codec for reading and writing requests and responses. codec: TCodec, - /// Pending futures for outgoing responses. - responding: FuturesUnordered>>, /// Pending events to return from `poll`. pending_events: VecDeque< NetworkBehaviourAction< @@ -227,13 +265,28 @@ impl RequestResponse where TCodec: RequestResponseCodec + Clone, { - /// Creates a new `RequestResponse` protocol for the given codec and configuration. - pub fn new(cfg: RequestResponseConfig, codec: TCodec) -> Self { + /// Creates a new `RequestResponse` behaviour for the given + /// protocols, codec and configuration. + pub fn new(codec: TCodec, protocols: I, cfg: RequestResponseConfig) -> Self + where + I: IntoIterator + { + let mut inbound_protocols = SmallVec::new(); + let mut outbound_protocols = SmallVec::new(); + for (p, s) in protocols { + if s.inbound() { + inbound_protocols.push(p.clone()); + } + if s.outbound() { + outbound_protocols.push(p.clone()); + } + } RequestResponse { + inbound_protocols, + outbound_protocols, next_request_id: RequestId(1), config: cfg, codec, - responding: FuturesUnordered::new(), pending_events: VecDeque::new(), connected: HashMap::new(), pending_requests: HashMap::new(), @@ -259,7 +312,7 @@ where let request = RequestProtocol { request_id, codec: self.codec.clone(), - protocol: self.config.protocol.clone(), + protocols: self.outbound_protocols.clone(), request, }; @@ -278,15 +331,11 @@ where /// /// The provided `ResponseChannel` is obtained from a /// [`RequestResponseMessage::Request`]. - pub fn send_response(&mut self, mut channel: ResponseChannel, response: TCodec::Response) - where - TCodec: RequestResponseCodec + Send + Sync + 'static, - TCodec::Response: Send + 'static, - { - let mut codec = self.codec.clone(); - self.responding.push(async move { - codec.write_response(&mut channel.0, response).await - }.boxed()); + pub fn send_response(&mut self, ch: ResponseChannel, rs: TCodec::Response) { + // Fails only if the inbound upgrade timed out waiting for the response, + // in which case the handler emits `RequestResponseHandlerEvent::InboundTimeout` + // which in turn results in `RequestResponseEvent::InboundFailure`. + let _ = ch.sender.send(rs); } /// Adds a known address for a peer that can be used for @@ -347,27 +396,17 @@ where impl NetworkBehaviour for RequestResponse where TCodec: RequestResponseCodec + Send + Sync + Clone + 'static, - TCodec::Response: Send, - TCodec::Request: Send, { - type ProtocolsHandler = OneShotHandler< - ResponseProtocol, - RequestProtocol, - RequestResponseMessage, - RequestId, - >; - + type ProtocolsHandler = RequestResponseHandler; type OutEvent = RequestResponseEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { - let inbound = SubstreamProtocol::new(ResponseProtocol { - protocol: self.config.protocol.clone(), - codec: self.codec.clone(), - }).with_timeout(self.config.request_timeout); - OneShotHandler::new(inbound, OneShotHandlerConfig { - keep_alive_timeout: self.config.connection_keep_alive, - outbound_substream_timeout: self.config.request_timeout, - }) + RequestResponseHandler::new( + self.inbound_protocols.clone(), + self.codec.clone(), + self.config.connection_keep_alive, + self.config.request_timeout, + ) } fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { @@ -420,10 +459,10 @@ where for (peer, request_id) in failed { self.pending_responses.remove(&request_id); self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::RequestFailure { + RequestResponseEvent::OutboundFailure { peer, request_id, - error: RequestFailure::ConnectionClosed + error: OutboundFailure::ConnectionClosed } )); } @@ -443,10 +482,10 @@ where if let Some(pending) = self.pending_requests.remove(peer) { for request in pending { self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::RequestFailure { + RequestResponseEvent::OutboundFailure { peer: peer.clone(), request_id: request.request_id, - error: RequestFailure::DialFailure + error: OutboundFailure::DialFailure } )); } @@ -456,33 +495,64 @@ where fn inject_event( &mut self, peer: PeerId, - _conn: ConnectionId, - event: OneShotEvent, RequestId> + _: ConnectionId, + event: RequestResponseHandlerEvent, ) { match event { - OneShotEvent::Success(message) => { - if let RequestResponseMessage::Response { request_id, .. } = &message { - self.pending_responses.remove(request_id); - } + RequestResponseHandlerEvent::Response { request_id, response } => { + self.pending_responses.remove(&request_id); + let message = RequestResponseMessage::Response { request_id, response }; self.pending_events.push_back( NetworkBehaviourAction::GenerateEvent( RequestResponseEvent::Message { peer, message })); } - OneShotEvent::Timeout(request_id) => { + RequestResponseHandlerEvent::Request { request, sender } => { + let channel = ResponseChannel { peer: peer.clone(), sender }; + let message = RequestResponseMessage::Request { request, channel }; + self.pending_events.push_back( + NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::Message { peer, message })); + } + RequestResponseHandlerEvent::OutboundTimeout(request_id) => { if let Some((peer, _conn)) = self.pending_responses.remove(&request_id) { self.pending_events.push_back( NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::RequestFailure { + RequestResponseEvent::OutboundFailure { peer, request_id, - error: RequestFailure::Timeout, + error: OutboundFailure::Timeout, })); } } + RequestResponseHandlerEvent::InboundTimeout => { + self.pending_events.push_back( + NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::InboundFailure { + peer, + error: InboundFailure::Timeout, + })); + } + RequestResponseHandlerEvent::OutboundUnsupportedProtocols(request_id) => { + self.pending_events.push_back( + NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::OutboundFailure { + peer, + request_id, + error: OutboundFailure::UnsupportedProtocols, + })); + } + RequestResponseHandlerEvent::InboundUnsupportedProtocols => { + self.pending_events.push_back( + NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::InboundFailure { + peer, + error: InboundFailure::UnsupportedProtocols, + })); + } } } - fn poll(&mut self, cx: &mut Context, _: &mut impl PollParameters) + fn poll(&mut self, _: &mut Context, _: &mut impl PollParameters) -> Poll, RequestResponseEvent @@ -490,133 +560,23 @@ where { if let Some(ev) = self.pending_events.pop_front() { return Poll::Ready(ev); - } - - while let Poll::Ready(Some(result)) = self.responding.poll_next_unpin(cx) { - if let Err(error) = result { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::ResponseFailure { error } - )); - } + } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { + self.pending_events.shrink_to_fit(); } Poll::Pending } } -/// Response substream upgrade protocol. -#[derive(Debug, Clone)] -pub struct ResponseProtocol { - protocol: Bytes, - codec: TCodec, -} - -impl UpgradeInfo for ResponseProtocol { - type Info = Bytes; - type InfoIter = iter::Once; - - fn protocol_info(&self) -> Self::InfoIter { - iter::once(self.protocol.clone()) - } -} - -impl InboundUpgrade for ResponseProtocol -where - TCodec: RequestResponseCodec + Send + Sync + 'static, -{ - type Output = RequestResponseMessage; - type Error = io::Error; - type Future = BoxFuture<'static, Result>; - - fn upgrade_inbound(mut self, mut io: NegotiatedSubstream, _: Self::Info) -> Self::Future { - async move { - let request = self.codec.read_request(&mut io).await?; - Ok(RequestResponseMessage::Request { request, channel: ResponseChannel(io) }) - }.boxed() - } -} - -/// Request substream upgrade protocol. -#[derive(Debug, Clone)] -pub struct RequestProtocol -where - TCodec: RequestResponseCodec -{ - codec: TCodec, - protocol: Bytes, - request: TCodec::Request, - request_id: RequestId, -} - -impl OneShotOutboundInfo for RequestProtocol -where - TCodec: RequestResponseCodec -{ - fn info(&self) -> RequestId { - self.request_id - } -} - -impl UpgradeInfo for RequestProtocol -where - TCodec: RequestResponseCodec -{ - type Info = Bytes; - type InfoIter = iter::Once; - - fn protocol_info(&self) -> Self::InfoIter { - iter::once(self.protocol.clone()) - } -} - -impl OutboundUpgrade for RequestProtocol -where - TCodec: RequestResponseCodec + Send + Sync + 'static, - TCodec::Request: Send + 'static, -{ - type Output = RequestResponseMessage; - type Error = io::Error; - type Future = BoxFuture<'static, Result>; - - fn upgrade_outbound(mut self, mut io: NegotiatedSubstream, _: Self::Info) -> Self::Future { - async move { - self.codec.write_request(&mut io, self.request).await?; - let response = self.codec.read_response(&mut io).await?; - Ok(RequestResponseMessage::Response { request_id: self.request_id, response }) - }.boxed() - } -} - -/// An `RequestResponseCodec` defines the request and response types -/// for a [`RequestResponse`] protocol and how they are encoded / decoded -/// to / from an I/O stream. -pub trait RequestResponseCodec { - type Request; - type Response; - - fn read_request<'a, T>(&mut self, io: &'a mut T) - -> BoxFuture<'a, Result> - where - T: AsyncRead + Unpin + Send; - - fn read_response<'a, T>(&mut self, io: &'a mut T) - -> BoxFuture<'a, Result> - where - T: AsyncRead + Unpin + Send; - - fn write_request<'a, T>(&mut self, io: &'a mut T, req: Self::Request) - -> BoxFuture<'a, Result<(), io::Error>> - where - T: AsyncWrite + Unpin + Send; - - fn write_response<'a, T>(&mut self, io: &'a mut T, res: Self::Response) - -> BoxFuture<'a, Result<(), io::Error>> - where - T: AsyncWrite + Unpin + Send; -} +/// Internal threshold for when to shrink the capacity +/// of empty queues. If the capacity of an empty queue +/// exceeds this threshold, the associated memory is +/// released. +const EMPTY_QUEUE_SHRINK_THRESHOLD: usize = 100; /// Internal information tracked for an established connection. struct Connection { id: ConnectionId, address: Option, } + diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 3ebebdd08ab..508514b1c84 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -17,7 +17,8 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -//! Integration tests for the `Ping` network behaviour. + +//! Integration tests for the `RequestResponse` network behaviour. use libp2p_core::{ Multiaddr, @@ -33,24 +34,25 @@ use libp2p_swarm::Swarm; use libp2p_tcp::TcpConfig; use futures::{prelude::*, channel::mpsc, future::BoxFuture}; use rand::{self, Rng}; -use std::io; +use std::{io, iter}; -/// Sends a ping and expects a pong response. +/// Exercises a simple ping protocol. #[test] -fn ping() { +fn ping_protocol() { let num_pings: u8 = rand::thread_rng().gen_range(1, 100); let ping = Ping("ping".to_string().into_bytes()); let pong = Pong("pong".to_string().into_bytes()); - let cfg = RequestResponseConfig::new("/ping/1".to_string()); + let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); + let cfg = RequestResponseConfig::default(); let (peer1_id, trans) = mk_transport(); - let ping_proto1 = RequestResponse::new(cfg.clone(), PingCodec()); + let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()); let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone()); let (peer2_id, trans) = mk_transport(); - let ping_proto2 = RequestResponse::new(cfg, PingCodec()); + let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg); let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone()); let (mut tx, mut rx) = mpsc::channel::(1); @@ -77,7 +79,7 @@ fn ping() { assert_eq!(&peer, &peer2_id); swarm1.send_response(channel, pong.clone()); }, - e => panic!("Unexpected event: {:?}", e) + e => panic!("Peer1: Unexpected event: {:?}", e) } } }; @@ -104,7 +106,7 @@ fn ping() { req_id = swarm2.send_request(&peer1_id, ping.clone()); } }, - e => panic!("Unexpected event: {:?}", e) + e => panic!("Peer2: Unexpected event: {:?}", e) } } }; @@ -130,6 +132,8 @@ fn mk_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox), io::Error>) { // Simple Ping-Pong Protocol +#[derive(Debug, Clone)] +struct PingProtocol(); #[derive(Clone)] struct PingCodec(); #[derive(Debug, Clone, PartialEq, Eq)] @@ -137,11 +141,19 @@ struct Ping(Vec); #[derive(Debug, Clone, PartialEq, Eq)] struct Pong(Vec); +impl ProtocolName for PingProtocol { + fn protocol_name(&self) -> &[u8] { + "/ping/1".as_bytes() + } +} + impl RequestResponseCodec for PingCodec { + type Protocol = PingProtocol; type Request = Ping; type Response = Pong; - fn read_request<'a, T>(&mut self, io: &'a mut T) -> BoxFuture<'a, Result> + fn read_request<'a, T>(&mut self, _: &PingProtocol, io: &'a mut T) + -> BoxFuture<'a, Result> where T: AsyncRead + Unpin + Send { @@ -151,7 +163,7 @@ impl RequestResponseCodec for PingCodec { .boxed() } - fn read_response<'a, T>(&mut self, io: &'a mut T) + fn read_response<'a, T>(&mut self, _: &PingProtocol, io: &'a mut T) -> BoxFuture<'a, Result> where T: AsyncRead + Unpin + Send @@ -162,7 +174,7 @@ impl RequestResponseCodec for PingCodec { .boxed() } - fn write_request<'a, T>(&mut self, io: &'a mut T, Ping(data): Ping) + fn write_request<'a, T>(&mut self, _: &PingProtocol, io: &'a mut T, Ping(data): Ping) -> BoxFuture<'a, Result<(), io::Error>> where T: AsyncWrite + Unpin + Send @@ -170,7 +182,7 @@ impl RequestResponseCodec for PingCodec { write_one(io, data).boxed() } - fn write_response<'a, T>(&mut self, io: &'a mut T, Pong(data): Pong) + fn write_response<'a, T>(&mut self, _: &PingProtocol, io: &'a mut T, Pong(data): Pong) -> BoxFuture<'a, Result<(), io::Error>> where T: AsyncWrite + Unpin + Send diff --git a/swarm/src/protocols_handler.rs b/swarm/src/protocols_handler.rs index a90689a3bbd..057c0edfb69 100644 --- a/swarm/src/protocols_handler.rs +++ b/swarm/src/protocols_handler.rs @@ -140,7 +140,7 @@ pub trait ProtocolsHandler: Send + 'static { /// Injects an event coming from the outside in the handler. fn inject_event(&mut self, event: Self::InEvent); - /// Indicates to the handler that upgrading a substream to the given protocol has failed. + /// Indicates to the handler that upgrading an outbound substream to the given protocol has failed. fn inject_dial_upgrade_error( &mut self, info: Self::OutboundOpenInfo, @@ -149,6 +149,14 @@ pub trait ProtocolsHandler: Send + 'static { > ); + /// Indicates to the handler that upgrading an inbound substream to the given protocol has failed. + fn inject_listen_upgrade_error( + &mut self, + _: ProtocolsHandlerUpgrErr< + ::Error + > + ) {} + /// Returns until when the connection should be kept alive. /// /// This method is called by the `Swarm` after each invocation of diff --git a/swarm/src/protocols_handler/node_handler.rs b/swarm/src/protocols_handler/node_handler.rs index 8b04506170c..f9c7ea1ce03 100644 --- a/swarm/src/protocols_handler/node_handler.rs +++ b/swarm/src/protocols_handler/node_handler.rs @@ -228,15 +228,26 @@ where for n in (0..self.negotiating_in.len()).rev() { let (mut in_progress, mut timeout) = self.negotiating_in.swap_remove(n); match Future::poll(Pin::new(&mut timeout), cx) { - Poll::Ready(_) => continue, + Poll::Ready(Ok(_)) => { + let err = ProtocolsHandlerUpgrErr::Timeout; + self.handler.inject_listen_upgrade_error(err); + continue + } + Poll::Ready(Err(_)) => { + let err = ProtocolsHandlerUpgrErr::Timer; + self.handler.inject_listen_upgrade_error(err); + continue; + } Poll::Pending => {}, } match Future::poll(Pin::new(&mut in_progress), cx) { Poll::Ready(Ok(upgrade)) => self.handler.inject_fully_negotiated_inbound(upgrade), Poll::Pending => self.negotiating_in.push((in_progress, timeout)), - // TODO: return a diagnostic event? - Poll::Ready(Err(_err)) => {} + Poll::Ready(Err(err)) => { + let err = ProtocolsHandlerUpgrErr::Upgrade(err); + self.handler.inject_listen_upgrade_error(err); + } } } From e2a596d7c274cdbd7c42aff4d644d2569cc59df7 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Mon, 8 Jun 2020 11:05:00 +0200 Subject: [PATCH 04/16] Small doc clarification. --- protocols/request-response/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index f55c1e2edc1..264f21b50df 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -55,7 +55,8 @@ //! Note that `RequestResponseMessage::Response` will still be emitted, //! immediately after the request has been sent, since `RequestResponseCodec::read_response` //! will not actually read anything from the given I/O stream. -//! [`RequestResponse::send_response`] need not be called for one-way protocols. +//! [`RequestResponse::send_response`] need not be called for one-way protocols, +//! i.e. the [`ResponseChannel`] may just be dropped. //! //! ## Limited Protocol Support //! From eb9c02d570404aa0ae3f084841bacdc7157465ff Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Thu, 25 Jun 2020 12:30:59 +0200 Subject: [PATCH 05/16] Remove unnecessary Sync bounds. --- protocols/request-response/src/handler.rs | 2 +- protocols/request-response/src/handler/protocol.rs | 4 ++-- protocols/request-response/src/lib.rs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 1cba9388606..3a491a69b6a 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -136,7 +136,7 @@ where impl ProtocolsHandler for RequestResponseHandler where - TCodec: RequestResponseCodec + Send + Sync + Clone + 'static, + TCodec: RequestResponseCodec + Send + Clone + 'static, { type InEvent = RequestProtocol; type OutEvent = RequestResponseHandlerEvent; diff --git a/protocols/request-response/src/handler/protocol.rs b/protocols/request-response/src/handler/protocol.rs index b546b5496ee..dc821fcc628 100644 --- a/protocols/request-response/src/handler/protocol.rs +++ b/protocols/request-response/src/handler/protocol.rs @@ -97,7 +97,7 @@ where impl InboundUpgrade for ResponseProtocol where - TCodec: RequestResponseCodec + Send + Sync + 'static, + TCodec: RequestResponseCodec + Send + 'static, { type Output = (); type Error = io::Error; @@ -144,7 +144,7 @@ where impl OutboundUpgrade for RequestProtocol where - TCodec: RequestResponseCodec + Send + Sync + 'static, + TCodec: RequestResponseCodec + Send + 'static, { type Output = TCodec::Response; type Error = io::Error; diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 264f21b50df..efad614aafa 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -396,7 +396,7 @@ where impl NetworkBehaviour for RequestResponse where - TCodec: RequestResponseCodec + Send + Sync + Clone + 'static, + TCodec: RequestResponseCodec + Send + Clone + 'static, { type ProtocolsHandler = RequestResponseHandler; type OutEvent = RequestResponseEvent; From 52d47ecf91f6bbc2436eaf90da4ef202d9501fad Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Thu, 25 Jun 2020 13:29:51 +0200 Subject: [PATCH 06/16] Remove redundant Clone constraint. --- protocols/request-response/src/codec.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/request-response/src/codec.rs b/protocols/request-response/src/codec.rs index 0a8660c7541..68199e77ba1 100644 --- a/protocols/request-response/src/codec.rs +++ b/protocols/request-response/src/codec.rs @@ -30,7 +30,7 @@ pub trait RequestResponseCodec { /// The type of protocol(s) or protocol versions being negotiated. type Protocol: ProtocolName + Send + Sync + Clone; /// The type of inbound and outbound requests. - type Request: Send + Clone; + type Request: Send; /// The type of inbound and outbound responses. type Response: Send; From 54f56d8e82cafa85e55eadd0be22c371040bc685 Mon Sep 17 00:00:00 2001 From: Roman Borschel Date: Thu, 25 Jun 2020 13:48:01 +0200 Subject: [PATCH 07/16] Update protocols/request-response/Cargo.toml Co-authored-by: Toralf Wittner --- protocols/request-response/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 81c4810c49b..48235761076 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -17,7 +17,7 @@ smallvec = "1.4" wasm-timer = "0.2" [dev-dependencies] -async-std = "< 1.6" +async-std = "1.6.2" libp2p-noise = { version = "0.19.0", path = "../noise" } libp2p-tcp = { version = "0.19.0", path = "../../transports/tcp", features = ["async-std"] } libp2p-yamux = { version = "0.19.0", path = "../../muxers/yamux" } From 11777d764a7b650977a19326f248e76a7eaef744 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Thu, 25 Jun 2020 23:11:28 +0200 Subject: [PATCH 08/16] Update dev-dependencies. --- protocols/request-response/Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 48235761076..b882325cbbb 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -18,7 +18,7 @@ wasm-timer = "0.2" [dev-dependencies] async-std = "1.6.2" -libp2p-noise = { version = "0.19.0", path = "../noise" } -libp2p-tcp = { version = "0.19.0", path = "../../transports/tcp", features = ["async-std"] } -libp2p-yamux = { version = "0.19.0", path = "../../muxers/yamux" } +libp2p-noise = { path = "../noise" } +libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] } +libp2p-yamux = { path = "../../muxers/yamux" } rand = "0.7" From a204f73c18b08f21ca442ac2c538671ae6bb3e1f Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Thu, 25 Jun 2020 23:13:00 +0200 Subject: [PATCH 09/16] Update Cargo.tomls. --- Cargo.toml | 3 +++ protocols/request-response/Cargo.toml | 6 +++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c9852d703d3..72918e1ebe8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ default = [ "ping", "plaintext", "pnet", + "request-response", "secio", "secp256k1", "tcp-async-std", @@ -43,6 +44,7 @@ noise = ["libp2p-noise"] ping = ["libp2p-ping"] plaintext = ["libp2p-plaintext"] pnet = ["libp2p-pnet"] +request-response = ["libp2p-request-response"] secio = ["libp2p-secio"] tcp-async-std = ["libp2p-tcp", "libp2p-tcp/async-std"] tcp-tokio = ["libp2p-tcp", "libp2p-tcp/tokio"] @@ -67,6 +69,7 @@ libp2p-noise = { version = "0.19.1", path = "protocols/noise", optional = true } libp2p-ping = { version = "0.19.3", path = "protocols/ping", optional = true } libp2p-plaintext = { version = "0.19.1", path = "protocols/plaintext", optional = true } libp2p-pnet = { version = "0.19.1", path = "protocols/pnet", optional = true } +libp2p-request-response = { version = "0.1.0", path = "protocols/request-response", optional = true } libp2p-secio = { version = "0.19.2", path = "protocols/secio", default-features = false, optional = true } libp2p-swarm = { version = "0.19.1", path = "swarm" } libp2p-uds = { version = "0.19.2", path = "transports/uds", optional = true } diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index b882325cbbb..f88b8baa832 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-request-response" edition = "2018" description = "Generic Request/Response Protocols" -version = "0.19.0" +version = "0.1.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,8 +11,8 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.19.0", path = "../../core" } -libp2p-swarm = { version = "0.19.0", path = "../../swarm" } +libp2p-core = { version = "0.19.2", path = "../../core" } +libp2p-swarm = { version = "0.19.1", path = "../../swarm" } smallvec = "1.4" wasm-timer = "0.2" From f964bbcd6eba1c5793e23668b8c6af486164221d Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Thu, 25 Jun 2020 23:17:32 +0200 Subject: [PATCH 10/16] Add changelog. --- CHANGELOG.md | 1 + protocols/request-response/CHANGELOG.md | 4 ++++ 2 files changed, 5 insertions(+) create mode 100644 protocols/request-response/CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 8765ed8f538..2d3e9f51eb4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - [`libp2p-ping` CHANGELOG](protocols/ping/CHANGELOG.md) - [`libp2p-plaintext` CHANGELOG](protocols/plaintext/CHANGELOG.md) - [`libp2p-pnet` CHANGELOG](protocols/pnet/CHANGELOG.md) +- [`libp2p-request-response` CHANGELOG](protocols/request-response/CHANGELOG.md) - [`libp2p-secio` CHANGELOG](protocols/secio/CHANGELOG.md) - [`libp2p-swarm` CHANGELOG](swarm/CHANGELOG.md) - [`libp2p-tcp` CHANGELOG](transports/tcp/CHANGELOG.md) diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md new file mode 100644 index 00000000000..49462a2fc6f --- /dev/null +++ b/protocols/request-response/CHANGELOG.md @@ -0,0 +1,4 @@ +# 0.1.0 + +Initial release. + From fa2ee4129c8b6c9b9660e23c54cee1917e9b3c4e Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Fri, 26 Jun 2020 16:36:34 +0200 Subject: [PATCH 11/16] Remove Sync bound from RequestResponseCodec::Protocol. Apparently the compiler just needs some help with the scope of borrows, which is unfortunate. --- protocols/request-response/src/codec.rs | 2 +- protocols/request-response/src/handler/protocol.rs | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/protocols/request-response/src/codec.rs b/protocols/request-response/src/codec.rs index 68199e77ba1..6aa1be442c1 100644 --- a/protocols/request-response/src/codec.rs +++ b/protocols/request-response/src/codec.rs @@ -28,7 +28,7 @@ use std::io; /// protocol family and how they are encoded / decoded on an I/O stream. pub trait RequestResponseCodec { /// The type of protocol(s) or protocol versions being negotiated. - type Protocol: ProtocolName + Send + Sync + Clone; + type Protocol: ProtocolName + Send + Clone; /// The type of inbound and outbound requests. type Request: Send; /// The type of inbound and outbound responses. diff --git a/protocols/request-response/src/handler/protocol.rs b/protocols/request-response/src/handler/protocol.rs index dc821fcc628..c0dcdaf9231 100644 --- a/protocols/request-response/src/handler/protocol.rs +++ b/protocols/request-response/src/handler/protocol.rs @@ -105,10 +105,12 @@ where fn upgrade_inbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future { async move { - let request = self.codec.read_request(&protocol, &mut io).await?; + let read = self.codec.read_request(&protocol, &mut io); + let request = read.await?; if let Ok(()) = self.request_sender.send(request) { if let Ok(response) = self.response_receiver.await { - self.codec.write_response(&protocol, &mut io, response).await?; + let write = self.codec.write_response(&protocol, &mut io, response); + write.await?; } } Ok(()) @@ -152,8 +154,10 @@ where fn upgrade_outbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future { async move { - self.codec.write_request(&protocol, &mut io, self.request).await?; - let response = self.codec.read_response(&protocol, &mut io).await?; + let write = self.codec.write_request(&protocol, &mut io, self.request); + write.await?; + let read = self.codec.read_response(&protocol, &mut io); + let response = read.await?; Ok(response) }.boxed() } From 9c6e6b4e88de7f7a7369b3f3834ab5f5233bc398 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Sat, 27 Jun 2020 17:00:31 +0200 Subject: [PATCH 12/16] Try async-trait. --- protocols/request-response/Cargo.toml | 1 + protocols/request-response/src/codec.rs | 20 ++++++++------- protocols/request-response/tests/ping.rs | 32 +++++++++++++----------- 3 files changed, 29 insertions(+), 24 deletions(-) diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index f88b8baa832..023b83dcc20 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -10,6 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] +async-trait = "0.1" futures = "0.3.1" libp2p-core = { version = "0.19.2", path = "../../core" } libp2p-swarm = { version = "0.19.1", path = "../../swarm" } diff --git a/protocols/request-response/src/codec.rs b/protocols/request-response/src/codec.rs index 6aa1be442c1..da85b277d81 100644 --- a/protocols/request-response/src/codec.rs +++ b/protocols/request-response/src/codec.rs @@ -20,12 +20,14 @@ pub use libp2p_core::ProtocolName; -use futures::{prelude::*, future::BoxFuture}; +use async_trait::async_trait; +use futures::prelude::*; use std::io; /// A `RequestResponseCodec` defines the request and response types /// for a [`RequestResponse`](crate::RequestResponse) protocol or /// protocol family and how they are encoded / decoded on an I/O stream. +#[async_trait] pub trait RequestResponseCodec { /// The type of protocol(s) or protocol versions being negotiated. type Protocol: ProtocolName + Send + Clone; @@ -36,29 +38,29 @@ pub trait RequestResponseCodec { /// Reads a request from the given I/O stream according to the /// negotiated protocol. - fn read_request<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T) - -> BoxFuture<'a, Result> + async fn read_request(&mut self, protocol: &Self::Protocol, io: &mut T) + -> io::Result where T: AsyncRead + Unpin + Send; /// Reads a response from the given I/O stream according to the /// negotiated protocol. - fn read_response<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T) - -> BoxFuture<'a, Result> + async fn read_response(&mut self, protocol: &Self::Protocol, io: &mut T) + -> io::Result where T: AsyncRead + Unpin + Send; /// Writes a request to the given I/O stream according to the /// negotiated protocol. - fn write_request<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T, req: Self::Request) - -> BoxFuture<'a, Result<(), io::Error>> + async fn write_request(&mut self, protocol: &Self::Protocol, io: &mut T, req: Self::Request) + -> io::Result<()> where T: AsyncWrite + Unpin + Send; /// Writes a response to the given I/O stream according to the /// negotiated protocol. - fn write_response<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T, res: Self::Response) - -> BoxFuture<'a, Result<(), io::Error>> + async fn write_response(&mut self, protocol: &Self::Protocol, io: &mut T, res: Self::Response) + -> io::Result<()> where T: AsyncWrite + Unpin + Send; } diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 508514b1c84..107a37edf04 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -20,6 +20,7 @@ //! Integration tests for the `RequestResponse` network behaviour. +use async_trait::async_trait; use libp2p_core::{ Multiaddr, PeerId, @@ -32,7 +33,7 @@ use libp2p_noise::{NoiseConfig, X25519Spec, Keypair}; use libp2p_request_response::*; use libp2p_swarm::Swarm; use libp2p_tcp::TcpConfig; -use futures::{prelude::*, channel::mpsc, future::BoxFuture}; +use futures::{prelude::*, channel::mpsc}; use rand::{self, Rng}; use std::{io, iter}; @@ -147,47 +148,48 @@ impl ProtocolName for PingProtocol { } } +#[async_trait] impl RequestResponseCodec for PingCodec { type Protocol = PingProtocol; type Request = Ping; type Response = Pong; - fn read_request<'a, T>(&mut self, _: &PingProtocol, io: &'a mut T) - -> BoxFuture<'a, Result> + async fn read_request(&mut self, _: &PingProtocol, io: &mut T) + -> io::Result where T: AsyncRead + Unpin + Send { read_one(io, 1024) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) - .and_then(|data| future::ready(Ok(Ping(data)))) - .boxed() + .map_ok(Ping) + .await } - fn read_response<'a, T>(&mut self, _: &PingProtocol, io: &'a mut T) - -> BoxFuture<'a, Result> + async fn read_response(&mut self, _: &PingProtocol, io: &mut T) + -> io::Result where T: AsyncRead + Unpin + Send { read_one(io, 1024) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) - .and_then(|data| future::ready(Ok(Pong(data)))) - .boxed() + .map_ok(Pong) + .await } - fn write_request<'a, T>(&mut self, _: &PingProtocol, io: &'a mut T, Ping(data): Ping) - -> BoxFuture<'a, Result<(), io::Error>> + async fn write_request(&mut self, _: &PingProtocol, io: &mut T, Ping(data): Ping) + -> io::Result<()> where T: AsyncWrite + Unpin + Send { - write_one(io, data).boxed() + write_one(io, data).await } - fn write_response<'a, T>(&mut self, _: &PingProtocol, io: &'a mut T, Pong(data): Pong) - -> BoxFuture<'a, Result<(), io::Error>> + async fn write_response(&mut self, _: &PingProtocol, io: &mut T, Pong(data): Pong) + -> io::Result<()> where T: AsyncWrite + Unpin + Send { - write_one(io, data).boxed() + write_one(io, data).await } } From 7bfdff5c630e202f4dacf18e42c0a9e5dd18e822 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Sat, 27 Jun 2020 17:10:24 +0200 Subject: [PATCH 13/16] Allow checking whether a ResponseChannel is still open. Also expand the commentary on `send_response` to indicate that responses may be discard if they come in too late. --- protocols/request-response/src/lib.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index efad614aafa..98776e97777 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -195,6 +195,19 @@ pub struct ResponseChannel { sender: oneshot::Sender, } +impl ResponseChannel { + /// Checks whether the response channel is still open, i.e. + /// the `RequestResponse` behaviour is still waiting for a + /// a response to be sent via [`RequestResponse::send_response`] + /// and this response channel. + /// + /// If the response channel is no longer open then the inbound + /// request timed out waiting for the response. + pub fn is_open(&self) -> bool { + !self.sender.is_canceled() + } +} + /// The (local) ID of an outgoing request. /// /// See [`RequestResponse::send_request`]. @@ -330,6 +343,10 @@ where /// Initiates sending a response to an inbound request. /// + /// If the `ResponseChannel` is already closed due to a timeout, + /// the response is discarded and eventually [`RequestResponseEvent::InboundFailure`] + /// is emitted by `RequestResponse::poll`. + /// /// The provided `ResponseChannel` is obtained from a /// [`RequestResponseMessage::Request`]. pub fn send_response(&mut self, ch: ResponseChannel, rs: TCodec::Response) { From 2ec63e5283505e2907af8cb0a127be78b7793d14 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Sun, 28 Jun 2020 15:34:11 +0200 Subject: [PATCH 14/16] Add `RequestResponse::is_pending`. As an analogue of `ResponseChannel::is_open` for outbound requests. --- protocols/request-response/src/lib.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 98776e97777..c21929343a0 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -382,6 +382,13 @@ where self.connected.contains_key(peer) } + /// Checks whether an outbound request initiated by + /// [`RequestResponse::send_request`] is still pending, i.e. waiting + /// for a response. + pub fn is_pending(&self, req_id: &RequestId) -> bool { + self.pending_responses.contains_key(req_id) + } + /// Returns the next request ID. fn next_request_id(&mut self) -> RequestId { let request_id = self.next_request_id; From 216fca4e579f8a396669dee84dfc2655baf0f54f Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Mon, 29 Jun 2020 16:28:17 +0200 Subject: [PATCH 15/16] Revert now unnecessary changes to the OneShotHandler. Since `libp2p-request-response` is no longer using it. --- protocols/floodsub/src/layer.rs | 9 ++-- swarm/src/lib.rs | 2 - swarm/src/protocols_handler.rs | 2 +- swarm/src/protocols_handler/one_shot.rs | 65 +++++++------------------ 4 files changed, 22 insertions(+), 56 deletions(-) diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 12b44fda7d6..c1d66572dbd 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -29,7 +29,6 @@ use libp2p_swarm::{ NetworkBehaviourAction, PollParameters, ProtocolsHandler, - OneShotEvent, OneShotHandler, NotifyHandler, DialPeerCondition, @@ -238,7 +237,7 @@ impl Floodsub { } impl NetworkBehaviour for Floodsub { - type ProtocolsHandler = OneShotHandler; + type ProtocolsHandler = OneShotHandler; type OutEvent = FloodsubEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { @@ -288,12 +287,12 @@ impl NetworkBehaviour for Floodsub { &mut self, propagation_source: PeerId, _connection: ConnectionId, - event: OneShotEvent, + event: InnerMessage, ) { // We ignore successful sends or timeouts. let event = match event { - OneShotEvent::Success(InnerMessage::Rx(event)) => event, - OneShotEvent::Success(InnerMessage::Sent) | OneShotEvent::Timeout(()) => return, + InnerMessage::Rx(event) => event, + InnerMessage::Sent => return, }; // Update connected peers topics diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 38336a8fc04..f848f64f9d3 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -76,10 +76,8 @@ pub use protocols_handler::{ ProtocolsHandlerEvent, ProtocolsHandlerSelect, ProtocolsHandlerUpgrErr, - OneShotEvent, OneShotHandler, OneShotHandlerConfig, - OneShotOutboundInfo, SubstreamProtocol }; diff --git a/swarm/src/protocols_handler.rs b/swarm/src/protocols_handler.rs index 057c0edfb69..62be4138b3e 100644 --- a/swarm/src/protocols_handler.rs +++ b/swarm/src/protocols_handler.rs @@ -63,7 +63,7 @@ pub use dummy::DummyProtocolsHandler; pub use map_in::MapInEvent; pub use map_out::MapOutEvent; pub use node_handler::{NodeHandlerWrapper, NodeHandlerWrapperBuilder, NodeHandlerWrapperError}; -pub use one_shot::{OneShotHandler, OneShotHandlerConfig, OneShotEvent, OneShotOutboundInfo}; +pub use one_shot::{OneShotHandler, OneShotHandlerConfig}; pub use select::{IntoProtocolsHandlerSelect, ProtocolsHandlerSelect}; /// A handler for a set of protocols used on a connection with a remote. diff --git a/swarm/src/protocols_handler/one_shot.rs b/swarm/src/protocols_handler/one_shot.rs index 7575a1b6059..4659fe0faaa 100644 --- a/swarm/src/protocols_handler/one_shot.rs +++ b/swarm/src/protocols_handler/one_shot.rs @@ -33,14 +33,14 @@ use wasm_timer::Instant; /// A `ProtocolsHandler` that opens a new substream for each request. // TODO: Debug -pub struct OneShotHandler +pub struct OneShotHandler where TOutbound: OutboundUpgradeSend, { /// The upgrade for inbound substreams. listen_protocol: SubstreamProtocol, /// If `Some`, something bad happened and we should shut down the handler with an error. - pending_error: Option<(ProtocolsHandlerUpgrErr<::Error>, TInfo)>, + pending_error: Option::Error>>, /// Queue of events to produce in `poll()`. events_out: SmallVec<[TEvent; 4]>, /// Queue of outbound substreams to open. @@ -55,8 +55,8 @@ where config: OneShotHandlerConfig, } -impl - OneShotHandler +impl + OneShotHandler where TOutbound: OutboundUpgradeSend, { @@ -105,8 +105,8 @@ where } } -impl Default - for OneShotHandler +impl Default + for OneShotHandler where TOutbound: OutboundUpgradeSend, TInbound: InboundUpgradeSend + Default, @@ -119,34 +119,25 @@ where } } -/// The events emitted by the [`OneShotHandler`]. -pub enum OneShotEvent { - /// An inbound or outbound upgrade succeeded. - Success(TEvent), - /// An outbound upgrade (i.e. request) timed out. - Timeout(TInfo), -} - -impl ProtocolsHandler - for OneShotHandler +impl ProtocolsHandler + for OneShotHandler where TInbound: InboundUpgradeSend + Send + 'static, - TOutbound: OutboundUpgradeSend + OneShotOutboundInfo, + TOutbound: OutboundUpgradeSend, TInbound::Output: Into, TOutbound::Output: Into, TOutbound::Error: error::Error + Send + 'static, SubstreamProtocol: Clone, TEvent: Send + 'static, - TInfo: Send + 'static, { type InEvent = TOutbound; - type OutEvent = OneShotEvent; + type OutEvent = TEvent; type Error = ProtocolsHandlerUpgrErr< ::Error, >; type InboundProtocol = TInbound; type OutboundProtocol = TOutbound; - type OutboundOpenInfo = TInfo; + type OutboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { self.listen_protocol.clone() @@ -184,13 +175,13 @@ where fn inject_dial_upgrade_error( &mut self, - info: Self::OutboundOpenInfo, + _info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr< ::Error, >, ) { if self.pending_error.is_none() { - self.pending_error = Some((error, info)); + self.pending_error = Some(error); } } @@ -204,19 +195,13 @@ where ) -> Poll< ProtocolsHandlerEvent, > { - if let Some((err, info)) = self.pending_error.take() { - if let ProtocolsHandlerUpgrErr::Timeout = &err { - return Poll::Ready(ProtocolsHandlerEvent::Custom( - OneShotEvent::Timeout(info) - )) - } else { - return Poll::Ready(ProtocolsHandlerEvent::Close(err)) - } + if let Some(err) = self.pending_error.take() { + return Poll::Ready(ProtocolsHandlerEvent::Close(err)) } if !self.events_out.is_empty() { return Poll::Ready(ProtocolsHandlerEvent::Custom( - OneShotEvent::Success(self.events_out.remove(0)), + self.events_out.remove(0) )); } else { self.events_out.shrink_to_fit(); @@ -226,12 +211,11 @@ where if self.dial_negotiated < self.max_dial_negotiated { self.dial_negotiated += 1; let upgrade = self.dial_queue.remove(0); - let info = upgrade.info(); return Poll::Ready( ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(upgrade) .with_timeout(self.config.outbound_substream_timeout), - info, + info: (), }, ); } @@ -261,18 +245,3 @@ impl Default for OneShotHandlerConfig { } } -/// Information about an outgoing request that is returned -/// to the behaviour in case of a (non-fatal) request error. -/// -/// This trait must be implemented by the outbound upgrade -/// used with the `OneShotHandler`. -pub trait OneShotOutboundInfo { - fn info(&self) -> T; -} - -impl OneShotOutboundInfo<()> for T { - fn info(&self) -> () { - () - } -} - From 9fadb3f2099c1d7943fbbdcf04402f3d30e1fe0b Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Mon, 29 Jun 2020 16:46:37 +0200 Subject: [PATCH 16/16] Update CHANGELOG for libp2p-swarm. --- swarm/CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index e294ca61489..a10d9fe66cf 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,3 +1,10 @@ +# 0.20.0 [????-??-??] + +- Add `ProtocolsHandler::inject_listen_upgrade_error`, the inbound +analogue of `ProtocolsHandler::inject_dial_upgrade_error`, with an +empty default implementation. No implementation is required to +retain existing behaviour. + # 0.19.1 [2020-06-18] - Bugfix: Fix MultiHandler panicking when empty