diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 90097a763e7..a4109ad8597 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -5,7 +5,7 @@ stages: - publish-onchain - optional -image: parity/rust-parity-ethereum-build:stretch +image: parity/rust:gitlab-ci variables: GIT_STRATEGY: fetch @@ -44,6 +44,7 @@ test-linux: RUN_TESTS: all script: - scripts/gitlab/test-all.sh + - sccache -s tags: - linux-docker @@ -102,9 +103,9 @@ publish-docker: - scripts/gitlab/publish-docker.sh parity publish-snap: - stage: publish + stage: optional #publish only: *releaseable_branches - image: parity/snapcraft:gitlab-ci + image: snapcore/snapcraft variables: BUILD_ARCH: amd64 cache: {} @@ -118,7 +119,7 @@ publish-snap: allow_failure: true <<: *collect_artifacts -publish:onnet:update: +publish-onnet-update: stage: publish-onchain only: *releaseable_branches cache: {} @@ -126,7 +127,7 @@ publish:onnet:update: - build-linux - build-darwin - build-windows - - publish:awss3:release + - publish-awss3-release before_script: *determine_version script: - scripts/gitlab/publish-onnet-update.sh @@ -134,7 +135,7 @@ publish:onnet:update: - linux-docker # configures aws for fast uploads/syncs -.s3_before_script: &s3_before_script +.s3-before-script: &s3-before-script before_script: - mkdir -p ${HOME}/.aws - | @@ -150,7 +151,7 @@ publish:onnet:update: addressing_style = path EOC -publish:awss3:release: +publish-awss3-release: image: parity/awscli:latest stage: publish only: *releaseable_branches @@ -161,7 +162,7 @@ publish:awss3:release: - build-windows variables: GIT_STRATEGY: none - <<: *s3_before_script + <<: *s3-before-script script: - echo "__________Push binaries to AWS S3____________" - case "${SCHEDULE_TAG:-${CI_COMMIT_REF_NAME}}" in @@ -174,7 +175,7 @@ publish:awss3:release: esac - aws s3 sync ./artifacts s3://${BUCKET}/${SCHEDULE_TAG:-${CI_COMMIT_REF_NAME}}/ after_script: - - aws s3 ls s3://${BUCKET}/latest/ + - aws s3 ls s3://${BUCKET}/${SCHEDULE_TAG:-${CI_COMMIT_REF_NAME}}/ --recursive --human-readable --summarize tags: - linux-docker diff --git a/Cargo.lock b/Cargo.lock index 263c71570f2..8d983ac853b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,3 +1,5 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. [[package]] name = "adler32" version = "1.0.3" @@ -681,6 +683,14 @@ dependencies = [ "heapsize 0.4.2 (git+https://github.com/cheme/heapsize.git?branch=ec-macfix)", ] +[[package]] +name = "enum_primitive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num-traits 0.1.43 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "env_logger" version = "0.5.13" @@ -800,6 +810,7 @@ dependencies = [ "ethjson 0.1.0", "ethkey 0.3.0", "evm 0.1.0", + "fetch 0.1.0", "hashdb 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "heapsize 0.4.2 (git+https://github.com/cheme/heapsize.git?branch=ec-macfix)", "itertools 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)", @@ -822,6 +833,7 @@ dependencies = [ "parity-bytes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-crypto 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-machine 0.1.0", + "parity-runtime 0.1.0", "parity-snappy 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "patricia-trie 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1063,6 +1075,7 @@ dependencies = [ "keccak-hash 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", + "lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)", "parity-bytes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-crypto 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1199,6 +1212,7 @@ name = "ethcore-sync" version = "1.12.0" dependencies = [ "common-types 0.1.0", + "enum_primitive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore 1.12.0", "ethcore-io 1.12.0", @@ -4756,6 +4770,7 @@ dependencies = [ "checksum edit-distance 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3bd26878c3d921f89797a4e1a1711919f999a9f6946bb6f5a4ffda126d297b7e" "checksum either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3be565ca5c557d7f59e7cfcf1844f9e3033650c929c6566f511e8005f205c1d0" "checksum elastic-array 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "88d4851b005ef16de812ea9acdb7bece2f0a40dd86c07b85631d7dafa54537bb" +"checksum enum_primitive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "be4551092f4d519593039259a9ed8daedf0da12e5109c5280338073eaeb81180" "checksum env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)" = "15b0a4d2e39f8420210be8b27eeda28029729e2fd4291019455016c348240c38" "checksum error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "07e791d3be96241c77c43846b665ef1384606da2cd2a48730abe606a12906e02" "checksum eth-secp256k1 0.5.7 (git+https://github.com/paritytech/rust-secp256k1)" = "" diff --git a/ethcore/Cargo.toml b/ethcore/Cargo.toml index b13a6a3284c..113147aa3fa 100644 --- a/ethcore/Cargo.toml +++ b/ethcore/Cargo.toml @@ -76,7 +76,9 @@ blooms-db = { path = "../util/blooms-db" } criterion = "0.2" env_logger = "0.5" ethcore-accounts = { path = "../accounts" } +fetch = { path = "../util/fetch" } kvdb-rocksdb = "0.1.3" +parity-runtime = { path = "../util/runtime" } rlp_compress = { path = "../util/rlp-compress" } tempdir = "0.3" trie-standardmap = "0.1" diff --git a/ethcore/evm/src/interpreter/mod.rs b/ethcore/evm/src/interpreter/mod.rs index bd47c7ed713..d699e61cbec 100644 --- a/ethcore/evm/src/interpreter/mod.rs +++ b/ethcore/evm/src/interpreter/mod.rs @@ -571,10 +571,10 @@ impl Interpreter { let out_size = self.stack.pop_back(); // Add stipend (only CALL|CALLCODE when value > 0) - let call_gas = call_gas + value.map_or_else(|| Cost::from(0), |val| match val.is_zero() { + let call_gas = call_gas.overflow_add(value.map_or_else(|| Cost::from(0), |val| match val.is_zero() { false => Cost::from(ext.schedule().call_stipend), true => Cost::from(0), - }); + })).0; // Get sender & receive addresses, check if we have balance let (sender_address, receive_address, has_balance, call_type) = match instruction { diff --git a/ethcore/light/src/transaction_queue.rs b/ethcore/light/src/transaction_queue.rs index 53359368da9..65e646d8466 100644 --- a/ethcore/light/src/transaction_queue.rs +++ b/ethcore/light/src/transaction_queue.rs @@ -95,7 +95,7 @@ impl AccountTransactions { } fn next_nonce(&self) -> U256 { - self.current.last().map(|last| last.nonce + 1) + self.current.last().map(|last| last.nonce.saturating_add(1.into())) .unwrap_or_else(|| *self.cur_nonce.value()) } @@ -107,7 +107,7 @@ impl AccountTransactions { while let Some(tx) = self.future.remove(&next_nonce) { promoted.push(tx.hash); self.current.push(tx); - next_nonce = next_nonce + 1; + next_nonce = next_nonce.saturating_add(1.into()); } promoted diff --git a/ethcore/private-tx/src/error.rs b/ethcore/private-tx/src/error.rs index 325b561b76b..b3465883f4e 100644 --- a/ethcore/private-tx/src/error.rs +++ b/ethcore/private-tx/src/error.rs @@ -76,7 +76,7 @@ error_chain! { } #[doc = "Wrong private transaction type."] - BadTransactonType { + BadTransactionType { description("Wrong private transaction type."), display("Wrong private transaction type"), } diff --git a/ethcore/private-tx/src/lib.rs b/ethcore/private-tx/src/lib.rs index 71a97d6a041..f536e22fbd8 100644 --- a/ethcore/private-tx/src/lib.rs +++ b/ethcore/private-tx/src/lib.rs @@ -241,7 +241,7 @@ impl Provider { bail!(ErrorKind::SignerAccountNotSet); } let tx_hash = signed_transaction.hash(); - let contract = Self::contract_address_from_transaction(&signed_transaction).map_err(|_| ErrorKind::BadTransactonType)?; + let contract = Self::contract_address_from_transaction(&signed_transaction).map_err(|_| ErrorKind::BadTransactionType)?; let data = signed_transaction.rlp_bytes(); let encrypted_transaction = self.encrypt(&contract, &Self::iv_from_transaction(&signed_transaction), &data)?; let private = PrivateTransaction::new(encrypted_transaction, contract); @@ -415,7 +415,7 @@ impl Provider { Action::Call(contract) => Ok(contract), _ => { warn!(target: "privatetx", "Incorrect type of action for the transaction"); - bail!(ErrorKind::BadTransactonType); + bail!(ErrorKind::BadTransactionType); } } } @@ -610,7 +610,7 @@ impl Provider { /// Create encrypted public contract deployment transaction. pub fn public_creation_transaction(&self, block: BlockId, source: &SignedTransaction, validators: &[Address], gas_price: U256) -> Result<(Transaction, Address), Error> { if let Action::Call(_) = source.action { - bail!(ErrorKind::BadTransactonType); + bail!(ErrorKind::BadTransactionType); } let sender = source.sender(); let state = self.client.state_at(block).ok_or(ErrorKind::StatePruned)?; @@ -649,7 +649,7 @@ impl Provider { /// Create encrypted public contract deployment transaction. Returns updated encrypted state. pub fn execute_private_transaction(&self, block: BlockId, source: &SignedTransaction) -> Result { if let Action::Create = source.action { - bail!(ErrorKind::BadTransactonType); + bail!(ErrorKind::BadTransactionType); } let result = self.execute_private(source, TransactOptions::with_no_tracing(), block)?; Ok(result.state) diff --git a/ethcore/res/ethereum/st_peters_test.json b/ethcore/res/ethereum/st_peters_test.json new file mode 100644 index 00000000000..ee88008f668 --- /dev/null +++ b/ethcore/res/ethereum/st_peters_test.json @@ -0,0 +1,65 @@ +{ + "name": "St. Peter's (test)", + "engine": { + "Ethash": { + "params": { + "minimumDifficulty": "0x020000", + "difficultyBoundDivisor": "0x0800", + "durationLimit": "0x0d", + "blockReward": "0x1BC16D674EC80000", + "homesteadTransition": "0x0", + "eip100bTransition": "0x0", + "difficultyBombDelays": { + "0": 5000000 + } + } + } + }, + "params": { + "gasLimitBoundDivisor": "0x0400", + "registrar" : "0xc6d9d2cd449a754c494264e1809c50e34d64562b", + "accountStartNonce": "0x00", + "maximumExtraDataSize": "0x20", + "minGasLimit": "0x1388", + "networkID" : "0x1", + "maxCodeSize": 24576, + "maxCodeSizeTransition": "0x0", + "eip150Transition": "0x0", + "eip160Transition": "0x0", + "eip161abcTransition": "0x0", + "eip161dTransition": "0x0", + "eip140Transition": "0x0", + "eip211Transition": "0x0", + "eip214Transition": "0x0", + "eip155Transition": "0x0", + "eip658Transition": "0x0", + "eip145Transition": "0x0", + "eip1014Transition": "0x0", + "eip1052Transition": "0x0", + "eip1283DisableTransition": "0x0" + }, + "genesis": { + "seal": { + "ethereum": { + "nonce": "0x0000000000000042", + "mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000" + } + }, + "difficulty": "0x400000000", + "author": "0x0000000000000000000000000000000000000000", + "timestamp": "0x00", + "parentHash": "0x0000000000000000000000000000000000000000000000000000000000000000", + "extraData": "0x11bbe8db4e347b4e8c937c1c8370e4b5ed33adb3db69cbdb7a38e1e50b1b82fa", + "gasLimit": "0x1388" + }, + "accounts": { + "0000000000000000000000000000000000000001": { "balance": "1", "builtin": { "name": "ecrecover", "pricing": { "linear": { "base": 3000, "word": 0 } } } }, + "0000000000000000000000000000000000000002": { "balance": "1", "builtin": { "name": "sha256", "pricing": { "linear": { "base": 60, "word": 12 } } } }, + "0000000000000000000000000000000000000003": { "balance": "1", "builtin": { "name": "ripemd160", "pricing": { "linear": { "base": 600, "word": 120 } } } }, + "0000000000000000000000000000000000000004": { "balance": "1", "builtin": { "name": "identity", "pricing": { "linear": { "base": 15, "word": 3 } } } }, + "0000000000000000000000000000000000000005": { "builtin": { "name": "modexp", "activate_at": "0x00", "pricing": { "modexp": { "divisor": 20 } } } }, + "0000000000000000000000000000000000000006": { "builtin": { "name": "alt_bn128_add", "activate_at": "0x00", "pricing": { "linear": { "base": 500, "word": 0 } } } }, + "0000000000000000000000000000000000000007": { "builtin": { "name": "alt_bn128_mul", "activate_at": "0x00", "pricing": { "linear": { "base": 40000, "word": 0 } } } }, + "0000000000000000000000000000000000000008": { "builtin": { "name": "alt_bn128_pairing", "activate_at": "0x00", "pricing": { "alt_bn128_pairing": { "base": 100000, "pair": 80000 } } } } + } +} diff --git a/ethcore/res/ethereum/tests b/ethcore/res/ethereum/tests index 2cd62aeec11..725dbc73a54 160000 --- a/ethcore/res/ethereum/tests +++ b/ethcore/res/ethereum/tests @@ -1 +1 @@ -Subproject commit 2cd62aeec11da29766b30d500f2b9a96f1f28cf0 +Subproject commit 725dbc73a54649e22a00330bd0f4d6699a5060e5 diff --git a/ethcore/src/client/evm_test_client.rs b/ethcore/src/client/evm_test_client.rs index 7a2158209b5..a65e2b313ec 100644 --- a/ethcore/src/client/evm_test_client.rs +++ b/ethcore/src/client/evm_test_client.rs @@ -93,6 +93,7 @@ impl<'a> EvmTestClient<'a> { ForkSpec::EIP158 => Some(ethereum::new_eip161_test()), ForkSpec::Byzantium => Some(ethereum::new_byzantium_test()), ForkSpec::Constantinople => Some(ethereum::new_constantinople_test()), + ForkSpec::ConstantinopleFix => Some(ethereum::new_constantinople_fix_test()), ForkSpec::EIP158ToByzantiumAt5 => Some(ethereum::new_transition_test()), ForkSpec::FrontierToHomesteadAt5 | ForkSpec::HomesteadToDaoAt5 | ForkSpec::HomesteadToEIP150At5 => None, } diff --git a/ethcore/src/error.rs b/ethcore/src/error.rs index 2984dcdea88..32cfe057a0a 100644 --- a/ethcore/src/error.rs +++ b/ethcore/src/error.rs @@ -89,6 +89,8 @@ pub enum BlockError { InvalidNumber(Mismatch), /// Block number isn't sensible. RidiculousNumber(OutOfBounds), + /// Timestamp header overflowed + TimestampOverflow, /// Too many transactions from a particular address. TooManyTransactions(Address), /// Parent given is unknown. @@ -138,6 +140,7 @@ impl fmt::Display for BlockError { UnknownParent(ref hash) => format!("Unknown parent: {}", hash), UnknownUncleParent(ref hash) => format!("Unknown uncle parent: {}", hash), UnknownEpochTransition(ref num) => format!("Unknown transition to epoch number: {}", num), + TimestampOverflow => format!("Timestamp overflow"), TooManyTransactions(ref address) => format!("Too many transactions from: {}", address), }; diff --git a/ethcore/src/ethereum/mod.rs b/ethcore/src/ethereum/mod.rs index 104155532cb..b7c60789a3d 100644 --- a/ethcore/src/ethereum/mod.rs +++ b/ethcore/src/ethereum/mod.rs @@ -148,6 +148,9 @@ pub fn new_byzantium_test() -> Spec { load(None, include_bytes!("../../res/ether /// Create a new Foundation Constantinople era spec. pub fn new_constantinople_test() -> Spec { load(None, include_bytes!("../../res/ethereum/constantinople_test.json")) } +/// Create a new Foundation St. Peter's (Contantinople Fix) era spec. +pub fn new_constantinople_fix_test() -> Spec { load(None, include_bytes!("../../res/ethereum/st_peters_test.json")) } + /// Create a new Musicoin-MCIP3-era spec. pub fn new_mcip3_test() -> Spec { load(None, include_bytes!("../../res/ethereum/mcip3_test.json")) } @@ -168,6 +171,9 @@ pub fn new_byzantium_test_machine() -> EthereumMachine { load_machine(include_by /// Create a new Foundation Constantinople era spec. pub fn new_constantinople_test_machine() -> EthereumMachine { load_machine(include_bytes!("../../res/ethereum/constantinople_test.json")) } +/// Create a new Foundation St. Peter's (Contantinople Fix) era spec. +pub fn new_constantinople_fix_test_machine() -> EthereumMachine { load_machine(include_bytes!("../../res/ethereum/st_peters_test.json")) } + /// Create a new Musicoin-MCIP3-era spec. pub fn new_mcip3_test_machine() -> EthereumMachine { load_machine(include_bytes!("../../res/ethereum/mcip3_test.json")) } diff --git a/ethcore/src/executive.rs b/ethcore/src/executive.rs index 97758994a17..68d4edc7948 100644 --- a/ethcore/src/executive.rs +++ b/ethcore/src/executive.rs @@ -312,7 +312,7 @@ impl<'a> CallCreateExecutive<'a> { let prev_bal = state.balance(¶ms.address)?; if let ActionValue::Transfer(val) = params.value { state.sub_balance(¶ms.sender, &val, &mut substate.to_cleanup_mode(&schedule))?; - state.new_contract(¶ms.address, val + prev_bal, nonce_offset)?; + state.new_contract(¶ms.address, val.saturating_add(prev_bal), nonce_offset)?; } else { state.new_contract(¶ms.address, prev_bal, nonce_offset)?; } @@ -1103,9 +1103,13 @@ impl<'a, B: 'a + StateBackend> Executive<'a, B> { let refunded = cmp::min(refunds_bound, (t.gas - gas_left_prerefund) >> 1); let gas_left = gas_left_prerefund + refunded; - let gas_used = t.gas - gas_left; - let refund_value = gas_left * t.gas_price; - let fees_value = gas_used * t.gas_price; + let gas_used = t.gas.saturating_sub(gas_left); + let (refund_value, overflow_1) = gas_left.overflowing_mul(t.gas_price); + let (fees_value, overflow_2) = gas_used.overflowing_mul(t.gas_price); + if overflow_1 || overflow_2 { + return Err(ExecutionError::TransactionMalformed("U256 Overflow".to_string())); + } + trace!("exec::finalize: t.gas={}, sstore_refunds={}, suicide_refunds={}, refunds_bound={}, gas_left_prerefund={}, refunded={}, gas_left={}, gas_used={}, refund_value={}, fees_value={}\n", t.gas, sstore_refunds, suicide_refunds, refunds_bound, gas_left_prerefund, refunded, gas_left, gas_used, refund_value, fees_value); @@ -1123,7 +1127,7 @@ impl<'a, B: 'a + StateBackend> Executive<'a, B> { } // perform garbage-collection - let min_balance = if schedule.kill_dust != CleanDustMode::Off { Some(U256::from(schedule.tx_gas) * t.gas_price) } else { None }; + let min_balance = if schedule.kill_dust != CleanDustMode::Off { Some(U256::from(schedule.tx_gas).overflowing_mul(t.gas_price).0) } else { None }; self.state.kill_garbage(&substate.touched, schedule.kill_empty, &min_balance, schedule.kill_dust == CleanDustMode::WithCodeAndStorage)?; match result { diff --git a/ethcore/src/json_tests/state.rs b/ethcore/src/json_tests/state.rs index 45d9c035f69..33fc6d522d5 100644 --- a/ethcore/src/json_tests/state.rs +++ b/ethcore/src/json_tests/state.rs @@ -165,6 +165,7 @@ mod state_tests { declare_test!{GeneralStateTest_stRefundTest, "GeneralStateTests/stRefundTest/"} declare_test!{GeneralStateTest_stReturnDataTest, "GeneralStateTests/stReturnDataTest/"} declare_test!{GeneralStateTest_stRevertTest, "GeneralStateTests/stRevertTest/"} + declare_test!{GeneralStateTest_stSStoreTest, "GeneralStateTests/stSStoreTest/"} declare_test!{GeneralStateTest_stShift, "GeneralStateTests/stShift/"} declare_test!{GeneralStateTest_stSolidityTest, "GeneralStateTests/stSolidityTest/"} declare_test!{GeneralStateTest_stSpecialTest, "GeneralStateTests/stSpecialTest/"} @@ -177,7 +178,6 @@ mod state_tests { declare_test!{GeneralStateTest_stZeroCallsRevert, "GeneralStateTests/stZeroCallsRevert/"} declare_test!{GeneralStateTest_stZeroCallsTest, "GeneralStateTests/stZeroCallsTest/"} declare_test!{GeneralStateTest_stZeroKnowledge, "GeneralStateTests/stZeroKnowledge/"} - declare_test!{GeneralStateTest_stSStoreTest, "GeneralStateTests/stSStoreTest/"} // Attempts to send a transaction that requires more than current balance: // Tx: diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index 99ee8e668c0..a868b5c132a 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -143,6 +143,12 @@ extern crate serde_derive; #[cfg_attr(test, macro_use)] extern crate evm; +#[cfg(all(test, feature = "price-info"))] +extern crate fetch; + +#[cfg(all(test, feature = "price-info"))] +extern crate parity_runtime; + pub mod block; pub mod builtin; pub mod client; diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 82e0e4bb0be..7d6bcbe496d 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -873,6 +873,32 @@ impl miner::MinerService for Miner { self.params.read().gas_range_target.0 / 5 } + fn set_minimal_gas_price(&self, new_price: U256) -> Result { + match *self.gas_pricer.lock() { + // Binding the gas pricer to `gp` here to prevent + // a deadlock when calling recalibrate() + ref mut gp @ GasPricer::Fixed(_) => { + trace!(target: "miner", "minimal_gas_price: recalibrating fixed..."); + *gp = GasPricer::new_fixed(new_price); + + let txq = self.transaction_queue.clone(); + let mut options = self.options.pool_verification_options.clone(); + gp.recalibrate(move |gas_price| { + debug!(target: "miner", "minimal_gas_price: Got gas price! {}", gas_price); + options.minimal_gas_price = gas_price; + txq.set_verifier_options(options); + }); + + Ok(true) + }, + #[cfg(feature = "price-info")] + GasPricer::Calibrated(_) => { + let error_msg = "Can't update fixed gas price while automatic gas calibration is enabled."; + return Err(error_msg); + }, + } + } + fn import_external_transactions( &self, chain: &C, @@ -1654,4 +1680,60 @@ mod tests { assert!(miner.is_currently_sealing()); } + + #[test] + fn should_set_new_minimum_gas_price() { + // Creates a new GasPricer::Fixed behind the scenes + let miner = Miner::new_for_tests(&Spec::new_test(), None); + + let expected_minimum_gas_price: U256 = 0x1337.into(); + miner.set_minimal_gas_price(expected_minimum_gas_price).unwrap(); + + let txq_options = miner.transaction_queue.status().options; + let current_minimum_gas_price = txq_options.minimal_gas_price; + + assert!(current_minimum_gas_price == expected_minimum_gas_price); + } + + #[cfg(feature = "price-info")] + fn dynamic_gas_pricer() -> GasPricer { + use std::time::Duration; + use parity_runtime::Executor; + use fetch::Client as FetchClient; + use ethcore_miner::gas_price_calibrator::{GasPriceCalibrator, GasPriceCalibratorOptions}; + + // Don't really care about any of these settings since + // the gas pricer is never actually going to be used + let fetch = FetchClient::new(1).unwrap(); + let p = Executor::new_sync(); + + GasPricer::new_calibrated( + GasPriceCalibrator::new( + GasPriceCalibratorOptions { + usd_per_tx: 0.0, + recalibration_period: Duration::from_secs(0), + }, + fetch, + p, + ) + ) + } + + #[test] + #[cfg(feature = "price-info")] + fn should_fail_to_set_new_minimum_gas_price() { + // We get a fixed gas pricer by default, need to change that + let miner = Miner::new_for_tests(&Spec::new_test(), None); + let calibrated_gas_pricer = dynamic_gas_pricer(); + *miner.gas_pricer.lock() = calibrated_gas_pricer; + + let expected_minimum_gas_price: U256 = 0x1337.into(); + let result = miner.set_minimal_gas_price(expected_minimum_gas_price); + assert!(result.is_err()); + + let received_error_msg = result.unwrap_err(); + let expected_error_msg = "Can't update fixed gas price while automatic gas calibration is enabled."; + + assert!(received_error_msg == expected_error_msg); + } } diff --git a/ethcore/src/miner/mod.rs b/ethcore/src/miner/mod.rs index 5156b604010..fd7ab96513c 100644 --- a/ethcore/src/miner/mod.rs +++ b/ethcore/src/miner/mod.rs @@ -205,4 +205,8 @@ pub trait MinerService : Send + Sync { /// Suggested gas limit. fn sensible_gas_limit(&self) -> U256; + + /// Set a new minimum gas limit. + /// Will not work if dynamic gas calibration is set. + fn set_minimal_gas_price(&self, gas_price: U256) -> Result; } diff --git a/ethcore/src/state/account.rs b/ethcore/src/state/account.rs index e4553b906f3..483df7ba193 100644 --- a/ethcore/src/state/account.rs +++ b/ethcore/src/state/account.rs @@ -466,12 +466,12 @@ impl Account { /// Increment the nonce of the account by one. pub fn inc_nonce(&mut self) { - self.nonce = self.nonce + U256::from(1u8); + self.nonce = self.nonce.saturating_add(U256::from(1u8)); } /// Increase account balance. pub fn add_balance(&mut self, x: &U256) { - self.balance = self.balance + *x; + self.balance = self.balance.saturating_add(*x); } /// Decrease account balance. diff --git a/ethcore/src/state/mod.rs b/ethcore/src/state/mod.rs index 4d24a1b1573..8cc06fa83cc 100644 --- a/ethcore/src/state/mod.rs +++ b/ethcore/src/state/mod.rs @@ -500,7 +500,12 @@ impl State { /// it will have its code reset, ready for `init_code()`. pub fn new_contract(&mut self, contract: &Address, balance: U256, nonce_offset: U256) -> TrieResult<()> { let original_storage_root = self.original_storage_root(contract)?; - self.insert_cache(contract, AccountEntry::new_dirty(Some(Account::new_contract(balance, self.account_start_nonce + nonce_offset, original_storage_root)))); + let (nonce, overflow) = self.account_start_nonce.overflowing_add(nonce_offset); + if overflow { + return Err(Box::new(TrieError::DecoderError(H256::from(contract), + rlp::DecoderError::Custom("Nonce overflow".into())))); + } + self.insert_cache(contract, AccountEntry::new_dirty(Some(Account::new_contract(balance, nonce, original_storage_root)))); Ok(()) } diff --git a/ethcore/src/verification/verification.rs b/ethcore/src/verification/verification.rs index 827b7143913..3f5008a2b86 100644 --- a/ethcore/src/verification/verification.rs +++ b/ethcore/src/verification/verification.rs @@ -40,6 +40,25 @@ use types::{BlockNumber, header::Header}; use types::transaction::SignedTransaction; use verification::queue::kind::blocks::Unverified; + +/// Returns `Ok` when the result less or equal to `i32::max_value` to prevent `SystemTime` to panic because +/// it is platform specific, may be i32 or i64. +/// +/// `Err Result { + let d1 = sys.duration_since(UNIX_EPOCH).map_err(|_| BlockError::TimestampOverflow)?; + let total_time = d1.checked_add(d2).ok_or(BlockError::TimestampOverflow)?; + + if total_time.as_secs() <= i32::max_value() as u64 { + Ok(sys + d2) + } else { + Err(BlockError::TimestampOverflow) + } +} + /// Preprocessed block data gathered in `verify_block_unordered` call pub struct PreverifiedBlock { /// Populated block header @@ -306,7 +325,7 @@ pub fn verify_header_params(header: &Header, engine: &EthEngine, is_full: bool, const ACCEPTABLE_DRIFT: Duration = Duration::from_secs(15); let max_time = SystemTime::now() + ACCEPTABLE_DRIFT; let invalid_threshold = max_time + ACCEPTABLE_DRIFT * 9; - let timestamp = UNIX_EPOCH + Duration::from_secs(header.timestamp()); + let timestamp = timestamp_checked_add(UNIX_EPOCH, Duration::from_secs(header.timestamp()))?; if timestamp > invalid_threshold { return Err(From::from(BlockError::InvalidTimestamp(OutOfBounds { max: Some(max_time), min: None, found: timestamp }))) @@ -328,8 +347,8 @@ fn verify_parent(header: &Header, parent: &Header, engine: &EthEngine) -> Result let gas_limit_divisor = engine.params().gas_limit_bound_divisor; if !engine.is_timestamp_valid(header.timestamp(), parent.timestamp()) { - let min = SystemTime::now() + Duration::from_secs(parent.timestamp() + 1); - let found = SystemTime::now() + Duration::from_secs(header.timestamp()); + let min = timestamp_checked_add(SystemTime::now(), Duration::from_secs(parent.timestamp().saturating_add(1)))?; + let found = timestamp_checked_add(SystemTime::now(), Duration::from_secs(header.timestamp()))?; return Err(From::from(BlockError::InvalidTimestamp(OutOfBounds { max: None, min: Some(min), found }))) } if header.number() != parent.number() + 1 { @@ -743,7 +762,8 @@ mod tests { check_fail_timestamp(family_test(&create_test_block_with_data(&header, &good_transactions, &good_uncles), engine, &bc), false); header = good.clone(); - header.set_timestamp(2450000000); + // will return `BlockError::TimestampOverflow` when timestamp > `i32::max_value()` + header.set_timestamp(i32::max_value() as u64); check_fail_timestamp(basic_test(&create_test_block_with_data(&header, &good_transactions, &good_uncles), engine), false); header = good.clone(); @@ -815,4 +835,11 @@ mod tests { check_fail(unordered_test(&create_test_block_with_data(&header, &bad_transactions, &[]), &engine), TooManyTransactions(keypair.address())); unordered_test(&create_test_block_with_data(&header, &good_transactions, &[]), &engine).unwrap(); } + + #[test] + fn checked_add_systime_dur() { + assert!(timestamp_checked_add(UNIX_EPOCH, Duration::new(i32::max_value() as u64 + 1, 0)).is_err()); + assert!(timestamp_checked_add(UNIX_EPOCH, Duration::new(i32::max_value() as u64, 0)).is_ok()); + assert!(timestamp_checked_add(UNIX_EPOCH, Duration::new(i32::max_value() as u64 - 1, 1_000_000_000)).is_ok()); + } } diff --git a/ethcore/sync/Cargo.toml b/ethcore/sync/Cargo.toml index 37417cd0a3c..9db93316261 100644 --- a/ethcore/sync/Cargo.toml +++ b/ethcore/sync/Cargo.toml @@ -9,6 +9,7 @@ authors = ["Parity Technologies "] [dependencies] common-types = { path = "../types" } +enum_primitive = "0.1.1" ethcore = { path = ".." } ethcore-io = { path = "../../util/io" } ethcore-light = { path = "../light" } diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 9374b3ff250..4a66f468d59 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -39,8 +39,8 @@ use std::net::{SocketAddr, AddrParseError}; use std::str::FromStr; use parking_lot::{RwLock, Mutex}; use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62, - PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3, - PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET}; + PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3}; +use chain::sync_packet::SyncPacket::{PrivateTransactionPacket, SignedPrivateTransactionPacket}; use light::client::AsLightClient; use light::Provider; use light::net::{ @@ -51,6 +51,8 @@ use network::IpFilter; use private_tx::PrivateTxHandler; use types::transaction::UnverifiedTransaction; +use super::light_sync::SyncInfo; + /// Parity sync protocol pub const WARP_SYNC_PROTOCOL_ID: ProtocolId = *b"par"; /// Ethereum sync protocol @@ -577,9 +579,9 @@ impl ChainNotify for EthSync { match message_type { ChainMessageType::Consensus(message) => self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message), ChainMessageType::PrivateTransaction(transaction_hash, message) => - self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, PRIVATE_TRANSACTION_PACKET, message), + self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, PrivateTransactionPacket, message), ChainMessageType::SignedPrivateTransaction(transaction_hash, message) => - self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, SIGNED_PRIVATE_TRANSACTION_PACKET, message), + self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, SignedPrivateTransactionPacket, message), } }); } @@ -804,6 +806,24 @@ pub trait LightSyncProvider { fn transactions_stats(&self) -> BTreeMap; } +/// Wrapper around `light_sync::SyncInfo` to expose those methods without the concrete type `LightSync` +pub trait LightSyncInfo: Send + Sync { + /// Get the highest block advertised on the network. + fn highest_block(&self) -> Option; + + /// Get the block number at the time of sync start. + fn start_block(&self) -> u64; + + /// Whether major sync is underway. + fn is_major_importing(&self) -> bool; +} + +/// Execute a closure with a protocol context. +pub trait LightNetworkDispatcher { + /// Execute a closure with a protocol context. + fn with_context(&self, f: F) -> Option where F: FnOnce(&::light::net::BasicContext) -> T; +} + /// Configuration for the light sync. pub struct LightSyncParams { /// Network configuration. @@ -823,7 +843,7 @@ pub struct LightSyncParams { /// Service for light synchronization. pub struct LightSync { proto: Arc, - sync: Arc<::light_sync::SyncInfo + Sync + Send>, + sync: Arc, attached_protos: Vec, network: NetworkService, subprotocol_name: [u8; 3], @@ -874,15 +894,6 @@ impl LightSync { }) } - /// Execute a closure with a protocol context. - pub fn with_context(&self, f: F) -> Option - where F: FnOnce(&::light::net::BasicContext) -> T - { - self.network.with_context_eval( - self.subprotocol_name, - move |ctx| self.proto.with_context(&ctx, f), - ) - } } impl ::std::ops::Deref for LightSync { @@ -891,6 +902,16 @@ impl ::std::ops::Deref for LightSync { fn deref(&self) -> &Self::Target { &*self.sync } } + +impl LightNetworkDispatcher for LightSync { + fn with_context(&self, f: F) -> Option where F: FnOnce(&::light::net::BasicContext) -> T { + self.network.with_context_eval( + self.subprotocol_name, + move |ctx| self.proto.with_context(&ctx, f), + ) + } +} + impl ManageNetwork for LightSync { fn accept_unreserved_peers(&self) { self.network.set_non_reserved_mode(NonReservedPeerMode::Accept); @@ -991,3 +1012,17 @@ impl LightSyncProvider for LightSync { Default::default() // TODO } } + +impl LightSyncInfo for LightSync { + fn highest_block(&self) -> Option { + (*self.sync).highest_block() + } + + fn start_block(&self) -> u64 { + (*self.sync).start_block() + } + + fn is_major_importing(&self) -> bool { + (*self.sync).is_major_importing() + } +} diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 83323ba996a..63ab8916139 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -17,6 +17,7 @@ use api::WARP_SYNC_PROTOCOL_ID; use block_sync::{BlockDownloaderImportError as DownloaderImportError, DownloadAction}; use bytes::Bytes; +use enum_primitive::FromPrimitive; use ethcore::error::{Error as EthcoreError, ErrorKind as EthcoreErrorKind, ImportErrorKind, BlockError}; use ethcore::snapshot::{ManifestData, RestorationStatus}; use ethcore::verification::queue::kind::blocks::Unverified; @@ -33,6 +34,20 @@ use types::BlockNumber; use types::block_status::BlockStatus; use types::ids::BlockId; +use super::sync_packet::{PacketInfo, SyncPacket}; +use super::sync_packet::SyncPacket::{ + StatusPacket, + NewBlockHashesPacket, + BlockHeadersPacket, + BlockBodiesPacket, + NewBlockPacket, + ReceiptsPacket, + SnapshotManifestPacket, + SnapshotDataPacket, + PrivateTransactionPacket, + SignedPrivateTransactionPacket, +}; + use super::{ BlockSet, ChainSync, @@ -48,16 +63,6 @@ use super::{ MAX_NEW_HASHES, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_3, - BLOCK_BODIES_PACKET, - BLOCK_HEADERS_PACKET, - NEW_BLOCK_HASHES_PACKET, - NEW_BLOCK_PACKET, - PRIVATE_TRANSACTION_PACKET, - RECEIPTS_PACKET, - SIGNED_PRIVATE_TRANSACTION_PACKET, - SNAPSHOT_DATA_PACKET, - SNAPSHOT_MANIFEST_PACKET, - STATUS_PACKET, }; /// The Chain Sync Handler: handles responses from peers @@ -67,36 +72,40 @@ impl SyncHandler { /// Handle incoming packet from peer pub fn on_packet(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { let rlp = Rlp::new(data); - let result = match packet_id { - STATUS_PACKET => SyncHandler::on_peer_status(sync, io, peer, &rlp), - BLOCK_HEADERS_PACKET => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp), - BLOCK_BODIES_PACKET => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp), - RECEIPTS_PACKET => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp), - NEW_BLOCK_PACKET => SyncHandler::on_peer_new_block(sync, io, peer, &rlp), - NEW_BLOCK_HASHES_PACKET => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp), - SNAPSHOT_MANIFEST_PACKET => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp), - SNAPSHOT_DATA_PACKET => SyncHandler::on_snapshot_data(sync, io, peer, &rlp), - PRIVATE_TRANSACTION_PACKET => SyncHandler::on_private_transaction(sync, io, peer, &rlp), - SIGNED_PRIVATE_TRANSACTION_PACKET => SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp), - _ => { - debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id); - Ok(()) - } - }; + if let Some(packet_id) = SyncPacket::from_u8(packet_id) { + let result = match packet_id { + StatusPacket => SyncHandler::on_peer_status(sync, io, peer, &rlp), + BlockHeadersPacket => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp), + BlockBodiesPacket => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp), + ReceiptsPacket => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp), + NewBlockPacket => SyncHandler::on_peer_new_block(sync, io, peer, &rlp), + NewBlockHashesPacket => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp), + SnapshotManifestPacket => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp), + SnapshotDataPacket => SyncHandler::on_snapshot_data(sync, io, peer, &rlp), + PrivateTransactionPacket => SyncHandler::on_private_transaction(sync, io, peer, &rlp), + SignedPrivateTransactionPacket => SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp), + _ => { + debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id.id()); + Ok(()) + } + }; - match result { - Err(DownloaderImportError::Invalid) => { - debug!(target:"sync", "{} -> Invalid packet {}", peer, packet_id); - io.disable_peer(peer); - sync.deactivate_peer(io, peer); - }, - Err(DownloaderImportError::Useless) => { - sync.deactivate_peer(io, peer); - }, - Ok(()) => { - // give a task to the same peer first - sync.sync_peer(io, peer, false); - }, + match result { + Err(DownloaderImportError::Invalid) => { + debug!(target:"sync", "{} -> Invalid packet {}", peer, packet_id.id()); + io.disable_peer(peer); + sync.deactivate_peer(io, peer); + }, + Err(DownloaderImportError::Useless) => { + sync.deactivate_peer(io, peer); + }, + Ok(()) => { + // give a task to the same peer first + sync.sync_peer(io, peer, false); + }, + } + } else { + debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id); } } diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 5a144853d6f..81f1ccffe94 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -88,6 +88,7 @@ //! All other messages are ignored. mod handler; +pub mod sync_packet; mod propagator; mod requester; mod supplier; @@ -119,6 +120,12 @@ use types::transaction::UnverifiedTransaction; use types::BlockNumber; use self::handler::SyncHandler; +use self::sync_packet::{PacketInfo, SyncPacket}; +use self::sync_packet::SyncPacket::{ + NewBlockPacket, + StatusPacket, +}; + use self::propagator::SyncPropagator; use self::requester::SyncRequester; pub(crate) use self::supplier::SyncSupplier; @@ -154,28 +161,6 @@ const MAX_TRANSACTION_PACKET_SIZE: usize = 5 * 1024 * 1024; const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 30000; const SNAPSHOT_MIN_PEERS: usize = 3; -const STATUS_PACKET: u8 = 0x00; -const NEW_BLOCK_HASHES_PACKET: u8 = 0x01; -const TRANSACTIONS_PACKET: u8 = 0x02; -pub const GET_BLOCK_HEADERS_PACKET: u8 = 0x03; -pub const BLOCK_HEADERS_PACKET: u8 = 0x04; -pub const GET_BLOCK_BODIES_PACKET: u8 = 0x05; -const BLOCK_BODIES_PACKET: u8 = 0x06; -const NEW_BLOCK_PACKET: u8 = 0x07; - -pub const GET_NODE_DATA_PACKET: u8 = 0x0d; -pub const NODE_DATA_PACKET: u8 = 0x0e; -pub const GET_RECEIPTS_PACKET: u8 = 0x0f; -pub const RECEIPTS_PACKET: u8 = 0x10; - -pub const GET_SNAPSHOT_MANIFEST_PACKET: u8 = 0x11; -pub const SNAPSHOT_MANIFEST_PACKET: u8 = 0x12; -pub const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13; -pub const SNAPSHOT_DATA_PACKET: u8 = 0x14; -pub const CONSENSUS_DATA_PACKET: u8 = 0x15; -pub const PRIVATE_TRANSACTION_PACKET: u8 = 0x16; -pub const SIGNED_PRIVATE_TRANSACTION_PACKET: u8 = 0x17; - const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3; const WAIT_PEERS_TIMEOUT: Duration = Duration::from_secs(5); @@ -484,7 +469,7 @@ impl ChainSyncApi { for peers in sync.get_peers(&chain_info, PeerState::SameBlock).chunks(10) { check_deadline(deadline)?; for peer in peers { - SyncPropagator::send_packet(io, *peer, NEW_BLOCK_PACKET, rlp.clone()); + SyncPropagator::send_packet(io, *peer, NewBlockPacket, rlp.clone()); if let Some(ref mut peer) = sync.peers.get_mut(peer) { peer.latest_hash = hash; } @@ -1146,7 +1131,7 @@ impl ChainSync { } } packet.complete_unbounded_list(); - io.respond(STATUS_PACKET, packet.out()) + io.respond(StatusPacket.id(), packet.out()) } pub fn maintain_peers(&mut self, io: &mut SyncIo) { @@ -1331,7 +1316,7 @@ impl ChainSync { } /// Broadcast private transaction message to peers. - pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, packet_id: PacketId, packet: Bytes) { + pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, packet_id: SyncPacket, packet: Bytes) { SyncPropagator::propagate_private_transaction(self, io, transaction_hash, packet_id, packet); } } diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index 9d50fafc14c..c3654553ffd 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -20,8 +20,8 @@ use std::collections::HashSet; use bytes::Bytes; use ethereum_types::H256; use fastmap::H256FastSet; -use network::{PeerId, PacketId}; use network::client_version::ClientCapabilities; +use network::PeerId; use rand::Rng; use rlp::{Encodable, RlpStream}; use sync_io::SyncIo; @@ -29,6 +29,14 @@ use types::transaction::SignedTransaction; use types::BlockNumber; use types::blockchain_info::BlockChainInfo; +use super::sync_packet::SyncPacket; +use super::sync_packet::SyncPacket::{ + NewBlockHashesPacket, + TransactionsPacket, + NewBlockPacket, + ConsensusDataPacket, +}; + use super::{ random, ChainSync, @@ -36,13 +44,8 @@ use super::{ MAX_PEER_LAG_PROPAGATION, MAX_PEERS_PROPAGATION, MIN_PEERS_PROPAGATION, - CONSENSUS_DATA_PACKET, - NEW_BLOCK_HASHES_PACKET, - NEW_BLOCK_PACKET, - TRANSACTIONS_PACKET, }; - /// The Chain Sync Propagator: propagates data to peers pub struct SyncPropagator; @@ -53,7 +56,8 @@ impl SyncPropagator { let sent = peers.len(); let mut send_packet = |io: &mut SyncIo, rlp: Bytes| { for peer_id in peers { - SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp.clone()); + SyncPropagator::send_packet(io, *peer_id, NewBlockPacket, rlp.clone()); + if let Some(ref mut peer) = sync.peers.get_mut(peer_id) { peer.latest_hash = chain_info.best_block_hash.clone(); } @@ -88,7 +92,7 @@ impl SyncPropagator { if let Some(ref mut peer) = sync.peers.get_mut(peer_id) { peer.latest_hash = best_block_hash; } - SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp.clone()); + SyncPropagator::send_packet(io, *peer_id, NewBlockHashesPacket, rlp.clone()); } sent } @@ -156,7 +160,7 @@ impl SyncPropagator { let send_packet = |io: &mut SyncIo, peer_id: PeerId, sent: usize, rlp: Bytes| { let size = rlp.len(); - SyncPropagator::send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp); + SyncPropagator::send_packet(io, peer_id, TransactionsPacket, rlp); trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size); }; @@ -275,7 +279,7 @@ impl SyncPropagator { io.chain().chain_info().total_difficulty ); for peer_id in &peers { - SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp.clone()); + SyncPropagator::send_packet(io, *peer_id, NewBlockPacket, rlp.clone()); } } } @@ -285,12 +289,12 @@ impl SyncPropagator { let lucky_peers = ChainSync::select_random_peers(&sync.get_consensus_peers()); trace!(target: "sync", "Sending consensus packet to {:?}", lucky_peers); for peer_id in lucky_peers { - SyncPropagator::send_packet(io, peer_id, CONSENSUS_DATA_PACKET, packet.clone()); + SyncPropagator::send_packet(io, peer_id, ConsensusDataPacket, packet.clone()); } } /// Broadcast private transaction message to peers. - pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet_id: PacketId, packet: Bytes) { + pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet_id: SyncPacket, packet: Bytes) { let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash)); if lucky_peers.is_empty() { error!(target: "privatetx", "Cannot propagate the packet, no peers with private tx enabled connected"); @@ -321,7 +325,7 @@ impl SyncPropagator { } /// Generic packet sender - pub fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) { + pub fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: SyncPacket, packet: Bytes) { if let Err(e) = sync.send(peer_id, packet_id, packet) { debug!(target:"sync", "Error sending packet: {:?}", e); sync.disconnect_peer(peer_id); diff --git a/ethcore/sync/src/chain/requester.rs b/ethcore/sync/src/chain/requester.rs index 09eec748ec9..31d3ce59006 100644 --- a/ethcore/sync/src/chain/requester.rs +++ b/ethcore/sync/src/chain/requester.rs @@ -14,26 +14,28 @@ // You should have received a copy of the GNU General Public License // along with Parity Ethereum. If not, see . -use api::WARP_SYNC_PROTOCOL_ID; use block_sync::BlockRequest; use bytes::Bytes; use ethereum_types::H256; -use network::{PeerId, PacketId}; +use network::{PeerId}; use rlp::RlpStream; use std::time::Instant; use sync_io::SyncIo; use types::BlockNumber; +use super::sync_packet::SyncPacket; +use super::sync_packet::SyncPacket::{ + GetBlockHeadersPacket, + GetBlockBodiesPacket, + GetReceiptsPacket, + GetSnapshotManifestPacket, + GetSnapshotDataPacket, +}; + use super::{ BlockSet, ChainSync, PeerAsking, - ETH_PROTOCOL_VERSION_63, - GET_BLOCK_BODIES_PACKET, - GET_BLOCK_HEADERS_PACKET, - GET_RECEIPTS_PACKET, - GET_SNAPSHOT_DATA_PACKET, - GET_SNAPSHOT_MANIFEST_PACKET, }; /// The Chain Sync Requester: requesting data to other peers @@ -62,8 +64,7 @@ impl SyncRequester { for h in &hashes { rlp.append(&h.clone()); } - - SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockBodies, GET_BLOCK_BODIES_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockBodies, GetBlockBodiesPacket, rlp.out()); let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); peer.asking_blocks = hashes; peer.block_set = Some(set); @@ -77,7 +78,7 @@ impl SyncRequester { rlp.append(&1u32); rlp.append(&0u32); rlp.append(&0u32); - SyncRequester::send_request(sync, io, peer_id, PeerAsking::ForkHeader, GET_BLOCK_HEADERS_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::ForkHeader, GetBlockHeadersPacket, rlp.out()); } /// Find some headers or blocks to download for a peer. @@ -95,7 +96,7 @@ impl SyncRequester { pub fn request_snapshot_manifest(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) { trace!(target: "sync", "{} <- GetSnapshotManifest", peer_id); let rlp = RlpStream::new_list(0); - SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, GET_SNAPSHOT_MANIFEST_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, GetSnapshotManifestPacket, rlp.out()); } /// Request headers from a peer by block hash @@ -106,7 +107,7 @@ impl SyncRequester { rlp.append(&count); rlp.append(&skip); rlp.append(&if reverse {1u32} else {0u32}); - SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockHeaders, GetBlockHeadersPacket, rlp.out()); let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); peer.asking_hash = Some(h.clone()); peer.block_set = Some(set); @@ -119,7 +120,7 @@ impl SyncRequester { for h in &hashes { rlp.append(&h.clone()); } - SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockReceipts, GET_RECEIPTS_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockReceipts, GetReceiptsPacket, rlp.out()); let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); peer.asking_blocks = hashes; peer.block_set = Some(set); @@ -130,23 +131,20 @@ impl SyncRequester { trace!(target: "sync", "{} <- GetSnapshotData {:?}", peer_id, chunk); let mut rlp = RlpStream::new_list(1); rlp.append(chunk); - SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotData, GET_SNAPSHOT_DATA_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotData, GetSnapshotDataPacket, rlp.out()); } /// Generic request sender - fn send_request(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { + fn send_request(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: SyncPacket, packet: Bytes) { if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { if peer.asking != PeerAsking::Nothing { warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); } peer.asking = asking; peer.ask_time = Instant::now(); - // TODO [ToDr] This seems quite fragile. Be careful when protocol is updated. - let result = if packet_id >= ETH_PROTOCOL_VERSION_63.1 { - io.send_protocol(WARP_SYNC_PROTOCOL_ID, peer_id, packet_id, packet) - } else { - io.send(peer_id, packet_id, packet) - }; + + let result = io.send(peer_id, packet_id, packet); + if let Err(e) = result { debug!(target:"sync", "Error sending request: {:?}", e); io.disconnect_peer(peer_id); diff --git a/ethcore/sync/src/chain/supplier.rs b/ethcore/sync/src/chain/supplier.rs index d691cb35afd..7e71e6aeec7 100644 --- a/ethcore/sync/src/chain/supplier.rs +++ b/ethcore/sync/src/chain/supplier.rs @@ -15,6 +15,7 @@ // along with Parity Ethereum. If not, see . use bytes::Bytes; +use enum_primitive::FromPrimitive; use ethereum_types::H256; use network::{self, PeerId}; use parking_lot::RwLock; @@ -25,30 +26,34 @@ use types::ids::BlockId; use sync_io::SyncIo; +use super::sync_packet::{PacketInfo, SyncPacket}; +use super::sync_packet::SyncPacket::{ + StatusPacket, + TransactionsPacket, + GetBlockHeadersPacket, + BlockHeadersPacket, + GetBlockBodiesPacket, + BlockBodiesPacket, + GetNodeDataPacket, + NodeDataPacket, + GetReceiptsPacket, + ReceiptsPacket, + GetSnapshotManifestPacket, + SnapshotManifestPacket, + GetSnapshotDataPacket, + SnapshotDataPacket, + ConsensusDataPacket, +}; + use super::{ ChainSync, SyncHandler, RlpResponseResult, PacketDecodeError, - BLOCK_BODIES_PACKET, - BLOCK_HEADERS_PACKET, - CONSENSUS_DATA_PACKET, - GET_BLOCK_BODIES_PACKET, - GET_BLOCK_HEADERS_PACKET, - GET_NODE_DATA_PACKET, - GET_RECEIPTS_PACKET, - GET_SNAPSHOT_DATA_PACKET, - GET_SNAPSHOT_MANIFEST_PACKET, MAX_BODIES_TO_SEND, MAX_HEADERS_TO_SEND, MAX_NODE_DATA_TO_SEND, MAX_RECEIPTS_HEADERS_TO_SEND, - NODE_DATA_PACKET, - RECEIPTS_PACKET, - SNAPSHOT_DATA_PACKET, - SNAPSHOT_MANIFEST_PACKET, - STATUS_PACKET, - TRANSACTIONS_PACKET, }; /// The Chain Sync Supplier: answers requests from peers with available data @@ -56,72 +61,83 @@ pub struct SyncSupplier; impl SyncSupplier { /// Dispatch incoming requests and responses + // Take a u8 and not a SyncPacketId because this is the entry point + // to chain sync from the outside world. pub fn dispatch_packet(sync: &RwLock, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { let rlp = Rlp::new(data); - let result = match packet_id { - GET_BLOCK_BODIES_PACKET => SyncSupplier::return_rlp(io, &rlp, peer, - SyncSupplier::return_block_bodies, - |e| format!("Error sending block bodies: {:?}", e)), - - GET_BLOCK_HEADERS_PACKET => SyncSupplier::return_rlp(io, &rlp, peer, - SyncSupplier::return_block_headers, - |e| format!("Error sending block headers: {:?}", e)), - - GET_RECEIPTS_PACKET => SyncSupplier::return_rlp(io, &rlp, peer, - SyncSupplier::return_receipts, - |e| format!("Error sending receipts: {:?}", e)), - - GET_NODE_DATA_PACKET => SyncSupplier::return_rlp(io, &rlp, peer, - SyncSupplier::return_node_data, - |e| format!("Error sending nodes: {:?}", e)), - - GET_SNAPSHOT_MANIFEST_PACKET => SyncSupplier::return_rlp(io, &rlp, peer, - SyncSupplier::return_snapshot_manifest, - |e| format!("Error sending snapshot manifest: {:?}", e)), - - GET_SNAPSHOT_DATA_PACKET => SyncSupplier::return_rlp(io, &rlp, peer, - SyncSupplier::return_snapshot_data, - |e| format!("Error sending snapshot data: {:?}", e)), - - STATUS_PACKET => { - sync.write().on_packet(io, peer, packet_id, data); - Ok(()) - }, - // Packets that require the peer to be confirmed - _ => { - if !sync.read().peers.contains_key(&peer) { - debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_version(peer)); - return; - } - debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id); - - match packet_id { - CONSENSUS_DATA_PACKET => { - SyncHandler::on_consensus_packet(io, peer, &rlp) - }, - TRANSACTIONS_PACKET => { - let res = { - let sync_ro = sync.read(); - SyncHandler::on_peer_transactions(&*sync_ro, io, peer, &rlp) - }; - if res.is_err() { - // peer sent invalid data, disconnect. - io.disable_peer(peer); - sync.write().deactivate_peer(io, peer); + if let Some(id) = SyncPacket::from_u8(packet_id) { + let result = match id { + GetBlockBodiesPacket => SyncSupplier::return_rlp( + io, &rlp, peer, + SyncSupplier::return_block_bodies, + |e| format!("Error sending block bodies: {:?}", e)), + + GetBlockHeadersPacket => SyncSupplier::return_rlp( + io, &rlp, peer, + SyncSupplier::return_block_headers, + |e| format!("Error sending block headers: {:?}", e)), + + GetReceiptsPacket => SyncSupplier::return_rlp( + io, &rlp, peer, + SyncSupplier::return_receipts, + |e| format!("Error sending receipts: {:?}", e)), + + GetNodeDataPacket => SyncSupplier::return_rlp( + io, &rlp, peer, + SyncSupplier::return_node_data, + |e| format!("Error sending nodes: {:?}", e)), + + GetSnapshotManifestPacket => SyncSupplier::return_rlp( + io, &rlp, peer, + SyncSupplier::return_snapshot_manifest, + |e| format!("Error sending snapshot manifest: {:?}", e)), + + GetSnapshotDataPacket => SyncSupplier::return_rlp( + io, &rlp, peer, + SyncSupplier::return_snapshot_data, + |e| format!("Error sending snapshot data: {:?}", e)), + + StatusPacket => { + sync.write().on_packet(io, peer, packet_id, data); + Ok(()) + }, + // Packets that require the peer to be confirmed + _ => { + if !sync.read().peers.contains_key(&peer) { + debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_version(peer)); + return; + } + debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id); + + match id { + ConsensusDataPacket => { + SyncHandler::on_consensus_packet(io, peer, &rlp) + }, + TransactionsPacket => { + let res = { + let sync_ro = sync.read(); + SyncHandler::on_peer_transactions(&*sync_ro, io, peer, &rlp) + }; + if res.is_err() { + // peer sent invalid data, disconnect. + io.disable_peer(peer); + sync.write().deactivate_peer(io, peer); + } + }, + _ => { + sync.write().on_packet(io, peer, packet_id, data); } - }, - _ => { - sync.write().on_packet(io, peer, packet_id, data); } + + Ok(()) } + }; - Ok(()) - } - }; - result.unwrap_or_else(|e| { - debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e); - }) + result.unwrap_or_else(|e| { + debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e); + }) + } } /// Respond to GetBlockHeaders request @@ -148,11 +164,11 @@ impl SyncSupplier { trace!(target:"sync", "Returning single header: {:?}", hash); let mut rlp = RlpStream::new_list(1); rlp.append_raw(&hdr.into_inner(), 1); - return Ok(Some((BLOCK_HEADERS_PACKET, rlp))); + return Ok(Some((BlockHeadersPacket.id(), rlp))); } number } - None => return Ok(Some((BLOCK_HEADERS_PACKET, RlpStream::new_list(0)))) //no such header, return nothing + None => return Ok(Some((BlockHeadersPacket.id(), RlpStream::new_list(0)))) //no such header, return nothing } } else { let number = r.val_at::(0)?; @@ -202,7 +218,7 @@ impl SyncSupplier { let mut rlp = RlpStream::new_list(count as usize); rlp.append_raw(&data, count as usize); trace!(target: "sync", "{} -> GetBlockHeaders: returned {} entries", peer_id, count); - Ok(Some((BLOCK_HEADERS_PACKET, rlp))) + Ok(Some((BlockHeadersPacket.id(), rlp))) } /// Respond to GetBlockBodies request @@ -229,7 +245,7 @@ impl SyncSupplier { let mut rlp = RlpStream::new_list(added); rlp.append_raw(&data, added); trace!(target: "sync", "{} -> GetBlockBodies: returned {} entries", peer_id, added); - Ok(Some((BLOCK_BODIES_PACKET, rlp))) + Ok(Some((BlockBodiesPacket.id(), rlp))) } /// Respond to GetNodeData request @@ -261,7 +277,7 @@ impl SyncSupplier { for d in data { rlp.append(&d); } - Ok(Some((NODE_DATA_PACKET, rlp))) + Ok(Some((NodeDataPacket.id(), rlp))) } fn return_receipts(io: &SyncIo, rlp: &Rlp, peer_id: PeerId) -> RlpResponseResult { @@ -287,7 +303,7 @@ impl SyncSupplier { } let mut rlp_result = RlpStream::new_list(added_headers); rlp_result.append_raw(&data, added_headers); - Ok(Some((RECEIPTS_PACKET, rlp_result))) + Ok(Some((ReceiptsPacket.id(), rlp_result))) } /// Respond to GetSnapshotManifest request @@ -310,7 +326,7 @@ impl SyncSupplier { RlpStream::new_list(0) } }; - Ok(Some((SNAPSHOT_MANIFEST_PACKET, rlp))) + Ok(Some((SnapshotManifestPacket.id(), rlp))) } /// Respond to GetSnapshotData request @@ -329,7 +345,7 @@ impl SyncSupplier { RlpStream::new_list(0) } }; - Ok(Some((SNAPSHOT_DATA_PACKET, rlp))) + Ok(Some((SnapshotDataPacket.id(), rlp))) } fn return_rlp(io: &mut SyncIo, rlp: &Rlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError> @@ -491,7 +507,7 @@ mod test { io.sender = Some(2usize); - SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_NODE_DATA_PACKET, &node_request); + SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GetNodeDataPacket.id(), &node_request); assert_eq!(1, io.packets.len()); } @@ -533,7 +549,7 @@ mod test { assert_eq!(603, rlp_result.unwrap().1.out().len()); io.sender = Some(2usize); - SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_RECEIPTS_PACKET, &receipts_request); + SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GetReceiptsPacket.id(), &receipts_request); assert_eq!(1, io.packets.len()); } } diff --git a/ethcore/sync/src/chain/sync_packet.rs b/ethcore/sync/src/chain/sync_packet.rs new file mode 100644 index 00000000000..3891090f65e --- /dev/null +++ b/ethcore/sync/src/chain/sync_packet.rs @@ -0,0 +1,141 @@ +// Copyright 2015-2019 Parity Technologies (UK) Ltd. +// This file is part of Parity Ethereum. + +// Parity Ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Ethereum. If not, see . + +//! When sending packets over p2p we specify both which subprotocol +//! to use and what kind of packet we are sending (through a packet id). +//! Likewise when receiving packets from other peers we decode the +//! subprotocol and the packet id. This module helps coupling both +//! pieces of information together and provides an easy mechanism +//! to convert to/from the packet id values transmitted over the +//! wire. + +use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID}; +use network::{PacketId, ProtocolId}; + +/// An enum that defines all known packet ids in the context of +/// synchronization and provides a mechanism to convert from +/// packet ids (of type PacketId or u8) directly read from the network +/// to enum variants. This implicitly provides a mechanism to +/// check whether a given packet id is known, and to prevent +/// packet id clashes when defining new ids. +enum_from_primitive! { +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum SyncPacket { + StatusPacket = 0x00, + NewBlockHashesPacket = 0x01, + TransactionsPacket = 0x02, + GetBlockHeadersPacket = 0x03, + BlockHeadersPacket = 0x04, + GetBlockBodiesPacket = 0x05, + BlockBodiesPacket = 0x06, + NewBlockPacket = 0x07, + + GetNodeDataPacket = 0x0d, + NodeDataPacket = 0x0e, + GetReceiptsPacket = 0x0f, + ReceiptsPacket = 0x10, + + GetSnapshotManifestPacket = 0x11, + SnapshotManifestPacket = 0x12, + GetSnapshotDataPacket = 0x13, + SnapshotDataPacket = 0x14, + ConsensusDataPacket = 0x15, + PrivateTransactionPacket = 0x16, + SignedPrivateTransactionPacket = 0x17, +} +} + +use self::SyncPacket::*; + +/// Provide both subprotocol and packet id information within the +/// same object. +pub trait PacketInfo { + fn id(&self) -> PacketId; + fn protocol(&self) -> ProtocolId; +} + +// The mechanism to match packet ids and protocol may be improved +// through some macro magic, but for now this works. +impl PacketInfo for SyncPacket { + fn protocol(&self) -> ProtocolId { + match self { + StatusPacket | + NewBlockHashesPacket | + TransactionsPacket | + GetBlockHeadersPacket | + BlockHeadersPacket | + GetBlockBodiesPacket | + BlockBodiesPacket | + NewBlockPacket | + + GetNodeDataPacket| + NodeDataPacket | + GetReceiptsPacket | + ReceiptsPacket + + => ETH_PROTOCOL, + + GetSnapshotManifestPacket| + SnapshotManifestPacket | + GetSnapshotDataPacket | + SnapshotDataPacket | + ConsensusDataPacket | + PrivateTransactionPacket | + SignedPrivateTransactionPacket + + => WARP_SYNC_PROTOCOL_ID, + } + } + + fn id(&self) -> PacketId { + (*self) as PacketId + } +} + + +#[cfg(test)] +mod tests { + use super::*; + + use enum_primitive::FromPrimitive; + + #[test] + fn packet_ids_from_u8_when_from_primitive_zero_then_equals_status_packet() { + assert_eq!(SyncPacket::from_u8(0x00), Some(StatusPacket)); + } + + #[test] + fn packet_ids_from_u8_when_from_primitive_eleven_then_equals_get_snapshot_manifest_packet() { + assert_eq!(SyncPacket::from_u8(0x11), Some(GetSnapshotManifestPacket)); + } + + #[test] + fn packet_ids_from_u8_when_invalid_packet_id_then_none() { + assert!(SyncPacket::from_u8(0x99).is_none()); + } + + #[test] + fn when_status_packet_then_id_and_protocol_match() { + assert_eq!(StatusPacket.id(), StatusPacket as PacketId); + assert_eq!(StatusPacket.protocol(), ETH_PROTOCOL); + } + + #[test] + fn when_consensus_data_packet_then_id_and_protocol_match() { + assert_eq!(ConsensusDataPacket.id(), ConsensusDataPacket as PacketId); + assert_eq!(ConsensusDataPacket.protocol(), WARP_SYNC_PROTOCOL_ID); + } +} diff --git a/ethcore/sync/src/lib.rs b/ethcore/sync/src/lib.rs index e5bb2c4a76b..8a1e19569a4 100644 --- a/ethcore/sync/src/lib.rs +++ b/ethcore/sync/src/lib.rs @@ -44,6 +44,8 @@ extern crate ethcore_light as light; #[cfg(test)] extern crate kvdb_memorydb; #[cfg(test)] extern crate rustc_hex; +#[macro_use] +extern crate enum_primitive; #[macro_use] extern crate macros; #[macro_use] diff --git a/ethcore/sync/src/sync_io.rs b/ethcore/sync/src/sync_io.rs index b4cd9a08fd1..56bf98ab2ee 100644 --- a/ethcore/sync/src/sync_io.rs +++ b/ethcore/sync/src/sync_io.rs @@ -15,6 +15,7 @@ // along with Parity Ethereum. If not, see . use std::collections::HashMap; +use chain::sync_packet::{PacketInfo, SyncPacket}; use network::{NetworkContext, PeerId, PacketId, Error, SessionInfo, ProtocolId}; use network::client_version::ClientVersion; use bytes::Bytes; @@ -33,10 +34,8 @@ pub trait SyncIo { fn disconnect_peer(&mut self, peer_id: PeerId); /// Respond to current request with a packet. Can be called from an IO handler for incoming packet. fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), Error>; - /// Send a packet to a peer. - fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error>; /// Send a packet to a peer using specified protocol. - fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error>; + fn send(&mut self, peer_id: PeerId, packet_id: SyncPacket, data: Vec) -> Result<(), Error>; /// Get the blockchain fn chain(&self) -> &BlockChainClient; /// Get the snapshot service. @@ -99,12 +98,8 @@ impl<'s> SyncIo for NetSyncIo<'s> { self.network.respond(packet_id, data) } - fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error>{ - self.network.send(peer_id, packet_id, data) - } - - fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error>{ - self.network.send_protocol(protocol, peer_id, packet_id, data) + fn send(&mut self, peer_id: PeerId, packet_id: SyncPacket, data: Vec) -> Result<(), Error>{ + self.network.send_protocol(packet_id.protocol(), peer_id, packet_id.id(), data) } fn chain(&self) -> &BlockChainClient { diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index cdc55fba069..8bc4b542e28 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -31,7 +31,10 @@ use ethcore::test_helpers; use sync_io::SyncIo; use io::{IoChannel, IoContext, IoHandler}; use api::WARP_SYNC_PROTOCOL_ID; -use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET, SyncSupplier}; +use chain::{ChainSync, SyncSupplier, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3}; +use chain::sync_packet::{PacketInfo, SyncPacket}; +use chain::sync_packet::SyncPacket::{PrivateTransactionPacket, SignedPrivateTransactionPacket}; + use SyncConfig; use private_tx::SimplePrivateTxHandler; use types::BlockNumber; @@ -102,19 +105,15 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { Ok(()) } - fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), network::Error> { + fn send(&mut self,peer_id: PeerId, packet_id: SyncPacket, data: Vec) -> Result<(), network::Error> { self.packets.push(TestPacket { data: data, - packet_id: packet_id, + packet_id: packet_id.id(), recipient: peer_id, }); Ok(()) } - fn send_protocol(&mut self, _protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), network::Error> { - self.send(peer_id, packet_id, data) - } - fn chain(&self) -> &BlockChainClient { &*self.chain } @@ -236,9 +235,9 @@ impl EthPeer where C: FlushingBlockChainClient { match message { ChainMessageType::Consensus(data) => self.sync.write().propagate_consensus_packet(&mut io, data), ChainMessageType::PrivateTransaction(transaction_hash, data) => - self.sync.write().propagate_private_transaction(&mut io, transaction_hash, PRIVATE_TRANSACTION_PACKET, data), + self.sync.write().propagate_private_transaction(&mut io, transaction_hash, PrivateTransactionPacket, data), ChainMessageType::SignedPrivateTransaction(transaction_hash, data) => - self.sync.write().propagate_private_transaction(&mut io, transaction_hash, SIGNED_PRIVATE_TRANSACTION_PACKET, data), + self.sync.write().propagate_private_transaction(&mut io, transaction_hash, SignedPrivateTransactionPacket, data), } } diff --git a/json/src/spec/spec.rs b/json/src/spec/spec.rs index f8f1217b479..68824cad99b 100644 --- a/json/src/spec/spec.rs +++ b/json/src/spec/spec.rs @@ -30,6 +30,7 @@ pub enum ForkSpec { Homestead, Byzantium, Constantinople, + ConstantinopleFix, EIP158ToByzantiumAt5, FrontierToHomesteadAt5, HomesteadToDaoAt5, diff --git a/miner/src/gas_pricer.rs b/miner/src/gas_pricer.rs index 0f851a9f862..c4e04442f19 100644 --- a/miner/src/gas_pricer.rs +++ b/miner/src/gas_pricer.rs @@ -45,7 +45,7 @@ impl GasPricer { /// Recalibrate current gas price. pub fn recalibrate(&mut self, set_price: F) { match *self { - GasPricer::Fixed(ref max) => set_price(max.clone()), + GasPricer::Fixed(ref curr) => set_price(curr.clone()), #[cfg(feature = "price-info")] GasPricer::Calibrated(ref mut cal) => cal.recalibrate(set_price), } diff --git a/miner/src/pool/queue.rs b/miner/src/pool/queue.rs index 22278c3e858..51c46ad823c 100644 --- a/miner/src/pool/queue.rs +++ b/miner/src/pool/queue.rs @@ -474,7 +474,7 @@ impl TransactionQueue { self.pool.read().pending_from_sender(state_readiness, address) .last() - .map(|tx| tx.signed().nonce + 1) + .map(|tx| tx.signed().nonce.saturating_add(U256::from(1))) } /// Retrieve a transaction from the pool. diff --git a/miner/src/pool/ready.rs b/miner/src/pool/ready.rs index 20507186e7a..3accba13903 100644 --- a/miner/src/pool/ready.rs +++ b/miner/src/pool/ready.rs @@ -95,7 +95,7 @@ impl txpool::Ready for State { }, cmp::Ordering::Less => txpool::Readiness::Stale, cmp::Ordering::Equal => { - *nonce = *nonce + 1; + *nonce = nonce.saturating_add(U256::from(1)); txpool::Readiness::Ready }, } @@ -159,7 +159,7 @@ impl Option> txpool::Ready for Opt cmp::Ordering::Greater => txpool::Readiness::Future, cmp::Ordering::Less => txpool::Readiness::Stale, cmp::Ordering::Equal => { - *nonce = *nonce + 1; + *nonce = nonce.saturating_add(U256::from(1)); txpool::Readiness::Ready }, } diff --git a/miner/src/pool/tests/mod.rs b/miner/src/pool/tests/mod.rs index ad9f6824fae..0eb223dba00 100644 --- a/miner/src/pool/tests/mod.rs +++ b/miner/src/pool/tests/mod.rs @@ -97,7 +97,7 @@ fn should_return_correct_nonces_when_dropped_because_of_limit() { Ok(()) ]); assert_eq!(txq.status().status.transaction_count, 3); - // tx2 transacton got dropped because of limit + // tx2 transaction got dropped because of limit // tx1 and tx1' are kept, because they have lower insertion_ids so they are preferred. assert_eq!(txq.next_nonce(TestClient::new(), &sender), None); } diff --git a/miner/src/pool/verifier.rs b/miner/src/pool/verifier.rs index 65835230d8e..1fded37630e 100644 --- a/miner/src/pool/verifier.rs +++ b/miner/src/pool/verifier.rs @@ -283,7 +283,19 @@ impl txpool::Verifier for Verifier. use std::{mem, ptr}; +use std::ffi::c_void; use std::sync::Arc; -use std::time::Duration; -use std::thread; -use std::os::raw::c_void; -use {parity_config_from_cli, parity_destroy, parity_set_logger, parity_start, parity_unsubscribe_ws, ParityParams, error}; +use {Callback, parity_config_from_cli, parity_destroy, parity_rpc_worker, parity_start, parity_set_logger, + parity_unsubscribe_ws, parity_ws_worker, ParityParams}; -use futures::{Future, Stream}; -use futures::sync::mpsc; use jni::{JavaVM, JNIEnv}; use jni::objects::{JClass, JString, JObject, JValue, GlobalRef}; use jni::sys::{jlong, jobjectArray, va_list}; -use tokio_current_thread::CurrentThread; -use parity_ethereum::{RunningClient, PubSubSession}; +use parity_ethereum::RunningClient; type CheckedQuery<'a> = (&'a RunningClient, String, JavaVM, GlobalRef); // Creates a Java callback to a static method named `void callback(Object)` -struct Callback<'a> { +struct JavaCallback<'a> { jvm: JavaVM, callback: GlobalRef, method_name: &'a str, method_descriptor: &'a str, } -unsafe impl<'a> Send for Callback<'a> {} -unsafe impl<'a> Sync for Callback<'a> {} -impl<'a> Callback<'a> { +unsafe impl<'a> Send for JavaCallback<'a> {} +unsafe impl<'a> Sync for JavaCallback<'a> {} + +impl<'a> JavaCallback<'a> { fn new(jvm: JavaVM, callback: GlobalRef) -> Self { Self { jvm, @@ -51,7 +48,9 @@ impl<'a> Callback<'a> { method_descriptor: "(Ljava/lang/Object;)V", } } +} +impl<'a> Callback for JavaCallback<'a> { fn call(&self, msg: &str) { let env = self.jvm.attach_current_thread().expect("JavaVM should have an environment; qed"); let java_str = env.new_string(msg.to_string()).expect("Rust String is valid JString; qed"); @@ -63,13 +62,13 @@ impl<'a> Callback<'a> { #[no_mangle] pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_configFromCli(env: JNIEnv, _: JClass, cli: jobjectArray) -> jlong { - let cli_len = env.get_array_length(cli).expect("invalid Java bindings"); + let cli_len = env.get_array_length(cli).expect("invalid Java bindings") as usize; - let mut jni_strings = Vec::with_capacity(cli_len as usize); - let mut opts = Vec::with_capacity(cli_len as usize); - let mut opts_lens = Vec::with_capacity(cli_len as usize); + let mut jni_strings = Vec::with_capacity(cli_len); + let mut opts = Vec::with_capacity(cli_len); + let mut opts_lens = Vec::with_capacity(cli_len); - for n in 0..cli_len { + for n in 0..cli_len as i32 { let elem = env.get_object_array_element(cli, n).expect("invalid Java bindings"); let elem_str: JString = elem.into(); match env.get_string(elem_str) { @@ -77,7 +76,7 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_configFromCli(env: opts.push(s.as_ptr()); opts_lens.push(s.to_bytes().len()); jni_strings.push(s); - }, + } Err(err) => { let _ = env.throw_new("java/lang/Exception", err.to_string()); return 0 @@ -86,7 +85,7 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_configFromCli(env: } let mut out = ptr::null_mut(); - match parity_config_from_cli(opts.as_ptr(), opts_lens.as_ptr(), cli_len as usize, &mut out) { + match parity_config_from_cli(opts.as_ptr(), opts_lens.as_ptr(), cli_len, &mut out) { 0 => out as jlong, _ => { let _ = env.throw_new("java/lang/Exception", "failed to create config object"); @@ -120,7 +119,7 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_build( _ => { let _ = env.throw_new("java/lang/Exception", "failed to start Parity"); 0 - }, + } } } @@ -129,7 +128,7 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_destroy(_env: JNIEn parity_destroy(parity); } -unsafe fn async_checker<'a>(client: va_list, rpc: JString, callback: JObject, env: &JNIEnv<'a>) +unsafe fn java_query_checker<'a>(client: va_list, rpc: JString, callback: JObject, env: &JNIEnv<'a>) -> Result, String> { let query: String = env.get_string(rpc) .map(Into::into) @@ -151,26 +150,10 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_rpcQueryNative( callback: JObject, ) { - let _ = async_checker(parity, rpc, callback, &env) + let _ = java_query_checker(parity, rpc, callback, &env) .map(|(client, query, jvm, global_ref)| { - let callback = Arc::new(Callback::new(jvm, global_ref)); - let cb = callback.clone(); - let future = client.rpc_query(&query, None).map(move |response| { - let response = response.unwrap_or_else(|| error::EMPTY.to_string()); - callback.call(&response); - }); - - let _handle = thread::Builder::new() - .name("rpc_query".to_string()) - .spawn(move || { - let mut current_thread = CurrentThread::new(); - current_thread.spawn(future); - let _ = current_thread.run_timeout(Duration::from_millis(timeout_ms as u64)) - .map_err(|_e| { - cb.call(error::TIMEOUT); - }); - }) - .expect("rpc-query thread shouldn't fail; qed"); + let callback = Arc::new(JavaCallback::new(jvm, global_ref)); + parity_rpc_worker(client, &query, callback, timeout_ms as u64); }) .map_err(|e| { let _ = env.throw_new("java/lang/Exception", e); @@ -186,43 +169,10 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_subscribeWebSocketN callback: JObject, ) -> va_list { - async_checker(parity, rpc, callback, &env) + java_query_checker(parity, rpc, callback, &env) .map(move |(client, query, jvm, global_ref)| { - let callback = Arc::new(Callback::new(jvm, global_ref)); - let (tx, mut rx) = mpsc::channel(1); - let session = Arc::new(PubSubSession::new(tx)); - let weak_session = Arc::downgrade(&session); - let query_future = client.rpc_query(&query, Some(session.clone()));; - - let _handle = thread::Builder::new() - .name("ws-subscriber".into()) - .spawn(move || { - // Wait for subscription ID - // Note this may block forever and can't be destroyed using the session object - // However, this will likely timeout or be catched the RPC layer - if let Ok(Some(response)) = query_future.wait() { - callback.call(&response); - } else { - callback.call(error::SUBSCRIBE); - return; - }; - - loop { - for response in rx.by_ref().wait() { - if let Ok(r) = response { - callback.call(&r); - } - } - - let rc = weak_session.upgrade().map_or(0,|session| Arc::strong_count(&session)); - // No subscription left, then terminate - if rc <= 1 { - break; - } - } - }) - .expect("rpc-subscriber thread shouldn't fail; qed"); - Arc::into_raw(session) as va_list + let callback = Arc::new(JavaCallback::new(jvm, global_ref)); + parity_ws_worker(client, &query, callback) as va_list }) .unwrap_or_else(|e| { let _ = env.throw_new("java/lang/Exception", e); diff --git a/parity-clib/src/lib.rs b/parity-clib/src/lib.rs index eca6edcd696..bbb60ec2d26 100644 --- a/parity-clib/src/lib.rs +++ b/parity-clib/src/lib.rs @@ -40,7 +40,7 @@ use futures::sync::mpsc; use parity_ethereum::{PubSubSession, RunningClient}; use tokio_current_thread::CurrentThread; -type Callback = Option; +type CCallback = Option; type CheckedQuery<'a> = (&'a RunningClient, &'static str); pub mod error { @@ -52,11 +52,33 @@ pub mod error { #[repr(C)] pub struct ParityParams { pub configuration: *mut c_void, - pub on_client_restart_cb: Callback, + pub on_client_restart_cb: CCallback, pub on_client_restart_cb_custom: *mut c_void, pub logger: *mut c_void } +/// Trait representing a callback that passes a string +pub(crate) trait Callback: Send + Sync { + fn call(&self, msg: &str); +} + +// Internal structure for handling callbacks that get passed a string. +struct CallbackStr { + user_data: *mut c_void, + function: CCallback, +} + +unsafe impl Send for CallbackStr {} +unsafe impl Sync for CallbackStr {} +impl Callback for CallbackStr { + fn call(&self, msg: &str) { + if let Some(ref cb) = self.function { + let cstr = CString::new(msg).expect("valid string with no nul bytes in the middle; qed").into_raw(); + cb(self.user_data, cstr, msg.len()) + } + } +} + #[no_mangle] pub unsafe extern fn parity_config_from_cli( args: *const *const c_char, @@ -112,7 +134,6 @@ pub unsafe extern fn parity_start(cfg: *const ParityParams, output: *mut *mut c_ panic::catch_unwind(|| { *output = ptr::null_mut(); let cfg: &ParityParams = &*cfg; - let logger = Arc::from_raw(cfg.logger as *mut parity_ethereum::RotatingLogger); let config = Box::from_raw(cfg.configuration as *mut parity_ethereum::Configuration); @@ -121,7 +142,7 @@ pub unsafe extern fn parity_start(cfg: *const ParityParams, output: *mut *mut c_ user_data: cfg.on_client_restart_cb_custom, function: cfg.on_client_restart_cb, }; - move |new_chain: String| { cb.call(new_chain.as_bytes()); } + move |new_chain: String| { cb.call(&new_chain); } }; let action = match parity_ethereum::start(*config, logger, on_client_restart_cb, || {}) { @@ -133,7 +154,7 @@ pub unsafe extern fn parity_start(cfg: *const ParityParams, output: *mut *mut c_ parity_ethereum::ExecutionAction::Instant(Some(s)) => { println!("{}", s); 0 }, parity_ethereum::ExecutionAction::Instant(None) => 0, parity_ethereum::ExecutionAction::Running(client) => { - *output = Box::into_raw(Box::::new(client)) as *mut c_void; + *output = Box::into_raw(Box::new(client)) as *mut c_void; 0 } } @@ -148,47 +169,19 @@ pub unsafe extern fn parity_destroy(client: *mut c_void) { }); } -unsafe fn parity_rpc_query_checker<'a>(client: *const c_void, query: *const c_char, len: usize) - -> Option> -{ - let query_str = { - let string = slice::from_raw_parts(query as *const u8, len); - str::from_utf8(string).ok()? - }; - let client: &RunningClient = &*(client as *const RunningClient); - Some((client, query_str)) -} - #[no_mangle] pub unsafe extern fn parity_rpc( client: *const c_void, query: *const c_char, len: usize, timeout_ms: usize, - callback: Callback, + callback: CCallback, user_data: *mut c_void, ) -> c_int { panic::catch_unwind(|| { if let Some((client, query)) = parity_rpc_query_checker(client, query, len) { - let client = client as &RunningClient; let callback = Arc::new(CallbackStr {user_data, function: callback} ); - let cb = callback.clone(); - let query = client.rpc_query(query, None).map(move |response| { - let response = response.unwrap_or_else(|| error::EMPTY.to_string()); - callback.call(response.as_bytes()); - }); - - let _handle = thread::Builder::new() - .name("rpc_query".to_string()) - .spawn(move || { - let mut current_thread = CurrentThread::new(); - current_thread.spawn(query); - let _ = current_thread.run_timeout(Duration::from_millis(timeout_ms as u64)) - .map_err(|_e| { - cb.call(error::TIMEOUT.as_bytes()); - }); - }) - .expect("rpc-query thread shouldn't fail; qed"); + parity_rpc_worker(client, query, callback, timeout_ms as u64); 0 } else { 1 @@ -201,47 +194,13 @@ pub unsafe extern fn parity_subscribe_ws( client: *const c_void, query: *const c_char, len: usize, - callback: Callback, + callback: CCallback, user_data: *mut c_void, ) -> *const c_void { - panic::catch_unwind(|| { if let Some((client, query)) = parity_rpc_query_checker(client, query, len) { - let (tx, mut rx) = mpsc::channel(1); - let session = Arc::new(PubSubSession::new(tx)); - let query_future = client.rpc_query(query, Some(session.clone())); - let weak_session = Arc::downgrade(&session); - let cb = CallbackStr { user_data, function: callback}; - - let _handle = thread::Builder::new() - .name("ws-subscriber".into()) - .spawn(move || { - // Wait for subscription ID - // Note this may block forever and be can't destroyed using the session object - // However, this will likely timeout or be catched the RPC layer - if let Ok(Some(response)) = query_future.wait() { - cb.call(response.as_bytes()); - } else { - cb.call(error::SUBSCRIBE.as_bytes()); - return; - } - - loop { - for response in rx.by_ref().wait() { - if let Ok(r) = response { - cb.call(r.as_bytes()); - } - } - - let rc = weak_session.upgrade().map_or(0,|session| Arc::strong_count(&session)); - // No subscription left, then terminate - if rc <= 1 { - break; - } - } - }) - .expect("rpc-subscriber thread shouldn't fail; qed"); - Arc::into_raw(session) as *const c_void + let callback = Arc::new(CallbackStr { user_data, function: callback}); + parity_ws_worker(client, query, callback) } else { ptr::null() } @@ -257,10 +216,10 @@ pub unsafe extern fn parity_unsubscribe_ws(session: *const c_void) { } #[no_mangle] -pub unsafe extern fn parity_set_panic_hook(callback: Callback, param: *mut c_void) { +pub extern fn parity_set_panic_hook(callback: CCallback, param: *mut c_void) { let cb = CallbackStr {user_data: param, function: callback}; panic_hook::set_with(move |panic_msg| { - cb.call(panic_msg.as_bytes()); + cb.call(panic_msg); }); } @@ -283,19 +242,63 @@ pub unsafe extern fn parity_set_logger( *logger = Arc::into_raw(parity_ethereum::setup_log(&logger_cfg).expect("Logger initialized only once; qed")) as *mut _; } -// Internal structure for handling callbacks that get passed a string. -struct CallbackStr { - user_data: *mut c_void, - function: Callback, +// WebSocket event loop +fn parity_ws_worker(client: &RunningClient, query: &str, callback: Arc) -> *const c_void { + let (tx, mut rx) = mpsc::channel(1); + let session = Arc::new(PubSubSession::new(tx)); + let query_future = client.rpc_query(query, Some(session.clone())); + let weak_session = Arc::downgrade(&session); + let _handle = thread::Builder::new() + .name("ws-subscriber".into()) + .spawn(move || { + // Wait for subscription ID + // Note this may block forever and be can't destroyed using the session object + // However, this will likely timeout or be catched the RPC layer + if let Ok(Some(response)) = query_future.wait() { + callback.call(&response); + } else { + callback.call(error::SUBSCRIBE); + return; + } + + while weak_session.upgrade().map_or(0, |session| Arc::strong_count(&session)) > 1 { + for response in rx.by_ref().wait() { + if let Ok(r) = response { + callback.call(&r); + } + } + } + }) + .expect("rpc-subscriber thread shouldn't fail; qed"); + Arc::into_raw(session) as *const c_void } -unsafe impl Send for CallbackStr {} -unsafe impl Sync for CallbackStr {} -impl CallbackStr { - fn call(&self, msg: &[u8]) { - if let Some(ref cb) = self.function { - let cstr = CString::new(msg).expect("valid string with no null bytes in the middle; qed").into_raw(); - cb(self.user_data, cstr, msg.len()) - } - } +// RPC event loop that runs for at most `timeout_ms` +fn parity_rpc_worker(client: &RunningClient, query: &str, callback: Arc, timeout_ms: u64) { + let cb = callback.clone(); + let query = client.rpc_query(query, None).map(move |response| { + let response = response.unwrap_or_else(|| error::EMPTY.to_string()); + callback.call(&response); + }); + + let _handle = thread::Builder::new() + .name("rpc_query".to_string()) + .spawn(move || { + let mut current_thread = CurrentThread::new(); + current_thread.spawn(query); + let _ = current_thread + .run_timeout(Duration::from_millis(timeout_ms)) + .map_err(|_e| { + cb.call(error::TIMEOUT); + }); + }) + .expect("rpc-query thread shouldn't fail; qed"); +} + +unsafe fn parity_rpc_query_checker<'a>(client: *const c_void, query: *const c_char, len: usize) + -> Option> +{ + let query_str = str::from_utf8(slice::from_raw_parts(query as *const u8, len)).ok()?; + let client: &RunningClient = &*(client as *const RunningClient); + Some((client, query_str)) } diff --git a/parity/light_helpers/epoch_fetch.rs b/parity/light_helpers/epoch_fetch.rs index ecf94860891..01e74059ea5 100644 --- a/parity/light_helpers/epoch_fetch.rs +++ b/parity/light_helpers/epoch_fetch.rs @@ -18,7 +18,7 @@ use std::sync::{Arc, Weak}; use ethcore::engines::{EthEngine, StateDependentProof}; use ethcore::machine::EthereumMachine; -use sync::LightSync; +use sync::{LightSync, LightNetworkDispatcher}; use types::encoded; use types::header::Header; use types::receipt::Receipt; diff --git a/parity/light_helpers/queue_cull.rs b/parity/light_helpers/queue_cull.rs index c919511ba5c..ec1ca612b80 100644 --- a/parity/light_helpers/queue_cull.rs +++ b/parity/light_helpers/queue_cull.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use std::time::Duration; use ethcore::client::ClientIoMessage; -use sync::LightSync; +use sync::{LightSync, LightNetworkDispatcher}; use io::{IoContext, IoHandler, TimerToken}; use light::client::LightChainClient; diff --git a/parity/rpc.rs b/parity/rpc.rs index 57bb584775d..b07ca3f3e48 100644 --- a/parity/rpc.rs +++ b/parity/rpc.rs @@ -151,7 +151,7 @@ pub fn new_ws( let url = format!("{}:{}", conf.interface, conf.port); let addr = url.parse().map_err(|_| format!("Invalid WebSockets listen host/port given: {}", url))?; - let full_handler = setup_apis(rpc_apis::ApiSet::SafeContext, deps); + let full_handler = setup_apis(rpc_apis::ApiSet::All, deps); let handler = { let mut handler = MetaIoHandler::with_middleware(( rpc::WsDispatcher::new(full_handler), diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index 951a4dec4bc..4413e6a779a 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -113,11 +113,9 @@ impl FromStr for Api { #[derive(Debug, Clone)] pub enum ApiSet { - // Safe context (like token-protected WS interface) - SafeContext, // Unsafe context (like jsonrpc over http) UnsafeContext, - // All possible APIs + // All possible APIs (safe context like token-protected WS interface) All, // Local "unsafe" context and accounts access IpcContext, @@ -723,16 +721,6 @@ impl ApiSet { public_list.insert(Api::ParityAccounts); public_list } - ApiSet::SafeContext => { - public_list.insert(Api::Debug); - public_list.insert(Api::Traces); - public_list.insert(Api::ParityPubSub); - public_list.insert(Api::ParityAccounts); - public_list.insert(Api::ParitySet); - public_list.insert(Api::Signer); - public_list.insert(Api::SecretStore); - public_list - } ApiSet::All => { public_list.insert(Api::Debug); public_list.insert(Api::Traces); @@ -838,33 +826,6 @@ mod test { assert_eq!(ApiSet::IpcContext.list_apis(), expected); } - #[test] - fn test_api_set_safe_context() { - let expected = vec![ - // safe - Api::Web3, - Api::Net, - Api::Eth, - Api::EthPubSub, - Api::Parity, - Api::ParityPubSub, - Api::Traces, - Api::Rpc, - Api::SecretStore, - Api::Whisper, - Api::WhisperPubSub, - Api::Private, - // semi-safe - Api::ParityAccounts, - // Unsafe - Api::ParitySet, - Api::Signer, - Api::Debug, - ].into_iter() - .collect(); - assert_eq!(ApiSet::SafeContext.list_apis(), expected); - } - #[test] fn test_all_apis() { assert_eq!( diff --git a/rpc/src/v1/helpers/dispatch/light.rs b/rpc/src/v1/helpers/dispatch/light.rs index ff47eaa7ac9..2913e52c85c 100644 --- a/rpc/src/v1/helpers/dispatch/light.rs +++ b/rpc/src/v1/helpers/dispatch/light.rs @@ -23,7 +23,7 @@ use light::client::LightChainClient; use light::on_demand::{request, OnDemand}; use parking_lot::{Mutex, RwLock}; use stats::Corpus; -use sync::LightSync; +use sync::{LightSyncProvider, LightNetworkDispatcher, ManageNetwork}; use types::basic_account::BasicAccount; use types::ids::BlockId; use types::transaction::{SignedTransaction, PendingTransaction, Error as TransactionError}; @@ -37,10 +37,9 @@ use v1::types::{RichRawTransaction as RpcRichRawTransaction,}; use super::{Dispatcher, Accounts, SignWith, PostSign}; /// Dispatcher for light clients -- fetches default gas price, next nonce, etc. from network. -#[derive(Clone)] -pub struct LightDispatcher { +pub struct LightDispatcher { /// Sync service. - pub sync: Arc, + pub sync: Arc, /// Header chain client. pub client: Arc, /// On-demand request service. @@ -55,12 +54,15 @@ pub struct LightDispatcher { pub gas_price_percentile: usize, } -impl LightDispatcher { +impl LightDispatcher +where + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static +{ /// Create a new `LightDispatcher` from its requisite parts. /// /// For correct operation, the OnDemand service is assumed to be registered as a network handler, pub fn new( - sync: Arc, + sync: Arc, client: Arc, on_demand: Arc, cache: Arc>, @@ -115,7 +117,27 @@ impl LightDispatcher { } } -impl Dispatcher for LightDispatcher { +impl Clone for LightDispatcher +where + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static +{ + fn clone(&self) -> Self { + Self { + sync: self.sync.clone(), + client: self.client.clone(), + on_demand: self.on_demand.clone(), + cache: self.cache.clone(), + transaction_queue: self.transaction_queue.clone(), + nonces: self.nonces.clone(), + gas_price_percentile: self.gas_price_percentile + } + } +} + +impl Dispatcher for LightDispatcher +where + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static +{ // Ignore the `force_nonce` flag in order to always query the network when fetching the nonce and // the account state. If the nonce is specified in the transaction use that nonce instead but do the // network request anyway to the account state (balance) @@ -217,8 +239,8 @@ impl Dispatcher for LightDispatcher { /// Get a recent gas price corpus. // TODO: this could be `impl Trait`. -pub fn fetch_gas_price_corpus( - sync: Arc, +pub fn fetch_gas_price_corpus( + sync: Arc, client: Arc, on_demand: Arc, cache: Arc>, diff --git a/rpc/src/v1/helpers/light_fetch.rs b/rpc/src/v1/helpers/light_fetch.rs index 5a73c04be0b..3ac17c2fd8d 100644 --- a/rpc/src/v1/helpers/light_fetch.rs +++ b/rpc/src/v1/helpers/light_fetch.rs @@ -17,6 +17,7 @@ //! Helpers for fetching blockchain data either from the light client or the network. use std::cmp; +use std::clone::Clone; use std::sync::Arc; use types::basic_account::BasicAccount; @@ -40,7 +41,9 @@ use light::on_demand::{ use light::on_demand::error::Error as OnDemandError; use light::request::Field; -use sync::LightSync; + +use sync::{LightNetworkDispatcher, ManageNetwork, LightSyncProvider}; + use ethereum_types::{U256, Address}; use hash::H256; use parking_lot::Mutex; @@ -52,10 +55,12 @@ use v1::helpers::{CallRequest as CallRequestHelper, errors, dispatch}; use v1::types::{BlockNumber, CallRequest, Log, Transaction}; const NO_INVALID_BACK_REFS_PROOF: &str = "Fails only on invalid back-references; back-references here known to be valid; qed"; - const WRONG_RESPONSE_AMOUNT_TYPE_PROOF: &str = "responses correspond directly with requests in amount and type; qed"; -pub fn light_all_transactions(dispatch: &Arc) -> impl Iterator { +pub fn light_all_transactions(dispatch: &Arc>) -> impl Iterator +where + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static +{ let txq = dispatch.transaction_queue.read(); let chain_info = dispatch.client.chain_info(); @@ -66,20 +71,36 @@ pub fn light_all_transactions(dispatch: &Arc) -> impl /// Helper for fetching blockchain data either from the light client or the network /// as necessary. -#[derive(Clone)] -pub struct LightFetch { +pub struct LightFetch +{ /// The light client. pub client: Arc, /// The on-demand request service. pub on_demand: Arc, /// Handle to the network. - pub sync: Arc, + pub sync: Arc, /// The light data cache. pub cache: Arc>, /// Gas Price percentile pub gas_price_percentile: usize, } +impl Clone for LightFetch +where + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static +{ + fn clone(&self) -> Self { + Self { + client: self.client.clone(), + on_demand: self.on_demand.clone(), + sync: self.sync.clone(), + cache: self.cache.clone(), + gas_price_percentile: self.gas_price_percentile + } + } +} + + /// Extract a transaction at given index. pub fn extract_transaction_at_index(block: encoded::Block, index: usize) -> Option { block.transactions().into_iter().nth(index) @@ -115,7 +136,10 @@ fn extract_header(res: &[OnDemandResponse], header: HeaderRef) -> Option LightFetch +where + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static +{ // push the necessary requests onto the request chain to get the header by the given ID. // yield a header reference which other requests can use. fn make_header_requests(&self, id: BlockId, reqs: &mut Vec) -> Result { @@ -635,20 +659,42 @@ impl LightFetch { } } -#[derive(Clone)] -struct ExecuteParams { +struct ExecuteParams +where + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static +{ from: Address, tx: EthTransaction, hdr: encoded::Header, env_info: ::vm::EnvInfo, engine: Arc<::ethcore::engines::EthEngine>, on_demand: Arc, - sync: Arc, + sync: Arc, +} + +impl Clone for ExecuteParams +where + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static +{ + fn clone(&self) -> Self { + Self { + from: self.from.clone(), + tx: self.tx.clone(), + hdr: self.hdr.clone(), + env_info: self.env_info.clone(), + engine: self.engine.clone(), + on_demand: self.on_demand.clone(), + sync: self.sync.clone() + } + } } // Has a peer execute the transaction with given params. If `gas_known` is false, this will set the `gas value` to the // `required gas value` unless it exceeds the block gas limit -fn execute_read_only_tx(gas_known: bool, params: ExecuteParams) -> impl Future + Send { +fn execute_read_only_tx(gas_known: bool, params: ExecuteParams) -> impl Future + Send +where + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static +{ if !gas_known { Box::new(future::loop_fn(params, |mut params| { execute_read_only_tx(true, params.clone()).and_then(move |res| { diff --git a/rpc/src/v1/impls/eth_pubsub.rs b/rpc/src/v1/impls/eth_pubsub.rs index 4d452a4791b..b4baf5aa4d7 100644 --- a/rpc/src/v1/impls/eth_pubsub.rs +++ b/rpc/src/v1/impls/eth_pubsub.rs @@ -36,7 +36,9 @@ use light::client::{LightChainClient, LightChainNotify}; use light::on_demand::OnDemand; use parity_runtime::Executor; use parking_lot::{RwLock, Mutex}; -use sync::LightSync; + +use sync::{LightSyncProvider, LightNetworkDispatcher, ManageNetwork}; + use types::encoded; use types::filter::Filter as EthFilter; @@ -87,12 +89,15 @@ impl EthPubSubClient { } } -impl EthPubSubClient { +impl EthPubSubClient> +where + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static +{ /// Creates a new `EthPubSubClient` for `LightClient`. pub fn light( client: Arc, on_demand: Arc, - sync: Arc, + sync: Arc, cache: Arc>, executor: Executor, gas_price_percentile: usize, @@ -189,7 +194,10 @@ pub trait LightClient: Send + Sync { fn logs(&self, filter: EthFilter) -> BoxFuture>; } -impl LightClient for LightFetch { +impl LightClient for LightFetch +where + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static +{ fn block_header(&self, id: BlockId) -> Option { self.client.block_header(id) } @@ -200,10 +208,7 @@ impl LightClient for LightFetch { } impl LightChainNotify for ChainNotificationHandler { - fn new_headers( - &self, - enacted: &[H256], - ) { + fn new_headers(&self, enacted: &[H256]) { let headers = enacted .iter() .filter_map(|hash| self.client.block_header(BlockId::Hash(*hash))) diff --git a/rpc/src/v1/impls/light/eth.rs b/rpc/src/v1/impls/light/eth.rs index 2dd1943bee1..e13f5621945 100644 --- a/rpc/src/v1/impls/light/eth.rs +++ b/rpc/src/v1/impls/light/eth.rs @@ -32,7 +32,6 @@ use ethereum_types::{U256, Address}; use hash::{KECCAK_NULL_RLP, KECCAK_EMPTY_LIST_RLP}; use parking_lot::{RwLock, Mutex}; use rlp::Rlp; -use sync::LightSync; use types::transaction::SignedTransaction; use types::encoded; use types::filter::Filter as EthcoreFilter; @@ -45,19 +44,22 @@ use v1::helpers::deprecated::{self, DeprecationNotice}; use v1::helpers::light_fetch::{self, LightFetch}; use v1::traits::Eth; use v1::types::{ - RichBlock, Block, BlockTransactions, BlockNumber, LightBlockNumber, Bytes, SyncStatus, SyncInfo, + RichBlock, Block, BlockTransactions, BlockNumber, LightBlockNumber, Bytes, + SyncStatus as RpcSyncStatus, SyncInfo as RpcSyncInfo, Transaction, CallRequest, Index, Filter, Log, Receipt, Work, EthAccount, H64 as RpcH64, H256 as RpcH256, H160 as RpcH160, U256 as RpcU256, U64 as RpcU64, }; use v1::metadata::Metadata; +use sync::{LightSyncInfo, LightSyncProvider, LightNetworkDispatcher, ManageNetwork}; + const NO_INVALID_BACK_REFS: &str = "Fails only on invalid back-references; back-references here known to be valid; qed"; /// Light client `ETH` (and filter) RPC. -pub struct EthClient { - sync: Arc, - client: Arc, +pub struct EthClient { + sync: Arc, + client: Arc, on_demand: Arc, transaction_queue: Arc>, accounts: Arc Vec
+ Send + Sync>, @@ -68,7 +70,10 @@ pub struct EthClient { deprecation_notice: DeprecationNotice, } -impl Clone for EthClient { +impl Clone for EthClient +where + S: LightSyncProvider + LightNetworkDispatcher + 'static +{ fn clone(&self) -> Self { // each instance should have its own poll manager. EthClient { @@ -86,12 +91,16 @@ impl Clone for EthClient { } } -impl EthClient { +impl EthClient +where + C: LightChainClient + 'static, + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static +{ /// Create a new `EthClient` with a handle to the light sync instance, client, /// and on-demand request service, which is assumed to be attached as a handler. pub fn new( - sync: Arc, - client: Arc, + sync: Arc, + client: Arc, on_demand: Arc, transaction_queue: Arc>, accounts: Arc Vec
+ Send + Sync>, @@ -114,7 +123,8 @@ impl EthClient { } /// Create a light data fetcher instance. - fn fetcher(&self) -> LightFetch { + fn fetcher(&self) -> LightFetch + { LightFetch { client: self.client.clone(), on_demand: self.on_demand.clone(), @@ -211,21 +221,25 @@ impl EthClient { } } -impl Eth for EthClient { +impl Eth for EthClient +where + C: LightChainClient + 'static, + S: LightSyncInfo + LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static +{ type Metadata = Metadata; fn protocol_version(&self) -> Result { Ok(format!("{}", ::light::net::MAX_PROTOCOL_VERSION)) } - fn syncing(&self) -> Result { + fn syncing(&self) -> Result { if self.sync.is_major_importing() { let chain_info = self.client.chain_info(); let current_block = U256::from(chain_info.best_block_number); let highest_block = self.sync.highest_block().map(U256::from) .unwrap_or_else(|| current_block); - Ok(SyncStatus::Info(SyncInfo { + Ok(RpcSyncStatus::Info(RpcSyncInfo { starting_block: U256::from(self.sync.start_block()).into(), current_block: current_block.into(), highest_block: highest_block.into(), @@ -233,7 +247,7 @@ impl Eth for EthClient { warp_chunks_processed: None, })) } else { - Ok(SyncStatus::None) + Ok(RpcSyncStatus::None) } } @@ -524,7 +538,11 @@ impl Eth for EthClient { } // This trait implementation triggers a blanked impl of `EthFilter`. -impl Filterable for EthClient { +impl Filterable for EthClient +where + C: LightChainClient + 'static, + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static +{ fn best_block_number(&self) -> u64 { self.client.chain_info().best_block_number } fn block_hash(&self, id: BlockId) -> Option<::ethereum_types::H256> { diff --git a/rpc/src/v1/impls/light/parity.rs b/rpc/src/v1/impls/light/parity.rs index 3bb88b509ea..f744095cbaf 100644 --- a/rpc/src/v1/impls/light/parity.rs +++ b/rpc/src/v1/impls/light/parity.rs @@ -23,7 +23,7 @@ use version::version_data; use crypto::DEFAULT_MAC; use ethkey::{crypto::ecies, Brain, Generator}; use ethstore::random_phrase; -use sync::LightSyncProvider; +use sync::{LightSyncInfo, LightSyncProvider, LightNetworkDispatcher, ManageNetwork}; use ethcore_logger::RotatingLogger; use jsonrpc_core::{Result, BoxFuture}; @@ -46,8 +46,8 @@ use v1::types::{ use Host; /// Parity implementation for light client. -pub struct ParityClient { - light_dispatch: Arc, +pub struct ParityClient { + light_dispatch: Arc>, logger: Arc, settings: Arc, signer: Option>, @@ -55,10 +55,13 @@ pub struct ParityClient { gas_price_percentile: usize, } -impl ParityClient { +impl ParityClient +where + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static +{ /// Creates new `ParityClient`. pub fn new( - light_dispatch: Arc, + light_dispatch: Arc>, logger: Arc, settings: Arc, signer: Option>, @@ -76,7 +79,8 @@ impl ParityClient { } /// Create a light blockchain data fetcher. - fn fetcher(&self) -> LightFetch { + fn fetcher(&self) -> LightFetch + { LightFetch { client: self.light_dispatch.client.clone(), on_demand: self.light_dispatch.on_demand.clone(), @@ -87,7 +91,10 @@ impl ParityClient { } } -impl Parity for ParityClient { +impl Parity for ParityClient +where + S: LightSyncInfo + LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static +{ type Metadata = Metadata; fn transactions_limit(&self) -> Result { @@ -371,7 +378,7 @@ impl Parity for ParityClient { fn status(&self) -> Result<()> { let has_peers = self.settings.is_dev_chain || self.light_dispatch.sync.peer_numbers().connected > 0; - let is_importing = self.light_dispatch.sync.is_major_importing(); + let is_importing = (*self.light_dispatch.sync).is_major_importing(); if has_peers && !is_importing { Ok(()) diff --git a/rpc/src/v1/impls/parity_set.rs b/rpc/src/v1/impls/parity_set.rs index fc839d621a9..3e0d2697452 100644 --- a/rpc/src/v1/impls/parity_set.rs +++ b/rpc/src/v1/impls/parity_set.rs @@ -118,9 +118,11 @@ impl ParitySet for ParitySetClient where F: Fetch + 'static, { - fn set_min_gas_price(&self, _gas_price: U256) -> Result { - warn!("setMinGasPrice is deprecated. Ignoring request."); - Ok(false) + fn set_min_gas_price(&self, gas_price: U256) -> Result { + match self.miner.set_minimal_gas_price(gas_price.into()) { + Ok(success) => Ok(success), + Err(e) => Err(errors::unsupported(e, None)), + } } fn set_transactions_limit(&self, _limit: usize) -> Result { diff --git a/rpc/src/v1/tests/helpers/miner_service.rs b/rpc/src/v1/tests/helpers/miner_service.rs index f9b649fa62e..77618c85328 100644 --- a/rpc/src/v1/tests/helpers/miner_service.rs +++ b/rpc/src/v1/tests/helpers/miner_service.rs @@ -49,6 +49,8 @@ pub struct TestMinerService { pub pending_receipts: Mutex>, /// Next nonces. pub next_nonces: RwLock>, + /// Minimum gas price + pub min_gas_price: RwLock>, /// Signer (if any) pub signer: RwLock>>, @@ -63,6 +65,7 @@ impl Default for TestMinerService { local_transactions: Default::default(), pending_receipts: Default::default(), next_nonces: Default::default(), + min_gas_price: RwLock::new(Some(0.into())), authoring_params: RwLock::new(AuthoringParams { author: Address::zero(), gas_range_target: (12345.into(), 54321.into()), @@ -279,4 +282,18 @@ impl MinerService for TestMinerService { fn sensible_gas_limit(&self) -> U256 { 0x5208.into() } + + fn set_minimal_gas_price(&self, gas_price: U256) -> Result { + let mut new_price = self.min_gas_price.write(); + match *new_price { + Some(ref mut v) => { + *v = gas_price; + Ok(true) + }, + None => { + let error_msg = "Can't update fixed gas price while automatic gas calibration is enabled."; + Err(error_msg) + }, + } + } } diff --git a/rpc/src/v1/tests/mocked/parity_set.rs b/rpc/src/v1/tests/mocked/parity_set.rs index 13473fbdf85..25c13fb1cb6 100644 --- a/rpc/src/v1/tests/mocked/parity_set.rs +++ b/rpc/src/v1/tests/mocked/parity_set.rs @@ -112,7 +112,25 @@ fn rpc_parity_set_min_gas_price() { io.extend_with(parity_set_client(&client, &miner, &updater, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "parity_setMinGasPrice", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#; - let response = r#"{"jsonrpc":"2.0","result":false,"id":1}"#; + let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; + + assert_eq!(io.handle_request_sync(request), Some(response.to_owned())); +} + +#[test] +fn rpc_parity_set_min_gas_price_with_automated_calibration_enabled() { + let miner = miner_service(); + *miner.min_gas_price.write() = None; + + let client = client_service(); + let network = network_service(); + let updater = updater_service(); + + let mut io = IoHandler::new(); + io.extend_with(parity_set_client(&client, &miner, &updater, &network).to_delegate()); + + let request = r#"{"jsonrpc": "2.0", "method": "parity_setMinGasPrice", "params":["0xdeadbeef"], "id": 1}"#; + let response = r#"{"jsonrpc":"2.0","error":{"code":-32000,"message":"Can't update fixed gas price while automatic gas calibration is enabled."},"id":1}"#; assert_eq!(io.handle_request_sync(request), Some(response.to_owned())); } diff --git a/scripts/docker/hub/Dockerfile b/scripts/docker/hub/Dockerfile index d40a662111f..a18cacf9cc9 100644 --- a/scripts/docker/hub/Dockerfile +++ b/scripts/docker/hub/Dockerfile @@ -1,8 +1,5 @@ FROM ubuntu:xenial -MAINTAINER Parity Technologies -#set ENVIROMENT -ARG TARGET -ENV TARGET ${TARGET} +LABEL MAINTAINER="Parity Technologies " # install tools and dependencies RUN apt update && apt install -y --no-install-recommends openssl libudev-dev file curl jq @@ -10,27 +7,24 @@ RUN apt update && apt install -y --no-install-recommends openssl libudev-dev fil # show backtraces ENV RUST_BACKTRACE 1 -#cleanup Docker image -RUN apt autoremove -y -RUN apt clean -y -RUN rm -rf /tmp/* /var/tmp/* /var/lib/apt/lists/* +# cleanup Docker image +RUN apt autoremove -y \ + && apt clean -y \ + && rm -rf /tmp/* /var/tmp/* /var/lib/apt/lists/* RUN groupadd -g 1000 parity \ && useradd -m -u 1000 -g parity -s /bin/sh parity +WORKDIR /home/parity -#add TARGET to docker image -COPY artifacts/x86_64-unknown-linux-gnu/$TARGET /bin/$TARGET - -# Build a shell script because the ENTRYPOINT command doesn't like using ENV -RUN echo "#!/bin/bash \n ${TARGET} \$@" > ./entrypoint.sh -RUN chmod +x ./entrypoint.sh +# add parity-ethereum to docker image +COPY artifacts/x86_64-unknown-linux-gnu/parity /bin/parity COPY scripts/docker/hub/check_sync.sh /check_sync.sh # switch to user parity here USER parity -# setup ENTRYPOINT EXPOSE 5001 8080 8082 8083 8545 8546 8180 30303/tcp 30303/udp -ENTRYPOINT ["./entrypoint.sh"] + +ENTRYPOINT ["/bin/parity"] diff --git a/scripts/gitlab/publish-snap.sh b/scripts/gitlab/publish-snap.sh index f001bbff0d8..19cbfa2e6b8 100755 --- a/scripts/gitlab/publish-snap.sh +++ b/scripts/gitlab/publish-snap.sh @@ -4,7 +4,7 @@ set -e # fail on any error set -u # treat unset variables as error # some necromancy: -# gsub(/"/, "", $2) deletes "qoutes" +# gsub(/"/, "", $2) deletes "qoutes" # gsub(/ /, "", $2) deletes whitespaces TRACK=`awk -F '=' '/^track/ {gsub(/"/, "", $2); gsub(/ /, "", $2); print $2}' ./util/version/Cargo.toml` echo Track is: $TRACK @@ -16,12 +16,25 @@ case ${TRACK} in *) echo "No release" && exit 0;; esac +VERSION="v"$VERSION SNAP_PACKAGE="parity_"$VERSION"_"$BUILD_ARCH".snap" echo "__________Create snap package__________" echo "Release channel :" $GRADE " Branch/tag: " $CI_COMMIT_REF_NAME echo $VERSION:$GRADE:$BUILD_ARCH -cat scripts/snap/snapcraft.template.yaml | envsubst '$VERSION:$GRADE:$BUILD_ARCH:$CARGO_TARGET' > snapcraft.yaml +# cat scripts/snap/snapcraft.template.yaml | envsubst '$VERSION:$GRADE:$BUILD_ARCH:$CARGO_TARGET' > snapcraft.yaml +# a bit more necromancy (substitutions): +pwd +cd /builds/$CI_PROJECT_PATH/scripts/snap/ +sed -e 's/$VERSION/'"$VERSION"'/g' \ + -e 's/$GRADE/'"$GRADE"'/g' \ + -e 's/$BUILD_ARCH/'"$BUILD_ARCH"'/g' \ + -e 's/$CARGO_TARGET/'"$CARGO_TARGET"'/g' \ + snapcraft.template.yaml > /builds/$CI_PROJECT_PATH/snapcraft.yaml +cd /builds/$CI_PROJECT_PATH +pwd +apt update +apt install -y --no-install-recommends rhash cat snapcraft.yaml snapcraft --target-arch=$BUILD_ARCH ls *.snap @@ -36,5 +49,8 @@ echo "Release channel :" $CHANNEL " Branch/tag: " $CI_COMMIT_REF_NAME echo $SNAPCRAFT_LOGIN_PARITY_BASE64 | base64 --decode > snapcraft.login snapcraft login --with snapcraft.login snapcraft push --release $CHANNEL $SNAP_PACKAGE +case ${CHANNEL} in + beta) snapcraft push --release candidate $SNAP_PACKAGE;; +esac snapcraft status parity snapcraft logout diff --git a/scripts/snap/snapcraft.template.yaml b/scripts/snap/snapcraft.template.yaml index eb67ba12820..d170241dbec 100644 --- a/scripts/snap/snapcraft.template.yaml +++ b/scripts/snap/snapcraft.template.yaml @@ -50,8 +50,4 @@ parts: cp -v ethkey $SNAPCRAFT_PART_INSTALL/usr/bin/ethkey cp -v ethstore $SNAPCRAFT_PART_INSTALL/usr/bin/ethstore cp -v whisper $SNAPCRAFT_PART_INSTALL/usr/bin/whisper - stage-packages: [libc6, libudev1, libstdc++6, cmake, libdb] - df: - plugin: nil - stage-packages: [coreutils] - stage: [bin/df] + stage-packages: [libc6, libudev1, libstdc++6, cmake, libdb5.3] diff --git a/util/network-devp2p/Cargo.toml b/util/network-devp2p/Cargo.toml index 5f1a5d33978..8bf402e206e 100644 --- a/util/network-devp2p/Cargo.toml +++ b/util/network-devp2p/Cargo.toml @@ -34,6 +34,7 @@ serde = "1.0" serde_json = "1.0" serde_derive = "1.0" error-chain = { version = "0.12", default-features = false } +lru-cache = "0.1" [dev-dependencies] env_logger = "0.5" diff --git a/util/network-devp2p/src/discovery.rs b/util/network-devp2p/src/discovery.rs index f6eaf494b99..7bf8dc62e5e 100644 --- a/util/network-devp2p/src/discovery.rs +++ b/util/network-devp2p/src/discovery.rs @@ -20,6 +20,7 @@ use std::collections::{HashSet, HashMap, VecDeque}; use std::collections::hash_map::Entry; use std::default::Default; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use lru_cache::LruCache; use hash::keccak; use ethereum_types::{H256, H520}; use rlp::{Rlp, RlpStream}; @@ -55,6 +56,8 @@ const REQUEST_BACKOFF: [Duration; 4] = [ const NODE_LAST_SEEN_TIMEOUT: Duration = Duration::from_secs(24*60*60); +const OBSERVED_NODES_MAX_SIZE: usize = 10_000; + #[derive(Clone, Debug)] pub struct NodeEntry { pub id: NodeId, @@ -95,7 +98,27 @@ struct FindNodeRequest { #[derive(Clone, Copy)] enum PingReason { Default, - FromDiscoveryRequest(NodeId) + FromDiscoveryRequest(NodeId, NodeValidity), +} + +#[derive(Clone, Copy, PartialEq)] +enum NodeCategory { + Bucket, + Observed +} + +#[derive(Clone, Copy, PartialEq)] +enum NodeValidity { + Ourselves, + ValidNode(NodeCategory), + ExpiredNode(NodeCategory), + UnknownNode +} + +#[derive(Debug)] +enum BucketError { + Ourselves, + NotInTheBucket{node_entry: NodeEntry, bucket_distance: usize}, } struct PingRequest { @@ -145,6 +168,12 @@ pub struct Discovery<'a> { discovery_id: NodeId, discovery_nodes: HashSet, node_buckets: Vec, + + // Sometimes we don't want to add nodes to the NodeTable, but still want to + // keep track of them to avoid excessive pinging (happens when an unknown node sends + // a discovery request to us -- the node might be on a different net). + other_observed_nodes: LruCache, + in_flight_pings: HashMap, in_flight_find_nodes: HashMap, send_queue: VecDeque, @@ -171,6 +200,7 @@ impl<'a> Discovery<'a> { discovery_id: NodeId::new(), discovery_nodes: HashSet::new(), node_buckets: (0..ADDRESS_BITS).map(|_| NodeBucket::new()).collect(), + other_observed_nodes: LruCache::new(OBSERVED_NODES_MAX_SIZE), in_flight_pings: HashMap::new(), in_flight_find_nodes: HashMap::new(), send_queue: VecDeque::new(), @@ -200,41 +230,53 @@ impl<'a> Discovery<'a> { } } - fn update_node(&mut self, e: NodeEntry) -> Option { - trace!(target: "discovery", "Inserting {:?}", &e); + fn update_bucket_record(&mut self, e: NodeEntry) -> Result<(), BucketError> { let id_hash = keccak(e.id); let dist = match Discovery::distance(&self.id_hash, &id_hash) { Some(dist) => dist, None => { debug!(target: "discovery", "Attempted to update own entry: {:?}", e); - return None; + return Err(BucketError::Ourselves); } }; + let bucket = &mut self.node_buckets[dist]; + bucket.nodes.iter_mut().find(|n| n.address.id == e.id) + .map_or(Err(BucketError::NotInTheBucket{node_entry: e.clone(), bucket_distance: dist}.into()), |entry| { + entry.address = e; + entry.last_seen = Instant::now(); + entry.backoff_until = Instant::now(); + entry.fail_count = 0; + Ok(()) + }) + } - let mut added_map = HashMap::new(); - let ping = { - let bucket = &mut self.node_buckets[dist]; - let updated = if let Some(node) = bucket.nodes.iter_mut().find(|n| n.address.id == e.id) { - node.address = e.clone(); - node.last_seen = Instant::now(); - node.backoff_until = Instant::now(); - node.fail_count = 0; - true - } else { false }; + fn update_node(&mut self, e: NodeEntry) -> Option { + trace!(target: "discovery", "Inserting {:?}", &e); + + match self.update_bucket_record(e) { + Ok(()) => None, + Err(BucketError::Ourselves) => None, + Err(BucketError::NotInTheBucket{node_entry, bucket_distance}) => Some((node_entry, bucket_distance)) + }.map(|(node_entry, bucket_distance)| { + trace!(target: "discovery", "Adding a new node {:?} into our bucket {}", &node_entry, bucket_distance); - if !updated { - added_map.insert(e.id, e.clone()); - bucket.nodes.push_front(BucketEntry::new(e)); + let mut added = HashMap::with_capacity(1); + added.insert(node_entry.id, node_entry.clone()); + let node_to_ping = { + let bucket = &mut self.node_buckets[bucket_distance]; + bucket.nodes.push_front(BucketEntry::new(node_entry)); if bucket.nodes.len() > BUCKET_SIZE { select_bucket_ping(bucket.nodes.iter()) - } else { None } - } else { None } - }; - if let Some(node) = ping { - self.try_ping(node, PingReason::Default); - } - Some(TableUpdates { added: added_map, removed: HashSet::new() }) + } else { + None + } + }; + if let Some(node) = node_to_ping { + self.try_ping(node, PingReason::Default); + }; + TableUpdates{added, removed: HashSet::new()} + }) } /// Starts the discovery process at round 0 @@ -541,10 +583,28 @@ impl<'a> Discovery<'a> { }; if let Some((node, ping_reason)) = expected_node { - if let PingReason::FromDiscoveryRequest(target) = ping_reason { + if let PingReason::FromDiscoveryRequest(target, validity) = ping_reason { self.respond_with_discovery(target, &node)?; + // kirushik: I would prefer to probe the network id of the remote node here, and add it to the nodes list if it's on "our" net -- + // but `on_packet` happens synchronously, so doing the full TCP handshake ceremony here is a bad idea. + // So instead we just LRU-caching most recently seen nodes to avoid unnecessary pinging + match validity { + NodeValidity::ValidNode(NodeCategory::Bucket) | NodeValidity::ExpiredNode(NodeCategory::Bucket) => { + trace!(target: "discovery", "Updating node {:?} in our Kad buckets", &node); + self.update_bucket_record(node).unwrap_or_else(|error| { + debug!(target: "discovery", "Error occured when processing ping from a bucket node: {:?}", &error); + }); + }, + NodeValidity::UnknownNode | NodeValidity::ExpiredNode(NodeCategory::Observed) | NodeValidity::ValidNode(NodeCategory::Observed)=> { + trace!(target: "discovery", "Updating node {:?} in the list of other_observed_nodes", &node); + self.other_observed_nodes.insert(node.id, (node.endpoint, Instant::now())); + }, + NodeValidity::Ourselves => (), + } + Ok(None) + } else { + Ok(self.update_node(node)) } - Ok(self.update_node(node)) } else { debug!(target: "discovery", "Got unexpected Pong from {:?} ; request not found", &from); Ok(None) @@ -565,31 +625,41 @@ impl<'a> Discovery<'a> { } }; - if self.is_a_valid_known_node(&node) { - self.respond_with_discovery(target, &node)?; - } else { + match self.check_validity(&node) { + NodeValidity::Ourselves => (), // It makes no sense to respond to the discovery request from ourselves + NodeValidity::ValidNode(_) => self.respond_with_discovery(target, &node)?, // Make sure the request source is actually there and responds to pings before actually responding - self.try_ping(node, PingReason::FromDiscoveryRequest(target)); + invalidity_reason => self.try_ping(node, PingReason::FromDiscoveryRequest(target, invalidity_reason)) } Ok(None) } - fn is_a_valid_known_node(&self, node: &NodeEntry) -> bool { + fn check_validity(&mut self, node: &NodeEntry) -> NodeValidity { let id_hash = keccak(node.id); let dist = match Discovery::distance(&self.id_hash, &id_hash) { Some(dist) => dist, None => { debug!(target: "discovery", "Got an incoming discovery request from self: {:?}", node); - return false; + return NodeValidity::Ourselves; } }; let bucket = &self.node_buckets[dist]; if let Some(known_node) = bucket.nodes.iter().find(|n| n.address.id == node.id) { debug!(target: "discovery", "Found a known node in a bucket when processing discovery: {:?}/{:?}", known_node, node); - (known_node.address.endpoint == node.endpoint) && (known_node.last_seen.elapsed() < NODE_LAST_SEEN_TIMEOUT) + match ((known_node.address.endpoint == node.endpoint), (known_node.last_seen.elapsed() < NODE_LAST_SEEN_TIMEOUT)) { + (true, true) => NodeValidity::ValidNode(NodeCategory::Bucket), + (true, false) => NodeValidity::ExpiredNode(NodeCategory::Bucket), + _ => NodeValidity::UnknownNode + } } else { - false + self.other_observed_nodes.get_mut(&node.id).map_or(NodeValidity::UnknownNode, |(endpoint, observed_at)| { + match ((node.endpoint==*endpoint), (observed_at.elapsed() < NODE_LAST_SEEN_TIMEOUT)) { + (true, true) => NodeValidity::ValidNode(NodeCategory::Observed), + (true, false) => NodeValidity::ExpiredNode(NodeCategory::Observed), + _ => NodeValidity::UnknownNode + } + }) } } diff --git a/util/network-devp2p/src/lib.rs b/util/network-devp2p/src/lib.rs index 531c6ee506f..6082049e890 100644 --- a/util/network-devp2p/src/lib.rs +++ b/util/network-devp2p/src/lib.rs @@ -84,6 +84,7 @@ extern crate keccak_hash as hash; extern crate serde; extern crate serde_json; extern crate parity_snappy as snappy; +extern crate lru_cache; #[macro_use] extern crate error_chain;