diff --git a/CHANGELOG.md b/CHANGELOG.md index 8765ed8f538..2d3e9f51eb4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - [`libp2p-ping` CHANGELOG](protocols/ping/CHANGELOG.md) - [`libp2p-plaintext` CHANGELOG](protocols/plaintext/CHANGELOG.md) - [`libp2p-pnet` CHANGELOG](protocols/pnet/CHANGELOG.md) +- [`libp2p-request-response` CHANGELOG](protocols/request-response/CHANGELOG.md) - [`libp2p-secio` CHANGELOG](protocols/secio/CHANGELOG.md) - [`libp2p-swarm` CHANGELOG](swarm/CHANGELOG.md) - [`libp2p-tcp` CHANGELOG](transports/tcp/CHANGELOG.md) diff --git a/Cargo.toml b/Cargo.toml index ed727247bd2..72918e1ebe8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ default = [ "ping", "plaintext", "pnet", + "request-response", "secio", "secp256k1", "tcp-async-std", @@ -43,6 +44,7 @@ noise = ["libp2p-noise"] ping = ["libp2p-ping"] plaintext = ["libp2p-plaintext"] pnet = ["libp2p-pnet"] +request-response = ["libp2p-request-response"] secio = ["libp2p-secio"] tcp-async-std = ["libp2p-tcp", "libp2p-tcp/async-std"] tcp-tokio = ["libp2p-tcp", "libp2p-tcp/tokio"] @@ -67,6 +69,7 @@ libp2p-noise = { version = "0.19.1", path = "protocols/noise", optional = true } libp2p-ping = { version = "0.19.3", path = "protocols/ping", optional = true } libp2p-plaintext = { version = "0.19.1", path = "protocols/plaintext", optional = true } libp2p-pnet = { version = "0.19.1", path = "protocols/pnet", optional = true } +libp2p-request-response = { version = "0.1.0", path = "protocols/request-response", optional = true } libp2p-secio = { version = "0.19.2", path = "protocols/secio", default-features = false, optional = true } libp2p-swarm = { version = "0.19.1", path = "swarm" } libp2p-uds = { version = "0.19.2", path = "transports/uds", optional = true } @@ -107,6 +110,7 @@ members = [ "protocols/noise", "protocols/ping", "protocols/plaintext", + "protocols/request-response", "protocols/secio", "swarm", "transports/dns", diff --git a/core/src/upgrade.rs b/core/src/upgrade.rs index dbe3a5b7c96..9798ae6c27a 100644 --- a/core/src/upgrade.rs +++ b/core/src/upgrade.rs @@ -87,12 +87,12 @@ pub use self::{ /// /// # Context /// -/// In situations where we provide a list of protocols that we support, the elements of that list are required to -/// implement the [`ProtocolName`] trait. +/// In situations where we provide a list of protocols that we support, +/// the elements of that list are required to implement the [`ProtocolName`] trait. /// -/// Libp2p will call the [`ProtocolName::protocol_name`] trait method on each element of that list, and transmit the -/// returned value on the network. If the remote accepts a given protocol, the element serves as the return value of -/// the function that performed the negotiation. +/// Libp2p will call [`ProtocolName::protocol_name`] on each element of that list, and transmit the +/// returned value on the network. If the remote accepts a given protocol, the element +/// serves as the return value of the function that performed the negotiation. /// /// # Example /// @@ -118,6 +118,9 @@ pub use self::{ /// pub trait ProtocolName { /// The protocol name as bytes. Transmitted on the network. + /// + /// **Note:** Valid protocol names must start with `/` and + /// not exceed 140 bytes in length. fn protocol_name(&self) -> &[u8]; } diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 4bb6aa08cbf..c1d66572dbd 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -289,7 +289,7 @@ impl NetworkBehaviour for Floodsub { _connection: ConnectionId, event: InnerMessage, ) { - // We ignore successful sends event. + // We ignore successful sends or timeouts. let event = match event { InnerMessage::Rx(event) => event, InnerMessage::Sent => return, diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md new file mode 100644 index 00000000000..49462a2fc6f --- /dev/null +++ b/protocols/request-response/CHANGELOG.md @@ -0,0 +1,4 @@ +# 0.1.0 + +Initial release. + diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml new file mode 100644 index 00000000000..023b83dcc20 --- /dev/null +++ b/protocols/request-response/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "libp2p-request-response" +edition = "2018" +description = "Generic Request/Response Protocols" +version = "0.1.0" +authors = ["Parity Technologies "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +async-trait = "0.1" +futures = "0.3.1" +libp2p-core = { version = "0.19.2", path = "../../core" } +libp2p-swarm = { version = "0.19.1", path = "../../swarm" } +smallvec = "1.4" +wasm-timer = "0.2" + +[dev-dependencies] +async-std = "1.6.2" +libp2p-noise = { path = "../noise" } +libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] } +libp2p-yamux = { path = "../../muxers/yamux" } +rand = "0.7" diff --git a/protocols/request-response/src/codec.rs b/protocols/request-response/src/codec.rs new file mode 100644 index 00000000000..da85b277d81 --- /dev/null +++ b/protocols/request-response/src/codec.rs @@ -0,0 +1,66 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +pub use libp2p_core::ProtocolName; + +use async_trait::async_trait; +use futures::prelude::*; +use std::io; + +/// A `RequestResponseCodec` defines the request and response types +/// for a [`RequestResponse`](crate::RequestResponse) protocol or +/// protocol family and how they are encoded / decoded on an I/O stream. +#[async_trait] +pub trait RequestResponseCodec { + /// The type of protocol(s) or protocol versions being negotiated. + type Protocol: ProtocolName + Send + Clone; + /// The type of inbound and outbound requests. + type Request: Send; + /// The type of inbound and outbound responses. + type Response: Send; + + /// Reads a request from the given I/O stream according to the + /// negotiated protocol. + async fn read_request(&mut self, protocol: &Self::Protocol, io: &mut T) + -> io::Result + where + T: AsyncRead + Unpin + Send; + + /// Reads a response from the given I/O stream according to the + /// negotiated protocol. + async fn read_response(&mut self, protocol: &Self::Protocol, io: &mut T) + -> io::Result + where + T: AsyncRead + Unpin + Send; + + /// Writes a request to the given I/O stream according to the + /// negotiated protocol. + async fn write_request(&mut self, protocol: &Self::Protocol, io: &mut T, req: Self::Request) + -> io::Result<()> + where + T: AsyncWrite + Unpin + Send; + + /// Writes a response to the given I/O stream according to the + /// negotiated protocol. + async fn write_response(&mut self, protocol: &Self::Protocol, io: &mut T, res: Self::Response) + -> io::Result<()> + where + T: AsyncWrite + Unpin + Send; +} diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs new file mode 100644 index 00000000000..3a491a69b6a --- /dev/null +++ b/protocols/request-response/src/handler.rs @@ -0,0 +1,326 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +mod protocol; + +use crate::{EMPTY_QUEUE_SHRINK_THRESHOLD, RequestId}; +use crate::codec::RequestResponseCodec; + +pub use protocol::{RequestProtocol, ResponseProtocol, ProtocolSupport}; + +use futures::{ + channel::oneshot, + future::BoxFuture, + prelude::*, + stream::FuturesUnordered +}; +use libp2p_core::{ + upgrade::{UpgradeError, NegotiationError}, +}; +use libp2p_swarm::{ + SubstreamProtocol, + protocols_handler::{ + KeepAlive, + ProtocolsHandler, + ProtocolsHandlerEvent, + ProtocolsHandlerUpgrErr, + } +}; +use smallvec::SmallVec; +use std::{ + collections::VecDeque, + io, + time::Duration, + task::{Context, Poll} +}; +use wasm_timer::Instant; + +/// A connection handler of a `RequestResponse` protocol. +#[doc(hidden)] +pub struct RequestResponseHandler +where + TCodec: RequestResponseCodec, +{ + /// The supported inbound protocols. + inbound_protocols: SmallVec<[TCodec::Protocol; 2]>, + /// The request/response message codec. + codec: TCodec, + /// The keep-alive timeout of idle connections. A connection is considered + /// idle if there are no outbound substreams. + keep_alive_timeout: Duration, + /// The timeout for inbound and outbound substreams (i.e. request + /// and response processing). + substream_timeout: Duration, + /// The current connection keep-alive. + keep_alive: KeepAlive, + /// A pending fatal error that results in the connection being closed. + pending_error: Option>, + /// Queue of events to emit in `poll()`. + pending_events: VecDeque>, + /// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`. + outbound: VecDeque>, + /// Inbound upgrades waiting for the incoming request. + inbound: FuturesUnordered), + oneshot::Canceled + >>>, +} + +impl RequestResponseHandler +where + TCodec: RequestResponseCodec, +{ + pub(super) fn new( + inbound_protocols: SmallVec<[TCodec::Protocol; 2]>, + codec: TCodec, + keep_alive_timeout: Duration, + substream_timeout: Duration, + ) -> Self { + Self { + inbound_protocols, + codec, + keep_alive: KeepAlive::Yes, + keep_alive_timeout, + substream_timeout, + outbound: VecDeque::new(), + inbound: FuturesUnordered::new(), + pending_events: VecDeque::new(), + pending_error: None, + } + } +} + +/// The events emitted by the [`RequestResponseHandler`]. +#[doc(hidden)] +pub enum RequestResponseHandlerEvent +where + TCodec: RequestResponseCodec +{ + /// An inbound request. + Request { + request: TCodec::Request, + sender: oneshot::Sender + }, + /// An inbound response. + Response { + request_id: RequestId, + response: TCodec::Response + }, + /// An outbound upgrade (i.e. request) timed out. + OutboundTimeout(RequestId), + /// An outbound request failed to negotiate a mutually supported protocol. + OutboundUnsupportedProtocols(RequestId), + /// An inbound request timed out. + InboundTimeout, + /// An inbound request failed to negotiate a mutually supported protocol. + InboundUnsupportedProtocols, +} + +impl ProtocolsHandler for RequestResponseHandler +where + TCodec: RequestResponseCodec + Send + Clone + 'static, +{ + type InEvent = RequestProtocol; + type OutEvent = RequestResponseHandlerEvent; + type Error = ProtocolsHandlerUpgrErr; + type InboundProtocol = ResponseProtocol; + type OutboundProtocol = RequestProtocol; + type OutboundOpenInfo = RequestId; + + fn listen_protocol(&self) -> SubstreamProtocol { + // A channel for notifying the handler when the inbound + // upgrade received the request. + let (rq_send, rq_recv) = oneshot::channel(); + + // A channel for notifying the inbound upgrade when the + // response is sent. + let (rs_send, rs_recv) = oneshot::channel(); + + // By keeping all I/O inside the `ResponseProtocol` and thus the + // inbound substream upgrade via above channels, we ensure that it + // is all subject to the configured timeout without extra bookkeeping + // for inbound substreams as well as their timeouts and also make the + // implementation of inbound and outbound upgrades symmetric in + // this sense. + let proto = ResponseProtocol { + protocols: self.inbound_protocols.clone(), + codec: self.codec.clone(), + request_sender: rq_send, + response_receiver: rs_recv, + }; + + // The handler waits for the request to come in. It then emits + // `RequestResponseHandlerEvent::Request` together with a + // `ResponseChannel`. + self.inbound.push(rq_recv.map_ok(move |rq| (rq, rs_send)).boxed()); + + SubstreamProtocol::new(proto).with_timeout(self.substream_timeout) + } + + fn inject_fully_negotiated_inbound( + &mut self, + (): (), + ) { + // Nothing to do, as the response has already been sent + // as part of the upgrade. + } + + fn inject_fully_negotiated_outbound( + &mut self, + response: TCodec::Response, + request_id: RequestId, + ) { + self.pending_events.push_back( + RequestResponseHandlerEvent::Response { + request_id, response + }); + } + + fn inject_event(&mut self, request: Self::InEvent) { + self.keep_alive = KeepAlive::Yes; + self.outbound.push_back(request); + } + + fn inject_dial_upgrade_error( + &mut self, + info: RequestId, + error: ProtocolsHandlerUpgrErr, + ) { + match error { + ProtocolsHandlerUpgrErr::Timeout => { + self.pending_events.push_back( + RequestResponseHandlerEvent::OutboundTimeout(info)); + } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + // The remote merely doesn't support the protocol(s) we requested. + // This is no reason to close the connection, which may + // successfully communicate with other protocols already. + // An event is reported to permit user code to react to the fact that + // the remote peer does not support the requested protocol(s). + self.pending_events.push_back( + RequestResponseHandlerEvent::OutboundUnsupportedProtocols(info)); + } + _ => { + // Anything else is considered a fatal error or misbehaviour of + // the remote peer and results in closing the connection. + self.pending_error = Some(error); + } + } + } + + fn inject_listen_upgrade_error( + &mut self, + error: ProtocolsHandlerUpgrErr + ) { + match error { + ProtocolsHandlerUpgrErr::Timeout => { + self.pending_events.push_back( + RequestResponseHandlerEvent::InboundTimeout); + } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + // The local peer merely doesn't support the protocol(s) requested. + // This is no reason to close the connection, which may + // successfully communicate with other protocols already. + // An event is reported to permit user code to react to the fact that + // the local peer does not support the requested protocol(s). + self.pending_events.push_back( + RequestResponseHandlerEvent::InboundUnsupportedProtocols); + } + _ => { + // Anything else is considered a fatal error or misbehaviour of + // the remote peer and results in closing the connection. + self.pending_error = Some(error); + } + } + } + + fn connection_keep_alive(&self) -> KeepAlive { + self.keep_alive + } + + fn poll( + &mut self, + cx: &mut Context, + ) -> Poll< + ProtocolsHandlerEvent, RequestId, Self::OutEvent, Self::Error>, + > { + // Check for a pending (fatal) error. + if let Some(err) = self.pending_error.take() { + // The handler will not be polled again by the `Swarm`. + return Poll::Ready(ProtocolsHandlerEvent::Close(err)) + } + + // Drain pending events. + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(ProtocolsHandlerEvent::Custom(event)) + } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { + self.pending_events.shrink_to_fit(); + } + + // Check for inbound requests. + while let Poll::Ready(Some(result)) = self.inbound.poll_next_unpin(cx) { + match result { + Ok((rq, rs_sender)) => { + // We received an inbound request. + self.keep_alive = KeepAlive::Yes; + return Poll::Ready(ProtocolsHandlerEvent::Custom( + RequestResponseHandlerEvent::Request { + request: rq, sender: rs_sender + })) + } + Err(oneshot::Canceled) => { + // The inbound upgrade has errored or timed out reading + // or waiting for the request. The handler is informed + // via `inject_listen_upgrade_error`. + } + } + } + + // Emit outbound requests. + if let Some(request) = self.outbound.pop_front() { + let info = request.request_id; + return Poll::Ready( + ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(request) + .with_timeout(self.substream_timeout), + info, + }, + ) + } + + debug_assert!(self.outbound.is_empty()); + + if self.outbound.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { + self.outbound.shrink_to_fit(); + } + + if self.inbound.is_empty() { + // No new inbound or outbound requests. However, we may just have + // started the latest inbound or outbound upgrade(s), so make sure + // the keep-alive timeout is preceded by the substream timeout. + let until = Instant::now() + self.substream_timeout + self.keep_alive_timeout; + self.keep_alive = KeepAlive::Until(until); + } + + Poll::Pending + } +} + diff --git a/protocols/request-response/src/handler/protocol.rs b/protocols/request-response/src/handler/protocol.rs new file mode 100644 index 00000000000..c0dcdaf9231 --- /dev/null +++ b/protocols/request-response/src/handler/protocol.rs @@ -0,0 +1,165 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! The definition of a request/response protocol via inbound +//! and outbound substream upgrades. The inbound upgrade +//! receives a request and sends a response, whereas the +//! outbound upgrade send a request and receives a response. + +use crate::RequestId; +use crate::codec::RequestResponseCodec; + +use futures::{ + channel::oneshot, + future::BoxFuture, + prelude::*, +}; +use libp2p_core::{ + upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, +}; +use libp2p_swarm::{ + NegotiatedSubstream, +}; +use smallvec::SmallVec; +use std::io; + +/// The level of support for a particular protocol. +#[derive(Debug, Clone)] +pub enum ProtocolSupport { + /// The protocol is only supported for inbound requests. + Inbound, + /// The protocol is only supported for outbound requests. + Outbound, + /// The protocol is supported for inbound and outbound requests. + Full +} + +impl ProtocolSupport { + /// Whether inbound requests are supported. + pub fn inbound(&self) -> bool { + match self { + ProtocolSupport::Inbound | ProtocolSupport::Full => true, + ProtocolSupport::Outbound => false, + } + } + + /// Whether outbound requests are supported. + pub fn outbound(&self) -> bool { + match self { + ProtocolSupport::Outbound | ProtocolSupport::Full => true, + ProtocolSupport::Inbound => false, + } + } +} + +/// Response substream upgrade protocol. +/// +/// Receives a request and sends a response. +#[derive(Debug)] +pub struct ResponseProtocol +where + TCodec: RequestResponseCodec +{ + pub(crate) codec: TCodec, + pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>, + pub(crate) request_sender: oneshot::Sender, + pub(crate) response_receiver: oneshot::Receiver +} + +impl UpgradeInfo for ResponseProtocol +where + TCodec: RequestResponseCodec +{ + type Info = TCodec::Protocol; + type InfoIter = smallvec::IntoIter<[Self::Info; 2]>; + + fn protocol_info(&self) -> Self::InfoIter { + self.protocols.clone().into_iter() + } +} + +impl InboundUpgrade for ResponseProtocol +where + TCodec: RequestResponseCodec + Send + 'static, +{ + type Output = (); + type Error = io::Error; + type Future = BoxFuture<'static, Result>; + + fn upgrade_inbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future { + async move { + let read = self.codec.read_request(&protocol, &mut io); + let request = read.await?; + if let Ok(()) = self.request_sender.send(request) { + if let Ok(response) = self.response_receiver.await { + let write = self.codec.write_response(&protocol, &mut io, response); + write.await?; + } + } + Ok(()) + }.boxed() + } +} + +/// Request substream upgrade protocol. +/// +/// Sends a request and receives a response. +#[derive(Debug, Clone)] +pub struct RequestProtocol +where + TCodec: RequestResponseCodec +{ + pub(crate) codec: TCodec, + pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>, + pub(crate) request_id: RequestId, + pub(crate) request: TCodec::Request, +} + +impl UpgradeInfo for RequestProtocol +where + TCodec: RequestResponseCodec +{ + type Info = TCodec::Protocol; + type InfoIter = smallvec::IntoIter<[Self::Info; 2]>; + + fn protocol_info(&self) -> Self::InfoIter { + self.protocols.clone().into_iter() + } +} + +impl OutboundUpgrade for RequestProtocol +where + TCodec: RequestResponseCodec + Send + 'static, +{ + type Output = TCodec::Response; + type Error = io::Error; + type Future = BoxFuture<'static, Result>; + + fn upgrade_outbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future { + async move { + let write = self.codec.write_request(&protocol, &mut io, self.request); + write.await?; + let read = self.codec.read_response(&protocol, &mut io); + let response = read.await?; + Ok(response) + }.boxed() + } +} + diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs new file mode 100644 index 00000000000..c21929343a0 --- /dev/null +++ b/protocols/request-response/src/lib.rs @@ -0,0 +1,607 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Generic request/response protocols. +//! +//! ## General Usage +//! +//! [`RequestResponse`] is a `NetworkBehaviour` that implements a generic +//! request/response protocol or protocol family, whereby each request is +//! sent over a new substream on a connection. `RequestResponse` is generic +//! over the actual messages being sent, which are defined in terms of a +//! [`RequestResponseCodec`]. Creating a request/response protocol thus amounts +//! to providing an implementation of this trait which can then be +//! given to [`RequestResponse::new`]. Further configuration options are +//! available via the [`RequestResponseConfig`]. +//! +//! Requests are sent using [`RequestResponse::send_request`] and the +//! responses received as [`RequestResponseMessage::Response`] via +//! [`RequestResponseEvent::Message`]. +//! +//! Responses are sent using [`RequestResponse::send_response`] upon +//! receiving a [`RequestResponseMessage::Request`] via +//! [`RequestResponseEvent::Message`]. +//! +//! ## Protocol Families +//! +//! A single [`RequestResponse`] instance can be used with an entire +//! protocol family that share the same request and response types. +//! For that purpose, [`RequestResponseCodec::Protocol`] is typically +//! instantiated with a sum type. +//! +//! ## One-Way Protocols +//! +//! The implementation supports one-way protocols that do not +//! have responses. In these cases the [`RequestResponseCodec::Response`] can +//! be defined as `()` and [`RequestResponseCodec::read_response`] as well as +//! [`RequestResponseCodec::write_response`] given the obvious implementations. +//! Note that `RequestResponseMessage::Response` will still be emitted, +//! immediately after the request has been sent, since `RequestResponseCodec::read_response` +//! will not actually read anything from the given I/O stream. +//! [`RequestResponse::send_response`] need not be called for one-way protocols, +//! i.e. the [`ResponseChannel`] may just be dropped. +//! +//! ## Limited Protocol Support +//! +//! It is possible to only support inbound or outbound requests for +//! a particular protocol. This is achieved by instantiating `RequestResponse` +//! with protocols using [`ProtocolSupport::Inbound`] or +//! [`ProtocolSupport::Outbound`]. Any subset of protocols of a protocol +//! family can be configured in this way. Such protocols will not be +//! advertised during inbound respectively outbound protocol negotiation +//! on the substreams. + +pub mod codec; +pub mod handler; + +pub use codec::{RequestResponseCodec, ProtocolName}; +pub use handler::ProtocolSupport; + +use futures::{ + channel::oneshot, +}; +use handler::{ + RequestProtocol, + RequestResponseHandler, + RequestResponseHandlerEvent, +}; +use libp2p_core::{ + ConnectedPoint, + Multiaddr, + PeerId, + connection::ConnectionId, +}; +use libp2p_swarm::{ + DialPeerCondition, + NetworkBehaviour, + NetworkBehaviourAction, + NotifyHandler, + PollParameters, +}; +use smallvec::SmallVec; +use std::{ + collections::{VecDeque, HashMap}, + time::Duration, + task::{Context, Poll} +}; + +/// An inbound request or response. +#[derive(Debug)] +pub enum RequestResponseMessage { + /// A request message. + Request { + /// The request message. + request: TRequest, + /// The sender of the request who is awaiting a response. + /// + /// See [`RequestResponse::send_response`]. + channel: ResponseChannel, + }, + /// A response message. + Response { + /// The ID of the request that produced this response. + /// + /// See [`RequestResponse::send_request`]. + request_id: RequestId, + /// The response message. + response: TResponse + }, +} + +/// The events emitted by a [`RequestResponse`] protocol. +#[derive(Debug)] +pub enum RequestResponseEvent { + /// An incoming message (request or response). + Message { + /// The peer who sent the message. + peer: PeerId, + /// The incoming message. + message: RequestResponseMessage + }, + /// An outbound request failed. + OutboundFailure { + /// The peer to whom the request was sent. + peer: PeerId, + /// The (local) ID of the failed request. + request_id: RequestId, + /// The error that occurred. + error: OutboundFailure, + }, + /// An inbound request failed. + InboundFailure { + /// The peer from whom the request was received. + peer: PeerId, + /// The error that occurred. + error: InboundFailure, + }, +} + +/// Possible failures occurring in the context of sending +/// an outbound request and receiving the response. +#[derive(Debug)] +pub enum OutboundFailure { + /// The request could not be sent because a dialing attempt failed. + DialFailure, + /// The request timed out before a response was received. + /// + /// It is not known whether the request may have been + /// received (and processed) by the remote peer. + Timeout, + /// The connection closed before a response was received. + /// + /// It is not known whether the request may have been + /// received (and processed) by the remote peer. + ConnectionClosed, + /// The remote supports none of the requested protocols. + UnsupportedProtocols, +} + +/// Possible failures occurring in the context of receiving an +/// inbound request and sending a response. +#[derive(Debug)] +pub enum InboundFailure { + /// The inbound request timed out, either while reading the + /// incoming request or before a response is sent, i.e. if + /// [`RequestResponse::send_response`] is not called in a + /// timely manner. + Timeout, + /// The local peer supports none of the requested protocols. + UnsupportedProtocols, +} + +/// A channel for sending a response to an inbound request. +/// +/// See [`RequestResponse::send_response`]. +#[derive(Debug)] +pub struct ResponseChannel { + peer: PeerId, + sender: oneshot::Sender, +} + +impl ResponseChannel { + /// Checks whether the response channel is still open, i.e. + /// the `RequestResponse` behaviour is still waiting for a + /// a response to be sent via [`RequestResponse::send_response`] + /// and this response channel. + /// + /// If the response channel is no longer open then the inbound + /// request timed out waiting for the response. + pub fn is_open(&self) -> bool { + !self.sender.is_canceled() + } +} + +/// The (local) ID of an outgoing request. +/// +/// See [`RequestResponse::send_request`]. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct RequestId(u64); + +/// The configuration for a `RequestResponse` protocol. +#[derive(Debug, Clone)] +pub struct RequestResponseConfig { + request_timeout: Duration, + connection_keep_alive: Duration, +} + +impl Default for RequestResponseConfig { + fn default() -> Self { + Self { + connection_keep_alive: Duration::from_secs(10), + request_timeout: Duration::from_secs(10), + } + } +} + +impl RequestResponseConfig { + /// Sets the keep-alive timeout of idle connections. + pub fn set_connection_keep_alive(&mut self, v: Duration) -> &mut Self { + self.connection_keep_alive = v; + self + } + + /// Sets the timeout for inbound and outbound requests. + pub fn set_request_timeout(&mut self, v: Duration) -> &mut Self { + self.request_timeout = v; + self + } +} + +/// A request/response protocol for some message codec. +pub struct RequestResponse +where + TCodec: RequestResponseCodec, +{ + /// The supported inbound protocols. + inbound_protocols: SmallVec<[TCodec::Protocol; 2]>, + /// The supported outbound protocols. + outbound_protocols: SmallVec<[TCodec::Protocol; 2]>, + /// The next (local) request ID. + next_request_id: RequestId, + /// The protocol configuration. + config: RequestResponseConfig, + /// The protocol codec for reading and writing requests and responses. + codec: TCodec, + /// Pending events to return from `poll`. + pending_events: VecDeque< + NetworkBehaviourAction< + RequestProtocol, + RequestResponseEvent>>, + /// The currently connected peers and their known, reachable addresses, if any. + connected: HashMap>, + /// Externally managed addresses via `add_address` and `remove_address`. + addresses: HashMap>, + /// Requests that have not yet been sent and are waiting for a connection + /// to be established. + pending_requests: HashMap; 10]>>, + /// Responses that have not yet been received. + pending_responses: HashMap, +} + +impl RequestResponse +where + TCodec: RequestResponseCodec + Clone, +{ + /// Creates a new `RequestResponse` behaviour for the given + /// protocols, codec and configuration. + pub fn new(codec: TCodec, protocols: I, cfg: RequestResponseConfig) -> Self + where + I: IntoIterator + { + let mut inbound_protocols = SmallVec::new(); + let mut outbound_protocols = SmallVec::new(); + for (p, s) in protocols { + if s.inbound() { + inbound_protocols.push(p.clone()); + } + if s.outbound() { + outbound_protocols.push(p.clone()); + } + } + RequestResponse { + inbound_protocols, + outbound_protocols, + next_request_id: RequestId(1), + config: cfg, + codec, + pending_events: VecDeque::new(), + connected: HashMap::new(), + pending_requests: HashMap::new(), + pending_responses: HashMap::new(), + addresses: HashMap::new(), + } + } + + /// Initiates sending a request. + /// + /// If the targeted peer is currently not connected, a dialing + /// attempt is initiated and the request is sent as soon as a + /// connection is established. + /// + /// > **Note**: In order for such a dialing attempt to succeed, + /// > the `RequestResonse` protocol must either be embedded + /// > in another `NetworkBehaviour` that provides peer and + /// > address discovery, or known addresses of peers must be + /// > managed via [`RequestResponse::add_address`] and + /// > [`RequestResponse::remove_address`]. + pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> RequestId { + let request_id = self.next_request_id(); + let request = RequestProtocol { + request_id, + codec: self.codec.clone(), + protocols: self.outbound_protocols.clone(), + request, + }; + + if let Some(request) = self.try_send_request(peer, request) { + self.pending_events.push_back(NetworkBehaviourAction::DialPeer { + peer_id: peer.clone(), + condition: DialPeerCondition::Disconnected, + }); + self.pending_requests.entry(peer.clone()).or_default().push(request); + } + + request_id + } + + /// Initiates sending a response to an inbound request. + /// + /// If the `ResponseChannel` is already closed due to a timeout, + /// the response is discarded and eventually [`RequestResponseEvent::InboundFailure`] + /// is emitted by `RequestResponse::poll`. + /// + /// The provided `ResponseChannel` is obtained from a + /// [`RequestResponseMessage::Request`]. + pub fn send_response(&mut self, ch: ResponseChannel, rs: TCodec::Response) { + // Fails only if the inbound upgrade timed out waiting for the response, + // in which case the handler emits `RequestResponseHandlerEvent::InboundTimeout` + // which in turn results in `RequestResponseEvent::InboundFailure`. + let _ = ch.sender.send(rs); + } + + /// Adds a known address for a peer that can be used for + /// dialing attempts by the `Swarm`, i.e. is returned + /// by [`NetworkBehaviour::addresses_of_peer`]. + /// + /// Addresses added in this way are only removed by `remove_address`. + pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) { + self.addresses.entry(peer.clone()).or_default().push(address); + } + + /// Removes an address of a peer previously added via `add_address`. + pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) { + let mut last = false; + if let Some(addresses) = self.addresses.get_mut(peer) { + addresses.retain(|a| a != address); + last = addresses.is_empty(); + } + if last { + self.addresses.remove(peer); + } + } + + /// Checks whether a peer is currently connected. + pub fn is_connected(&self, peer: &PeerId) -> bool { + self.connected.contains_key(peer) + } + + /// Checks whether an outbound request initiated by + /// [`RequestResponse::send_request`] is still pending, i.e. waiting + /// for a response. + pub fn is_pending(&self, req_id: &RequestId) -> bool { + self.pending_responses.contains_key(req_id) + } + + /// Returns the next request ID. + fn next_request_id(&mut self) -> RequestId { + let request_id = self.next_request_id; + self.next_request_id.0 += 1; + request_id + } + + /// Tries to send a request by queueing an appropriate event to be + /// emitted to the `Swarm`. If the peer is not currently connected, + /// the given request is return unchanged. + fn try_send_request(&mut self, peer: &PeerId, request: RequestProtocol) + -> Option> + { + if let Some(connections) = self.connected.get(peer) { + let ix = (request.request_id.0 as usize) % connections.len(); + let conn = connections[ix].id; + self.pending_responses.insert(request.request_id, (peer.clone(), conn)); + self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: peer.clone(), + handler: NotifyHandler::One(conn), + event: request + }); + None + } else { + Some(request) + } + } +} + +impl NetworkBehaviour for RequestResponse +where + TCodec: RequestResponseCodec + Send + Clone + 'static, +{ + type ProtocolsHandler = RequestResponseHandler; + type OutEvent = RequestResponseEvent; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + RequestResponseHandler::new( + self.inbound_protocols.clone(), + self.codec.clone(), + self.config.connection_keep_alive, + self.config.request_timeout, + ) + } + + fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { + let mut addresses = Vec::new(); + if let Some(connections) = self.connected.get(peer) { + addresses.extend(connections.iter().filter_map(|c| c.address.clone())) + } + if let Some(more) = self.addresses.get(peer) { + addresses.extend(more.into_iter().cloned()); + } + addresses + } + + fn inject_connected(&mut self, peer: &PeerId) { + if let Some(pending) = self.pending_requests.remove(peer) { + for request in pending { + let request = self.try_send_request(peer, request); + assert!(request.is_none()); + } + } + } + + fn inject_connection_established(&mut self, peer: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) { + let address = match endpoint { + ConnectedPoint::Dialer { address } => Some(address.clone()), + ConnectedPoint::Listener { .. } => None + }; + let connections = self.connected.entry(peer.clone()).or_default(); + connections.push(Connection { id: *conn, address }) + } + + fn inject_connection_closed(&mut self, peer: &PeerId, conn: &ConnectionId, _: &ConnectedPoint) { + if let Some(connections) = self.connected.get_mut(peer) { + if let Some(pos) = connections.iter().position(|c| &c.id == conn) { + connections.remove(pos); + } + } + + // Any pending responses of requests sent over this connection + // must be considered failed. + let failed = self.pending_responses.iter() + .filter_map(|(r, (p, c))| + if conn == c { + Some((p.clone(), *r)) + } else { + None + }) + .collect::>(); + + for (peer, request_id) in failed { + self.pending_responses.remove(&request_id); + self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::OutboundFailure { + peer, + request_id, + error: OutboundFailure::ConnectionClosed + } + )); + } + } + + fn inject_disconnected(&mut self, peer: &PeerId) { + self.connected.remove(peer); + } + + fn inject_dial_failure(&mut self, peer: &PeerId) { + // If there are pending outgoing requests when a dial failure occurs, + // it is implied that we are not connected to the peer, since pending + // outgoing requests are drained when a connection is established and + // only created when a peer is not connected when a request is made. + // Thus these requests must be considered failed, even if there is + // another, concurrent dialing attempt ongoing. + if let Some(pending) = self.pending_requests.remove(peer) { + for request in pending { + self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::OutboundFailure { + peer: peer.clone(), + request_id: request.request_id, + error: OutboundFailure::DialFailure + } + )); + } + } + } + + fn inject_event( + &mut self, + peer: PeerId, + _: ConnectionId, + event: RequestResponseHandlerEvent, + ) { + match event { + RequestResponseHandlerEvent::Response { request_id, response } => { + self.pending_responses.remove(&request_id); + let message = RequestResponseMessage::Response { request_id, response }; + self.pending_events.push_back( + NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::Message { peer, message })); + } + RequestResponseHandlerEvent::Request { request, sender } => { + let channel = ResponseChannel { peer: peer.clone(), sender }; + let message = RequestResponseMessage::Request { request, channel }; + self.pending_events.push_back( + NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::Message { peer, message })); + } + RequestResponseHandlerEvent::OutboundTimeout(request_id) => { + if let Some((peer, _conn)) = self.pending_responses.remove(&request_id) { + self.pending_events.push_back( + NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::OutboundFailure { + peer, + request_id, + error: OutboundFailure::Timeout, + })); + } + } + RequestResponseHandlerEvent::InboundTimeout => { + self.pending_events.push_back( + NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::InboundFailure { + peer, + error: InboundFailure::Timeout, + })); + } + RequestResponseHandlerEvent::OutboundUnsupportedProtocols(request_id) => { + self.pending_events.push_back( + NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::OutboundFailure { + peer, + request_id, + error: OutboundFailure::UnsupportedProtocols, + })); + } + RequestResponseHandlerEvent::InboundUnsupportedProtocols => { + self.pending_events.push_back( + NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::InboundFailure { + peer, + error: InboundFailure::UnsupportedProtocols, + })); + } + } + } + + fn poll(&mut self, _: &mut Context, _: &mut impl PollParameters) + -> Poll, + RequestResponseEvent + >> + { + if let Some(ev) = self.pending_events.pop_front() { + return Poll::Ready(ev); + } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { + self.pending_events.shrink_to_fit(); + } + + Poll::Pending + } +} + +/// Internal threshold for when to shrink the capacity +/// of empty queues. If the capacity of an empty queue +/// exceeds this threshold, the associated memory is +/// released. +const EMPTY_QUEUE_SHRINK_THRESHOLD: usize = 100; + +/// Internal information tracked for an established connection. +struct Connection { + id: ConnectionId, + address: Option, +} + diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs new file mode 100644 index 00000000000..107a37edf04 --- /dev/null +++ b/protocols/request-response/tests/ping.rs @@ -0,0 +1,195 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Integration tests for the `RequestResponse` network behaviour. + +use async_trait::async_trait; +use libp2p_core::{ + Multiaddr, + PeerId, + identity, + muxing::StreamMuxerBox, + transport::{Transport, boxed::Boxed}, + upgrade::{self, read_one, write_one} +}; +use libp2p_noise::{NoiseConfig, X25519Spec, Keypair}; +use libp2p_request_response::*; +use libp2p_swarm::Swarm; +use libp2p_tcp::TcpConfig; +use futures::{prelude::*, channel::mpsc}; +use rand::{self, Rng}; +use std::{io, iter}; + +/// Exercises a simple ping protocol. +#[test] +fn ping_protocol() { + let num_pings: u8 = rand::thread_rng().gen_range(1, 100); + + let ping = Ping("ping".to_string().into_bytes()); + let pong = Pong("pong".to_string().into_bytes()); + + let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); + let cfg = RequestResponseConfig::default(); + + let (peer1_id, trans) = mk_transport(); + let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()); + let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone()); + + let (peer2_id, trans) = mk_transport(); + let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg); + let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone()); + + let (mut tx, mut rx) = mpsc::channel::(1); + + let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); + Swarm::listen_on(&mut swarm1, addr).unwrap(); + + let expected_ping = ping.clone(); + let expected_pong = pong.clone(); + + let peer1 = async move { + while let Some(_) = swarm1.next().now_or_never() {} + + let l = Swarm::listeners(&swarm1).next().unwrap(); + tx.send(l.clone()).await.unwrap(); + + loop { + match swarm1.next().await { + RequestResponseEvent::Message { + peer, + message: RequestResponseMessage::Request { request, channel } + } => { + assert_eq!(&request, &expected_ping); + assert_eq!(&peer, &peer2_id); + swarm1.send_response(channel, pong.clone()); + }, + e => panic!("Peer1: Unexpected event: {:?}", e) + } + } + }; + + let peer2 = async move { + let mut count = 0; + let addr = rx.next().await.unwrap(); + swarm2.add_address(&peer1_id, addr.clone()); + let mut req_id = swarm2.send_request(&peer1_id, ping.clone()); + + loop { + match swarm2.next().await { + RequestResponseEvent::Message { + peer, + message: RequestResponseMessage::Response { request_id, response } + } => { + count += 1; + assert_eq!(&response, &expected_pong); + assert_eq!(&peer, &peer1_id); + assert_eq!(req_id, request_id); + if count >= num_pings { + return + } else { + req_id = swarm2.send_request(&peer1_id, ping.clone()); + } + }, + e => panic!("Peer2: Unexpected event: {:?}", e) + } + } + }; + + async_std::task::spawn(Box::pin(peer1)); + let () = async_std::task::block_on(peer2); +} + +fn mk_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox), io::Error>) { + let id_keys = identity::Keypair::generate_ed25519(); + let peer_id = id_keys.public().into_peer_id(); + let noise_keys = Keypair::::new().into_authentic(&id_keys).unwrap(); + let transport = TcpConfig::new() + .nodelay(true) + .upgrade(upgrade::Version::V1) + .authenticate(NoiseConfig::xx(noise_keys).into_authenticated()) + .multiplex(libp2p_yamux::Config::default()) + .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer))) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + .boxed(); + (peer_id, transport) +} + +// Simple Ping-Pong Protocol + +#[derive(Debug, Clone)] +struct PingProtocol(); +#[derive(Clone)] +struct PingCodec(); +#[derive(Debug, Clone, PartialEq, Eq)] +struct Ping(Vec); +#[derive(Debug, Clone, PartialEq, Eq)] +struct Pong(Vec); + +impl ProtocolName for PingProtocol { + fn protocol_name(&self) -> &[u8] { + "/ping/1".as_bytes() + } +} + +#[async_trait] +impl RequestResponseCodec for PingCodec { + type Protocol = PingProtocol; + type Request = Ping; + type Response = Pong; + + async fn read_request(&mut self, _: &PingProtocol, io: &mut T) + -> io::Result + where + T: AsyncRead + Unpin + Send + { + read_one(io, 1024) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) + .map_ok(Ping) + .await + } + + async fn read_response(&mut self, _: &PingProtocol, io: &mut T) + -> io::Result + where + T: AsyncRead + Unpin + Send + { + read_one(io, 1024) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) + .map_ok(Pong) + .await + } + + async fn write_request(&mut self, _: &PingProtocol, io: &mut T, Ping(data): Ping) + -> io::Result<()> + where + T: AsyncWrite + Unpin + Send + { + write_one(io, data).await + } + + async fn write_response(&mut self, _: &PingProtocol, io: &mut T, Pong(data): Pong) + -> io::Result<()> + where + T: AsyncWrite + Unpin + Send + { + write_one(io, data).await + } +} + diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index e294ca61489..a10d9fe66cf 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,3 +1,10 @@ +# 0.20.0 [????-??-??] + +- Add `ProtocolsHandler::inject_listen_upgrade_error`, the inbound +analogue of `ProtocolsHandler::inject_dial_upgrade_error`, with an +empty default implementation. No implementation is required to +retain existing behaviour. + # 0.19.1 [2020-06-18] - Bugfix: Fix MultiHandler panicking when empty diff --git a/swarm/src/protocols_handler.rs b/swarm/src/protocols_handler.rs index 95722ed9388..62be4138b3e 100644 --- a/swarm/src/protocols_handler.rs +++ b/swarm/src/protocols_handler.rs @@ -140,7 +140,7 @@ pub trait ProtocolsHandler: Send + 'static { /// Injects an event coming from the outside in the handler. fn inject_event(&mut self, event: Self::InEvent); - /// Indicates to the handler that upgrading a substream to the given protocol has failed. + /// Indicates to the handler that upgrading an outbound substream to the given protocol has failed. fn inject_dial_upgrade_error( &mut self, info: Self::OutboundOpenInfo, @@ -149,6 +149,14 @@ pub trait ProtocolsHandler: Send + 'static { > ); + /// Indicates to the handler that upgrading an inbound substream to the given protocol has failed. + fn inject_listen_upgrade_error( + &mut self, + _: ProtocolsHandlerUpgrErr< + ::Error + > + ) {} + /// Returns until when the connection should be kept alive. /// /// This method is called by the `Swarm` after each invocation of @@ -236,7 +244,7 @@ pub struct SubstreamProtocol { } impl SubstreamProtocol { - /// Create a new `ListenProtocol` from the given upgrade. + /// Create a new `SubstreamProtocol` from the given upgrade. /// /// The default timeout for applying the given upgrade on a substream is /// 10 seconds. diff --git a/swarm/src/protocols_handler/node_handler.rs b/swarm/src/protocols_handler/node_handler.rs index 8b04506170c..f9c7ea1ce03 100644 --- a/swarm/src/protocols_handler/node_handler.rs +++ b/swarm/src/protocols_handler/node_handler.rs @@ -228,15 +228,26 @@ where for n in (0..self.negotiating_in.len()).rev() { let (mut in_progress, mut timeout) = self.negotiating_in.swap_remove(n); match Future::poll(Pin::new(&mut timeout), cx) { - Poll::Ready(_) => continue, + Poll::Ready(Ok(_)) => { + let err = ProtocolsHandlerUpgrErr::Timeout; + self.handler.inject_listen_upgrade_error(err); + continue + } + Poll::Ready(Err(_)) => { + let err = ProtocolsHandlerUpgrErr::Timer; + self.handler.inject_listen_upgrade_error(err); + continue; + } Poll::Pending => {}, } match Future::poll(Pin::new(&mut in_progress), cx) { Poll::Ready(Ok(upgrade)) => self.handler.inject_fully_negotiated_inbound(upgrade), Poll::Pending => self.negotiating_in.push((in_progress, timeout)), - // TODO: return a diagnostic event? - Poll::Ready(Err(_err)) => {} + Poll::Ready(Err(err)) => { + let err = ProtocolsHandlerUpgrErr::Upgrade(err); + self.handler.inject_listen_upgrade_error(err); + } } } diff --git a/swarm/src/protocols_handler/one_shot.rs b/swarm/src/protocols_handler/one_shot.rs index 92d63088895..4659fe0faaa 100644 --- a/swarm/src/protocols_handler/one_shot.rs +++ b/swarm/src/protocols_handler/one_shot.rs @@ -31,23 +31,20 @@ use smallvec::SmallVec; use std::{error, task::Context, task::Poll, time::Duration}; use wasm_timer::Instant; -/// Implementation of `ProtocolsHandler` that opens a new substream for each individual message. -/// -/// This struct is meant to be a helper for other implementations to use. +/// A `ProtocolsHandler` that opens a new substream for each request. // TODO: Debug -pub struct OneShotHandler +pub struct OneShotHandler where - TOutProto: OutboundUpgradeSend, + TOutbound: OutboundUpgradeSend, { /// The upgrade for inbound substreams. - listen_protocol: SubstreamProtocol, + listen_protocol: SubstreamProtocol, /// If `Some`, something bad happened and we should shut down the handler with an error. - pending_error: - Option::Error>>, + pending_error: Option::Error>>, /// Queue of events to produce in `poll()`. - events_out: SmallVec<[TOutEvent; 4]>, + events_out: SmallVec<[TEvent; 4]>, /// Queue of outbound substreams to open. - dial_queue: SmallVec<[TOutProto; 4]>, + dial_queue: SmallVec<[TOutbound; 4]>, /// Current number of concurrent outbound substreams being opened. dial_negotiated: u32, /// Maximum number of concurrent outbound substreams being opened. Value is never modified. @@ -58,15 +55,14 @@ where config: OneShotHandlerConfig, } -impl - OneShotHandler +impl + OneShotHandler where - TOutProto: OutboundUpgradeSend, + TOutbound: OutboundUpgradeSend, { /// Creates a `OneShotHandler`. - #[inline] pub fn new( - listen_protocol: SubstreamProtocol, + listen_protocol: SubstreamProtocol, config: OneShotHandlerConfig, ) -> Self { OneShotHandler { @@ -77,12 +73,11 @@ where dial_negotiated: 0, max_dial_negotiated: 8, keep_alive: KeepAlive::Yes, - config + config, } } /// Returns the number of pending requests. - #[inline] pub fn pending_requests(&self) -> u32 { self.dial_negotiated + self.dial_queue.len() as u32 } @@ -91,8 +86,7 @@ where /// /// > **Note**: If you modify the protocol, modifications will only applies to future inbound /// > substreams, not the ones already being negotiated. - #[inline] - pub fn listen_protocol_ref(&self) -> &SubstreamProtocol { + pub fn listen_protocol_ref(&self) -> &SubstreamProtocol { &self.listen_protocol } @@ -100,26 +94,23 @@ where /// /// > **Note**: If you modify the protocol, modifications will only applies to future inbound /// > substreams, not the ones already being negotiated. - #[inline] - pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol { + pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol { &mut self.listen_protocol } /// Opens an outbound substream with `upgrade`. - #[inline] - pub fn send_request(&mut self, upgrade: TOutProto) { + pub fn send_request(&mut self, upgrade: TOutbound) { self.keep_alive = KeepAlive::Yes; self.dial_queue.push(upgrade); } } -impl Default - for OneShotHandler +impl Default + for OneShotHandler where - TOutProto: OutboundUpgradeSend, - TInProto: InboundUpgradeSend + Default, + TOutbound: OutboundUpgradeSend, + TInbound: InboundUpgradeSend + Default, { - #[inline] fn default() -> Self { OneShotHandler::new( SubstreamProtocol::new(Default::default()), @@ -128,45 +119,42 @@ where } } -impl ProtocolsHandler - for OneShotHandler +impl ProtocolsHandler + for OneShotHandler where - TInProto: InboundUpgradeSend + Send + 'static, - TOutProto: OutboundUpgradeSend, - TInProto::Output: Into, - TOutProto::Output: Into, - TOutProto::Error: error::Error + Send + 'static, - SubstreamProtocol: Clone, - TOutEvent: Send + 'static, + TInbound: InboundUpgradeSend + Send + 'static, + TOutbound: OutboundUpgradeSend, + TInbound::Output: Into, + TOutbound::Output: Into, + TOutbound::Error: error::Error + Send + 'static, + SubstreamProtocol: Clone, + TEvent: Send + 'static, { - type InEvent = TOutProto; - type OutEvent = TOutEvent; + type InEvent = TOutbound; + type OutEvent = TEvent; type Error = ProtocolsHandlerUpgrErr< ::Error, >; - type InboundProtocol = TInProto; - type OutboundProtocol = TOutProto; + type InboundProtocol = TInbound; + type OutboundProtocol = TOutbound; type OutboundOpenInfo = (); - #[inline] fn listen_protocol(&self) -> SubstreamProtocol { self.listen_protocol.clone() } - #[inline] fn inject_fully_negotiated_inbound( &mut self, out: ::Output, ) { // If we're shutting down the connection for inactivity, reset the timeout. if !self.keep_alive.is_yes() { - self.keep_alive = KeepAlive::Until(Instant::now() + self.config.inactive_timeout); + self.keep_alive = KeepAlive::Until(Instant::now() + self.config.keep_alive_timeout); } self.events_out.push(out.into()); } - #[inline] fn inject_fully_negotiated_outbound( &mut self, out: ::Output, @@ -175,21 +163,19 @@ where self.dial_negotiated -= 1; if self.dial_negotiated == 0 && self.dial_queue.is_empty() { - self.keep_alive = KeepAlive::Until(Instant::now() + self.config.inactive_timeout); + self.keep_alive = KeepAlive::Until(Instant::now() + self.config.keep_alive_timeout); } self.events_out.push(out.into()); } - #[inline] fn inject_event(&mut self, event: Self::InEvent) { self.send_request(event); } - #[inline] fn inject_dial_upgrade_error( &mut self, - _: Self::OutboundOpenInfo, + _info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr< ::Error, >, @@ -199,7 +185,6 @@ where } } - #[inline] fn connection_keep_alive(&self) -> KeepAlive { self.keep_alive } @@ -211,12 +196,12 @@ where ProtocolsHandlerEvent, > { if let Some(err) = self.pending_error.take() { - return Poll::Ready(ProtocolsHandlerEvent::Close(err)); + return Poll::Ready(ProtocolsHandlerEvent::Close(err)) } if !self.events_out.is_empty() { return Poll::Ready(ProtocolsHandlerEvent::Custom( - self.events_out.remove(0), + self.events_out.remove(0) )); } else { self.events_out.shrink_to_fit(); @@ -225,10 +210,11 @@ where if !self.dial_queue.is_empty() { if self.dial_negotiated < self.max_dial_negotiated { self.dial_negotiated += 1; + let upgrade = self.dial_queue.remove(0); return Poll::Ready( ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(self.dial_queue.remove(0)) - .with_timeout(self.config.substream_timeout), + protocol: SubstreamProtocol::new(upgrade) + .with_timeout(self.config.outbound_substream_timeout), info: (), }, ); @@ -244,17 +230,18 @@ where /// Configuration parameters for the `OneShotHandler` #[derive(Debug)] pub struct OneShotHandlerConfig { - /// After the given duration has elapsed, an inactive connection will shutdown. - pub inactive_timeout: Duration, - /// Timeout duration for each newly opened outbound substream. - pub substream_timeout: Duration, + /// Keep-alive timeout for idle connections. + pub keep_alive_timeout: Duration, + /// Timeout for outbound substream upgrades. + pub outbound_substream_timeout: Duration, } impl Default for OneShotHandlerConfig { fn default() -> Self { - let inactive_timeout = Duration::from_secs(10); - let substream_timeout = Duration::from_secs(10); - OneShotHandlerConfig { inactive_timeout, substream_timeout } + OneShotHandlerConfig { + keep_alive_timeout: Duration::from_secs(10), + outbound_substream_timeout: Duration::from_secs(10), + } } }