diff --git a/miner/src/pool/listener.rs b/miner/src/pool/listener.rs index 62e3cbcc965..9983059048c 100644 --- a/miner/src/pool/listener.rs +++ b/miner/src/pool/listener.rs @@ -151,33 +151,36 @@ impl txpool::Listener for TransactionsPoolNotifier { #[cfg(test)] mod tests { use super::*; - use parking_lot::Mutex; use types::transaction; use txpool::Listener; + use futures::{Stream, Future}; use ethereum_types::Address; #[test] fn should_notify_listeners() { // given - let received = Arc::new(Mutex::new(vec![])); - let r = received.clone(); - let listener = Box::new(move |hashes: &[H256]| { - *r.lock() = hashes.iter().map(|x| *x).collect(); - }); + let (full_sender, full_receiver) = mpsc::unbounded(); + let (pending_sender, pending_receiver) = mpsc::unbounded(); - let mut tx_listener = Notifier::default(); - tx_listener.add(listener); + let mut tx_listener = TransactionsPoolNotifier::default(); + tx_listener.add_full_listener(full_sender); + tx_listener.add_pending_listener(pending_sender); // when let tx = new_tx(); tx_listener.added(&tx, None); - assert_eq!(*received.lock(), vec![]); // then tx_listener.notify(); + let (full_res , _full_receiver)= full_receiver.into_future().wait().unwrap(); + let (pending_res , _pending_receiver)= pending_receiver.into_future().wait().unwrap(); assert_eq!( - *received.lock(), - vec!["13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d".parse().unwrap()] + full_res, + Some(Arc::new(vec![(serde_json::from_str::("\"0x13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d\"").unwrap(), TxStatus::Added)])) + ); + assert_eq!( + pending_res, + Some(Arc::new(vec![serde_json::from_str::("\"0x13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d\"").unwrap()])) ); } diff --git a/rpc/src/v1/tests/mocked/eth_pubsub.rs b/rpc/src/v1/tests/mocked/eth_pubsub.rs index 071e0eaced3..1336f4e154e 100644 --- a/rpc/src/v1/tests/mocked/eth_pubsub.rs +++ b/rpc/src/v1/tests/mocked/eth_pubsub.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use jsonrpc_core::MetaIoHandler; -use jsonrpc_core::futures::{self, Stream, Future}; +use jsonrpc_core::futures::{self, Stream, Future, sync::mpsc}; use jsonrpc_pubsub::Session; use std::time::Duration; @@ -40,7 +40,9 @@ fn should_subscribe_to_new_heads() { let h2 = client.block_hash_delta_minus(2); let h1 = client.block_hash_delta_minus(3); - let pubsub = EthPubSubClient::new(Arc::new(client), el.executor()); + let (_, pool_receiver) = mpsc::unbounded(); + + let pubsub = EthPubSubClient::new(Arc::new(client), el.executor(), pool_receiver); let handler = pubsub.handler().upgrade().unwrap(); let pubsub = pubsub.to_delegate(); @@ -112,7 +114,9 @@ fn should_subscribe_to_logs() { } ]); - let pubsub = EthPubSubClient::new(Arc::new(client), el.executor()); + let (_, pool_receiver) = mpsc::unbounded(); + + let pubsub = EthPubSubClient::new(Arc::new(client), el.executor(), pool_receiver); let handler = pubsub.handler().upgrade().unwrap(); let pubsub = pubsub.to_delegate(); @@ -159,8 +163,9 @@ fn should_subscribe_to_pending_transactions() { let el = Runtime::with_thread_count(1); let client = TestBlockChainClient::new(); - let pubsub = EthPubSubClient::new(Arc::new(client), el.executor()); - let handler = pubsub.handler().upgrade().unwrap(); + let (pool_sender, pool_receiver) = mpsc::unbounded(); + + let pubsub = EthPubSubClient::new(Arc::new(client), el.executor(), pool_receiver); let pubsub = pubsub.to_delegate(); let mut io = MetaIoHandler::default(); @@ -181,7 +186,7 @@ fn should_subscribe_to_pending_transactions() { assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); // Send new transactions - handler.notify_new_transactions(&[H256::from_low_u64_be(5), H256::from_low_u64_be(7)]); + pool_sender.unbounded_send(Arc::new(vec![H256::from_low_u64_be(5), H256::from_low_u64_be(7)])).unwrap(); let (res, receiver) = receiver.into_future().wait().unwrap(); let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":"0x0000000000000000000000000000000000000000000000000000000000000005","subscription":"0x43ca64edf03768e1"}}"#; @@ -205,7 +210,8 @@ fn eth_subscribe_syncing() { // given let el = Runtime::with_thread_count(1); let client = TestBlockChainClient::new(); - let pubsub = EthPubSubClient::new(Arc::new(client), el.executor()); + let (_, pool_receiver) = mpsc::unbounded(); + let pubsub = EthPubSubClient::new(Arc::new(client), el.executor(), pool_receiver); let pubsub = pubsub.to_delegate(); let mut io = MetaIoHandler::default();