This repository has been archived by the owner on Dec 8, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 43
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
rewrite async module to handle multiple responses
hopefully it's clearer now too, still to integrate retries somehow
- Loading branch information
Showing
5 changed files
with
179 additions
and
70 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,89 +1,185 @@ | ||
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; | ||
use std::str; | ||
use std::time::Duration; | ||
use std::collections::HashMap; | ||
|
||
use futures::prelude::*; | ||
use futures::future::Either; | ||
use futures::{Future, Stream}; | ||
|
||
use futures::future; | ||
use futures::{Future, IntoFuture, Stream}; | ||
use hyper::Client; | ||
|
||
use tokio::prelude::FutureExt; | ||
use tokio::net::UdpSocket; | ||
|
||
use bytes::Bytes; | ||
|
||
use async::Gateway; | ||
use common::{messages, parsing}; | ||
use errors::SearchError; | ||
|
||
/// Search gateway, bind to all interfaces and use a timeout of 3 seconds. | ||
/// | ||
/// Bind to all interfaces. | ||
/// The request will timeout after 3 seconds. | ||
pub fn search_gateway() -> Box<Future<Item = Gateway, Error = SearchError> + Send> { | ||
search_gateway_timeout(Duration::from_secs(3)) | ||
const MAX_RESPONSE_SIZE: usize = 1500; | ||
|
||
/// Gateway search configuration | ||
/// SearchOptions::default() should suffice for most situations | ||
pub struct SearchOptions { | ||
/// Bind address for UDP socket (defaults to all interfaces) | ||
pub bind_addr: SocketAddr, | ||
/// Broadcast address for discovery packets | ||
pub broadcast_address: SocketAddr, | ||
/// Timeout for a search iteration | ||
pub timeout: Option<Duration>, | ||
} | ||
|
||
impl Default for SearchOptions { | ||
fn default() -> Self { | ||
Self { | ||
bind_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)), | ||
broadcast_address: "239.255.255.250:1900".parse().unwrap(), | ||
timeout: Some(Duration::from_secs(3)), | ||
} | ||
} | ||
} | ||
|
||
/// Search gateway, bind to all interfaces and use the given duration for the timeout. | ||
/// | ||
/// Bind to all interfaces. | ||
/// The request will timeout after the given duration. | ||
pub fn search_gateway_timeout(timeout: Duration) -> Box<Future<Item = Gateway, Error = SearchError> + Send> { | ||
search_gateway_from_timeout(Ipv4Addr::new(0, 0, 0, 0), timeout) | ||
/// Search for a gateway with the provided options | ||
pub fn search_gateway(options: SearchOptions) -> impl Future<Item=Gateway, Error=SearchError> { | ||
|
||
// Create socket for future calls | ||
let socket = UdpSocket::bind(&options.bind_addr).unwrap(); | ||
|
||
// Create future and issue request | ||
match options.timeout { | ||
Some(t) => Either::A(SearchFuture::search(socket, options.broadcast_address) | ||
.and_then(|search| search ).timeout(t).map_err(|e| SearchError::from(e) )), | ||
_ => Either::B(SearchFuture::search(socket, options.broadcast_address).and_then(|search| search )), | ||
} | ||
} | ||
|
||
/// Search gateway, bind to the given interface and use a time of 3 seconds. | ||
/// | ||
/// Bind to the given interface. | ||
/// The request will timeout after 3 seconds. | ||
pub fn search_gateway_from(ip: Ipv4Addr) -> Box<Future<Item = Gateway, Error = SearchError> + Send> { | ||
search_gateway_from_timeout(ip, Duration::from_secs(3)) | ||
pub struct SearchFuture { | ||
socket: UdpSocket, | ||
pending: HashMap<SocketAddr, SearchState>, | ||
} | ||
|
||
/// Search gateway, bind to the given interface and use the given duration for the timeout. | ||
/// | ||
/// Bind to the given interface. | ||
/// The request will timeout after the given duration. | ||
pub fn search_gateway_from_timeout( | ||
ip: Ipv4Addr, | ||
timeout: Duration, | ||
) -> Box<Future<Item = Gateway, Error = SearchError> + Send> { | ||
let addr = SocketAddr::V4(SocketAddrV4::new(ip, 0)); | ||
let task = UdpSocket::bind(&addr) | ||
.into_future() | ||
.and_then(|socket| socket.send_dgram(messages::SEARCH_REQUEST.as_bytes(), &"239.255.255.250:1900".parse().unwrap())) | ||
.and_then(|(socket, _)| socket.recv_dgram(vec![0u8; 1500])) | ||
.map_err(|err| SearchError::from(err)) | ||
.and_then(|(_sock, buf, n, _addr)| { | ||
|
||
str::from_utf8(&buf[..n]) | ||
.map_err(|err| SearchError::from(err)) | ||
.and_then(|text| { | ||
println!("Recv: {:?}", text); | ||
parsing::parse_search_result(text) | ||
}) | ||
}) | ||
.and_then(move |location| { | ||
get_control_url(&location) | ||
.and_then(move |control_url| Ok(Gateway::new(location.0, control_url))) | ||
}); | ||
let timeout = task.timeout(timeout) | ||
.map_err(|e| e.into() ); | ||
|
||
Box::new(timeout) | ||
enum SearchState { | ||
Connecting(Box<Future<Item=Bytes, Error=SearchError> + Send>), | ||
Done(String), | ||
Error, | ||
} | ||
|
||
fn get_control_url( | ||
location: &(SocketAddrV4, String), | ||
) -> Box<Future<Item = String, Error = SearchError> + Send> { | ||
let client = Client::new(); | ||
let uri = match format!("http://{}{}", location.0, location.1).parse() { | ||
Ok(uri) => uri, | ||
Err(err) => return Box::new(future::err(SearchError::from(err))), | ||
}; | ||
let future = client | ||
.get(uri) | ||
.and_then(|resp| resp.into_body().concat2()) | ||
.then(|result| match result { | ||
Ok(body) => parsing::parse_control_url(body.as_ref()), | ||
Err(err) => Err(SearchError::from(err)), | ||
}); | ||
Box::new(future) | ||
impl SearchFuture { | ||
// Create a new search | ||
fn search(socket: UdpSocket, addr: SocketAddr) -> impl Future<Item=SearchFuture, Error=SearchError> { | ||
debug!("sending broadcast request to: {} on interface: {:?}", addr, socket.local_addr()); | ||
|
||
socket.send_dgram(messages::SEARCH_REQUEST.as_bytes(), &addr) | ||
.map(|(socket, _n)| SearchFuture{socket, pending: HashMap::new() }) | ||
.map_err(|e| SearchError::from(e) ) | ||
} | ||
|
||
// Handle a UDP response message | ||
fn handle_broadcast_resp(from: SocketAddr, data: &[u8]) -> Result<(SocketAddr, String), SearchError> { | ||
debug!("handling broadcast response from: {}, data: {:?}", from, data); | ||
|
||
// Convert response to text | ||
let text = str::from_utf8(&data) | ||
.map_err(|e| SearchError::from(e))?; | ||
|
||
// Parse socket address and path | ||
let (addr, path) = parsing::parse_search_result(text)?; | ||
|
||
Ok((SocketAddr::V4(addr), path)) | ||
} | ||
|
||
// Issue a control URL request over HTTP using the provided | ||
fn request_control_url(addr: SocketAddr, path: String) -> Result<Box<Future<Item=Bytes, Error=SearchError> + Send>, SearchError> { | ||
let client = Client::new(); | ||
|
||
let uri = match format!("http://{}{}", addr, path).parse() { | ||
Ok(uri) => uri, | ||
Err(err) => return Err(SearchError::from(err)), | ||
}; | ||
|
||
debug!("requesting control url from: {}", uri); | ||
|
||
Ok(Box::new(client.get(uri) | ||
.and_then(|resp| resp.into_body().concat2() ) | ||
.map(|chunk| chunk.into_bytes() ) | ||
.map_err(|e| SearchError::from(e) ) | ||
)) | ||
} | ||
|
||
// Process a control response to extract the control URL | ||
fn handle_control_resp(addr: SocketAddr, resp: Bytes) -> Result<String, SearchError> { | ||
debug!("handling control response from: {}, data: {:?}", addr, resp); | ||
|
||
// Create a cursor over the response data | ||
let c = std::io::Cursor::new(&resp); | ||
|
||
// Parse control URL out of body | ||
let url = parsing::parse_control_url(c)?; | ||
|
||
Ok(url) | ||
} | ||
} | ||
|
||
|
||
impl Future for SearchFuture { | ||
type Item=Gateway; | ||
type Error=SearchError; | ||
|
||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | ||
|
||
// Poll for (and handle) incoming messages | ||
let mut buff = [0u8; MAX_RESPONSE_SIZE]; | ||
if let Async::Ready((n, from)) = self.socket.poll_recv_from(&mut buff)? { | ||
// Try handle response messages | ||
if let Ok((addr, path)) = Self::handle_broadcast_resp(from, &buff[0..n]) { | ||
if !self.pending.contains_key(&addr) { | ||
debug!("received broadcast response from: {}", from); | ||
|
||
// Issue control request | ||
let req = Self::request_control_url(addr, path)?; | ||
// Store pending requests | ||
self.pending.insert(addr, SearchState::Connecting(req)); | ||
} else { | ||
debug!("received duplicate broadcast response from: {}, dropping", from); | ||
} | ||
} | ||
} | ||
|
||
// Poll on any outstanding control requests | ||
for (addr, state) in &mut self.pending { | ||
// Poll if we're in the connecting state | ||
let resp = { | ||
let c = match state { | ||
SearchState::Connecting(c) => c, | ||
_ => continue, | ||
}; | ||
|
||
match c.poll()? { | ||
Async::Ready(resp) => resp, | ||
_ => continue, | ||
} | ||
}; | ||
|
||
// Handle any responses | ||
if let Ok(url) = Self::handle_control_resp(*addr, resp) { | ||
debug!("received control url from: {} (url: {})", addr, url); | ||
*state = SearchState::Done(url.clone()); | ||
|
||
match addr { | ||
SocketAddr::V4(a) => { | ||
let g = Gateway::new(*a, url); | ||
return Ok(Async::Ready(g)); | ||
} | ||
_ => warn!("unsupported IPv6 gateway response from addr: {}", addr), | ||
} | ||
|
||
} else { | ||
*state = SearchState::Error; | ||
} | ||
} | ||
|
||
Ok(Async::NotReady) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters