Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
IntegralTeam committed Jun 4, 2019
1 parent f31b4db commit 4142b65
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 18 deletions.
25 changes: 14 additions & 11 deletions miner/src/pool/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,33 +151,36 @@ impl txpool::Listener<Transaction> for TransactionsPoolNotifier {
#[cfg(test)]
mod tests {
use super::*;
use parking_lot::Mutex;
use types::transaction;
use txpool::Listener;
use futures::{Stream, Future};
use ethereum_types::Address;

#[test]
fn should_notify_listeners() {
// given
let received = Arc::new(Mutex::new(vec![]));
let r = received.clone();
let listener = Box::new(move |hashes: &[H256]| {
*r.lock() = hashes.iter().map(|x| *x).collect();
});
let (full_sender, full_receiver) = mpsc::unbounded();
let (pending_sender, pending_receiver) = mpsc::unbounded();

let mut tx_listener = Notifier::default();
tx_listener.add(listener);
let mut tx_listener = TransactionsPoolNotifier::default();
tx_listener.add_full_listener(full_sender);
tx_listener.add_pending_listener(pending_sender);

// when
let tx = new_tx();
tx_listener.added(&tx, None);
assert_eq!(*received.lock(), vec![]);

// then
tx_listener.notify();
let (full_res , _full_receiver)= full_receiver.into_future().wait().unwrap();
let (pending_res , _pending_receiver)= pending_receiver.into_future().wait().unwrap();
assert_eq!(
*received.lock(),
vec!["13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d".parse().unwrap()]
full_res,
Some(Arc::new(vec![(serde_json::from_str::<H256>("\"0x13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d\"").unwrap(), TxStatus::Added)]))
);
assert_eq!(
pending_res,
Some(Arc::new(vec![serde_json::from_str::<H256>("\"0x13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d\"").unwrap()]))
);
}

Expand Down
20 changes: 13 additions & 7 deletions rpc/src/v1/tests/mocked/eth_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use std::sync::Arc;

use jsonrpc_core::MetaIoHandler;
use jsonrpc_core::futures::{self, Stream, Future};
use jsonrpc_core::futures::{self, Stream, Future, sync::mpsc};
use jsonrpc_pubsub::Session;

use std::time::Duration;
Expand All @@ -40,7 +40,9 @@ fn should_subscribe_to_new_heads() {
let h2 = client.block_hash_delta_minus(2);
let h1 = client.block_hash_delta_minus(3);

let pubsub = EthPubSubClient::new(Arc::new(client), el.executor());
let (_, pool_receiver) = mpsc::unbounded();

let pubsub = EthPubSubClient::new(Arc::new(client), el.executor(), pool_receiver);
let handler = pubsub.handler().upgrade().unwrap();
let pubsub = pubsub.to_delegate();

Expand Down Expand Up @@ -112,7 +114,9 @@ fn should_subscribe_to_logs() {
}
]);

let pubsub = EthPubSubClient::new(Arc::new(client), el.executor());
let (_, pool_receiver) = mpsc::unbounded();

let pubsub = EthPubSubClient::new(Arc::new(client), el.executor(), pool_receiver);
let handler = pubsub.handler().upgrade().unwrap();
let pubsub = pubsub.to_delegate();

Expand Down Expand Up @@ -159,8 +163,9 @@ fn should_subscribe_to_pending_transactions() {
let el = Runtime::with_thread_count(1);
let client = TestBlockChainClient::new();

let pubsub = EthPubSubClient::new(Arc::new(client), el.executor());
let handler = pubsub.handler().upgrade().unwrap();
let (pool_sender, pool_receiver) = mpsc::unbounded();

let pubsub = EthPubSubClient::new(Arc::new(client), el.executor(), pool_receiver);
let pubsub = pubsub.to_delegate();

let mut io = MetaIoHandler::default();
Expand All @@ -181,7 +186,7 @@ fn should_subscribe_to_pending_transactions() {
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));

// Send new transactions
handler.notify_new_transactions(&[H256::from_low_u64_be(5), H256::from_low_u64_be(7)]);
pool_sender.unbounded_send(Arc::new(vec![H256::from_low_u64_be(5), H256::from_low_u64_be(7)])).unwrap();

let (res, receiver) = receiver.into_future().wait().unwrap();
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":"0x0000000000000000000000000000000000000000000000000000000000000005","subscription":"0x43ca64edf03768e1"}}"#;
Expand All @@ -205,7 +210,8 @@ fn eth_subscribe_syncing() {
// given
let el = Runtime::with_thread_count(1);
let client = TestBlockChainClient::new();
let pubsub = EthPubSubClient::new(Arc::new(client), el.executor());
let (_, pool_receiver) = mpsc::unbounded();
let pubsub = EthPubSubClient::new(Arc::new(client), el.executor(), pool_receiver);
let pubsub = pubsub.to_delegate();

let mut io = MetaIoHandler::default();
Expand Down

0 comments on commit 4142b65

Please sign in to comment.