Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm/: include ListenerId in SwarmEvents #2123

Merged
merged 3 commits into from
Jul 8, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/chat-tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
swarm.behaviour_mut().floodsub.publish(floodsub_topic.clone(), line.as_bytes());
}
event = swarm.select_next_some() => {
if let SwarmEvent::NewListenAddr(addr) = event {
println!("Listening on {:?}", addr);
if let SwarmEvent::NewListenAddr { address, ..} = event {
println!("Listening on {:?}", address);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => {
if let SwarmEvent::NewListenAddr(addr) = event {
println!("Listening on {:?}", addr);
if let SwarmEvent::NewListenAddr { address, ..} = event {
println!("Listening on {:?}", address);
}
}
Poll::Ready(None) => return Poll::Ready(Ok(())),
Expand Down
4 changes: 2 additions & 2 deletions examples/distributed-key-value-store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => {
if let SwarmEvent::NewListenAddr(addr) = event {
println!("Listening on {:?}", addr);
if let SwarmEvent::NewListenAddr { address, ..} = event {
println!("Listening on {:?}", address);
}
}
Poll::Ready(None) => return Poll::Ready(Ok(())),
Expand Down
4 changes: 2 additions & 2 deletions examples/gossipsub-chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
id,
peer_id
),
SwarmEvent::NewListenAddr(addr) => {
println!("Listening on {:?}", addr);
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening on {:?}", address);
}
_ => {}
},
Expand Down
4 changes: 2 additions & 2 deletions examples/ipfs-private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,8 @@ fn main() -> Result<(), Box<dyn Error>> {
loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => {
if let SwarmEvent::NewListenAddr(addr) = event {
println!("Listening on {:?}", addr);
if let SwarmEvent::NewListenAddr { address, .. } = event {
println!("Listening on {:?}", address);
}
}
Poll::Ready(None) => return Poll::Ready(Ok(())),
Expand Down
2 changes: 1 addition & 1 deletion examples/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ fn main() -> Result<(), Box<dyn Error>> {
block_on(future::poll_fn(move |cx| loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => match event {
SwarmEvent::NewListenAddr(addr) => println!("Listening on {:?}", addr),
SwarmEvent::NewListenAddr{ address, .. } => println!("Listening on {:?}", address),
SwarmEvent::Behaviour(event) => println!("{:?}", event),
_ => {}
},
Expand Down
4 changes: 2 additions & 2 deletions protocols/identify/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ mod tests {
let swarm1_fut = swarm1.select_next_some();
pin_mut!(swarm1_fut);
match swarm1_fut.await {
SwarmEvent::NewListenAddr(addr) => return addr,
SwarmEvent::NewListenAddr { address, .. }=> return address,
_ => {}
}
}
Expand Down Expand Up @@ -577,7 +577,7 @@ mod tests {
let swarm1_fut = swarm1.select_next_some();
pin_mut!(swarm1_fut);
match swarm1_fut.await {
SwarmEvent::NewListenAddr(addr) => return addr,
SwarmEvent::NewListenAddr { address, .. }=> return address,
_ => {}
}
}
Expand Down
4 changes: 2 additions & 2 deletions protocols/ping/tests/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ fn ping_pong() {
let peer1 = async move {
loop {
match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr(listener) => tx.send(listener).await.unwrap(),
SwarmEvent::NewListenAddr { address, .. } => tx.send(address).await.unwrap(),
SwarmEvent::Behaviour(PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) }) => {
count1 -= 1;
if count1 == 0 {
Expand Down Expand Up @@ -137,7 +137,7 @@ fn max_failures() {

loop {
match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr(listener) => tx.send(listener).await.unwrap(),
SwarmEvent::NewListenAddr { address, .. } => tx.send(address).await.unwrap(),
SwarmEvent::Behaviour(PingEvent {
result: Ok(PingSuccess::Ping { .. }), ..
}) => {
Expand Down
4 changes: 2 additions & 2 deletions protocols/relay/examples/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ fn main() -> Result<(), Box<dyn Error>> {
loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => match event {
SwarmEvent::NewListenAddr(addr) => {
print_listener_peer(&addr, &opt.mode, local_peer_id)
SwarmEvent::NewListenAddr { address, .. } => {
print_listener_peer(&address, &opt.mode, local_peer_id)
}
_ => println!("{:?}", event),
},
Expand Down
28 changes: 14 additions & 14 deletions protocols/relay/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ fn src_connect_to_dst_listening_via_relay() {
// Destination Node reporting listen address via relay.
loop {
match dst_swarm.select_next_some().await {
SwarmEvent::NewListenAddr(addr) if addr == dst_listen_addr_via_relay => break,
SwarmEvent::NewListenAddr { address, .. } if address == dst_listen_addr_via_relay => break,
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated {
..
Expand Down Expand Up @@ -289,7 +289,7 @@ fn src_connect_to_dst_via_established_connection_to_relay() {
match dst_swarm.select_next_some().await {
SwarmEvent::Dialing(_) => {}
SwarmEvent::ConnectionEstablished { .. } => {}
SwarmEvent::NewListenAddr(addr) if addr == dst_addr_via_relay => break,
SwarmEvent::NewListenAddr { address, .. } if address == dst_addr_via_relay => break,
e => panic!("{:?}", e),
}
}
Expand Down Expand Up @@ -556,7 +556,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl
// Destination Node reporting listen address via relay.
loop {
match dst_swarm.select_next_some().await {
SwarmEvent::NewListenAddr(addr) if addr == dst_addr_via_relay => break,
SwarmEvent::NewListenAddr { address, .. } if address == dst_addr_via_relay => break,
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated {
..
Expand Down Expand Up @@ -722,7 +722,7 @@ fn inactive_connection_timeout() {
match dst_swarm.select_next_some().await {
SwarmEvent::Dialing(_) => {}
SwarmEvent::ConnectionEstablished { .. } => {}
SwarmEvent::NewListenAddr(addr) if addr == dst_addr_via_relay => break,
SwarmEvent::NewListenAddr { address, .. } if address == dst_addr_via_relay => break,
e => panic!("{:?}", e),
}
}
Expand Down Expand Up @@ -796,7 +796,7 @@ fn concurrent_connection_same_relay_same_dst() {
match dst_swarm.select_next_some().await {
SwarmEvent::Dialing(_) => {}
SwarmEvent::ConnectionEstablished { .. } => {}
SwarmEvent::NewListenAddr(addr) if addr == dst_addr_via_relay => break,
SwarmEvent::NewListenAddr { address, .. } if address == dst_addr_via_relay => break,
e => panic!("{:?}", e),
}
}
Expand Down Expand Up @@ -924,10 +924,10 @@ fn yield_incoming_connection_through_correct_listener() {
break;
}
}
SwarmEvent::NewListenAddr(addr)
if addr == relay_1_addr_incl_circuit
|| addr == relay_2_addr_incl_circuit
|| addr == dst_addr => {}
SwarmEvent::NewListenAddr { address, .. }
if address == relay_1_addr_incl_circuit
|| address == relay_2_addr_incl_circuit
|| address == dst_addr => {}
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
e => panic!("{:?}", e),
}
Expand Down Expand Up @@ -966,10 +966,10 @@ fn yield_incoming_connection_through_correct_listener() {
unreachable!();
}
}
SwarmEvent::NewListenAddr(addr)
if addr == relay_1_addr_incl_circuit
|| addr == relay_2_addr_incl_circuit
|| addr == dst_addr => {}
SwarmEvent::NewListenAddr { address, .. }
if address == relay_1_addr_incl_circuit
|| address == relay_2_addr_incl_circuit
|| address == dst_addr => {}
SwarmEvent::Behaviour(CombinedEvent::Ping(PingEvent {
peer,
result: Ok(_),
Expand Down Expand Up @@ -1044,7 +1044,7 @@ fn yield_incoming_connection_through_correct_listener() {
pool.run_until(async {
loop {
match dst_swarm.select_next_some().await {
SwarmEvent::NewListenAddr(addr) if addr == Protocol::P2pCircuit.into() => break,
SwarmEvent::NewListenAddr { address, .. } if address == Protocol::P2pCircuit.into() => break,
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated {
..
Expand Down
4 changes: 2 additions & 2 deletions protocols/request-response/tests/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ fn ping_protocol() {
let peer1 = async move {
loop {
match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr(addr) => tx.send(addr).await.unwrap(),
SwarmEvent::NewListenAddr { address, .. }=> tx.send(address).await.unwrap(),
SwarmEvent::Behaviour(RequestResponseEvent::Message {
peer,
message: RequestResponseMessage::Request { request, channel, .. }
Expand Down Expand Up @@ -312,7 +312,7 @@ fn ping_protocol_throttled() {
let peer1 = async move {
for i in 1 .. {
match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr(addr) => tx.send(addr).await.unwrap(),
SwarmEvent::NewListenAddr { address, .. } => tx.send(address).await.unwrap(),
SwarmEvent::Behaviour(throttled::Event::Event(RequestResponseEvent::Message {
peer,
message: RequestResponseMessage::Request { request, channel, .. },
Expand Down
4 changes: 2 additions & 2 deletions src/tutorial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,8 @@
//! block_on(future::poll_fn(move |cx| loop {
//! match swarm.poll_next_unpin(cx) {
//! Poll::Ready(Some(event)) => {
//! if let SwarmEvent::NewListenAddr(addr) = event {
//! println!("Listening on {:?}", addr);
//! if let SwarmEvent::NewListenAddr { address, .. }= event {
//! println!("Listening on {:?}", address);
//! }
//! },
//! Poll::Ready(None) => return Poll::Ready(()),
Expand Down
30 changes: 26 additions & 4 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,23 @@ pub enum SwarmEvent<TBvEv, THandleErr> {
error: PendingConnectionError<io::Error>,
},
/// One of our listeners has reported a new local listening address.
NewListenAddr(Multiaddr),
NewListenAddr{
/// The listener that is listening on the new address.
listener_id: ListenerId,
/// The new address that is being listened on.
address: Multiaddr
},
/// One of our listeners has reported the expiration of a listening address.
ExpiredListenAddr(Multiaddr),
ExpiredListenAddr{
/// The listener that is no longer listening on the address.
listener_id: ListenerId,
/// The new address that is being listened on.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// The new address that is being listened on.
/// The expired address.

address: Multiaddr
},
/// One of the listeners gracefully closed.
ListenerClosed {
/// The listener that closed.
listener_id: ListenerId,
/// The addresses that the listener was listening on. These addresses are now considered
/// expired, similar to if a [`ExpiredListenAddr`](SwarmEvent::ExpiredListenAddr) event
/// has been generated for each of them.
Expand All @@ -244,6 +256,8 @@ pub enum SwarmEvent<TBvEv, THandleErr> {
},
/// One of the listeners reported a non-fatal error.
ListenerError {
/// The listener that errored.
listener_id: ListenerId,
/// The listener error.
error: io::Error,
},
Expand Down Expand Up @@ -588,13 +602,19 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
this.listened_addrs.push(listen_addr.clone())
}
this.behaviour.inject_new_listen_addr(listener_id, &listen_addr);
return Poll::Ready(SwarmEvent::NewListenAddr(listen_addr));
return Poll::Ready(SwarmEvent::NewListenAddr {
listener_id,
address: listen_addr
});
}
Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr }) => {
log::debug!("Listener {:?}; Expired address {:?}.", listener_id, listen_addr);
this.listened_addrs.retain(|a| a != &listen_addr);
this.behaviour.inject_expired_listen_addr(listener_id, &listen_addr);
return Poll::Ready(SwarmEvent::ExpiredListenAddr(listen_addr));
return Poll::Ready(SwarmEvent::ExpiredListenAddr{
listener_id,
address: listen_addr
});
}
Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason }) => {
log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
Expand All @@ -606,13 +626,15 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
Err(err) => Err(err),
});
return Poll::Ready(SwarmEvent::ListenerClosed {
listener_id,
addresses,
reason,
});
}
Poll::Ready(NetworkEvent::ListenerError { listener_id, error }) => {
this.behaviour.inject_listener_error(listener_id, &error);
return Poll::Ready(SwarmEvent::ListenerError {
listener_id,
error,
});
},
Expand Down