Skip to content

Commit

Permalink
transports/tcp: remove InAddr
Browse files Browse the repository at this point in the history
  • Loading branch information
elenaf9 committed Aug 11, 2022
1 parent 9b1dc6b commit c4f2124
Showing 1 changed file with 76 additions and 104 deletions.
180 changes: 76 additions & 104 deletions transports/tcp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,20 @@ where
socket.bind(&socket_addr.into())?;
socket.listen(self.config.backlog as _)?;
socket.set_nonblocking(true)?;
TcpListenStream::<T>::new(id, socket.into(), self.port_reuse.clone())
let listener: TcpListener = socket.into();
let local_addr = listener.local_addr()?;
let if_watcher = if local_addr.ip().is_unspecified() {
Some(IfWatcher::new()?)
} else {
self.port_reuse.register(local_addr.ip(), local_addr.port());
let listen_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port());
self.pending_events.push_back(TransportEvent::NewAddress {
listener_id: id,
listen_addr,
});
None
};
TcpListenStream::<T>::new(id, listener, if_watcher, self.port_reuse.clone())
}
}

Expand Down Expand Up @@ -607,14 +620,6 @@ pub enum TcpListenerEvent<S> {
Error(io::Error),
}

/// The listening addresses of a [`TcpListenStream`].
enum InAddr {
/// The stream accepts connections on a single interface.
One(Option<Multiaddr>),
/// The stream accepts connections on all interfaces.
Any(IfWatcher),
}

/// A stream of incoming connections on one or more interfaces.
pub struct TcpListenStream<T>
where
Expand All @@ -628,12 +633,8 @@ where
listen_addr: SocketAddr,
/// The async listening socket for incoming connections.
listener: T::Listener,
/// The IP addresses of network interfaces on which the listening socket
/// is accepting connections.
///
/// If the listen socket listens on all interfaces, these may change over
/// time as interfaces become available or unavailable.
in_addr: InAddr,

if_watcher: Option<IfWatcher>,
/// The port reuse configuration for outgoing connections.
///
/// If enabled, all IP addresses on which this listening stream
Expand All @@ -657,29 +658,18 @@ where
fn new(
listener_id: ListenerId,
listener: TcpListener,
if_watcher: Option<IfWatcher>,
port_reuse: PortReuse,
) -> io::Result<Self> {
let listen_addr = listener.local_addr()?;

let in_addr = if match &listen_addr {
SocketAddr::V4(a) => a.ip().is_unspecified(),
SocketAddr::V6(a) => a.ip().is_unspecified(),
} {
// The `addrs` are populated via `if_watch` when the
// `TcpListenStream` is polled.
InAddr::Any(IfWatcher::new()?)
} else {
InAddr::One(Some(ip_to_multiaddr(listen_addr.ip(), listen_addr.port())))
};

let listener = T::new_listener(listener)?;

Ok(TcpListenStream {
port_reuse,
listener,
listener_id,
listen_addr,
in_addr,
if_watcher,
pause: None,
sleep_on_error: Duration::from_millis(100),
})
Expand All @@ -692,17 +682,16 @@ where
///
/// Has no effect if port reuse is disabled.
fn disable_port_reuse(&mut self) {
match &self.in_addr {
InAddr::One(_) => {
self.port_reuse
.unregister(self.listen_addr.ip(), self.listen_addr.port());
}
InAddr::Any(if_watcher) => {
match &self.if_watcher {
Some(if_watcher) => {
for ip_net in if_watcher.iter() {
self.port_reuse
.unregister(ip_net.addr(), self.listen_addr.port());
}
}
None => self
.port_reuse
.unregister(self.listen_addr.ip(), self.listen_addr.port()),
}
}
}
Expand All @@ -726,88 +715,71 @@ where

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let me = Pin::into_inner(self);

loop {
match &mut me.in_addr {
InAddr::Any(if_watcher) => {
while let Poll::Ready(ev) = IfWatcher::poll_next(Pin::new(if_watcher), cx) {
match ev {
Ok(IfEvent::Up(inet)) => {
let ip = inet.addr();
if me.listen_addr.is_ipv4() == ip.is_ipv4() {
let ma = ip_to_multiaddr(ip, me.listen_addr.port());
log::debug!("New listen address: {}", ma);
me.port_reuse.register(ip, me.listen_addr.port());
return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(ma))));
}
}
Ok(IfEvent::Down(inet)) => {
let ip = inet.addr();
if me.listen_addr.is_ipv4() == ip.is_ipv4() {
let ma = ip_to_multiaddr(ip, me.listen_addr.port());
log::debug!("Expired listen address: {}", ma);
me.port_reuse.unregister(ip, me.listen_addr.port());
return Poll::Ready(Some(Ok(
TcpListenerEvent::AddressExpired(ma),
)));
}
}
Err(err) => {
log::debug! {
"Failure polling interfaces: {:?}. Scheduling retry.",
err
};
me.pause = Some(Delay::new(me.sleep_on_error));
return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err))));
}
if let Some(if_watcher) = me.if_watcher.as_mut() {
while let Poll::Ready(ev) = IfWatcher::poll_next(Pin::new(if_watcher), cx) {
match ev {
Ok(IfEvent::Up(inet)) => {
let ip = inet.addr();
if me.listen_addr.is_ipv4() == ip.is_ipv4() {
let ma = ip_to_multiaddr(ip, me.listen_addr.port());
log::debug!("New listen address: {}", ma);
me.port_reuse.register(ip, me.listen_addr.port());
return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(ma))));
}
}
}
// If the listener is bound to a single interface, make sure the
// address is registered for port reuse and reported once.
InAddr::One(out) => {
if let Some(multiaddr) = out.take() {
me.port_reuse
.register(me.listen_addr.ip(), me.listen_addr.port());
return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(multiaddr))));
Ok(IfEvent::Down(inet)) => {
let ip = inet.addr();
if me.listen_addr.is_ipv4() == ip.is_ipv4() {
let ma = ip_to_multiaddr(ip, me.listen_addr.port());
log::debug!("Expired listen address: {}", ma);
me.port_reuse.unregister(ip, me.listen_addr.port());
return Poll::Ready(Some(Ok(TcpListenerEvent::AddressExpired(ma))));
}
}
Err(err) => {
log::debug! {
"Failure polling interfaces: {:?}. Scheduling retry.",
err
};
me.pause = Some(Delay::new(me.sleep_on_error));
return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err))));
}
}
}
}

if let Some(mut pause) = me.pause.take() {
match Pin::new(&mut pause).poll(cx) {
Poll::Ready(_) => {}
Poll::Pending => {
me.pause = Some(pause);
return Poll::Pending;
}
if let Some(mut pause) = me.pause.take() {
match Pin::new(&mut pause).poll(cx) {
Poll::Ready(_) => {}
Poll::Pending => {
me.pause = Some(pause);
return Poll::Pending;
}
}
}

// Take the pending connection from the backlog.
let incoming = match T::poll_accept(&mut me.listener, cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(incoming)) => incoming,
Poll::Ready(Err(e)) => {
// These errors are non-fatal for the listener stream.
log::error!("error accepting incoming connection: {}", e);
me.pause = Some(Delay::new(me.sleep_on_error));
return Poll::Ready(Some(Ok(TcpListenerEvent::Error(e))));
}
};
// Take the pending connection from the backlog.
let incoming = match T::poll_accept(&mut me.listener, cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(incoming)) => incoming,
Poll::Ready(Err(e)) => {
// These errors are non-fatal for the listener stream.
log::error!("error accepting incoming connection: {}", e);
me.pause = Some(Delay::new(me.sleep_on_error));
return Poll::Ready(Some(Ok(TcpListenerEvent::Error(e))));
}
};

let local_addr = ip_to_multiaddr(incoming.local_addr.ip(), incoming.local_addr.port());
let remote_addr =
ip_to_multiaddr(incoming.remote_addr.ip(), incoming.remote_addr.port());
let local_addr = ip_to_multiaddr(incoming.local_addr.ip(), incoming.local_addr.port());
let remote_addr = ip_to_multiaddr(incoming.remote_addr.ip(), incoming.remote_addr.port());

log::debug!("Incoming connection from {} at {}", remote_addr, local_addr);
log::debug!("Incoming connection from {} at {}", remote_addr, local_addr);

return Poll::Ready(Some(Ok(TcpListenerEvent::Upgrade {
upgrade: future::ok(incoming.stream),
local_addr,
remote_addr,
})));
}
return Poll::Ready(Some(Ok(TcpListenerEvent::Upgrade {
upgrade: future::ok(incoming.stream),
local_addr,
remote_addr,
})));
}
}

Expand Down

0 comments on commit c4f2124

Please sign in to comment.