Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
devp2p: Move UDP socket handling from Discovery to Host.
Browse files Browse the repository at this point in the history
  • Loading branch information
jimpo committed Jun 5, 2018
1 parent 8469bc8 commit acc5a80
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 96 deletions.
96 changes: 15 additions & 81 deletions util/network-devp2p/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,13 @@
use ethcore_bytes::Bytes;
use std::net::SocketAddr;
use std::collections::{HashSet, HashMap, VecDeque};
use std::mem;
use std::default::Default;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use mio::*;
use mio::deprecated::{Handler, EventLoop};
use mio::udp::*;
use hash::keccak;
use ethereum_types::{H256, H520};
use rlp::{Rlp, RlpStream, encode_list};
use node_table::*;
use network::{Error, ErrorKind};
use io::{StreamToken, IoContext};
use ethkey::{Secret, KeyPair, sign, recover};
use network::IpFilter;

Expand All @@ -39,7 +34,7 @@ const ADDRESS_BITS: usize = 8 * ADDRESS_BYTES_SIZE; // Denoted by n in [Kademl
const DISCOVERY_MAX_STEPS: u16 = 8; // Max iterations of discovery. (discover)
const BUCKET_SIZE: usize = 16; // Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
const ALPHA: usize = 3; // Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests.
const MAX_DATAGRAM_SIZE: usize = 1280;
pub const MAX_DATAGRAM_SIZE: usize = 1280;

const PACKET_PING: u8 = 1;
const PACKET_PONG: u8 = 2;
Expand Down Expand Up @@ -79,23 +74,21 @@ impl NodeBucket {
}
}

struct Datagramm {
payload: Bytes,
address: SocketAddr,
pub struct Datagramm {
pub payload: Bytes,
pub address: SocketAddr,
}

pub struct Discovery {
id: NodeId,
id_hash: H256,
secret: Secret,
public_endpoint: NodeEndpoint,
udp_socket: UdpSocket,
token: StreamToken,
discovery_round: u16,
discovery_id: NodeId,
discovery_nodes: HashSet<NodeId>,
node_buckets: Vec<NodeBucket>,
send_queue: VecDeque<Datagramm>,
pub send_queue: VecDeque<Datagramm>,
check_timestamps: bool,
adding_nodes: Vec<NodeEntry>,
ip_filter: IpFilter,
Expand All @@ -107,19 +100,16 @@ pub struct TableUpdates {
}

impl Discovery {
pub fn new(key: &KeyPair, listen: SocketAddr, public: NodeEndpoint, token: StreamToken, ip_filter: IpFilter) -> Discovery {
let socket = UdpSocket::bind(&listen).expect("Error binding UDP socket");
pub fn new(key: &KeyPair, public: NodeEndpoint, ip_filter: IpFilter) -> Discovery {
Discovery {
id: key.public().clone(),
id_hash: keccak(key.public()),
secret: key.secret().clone(),
public_endpoint: public,
token: token,
discovery_round: 0,
discovery_id: NodeId::new(),
discovery_nodes: HashSet::new(),
node_buckets: (0..ADDRESS_BITS).map(|_| NodeBucket::new()).collect(),
udp_socket: socket,
send_queue: VecDeque::new(),
check_timestamps: true,
adding_nodes: Vec::new(),
Expand Down Expand Up @@ -352,53 +342,12 @@ impl Discovery {
ret
}

pub fn writable<Message>(&mut self, io: &IoContext<Message>) where Message: Send + Sync + Clone {
while let Some(data) = self.send_queue.pop_front() {
match self.udp_socket.send_to(&data.payload, &data.address) {
Ok(Some(size)) if size == data.payload.len() => {
},
Ok(Some(_)) => {
warn!("UDP sent incomplete datagramm");
},
Ok(None) => {
self.send_queue.push_front(data);
return;
}
Err(e) => {
debug!("UDP send error: {:?}, address: {:?}", e, &data.address);
return;
}
}
}
io.update_registration(self.token).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
}

fn send_to(&mut self, payload: Bytes, address: SocketAddr) {
self.send_queue.push_back(Datagramm { payload: payload, address: address });
}

pub fn readable<Message>(&mut self, io: &IoContext<Message>) -> Option<TableUpdates> where Message: Send + Sync + Clone {
let mut buf: [u8; MAX_DATAGRAM_SIZE] = unsafe { mem::uninitialized() };
let writable = !self.send_queue.is_empty();
let res = match self.udp_socket.recv_from(&mut buf) {
Ok(Some((len, address))) => self.on_packet(&buf[0..len], address).unwrap_or_else(|e| {
debug!("Error processing UDP packet: {:?}", e);
None
}),
Ok(_) => None,
Err(e) => {
debug!("Error reading UPD socket: {:?}", e);
None
}
};
let new_writable = !self.send_queue.is_empty();
if writable != new_writable {
io.update_registration(self.token).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
}
res
}

fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result<Option<TableUpdates>, Error> {
pub fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result<Option<TableUpdates>, Error> {
// validate packet
if packet.len() < 32 + 65 + 4 + 1 {
return Err(ErrorKind::BadProtocol.into());
Expand Down Expand Up @@ -570,21 +519,6 @@ impl Discovery {
pub fn refresh(&mut self) {
self.start();
}

pub fn register_socket<Host:Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), Error> {
event_loop.register(&self.udp_socket, reg, Ready::all(), PollOpt::edge()).expect("Error registering UDP socket");
Ok(())
}

pub fn update_registration<Host:Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), Error> {
let registration = if !self.send_queue.is_empty() {
Ready::readable() | Ready::writable()
} else {
Ready::readable()
};
event_loop.reregister(&self.udp_socket, reg, registration, PollOpt::edge()).expect("Error reregistering UDP socket");
Ok(())
}
}

#[cfg(test)]
Expand Down Expand Up @@ -620,8 +554,8 @@ mod tests {
let key2 = Random.generate().unwrap();
let ep1 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40444").unwrap(), udp_port: 40444 };
let ep2 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40445").unwrap(), udp_port: 40445 };
let mut discovery1 = Discovery::new(&key1, ep1.address.clone(), ep1.clone(), 0, IpFilter::default());
let mut discovery2 = Discovery::new(&key2, ep2.address.clone(), ep2.clone(), 0, IpFilter::default());
let mut discovery1 = Discovery::new(&key1, ep1.clone(), IpFilter::default());
let mut discovery2 = Discovery::new(&key2, ep2.clone(), IpFilter::default());

let node1 = Node::from_str("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7770").unwrap();
let node2 = Node::from_str("enode://b979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7771").unwrap();
Expand Down Expand Up @@ -653,7 +587,7 @@ mod tests {
fn removes_expired() {
let key = Random.generate().unwrap();
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40446").unwrap(), udp_port: 40447 };
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default());
let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default());
for _ in 0..1200 {
discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() });
}
Expand All @@ -668,7 +602,7 @@ mod tests {

let key = Random.generate().unwrap();
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40447").unwrap(), udp_port: 40447 };
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default());
let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default());

for _ in 0..(16 + 10) {
discovery.node_buckets[0].nodes.push_back(BucketEntry {
Expand Down Expand Up @@ -728,7 +662,7 @@ mod tests {
let key = Secret::from_str(secret_hex)
.and_then(|secret| KeyPair::from_secret(secret))
.unwrap();
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default());
let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default());

node_entries.iter().for_each(|entry| discovery.update_node(entry.clone()));

Expand Down Expand Up @@ -773,7 +707,7 @@ mod tests {
fn packets() {
let key = Random.generate().unwrap();
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40449").unwrap(), udp_port: 40449 };
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default());
let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default());
discovery.check_timestamps = false;
let from = SocketAddr::from_str("99.99.99.99:40445").unwrap();

Expand Down Expand Up @@ -840,8 +774,8 @@ mod tests {
let key2 = Random.generate().unwrap();
let ep1 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40344").unwrap(), udp_port: 40344 };
let ep2 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40345").unwrap(), udp_port: 40345 };
let mut discovery1 = Discovery::new(&key1, ep1.address.clone(), ep1.clone(), 0, IpFilter::default());
let mut discovery2 = Discovery::new(&key2, ep2.address.clone(), ep2.clone(), 0, IpFilter::default());
let mut discovery1 = Discovery::new(&key1, ep1.clone(), IpFilter::default());
let mut discovery2 = Discovery::new(&key2, ep2.clone(), IpFilter::default());

discovery1.ping(&ep2);
let ping_data = discovery1.send_queue.pop_front().unwrap();
Expand Down
104 changes: 89 additions & 15 deletions util/network-devp2p/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ use std::cmp::{min, max};
use std::path::{Path, PathBuf};
use std::io::{Read, Write, self};
use std::fs;
use std::mem;
use std::time::Duration;
use ethkey::{KeyPair, Secret, Random, Generator};
use hash::keccak;
use mio::*;
use mio::deprecated::{EventLoop};
use mio::tcp::*;
use mio::udp::*;
use ethereum_types::H256;
use rlp::{RlpStream, Encodable};

Expand All @@ -40,7 +42,7 @@ use node_table::*;
use network::{NetworkConfiguration, NetworkIoMessage, ProtocolId, PeerId, PacketId};
use network::{NonReservedPeerMode, NetworkContext as NetworkContextTrait};
use network::{SessionInfo, Error, ErrorKind, DisconnectReason, NetworkProtocolHandler};
use discovery::{Discovery, TableUpdates, NodeEntry};
use discovery::{Discovery, TableUpdates, NodeEntry, MAX_DATAGRAM_SIZE};
use ip_utils::{map_external_address, select_public_address};
use path::restrict_permissions_owner;
use parking_lot::{Mutex, RwLock};
Expand Down Expand Up @@ -239,6 +241,7 @@ struct ProtocolTimer {
/// Root IO handler. Manages protocol handlers, IO timers and network connections.
pub struct Host {
pub info: RwLock<HostInfo>,
udp_socket: Mutex<Option<UdpSocket>>,
tcp_listener: Mutex<TcpListener>,
sessions: Arc<RwLock<Slab<SharedSession>>>,
discovery: Mutex<Option<Discovery>>,
Expand Down Expand Up @@ -295,6 +298,7 @@ impl Host {
local_endpoint: local_endpoint,
}),
discovery: Mutex::new(None),
udp_socket: Mutex::new(None),
tcp_listener: Mutex::new(tcp_listener),
sessions: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_SESSION, MAX_SESSIONS))),
nodes: RwLock::new(NodeTable::new(path)),
Expand Down Expand Up @@ -458,13 +462,16 @@ impl Host {
let discovery = {
let info = self.info.read();
if info.config.discovery_enabled && info.config.non_reserved_mode == NonReservedPeerMode::Accept {
let mut udp_addr = local_endpoint.address.clone();
udp_addr.set_port(local_endpoint.udp_port);
Some(Discovery::new(&info.keys, udp_addr, public_endpoint, DISCOVERY, allow_ips))
Some(Discovery::new(&info.keys, public_endpoint, allow_ips))
} else { None }
};

if let Some(mut discovery) = discovery {
let mut udp_addr = local_endpoint.address.clone();
udp_addr.set_port(local_endpoint.udp_port);
let socket = UdpSocket::bind(&udp_addr).expect("Error binding UDP socket");
*self.udp_socket.lock() = Some(socket);

discovery.init_node_list(self.nodes.read().entries());
discovery.add_node_list(self.nodes.read().entries());
*self.discovery.lock() = Some(discovery);
Expand Down Expand Up @@ -819,6 +826,63 @@ impl Host {
}
}

fn discovery_readable(&self, io: &IoContext<NetworkIoMessage>) {
let node_changes = match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_mut()) {
(Some(udp_socket), Some(discovery)) => {
let mut buf: [u8; MAX_DATAGRAM_SIZE] = unsafe { mem::uninitialized() };
let writable = !discovery.send_queue.is_empty();
let res = match udp_socket.recv_from(&mut buf) {
Ok(Some((len, address))) => discovery.on_packet(&buf[0..len], address).unwrap_or_else(|e| {
debug!("Error processing UDP packet: {:?}", e);
None
}),
Ok(_) => None,
Err(e) => {
debug!("Error reading UPD socket: {:?}", e);
None
}
};
let new_writable = !discovery.send_queue.is_empty();
if writable != new_writable {
io.update_registration(DISCOVERY)
.unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
}
res
},
_ => None,
};
if let Some(node_changes) = node_changes {
self.update_nodes(io, node_changes);
}
}

fn discovery_writable(&self, io: &IoContext<NetworkIoMessage>) {
match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_mut()) {
(Some(udp_socket), Some(discovery)) => {
while let Some(data) = discovery.send_queue.pop_front() {
match udp_socket.send_to(&data.payload, &data.address) {
Ok(Some(size)) if size == data.payload.len() => {
},
Ok(Some(_)) => {
warn!("UDP sent incomplete datagramm");
},
Ok(None) => {
discovery.send_queue.push_front(data);
return;
}
Err(e) => {
debug!("UDP send error: {:?}, address: {:?}", e, &data.address);
return;
}
}
}
io.update_registration(DISCOVERY)
.unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
},
_ => (),
}
}

fn connection_timeout(&self, token: StreamToken, io: &IoContext<NetworkIoMessage>) {
trace!(target: "network", "Connection timeout: {}", token);
self.kill_connection(token, io, true)
Expand Down Expand Up @@ -920,12 +984,7 @@ impl IoHandler<NetworkIoMessage> for Host {
}
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
DISCOVERY => {
let node_changes = { self.discovery.lock().as_mut().map_or(None, |d| d.readable(io)) };
if let Some(node_changes) = node_changes {
self.update_nodes(io, node_changes);
}
},
DISCOVERY => self.discovery_readable(io),
TCP_ACCEPT => self.accept(io),
_ => panic!("Received unknown readable token"),
}
Expand All @@ -937,9 +996,7 @@ impl IoHandler<NetworkIoMessage> for Host {
}
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
DISCOVERY => {
self.discovery.lock().as_mut().map(|d| d.writable(io));
}
DISCOVERY => self.discovery_writable(io),
_ => panic!("Received unknown writable token"),
}
}
Expand Down Expand Up @@ -1055,7 +1112,13 @@ impl IoHandler<NetworkIoMessage> for Host {
session.lock().register_socket(reg, event_loop).expect("Error registering socket");
}
}
DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.register_socket(reg, event_loop).ok()).expect("Error registering discovery socket"),
DISCOVERY => match self.udp_socket.lock().as_ref() {
Some(udp_socket) => {
event_loop.register(udp_socket, reg, Ready::all(), PollOpt::edge())
.expect("Error registering UDP socket");
},
_ => panic!("Error registering discovery socket"),
}
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error registering stream"),
_ => warn!("Unexpected stream registration")
}
Expand Down Expand Up @@ -1086,7 +1149,18 @@ impl IoHandler<NetworkIoMessage> for Host {
connection.lock().update_socket(reg, event_loop).expect("Error updating socket");
}
}
DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.update_registration(reg, event_loop).ok()).expect("Error reregistering discovery socket"),
DISCOVERY => match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_ref()) {
(Some(udp_socket), Some(discovery)) => {
let registration = if !discovery.send_queue.is_empty() {
Ready::readable() | Ready::writable()
} else {
Ready::readable()
};
event_loop.reregister(udp_socket, reg, registration, PollOpt::edge())
.expect("Error reregistering UDP socket");
},
_ => panic!("Error reregistering discovery socket"),
}
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error reregistering stream"),
_ => warn!("Unexpected stream update")
}
Expand Down

0 comments on commit acc5a80

Please sign in to comment.