Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Merge Notifier and TransactionsPoolNotifier (#10591)
Browse files Browse the repository at this point in the history
* Merge `Notifier` and `TransactionsPoolNotifier`

* fix tests
  • Loading branch information
IntegralTeam authored and seunlanlege committed Jun 4, 2019
1 parent 425dcd4 commit faf6f1f
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 139 deletions.
37 changes: 23 additions & 14 deletions ethcore/light/src/transaction_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,23 +129,22 @@ pub enum ImportDestination {
Future,
}

type Listener = Box<Fn(&[H256]) + Send + Sync>;

/// Light transaction queue. See module docs for more details.
#[derive(Default)]
pub struct TransactionQueue {
by_account: HashMap<Address, AccountTransactions>,
by_hash: H256FastMap<PendingTransaction>,
listeners: Vec<Listener>,
tx_statuses_listeners: Vec<mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>>,
pending_listeners: Vec<mpsc::UnboundedSender<Arc<Vec<H256>>>>,
full_listeners: Vec<mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>>,
}

impl fmt::Debug for TransactionQueue {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("TransactionQueue")
.field("by_account", &self.by_account)
.field("by_hash", &self.by_hash)
.field("listeners", &self.listeners.len())
.field("pending_listeners", &self.pending_listeners.len())
.field("full_listeners", &self.pending_listeners.len())
.finish()
}
}
Expand Down Expand Up @@ -360,30 +359,40 @@ impl TransactionQueue {
}

/// Add a transaction queue listener.
pub fn add_listener(&mut self, f: Listener) {
self.listeners.push(f);
pub fn pending_transactions_receiver(&mut self) -> mpsc::UnboundedReceiver<Arc<Vec<H256>>> {
let (sender, receiver) = mpsc::unbounded();
self.pending_listeners.push(sender);
receiver
}

/// Add a transaction queue listener.
pub fn tx_statuses_receiver(&mut self) -> mpsc::UnboundedReceiver<Arc<Vec<(H256, TxStatus)>>> {
pub fn full_transactions_receiver(&mut self) -> mpsc::UnboundedReceiver<Arc<Vec<(H256, TxStatus)>>> {
let (sender, receiver) = mpsc::unbounded();
self.tx_statuses_listeners.push(sender);
self.full_listeners.push(sender);
receiver
}

/// Notifies all listeners about new pending transaction.
fn notify(&mut self, hashes: &[H256], status: TxStatus) {
for listener in &self.listeners {
listener(hashes)
if status == TxStatus::Added {
let to_pending_send: Arc<Vec<H256>> = Arc::new(
hashes
.into_iter()
.map(|hash| hash.clone())
.collect()
);
self.pending_listeners.retain(|listener| listener.unbounded_send(to_pending_send.clone()).is_ok());

}

let to_send: Arc<Vec<(H256, TxStatus)>> = Arc::new(
let to_full_send: Arc<Vec<(H256, TxStatus)>> = Arc::new(
hashes
.into_iter()
.map(|hash| (hash.clone(), status)).collect()
.map(|hash| (hash.clone(), status))
.collect()
);

self.tx_statuses_listeners.retain(| listener| listener.unbounded_send(to_send.clone()).is_ok());
self.full_listeners.retain(|listener| listener.unbounded_send(to_full_send.clone()).is_ok());
}
}

Expand Down
12 changes: 7 additions & 5 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,14 +260,16 @@ impl Miner {
}

/// Set a callback to be notified about imported transactions' hashes.
pub fn add_transactions_listener(&self, f: Box<Fn(&[H256]) + Send + Sync>) {
self.transaction_queue.add_listener(f);
pub fn pending_transactions_receiver(&self) -> mpsc::UnboundedReceiver<Arc<Vec<H256>>> {
let (sender, receiver) = mpsc::unbounded();
self.transaction_queue.add_pending_listener(sender);
receiver
}

/// Set a callback to be notified
pub fn tx_pool_receiver(&self) -> mpsc::UnboundedReceiver<Arc<Vec<(H256, TxStatus)>>> {
/// Set a callback to be notified about imported transactions' hashes.
pub fn full_transactions_receiver(&self) -> mpsc::UnboundedReceiver<Arc<Vec<(H256, TxStatus)>>> {
let (sender, receiver) = mpsc::unbounded();
self.transaction_queue.add_tx_pool_listener(sender);
self.transaction_queue.add_full_listener(sender);
receiver
}

Expand Down
100 changes: 37 additions & 63 deletions miner/src/pool/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,50 +26,6 @@ use txpool::{self, VerifiedTransaction};
use pool::VerifiedTransaction as Transaction;
use pool::TxStatus;

type Listener = Box<Fn(&[H256]) + Send + Sync>;

/// Manages notifications to pending transaction listeners.
#[derive(Default)]
pub struct Notifier {
listeners: Vec<Listener>,
pending: Vec<H256>,
}

impl fmt::Debug for Notifier {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Notifier")
.field("listeners", &self.listeners.len())
.field("pending", &self.pending)
.finish()
}
}

impl Notifier {
/// Add new listener to receive notifications.
pub fn add(&mut self, f: Listener) {
self.listeners.push(f)
}

/// Notify listeners about all currently pending transactions.
pub fn notify(&mut self) {
if self.pending.is_empty() {
return;
}

for l in &self.listeners {
(l)(&self.pending);
}

self.pending.clear();
}
}

impl txpool::Listener<Transaction> for Notifier {
fn added(&mut self, tx: &Arc<Transaction>, _old: Option<&Arc<Transaction>>) {
self.pending.push(*tx.hash());
}
}

/// Transaction pool logger.
#[derive(Default, Debug)]
pub struct Logger;
Expand Down Expand Up @@ -121,14 +77,20 @@ impl txpool::Listener<Transaction> for Logger {
/// Transactions pool notifier
#[derive(Default)]
pub struct TransactionsPoolNotifier {
listeners: Vec<mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>>,
full_listeners: Vec<mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>>,
pending_listeners: Vec<mpsc::UnboundedSender<Arc<Vec<H256>>>>,
tx_statuses: Vec<(H256, TxStatus)>,
}

impl TransactionsPoolNotifier {
/// Add new listener to receive notifications.
pub fn add(&mut self, f: mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>) {
self.listeners.push(f);
/// Add new full listener to receive notifications.
pub fn add_full_listener(&mut self, f: mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>) {
self.full_listeners.push(f);
}

/// Add new pending listener to receive notifications.
pub fn add_pending_listener(&mut self, f: mpsc::UnboundedSender<Arc<Vec<H256>>>) {
self.pending_listeners.push(f);
}

/// Notify listeners about all currently transactions.
Expand All @@ -137,16 +99,25 @@ impl TransactionsPoolNotifier {
return;
}

let to_send = Arc::new(std::mem::replace(&mut self.tx_statuses, Vec::new()));
self.listeners
.retain(|listener| listener.unbounded_send(to_send.clone()).is_ok());
let to_pending_send: Arc<Vec<H256>> = Arc::new(
self.tx_statuses.clone()
.into_iter()
.map(|(hash, _)| hash)
.collect()
);
self.pending_listeners.retain(|listener| listener.unbounded_send(to_pending_send.clone()).is_ok());

let to_full_send = Arc::new(std::mem::replace(&mut self.tx_statuses, Vec::new()));
self.full_listeners
.retain(|listener| listener.unbounded_send(to_full_send.clone()).is_ok());
}
}

impl fmt::Debug for TransactionsPoolNotifier {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("TransactionsPoolNotifier")
.field("listeners", &self.listeners.len())
.field("full_listeners", &self.full_listeners.len())
.field("pending_listeners", &self.pending_listeners.len())
.finish()
}
}
Expand Down Expand Up @@ -180,33 +151,36 @@ impl txpool::Listener<Transaction> for TransactionsPoolNotifier {
#[cfg(test)]
mod tests {
use super::*;
use parking_lot::Mutex;
use types::transaction;
use txpool::Listener;
use futures::{Stream, Future};
use ethereum_types::Address;

#[test]
fn should_notify_listeners() {
// given
let received = Arc::new(Mutex::new(vec![]));
let r = received.clone();
let listener = Box::new(move |hashes: &[H256]| {
*r.lock() = hashes.iter().map(|x| *x).collect();
});
let (full_sender, full_receiver) = mpsc::unbounded();
let (pending_sender, pending_receiver) = mpsc::unbounded();

let mut tx_listener = Notifier::default();
tx_listener.add(listener);
let mut tx_listener = TransactionsPoolNotifier::default();
tx_listener.add_full_listener(full_sender);
tx_listener.add_pending_listener(pending_sender);

// when
let tx = new_tx();
tx_listener.added(&tx, None);
assert_eq!(*received.lock(), vec![]);

// then
tx_listener.notify();
let (full_res , _full_receiver)= full_receiver.into_future().wait().unwrap();
let (pending_res , _pending_receiver)= pending_receiver.into_future().wait().unwrap();
assert_eq!(
full_res,
Some(Arc::new(vec![(serde_json::from_str::<H256>("\"0x13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d\"").unwrap(), TxStatus::Added)]))
);
assert_eq!(
*received.lock(),
vec!["13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d".parse().unwrap()]
pending_res,
Some(Arc::new(vec![serde_json::from_str::<H256>("\"0x13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d\"").unwrap()]))
);
}

Expand Down
16 changes: 7 additions & 9 deletions miner/src/pool/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use pool::{
};
use pool::local_transactions::LocalTransactionsList;

type Listener = (LocalTransactionsList, (listener::Notifier, (listener::Logger, listener::TransactionsPoolNotifier)));
type Listener = (LocalTransactionsList, (listener::TransactionsPoolNotifier, listener::Logger));
type Pool = txpool::Pool<pool::VerifiedTransaction, scoring::NonceAndGasPrice, Listener>;

/// Max cache time in milliseconds for pending transactions.
Expand Down Expand Up @@ -305,8 +305,6 @@ impl TransactionQueue {
// Notify about imported transactions.
(self.pool.write().listener_mut().1).0.notify();

((self.pool.write().listener_mut().1).1).1.notify();

if results.iter().any(|r| r.is_ok()) {
self.cached_pending.write().clear();
}
Expand Down Expand Up @@ -499,7 +497,7 @@ impl TransactionQueue {
/// removes them from the pool.
/// That method should be used if invalid transactions are detected
/// or you want to cancel a transaction.
pub fn remove<'a, T: IntoIterator<Item = &'a H256>>(
pub fn remove<'a, T: IntoIterator<Item=&'a H256>>(
&self,
hashes: T,
is_invalid: bool,
Expand Down Expand Up @@ -571,16 +569,16 @@ impl TransactionQueue {
self.pool.read().listener().0.all_transactions().iter().map(|(a, b)| (*a, b.clone())).collect()
}

/// Add a callback to be notified about all transactions entering the pool.
pub fn add_listener(&self, f: Box<Fn(&[H256]) + Send + Sync>) {
/// Add a listener to be notified about all transactions the pool
pub fn add_pending_listener(&self, f: mpsc::UnboundedSender<Arc<Vec<H256>>>) {
let mut pool = self.pool.write();
(pool.listener_mut().1).0.add(f);
(pool.listener_mut().1).0.add_pending_listener(f);
}

/// Add a listener to be notified about all transactions the pool
pub fn add_tx_pool_listener(&self, f: mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>) {
pub fn add_full_listener(&self, f: mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>) {
let mut pool = self.pool.write();
((pool.listener_mut().1).1).1.add(f);
(pool.listener_mut().1).0.add_full_listener(f);
}

/// Check if pending set is cached.
Expand Down
26 changes: 7 additions & 19 deletions parity/rpc_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,9 @@ impl FullDependencies {
}
Api::EthPubSub => {
if !for_generic_pubsub {
let pool_receiver = self.miner.pending_transactions_receiver();
let mut client =
EthPubSubClient::new(self.client.clone(), self.executor.clone());
EthPubSubClient::new(self.client.clone(), self.executor.clone(), pool_receiver);
let weak_client = Arc::downgrade(&self.client);

client.add_sync_notifier(self.sync.sync_notification(), move |state| {
Expand All @@ -345,14 +346,6 @@ impl FullDependencies {
})
});

let h = client.handler();
self.miner
.add_transactions_listener(Box::new(move |hashes| {
if let Some(h) = h.upgrade() {
h.notify_new_transactions(hashes);
}
}));

if let Some(h) = client.handler().upgrade() {
self.client.add_notify(h);
}
Expand All @@ -361,7 +354,7 @@ impl FullDependencies {
}
Api::ParityTransactionsPool => {
if !for_generic_pubsub {
let receiver = self.miner.tx_pool_receiver();
let receiver = self.miner.full_transactions_receiver();
let client = TransactionsPoolClient::new(self.executor.clone(), receiver);
handler.extend_with(TransactionsPoolClient::to_delegate(client));
}
Expand Down Expand Up @@ -583,13 +576,16 @@ impl<C: LightChainClient + 'static> LightDependencies<C> {
}
}
Api::EthPubSub => {
let receiver = self.transaction_queue.write().pending_transactions_receiver();

let mut client = EthPubSubClient::light(
self.client.clone(),
self.on_demand.clone(),
self.sync.clone(),
self.cache.clone(),
self.executor.clone(),
self.gas_price_percentile,
receiver
);

let weak_client = Arc::downgrade(&self.client);
Expand All @@ -607,19 +603,11 @@ impl<C: LightChainClient + 'static> LightDependencies<C> {
});

self.client.add_listener(client.handler() as Weak<_>);
let h = client.handler();
self.transaction_queue
.write()
.add_listener(Box::new(move |transactions| {
if let Some(h) = h.upgrade() {
h.notify_new_transactions(transactions);
}
}));
handler.extend_with(EthPubSub::to_delegate(client));
}
Api::ParityTransactionsPool => {
if !for_generic_pubsub {
let receiver = self.transaction_queue.write().tx_statuses_receiver();
let receiver = self.transaction_queue.write().full_transactions_receiver();
let client = TransactionsPoolClient::new(self.executor.clone(), receiver);
handler.extend_with(TransactionsPoolClient::to_delegate(client));
}
Expand Down
Loading

0 comments on commit faf6f1f

Please sign in to comment.