From 17cf06a09a5efbcd9d52af01d581c31fdf1d99c0 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sat, 13 Mar 2021 16:07:50 +0100 Subject: [PATCH 1/2] swarm/src/lib: Remove Deref and DerefMut impls on Swarm Remove `Deref` and `DerefMut` implementations previously dereferencing to the `NetworkBehaviour` on `Swarm`. Instead one can access the `NetworkBehaviour` via `Swarm::behaviour` and `Swarm::behaviour_mut`. Methods on `Swarm` can now be accessed directly, e.g. via `my_swarm.local_peer_id()`. Reasoning: Accessing the `NetworkBehaviour` of a `Swarm` through `Deref` and `DerefMut` instead of a method call is an unnecessary complication, especially for newcomers. In addition, `Swarm` is not a smart-pointer and should thus not make use of `Deref` and `DerefMut`, see documentation from the standard library below. > Deref should only be implemented for smart pointers to avoid confusion. https://doc.rust-lang.org/std/ops/trait.Deref.html --- examples/chat-tokio.rs | 4 +- examples/chat.rs | 8 +- examples/distributed-key-value-store.rs | 4 +- examples/gossipsub-chat.rs | 6 +- examples/ipfs-kad.rs | 2 +- examples/ipfs-private.rs | 5 +- examples/mdns-passive-discovery.rs | 2 +- examples/ping.rs | 4 +- protocols/gossipsub/src/lib.rs | 2 +- protocols/gossipsub/tests/smoke.rs | 6 +- protocols/identify/src/identify.rs | 4 +- protocols/kad/src/behaviour/test.rs | 117 ++++++++++--------- protocols/ping/tests/ping.rs | 8 +- protocols/relay/examples/relay.rs | 2 +- protocols/relay/src/lib.rs | 4 +- protocols/relay/tests/lib.rs | 136 ++++++++++++----------- protocols/request-response/tests/ping.rs | 46 ++++---- swarm/CHANGELOG.md | 12 ++ swarm/src/lib.rs | 120 +++++++++----------- 19 files changed, 248 insertions(+), 244 deletions(-) diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs index 8775449755c..ebdfaf62175 100644 --- a/examples/chat-tokio.rs +++ b/examples/chat-tokio.rs @@ -138,7 +138,7 @@ async fn main() -> Result<(), Box> { // Reach out to another node if specified if let Some(to_dial) = std::env::args().nth(1) { let addr: Multiaddr = to_dial.parse()?; - Swarm::dial_addr(&mut swarm, addr)?; + swarm.dial_addr(addr)?; println!("Dialed {:?}", to_dial) } @@ -146,7 +146,7 @@ async fn main() -> Result<(), Box> { let mut stdin = io::BufReader::new(io::stdin()).lines(); // Listen on all interfaces and whatever port the OS assigns - Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?; + swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; // Kick it off let mut listening = false; diff --git a/examples/chat.rs b/examples/chat.rs index e7050da90bc..f95c61c35f4 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -135,7 +135,7 @@ fn main() -> Result<(), Box> { // Reach out to another node if specified if let Some(to_dial) = std::env::args().nth(1) { let addr: Multiaddr = to_dial.parse()?; - Swarm::dial_addr(&mut swarm, addr)?; + swarm.dial_addr(addr)?; println!("Dialed {:?}", to_dial) } @@ -143,14 +143,16 @@ fn main() -> Result<(), Box> { let mut stdin = io::BufReader::new(io::stdin()).lines(); // Listen on all interfaces and whatever port the OS assigns - Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?; + swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; // Kick it off let mut listening = false; task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { loop { match stdin.try_poll_next_unpin(cx)? { - Poll::Ready(Some(line)) => swarm.floodsub.publish(floodsub_topic.clone(), line.as_bytes()), + Poll::Ready(Some(line)) => swarm.behaviour_mut() + .floodsub + .publish(floodsub_topic.clone(), line.as_bytes()), Poll::Ready(None) => panic!("Stdin closed"), Poll::Pending => break } diff --git a/examples/distributed-key-value-store.rs b/examples/distributed-key-value-store.rs index 412e933f618..d67453618b9 100644 --- a/examples/distributed-key-value-store.rs +++ b/examples/distributed-key-value-store.rs @@ -160,14 +160,14 @@ fn main() -> Result<(), Box> { let mut stdin = io::BufReader::new(io::stdin()).lines(); // Listen on all interfaces and whatever port the OS assigns. - Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?; + swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; // Kick it off. let mut listening = false; task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { loop { match stdin.try_poll_next_unpin(cx)? { - Poll::Ready(Some(line)) => handle_input_line(&mut swarm.kademlia, line), + Poll::Ready(Some(line)) => handle_input_line(&mut swarm.behaviour_mut().kademlia, line), Poll::Ready(None) => panic!("Stdin closed"), Poll::Pending => break } diff --git a/examples/gossipsub-chat.rs b/examples/gossipsub-chat.rs index 2a03b5051e3..2be0f07b8c5 100644 --- a/examples/gossipsub-chat.rs +++ b/examples/gossipsub-chat.rs @@ -115,13 +115,13 @@ fn main() -> Result<(), Box> { }; // Listen on all interfaces and whatever port the OS assigns - libp2p::Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); + swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); // Reach out to another node if specified if let Some(to_dial) = std::env::args().nth(1) { let dialing = to_dial.clone(); match to_dial.parse() { - Ok(to_dial) => match libp2p::Swarm::dial_addr(&mut swarm, to_dial) { + Ok(to_dial) => match swarm.dial_addr(to_dial) { Ok(_) => println!("Dialed {:?}", dialing), Err(e) => println!("Dial {:?} failed: {:?}", dialing, e), }, @@ -137,7 +137,7 @@ fn main() -> Result<(), Box> { task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { loop { if let Err(e) = match stdin.try_poll_next_unpin(cx)? { - Poll::Ready(Some(line)) => swarm.publish(topic.clone(), line.as_bytes()), + Poll::Ready(Some(line)) => swarm.behaviour_mut().publish(topic.clone(), line.as_bytes()), Poll::Ready(None) => panic!("Stdin closed"), Poll::Pending => break, } { diff --git a/examples/ipfs-kad.rs b/examples/ipfs-kad.rs index ec48435db00..3b5aa0762fe 100644 --- a/examples/ipfs-kad.rs +++ b/examples/ipfs-kad.rs @@ -91,7 +91,7 @@ fn main() -> Result<(), Box> { }; println!("Searching for the closest peers to {:?}", to_search); - swarm.get_closest_peers(to_search); + swarm.behaviour_mut().get_closest_peers(to_search); // Kick it off! task::block_on(async move { diff --git a/examples/ipfs-private.rs b/examples/ipfs-private.rs index f53419d267e..4ee8562e569 100644 --- a/examples/ipfs-private.rs +++ b/examples/ipfs-private.rs @@ -261,7 +261,7 @@ fn main() -> Result<(), Box> { // Reach out to other nodes if specified for to_dial in std::env::args().skip(1) { let addr: Multiaddr = parse_legacy_multiaddr(&to_dial)?; - Swarm::dial_addr(&mut swarm, addr)?; + swarm.dial_addr(addr)?; println!("Dialed {:?}", to_dial) } @@ -269,7 +269,7 @@ fn main() -> Result<(), Box> { let mut stdin = io::BufReader::new(io::stdin()).lines(); // Listen on all interfaces and whatever port the OS assigns - Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?; + swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; // Kick it off let mut listening = false; @@ -277,6 +277,7 @@ fn main() -> Result<(), Box> { loop { if let Err(e) = match stdin.try_poll_next_unpin(cx)? { Poll::Ready(Some(line)) => swarm + .behaviour_mut() .gossipsub .publish(gossipsub_topic.clone(), line.as_bytes()), Poll::Ready(None) => panic!("Stdin closed"), diff --git a/examples/mdns-passive-discovery.rs b/examples/mdns-passive-discovery.rs index 774fc9e6090..9e544609816 100644 --- a/examples/mdns-passive-discovery.rs +++ b/examples/mdns-passive-discovery.rs @@ -40,7 +40,7 @@ async fn main() -> Result<(), Box> { // Note that the MDNS behaviour itself will not actually inititiate any connections, // as it only uses UDP. let mut swarm = Swarm::new(transport, behaviour, peer_id); - Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?; + swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; loop { match swarm.next().await { diff --git a/examples/ping.rs b/examples/ping.rs index eb5fa762fc7..6994dc21f40 100644 --- a/examples/ping.rs +++ b/examples/ping.rs @@ -69,12 +69,12 @@ fn main() -> Result<(), Box> { // command-line argument, if any. if let Some(addr) = std::env::args().nth(1) { let remote = addr.parse()?; - Swarm::dial_addr(&mut swarm, remote)?; + swarm.dial_addr(remote)?; println!("Dialed {}", addr) } // Tell the swarm to listen on all interfaces and a random, OS-assigned port. - Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?; + swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; let mut listening = false; task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index cba668f6a99..ddba0f69a1e 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -118,7 +118,7 @@ //! //! // Listen on a memory transport. //! let memory: Multiaddr = libp2p_core::multiaddr::Protocol::Memory(10).into(); -//! let addr = libp2p_swarm::Swarm::listen_on(&mut swarm, memory).unwrap(); +//! let addr = swarm.listen_on(memory).unwrap(); //! println!("Listening on {:?}", addr); //! ``` diff --git a/protocols/gossipsub/tests/smoke.rs b/protocols/gossipsub/tests/smoke.rs index 1ef92ba658c..cb7c23d1747 100644 --- a/protocols/gossipsub/tests/smoke.rs +++ b/protocols/gossipsub/tests/smoke.rs @@ -170,7 +170,7 @@ fn build_node() -> (Multiaddr, Swarm) { let port = 1 + random::(); let mut addr: Multiaddr = Protocol::Memory(port).into(); - Swarm::listen_on(&mut swarm, addr.clone()).unwrap(); + swarm.listen_on(addr.clone()).unwrap(); addr = addr.with(libp2p_core::multiaddr::Protocol::P2p( public_key.into_peer_id().into(), @@ -196,7 +196,7 @@ fn multi_hop_propagation() { // Subscribe each node to the same topic. let topic = Topic::new("test-net"); for (_addr, node) in &mut graph.nodes { - node.subscribe(&topic).unwrap(); + node.behaviour_mut().subscribe(&topic).unwrap(); } // Wait for all nodes to be subscribed. @@ -223,7 +223,7 @@ fn multi_hop_propagation() { graph = graph.drain_poll(); // Publish a single message. - graph.nodes[0].1.publish(topic, vec![1, 2, 3]).unwrap(); + graph.nodes[0].1.behaviour_mut().publish(topic, vec![1, 2, 3]).unwrap(); // Wait for all nodes to receive the published message. let mut received_msgs = 0; diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index 81c114f12ea..c56aa22bca2 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -318,7 +318,7 @@ mod tests { (swarm, pubkey) }; - Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); + swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); let listen_addr = async_std::task::block_on(async { loop { @@ -330,7 +330,7 @@ mod tests { } } }); - Swarm::dial_addr(&mut swarm2, listen_addr).unwrap(); + swarm2.dial_addr(listen_addr).unwrap(); // nb. Either swarm may receive the `Identified` event first, upon which // it will permit the connection to be closed, as defined by diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 4275d55c091..41e5a41326a 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -71,7 +71,7 @@ fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) { let mut swarm = Swarm::new(transport, behaviour, local_id); let address: Multiaddr = Protocol::Memory(random::()).into(); - Swarm::listen_on(&mut swarm, address.clone()).unwrap(); + swarm.listen_on(address.clone()).unwrap(); (address, swarm) } @@ -95,13 +95,13 @@ fn build_connected_nodes_with_config(total: usize, step: usize, cfg: KademliaCon { let mut swarms = build_nodes_with_config(total, cfg); let swarm_ids: Vec<_> = swarms.iter() - .map(|(addr, swarm)| (addr.clone(), Swarm::local_peer_id(swarm).clone())) + .map(|(addr, swarm)| (addr.clone(), *swarm.local_peer_id())) .collect(); let mut i = 0; for (j, (addr, peer_id)) in swarm_ids.iter().enumerate().skip(1) { if i < swarm_ids.len() { - swarms[i].1.add_address(peer_id, addr.clone()); + swarms[i].1.behaviour_mut().add_address(peer_id, addr.clone()); } if j % step == 0 { i += step; @@ -116,12 +116,12 @@ fn build_fully_connected_nodes_with_config(total: usize, cfg: KademliaConfig) { let mut swarms = build_nodes_with_config(total, cfg); let swarm_addr_and_peer_id: Vec<_> = swarms.iter() - .map(|(addr, swarm)| (addr.clone(), Swarm::local_peer_id(swarm).clone())) + .map(|(addr, swarm)| (addr.clone(), *swarm.local_peer_id())) .collect(); for (_addr, swarm) in swarms.iter_mut() { for (addr, peer) in &swarm_addr_and_peer_id { - swarm.add_address(&peer, addr.clone()); + swarm.behaviour_mut().add_address(&peer, addr.clone()); } } @@ -173,7 +173,7 @@ fn bootstrap() { .cloned() .collect(); - let qid = swarms[0].bootstrap().unwrap(); + let qid = swarms[0].behaviour_mut().bootstrap().unwrap(); // Expected known peers let expected_known = swarm_ids.iter().skip(1).cloned().collect::>(); @@ -197,7 +197,7 @@ fn bootstrap() { first = false; if ok.num_remaining == 0 { let mut known = HashSet::new(); - for b in swarm.kbuckets.iter() { + for b in swarm.behaviour_mut().kbuckets.iter() { for e in b.iter() { known.insert(e.node.key.preimage().clone()); } @@ -241,9 +241,9 @@ fn query_iter() { // propagate forwards through the list of peers. let search_target = PeerId::random(); let search_target_key = kbucket::Key::from(search_target); - let qid = swarms[0].get_closest_peers(search_target); + let qid = swarms[0].behaviour_mut().get_closest_peers(search_target); - match swarms[0].query(&qid) { + match swarms[0].behaviour_mut().query(&qid) { Some(q) => match q.info() { QueryInfo::GetClosestPeers { key } => { assert_eq!(&key[..], search_target.to_bytes().as_slice()) @@ -271,7 +271,7 @@ fn query_iter() { assert_eq!(id, qid); assert_eq!(&ok.key[..], search_target.to_bytes().as_slice()); assert_eq!(swarm_ids[i], expected_swarm_id); - assert_eq!(swarm.queries.size(), 0); + assert_eq!(swarm.behaviour_mut().queries.size(), 0); assert!(expected_peer_ids.iter().all(|p| ok.peers.contains(p))); let key = kbucket::Key::new(ok.key); assert_eq!(expected_distances, distances(&key, ok.peers)); @@ -306,12 +306,12 @@ fn unresponsive_not_returned_direct() { // Add fake addresses. for _ in 0 .. 10 { - swarms[0].add_address(&PeerId::random(), Protocol::Udp(10u16).into()); + swarms[0].behaviour_mut().add_address(&PeerId::random(), Protocol::Udp(10u16).into()); } // Ask first to search a random value. let search_target = PeerId::random(); - swarms[0].get_closest_peers(search_target); + swarms[0].behaviour_mut().get_closest_peers(search_target); block_on( poll_fn(move |ctx| { @@ -348,20 +348,20 @@ fn unresponsive_not_returned_indirect() { // Add fake addresses to first. for _ in 0 .. 10 { - swarms[0].1.add_address(&PeerId::random(), multiaddr![Udp(10u16)]); + swarms[0].1.behaviour_mut().add_address(&PeerId::random(), multiaddr![Udp(10u16)]); } // Connect second to first. - let first_peer_id = Swarm::local_peer_id(&swarms[0].1).clone(); + let first_peer_id = *swarms[0].1.local_peer_id(); let first_address = swarms[0].0.clone(); - swarms[1].1.add_address(&first_peer_id, first_address); + swarms[1].1.behaviour_mut().add_address(&first_peer_id, first_address); // Drop the swarm addresses. let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::>(); // Ask second to search a random value. let search_target = PeerId::random(); - swarms[1].get_closest_peers(search_target); + swarms[1].behaviour_mut().get_closest_peers(search_target); block_on( poll_fn(move |ctx| { @@ -394,19 +394,18 @@ fn get_record_not_found() { let mut swarms = build_nodes(3); let swarm_ids: Vec<_> = swarms.iter() - .map(|(_addr, swarm)| Swarm::local_peer_id(swarm)) - .cloned() + .map(|(_addr, swarm)| *swarm.local_peer_id()) .collect(); let (second, third) = (swarms[1].0.clone(), swarms[2].0.clone()); - swarms[0].1.add_address(&swarm_ids[1], second); - swarms[1].1.add_address(&swarm_ids[2], third); + swarms[0].1.behaviour_mut().add_address(&swarm_ids[1], second); + swarms[1].1.behaviour_mut().add_address(&swarm_ids[2], third); // Drop the swarm addresses. let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::>(); let target_key = record::Key::from(random_multihash()); - let qid = swarms[0].get_record(&target_key, Quorum::One); + let qid = swarms[0].behaviour_mut().get_record(&target_key, Quorum::One); block_on( poll_fn(move |ctx| { @@ -466,8 +465,8 @@ fn put_record() { let mut single_swarm = build_node_with_config(config); // Connect `single_swarm` to three bootnodes. for i in 0..3 { - single_swarm.1.add_address( - &Swarm::local_peer_id(&fully_connected_swarms[i].1), + single_swarm.1.behaviour_mut().add_address( + fully_connected_swarms[i].1.local_peer_id(), fully_connected_swarms[i].0.clone(), ); } @@ -493,8 +492,8 @@ fn put_record() { // Initiate put_record queries. let mut qids = HashSet::new(); for r in records.values() { - let qid = swarms[0].put_record(r.clone(), Quorum::All).unwrap(); - match swarms[0].query(&qid) { + let qid = swarms[0].behaviour_mut().put_record(r.clone(), Quorum::All).unwrap(); + match swarms[0].behaviour_mut().query(&qid) { Some(q) => match q.info() { QueryInfo::PutRecord { phase, record, .. } => { assert_eq!(phase, &PutRecordPhase::GetClosestPeers); @@ -535,7 +534,7 @@ fn put_record() { Err(e) => panic!("{:?}", e), Ok(ok) => { assert!(records.contains_key(&ok.key)); - let record = swarm.store.get(&ok.key).unwrap(); + let record = swarm.behaviour_mut().store.get(&ok.key).unwrap(); results.push(record.into_owned()); } } @@ -562,7 +561,7 @@ fn put_record() { assert_eq!(r.key, expected.key); assert_eq!(r.value, expected.value); assert_eq!(r.expires, expected.expires); - assert_eq!(r.publisher.as_ref(), Some(Swarm::local_peer_id(&swarms[0]))); + assert_eq!(r.publisher, Some(*swarms[0].local_peer_id())); let key = kbucket::Key::new(r.key.clone()); let mut expected = swarms.iter() @@ -582,8 +581,8 @@ fn put_record() { let actual = swarms.iter() .skip(1) .filter_map(|swarm| - if swarm.store.get(key.preimage()).is_some() { - Some(Swarm::local_peer_id(swarm).clone()) + if swarm.behaviour().store.get(key.preimage()).is_some() { + Some(*swarm.local_peer_id()) } else { None }) @@ -608,18 +607,18 @@ fn put_record() { } if republished { - assert_eq!(swarms[0].store.records().count(), records.len()); - assert_eq!(swarms[0].queries.size(), 0); + assert_eq!(swarms[0].behaviour_mut().store.records().count(), records.len()); + assert_eq!(swarms[0].behaviour_mut().queries.size(), 0); for k in records.keys() { - swarms[0].store.remove(&k); + swarms[0].behaviour_mut().store.remove(&k); } - assert_eq!(swarms[0].store.records().count(), 0); + assert_eq!(swarms[0].behaviour_mut().store.records().count(), 0); // All records have been republished, thus the test is complete. return Poll::Ready(()); } // Tell the replication job to republish asap. - swarms[0].put_record_job.as_mut().unwrap().asap(true); + swarms[0].behaviour_mut().put_record_job.as_mut().unwrap().asap(true); republished = true; }) ) @@ -635,7 +634,7 @@ fn get_record() { // Let first peer know of second peer and second peer know of third peer. for i in 0..2 { let (peer_id, address) = (Swarm::local_peer_id(&swarms[i+1].1).clone(), swarms[i+1].0.clone()); - swarms[i].1.add_address(&peer_id, address); + swarms[i].1.behaviour_mut().add_address(&peer_id, address); } // Drop the swarm addresses. @@ -645,8 +644,8 @@ fn get_record() { let expected_cache_candidate = *Swarm::local_peer_id(&swarms[1]); - swarms[2].store.put(record.clone()).unwrap(); - let qid = swarms[0].get_record(&record.key, Quorum::One); + swarms[2].behaviour_mut().store.put(record.clone()).unwrap(); + let qid = swarms[0].behaviour_mut().get_record(&record.key, Quorum::One); block_on( poll_fn(move |ctx| { @@ -692,11 +691,11 @@ fn get_record_many() { let record = Record::new(random_multihash(), vec![4,5,6]); for i in 0 .. num_nodes { - swarms[i].store.put(record.clone()).unwrap(); + swarms[i].behaviour_mut().store.put(record.clone()).unwrap(); } let quorum = Quorum::N(NonZeroUsize::new(num_results).unwrap()); - let qid = swarms[0].get_record(&record.key, quorum); + let qid = swarms[0].behaviour_mut().get_record(&record.key, quorum); block_on( poll_fn(move |ctx| { @@ -751,8 +750,8 @@ fn add_provider() { let mut single_swarm = build_node_with_config(config); // Connect `single_swarm` to three bootnodes. for i in 0..3 { - single_swarm.1.add_address( - &Swarm::local_peer_id(&fully_connected_swarms[i].1), + single_swarm.1.behaviour_mut().add_address( + fully_connected_swarms[i].1.local_peer_id(), fully_connected_swarms[i].0.clone(), ); } @@ -775,7 +774,7 @@ fn add_provider() { // Initiate the first round of publishing. let mut qids = HashSet::new(); for k in &keys { - let qid = swarms[0].start_providing(k.clone()).unwrap(); + let qid = swarms[0].behaviour_mut().start_providing(k.clone()).unwrap(); qids.insert(qid); } @@ -825,7 +824,7 @@ fn add_provider() { // Collect the nodes that have a provider record for `key`. let actual = swarms.iter().skip(1) .filter_map(|swarm| - if swarm.store.providers(&key).len() == 1 { + if swarm.behaviour().store.providers(&key).len() == 1 { Some(Swarm::local_peer_id(&swarm).clone()) } else { None @@ -859,22 +858,22 @@ fn add_provider() { // One round of publishing is complete. assert!(results.is_empty()); for swarm in &swarms { - assert_eq!(swarm.queries.size(), 0); + assert_eq!(swarm.behaviour().queries.size(), 0); } if republished { - assert_eq!(swarms[0].store.provided().count(), keys.len()); + assert_eq!(swarms[0].behaviour_mut().store.provided().count(), keys.len()); for k in &keys { - swarms[0].stop_providing(&k); + swarms[0].behaviour_mut().stop_providing(&k); } - assert_eq!(swarms[0].store.provided().count(), 0); + assert_eq!(swarms[0].behaviour_mut().store.provided().count(), 0); // All records have been republished, thus the test is complete. return Poll::Ready(()); } // Initiate the second round of publishing by telling the // periodic provider job to run asap. - swarms[0].add_provider_job.as_mut().unwrap().asap(); + swarms[0].behaviour_mut().add_provider_job.as_mut().unwrap().asap(); published = false; republished = true; }) @@ -892,10 +891,10 @@ fn exceed_jobs_max_queries() { let (_addr, mut swarm) = build_node(); let num = JOBS_MAX_QUERIES + 1; for _ in 0 .. num { - swarm.get_closest_peers(PeerId::random()); + swarm.behaviour_mut().get_closest_peers(PeerId::random()); } - assert_eq!(swarm.queries.size(), num); + assert_eq!(swarm.behaviour_mut().queries.size(), num); block_on( poll_fn(move |ctx| { @@ -947,18 +946,18 @@ fn disjoint_query_does_not_finish_before_all_paths_did() { // Make `bob` and `trudy` aware of their version of the record searched by // `alice`. - bob.1.store.put(record_bob.clone()).unwrap(); - trudy.1.store.put(record_trudy.clone()).unwrap(); + bob.1.behaviour_mut().store.put(record_bob.clone()).unwrap(); + trudy.1.behaviour_mut().store.put(record_trudy.clone()).unwrap(); // Make `trudy` and `bob` known to `alice`. - alice.1.add_address(&Swarm::local_peer_id(&trudy.1), trudy.0.clone()); - alice.1.add_address(&Swarm::local_peer_id(&bob.1), bob.0.clone()); + alice.1.behaviour_mut().add_address(&trudy.1.local_peer_id(), trudy.0.clone()); + alice.1.behaviour_mut().add_address(&bob.1.local_peer_id(), bob.0.clone()); // Drop the swarm addresses. let (mut alice, mut bob, mut trudy) = (alice.1, bob.1, trudy.1); // Have `alice` query the Dht for `key` with a quorum of 1. - alice.get_record(&key, Quorum::One); + alice.behaviour_mut().get_record(&key, Quorum::One); // The default peer timeout is 10 seconds. Choosing 1 seconds here should // give enough head room to prevent connections to `bob` to time out. @@ -1001,8 +1000,8 @@ fn disjoint_query_does_not_finish_before_all_paths_did() { ); // Make sure `alice` has exactly one query with `trudy`'s record only. - assert_eq!(1, alice.queries.iter().count()); - alice.queries.iter().for_each(|q| { + assert_eq!(1, alice.behaviour().queries.iter().count()); + alice.behaviour().queries.iter().for_each(|q| { match &q.inner.info { QueryInfo::GetRecord{ records, .. } => { assert_eq!( @@ -1079,7 +1078,7 @@ fn manual_bucket_inserts() { // that none of them was inserted into a bucket. let mut routable = Vec::new(); // Start an iterative query from the first peer. - swarms[0].1.get_closest_peers(PeerId::random()); + swarms[0].1.behaviour_mut().get_closest_peers(PeerId::random()); block_on(poll_fn(move |ctx| { for (_, swarm) in swarms.iter_mut() { loop { @@ -1091,7 +1090,7 @@ fn manual_bucket_inserts() { routable.push(peer); if expected.is_empty() { for peer in routable.iter() { - let bucket = swarm.kbucket(*peer).unwrap(); + let bucket = swarm.behaviour_mut().kbucket(*peer).unwrap(); assert!(bucket.iter().all(|e| e.node.key.preimage() != peer)); } return Poll::Ready(()) diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index 556f27ed21f..ede6d49bfa7 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -56,7 +56,7 @@ fn ping_pong() { let pid1 = peer1_id.clone(); let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); - Swarm::listen_on(&mut swarm1, addr).unwrap(); + swarm1.listen_on(addr).unwrap(); let mut count1 = count.get(); let mut count2 = count.get(); @@ -79,7 +79,7 @@ fn ping_pong() { let pid2 = peer2_id.clone(); let peer2 = async move { - Swarm::dial_addr(&mut swarm2, rx.next().await.unwrap()).unwrap(); + swarm2.dial_addr(rx.next().await.unwrap()).unwrap(); loop { match swarm2.next().await { @@ -124,7 +124,7 @@ fn max_failures() { let (mut tx, mut rx) = mpsc::channel::(1); let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); - Swarm::listen_on(&mut swarm1, addr).unwrap(); + swarm1.listen_on(addr).unwrap(); let peer1 = async move { let mut count1: u8 = 0; @@ -151,7 +151,7 @@ fn max_failures() { }; let peer2 = async move { - Swarm::dial_addr(&mut swarm2, rx.next().await.unwrap()).unwrap(); + swarm2.dial_addr(rx.next().await.unwrap()).unwrap(); let mut count2: u8 = 0; diff --git a/protocols/relay/examples/relay.rs b/protocols/relay/examples/relay.rs index b52637e2cd3..2e22cdff7f0 100644 --- a/protocols/relay/examples/relay.rs +++ b/protocols/relay/examples/relay.rs @@ -60,7 +60,7 @@ fn main() -> Result<(), Box> { let mut swarm = Swarm::new(transport, relay_behaviour, local_peer_id); // Listen on all interfaces and whatever port the OS assigns - Swarm::listen_on(&mut swarm, "/ip6/::/tcp/0".parse()?)?; + swarm.listen_on("/ip6/::/tcp/0".parse()?)?; let mut listening = false; block_on(futures::future::poll_fn(move |cx: &mut Context<'_>| { diff --git a/protocols/relay/src/lib.rs b/protocols/relay/src/lib.rs index 62a0645c432..7f81860534d 100644 --- a/protocols/relay/src/lib.rs +++ b/protocols/relay/src/lib.rs @@ -59,10 +59,10 @@ //! let dst_addr = relay_addr.clone().with(Protocol::Memory(5678)); //! //! // Listen for incoming connections via relay node (1234). -//! Swarm::listen_on(&mut swarm, relay_addr).unwrap(); +//! swarm.listen_on(relay_addr).unwrap(); //! //! // Dial node (5678) via relay node (1234). -//! Swarm::dial_addr(&mut swarm, dst_addr).unwrap(); +//! swarm.dial_addr(dst_addr).unwrap(); //! ``` //! //! ## Terminology diff --git a/protocols/relay/tests/lib.rs b/protocols/relay/tests/lib.rs index 70584d8a62b..53ee81652e3 100644 --- a/protocols/relay/tests/lib.rs +++ b/protocols/relay/tests/lib.rs @@ -57,9 +57,9 @@ fn src_connect_to_dst_listening_via_relay() { let mut dst_swarm = build_swarm(Reachability::Firewalled, RelayMode::Passive); let mut relay_swarm = build_swarm(Reachability::Routable, RelayMode::Passive); - let src_peer_id = Swarm::local_peer_id(&src_swarm).clone(); - let dst_peer_id = Swarm::local_peer_id(&dst_swarm).clone(); - let relay_peer_id = Swarm::local_peer_id(&relay_swarm).clone(); + let src_peer_id = *src_swarm.local_peer_id(); + let dst_peer_id = *dst_swarm.local_peer_id(); + let relay_peer_id = *relay_swarm.local_peer_id(); let relay_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::())); let dst_listen_addr_via_relay = relay_addr @@ -70,10 +70,10 @@ fn src_connect_to_dst_listening_via_relay() { .clone() .with(Protocol::P2p(dst_peer_id.into())); - Swarm::listen_on(&mut relay_swarm, relay_addr.clone()).unwrap(); + relay_swarm.listen_on(relay_addr.clone()).unwrap(); spawn_swarm_on_pool(&pool, relay_swarm); - Swarm::listen_on(&mut dst_swarm, dst_listen_addr_via_relay.clone()).unwrap(); + dst_swarm.listen_on(dst_listen_addr_via_relay.clone()).unwrap(); pool.run_until(async { // Destination Node dialing Relay. @@ -142,7 +142,7 @@ fn src_connect_to_dst_listening_via_relay() { } }; - Swarm::dial_addr(&mut src_swarm, dst_addr_via_relay).unwrap(); + src_swarm.dial_addr(dst_addr_via_relay).unwrap(); let src = async move { // Source Node dialing Relay to connect to Destination Node. match src_swarm.next_event().await { @@ -197,8 +197,8 @@ fn src_connect_to_dst_not_listening_via_active_relay() { let mut dst_swarm = build_swarm(Reachability::Routable, RelayMode::Passive); let mut relay_swarm = build_swarm(Reachability::Routable, RelayMode::Active); - let relay_peer_id = Swarm::local_peer_id(&relay_swarm).clone(); - let dst_peer_id = Swarm::local_peer_id(&dst_swarm).clone(); + let relay_peer_id = *relay_swarm.local_peer_id(); + let dst_peer_id = *dst_swarm.local_peer_id(); let relay_addr: Multiaddr = Protocol::Memory(rand::random::()).into(); let dst_addr: Multiaddr = Protocol::Memory(rand::random::()).into(); @@ -209,15 +209,15 @@ fn src_connect_to_dst_not_listening_via_active_relay() { .with(dst_addr.into_iter().next().unwrap()) .with(Protocol::P2p(dst_peer_id.clone().into())); - Swarm::listen_on(&mut relay_swarm, relay_addr.clone()).unwrap(); + relay_swarm.listen_on(relay_addr.clone()).unwrap(); spawn_swarm_on_pool(&pool, relay_swarm); - Swarm::listen_on(&mut dst_swarm, dst_addr.clone()).unwrap(); + dst_swarm.listen_on(dst_addr.clone()).unwrap(); // Instruct destination node to listen for incoming relayed connections from unknown relay nodes. - Swarm::listen_on(&mut dst_swarm, Protocol::P2pCircuit.into()).unwrap(); + dst_swarm.listen_on(Protocol::P2pCircuit.into()).unwrap(); spawn_swarm_on_pool(&pool, dst_swarm); - Swarm::dial_addr(&mut src_swarm, dst_addr_via_relay).unwrap(); + src_swarm.dial_addr(dst_addr_via_relay).unwrap(); pool.run_until(async move { // Source Node dialing Relay to connect to Destination Node. match src_swarm.next_event().await { @@ -269,8 +269,8 @@ fn src_connect_to_dst_via_established_connection_to_relay() { let mut dst_swarm = build_swarm(Reachability::Routable, RelayMode::Passive); let mut relay_swarm = build_swarm(Reachability::Routable, RelayMode::Passive); - let relay_peer_id = Swarm::local_peer_id(&relay_swarm).clone(); - let dst_peer_id = Swarm::local_peer_id(&dst_swarm).clone(); + let relay_peer_id = *relay_swarm.local_peer_id(); + let dst_peer_id = *dst_swarm.local_peer_id(); let relay_addr: Multiaddr = Protocol::Memory(rand::random::()).into(); let dst_addr_via_relay = relay_addr @@ -279,10 +279,10 @@ fn src_connect_to_dst_via_established_connection_to_relay() { .with(Protocol::P2pCircuit) .with(Protocol::P2p(dst_peer_id.clone().into())); - Swarm::listen_on(&mut relay_swarm, relay_addr.clone()).unwrap(); + relay_swarm.listen_on(relay_addr.clone()).unwrap(); spawn_swarm_on_pool(&pool, relay_swarm); - Swarm::listen_on(&mut dst_swarm, dst_addr_via_relay.clone()).unwrap(); + dst_swarm.listen_on(dst_addr_via_relay.clone()).unwrap(); // Wait for destination to listen via relay. pool.run_until(async { loop { @@ -297,7 +297,7 @@ fn src_connect_to_dst_via_established_connection_to_relay() { spawn_swarm_on_pool(&pool, dst_swarm); pool.run_until(async move { - Swarm::dial_addr(&mut src_swarm, relay_addr).unwrap(); + src_swarm.dial_addr(relay_addr).unwrap(); // Source Node establishing connection to Relay. loop { @@ -311,7 +311,7 @@ fn src_connect_to_dst_via_established_connection_to_relay() { } } - Swarm::dial_addr(&mut src_swarm, dst_addr_via_relay).unwrap(); + src_swarm.dial_addr(dst_addr_via_relay).unwrap(); // Source Node establishing connection to destination node via Relay. loop { @@ -350,7 +350,7 @@ fn src_try_connect_to_offline_dst() { let mut src_swarm = build_swarm(Reachability::Firewalled, RelayMode::Passive); let mut relay_swarm = build_swarm(Reachability::Routable, RelayMode::Passive); - let relay_peer_id = Swarm::local_peer_id(&relay_swarm).clone(); + let relay_peer_id = *relay_swarm.local_peer_id(); let dst_peer_id = PeerId::random(); let relay_addr: Multiaddr = Protocol::Memory(rand::random::()).into(); @@ -362,10 +362,10 @@ fn src_try_connect_to_offline_dst() { .with(dst_addr.into_iter().next().unwrap()) .with(Protocol::P2p(dst_peer_id.clone().into())); - Swarm::listen_on(&mut relay_swarm, relay_addr.clone()).unwrap(); + relay_swarm.listen_on(relay_addr.clone()).unwrap(); spawn_swarm_on_pool(&pool, relay_swarm); - Swarm::dial_addr(&mut src_swarm, dst_addr_via_relay.clone()).unwrap(); + src_swarm.dial_addr(dst_addr_via_relay.clone()).unwrap(); pool.run_until(async move { // Source Node dialing Relay to connect to Destination Node. match src_swarm.next_event().await { @@ -403,8 +403,8 @@ fn src_try_connect_to_unsupported_dst() { let mut relay_swarm = build_swarm(Reachability::Routable, RelayMode::Passive); let mut dst_swarm = build_keep_alive_only_swarm(); - let relay_peer_id = Swarm::local_peer_id(&relay_swarm).clone(); - let dst_peer_id = Swarm::local_peer_id(&dst_swarm).clone(); + let relay_peer_id = *relay_swarm.local_peer_id(); + let dst_peer_id = *dst_swarm.local_peer_id(); let relay_addr: Multiaddr = Protocol::Memory(rand::random::()).into(); let dst_addr: Multiaddr = Protocol::Memory(rand::random::()).into(); @@ -415,13 +415,13 @@ fn src_try_connect_to_unsupported_dst() { .with(dst_addr.into_iter().next().unwrap()) .with(Protocol::P2p(dst_peer_id.clone().into())); - Swarm::listen_on(&mut relay_swarm, relay_addr.clone()).unwrap(); + relay_swarm.listen_on(relay_addr.clone()).unwrap(); spawn_swarm_on_pool(&pool, relay_swarm); - Swarm::listen_on(&mut dst_swarm, dst_addr.clone()).unwrap(); + dst_swarm.listen_on(dst_addr.clone()).unwrap(); spawn_swarm_on_pool(&pool, dst_swarm); - Swarm::dial_addr(&mut src_swarm, dst_addr_via_relay.clone()).unwrap(); + src_swarm.dial_addr(dst_addr_via_relay.clone()).unwrap(); pool.run_until(async move { // Source Node dialing Relay to connect to Destination Node. match src_swarm.next_event().await { @@ -470,7 +470,7 @@ fn src_try_connect_to_offline_dst_via_offline_relay() { .with(dst_addr.into_iter().next().unwrap()) .with(Protocol::P2p(dst_peer_id.clone().into())); - Swarm::dial_addr(&mut src_swarm, dst_addr_via_relay.clone()).unwrap(); + src_swarm.dial_addr(dst_addr_via_relay.clone()).unwrap(); pool.run_until(async move { // Source Node dialing Relay to connect to Destination Node. match src_swarm.next_event().await { @@ -503,9 +503,9 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl let mut dst_swarm = build_swarm(Reachability::Firewalled, RelayMode::Passive); let mut relay_swarm = build_swarm(Reachability::Routable, RelayMode::Passive); - let src_peer_id = Swarm::local_peer_id(&src_swarm).clone(); - let dst_peer_id = Swarm::local_peer_id(&dst_swarm).clone(); - let relay_peer_id = Swarm::local_peer_id(&relay_swarm).clone(); + let src_peer_id = *src_swarm.local_peer_id(); + let dst_peer_id = *dst_swarm.local_peer_id(); + let relay_peer_id = *relay_swarm.local_peer_id(); let relay_addr: Multiaddr = Protocol::Memory(rand::random::()).into(); let dst_addr_via_relay = relay_addr @@ -515,17 +515,19 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl .with(Protocol::P2p(dst_peer_id.into())); src_swarm + .behaviour_mut() .kad .add_address(&relay_peer_id, relay_addr.clone()); dst_swarm + .behaviour_mut() .kad .add_address(&relay_peer_id, relay_addr.clone()); - Swarm::listen_on(&mut relay_swarm, relay_addr.clone()).unwrap(); + relay_swarm.listen_on(relay_addr.clone()).unwrap(); spawn_swarm_on_pool(&pool, relay_swarm); // Destination Node listen via Relay. - Swarm::listen_on(&mut dst_swarm, dst_addr_via_relay.clone()).unwrap(); + dst_swarm.listen_on(dst_addr_via_relay.clone()).unwrap(); pool.run_until(async { // Destination Node dialing Relay. @@ -560,7 +562,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl } // Destination Node bootstrapping. - let query_id = dst_swarm.kad.bootstrap().unwrap(); + let query_id = dst_swarm.behaviour_mut().kad.bootstrap().unwrap(); loop { match dst_swarm.next_event().await { SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::QueryResult { @@ -568,7 +570,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl result: QueryResult::Bootstrap(Ok(_)), .. })) if query_id == id => { - if dst_swarm.kad.iter_queries().count() == 0 { + if dst_swarm.behaviour_mut().kad.iter_queries().count() == 0 { break; } } @@ -617,7 +619,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl let src = async move { // Source Node looking for Destination Node on the Kademlia DHT. - let mut query_id = src_swarm.kad.get_closest_peers(dst_peer_id); + let mut query_id = src_swarm.behaviour_mut().kad.get_closest_peers(dst_peer_id); // One has to retry multiple times to wait for Relay to receive Identify event from Node // B. let mut tries = 0; @@ -648,7 +650,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl panic!("Too many retries."); } - query_id = src_swarm.kad.get_closest_peers(dst_peer_id); + query_id = src_swarm.behaviour_mut().kad.get_closest_peers(dst_peer_id); } SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated { .. @@ -691,10 +693,10 @@ fn inactive_connection_timeout() { let mut relay_swarm = build_keep_alive_swarm(); // Connections only kept alive by Source Node and Destination Node. - relay_swarm.keep_alive.keep_alive = KeepAlive::No; + relay_swarm.behaviour_mut().keep_alive.keep_alive = KeepAlive::No; - let relay_peer_id = Swarm::local_peer_id(&relay_swarm).clone(); - let dst_peer_id = Swarm::local_peer_id(&dst_swarm).clone(); + let relay_peer_id = *relay_swarm.local_peer_id(); + let dst_peer_id = *dst_swarm.local_peer_id(); let relay_addr: Multiaddr = Protocol::Memory(rand::random::()).into(); let dst_addr_via_relay = relay_addr @@ -703,10 +705,10 @@ fn inactive_connection_timeout() { .with(Protocol::P2pCircuit) .with(Protocol::P2p(dst_peer_id.clone().into())); - Swarm::listen_on(&mut relay_swarm, relay_addr.clone()).unwrap(); + relay_swarm.listen_on(relay_addr.clone()).unwrap(); spawn_swarm_on_pool(&pool, relay_swarm); - Swarm::listen_on(&mut dst_swarm, dst_addr_via_relay.clone()).unwrap(); + dst_swarm.listen_on(dst_addr_via_relay.clone()).unwrap(); // Wait for destination to listen via relay. pool.run_until(async { loop { @@ -721,7 +723,7 @@ fn inactive_connection_timeout() { spawn_swarm_on_pool(&pool, dst_swarm); pool.run_until(async move { - Swarm::dial_addr(&mut src_swarm, relay_addr).unwrap(); + src_swarm.dial_addr(relay_addr).unwrap(); // Source Node dialing Relay. loop { match src_swarm.next_event().await { @@ -733,7 +735,7 @@ fn inactive_connection_timeout() { } } - Swarm::dial_addr(&mut src_swarm, dst_addr_via_relay).unwrap(); + src_swarm.dial_addr(dst_addr_via_relay).unwrap(); // Source Node establishing connection to destination node via Relay. match src_swarm.next_event().await { @@ -767,8 +769,8 @@ fn concurrent_connection_same_relay_same_dst() { let mut dst_swarm = build_swarm(Reachability::Routable, RelayMode::Passive); let mut relay_swarm = build_swarm(Reachability::Routable, RelayMode::Passive); - let relay_peer_id = Swarm::local_peer_id(&relay_swarm).clone(); - let dst_peer_id = Swarm::local_peer_id(&dst_swarm).clone(); + let relay_peer_id = *relay_swarm.local_peer_id(); + let dst_peer_id = *dst_swarm.local_peer_id(); let relay_addr: Multiaddr = Protocol::Memory(rand::random::()).into(); let dst_addr_via_relay = relay_addr @@ -777,10 +779,10 @@ fn concurrent_connection_same_relay_same_dst() { .with(Protocol::P2pCircuit) .with(Protocol::P2p(dst_peer_id.clone().into())); - Swarm::listen_on(&mut relay_swarm, relay_addr.clone()).unwrap(); + relay_swarm.listen_on(relay_addr.clone()).unwrap(); spawn_swarm_on_pool(&pool, relay_swarm); - Swarm::listen_on(&mut dst_swarm, dst_addr_via_relay.clone()).unwrap(); + dst_swarm.listen_on(dst_addr_via_relay.clone()).unwrap(); // Wait for destination to listen via relay. pool.run_until(async { loop { @@ -795,8 +797,8 @@ fn concurrent_connection_same_relay_same_dst() { spawn_swarm_on_pool(&pool, dst_swarm); pool.run_until(async move { - Swarm::dial_addr(&mut src_swarm, dst_addr_via_relay.clone()).unwrap(); - Swarm::dial_addr(&mut src_swarm, dst_addr_via_relay).unwrap(); + src_swarm.dial_addr(dst_addr_via_relay.clone()).unwrap(); + src_swarm.dial_addr(dst_addr_via_relay).unwrap(); // Source Node establishing two connections to destination node via Relay. let mut num_established = 0; @@ -849,13 +851,13 @@ fn yield_incoming_connection_through_correct_listener() { let mut relay_2_swarm = build_swarm(Reachability::Routable, RelayMode::Passive); let mut relay_3_swarm = build_swarm(Reachability::Routable, RelayMode::Active); - let dst_peer_id = Swarm::local_peer_id(&dst_swarm).clone(); - let src_1_peer_id = Swarm::local_peer_id(&src_1_swarm).clone(); - let src_2_peer_id = Swarm::local_peer_id(&src_2_swarm).clone(); - let src_3_peer_id = Swarm::local_peer_id(&src_3_swarm).clone(); - let relay_1_peer_id = Swarm::local_peer_id(&relay_1_swarm).clone(); - let relay_2_peer_id = Swarm::local_peer_id(&relay_2_swarm).clone(); - let relay_3_peer_id = Swarm::local_peer_id(&relay_3_swarm).clone(); + let dst_peer_id = *dst_swarm.local_peer_id(); + let src_1_peer_id = *src_1_swarm.local_peer_id(); + let src_2_peer_id = *src_2_swarm.local_peer_id(); + let src_3_peer_id = *src_3_swarm.local_peer_id(); + let relay_1_peer_id = *relay_1_swarm.local_peer_id(); + let relay_2_peer_id = *relay_2_swarm.local_peer_id(); + let relay_3_peer_id = *relay_3_swarm.local_peer_id(); let dst_memory_port = Protocol::Memory(rand::random::()); let dst_addr = Multiaddr::empty().with(dst_memory_port.clone()); @@ -886,19 +888,19 @@ fn yield_incoming_connection_through_correct_listener() { .with(dst_memory_port) .with(Protocol::P2p(dst_peer_id.into())); - Swarm::listen_on(&mut relay_1_swarm, relay_1_addr.clone()).unwrap(); + relay_1_swarm.listen_on(relay_1_addr.clone()).unwrap(); spawn_swarm_on_pool(&pool, relay_1_swarm); - Swarm::listen_on(&mut relay_2_swarm, relay_2_addr.clone()).unwrap(); + relay_2_swarm.listen_on(relay_2_addr.clone()).unwrap(); spawn_swarm_on_pool(&pool, relay_2_swarm); - Swarm::listen_on(&mut relay_3_swarm, relay_3_addr.clone()).unwrap(); + relay_3_swarm.listen_on(relay_3_addr.clone()).unwrap(); spawn_swarm_on_pool(&pool, relay_3_swarm); - Swarm::listen_on(&mut dst_swarm, relay_1_addr_incl_circuit.clone()).unwrap(); - Swarm::listen_on(&mut dst_swarm, relay_2_addr_incl_circuit.clone()).unwrap(); + dst_swarm.listen_on(relay_1_addr_incl_circuit.clone()).unwrap(); + dst_swarm.listen_on(relay_2_addr_incl_circuit.clone()).unwrap(); // Listen on own address in order for relay 3 to be able to connect to destination node. - Swarm::listen_on(&mut dst_swarm, dst_addr.clone()).unwrap(); + dst_swarm.listen_on(dst_addr.clone()).unwrap(); // Wait for destination node to establish connections to relay 1 and 2. pool.run_until(async { @@ -925,8 +927,8 @@ fn yield_incoming_connection_through_correct_listener() { } }); - Swarm::dial_addr(&mut src_1_swarm, dst_addr_via_relay_1.clone()).unwrap(); - Swarm::dial_addr(&mut src_2_swarm, dst_addr_via_relay_2.clone()).unwrap(); + src_1_swarm.dial_addr(dst_addr_via_relay_1.clone()).unwrap(); + src_2_swarm.dial_addr(dst_addr_via_relay_2.clone()).unwrap(); spawn_swarm_on_pool(&pool, src_1_swarm); spawn_swarm_on_pool(&pool, src_2_swarm); @@ -985,7 +987,7 @@ fn yield_incoming_connection_through_correct_listener() { // Expect destination node to reject incoming connection from unknown relay given that // destination node is not listening for such connections. - Swarm::dial_addr(&mut src_3_swarm, dst_addr_via_relay_3.clone()).unwrap(); + src_3_swarm.dial_addr(dst_addr_via_relay_3.clone()).unwrap(); pool.run_until(poll_fn(|cx| { match dst_swarm.next_event().boxed().poll_unpin(cx) { Poll::Ready(SwarmEvent::Behaviour(CombinedEvent::Ping(_))) => {} @@ -1030,7 +1032,7 @@ fn yield_incoming_connection_through_correct_listener() { })); // Instruct destination node to listen for incoming relayed connections from unknown relay nodes. - Swarm::listen_on(&mut dst_swarm, Protocol::P2pCircuit.into()).unwrap(); + dst_swarm.listen_on(Protocol::P2pCircuit.into()).unwrap(); // Wait for destination node to report new listen address. pool.run_until(async { loop { @@ -1048,7 +1050,7 @@ fn yield_incoming_connection_through_correct_listener() { // Expect destination node to accept incoming connection from "unknown" relay, i.e. the // connection from source node 3 via relay 3. - Swarm::dial_addr(&mut src_3_swarm, dst_addr_via_relay_3.clone()).unwrap(); + src_3_swarm.dial_addr(dst_addr_via_relay_3.clone()).unwrap(); pool.run_until(async move { loop { match src_3_swarm.next_event().await { diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 7c07d6a159c..734a6729f9e 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -50,7 +50,7 @@ fn is_response_outbound() { let ping_proto1 = RequestResponse::new(PingCodec(), protocols, cfg); let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id); - let request_id1 = swarm1.send_request(&offline_peer, ping.clone()); + let request_id1 = swarm1.behaviour_mut().send_request(&offline_peer, ping.clone()); match futures::executor::block_on(swarm1.next()) { RequestResponseEvent::OutboundFailure{peer, request_id: req_id, error: _error} => { @@ -60,10 +60,10 @@ fn is_response_outbound() { e => panic!("Peer: Unexpected event: {:?}", e), } - let request_id2 = swarm1.send_request(&offline_peer, ping); + let request_id2 = swarm1.behaviour_mut().send_request(&offline_peer, ping); - assert!(!swarm1.is_pending_outbound(&offline_peer, &request_id1)); - assert!(swarm1.is_pending_outbound(&offline_peer, &request_id2)); + assert!(!swarm1.behaviour().is_pending_outbound(&offline_peer, &request_id1)); + assert!(swarm1.behaviour().is_pending_outbound(&offline_peer, &request_id2)); } /// Exercises a simple ping protocol. @@ -86,7 +86,7 @@ fn ping_protocol() { let (mut tx, mut rx) = mpsc::channel::(1); let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); - Swarm::listen_on(&mut swarm1, addr).unwrap(); + swarm1.listen_on(addr).unwrap(); let expected_ping = ping.clone(); let expected_pong = pong.clone(); @@ -101,7 +101,7 @@ fn ping_protocol() { }) => { assert_eq!(&request, &expected_ping); assert_eq!(&peer, &peer2_id); - swarm1.send_response(channel, pong.clone()).unwrap(); + swarm1.behaviour_mut().send_response(channel, pong.clone()).unwrap(); }, SwarmEvent::Behaviour(RequestResponseEvent::ResponseSent { peer, .. @@ -119,9 +119,9 @@ fn ping_protocol() { let peer2 = async move { let mut count = 0; let addr = rx.next().await.unwrap(); - swarm2.add_address(&peer1_id, addr.clone()); - let mut req_id = swarm2.send_request(&peer1_id, ping.clone()); - assert!(swarm2.is_pending_outbound(&peer1_id, &req_id)); + swarm2.behaviour_mut().add_address(&peer1_id, addr.clone()); + let mut req_id = swarm2.behaviour_mut().send_request(&peer1_id, ping.clone()); + assert!(swarm2.behaviour().is_pending_outbound(&peer1_id, &req_id)); loop { match swarm2.next().await { @@ -136,7 +136,7 @@ fn ping_protocol() { if count >= num_pings { return } else { - req_id = swarm2.send_request(&peer1_id, ping.clone()); + req_id = swarm2.behaviour_mut().send_request(&peer1_id, ping.clone()); } }, e => panic!("Peer2: Unexpected event: {:?}", e) @@ -164,14 +164,14 @@ fn emits_inbound_connection_closed_failure() { let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id); let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); - Swarm::listen_on(&mut swarm1, addr).unwrap(); + swarm1.listen_on(addr).unwrap(); futures::executor::block_on(async move { while swarm1.next().now_or_never().is_some() {} let addr1 = Swarm::listeners(&swarm1).next().unwrap(); - swarm2.add_address(&peer1_id, addr1.clone()); - swarm2.send_request(&peer1_id, ping.clone()); + swarm2.behaviour_mut().add_address(&peer1_id, addr1.clone()); + swarm2.behaviour_mut().send_request(&peer1_id, ping.clone()); // Wait for swarm 1 to receive request by swarm 2. let _channel = loop { @@ -222,14 +222,14 @@ fn emits_inbound_connection_closed_if_channel_is_dropped() { let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id); let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); - Swarm::listen_on(&mut swarm1, addr).unwrap(); + swarm1.listen_on(addr).unwrap(); futures::executor::block_on(async move { while swarm1.next().now_or_never().is_some() {} let addr1 = Swarm::listeners(&swarm1).next().unwrap(); - swarm2.add_address(&peer1_id, addr1.clone()); - swarm2.send_request(&peer1_id, ping.clone()); + swarm2.behaviour_mut().add_address(&peer1_id, addr1.clone()); + swarm2.behaviour_mut().send_request(&peer1_id, ping.clone()); // Wait for swarm 1 to receive request by swarm 2. let event = loop { @@ -278,15 +278,15 @@ fn ping_protocol_throttled() { let (mut tx, mut rx) = mpsc::channel::(1); let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); - Swarm::listen_on(&mut swarm1, addr).unwrap(); + swarm1.listen_on(addr).unwrap(); let expected_ping = ping.clone(); let expected_pong = pong.clone(); let limit1: u16 = rand::thread_rng().gen_range(1, 10); let limit2: u16 = rand::thread_rng().gen_range(1, 10); - swarm1.set_receive_limit(NonZeroU16::new(limit1).unwrap()); - swarm2.set_receive_limit(NonZeroU16::new(limit2).unwrap()); + swarm1.behaviour_mut().set_receive_limit(NonZeroU16::new(limit1).unwrap()); + swarm2.behaviour_mut().set_receive_limit(NonZeroU16::new(limit2).unwrap()); let peer1 = async move { for i in 1 .. { @@ -298,7 +298,7 @@ fn ping_protocol_throttled() { })) => { assert_eq!(&request, &expected_ping); assert_eq!(&peer, &peer2_id); - swarm1.send_response(channel, pong.clone()).unwrap(); + swarm1.behaviour_mut().send_response(channel, pong.clone()).unwrap(); }, SwarmEvent::Behaviour(throttled::Event::Event(RequestResponseEvent::ResponseSent { peer, .. @@ -310,7 +310,7 @@ fn ping_protocol_throttled() { } if i % 31 == 0 { let lim = rand::thread_rng().gen_range(1, 17); - swarm1.override_receive_limit(&peer2_id, NonZeroU16::new(lim).unwrap()); + swarm1.behaviour_mut().override_receive_limit(&peer2_id, NonZeroU16::new(lim).unwrap()); } } }; @@ -320,14 +320,14 @@ fn ping_protocol_throttled() { let peer2 = async move { let mut count = 0; let addr = rx.next().await.unwrap(); - swarm2.add_address(&peer1_id, addr.clone()); + swarm2.behaviour_mut().add_address(&peer1_id, addr.clone()); let mut blocked = false; let mut req_ids = HashSet::new(); loop { if !blocked { - while let Some(id) = swarm2.send_request(&peer1_id, ping.clone()).ok() { + while let Some(id) = swarm2.behaviour_mut().send_request(&peer1_id, ping.clone()).ok() { req_ids.insert(id); } blocked = true; diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index f6efa8a6ce9..60d7b2c6134 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -6,6 +6,18 @@ There is a `Swarm`-scoped configuration for this version available since [1858](https://github.com/libp2p/rust-libp2p/pull/1858). +- Remove `Deref` and `DerefMut` implementations previously dereferencing to the + `NetworkBehaviour` on `Swarm`. Instead one can access the `NetworkBehaviour` + via `Swarm::behaviour` and `Swarm::behaviour_mut`. Methods on `Swarm` can now + be accessed directly, e.g. via `my_swarm.local_peer_id()`. You may use the + command below to transform fully qualified method calls on `Swarm` to simple + method calls. + + ``` bash + # Go from e.g. `Swarm::local_peer_id(&my_swarm)` to `my_swarm.local_peer_id()`. + grep -RiIl --include \*.rs --exclude-dir target . --exclude-dir .git | xargs sed -i "s/\(libp2p::\)*Swarm::\([a-z_]*\)(&mut \([a-z_0-9]*\), /\3.\2(/g" + ``` + # 0.27.2 [2021-02-04] - Have `ToggleProtoHandler` ignore listen upgrade errors when disabled. diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index c61f12e1de3..66c067baec7 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -124,7 +124,7 @@ use libp2p_core::{ }; use registry::{Addresses, AddressIntoIter}; use smallvec::SmallVec; -use std::{error, fmt, io, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}}; +use std::{error, fmt, io, pin::Pin, task::{Context, Poll}}; use std::collections::HashSet; use std::num::{NonZeroU32, NonZeroUsize}; use upgrade::UpgradeInfoSend as _; @@ -292,28 +292,6 @@ where substream_upgrade_protocol_override: Option, } -impl Deref for - ExpandedSwarm -where - THandler: IntoProtocolsHandler, -{ - type Target = TBehaviour; - - fn deref(&self) -> &Self::Target { - &self.behaviour - } -} - -impl DerefMut for - ExpandedSwarm -where - THandler: IntoProtocolsHandler, -{ - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.behaviour - } -} - impl Unpin for ExpandedSwarm where @@ -340,50 +318,50 @@ where TBehaviour: NetworkBehaviour, } /// Returns information about the [`Network`] underlying the `Swarm`. - pub fn network_info(me: &Self) -> NetworkInfo { - me.network.info() + pub fn network_info(&self) -> NetworkInfo { + self.network.info() } /// Starts listening on the given address. /// /// Returns an error if the address is not supported. - pub fn listen_on(me: &mut Self, addr: Multiaddr) -> Result> { - me.network.listen_on(addr) + pub fn listen_on(&mut self, addr: Multiaddr) -> Result> { + self.network.listen_on(addr) } /// Remove some listener. /// /// Returns `Ok(())` if there was a listener with this ID. - pub fn remove_listener(me: &mut Self, id: ListenerId) -> Result<(), ()> { - me.network.remove_listener(id) + pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> { + self.network.remove_listener(id) } /// Initiates a new dialing attempt to the given address. - pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> { - let handler = me.behaviour.new_handler() + pub fn dial_addr(&mut self, addr: Multiaddr) -> Result<(), ConnectionLimit> { + let handler = self.behaviour.new_handler() .into_node_handler_builder() - .with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override); - me.network.dial(&addr, handler).map(|_id| ()) + .with_substream_upgrade_protocol_override(self.substream_upgrade_protocol_override); + self.network.dial(&addr, handler).map(|_id| ()) } /// Initiates a new dialing attempt to the given peer. - pub fn dial(me: &mut Self, peer_id: &PeerId) -> Result<(), DialError> { - if me.banned_peers.contains(peer_id) { - me.behaviour.inject_dial_failure(peer_id); + pub fn dial(&mut self, peer_id: &PeerId) -> Result<(), DialError> { + if self.banned_peers.contains(peer_id) { + self.behaviour.inject_dial_failure(peer_id); return Err(DialError::Banned) } - let self_listening = &me.listened_addrs; - let mut addrs = me.behaviour.addresses_of_peer(peer_id) + let self_listening = &self.listened_addrs; + let mut addrs = self.behaviour.addresses_of_peer(peer_id) .into_iter() .filter(|a| !self_listening.contains(a)); let result = if let Some(first) = addrs.next() { - let handler = me.behaviour.new_handler() + let handler = self.behaviour.new_handler() .into_node_handler_builder() - .with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override); - me.network.peer(*peer_id) + .with_substream_upgrade_protocol_override(self.substream_upgrade_protocol_override); + self.network.peer(*peer_id) .dial(first, addrs, handler) .map(|_| ()) .map_err(DialError::ConnectionLimit) @@ -395,27 +373,27 @@ where TBehaviour: NetworkBehaviour, log::debug!( "New dialing attempt to peer {:?} failed: {:?}.", peer_id, error); - me.behaviour.inject_dial_failure(&peer_id); + self.behaviour.inject_dial_failure(&peer_id); } result } /// Returns an iterator that produces the list of addresses we're listening on. - pub fn listeners(me: &Self) -> impl Iterator { - me.network.listen_addrs() + pub fn listeners(&self) -> impl Iterator { + self.network.listen_addrs() } /// Returns the peer ID of the swarm passed as parameter. - pub fn local_peer_id(me: &Self) -> &PeerId { - me.network.local_peer_id() + pub fn local_peer_id(&self) -> &PeerId { + self.network.local_peer_id() } /// Returns an iterator for [`AddressRecord`]s of external addresses /// of the local node, in decreasing order of their current /// [score](AddressScore). - pub fn external_addresses(me: &Self) -> impl Iterator { - me.external_addrs.iter() + pub fn external_addresses(&self) -> impl Iterator { + self.external_addrs.iter() } /// Adds an external address record for the local node. @@ -432,8 +410,8 @@ where TBehaviour: NetworkBehaviour, /// how frequently it is reported by the `NetworkBehaviour` via /// [`NetworkBehaviourAction::ReportObservedAddr`] or explicitly /// through this method. - pub fn add_external_address(me: &mut Self, a: Multiaddr, s: AddressScore) -> AddAddressResult { - me.external_addrs.add(a, s) + pub fn add_external_address(&mut self, a: Multiaddr, s: AddressScore) -> AddAddressResult { + self.external_addrs.add(a, s) } /// Removes an external address of the local node, regardless of @@ -442,30 +420,40 @@ where TBehaviour: NetworkBehaviour, /// /// Returns `true` if the address existed and was removed, `false` /// otherwise. - pub fn remove_external_address(me: &mut Self, addr: &Multiaddr) -> bool { - me.external_addrs.remove(addr) + pub fn remove_external_address(&mut self, addr: &Multiaddr) -> bool { + self.external_addrs.remove(addr) } /// Bans a peer by its peer ID. /// /// Any incoming connection and any dialing attempt will immediately be rejected. /// This function has no effect if the peer is already banned. - pub fn ban_peer_id(me: &mut Self, peer_id: PeerId) { - if me.banned_peers.insert(peer_id) { - if let Some(peer) = me.network.peer(peer_id).into_connected() { + pub fn ban_peer_id(&mut self, peer_id: PeerId) { + if self.banned_peers.insert(peer_id) { + if let Some(peer) = self.network.peer(peer_id).into_connected() { peer.disconnect(); } } } /// Unbans a peer. - pub fn unban_peer_id(me: &mut Self, peer_id: PeerId) { - me.banned_peers.remove(&peer_id); + pub fn unban_peer_id(&mut self, peer_id: PeerId) { + self.banned_peers.remove(&peer_id); } /// Checks whether the [`Network`] has an established connection to a peer. - pub fn is_connected(me: &Self, peer_id: &PeerId) -> bool { - me.network.is_connected(peer_id) + pub fn is_connected(&self, peer_id: &PeerId) -> bool { + self.network.is_connected(peer_id) + } + + /// Returns a reference to the provided [`NetworkBehaviour`]. + pub fn behaviour(&self) -> &TBehaviour { + &self.behaviour + } + + /// Returns a mutable reference to the provided [`NetworkBehaviour`]. + pub fn behaviour_mut(&mut self) -> &mut TBehaviour { + &mut self.behaviour } /// Returns the next event that happens in the `Swarm`. @@ -1165,8 +1153,8 @@ mod tests { let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); - Swarm::listen_on(&mut swarm1, addr1.clone().into()).unwrap(); - Swarm::listen_on(&mut swarm2, addr2.clone().into()).unwrap(); + swarm1.listen_on(addr1.clone().into()).unwrap(); + swarm2.listen_on(addr2.clone().into()).unwrap(); // Test execution state. Connection => Disconnecting => Connecting. enum State { @@ -1174,7 +1162,7 @@ mod tests { Disconnecting, } - let swarm1_id = Swarm::local_peer_id(&swarm1).clone(); + let swarm1_id = *swarm1.local_peer_id(); let mut banned = false; let mut unbanned = false; @@ -1182,7 +1170,7 @@ mod tests { let num_connections = 10; for _ in 0 .. num_connections { - Swarm::dial_addr(&mut swarm1, addr2.clone()).unwrap(); + swarm1.dial_addr(addr2.clone()).unwrap(); } let mut state = State::Connecting; @@ -1207,7 +1195,7 @@ mod tests { if banned { return Poll::Ready(()) } - Swarm::ban_peer_id(&mut swarm2, swarm1_id.clone()); + swarm2.ban_peer_id(swarm1_id.clone()); swarm1.behaviour.reset(); swarm2.behaviour.reset(); banned = true; @@ -1231,12 +1219,12 @@ mod tests { return Poll::Ready(()) } // Unban the first peer and reconnect. - Swarm::unban_peer_id(&mut swarm2, swarm1_id.clone()); + swarm2.unban_peer_id(swarm1_id.clone()); swarm1.behaviour.reset(); swarm2.behaviour.reset(); unbanned = true; for _ in 0 .. num_connections { - Swarm::dial_addr(&mut swarm2, addr1.clone()).unwrap(); + swarm2.dial_addr(addr1.clone()).unwrap(); } state = State::Connecting; } From f1fb653bf931b6d9d43446e64031912be75d46e2 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sat, 13 Mar 2021 17:57:39 +0100 Subject: [PATCH 2/2] examples/chat-tokio: Access behaviour via behaviour_mut() --- examples/chat-tokio.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs index ebdfaf62175..b4c284ec21f 100644 --- a/examples/chat-tokio.rs +++ b/examples/chat-tokio.rs @@ -166,7 +166,7 @@ async fn main() -> Result<(), Box> { } }; if let Some((topic, line)) = to_publish { - swarm.floodsub.publish(topic, line.as_bytes()); + swarm.behaviour_mut().floodsub.publish(topic, line.as_bytes()); } if !listening { for addr in Swarm::listeners(&swarm) {