
parity-version: bump beta to 1.11.3 (#8806)
* parity-version: bump beta to 1.11.3

* Disallow unsigned transactions in case EIP-86 is disabled (#8802)

* Disallow unsigned transactions in case EIP-86 is disabled

* Add tests for verification

* Add disallow unsigned transactions test in machine

* Fix ancient blocks queue deadlock (#8751)

* Revert "Fix not downloading old blocks (#8642)"

This reverts commit d193436.

* Make sure only one thread actually imports old blocks.

* Add some trace timers.

* Bring back pending hashes set.

* Separate locks so that queueing can happen while we are importing (see the sketch after this list).

* Address grumbles.
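
The last two bullets are the core of the deadlock fix: queued ancient blocks sit behind their own short-lived lock, while a separate import mutex guarantees that a single thread drains a bounded batch per IO message. A minimal, self-contained sketch of that shape, using hypothetical stand-in types and an import_old_block stub rather than the client's real API:

    use std::collections::{HashSet, VecDeque};
    use std::sync::{Arc, Mutex, RwLock};

    type Hash = u64;
    type Bytes = Vec<u8>;

    struct Header { number: u64, hash: Hash }

    // Mirrors MAX_ANCIENT_BLOCKS_TO_IMPORT in the diff below.
    const MAX_ANCIENT_BLOCKS_TO_IMPORT: usize = 4;

    #[derive(Default)]
    struct AncientImporter {
        // Pending hashes plus the ordered block queue, guarded together so
        // membership checks and pushes stay consistent.
        queued: Arc<RwLock<(HashSet<Hash>, VecDeque<(Header, Bytes)>)>>,
        // Separate mutex: only one thread imports at a time, but queueing
        // never has to wait for an import in progress.
        import_lock: Arc<Mutex<()>>,
    }

    impl AncientImporter {
        fn queue(&self, header: Header, body: Bytes) {
            let mut queued = self.queued.write().unwrap();
            queued.0.insert(header.hash);
            queued.1.push_back((header, body));
        }

        // Runs on the IO thread; imports a bounded batch, in order.
        fn drain(&self) {
            let _guard = self.import_lock.lock().unwrap();
            for _ in 0..MAX_ANCIENT_BLOCKS_TO_IMPORT {
                // Hold the queue lock only long enough to pop one entry,
                // so new blocks can still be queued while we import.
                let next = self.queued.write().unwrap().1.pop_front();
                match next {
                    Some((header, body)) => {
                        import_old_block(&header, &body);
                        self.queued.write().unwrap().0.remove(&header.hash);
                    }
                    None => break,
                }
            }
        }
    }

    // Stub standing in for the client's actual import path.
    fn import_old_block(header: &Header, _body: &Bytes) {
        println!("imported ancient block #{}", header.number);
    }

    fn main() {
        let importer = AncientImporter::default();
        for n in 0..6 {
            importer.queue(Header { number: n, hash: n }, Bytes::new());
        }
        importer.drain(); // imports blocks 0..4
        importer.drain(); // imports the remaining two
    }
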
5chdn committed Jun 5, 2018
1 parent c4c27cf commit a66e36b
Showing 11 changed files with 129 additions and 59 deletions.
16 changes: 8 additions & 8 deletions Cargo.lock


2 changes: 1 addition & 1 deletion Cargo.toml
@@ -2,7 +2,7 @@
description = "Parity Ethereum client"
name = "parity"
# NOTE Make sure to update util/version/Cargo.toml as well
version = "1.11.2"
version = "1.11.3"
license = "GPL-3.0"
authors = ["Parity Technologies <admin@parity.io>"]

115 changes: 70 additions & 45 deletions ethcore/src/client/client.rs
@@ -18,7 +18,7 @@ use std::collections::{HashSet, HashMap, BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use std::str::FromStr;
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::time::{Instant};

// util
@@ -88,6 +88,8 @@ use_contract!(registry, "Registry", "res/contracts/registrar.json");

const MAX_TX_QUEUE_SIZE: usize = 4096;
const MAX_ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096;
// Max number of blocks imported at once.
const MAX_ANCIENT_BLOCKS_TO_IMPORT: usize = 4;
const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2;
const MIN_HISTORY_SIZE: u64 = 8;

@@ -208,8 +210,12 @@ pub struct Client {
queue_transactions: IoChannelQueue,
/// Ancient blocks import queue
queue_ancient_blocks: IoChannelQueue,
/// Hashes of pending ancient blocks waiting to be included
pending_ancient_blocks: RwLock<HashSet<H256>>,
/// Queued ancient blocks, make sure they are imported in order.
queued_ancient_blocks: Arc<RwLock<(
HashSet<H256>,
VecDeque<(Header, Bytes, Bytes)>
)>>,
ancient_blocks_import_lock: Arc<Mutex<()>>,
/// Consensus messages import queue
queue_consensus_message: IoChannelQueue,

@@ -463,7 +469,6 @@ impl Importer {
let hash = header.hash();
let _import_lock = self.import_lock.lock();

trace!(target: "client", "Trying to import old block #{}", header.number());
{
trace_time!("import_old_block");
// verify the block, passing the chain for updating the epoch verifier.
@@ -744,7 +749,8 @@ impl Client {
notify: RwLock::new(Vec::new()),
queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE),
queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE),
pending_ancient_blocks: RwLock::new(HashSet::new()),
queued_ancient_blocks: Default::default(),
ancient_blocks_import_lock: Default::default(),
queue_consensus_message: IoChannelQueue::new(usize::max_value()),
last_hashes: RwLock::new(VecDeque::new()),
factories: factories,
@@ -1975,8 +1981,9 @@ impl BlockChainClient for Client {

impl IoClient for Client {
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize) {
trace_time!("queue_transactions");
let len = transactions.len();
self.queue_transactions.queue(&mut self.io_channel.lock(), move |client| {
self.queue_transactions.queue(&mut self.io_channel.lock(), len, move |client| {
trace_time!("import_queued_transactions");

let txs: Vec<UnverifiedTransaction> = transactions
@@ -1995,6 +2002,7 @@
}

fn queue_ancient_block(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result<H256, BlockImportError> {
trace_time!("queue_ancient_block");
let header: Header = ::rlp::Rlp::new(&block_bytes).val_at(0)?;
let hash = header.hash();

@@ -2003,39 +2011,59 @@
if self.chain.read().is_known(&hash) {
bail!(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain));
}

let parent_hash = *header.parent_hash();
let parent_pending = self.pending_ancient_blocks.read().contains(&parent_hash);
let status = self.block_status(BlockId::Hash(parent_hash));
if !parent_pending && (status == BlockStatus::Unknown || status == BlockStatus::Pending) {
bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(parent_hash)));
let parent_hash = header.parent_hash();
// NOTE To prevent a race condition with import, make sure to check queued blocks first
// (and attempt to acquire lock)
let is_parent_pending = self.queued_ancient_blocks.read().0.contains(parent_hash);
if !is_parent_pending {
let status = self.block_status(BlockId::Hash(*parent_hash));
if status == BlockStatus::Unknown || status == BlockStatus::Pending {
bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(*parent_hash)));
}
}
}

self.pending_ancient_blocks.write().insert(hash);

trace!(target: "client", "Queuing old block #{}", header.number());
match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), move |client| {
let result = client.importer.import_old_block(
&header,
&block_bytes,
&receipts_bytes,
&**client.db.read(),
&*client.chain.read()
);

client.pending_ancient_blocks.write().remove(&hash);
result.map(|_| ()).unwrap_or_else(|e| {
error!(target: "client", "Error importing ancient block: {}", e);
});
// we queue blocks here and trigger an IO message.
{
let mut queued = self.queued_ancient_blocks.write();
queued.0.insert(hash);
queued.1.push_back((header, block_bytes, receipts_bytes));
}

let queued = self.queued_ancient_blocks.clone();
let lock = self.ancient_blocks_import_lock.clone();
match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| {
trace_time!("import_ancient_block");
// Make sure to hold the lock here to prevent importing out of order.
// We use a separate lock because we don't want to block queueing.
let _lock = lock.lock();
for _i in 0..MAX_ANCIENT_BLOCKS_TO_IMPORT {
let first = queued.write().1.pop_front();
if let Some((header, block_bytes, receipts_bytes)) = first {
let hash = header.hash();
client.importer.import_old_block(
&header,
&block_bytes,
&receipts_bytes,
&**client.db.read(),
&*client.chain.read()
).map(|_| ()).unwrap_or_else(|e| {
error!(target: "client", "Error importing ancient block: {}", e);
});
// remove from pending
queued.write().0.remove(&hash);
} else {
break;
}
}
}) {
Ok(_) => Ok(hash),
Err(e) => bail!(BlockImportErrorKind::Other(format!("{}", e))),
}
}

fn queue_consensus_message(&self, message: Bytes) {
match self.queue_consensus_message.queue(&mut self.io_channel.lock(), move |client| {
match self.queue_consensus_message.queue(&mut self.io_channel.lock(), 1, move |client| {
if let Err(e) = client.engine().handle_message(&message) {
debug!(target: "poa", "Invalid message received: {}", e);
}
@@ -2446,38 +2474,35 @@ impl fmt::Display for QueueError {

/// Queue some items to be processed by IO client.
struct IoChannelQueue {
queue: Arc<Mutex<VecDeque<Box<Fn(&Client) + Send>>>>,
currently_queued: Arc<AtomicUsize>,
limit: usize,
}

impl IoChannelQueue {
pub fn new(limit: usize) -> Self {
IoChannelQueue {
queue: Default::default(),
currently_queued: Default::default(),
limit,
}
}

pub fn queue<F>(&self, channel: &mut IoChannel<ClientIoMessage>, fun: F) -> Result<(), QueueError>
where F: Fn(&Client) + Send + Sync + 'static
pub fn queue<F>(&self, channel: &mut IoChannel<ClientIoMessage>, count: usize, fun: F) -> Result<(), QueueError> where
F: Fn(&Client) + Send + Sync + 'static,
{
{
let mut queue = self.queue.lock();
let queue_size = queue.len();
ensure!(queue_size < self.limit, QueueError::Full(self.limit));
let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
ensure!(queue_size < self.limit, QueueError::Full(self.limit));

queue.push_back(Box::new(fun));
}

let queue = self.queue.clone();
let currently_queued = self.currently_queued.clone();
let result = channel.send(ClientIoMessage::execute(move |client| {
while let Some(fun) = queue.lock().pop_front() {
fun(client);
}
currently_queued.fetch_sub(count, AtomicOrdering::SeqCst);
fun(client);
}));

match result {
Ok(_) => Ok(()),
Ok(_) => {
self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst);
Ok(())
},
Err(e) => Err(QueueError::Channel(e)),
}
}
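
For readers skimming the IoChannelQueue change above: the queue no longer stores boxed closures behind a mutex; it just counts in-flight items with an AtomicUsize and rejects new work past the limit. A compressed sketch of that counter-based backpressure, with the Channel/ClientIoMessage machinery elided and a release closure standing in for the executed IO message (ordering simplified: the real change only bumps the counter after the IO message has been sent successfully):

    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Counter-based backpressure: refuse new work once `limit` items are in flight.
    struct BoundedCounter {
        currently_queued: Arc<AtomicUsize>,
        limit: usize,
    }

    impl BoundedCounter {
        fn new(limit: usize) -> Self {
            BoundedCounter { currently_queued: Arc::new(AtomicUsize::new(0)), limit }
        }

        /// Try to reserve `count` slots. On success, returns a closure that the
        /// consumer calls once the work has run, releasing the slots again.
        fn try_queue(&self, count: usize) -> Result<impl FnOnce() + Send, &'static str> {
            if self.currently_queued.load(Ordering::Relaxed) >= self.limit {
                return Err("queue full");
            }
            self.currently_queued.fetch_add(count, Ordering::SeqCst);
            let counter = self.currently_queued.clone();
            Ok(move || { counter.fetch_sub(count, Ordering::SeqCst); })
        }
    }

    fn main() {
        let queue = BoundedCounter::new(2);
        let release_first = queue.try_queue(1).unwrap();
        let _release_second = queue.try_queue(1).unwrap();
        assert!(queue.try_queue(1).is_err()); // limit reached
        release_first();                      // the IO handler ran; slot is free again
        assert!(queue.try_queue(1).is_ok());
    }
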
3 changes: 3 additions & 0 deletions ethcore/src/ethereum/mod.rs
@@ -101,6 +101,9 @@ pub fn new_morden<'a, T: Into<SpecParams<'a>>>(params: T) -> Spec {
/// Create a new Foundation Frontier-era chain spec as though it never changes to Homestead.
pub fn new_frontier_test() -> Spec { load(None, include_bytes!("../../res/ethereum/frontier_test.json")) }

/// Create a new Ropsten chain spec.
pub fn new_ropsten_test() -> Spec { load(None, include_bytes!("../../res/ethereum/ropsten.json")) }

/// Create a new Foundation Homestead-era chain spec as though it never changed from Frontier.
pub fn new_homestead_test() -> Spec { load(None, include_bytes!("../../res/ethereum/homestead_test.json")) }

19 changes: 19 additions & 0 deletions ethcore/src/machine.rs
@@ -503,6 +503,25 @@
}
}

#[test]
fn should_disallow_unsigned_transactions() {
let rlp = "ea80843b9aca0083015f90948921ebb5f79e9e3920abe571004d0b1d5119c154865af3107a400080038080".into();
let transaction: UnverifiedTransaction = ::rlp::decode(&::rustc_hex::FromHex::from_hex(rlp).unwrap()).unwrap();
let spec = ::ethereum::new_ropsten_test();
let ethparams = get_default_ethash_extensions();

let machine = EthereumMachine::with_ethash_extensions(
spec.params().clone(),
Default::default(),
ethparams,
);
let mut header = ::header::Header::new();
header.set_number(15);

let res = machine.verify_transaction_basic(&transaction, &header);
assert_eq!(res, Err(transaction::Error::InvalidSignature("Crypto error (Invalid EC signature)".into())));
}

#[test]
fn ethash_gas_limit_is_multiple_of_determinant() {
use ethereum_types::U256;
19 changes: 19 additions & 0 deletions ethcore/src/verification/verification.rs
@@ -574,7 +574,17 @@ mod tests {
nonce: U256::from(2)
}.sign(keypair.secret(), None);

let tr3 = Transaction {
action: Action::Call(0x0.into()),
value: U256::from(0),
data: Bytes::new(),
gas: U256::from(30_000),
gas_price: U256::from(0),
nonce: U256::zero(),
}.null_sign(0);

let good_transactions = [ tr1.clone(), tr2.clone() ];
let eip86_transactions = [ tr3.clone() ];

let diff_inc = U256::from(0x40);

@@ -610,6 +620,7 @@
uncles_rlp.append_list(&good_uncles);
let good_uncles_hash = keccak(uncles_rlp.as_raw());
let good_transactions_root = ordered_trie_root(good_transactions.iter().map(|t| ::rlp::encode::<UnverifiedTransaction>(t)));
let eip86_transactions_root = ordered_trie_root(eip86_transactions.iter().map(|t| ::rlp::encode::<UnverifiedTransaction>(t)));

let mut parent = good.clone();
parent.set_number(9);
@@ -630,6 +641,14 @@

check_ok(basic_test(&create_test_block(&good), engine));

let mut bad_header = good.clone();
bad_header.set_transactions_root(eip86_transactions_root.clone());
bad_header.set_uncles_hash(good_uncles_hash.clone());
match basic_test(&create_test_block_with_data(&bad_header, &eip86_transactions, &good_uncles), engine) {
Err(Error(ErrorKind::Transaction(ref e), _)) if e == &::ethkey::Error::InvalidSignature.into() => (),
e => panic!("Block verification failed.\nExpected: Transaction Error (Invalid Signature)\nGot: {:?}", e),
}

let mut header = good.clone();
header.set_transactions_root(good_transactions_root.clone());
header.set_uncles_hash(good_uncles_hash.clone());
4 changes: 4 additions & 0 deletions ethcore/transaction/src/transaction.rs
@@ -409,6 +409,10 @@ impl UnverifiedTransaction {
if check_low_s && !(allow_empty_signature && self.is_unsigned()) {
self.check_low_s()?;
}
// Disallow unsigned transactions in case EIP-86 is disabled.
if !allow_empty_signature && self.is_unsigned() {
return Err(ethkey::Error::InvalidSignature.into());
}
// EIP-86: Transactions of this form MUST have gasprice = 0, nonce = 0, value = 0, and do NOT increment the nonce of account 0.
if allow_empty_signature && self.is_unsigned() && !(self.gas_price.is_zero() && self.value.is_zero() && self.nonce.is_zero()) {
return Err(ethkey::Error::InvalidSignature.into())
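
The four added lines above amount to one rule: an unsigned (EIP-86 style) transaction is only ever acceptable when empty signatures are allowed, and even then only with zero gas price, value and nonce. A standalone sketch of that rule using a simplified transaction type (the real check sits in UnverifiedTransaction's basic verification and returns ethkey::Error::InvalidSignature):

    #[derive(Debug, PartialEq)]
    enum VerificationError { InvalidSignature }

    // Simplified stand-in for UnverifiedTransaction.
    struct Tx { unsigned: bool, gas_price: u64, value: u64, nonce: u64 }

    fn check_signature(tx: &Tx, allow_empty_signature: bool) -> Result<(), VerificationError> {
        // Disallow unsigned transactions outright when EIP-86 is disabled.
        if !allow_empty_signature && tx.unsigned {
            return Err(VerificationError::InvalidSignature);
        }
        // EIP-86: unsigned transactions must have zero gas price, value and nonce.
        if allow_empty_signature && tx.unsigned
            && !(tx.gas_price == 0 && tx.value == 0 && tx.nonce == 0)
        {
            return Err(VerificationError::InvalidSignature);
        }
        Ok(())
    }

    fn main() {
        let unsigned = Tx { unsigned: true, gas_price: 0, value: 0, nonce: 0 };
        assert_eq!(check_signature(&unsigned, false), Err(VerificationError::InvalidSignature));
        assert_eq!(check_signature(&unsigned, true), Ok(()));
    }
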
2 changes: 1 addition & 1 deletion mac/Parity.pkgproj
@@ -462,7 +462,7 @@
<key>OVERWRITE_PERMISSIONS</key>
<false/>
<key>VERSION</key>
<string>1.11.2</string>
<string>1.11.3</string>
</dict>
<key>UUID</key>
<string>2DCD5B81-7BAF-4DA1-9251-6274B089FD36</string>
2 changes: 1 addition & 1 deletion miner/src/pool/queue.rs
@@ -174,7 +174,7 @@ impl TransactionQueue {
transactions: Vec<verifier::Transaction>,
) -> Vec<Result<(), transaction::Error>> {
// Run verification
let _timer = ::trace_time::PerfTimer::new("queue::verifyAndImport");
let _timer = ::trace_time::PerfTimer::new("pool::verify_and_import");
let options = self.options.read().clone();

let verifier = verifier::Verifier::new(client, options, self.insertion_id.clone());
(Diffs for the remaining changed files are not shown here.)
