diff --git a/ethcore/src/miner/stratum.rs b/ethcore/src/miner/stratum.rs index aad646753a3..f2623632fb8 100644 --- a/ethcore/src/miner/stratum.rs +++ b/ethcore/src/miner/stratum.rs @@ -225,8 +225,6 @@ impl NotifyWork for Stratum { self.service.push_work_all( self.dispatcher.payload(pow_hash, difficulty, number) - ).unwrap_or_else( - |e| warn!(target: "stratum", "Error while pushing work: {:?}", e) ); } } @@ -239,16 +237,13 @@ impl Stratum { let dispatcher = Arc::new(StratumJobDispatcher::new(miner, client)); - let stratum_svc = StratumService::start( + let service = StratumService::start( &SocketAddr::new(options.listen_addr.parse::()?, options.port), dispatcher.clone(), options.secret.clone(), )?; - Ok(Stratum { - dispatcher: dispatcher, - service: stratum_svc, - }) + Ok(Stratum { dispatcher, service }) } /// Start STRATUM job dispatcher and register it in the miner diff --git a/miner/stratum/src/lib.rs b/miner/stratum/src/lib.rs index 5ee0296dacf..a343486ccd4 100644 --- a/miner/stratum/src/lib.rs +++ b/miner/stratum/src/lib.rs @@ -76,7 +76,7 @@ impl Stratum { let implementation = Arc::new(StratumImpl { subscribers: RwLock::default(), - job_que: RwLock::default(), + job_queue: RwLock::default(), dispatcher, workers: Arc::new(RwLock::default()), secret, @@ -106,13 +106,9 @@ impl Stratum { } impl PushWorkHandler for Stratum { - fn push_work_all(&self, payload: String) -> Result<(), Error> { + fn push_work_all(&self, payload: String) { self.implementation.push_work_all(payload, &self.tcp_dispatcher) } - - fn push_work(&self, payloads: Vec) -> Result<(), Error> { - self.implementation.push_work(payloads, &self.tcp_dispatcher) - } } impl Drop for Stratum { @@ -126,14 +122,14 @@ struct StratumImpl { /// Subscribed clients subscribers: RwLock>, /// List of workers supposed to receive job update - job_que: RwLock>, + job_queue: RwLock>, /// Payload manager dispatcher: Arc, /// Authorized workers (socket - worker_id) workers: Arc>>, /// Secret if any secret: Option, - /// Dispatch notify couinter + /// Dispatch notify counter notify_counter: RwLock, } @@ -143,7 +139,7 @@ impl StratumImpl { use std::str::FromStr; self.subscribers.write().push(meta.addr().clone()); - self.job_que.write().insert(meta.addr().clone()); + self.job_queue.write().insert(meta.addr().clone()); trace!(target: "stratum", "Subscription request from {:?}", meta.addr()); Ok(match self.dispatcher.initial() { @@ -160,7 +156,7 @@ impl StratumImpl { /// rpc method `mining.authorize` fn authorize(&self, params: Params, meta: SocketMetadata) -> RpcResult { - params.parse::<(String, String)>().map(|(worker_id, secret)|{ + params.parse::<(String, String)>().map(|(worker_id, secret)| { if let Some(valid_secret) = self.secret { let hash = keccak(secret); if hash != valid_secret { @@ -184,15 +180,15 @@ impl StratumImpl { _ => None }) .collect::>()) { - Ok(()) => { - self.update_peers(&meta.tcp_dispatcher.expect("tcp_dispatcher is always initialized; qed")); - to_value(true) - }, - Err(submit_err) => { - warn!("Error while submitting share: {:?}", submit_err); - to_value(false) - } + Ok(()) => { + self.update_peers(&meta.tcp_dispatcher.expect("tcp_dispatcher is always initialized; qed")); + to_value(true) + }, + Err(submit_err) => { + warn!("Error while submitting share: {:?}", submit_err); + to_value(false) } + } }, _ => { trace!(target: "stratum", "Invalid submit work format {:?}", params); @@ -204,36 +200,37 @@ impl StratumImpl { /// Helper method fn update_peers(&self, tcp_dispatcher: &Dispatcher) { if let Some(job) = self.dispatcher.job() { - if let Err(e) = self.push_work_all(job, tcp_dispatcher) { - warn!("Failed to update some of the peers: {:?}", e); - } + self.push_work_all(job, tcp_dispatcher) } } - fn push_work_all(&self, payload: String, tcp_dispatcher: &Dispatcher) -> Result<(), Error> { + fn push_work_all(&self, payload: String, tcp_dispatcher: &Dispatcher) { let hup_peers = { let workers = self.workers.read(); let next_request_id = { let mut counter = self.notify_counter.write(); - if *counter == ::std::u32::MAX { *counter = NOTIFY_COUNTER_INITIAL; } - else { *counter = *counter + 1 } + if *counter == ::std::u32::MAX { + *counter = NOTIFY_COUNTER_INITIAL; + } else { + *counter = *counter + 1 + } *counter }; - let mut hup_peers = HashSet::with_capacity(0); // most of the cases won't be needed, hence avoid allocation + let mut hup_peers = HashSet::new(); let workers_msg = format!("{{ \"id\": {}, \"method\": \"mining.notify\", \"params\": {} }}", next_request_id, payload); trace!(target: "stratum", "pushing work for {} workers (payload: '{}')", workers.len(), &workers_msg); - for (ref addr, _) in workers.iter() { + for (addr, _) in workers.iter() { trace!(target: "stratum", "pusing work to {}", addr); match tcp_dispatcher.push_message(addr, workers_msg.clone()) { Err(PushMessageError::NoSuchPeer) => { - trace!(target: "stratum", "Worker no longer connected: {}", &addr); - hup_peers.insert(*addr.clone()); + trace!(target: "stratum", "Worker no longer connected: {}", addr); + hup_peers.insert(addr.clone()); }, Err(e) => { warn!(target: "stratum", "Unexpected transport error: {:?}", e); }, - Ok(_) => { }, + Ok(_) => {}, } } hup_peers @@ -241,33 +238,10 @@ impl StratumImpl { if !hup_peers.is_empty() { let mut workers = self.workers.write(); - for hup_peer in hup_peers { workers.remove(&hup_peer); } - } - - Ok(()) - } - - fn push_work(&self, payloads: Vec, tcp_dispatcher: &Dispatcher) -> Result<(), Error> { - if !payloads.len() > 0 { - return Err(Error::NoWork); - } - let workers = self.workers.read(); - let addrs = workers.keys().collect::>(); - if !workers.len() > 0 { - return Err(Error::NoWorkers); - } - let mut que = payloads; - let mut addr_index = 0; - while que.len() > 0 { - let next_worker = addrs[addr_index]; - let mut next_payload = que.drain(0..1); - tcp_dispatcher.push_message( - next_worker, - next_payload.nth(0).expect("drained successfully of 0..1, so 0-th element should exist") - )?; - addr_index = addr_index + 1; + for hup_peer in hup_peers { + workers.remove(&hup_peer); + } } - Ok(()) } } @@ -475,8 +449,7 @@ mod tests { .map_err(|err: timeout::Error<()>| panic!("Timeout: {:?}", err)) .and_then(move |stream| { trace!(target: "stratum", "Pusing work to peers"); - stratum.push_work_all(r#"{ "00040008", "100500" }"#.to_owned()) - .expect("Pushing work should produce no errors"); + stratum.push_work_all(r#"{ "00040008", "100500" }"#.to_owned()); Timeout::new(future::ok(stream), ::std::time::Duration::from_millis(100)) }) .map_err(|err: timeout::Error<()>| panic!("Timeout: {:?}", err)) diff --git a/miner/stratum/src/traits.rs b/miner/stratum/src/traits.rs index 36b95a0169e..d71af1feedc 100644 --- a/miner/stratum/src/traits.rs +++ b/miner/stratum/src/traits.rs @@ -55,10 +55,7 @@ pub trait JobDispatcher: Send + Sync { /// Interface that can handle requests to push job for workers pub trait PushWorkHandler: Send + Sync { /// push the same work package for all workers (`payload`: json of pow-specific set of work specification) - fn push_work_all(&self, payload: String) -> Result<(), Error>; - - /// push the work packages worker-wise (`payload`: json of pow-specific set of work specification) - fn push_work(&self, payloads: Vec) -> Result<(), Error>; + fn push_work_all(&self, payload: String); } pub struct ServiceConfiguration {