diff --git a/protocols/identify/CHANGELOG.md b/protocols/identify/CHANGELOG.md index f85477d8434..61032f9b6a6 100644 --- a/protocols/identify/CHANGELOG.md +++ b/protocols/identify/CHANGELOG.md @@ -1,7 +1,12 @@ # 0.42.0 [unreleased] +- Move I/O from `Behaviour` to `Handler`. Handle `Behaviour`'s Identify and Push requests independently by incoming order, + previously Push requests were prioritized. see [PR 3208]. + - Update to `libp2p-swarm` `v0.42.0`. +[PR 3208]: https://github.com/libp2p/rust-libp2p/pull/3208 + # 0.41.0 - Change default `cache_size` of `Config` to 100. See [PR 2995]. diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 0e53470fd40..f37abfc0da7 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -18,24 +18,21 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::handler::{self, Proto, Push}; -use crate::protocol::{Info, ReplySubstream, UpgradeError}; -use futures::prelude::*; +use crate::handler::{self, InEvent, Proto}; +use crate::protocol::{Info, Protocol, UpgradeError}; use libp2p_core::{ - connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId, PublicKey, + connection::ConnectionId, multiaddr, ConnectedPoint, Multiaddr, PeerId, PublicKey, }; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p_swarm::{ dial_opts::DialOpts, AddressScore, ConnectionHandler, ConnectionHandlerUpgrErr, DialError, - IntoConnectionHandler, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, - NotifyHandler, PollParameters, + IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, }; use lru::LruCache; use std::num::NonZeroUsize; use std::{ collections::{HashMap, HashSet, VecDeque}, iter::FromIterator, - pin::Pin, task::Context, task::Poll, time::Duration, @@ -51,30 +48,23 @@ pub struct Behaviour { config: Config, /// For each peer we're connected to, the observed address to send back to it. connected: HashMap>, - /// Pending replies to send. - pending_replies: VecDeque, + /// Pending requests to be fulfilled, either `Handler` requests for `Behaviour` info + /// to address identification requests, or push requests to peers + /// with current information about the local peer. + requests: Vec, /// Pending events to be emitted when polled. events: VecDeque>, - /// Peers to which an active push with current information about - /// the local peer should be sent. - pending_push: HashSet, /// The addresses of all peers that we have discovered. discovered_peers: PeerCache, } -/// A pending reply to an inbound identification request. -enum Reply { - /// The reply is queued for sending. - Queued { - peer: PeerId, - io: ReplySubstream, - observed: Multiaddr, - }, - /// The reply is being sent. - Sending { - peer: PeerId, - io: Pin> + Send>>, - }, +/// A `Behaviour` request to be fulfilled, either `Handler` requests for `Behaviour` info +/// to address identification requests, or push requests to peers +/// with current information about the local peer. +#[derive(Debug, PartialEq, Eq)] +struct Request { + peer_id: PeerId, + protocol: Protocol, } /// Configuration for the [`identify::Behaviour`](Behaviour). @@ -184,9 +174,8 @@ impl Behaviour { Self { config, connected: HashMap::new(), - pending_replies: VecDeque::new(), + requests: Vec::new(), events: VecDeque::new(), - pending_push: HashSet::new(), discovered_peers, } } @@ -197,7 +186,13 @@ impl Behaviour { I: IntoIterator, { for p in peers { - if self.pending_push.insert(p) && !self.connected.contains_key(&p) { + let request = Request { + peer_id: p, + protocol: Protocol::Push, + }; + if !self.requests.contains(&request) { + self.requests.push(request); + let handler = self.new_handler(); self.events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(p).build(), @@ -240,13 +235,19 @@ impl NetworkBehaviour for Behaviour { type OutEvent = Event; fn new_handler(&mut self) -> Self::ConnectionHandler { - Proto::new(self.config.initial_delay, self.config.interval) + Proto::new( + self.config.initial_delay, + self.config.interval, + self.config.local_public_key.clone(), + self.config.protocol_version.clone(), + self.config.agent_version.clone(), + ) } fn on_connection_handler_event( &mut self, peer_id: PeerId, - connection: ConnectionId, + connection_id: ConnectionId, event: <::Handler as ConnectionHandler>::OutEvent, ) { match event { @@ -271,26 +272,22 @@ impl NetworkBehaviour for Behaviour { score: AddressScore::Finite(1), }); } + handler::Event::Identification(peer) => { + self.events + .push_back(NetworkBehaviourAction::GenerateEvent(Event::Sent { + peer_id: peer, + })); + } handler::Event::IdentificationPushed => { self.events .push_back(NetworkBehaviourAction::GenerateEvent(Event::Pushed { peer_id, })); } - handler::Event::Identify(sender) => { - let observed = self - .connected - .get(&peer_id) - .and_then(|addrs| addrs.get(&connection)) - .expect( - "`on_connection_handler_event` is only called \ - with an established connection and calling `NetworkBehaviour::on_event` \ - with `FromSwarm::ConnectionEstablished ensures there is an entry; qed", - ); - self.pending_replies.push_back(Reply::Queued { - peer: peer_id, - io: sender, - observed: observed.clone(), + handler::Event::Identify => { + self.requests.push(Request { + peer_id, + protocol: Protocol::Identify(connection_id), }); } handler::Event::IdentificationError(error) => { @@ -305,99 +302,41 @@ impl NetworkBehaviour for Behaviour { fn poll( &mut self, - cx: &mut Context<'_>, + _cx: &mut Context<'_>, params: &mut impl PollParameters, ) -> Poll> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); } - // Check for a pending active push to perform. - let peer_push = self.pending_push.iter().find_map(|peer| { - self.connected.get(peer).map(|conns| { - let observed_addr = conns - .values() - .next() - .expect("connected peer has a connection") - .clone(); - - let listen_addrs = listen_addrs(params); - let protocols = supported_protocols(params); - - let info = Info { - public_key: self.config.local_public_key.clone(), - protocol_version: self.config.protocol_version.clone(), - agent_version: self.config.agent_version.clone(), - listen_addrs, - protocols, - observed_addr, - }; - - (*peer, Push(info)) - }) - }); - - if let Some((peer_id, push)) = peer_push { - self.pending_push.remove(&peer_id); - return Poll::Ready(NetworkBehaviourAction::NotifyHandler { + // Check for pending requests. + match self.requests.pop() { + Some(Request { + peer_id, + protocol: Protocol::Push, + }) => Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, - event: push, handler: NotifyHandler::Any, - }); - } - - // Check for pending replies to send. - if let Some(r) = self.pending_replies.pop_front() { - let mut sending = 0; - let to_send = self.pending_replies.len() + 1; - let mut reply = Some(r); - loop { - match reply { - Some(Reply::Queued { peer, io, observed }) => { - let info = Info { - listen_addrs: listen_addrs(params), - protocols: supported_protocols(params), - public_key: self.config.local_public_key.clone(), - protocol_version: self.config.protocol_version.clone(), - agent_version: self.config.agent_version.clone(), - observed_addr: observed, - }; - let io = Box::pin(io.send(info)); - reply = Some(Reply::Sending { peer, io }); - } - Some(Reply::Sending { peer, mut io }) => { - sending += 1; - match Future::poll(Pin::new(&mut io), cx) { - Poll::Ready(Ok(())) => { - let event = Event::Sent { peer_id: peer }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - Poll::Pending => { - self.pending_replies.push_back(Reply::Sending { peer, io }); - if sending == to_send { - // All remaining futures are NotReady - break; - } else { - reply = self.pending_replies.pop_front(); - } - } - Poll::Ready(Err(err)) => { - let event = Event::Error { - peer_id: peer, - error: ConnectionHandlerUpgrErr::Upgrade( - libp2p_core::upgrade::UpgradeError::Apply(err), - ), - }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - } - } - None => unreachable!(), - } - } + event: InEvent { + listen_addrs: listen_addrs(params), + supported_protocols: supported_protocols(params), + protocol: Protocol::Push, + }, + }), + Some(Request { + peer_id, + protocol: Protocol::Identify(connection_id), + }) => Poll::Ready(NetworkBehaviourAction::NotifyHandler { + peer_id, + handler: NotifyHandler::One(connection_id), + event: InEvent { + listen_addrs: listen_addrs(params), + supported_protocols: supported_protocols(params), + protocol: Protocol::Identify(connection_id), + }, + }), + None => Poll::Pending, } - - Poll::Pending } fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { @@ -417,7 +356,13 @@ impl NetworkBehaviour for Behaviour { }) => { if remaining_established == 0 { self.connected.remove(&peer_id); - self.pending_push.remove(&peer_id); + self.requests.retain(|request| { + request + != &Request { + peer_id, + protocol: Protocol::Push, + } + }); } else if let Some(addrs) = self.connected.get_mut(&peer_id) { addrs.remove(&connection_id); } @@ -425,7 +370,13 @@ impl NetworkBehaviour for Behaviour { FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => { if let Some(peer_id) = peer_id { if !self.connected.contains_key(&peer_id) { - self.pending_push.remove(&peer_id); + self.requests.retain(|request| { + request + != &Request { + peer_id, + protocol: Protocol::Push, + } + }); } } @@ -437,14 +388,17 @@ impl NetworkBehaviour for Behaviour { } } } - FromSwarm::NewListenAddr(_) => { - if self.config.push_listen_addr_updates { - self.pending_push.extend(self.connected.keys()); - } - } - FromSwarm::ExpiredListenAddr(_) => { + FromSwarm::NewListenAddr(_) | FromSwarm::ExpiredListenAddr(_) => { if self.config.push_listen_addr_updates { - self.pending_push.extend(self.connected.keys()); + for p in self.connected.keys() { + let request = Request { + peer_id: *p, + protocol: Protocol::Push, + }; + if !self.requests.contains(&request) { + self.requests.push(request); + } + } } } FromSwarm::AddressChange(_) @@ -509,7 +463,7 @@ fn listen_addrs(params: &impl PollParameters) -> Vec { /// the given peer_id. If there is no peer_id for the peer in the mutiaddr, this returns true. fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool { let last_component = addr.iter().last(); - if let Some(Protocol::P2p(multi_addr_peer_id)) = last_component { + if let Some(multiaddr::Protocol::P2p(multi_addr_peer_id)) = last_component { return multi_addr_peer_id == *peer_id.as_ref(); } true @@ -557,6 +511,7 @@ impl PeerCache { mod tests { use super::*; use futures::pin_mut; + use futures::prelude::*; use libp2p_core::{identity, muxing::StreamMuxerBox, transport, upgrade, PeerId, Transport}; use libp2p_mplex::MplexConfig; use libp2p_noise as noise; @@ -618,7 +573,7 @@ mod tests { // nb. Either swarm may receive the `Identified` event first, upon which // it will permit the connection to be closed, as defined by - // `IdentifyHandler::connection_keep_alive`. Hence the test succeeds if + // `Handler::connection_keep_alive`. Hence the test succeeds if // either `Identified` event arrives correctly. async_std::task::block_on(async move { loop { @@ -835,8 +790,8 @@ mod tests { let addr_without_peer_id: Multiaddr = addr.clone(); let mut addr_with_other_peer_id = addr.clone(); - addr.push(Protocol::P2p(peer_id.into())); - addr_with_other_peer_id.push(Protocol::P2p(other_peer_id.into())); + addr.push(multiaddr::Protocol::P2p(peer_id.into())); + addr_with_other_peer_id.push(multiaddr::Protocol::P2p(other_peer_id.into())); assert!(multiaddr_matches_peer_id(&addr, &peer_id)); assert!(!multiaddr_matches_peer_id( diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 0de54f0a006..21063acc661 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -19,14 +19,15 @@ // DEALINGS IN THE SOFTWARE. use crate::protocol::{ - InboundPush, Info, OutboundPush, Protocol, PushProtocol, ReplySubstream, UpgradeError, + self, Identify, InboundPush, Info, OutboundPush, Protocol, Push, UpgradeError, }; use futures::future::BoxFuture; use futures::prelude::*; +use futures::stream::FuturesUnordered; use futures_timer::Delay; use libp2p_core::either::{EitherError, EitherOutput}; use libp2p_core::upgrade::{EitherUpgrade, SelectUpgrade}; -use libp2p_core::{ConnectedPoint, PeerId}; +use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, PublicKey}; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; @@ -36,18 +37,31 @@ use libp2p_swarm::{ }; use log::warn; use smallvec::SmallVec; +use std::collections::VecDeque; use std::{io, pin::Pin, task::Context, task::Poll, time::Duration}; pub struct Proto { initial_delay: Duration, interval: Duration, + public_key: PublicKey, + protocol_version: String, + agent_version: String, } impl Proto { - pub fn new(initial_delay: Duration, interval: Duration) -> Self { + pub fn new( + initial_delay: Duration, + interval: Duration, + public_key: PublicKey, + protocol_version: String, + agent_version: String, + ) -> Self { Proto { initial_delay, interval, + public_key, + protocol_version, + agent_version, } } } @@ -55,12 +69,25 @@ impl Proto { impl IntoConnectionHandler for Proto { type Handler = Handler; - fn into_handler(self, remote_peer_id: &PeerId, _endpoint: &ConnectedPoint) -> Self::Handler { - Handler::new(self.initial_delay, self.interval, *remote_peer_id) + fn into_handler(self, remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { + let observed_addr = match endpoint { + ConnectedPoint::Dialer { address, .. } => address, + ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, + }; + + Handler::new( + self.initial_delay, + self.interval, + *remote_peer_id, + self.public_key, + self.protocol_version, + self.agent_version, + observed_addr.clone(), + ) } fn inbound_protocol(&self) -> ::InboundProtocol { - SelectUpgrade::new(Protocol, PushProtocol::inbound()) + SelectUpgrade::new(Identify, Push::inbound()) } } @@ -74,14 +101,16 @@ pub struct Handler { inbound_identify_push: Option>>, /// Pending events to yield. events: SmallVec< - [ConnectionHandlerEvent< - EitherUpgrade>, - (), - Event, - io::Error, - >; 4], + [ConnectionHandlerEvent>, (), Event, io::Error>; + 4], >, + /// Streams awaiting `BehaviourInfo` to then send identify requests. + reply_streams: VecDeque, + + /// Pending identification replies, awaiting being sent. + pending_replies: FuturesUnordered>>, + /// Future that fires when we need to identify the node again. trigger_next_identify: Delay, @@ -90,36 +119,75 @@ pub struct Handler { /// The interval of `trigger_next_identify`, i.e. the recurrent delay. interval: Duration, + + /// The public key of the local peer. + public_key: PublicKey, + + /// Application-specific version of the protocol family used by the peer, + /// e.g. `ipfs/1.0.0` or `polkadot/1.0.0`. + protocol_version: String, + + /// Name and version of the peer, similar to the `User-Agent` header in + /// the HTTP protocol. + agent_version: String, + + /// Address observed by or for the remote. + observed_addr: Multiaddr, } -/// Event produced by the `IdentifyHandler`. +/// An event from `Behaviour` with the information requested by the `Handler`. +#[derive(Debug)] +pub struct InEvent { + /// The addresses that the peer is listening on. + pub listen_addrs: Vec, + + /// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`. + pub supported_protocols: Vec, + + /// The protocol w.r.t. the information requested. + pub protocol: Protocol, +} + +/// Event produced by the `Handler`. #[derive(Debug)] #[allow(clippy::large_enum_variant)] pub enum Event { /// We obtained identification information from the remote. Identified(Info), + /// We replied to an identification request from the remote. + Identification(PeerId), /// We actively pushed our identification information to the remote. IdentificationPushed, /// We received a request for identification. - Identify(ReplySubstream), - /// Failed to identify the remote. + Identify, + /// Failed to identify the remote, or to reply to an identification request. IdentificationError(ConnectionHandlerUpgrErr), } -/// Identifying information of the local node that is pushed to a remote. -#[derive(Debug)] -pub struct Push(pub Info); - impl Handler { - /// Creates a new `IdentifyHandler`. - pub fn new(initial_delay: Duration, interval: Duration, remote_peer_id: PeerId) -> Self { + /// Creates a new `Handler`. + pub fn new( + initial_delay: Duration, + interval: Duration, + remote_peer_id: PeerId, + public_key: PublicKey, + protocol_version: String, + agent_version: String, + observed_addr: Multiaddr, + ) -> Self { Self { remote_peer_id, inbound_identify_push: Default::default(), events: SmallVec::new(), + reply_streams: VecDeque::new(), + pending_replies: FuturesUnordered::new(), trigger_next_identify: Delay::new(initial_delay), keep_alive: KeepAlive::Yes, interval, + public_key, + protocol_version, + agent_version, + observed_addr, } } @@ -133,9 +201,18 @@ impl Handler { >, ) { match output { - EitherOutput::First(substream) => self - .events - .push(ConnectionHandlerEvent::Custom(Event::Identify(substream))), + EitherOutput::First(substream) => { + self.events + .push(ConnectionHandlerEvent::Custom(Event::Identify)); + if !self.reply_streams.is_empty() { + warn!( + "New inbound identify request from {} while a previous one \ + is still pending. Queueing the new one.", + self.remote_peer_id, + ); + } + self.reply_streams.push_back(substream); + } EitherOutput::Second(fut) => { if self.inbound_identify_push.replace(fut).is_some() { warn!( @@ -195,26 +272,58 @@ impl Handler { } impl ConnectionHandler for Handler { - type InEvent = Push; + type InEvent = InEvent; type OutEvent = Event; type Error = io::Error; - type InboundProtocol = SelectUpgrade>; - type OutboundProtocol = EitherUpgrade>; + type InboundProtocol = SelectUpgrade>; + type OutboundProtocol = EitherUpgrade>; type OutboundOpenInfo = (); type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(SelectUpgrade::new(Protocol, PushProtocol::inbound()), ()) + SubstreamProtocol::new(SelectUpgrade::new(Identify, Push::inbound()), ()) } - fn on_behaviour_event(&mut self, Push(push): Self::InEvent) { - self.events - .push(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new( - EitherUpgrade::B(PushProtocol::outbound(push)), - (), - ), - }); + fn on_behaviour_event( + &mut self, + InEvent { + listen_addrs, + supported_protocols, + protocol, + }: Self::InEvent, + ) { + let info = Info { + public_key: self.public_key.clone(), + protocol_version: self.protocol_version.clone(), + agent_version: self.agent_version.clone(), + listen_addrs, + protocols: supported_protocols, + observed_addr: self.observed_addr.clone(), + }; + + match protocol { + Protocol::Push => { + self.events + .push(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new( + EitherUpgrade::B(Push::outbound(info)), + (), + ), + }); + } + Protocol::Identify(_) => { + let substream = self + .reply_streams + .pop_front() + .expect("A BehaviourInfo reply should have a matching substream."); + let peer = self.remote_peer_id; + let fut = Box::pin(async move { + protocol::send(substream, info).await?; + Ok(peer) + }); + self.pending_replies.push(fut); + } + } } fn connection_keep_alive(&self) -> KeepAlive { @@ -237,7 +346,7 @@ impl ConnectionHandler for Handler { Poll::Ready(()) => { self.trigger_next_identify.reset(self.interval); let ev = ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(EitherUpgrade::A(Protocol), ()), + protocol: SubstreamProtocol::new(EitherUpgrade::A(Identify), ()), }; return Poll::Ready(ev); } @@ -255,7 +364,18 @@ impl ConnectionHandler for Handler { } } - Poll::Pending + // Check for pending replies to send. + match self.pending_replies.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(peer_id))) => Poll::Ready(ConnectionHandlerEvent::Custom( + Event::Identification(peer_id), + )), + Poll::Ready(Some(Err(err))) => Poll::Ready(ConnectionHandlerEvent::Custom( + Event::IdentificationError(ConnectionHandlerUpgrErr::Upgrade( + libp2p_core::upgrade::UpgradeError::Apply(err), + )), + )), + Poll::Ready(None) | Poll::Pending => Poll::Pending, + } } fn on_connection_event( diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index f30622561e3..12e6de3f302 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -22,13 +22,14 @@ use crate::structs_proto; use asynchronous_codec::{FramedRead, FramedWrite}; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::{ + connection::ConnectionId, identity, multiaddr, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, Multiaddr, PublicKey, }; use log::trace; use std::convert::TryFrom; -use std::{fmt, io, iter, pin::Pin}; +use std::{io, iter, pin::Pin}; use thiserror::Error; use void::Void; @@ -38,25 +39,32 @@ pub const PROTOCOL_NAME: &[u8; 14] = b"/ipfs/id/1.0.0"; pub const PUSH_PROTOCOL_NAME: &[u8; 19] = b"/ipfs/id/push/1.0.0"; +/// The type of the Substream protocol. +#[derive(Debug, PartialEq, Eq)] +pub enum Protocol { + Identify(ConnectionId), + Push, +} + /// Substream upgrade protocol for `/ipfs/id/1.0.0`. #[derive(Debug, Clone)] -pub struct Protocol; +pub struct Identify; /// Substream upgrade protocol for `/ipfs/id/push/1.0.0`. #[derive(Debug, Clone)] -pub struct PushProtocol(T); +pub struct Push(T); pub struct InboundPush(); pub struct OutboundPush(Info); -impl PushProtocol { +impl Push { pub fn inbound() -> Self { - PushProtocol(InboundPush()) + Push(InboundPush()) } } -impl PushProtocol { +impl Push { pub fn outbound(info: Info) -> Self { - PushProtocol(OutboundPush(info)) + Push(OutboundPush(info)) } } @@ -79,31 +87,7 @@ pub struct Info { pub observed_addr: Multiaddr, } -/// The substream on which a reply is expected to be sent. -pub struct ReplySubstream { - inner: T, -} - -impl fmt::Debug for ReplySubstream { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("ReplySubstream").finish() - } -} - -impl ReplySubstream -where - T: AsyncWrite + Unpin, -{ - /// Sends back the requested information on the substream. - /// - /// Consumes the substream, returning a future that resolves - /// when the reply has been sent on the underlying connection. - pub async fn send(self, info: Info) -> Result<(), UpgradeError> { - send(self.inner, info).await.map_err(Into::into) - } -} - -impl UpgradeInfo for Protocol { +impl UpgradeInfo for Identify { type Info = &'static [u8]; type InfoIter = iter::Once; @@ -112,17 +96,17 @@ impl UpgradeInfo for Protocol { } } -impl InboundUpgrade for Protocol { - type Output = ReplySubstream; +impl InboundUpgrade for Identify { + type Output = C; type Error = UpgradeError; type Future = future::Ready>; fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { - future::ok(ReplySubstream { inner: socket }) + future::ok(socket) } } -impl OutboundUpgrade for Protocol +impl OutboundUpgrade for Identify where C: AsyncRead + AsyncWrite + Unpin + Send + 'static, { @@ -135,7 +119,7 @@ where } } -impl UpgradeInfo for PushProtocol { +impl UpgradeInfo for Push { type Info = &'static [u8]; type InfoIter = iter::Once; @@ -144,7 +128,7 @@ impl UpgradeInfo for PushProtocol { } } -impl InboundUpgrade for PushProtocol +impl InboundUpgrade for Push where C: AsyncRead + AsyncWrite + Unpin + Send + 'static, { @@ -158,7 +142,7 @@ where } } -impl OutboundUpgrade for PushProtocol +impl OutboundUpgrade for Push where C: AsyncWrite + Unpin + Send + 'static, { @@ -171,7 +155,7 @@ where } } -async fn send(io: T, info: Info) -> Result<(), UpgradeError> +pub(crate) async fn send(io: T, info: Info) -> Result<(), UpgradeError> where T: AsyncWrite + Unpin, { @@ -316,10 +300,11 @@ mod tests { .await .unwrap(); - let sender = apply_inbound(socket, Protocol).await.unwrap(); + let sender = apply_inbound(socket, Identify).await.unwrap(); - sender - .send(Info { + send( + sender, + Info { public_key: send_pubkey, protocol_version: "proto_version".to_owned(), agent_version: "agent_version".to_owned(), @@ -329,16 +314,17 @@ mod tests { ], protocols: vec!["proto1".to_string(), "proto2".to_string()], observed_addr: "/ip4/100.101.102.103/tcp/5000".parse().unwrap(), - }) - .await - .unwrap(); + }, + ) + .await + .unwrap(); }); async_std::task::block_on(async move { let mut transport = tcp::async_io::Transport::default(); let socket = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); - let info = apply_outbound(socket, Protocol, upgrade::Version::V1) + let info = apply_outbound(socket, Identify, upgrade::Version::V1) .await .unwrap(); assert_eq!(