diff --git a/Cargo.lock b/Cargo.lock index 1731ded2906cf..81a53ed858859 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -484,12 +484,15 @@ dependencies = [ "beefy-primitives", "fnv", "futures 0.3.19", + "futures-timer", "hex", "log 0.4.14", "parity-scale-codec", "parking_lot 0.12.0", "sc-chain-spec", "sc-client-api", + "sc-consensus", + "sc-finality-grandpa", "sc-keystore", "sc-network", "sc-network-gossip", @@ -500,13 +503,19 @@ dependencies = [ "sp-application-crypto", "sp-arithmetic", "sp-blockchain", + "sp-consensus", "sp-core", + "sp-finality-grandpa", + "sp-keyring", "sp-keystore", "sp-runtime", "sp-tracing", "strum", "substrate-prometheus-endpoint", + "substrate-test-runtime-client", + "tempfile", "thiserror", + "tokio", "wasm-timer", ] @@ -10688,6 +10697,7 @@ dependencies = [ name = "substrate-test-runtime" version = "2.0.0" dependencies = [ + "beefy-primitives", "cfg-if 1.0.0", "frame-support", "frame-system", diff --git a/client/beefy/Cargo.toml b/client/beefy/Cargo.toml index 1cd0f1fd50d80..02be645b3fc08 100644 --- a/client/beefy/Cargo.toml +++ b/client/beefy/Cargo.toml @@ -10,6 +10,7 @@ description = "BEEFY Client gadget for substrate" [dependencies] fnv = "1.0.6" futures = "0.3" +futures-timer = "3.0.1" hex = "0.4.2" log = "0.4" parking_lot = "0.12.0" @@ -23,22 +24,31 @@ sp-api = { version = "4.0.0-dev", path = "../../primitives/api" } sp-application-crypto = { version = "6.0.0", path = "../../primitives/application-crypto" } sp-arithmetic = { version = "5.0.0", path = "../../primitives/arithmetic" } sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" } +sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/common" } sp-core = { version = "6.0.0", path = "../../primitives/core" } sp-keystore = { version = "0.12.0", path = "../../primitives/keystore" } sp-runtime = { version = "6.0.0", path = "../../primitives/runtime" } sc-chain-spec = { version = "4.0.0-dev", path = "../../client/chain-spec" } -sc-utils = { version = "4.0.0-dev", path = "../utils" } sc-client-api = { version = "4.0.0-dev", path = "../api" } +sc-finality-grandpa = { version = "0.10.0-dev", path = "../../client/finality-grandpa" } sc-keystore = { version = "4.0.0-dev", path = "../keystore" } sc-network = { version = "0.10.0-dev", path = "../network" } sc-network-gossip = { version = "0.10.0-dev", path = "../network-gossip" } +sc-utils = { version = "4.0.0-dev", path = "../utils" } beefy-primitives = { version = "4.0.0-dev", path = "../../primitives/beefy" } [dev-dependencies] -sp-tracing = { version = "5.0.0", path = "../../primitives/tracing" } +sc-consensus = { version = "0.10.0-dev", path = "../consensus/common" } sc-network-test = { version = "0.8.0", path = "../network/test" } +sp-finality-grandpa = { version = "4.0.0-dev", path = "../../primitives/finality-grandpa" } +sp-keyring = { version = "6.0.0", path = "../../primitives/keyring" } +sp-tracing = { version = "5.0.0", path = "../../primitives/tracing" } +substrate-test-runtime-client = { version = "2.0.0", path = "../../test-utils/runtime/client" } + serde = "1.0.136" strum = { version = "0.23", features = ["derive"] } +tokio = "1.15" +tempfile = "3.1.0" diff --git a/client/beefy/src/gossip.rs b/client/beefy/src/gossip.rs index 37358441ef88a..54d283fede32e 100644 --- a/client/beefy/src/gossip.rs +++ b/client/beefy/src/gossip.rs @@ -35,9 +35,6 @@ use beefy_primitives::{ use crate::keystore::BeefyKeystore; -// Limit BEEFY gossip by keeping only a bound number of voting rounds alive. -const MAX_LIVE_GOSSIP_ROUNDS: usize = 3; - // Timeout for rebroadcasting messages. const REBROADCAST_AFTER: Duration = Duration::from_secs(60 * 5); @@ -52,13 +49,50 @@ where /// A type that represents hash of the message. pub type MessageHash = [u8; 8]; -type KnownVotes = BTreeMap, fnv::FnvHashSet>; +struct KnownVotes { + last_done: Option>, + live: BTreeMap, fnv::FnvHashSet>, +} + +impl KnownVotes { + pub fn new() -> Self { + Self { last_done: None, live: BTreeMap::new() } + } + + /// Create new round votes set if not already present. + fn insert(&mut self, round: NumberFor) { + self.live.entry(round).or_default(); + } + + /// Remove `round` and older from live set, update `last_done` accordingly. + fn conclude(&mut self, round: NumberFor) { + self.live.retain(|&number, _| number > round); + self.last_done = self.last_done.max(Some(round)); + } + + /// Return true if `round` is newer than previously concluded rounds. + /// + /// Latest concluded round is still considered alive to allow proper gossiping for it. + fn is_live(&self, round: &NumberFor) -> bool { + Some(*round) >= self.last_done + } + + /// Add new _known_ `hash` to the round's known votes. + fn add_known(&mut self, round: &NumberFor, hash: MessageHash) { + self.live.get_mut(round).map(|known| known.insert(hash)); + } + + /// Check if `hash` is already part of round's known votes. + fn is_known(&self, round: &NumberFor, hash: &MessageHash) -> bool { + self.live.get(round).map(|known| known.contains(hash)).unwrap_or(false) + } +} /// BEEFY gossip validator /// /// Validate BEEFY gossip messages and limit the number of live BEEFY voting rounds. /// -/// Allows messages from last [`MAX_LIVE_GOSSIP_ROUNDS`] to flow, everything else gets +/// Allows messages for 'rounds >= last concluded' to flow, everything else gets /// rejected/expired. /// ///All messaging is handled in a single BEEFY global topic. @@ -78,57 +112,25 @@ where pub fn new() -> GossipValidator { GossipValidator { topic: topic::(), - known_votes: RwLock::new(BTreeMap::new()), + known_votes: RwLock::new(KnownVotes::new()), next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER), } } /// Note a voting round. /// - /// Noting `round` will keep `round` live. - /// - /// We retain the [`MAX_LIVE_GOSSIP_ROUNDS`] most **recent** voting rounds as live. - /// As long as a voting round is live, it will be gossiped to peer nodes. + /// Noting round will start a live `round`. pub(crate) fn note_round(&self, round: NumberFor) { - debug!(target: "beefy", "🥩 About to note round #{}", round); - - let mut live = self.known_votes.write(); - - if !live.contains_key(&round) { - live.insert(round, Default::default()); - } - - if live.len() > MAX_LIVE_GOSSIP_ROUNDS { - let to_remove = live.iter().next().map(|x| x.0).copied(); - if let Some(first) = to_remove { - live.remove(&first); - } - } - } - - fn add_known(known_votes: &mut KnownVotes, round: &NumberFor, hash: MessageHash) { - known_votes.get_mut(round).map(|known| known.insert(hash)); - } - - // Note that we will always keep the most recent unseen round alive. - // - // This is a preliminary fix and the detailed description why we are - // doing this can be found as part of the issue below - // - // https://github.com/paritytech/grandpa-bridge-gadget/issues/237 - // - fn is_live(known_votes: &KnownVotes, round: &NumberFor) -> bool { - let unseen_round = if let Some(max_known_round) = known_votes.keys().last() { - round > max_known_round - } else { - known_votes.is_empty() - }; - - known_votes.contains_key(round) || unseen_round + debug!(target: "beefy", "🥩 About to note gossip round #{}", round); + self.known_votes.write().insert(round); } - fn is_known(known_votes: &KnownVotes, round: &NumberFor, hash: &MessageHash) -> bool { - known_votes.get(round).map(|known| known.contains(hash)).unwrap_or(false) + /// Conclude a voting round. + /// + /// This can be called once round is complete so we stop gossiping for it. + pub(crate) fn conclude_round(&self, round: NumberFor) { + debug!(target: "beefy", "🥩 About to drop gossip round #{}", round); + self.known_votes.write().conclude(round); } } @@ -152,17 +154,17 @@ where { let known_votes = self.known_votes.read(); - if !GossipValidator::::is_live(&known_votes, &round) { + if !known_votes.is_live(&round) { return ValidationResult::Discard } - if GossipValidator::::is_known(&known_votes, &round, &msg_hash) { + if known_votes.is_known(&round, &msg_hash) { return ValidationResult::ProcessAndKeep(self.topic) } } if BeefyKeystore::verify(&msg.id, &msg.signature, &msg.commitment.encode()) { - GossipValidator::::add_known(&mut *self.known_votes.write(), &round, msg_hash); + self.known_votes.write().add_known(&round, msg_hash); return ValidationResult::ProcessAndKeep(self.topic) } else { // TODO: report peer @@ -182,7 +184,7 @@ where }; let round = msg.commitment.block_number; - let expired = !GossipValidator::::is_live(&known_votes, &round); + let expired = !known_votes.is_live(&round); trace!(target: "beefy", "🥩 Message for round #{} expired: {}", round, expired); @@ -212,11 +214,11 @@ where let msg = match VoteMessage::, Public, Signature>::decode(&mut data) { Ok(vote) => vote, - Err(_) => return true, + Err(_) => return false, }; let round = msg.commitment.block_number; - let allowed = GossipValidator::::is_live(&known_votes, &round); + let allowed = known_votes.is_live(&round); debug!(target: "beefy", "🥩 Message for round #{} allowed: {}", round, allowed); @@ -240,60 +242,58 @@ mod tests { use super::*; #[test] - fn note_round_works() { - let gv = GossipValidator::::new(); - - gv.note_round(1u64); - - let live = gv.known_votes.read(); - assert!(GossipValidator::::is_live(&live, &1u64)); - - drop(live); - - gv.note_round(3u64); - gv.note_round(7u64); - gv.note_round(10u64); - - let live = gv.known_votes.read(); - - assert_eq!(live.len(), MAX_LIVE_GOSSIP_ROUNDS); - - assert!(!GossipValidator::::is_live(&live, &1u64)); - assert!(GossipValidator::::is_live(&live, &3u64)); - assert!(GossipValidator::::is_live(&live, &7u64)); - assert!(GossipValidator::::is_live(&live, &10u64)); + fn known_votes_insert_remove() { + let mut kv = KnownVotes::::new(); + + kv.insert(1); + kv.insert(1); + kv.insert(2); + assert_eq!(kv.live.len(), 2); + + let mut kv = KnownVotes::::new(); + kv.insert(1); + kv.insert(2); + kv.insert(3); + + assert!(kv.last_done.is_none()); + kv.conclude(2); + assert_eq!(kv.live.len(), 1); + assert!(!kv.live.contains_key(&2)); + assert_eq!(kv.last_done, Some(2)); + + kv.conclude(1); + assert_eq!(kv.last_done, Some(2)); + + kv.conclude(3); + assert_eq!(kv.last_done, Some(3)); + assert!(kv.live.is_empty()); } #[test] - fn keeps_most_recent_max_rounds() { + fn note_and_drop_round_works() { let gv = GossipValidator::::new(); - gv.note_round(3u64); - gv.note_round(7u64); - gv.note_round(10u64); gv.note_round(1u64); - let live = gv.known_votes.read(); - - assert_eq!(live.len(), MAX_LIVE_GOSSIP_ROUNDS); + assert!(gv.known_votes.read().is_live(&1u64)); - assert!(GossipValidator::::is_live(&live, &3u64)); - assert!(!GossipValidator::::is_live(&live, &1u64)); - - drop(live); + gv.note_round(3u64); + gv.note_round(7u64); + gv.note_round(10u64); - gv.note_round(23u64); - gv.note_round(15u64); - gv.note_round(20u64); - gv.note_round(2u64); + assert_eq!(gv.known_votes.read().live.len(), 4); - let live = gv.known_votes.read(); + gv.conclude_round(7u64); - assert_eq!(live.len(), MAX_LIVE_GOSSIP_ROUNDS); + let votes = gv.known_votes.read(); - assert!(GossipValidator::::is_live(&live, &15u64)); - assert!(GossipValidator::::is_live(&live, &20u64)); - assert!(GossipValidator::::is_live(&live, &23u64)); + // rounds 1 and 3 are outdated, don't gossip anymore + assert!(!votes.is_live(&1u64)); + assert!(!votes.is_live(&3u64)); + // latest concluded round is still gossiped + assert!(votes.is_live(&7u64)); + // round 10 is alive and in-progress + assert!(votes.is_live(&10u64)); } #[test] @@ -304,22 +304,18 @@ mod tests { gv.note_round(7u64); gv.note_round(10u64); - let live = gv.known_votes.read(); - - assert_eq!(live.len(), MAX_LIVE_GOSSIP_ROUNDS); - - drop(live); + assert_eq!(gv.known_votes.read().live.len(), 3); // note round #7 again -> should not change anything gv.note_round(7u64); - let live = gv.known_votes.read(); + let votes = gv.known_votes.read(); - assert_eq!(live.len(), MAX_LIVE_GOSSIP_ROUNDS); + assert_eq!(votes.live.len(), 3); - assert!(GossipValidator::::is_live(&live, &3u64)); - assert!(GossipValidator::::is_live(&live, &7u64)); - assert!(GossipValidator::::is_live(&live, &10u64)); + assert!(votes.is_live(&3u64)); + assert!(votes.is_live(&7u64)); + assert!(votes.is_live(&10u64)); } struct TestContext; @@ -349,29 +345,32 @@ mod tests { beefy_keystore.sign(&who.public(), &commitment.encode()).unwrap() } + fn dummy_vote(block_number: u64) -> VoteMessage { + let payload = Payload::new(known_payload_ids::MMR_ROOT_ID, MmrRootHash::default().encode()); + let commitment = Commitment { payload, block_number, validator_set_id: 0 }; + let signature = sign_commitment(&Keyring::Alice, &commitment); + + VoteMessage { commitment, id: Keyring::Alice.public(), signature } + } + #[test] fn should_avoid_verifying_signatures_twice() { let gv = GossipValidator::::new(); let sender = sc_network::PeerId::random(); let mut context = TestContext; - let payload = Payload::new(known_payload_ids::MMR_ROOT_ID, MmrRootHash::default().encode()); - let commitment = Commitment { payload, block_number: 3_u64, validator_set_id: 0 }; - - let signature = sign_commitment(&Keyring::Alice, &commitment); - - let vote = VoteMessage { commitment, id: Keyring::Alice.public(), signature }; + let vote = dummy_vote(3); gv.note_round(3u64); gv.note_round(7u64); gv.note_round(10u64); - // first time the cache should be populated. + // first time the cache should be populated let res = gv.validate(&mut context, &sender, &vote.encode()); assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); assert_eq!( - gv.known_votes.read().get(&vote.commitment.block_number).map(|x| x.len()), + gv.known_votes.read().live.get(&vote.commitment.block_number).map(|x| x.len()), Some(1) ); @@ -380,17 +379,84 @@ mod tests { assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); - // next we should quickly reject if the round is not live. - gv.note_round(11_u64); - gv.note_round(12_u64); + // next we should quickly reject if the round is not live + gv.conclude_round(7_u64); - assert!(!GossipValidator::::is_live( - &*gv.known_votes.read(), - &vote.commitment.block_number - )); + assert!(!gv.known_votes.read().is_live(&vote.commitment.block_number)); let res = gv.validate(&mut context, &sender, &vote.encode()); assert!(matches!(res, ValidationResult::Discard)); } + + #[test] + fn messages_allowed_and_expired() { + let gv = GossipValidator::::new(); + let sender = sc_network::PeerId::random(); + let topic = Default::default(); + let intent = MessageIntent::Broadcast; + + // note round 2 and 3, then conclude 2 + gv.note_round(2u64); + gv.note_round(3u64); + gv.conclude_round(2u64); + let mut allowed = gv.message_allowed(); + let mut expired = gv.message_expired(); + + // check bad vote format + assert!(!allowed(&sender, intent, &topic, &mut [0u8; 16])); + assert!(expired(topic, &mut [0u8; 16])); + + // inactive round 1 -> expired + let vote = dummy_vote(1); + let mut encoded_vote = vote.encode(); + assert!(!allowed(&sender, intent, &topic, &mut encoded_vote)); + assert!(expired(topic, &mut encoded_vote)); + + // active round 2 -> !expired - concluded but still gossiped + let vote = dummy_vote(2); + let mut encoded_vote = vote.encode(); + assert!(allowed(&sender, intent, &topic, &mut encoded_vote)); + assert!(!expired(topic, &mut encoded_vote)); + + // in progress round 3 -> !expired + let vote = dummy_vote(3); + let mut encoded_vote = vote.encode(); + assert!(allowed(&sender, intent, &topic, &mut encoded_vote)); + assert!(!expired(topic, &mut encoded_vote)); + + // unseen round 4 -> !expired + let vote = dummy_vote(3); + let mut encoded_vote = vote.encode(); + assert!(allowed(&sender, intent, &topic, &mut encoded_vote)); + assert!(!expired(topic, &mut encoded_vote)); + } + + #[test] + fn messages_rebroadcast() { + let gv = GossipValidator::::new(); + let sender = sc_network::PeerId::random(); + let topic = Default::default(); + + let vote = dummy_vote(1); + let mut encoded_vote = vote.encode(); + + // re-broadcasting only allowed at `REBROADCAST_AFTER` intervals + let intent = MessageIntent::PeriodicRebroadcast; + let mut allowed = gv.message_allowed(); + + // rebroadcast not allowed so soon after GossipValidator creation + assert!(!allowed(&sender, intent, &topic, &mut encoded_vote)); + + // hack the inner deadline to be `now` + *gv.next_rebroadcast.lock() = Instant::now(); + + // still not allowed on old `allowed` closure result + assert!(!allowed(&sender, intent, &topic, &mut encoded_vote)); + + // renew closure result + let mut allowed = gv.message_allowed(); + // rebroadcast should be allowed now + assert!(allowed(&sender, intent, &topic, &mut encoded_vote)); + } } diff --git a/client/beefy/src/lib.rs b/client/beefy/src/lib.rs index 29d74c15dd599..8a6e175f58321 100644 --- a/client/beefy/src/lib.rs +++ b/client/beefy/src/lib.rs @@ -18,14 +18,14 @@ use std::sync::Arc; -use log::debug; use prometheus::Registry; use sc_client_api::{Backend, BlockchainEvents, Finalizer}; -use sc_network_gossip::{GossipEngine, Network as GossipNetwork}; +use sc_network_gossip::Network as GossipNetwork; use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; +use sp_consensus::SyncOracle; use sp_keystore::SyncCryptoStorePtr; use sp_runtime::traits::Block; @@ -41,6 +41,10 @@ mod round; mod worker; pub mod notification; + +#[cfg(test)] +mod tests; + pub use beefy_protocol_name::standard_name as protocol_standard_name; pub(crate) mod beefy_protocol_name { @@ -112,7 +116,7 @@ where BE: Backend, C: Client, C::Api: BeefyApi, - N: GossipNetwork + Clone + Send + 'static, + N: GossipNetwork + Clone + SyncOracle + Send + Sync + 'static, { /// BEEFY client pub client: Arc, @@ -134,6 +138,7 @@ where pub protocol_name: std::borrow::Cow<'static, str>, } +#[cfg(not(test))] /// Start the BEEFY gadget. /// /// This is a thin shim around running and awaiting a BEEFY worker. @@ -143,7 +148,7 @@ where BE: Backend, C: Client, C::Api: BeefyApi, - N: GossipNetwork + Clone + Send + 'static, + N: GossipNetwork + Clone + SyncOracle + Send + Sync + 'static, { let BeefyParams { client, @@ -157,18 +162,24 @@ where protocol_name, } = beefy_params; + let sync_oracle = network.clone(); let gossip_validator = Arc::new(gossip::GossipValidator::new()); - let gossip_engine = GossipEngine::new(network, protocol_name, gossip_validator.clone(), None); + let gossip_engine = sc_network_gossip::GossipEngine::new( + network, + protocol_name, + gossip_validator.clone(), + None, + ); let metrics = prometheus_registry.as_ref().map(metrics::Metrics::register).and_then( |result| match result { Ok(metrics) => { - debug!(target: "beefy", "🥩 Registered metrics"); + log::debug!(target: "beefy", "🥩 Registered metrics"); Some(metrics) }, Err(err) => { - debug!(target: "beefy", "🥩 Failed to register metrics: {:?}", err); + log::debug!(target: "beefy", "🥩 Failed to register metrics: {:?}", err); None }, }, @@ -184,54 +195,10 @@ where gossip_validator, min_block_delta, metrics, + sync_oracle, }; - let worker = worker::BeefyWorker::<_, _, _>::new(worker_params); + let worker = worker::BeefyWorker::<_, _, _, _>::new(worker_params); worker.run().await } - -#[cfg(test)] -mod tests { - use super::*; - use sc_chain_spec::{ChainSpec, GenericChainSpec}; - use serde::{Deserialize, Serialize}; - use sp_core::H256; - use sp_runtime::{BuildStorage, Storage}; - - #[derive(Debug, Serialize, Deserialize)] - struct Genesis(std::collections::BTreeMap); - impl BuildStorage for Genesis { - fn assimilate_storage(&self, storage: &mut Storage) -> Result<(), String> { - storage.top.extend( - self.0.iter().map(|(a, b)| (a.clone().into_bytes(), b.clone().into_bytes())), - ); - Ok(()) - } - } - - #[test] - fn beefy_protocol_name() { - let chain_spec = GenericChainSpec::::from_json_file(std::path::PathBuf::from( - "../chain-spec/res/chain_spec.json", - )) - .unwrap() - .cloned_box(); - - // Create protocol name using random genesis hash. - let genesis_hash = H256::random(); - let expected = format!("/{}/beefy/1", hex::encode(genesis_hash)); - let proto_name = beefy_protocol_name::standard_name(&genesis_hash, &chain_spec); - assert_eq!(proto_name.to_string(), expected); - - // Create protocol name using hardcoded genesis hash. Verify exact representation. - let genesis_hash = [ - 50, 4, 60, 123, 58, 106, 216, 246, 194, 188, 139, 193, 33, 212, 202, 171, 9, 55, 123, - 94, 8, 43, 12, 251, 187, 57, 173, 19, 188, 74, 205, 147, - ]; - let expected = - "/32043c7b3a6ad8f6c2bc8bc121d4caab09377b5e082b0cfbbb39ad13bc4acd93/beefy/1".to_string(); - let proto_name = beefy_protocol_name::standard_name(&genesis_hash, &chain_spec); - assert_eq!(proto_name.to_string(), expected); - } -} diff --git a/client/beefy/src/metrics.rs b/client/beefy/src/metrics.rs index 4b2a5c8dfd5c9..20fa98e52fdd5 100644 --- a/client/beefy/src/metrics.rs +++ b/client/beefy/src/metrics.rs @@ -18,7 +18,9 @@ //! BEEFY Prometheus metrics definition -use prometheus::{register, Counter, Gauge, PrometheusError, Registry, U64}; +#[cfg(not(test))] +use prometheus::{register, PrometheusError, Registry}; +use prometheus::{Counter, Gauge, U64}; /// BEEFY metrics exposed through Prometheus pub(crate) struct Metrics { @@ -37,6 +39,7 @@ pub(crate) struct Metrics { } impl Metrics { + #[cfg(not(test))] pub(crate) fn register(registry: &Registry) -> Result { Ok(Self { beefy_validator_set_id: register( @@ -97,3 +100,11 @@ macro_rules! metric_inc { } }}; } + +#[cfg(test)] +#[macro_export] +macro_rules! metric_get { + ($self:ident, $m:ident) => {{ + $self.metrics.as_ref().map(|metrics| metrics.$m.clone()) + }}; +} diff --git a/client/beefy/src/round.rs b/client/beefy/src/round.rs index e5404cfa6d216..eba769b2356f0 100644 --- a/client/beefy/src/round.rs +++ b/client/beefy/src/round.rs @@ -16,7 +16,10 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use std::{collections::BTreeMap, hash::Hash}; +use std::{ + collections::{BTreeMap, HashMap}, + hash::Hash, +}; use log::{debug, trace}; @@ -24,25 +27,33 @@ use beefy_primitives::{ crypto::{Public, Signature}, ValidatorSet, ValidatorSetId, }; -use sp_arithmetic::traits::AtLeast32BitUnsigned; -use sp_runtime::traits::MaybeDisplay; +use sp_runtime::traits::{Block, NumberFor}; +/// Tracks for each round which validators have voted/signed and +/// whether the local `self` validator has voted/signed. +/// +/// Does not do any validation on votes or signatures, layers above need to handle that (gossip). #[derive(Default)] struct RoundTracker { - votes: Vec<(Public, Signature)>, + self_vote: bool, + votes: HashMap, } impl RoundTracker { - fn add_vote(&mut self, vote: (Public, Signature)) -> bool { - // this needs to handle equivocations in the future - if self.votes.contains(&vote) { + fn add_vote(&mut self, vote: (Public, Signature), self_vote: bool) -> bool { + if self.votes.contains_key(&vote.0) { return false } - self.votes.push(vote); + self.self_vote = self.self_vote || self_vote; + self.votes.insert(vote.0, vote.1); true } + fn has_self_vote(&self) -> bool { + self.self_vote + } + fn is_done(&self, threshold: usize) -> bool { self.votes.len() >= threshold } @@ -53,74 +64,125 @@ fn threshold(authorities: usize) -> usize { authorities - faulty } -pub(crate) struct Rounds { - rounds: BTreeMap<(Payload, Number), RoundTracker>, +/// Keeps track of all voting rounds (block numbers) within a session. +/// Only round numbers > `best_done` are of interest, all others are considered stale. +/// +/// Does not do any validation on votes or signatures, layers above need to handle that (gossip). +pub(crate) struct Rounds { + rounds: BTreeMap<(Payload, NumberFor), RoundTracker>, + best_done: Option>, + session_start: NumberFor, validator_set: ValidatorSet, + prev_validator_set: ValidatorSet, } -impl Rounds +impl Rounds where - P: Ord + Hash, - N: Ord + AtLeast32BitUnsigned + MaybeDisplay, + P: Ord + Hash + Clone, + B: Block, { - pub(crate) fn new(validator_set: ValidatorSet) -> Self { - Rounds { rounds: BTreeMap::new(), validator_set } + pub(crate) fn new( + session_start: NumberFor, + validator_set: ValidatorSet, + prev_validator_set: ValidatorSet, + ) -> Self { + Rounds { + rounds: BTreeMap::new(), + best_done: None, + session_start, + validator_set, + prev_validator_set, + } } } -impl Rounds +impl Rounds where - H: Ord + Hash + Clone, - N: Ord + AtLeast32BitUnsigned + MaybeDisplay + Clone, + P: Ord + Hash + Clone, + B: Block, { - pub(crate) fn validator_set_id(&self) -> ValidatorSetId { - self.validator_set.id() + pub(crate) fn validator_set_id_for(&self, block_number: NumberFor) -> ValidatorSetId { + if block_number > self.session_start { + self.validator_set.id() + } else { + self.prev_validator_set.id() + } } - pub(crate) fn validators(&self) -> &[Public] { - self.validator_set.validators() + pub(crate) fn validators_for(&self, block_number: NumberFor) -> &[Public] { + if block_number > self.session_start { + self.validator_set.validators() + } else { + self.prev_validator_set.validators() + } } - pub(crate) fn add_vote(&mut self, round: &(H, N), vote: (Public, Signature)) -> bool { - if self.validator_set.validators().iter().any(|id| vote.0 == *id) { - self.rounds.entry(round.clone()).or_default().add_vote(vote) - } else { + pub(crate) fn validator_set(&self) -> &ValidatorSet { + &self.validator_set + } + + pub(crate) fn session_start(&self) -> &NumberFor { + &self.session_start + } + + pub(crate) fn should_self_vote(&self, round: &(P, NumberFor)) -> bool { + Some(round.1.clone()) > self.best_done && + self.rounds.get(round).map(|tracker| !tracker.has_self_vote()).unwrap_or(true) + } + + pub(crate) fn add_vote( + &mut self, + round: &(P, NumberFor), + vote: (Public, Signature), + self_vote: bool, + ) -> bool { + if Some(round.1.clone()) <= self.best_done { + debug!( + target: "beefy", + "🥩 received vote for old stale round {:?}, ignoring", + round.1 + ); + false + } else if !self.validator_set.validators().iter().any(|id| vote.0 == *id) { + debug!( + target: "beefy", + "🥩 received vote {:?} from validator that is not in the validator set, ignoring", + vote + ); false + } else { + self.rounds.entry(round.clone()).or_default().add_vote(vote, self_vote) } } - pub(crate) fn is_done(&self, round: &(H, N)) -> bool { + pub(crate) fn try_conclude( + &mut self, + round: &(P, NumberFor), + ) -> Option>> { let done = self .rounds .get(round) .map(|tracker| tracker.is_done(threshold(self.validator_set.len()))) .unwrap_or(false); - - debug!(target: "beefy", "🥩 Round #{} done: {}", round.1, done); - - done - } - - pub(crate) fn drop(&mut self, round: &(H, N)) -> Option>> { - trace!(target: "beefy", "🥩 About to drop round #{}", round.1); - - let signatures = self.rounds.remove(round)?.votes; - - Some( - self.validator_set - .validators() - .iter() - .map(|authority_id| { - signatures.iter().find_map(|(id, sig)| { - if id == authority_id { - Some(sig.clone()) - } else { - None - } - }) - }) - .collect(), - ) + trace!(target: "beefy", "🥩 Round #{} done: {}", round.1, done); + + if done { + // remove this and older (now stale) rounds + let signatures = self.rounds.remove(round)?.votes; + self.rounds.retain(|&(_, number), _| number > round.1); + self.best_done = self.best_done.clone().max(Some(round.1.clone())); + trace!(target: "beefy", "🥩 Concluded round #{}", round.1); + + Some( + self.validator_set + .validators() + .iter() + .map(|authority_id| signatures.get(authority_id).cloned()) + .collect(), + ) + } else { + None + } } } @@ -128,13 +190,52 @@ where mod tests { use sc_network_test::Block; use sp_core::H256; - use sp_runtime::traits::NumberFor; use beefy_primitives::{crypto::Public, ValidatorSet}; - use super::Rounds; + use super::{threshold, RoundTracker, Rounds}; use crate::keystore::tests::Keyring; + #[test] + fn round_tracker() { + let mut rt = RoundTracker::default(); + let bob_vote = (Keyring::Bob.public(), Keyring::Bob.sign(b"I am committed")); + let threshold = 2; + + // self vote not added yet + assert!(!rt.has_self_vote()); + + // adding new vote allowed + assert!(rt.add_vote(bob_vote.clone(), false)); + // adding existing vote not allowed + assert!(!rt.add_vote(bob_vote, false)); + + // self vote still not added yet + assert!(!rt.has_self_vote()); + + // vote is not done + assert!(!rt.is_done(threshold)); + + let alice_vote = (Keyring::Alice.public(), Keyring::Alice.sign(b"I am committed")); + // adding new vote (self vote this time) allowed + assert!(rt.add_vote(alice_vote, true)); + + // self vote registered + assert!(rt.has_self_vote()); + // vote is now done + assert!(rt.is_done(threshold)); + } + + #[test] + fn vote_threshold() { + assert_eq!(threshold(1), 1); + assert_eq!(threshold(2), 2); + assert_eq!(threshold(3), 3); + assert_eq!(threshold(4), 3); + assert_eq!(threshold(100), 67); + assert_eq!(threshold(300), 201); + } + #[test] fn new_rounds() { sp_tracing::try_init_simple(); @@ -145,116 +246,175 @@ mod tests { ) .unwrap(); - let rounds = Rounds::>::new(validators); - - assert_eq!(42, rounds.validator_set_id()); + let session_start = 1u64.into(); + let rounds = Rounds::::new(session_start, validators.clone(), validators); + assert_eq!(42, rounds.validator_set_id_for(session_start)); + assert_eq!(1, *rounds.session_start()); assert_eq!( &vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()], - rounds.validators() + rounds.validators_for(session_start) ); } #[test] - fn add_vote() { + fn add_and_conclude_votes() { sp_tracing::try_init_simple(); let validators = ValidatorSet::::new( - vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()], + vec![ + Keyring::Alice.public(), + Keyring::Bob.public(), + Keyring::Charlie.public(), + Keyring::Eve.public(), + ], Default::default(), ) .unwrap(); + let round = (H256::from_low_u64_le(1), 1); - let mut rounds = Rounds::>::new(validators); + let session_start = 1u64.into(); + let mut rounds = Rounds::::new(session_start, validators.clone(), validators); + // no self vote yet, should self vote + assert!(rounds.should_self_vote(&round)); + + // add 1st good vote assert!(rounds.add_vote( - &(H256::from_low_u64_le(1), 1), - (Keyring::Alice.public(), Keyring::Alice.sign(b"I am committed")) + &round, + (Keyring::Alice.public(), Keyring::Alice.sign(b"I am committed")), + true )); + // round not concluded + assert!(rounds.try_conclude(&round).is_none()); + // self vote already present, should not self vote + assert!(!rounds.should_self_vote(&round)); - assert!(!rounds.is_done(&(H256::from_low_u64_le(1), 1))); - - // invalid vote + // double voting not allowed assert!(!rounds.add_vote( - &(H256::from_low_u64_le(1), 1), - (Keyring::Dave.public(), Keyring::Dave.sign(b"I am committed")) + &round, + (Keyring::Alice.public(), Keyring::Alice.sign(b"I am committed")), + true )); - assert!(!rounds.is_done(&(H256::from_low_u64_le(1), 1))); + // invalid vote (Dave is not a validator) + assert!(!rounds.add_vote( + &round, + (Keyring::Dave.public(), Keyring::Dave.sign(b"I am committed")), + false + )); + assert!(rounds.try_conclude(&round).is_none()); + // add 2nd good vote assert!(rounds.add_vote( - &(H256::from_low_u64_le(1), 1), - (Keyring::Bob.public(), Keyring::Bob.sign(b"I am committed")) + &round, + (Keyring::Bob.public(), Keyring::Bob.sign(b"I am committed")), + false )); + // round not concluded + assert!(rounds.try_conclude(&round).is_none()); - assert!(!rounds.is_done(&(H256::from_low_u64_le(1), 1))); - + // add 3rd good vote assert!(rounds.add_vote( - &(H256::from_low_u64_le(1), 1), - (Keyring::Charlie.public(), Keyring::Charlie.sign(b"I am committed")) + &round, + (Keyring::Charlie.public(), Keyring::Charlie.sign(b"I am committed")), + false )); + // round concluded + assert!(rounds.try_conclude(&round).is_some()); - assert!(rounds.is_done(&(H256::from_low_u64_le(1), 1))); + // Eve is a validator, but round was concluded, adding vote disallowed + assert!(!rounds.add_vote( + &round, + (Keyring::Eve.public(), Keyring::Eve.sign(b"I am committed")), + false + )); } #[test] - fn drop() { + fn multiple_rounds() { sp_tracing::try_init_simple(); let validators = ValidatorSet::::new( - vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()], + vec![ + Keyring::Alice.public(), + Keyring::Bob.public(), + Keyring::Charlie.public(), + Keyring::Dave.public(), + ], Default::default(), ) .unwrap(); - let mut rounds = Rounds::>::new(validators); + let session_start = 1u64.into(); + let mut rounds = Rounds::::new(session_start, validators.clone(), validators); // round 1 - rounds.add_vote( + assert!(rounds.add_vote( &(H256::from_low_u64_le(1), 1), (Keyring::Alice.public(), Keyring::Alice.sign(b"I am committed")), - ); - rounds.add_vote( + true, + )); + assert!(rounds.add_vote( &(H256::from_low_u64_le(1), 1), (Keyring::Bob.public(), Keyring::Bob.sign(b"I am committed")), - ); + false, + )); + assert!(rounds.add_vote( + &(H256::from_low_u64_le(1), 1), + (Keyring::Charlie.public(), Keyring::Charlie.sign(b"I am committed")), + false, + )); // round 2 - rounds.add_vote( + assert!(rounds.add_vote( &(H256::from_low_u64_le(2), 2), (Keyring::Alice.public(), Keyring::Alice.sign(b"I am again committed")), - ); - rounds.add_vote( + true, + )); + assert!(rounds.add_vote( &(H256::from_low_u64_le(2), 2), (Keyring::Bob.public(), Keyring::Bob.sign(b"I am again committed")), - ); + false, + )); + assert!(rounds.add_vote( + &(H256::from_low_u64_le(2), 2), + (Keyring::Charlie.public(), Keyring::Charlie.sign(b"I am again committed")), + false, + )); // round 3 - rounds.add_vote( + assert!(rounds.add_vote( &(H256::from_low_u64_le(3), 3), (Keyring::Alice.public(), Keyring::Alice.sign(b"I am still committed")), - ); - rounds.add_vote( + true, + )); + assert!(rounds.add_vote( &(H256::from_low_u64_le(3), 3), (Keyring::Bob.public(), Keyring::Bob.sign(b"I am still committed")), - ); - + false, + )); + assert!(rounds.add_vote( + &(H256::from_low_u64_le(3), 3), + (Keyring::Charlie.public(), Keyring::Charlie.sign(b"I am still committed")), + false, + )); assert_eq!(3, rounds.rounds.len()); - // drop unknown round - assert!(rounds.drop(&(H256::from_low_u64_le(5), 5)).is_none()); + // conclude unknown round + assert!(rounds.try_conclude(&(H256::from_low_u64_le(5), 5)).is_none()); assert_eq!(3, rounds.rounds.len()); - // drop round 2 - let signatures = rounds.drop(&(H256::from_low_u64_le(2), 2)).unwrap(); - - assert_eq!(2, rounds.rounds.len()); + // conclude round 2 + let signatures = rounds.try_conclude(&(H256::from_low_u64_le(2), 2)).unwrap(); + assert_eq!(1, rounds.rounds.len()); assert_eq!( signatures, vec![ Some(Keyring::Alice.sign(b"I am again committed")), Some(Keyring::Bob.sign(b"I am again committed")), + Some(Keyring::Charlie.sign(b"I am again committed")), None ] ); diff --git a/client/beefy/src/tests.rs b/client/beefy/src/tests.rs new file mode 100644 index 0000000000000..92b5ad91c11e1 --- /dev/null +++ b/client/beefy/src/tests.rs @@ -0,0 +1,590 @@ +// This file is part of Substrate. + +// Copyright (C) 2018-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Tests and test helpers for BEEFY. + +use futures::{future, stream::FuturesUnordered, Future, StreamExt}; +use parking_lot::Mutex; +use serde::{Deserialize, Serialize}; +use std::{sync::Arc, task::Poll}; +use tokio::{runtime::Runtime, time::Duration}; + +use sc_chain_spec::{ChainSpec, GenericChainSpec}; +use sc_client_api::HeaderBackend; +use sc_consensus::BoxJustificationImport; +use sc_keystore::LocalKeystore; +use sc_network::{config::ProtocolConfig, NetworkService}; +use sc_network_gossip::GossipEngine; +use sc_network_test::{ + Block, BlockImportAdapter, FullPeerConfig, PassThroughVerifier, Peer, PeersClient, + PeersFullClient, TestNetFactory, +}; +use sc_utils::notification::NotificationReceiver; + +use beefy_primitives::{ + crypto::AuthorityId, ConsensusLog, MmrRootHash, ValidatorSet, BEEFY_ENGINE_ID, + KEY_TYPE as BeefyKeyType, +}; +use sp_consensus::BlockOrigin; +use sp_core::H256; +use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr}; +use sp_runtime::{ + codec::Encode, generic::BlockId, traits::Header as HeaderT, BuildStorage, DigestItem, Storage, +}; + +use substrate_test_runtime_client::{runtime::Header, Backend, ClientExt}; + +use crate::{ + beefy_protocol_name, + keystore::tests::Keyring as BeefyKeyring, + notification::*, + worker::{tests::TestModifiers, BeefyWorker}, +}; + +const BEEFY_PROTOCOL_NAME: &'static str = "/beefy/1"; + +type BeefyValidatorSet = ValidatorSet; +type BeefyPeer = Peer; + +#[derive(Debug, Serialize, Deserialize)] +struct Genesis(std::collections::BTreeMap); +impl BuildStorage for Genesis { + fn assimilate_storage(&self, storage: &mut Storage) -> Result<(), String> { + storage + .top + .extend(self.0.iter().map(|(a, b)| (a.clone().into_bytes(), b.clone().into_bytes()))); + Ok(()) + } +} + +#[test] +fn beefy_protocol_name() { + let chain_spec = GenericChainSpec::::from_json_file(std::path::PathBuf::from( + "../chain-spec/res/chain_spec.json", + )) + .unwrap() + .cloned_box(); + + // Create protocol name using random genesis hash. + let genesis_hash = H256::random(); + let expected = format!("/{}/beefy/1", hex::encode(genesis_hash)); + let proto_name = beefy_protocol_name::standard_name(&genesis_hash, &chain_spec); + assert_eq!(proto_name.to_string(), expected); + + // Create protocol name using hardcoded genesis hash. Verify exact representation. + let genesis_hash = [ + 50, 4, 60, 123, 58, 106, 216, 246, 194, 188, 139, 193, 33, 212, 202, 171, 9, 55, 123, 94, + 8, 43, 12, 251, 187, 57, 173, 19, 188, 74, 205, 147, + ]; + let expected = + "/32043c7b3a6ad8f6c2bc8bc121d4caab09377b5e082b0cfbbb39ad13bc4acd93/beefy/1".to_string(); + let proto_name = beefy_protocol_name::standard_name(&genesis_hash, &chain_spec); + assert_eq!(proto_name.to_string(), expected); +} + +// TODO: compiler warns us about unused `signed_commitment_stream`, will use in later tests +#[allow(dead_code)] +#[derive(Clone)] +pub(crate) struct BeefyLinkHalf { + signed_commitment_stream: BeefySignedCommitmentStream, + beefy_best_block_stream: BeefyBestBlockStream, +} + +#[derive(Default)] +pub(crate) struct PeerData { + pub(crate) beefy_link_half: Mutex>, + pub(crate) test_modifiers: Option, +} + +impl PeerData { + pub(crate) fn use_validator_set(&mut self, validator_set: &ValidatorSet) { + if let Some(tm) = self.test_modifiers.as_mut() { + tm.active_validators = validator_set.clone(); + } else { + self.test_modifiers = Some(TestModifiers { + active_validators: validator_set.clone(), + corrupt_mmr_roots: false, + }); + } + } +} + +pub(crate) struct BeefyTestNet { + peers: Vec, +} + +impl BeefyTestNet { + pub(crate) fn new(n_authority: usize, n_full: usize) -> Self { + let mut net = BeefyTestNet { peers: Vec::with_capacity(n_authority + n_full) }; + for _ in 0..n_authority { + net.add_authority_peer(); + } + for _ in 0..n_full { + net.add_full_peer(); + } + net + } + + pub(crate) fn add_authority_peer(&mut self) { + self.add_full_peer_with_config(FullPeerConfig { + notifications_protocols: vec![BEEFY_PROTOCOL_NAME.into()], + is_authority: true, + ..Default::default() + }) + } + + pub(crate) fn generate_blocks( + &mut self, + count: usize, + session_length: u64, + validator_set: &BeefyValidatorSet, + ) { + self.peer(0).generate_blocks(count, BlockOrigin::File, |builder| { + let mut block = builder.build().unwrap().block; + + let block_num = *block.header.number(); + let num_byte = block_num.to_le_bytes().into_iter().next().unwrap(); + let mmr_root = MmrRootHash::repeat_byte(num_byte); + + add_mmr_digest(&mut block.header, mmr_root); + + if block_num % session_length == 0 { + add_auth_change_digest(&mut block.header, validator_set.clone()); + } + + block + }); + } +} + +impl TestNetFactory for BeefyTestNet { + type Verifier = PassThroughVerifier; + type BlockImport = PeersClient; + type PeerData = PeerData; + + /// Create new test network with peers and given config. + fn from_config(_config: &ProtocolConfig) -> Self { + BeefyTestNet { peers: Vec::new() } + } + + fn make_verifier( + &self, + _client: PeersClient, + _cfg: &ProtocolConfig, + _: &PeerData, + ) -> Self::Verifier { + PassThroughVerifier::new(false) // use non-instant finality. + } + + fn make_block_import( + &self, + client: PeersClient, + ) -> ( + BlockImportAdapter, + Option>, + Self::PeerData, + ) { + (client.as_block_import(), None, PeerData::default()) + } + + fn peer(&mut self, i: usize) -> &mut BeefyPeer { + &mut self.peers[i] + } + + fn peers(&self) -> &Vec { + &self.peers + } + + fn mut_peers)>(&mut self, closure: F) { + closure(&mut self.peers); + } + + fn add_full_peer(&mut self) { + self.add_full_peer_with_config(FullPeerConfig { + notifications_protocols: vec![BEEFY_PROTOCOL_NAME.into()], + is_authority: false, + ..Default::default() + }) + } +} + +fn add_mmr_digest(header: &mut Header, mmr_hash: MmrRootHash) { + header.digest_mut().push(DigestItem::Consensus( + BEEFY_ENGINE_ID, + ConsensusLog::::MmrRoot(mmr_hash).encode(), + )); +} + +fn add_auth_change_digest(header: &mut Header, new_auth_set: BeefyValidatorSet) { + header.digest_mut().push(DigestItem::Consensus( + BEEFY_ENGINE_ID, + ConsensusLog::::AuthoritiesChange(new_auth_set).encode(), + )); +} + +pub(crate) fn make_beefy_ids(keys: &[BeefyKeyring]) -> Vec { + keys.iter().map(|key| key.clone().public().into()).collect() +} + +pub(crate) fn create_beefy_keystore(authority: BeefyKeyring) -> SyncCryptoStorePtr { + let keystore = Arc::new(LocalKeystore::in_memory()); + SyncCryptoStore::ecdsa_generate_new(&*keystore, BeefyKeyType, Some(&authority.to_seed())) + .expect("Creates authority key"); + keystore +} + +pub(crate) fn create_beefy_worker( + peer: &BeefyPeer, + key: &BeefyKeyring, + min_block_delta: u32, +) -> BeefyWorker>> { + let keystore = create_beefy_keystore(*key); + + let (signed_commitment_sender, signed_commitment_stream) = + BeefySignedCommitmentStream::::channel(); + let (beefy_best_block_sender, beefy_best_block_stream) = + BeefyBestBlockStream::::channel(); + + let beefy_link_half = BeefyLinkHalf { signed_commitment_stream, beefy_best_block_stream }; + *peer.data.beefy_link_half.lock() = Some(beefy_link_half); + let test_modifiers = peer.data.test_modifiers.clone().unwrap(); + + let network = peer.network_service().clone(); + let sync_oracle = network.clone(); + let gossip_validator = Arc::new(crate::gossip::GossipValidator::new()); + let gossip_engine = + GossipEngine::new(network, BEEFY_PROTOCOL_NAME, gossip_validator.clone(), None); + let worker_params = crate::worker::WorkerParams { + client: peer.client().as_client(), + backend: peer.client().as_backend(), + key_store: Some(keystore).into(), + signed_commitment_sender, + beefy_best_block_sender, + gossip_engine, + gossip_validator, + min_block_delta, + metrics: None, + sync_oracle, + }; + + BeefyWorker::<_, _, _, _>::new(worker_params, test_modifiers) +} + +// Spawns beefy voters. Returns a future to spawn on the runtime. +fn initialize_beefy( + net: &mut BeefyTestNet, + peers: &[BeefyKeyring], + min_block_delta: u32, +) -> impl Future { + let voters = FuturesUnordered::new(); + + for (peer_id, key) in peers.iter().enumerate() { + let worker = create_beefy_worker(&net.peers[peer_id], key, min_block_delta); + let gadget = worker.run(); + + fn assert_send(_: &T) {} + assert_send(&gadget); + voters.push(gadget); + } + + voters.for_each(|_| async move {}) +} + +fn block_until(future: impl Future + Unpin, net: &Arc>, runtime: &mut Runtime) { + let drive_to_completion = futures::future::poll_fn(|cx| { + net.lock().poll(cx); + Poll::<()>::Pending + }); + runtime.block_on(future::select(future, drive_to_completion)); +} + +fn run_for(duration: Duration, net: &Arc>, runtime: &mut Runtime) { + let sleep = runtime.spawn(async move { tokio::time::sleep(duration).await }); + block_until(sleep, net, runtime); +} + +pub(crate) fn get_beefy_streams( + net: &mut BeefyTestNet, + peers: &[BeefyKeyring], +) -> (Vec>, Vec>>) { + let mut best_block_streams = Vec::new(); + let mut signed_commitment_streams = Vec::new(); + for peer_id in 0..peers.len() { + let beefy_link_half = + net.peer(peer_id).data.beefy_link_half.lock().as_ref().unwrap().clone(); + let BeefyLinkHalf { signed_commitment_stream, beefy_best_block_stream } = beefy_link_half; + best_block_streams.push(beefy_best_block_stream.subscribe()); + signed_commitment_streams.push(signed_commitment_stream.subscribe()); + } + (best_block_streams, signed_commitment_streams) +} + +fn wait_for_best_beefy_blocks( + streams: Vec>, + net: &Arc>, + runtime: &mut Runtime, + expected_beefy_blocks: &[u64], +) { + let mut wait_for = Vec::new(); + let len = expected_beefy_blocks.len(); + streams.into_iter().enumerate().for_each(|(i, stream)| { + let mut expected = expected_beefy_blocks.iter(); + wait_for.push(Box::pin(stream.take(len).for_each(move |best_beefy_hash| { + let expected = expected.next(); + async move { + let block_id = BlockId::hash(best_beefy_hash); + let header = + net.lock().peer(i).client().as_client().expect_header(block_id).unwrap(); + let best_beefy = *header.number(); + + assert_eq!(expected, Some(best_beefy).as_ref()); + } + }))); + }); + let wait_for = futures::future::join_all(wait_for); + block_until(wait_for, net, runtime); +} + +fn wait_for_beefy_signed_commitments( + streams: Vec>>, + net: &Arc>, + runtime: &mut Runtime, + expected_commitment_block_nums: &[u64], +) { + let mut wait_for = Vec::new(); + let len = expected_commitment_block_nums.len(); + streams.into_iter().for_each(|stream| { + let mut expected = expected_commitment_block_nums.iter(); + wait_for.push(Box::pin(stream.take(len).for_each(move |signed_commitment| { + let expected = expected.next(); + async move { + let commitment_block_num = signed_commitment.commitment.block_number; + assert_eq!(expected, Some(commitment_block_num).as_ref()); + // TODO: also verify commitment payload, validator set id, and signatures. + } + }))); + }); + let wait_for = futures::future::join_all(wait_for); + block_until(wait_for, net, runtime); +} + +fn streams_empty_after_timeout( + streams: Vec>, + net: &Arc>, + runtime: &mut Runtime, + timeout: Option, +) where + T: std::fmt::Debug, + T: std::cmp::PartialEq, +{ + if let Some(timeout) = timeout { + run_for(timeout, net, runtime); + } + streams.into_iter().for_each(|mut stream| { + runtime.block_on(future::poll_fn(move |cx| { + assert_eq!(stream.poll_next_unpin(cx), Poll::Pending); + Poll::Ready(()) + })); + }); +} + +fn finalize_block_and_wait_for_beefy( + net: &Arc>, + peers: &[BeefyKeyring], + runtime: &mut Runtime, + finalize_targets: &[u64], + expected_beefy: &[u64], +) { + let (best_blocks, signed_commitments) = get_beefy_streams(&mut *net.lock(), peers); + + for block in finalize_targets { + let finalize = BlockId::number(*block); + for i in 0..peers.len() { + net.lock().peer(i).client().as_client().finalize_block(finalize, None).unwrap(); + } + } + + if expected_beefy.is_empty() { + // run for 1 second then verify no new best beefy block available + let timeout = Some(Duration::from_millis(500)); + streams_empty_after_timeout(best_blocks, &net, runtime, timeout); + streams_empty_after_timeout(signed_commitments, &net, runtime, None); + } else { + // run until expected beefy blocks are received + wait_for_best_beefy_blocks(best_blocks, &net, runtime, expected_beefy); + wait_for_beefy_signed_commitments(signed_commitments, &net, runtime, expected_beefy); + } +} + +#[test] +fn beefy_finalizing_blocks() { + sp_tracing::try_init_simple(); + + let mut runtime = Runtime::new().unwrap(); + let peers = &[BeefyKeyring::Alice, BeefyKeyring::Bob]; + let validator_set = ValidatorSet::new(make_beefy_ids(peers), 0).unwrap(); + let session_len = 10; + let min_block_delta = 4; + + let mut net = BeefyTestNet::new(2, 0); + + for i in 0..peers.len() { + net.peer(i).data.use_validator_set(&validator_set); + } + runtime.spawn(initialize_beefy(&mut net, peers, min_block_delta)); + + // push 42 blocks including `AuthorityChange` digests every 10 blocks. + net.generate_blocks(42, session_len, &validator_set); + net.block_until_sync(); + + let net = Arc::new(Mutex::new(net)); + + // Minimum BEEFY block delta is 4. + + // finalize block #5 -> BEEFY should finalize #1 (mandatory) and #5 from diff-power-of-two rule. + finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[5], &[1, 5]); + + // GRANDPA finalize #10 -> BEEFY finalize #10 (mandatory) + finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[10], &[10]); + + // GRANDPA finalize #18 -> BEEFY finalize #14, then #18 (diff-power-of-two rule) + finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[18], &[14, 18]); + + // GRANDPA finalize #20 -> BEEFY finalize #20 (mandatory) + finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[20], &[20]); + + // GRANDPA finalize #21 -> BEEFY finalize nothing (yet) because min delta is 4 + finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[21], &[]); +} + +#[test] +fn lagging_validators() { + sp_tracing::try_init_simple(); + + let mut runtime = Runtime::new().unwrap(); + let peers = &[BeefyKeyring::Charlie, BeefyKeyring::Dave]; + let validator_set = ValidatorSet::new(make_beefy_ids(peers), 0).unwrap(); + let session_len = 30; + let min_block_delta = 1; + + let mut net = BeefyTestNet::new(2, 0); + for i in 0..peers.len() { + net.peer(i).data.use_validator_set(&validator_set); + } + runtime.spawn(initialize_beefy(&mut net, peers, min_block_delta)); + + // push 42 blocks including `AuthorityChange` digests every 30 blocks. + net.generate_blocks(42, session_len, &validator_set); + net.block_until_sync(); + + let net = Arc::new(Mutex::new(net)); + + // finalize block #15 -> BEEFY should finalize #1 (mandatory) and #9, #13, #14, #15 from + // diff-power-of-two rule. + finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[15], &[1, 9, 13, 14, 15]); + + // Charlie finalizes #25, Dave lags behind + let finalize = BlockId::number(25); + let (best_blocks, signed_commitments) = get_beefy_streams(&mut *net.lock(), peers); + net.lock().peer(0).client().as_client().finalize_block(finalize, None).unwrap(); + // verify nothing gets finalized by BEEFY + let timeout = Some(Duration::from_millis(500)); + streams_empty_after_timeout(best_blocks, &net, &mut runtime, timeout); + streams_empty_after_timeout(signed_commitments, &net, &mut runtime, None); + + // Dave catches up and also finalizes #25 + let (best_blocks, signed_commitments) = get_beefy_streams(&mut *net.lock(), peers); + net.lock().peer(1).client().as_client().finalize_block(finalize, None).unwrap(); + // expected beefy finalizes block #17 from diff-power-of-two + wait_for_best_beefy_blocks(best_blocks, &net, &mut runtime, &[23, 24, 25]); + wait_for_beefy_signed_commitments(signed_commitments, &net, &mut runtime, &[23, 24, 25]); + + // Both finalize #30 (mandatory session) and #32 -> BEEFY finalize #30 (mandatory), #31, #32 + finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[30, 32], &[30, 31, 32]); +} + +#[test] +fn correct_beefy_payload() { + sp_tracing::try_init_simple(); + + let mut runtime = Runtime::new().unwrap(); + let peers = + &[BeefyKeyring::Alice, BeefyKeyring::Bob, BeefyKeyring::Charlie, BeefyKeyring::Dave]; + let validator_set = ValidatorSet::new(make_beefy_ids(peers), 0).unwrap(); + let session_len = 20; + let min_block_delta = 2; + + let mut net = BeefyTestNet::new(4, 0); + for i in 0..peers.len() { + net.peer(i).data.use_validator_set(&validator_set); + } + + // Dave will vote on bad mmr roots + net.peer(3).data.test_modifiers.as_mut().map(|tm| tm.corrupt_mmr_roots = true); + runtime.spawn(initialize_beefy(&mut net, peers, min_block_delta)); + + // push 10 blocks + net.generate_blocks(12, session_len, &validator_set); + net.block_until_sync(); + + let net = Arc::new(Mutex::new(net)); + // with 3 good voters and 1 bad one, consensus should happen and best blocks produced. + finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[10], &[1, 9]); + + let (best_blocks, signed_commitments) = + get_beefy_streams(&mut *net.lock(), &[BeefyKeyring::Alice]); + + // now 2 good validators and 1 bad one are voting + net.lock() + .peer(0) + .client() + .as_client() + .finalize_block(BlockId::number(11), None) + .unwrap(); + net.lock() + .peer(1) + .client() + .as_client() + .finalize_block(BlockId::number(11), None) + .unwrap(); + net.lock() + .peer(3) + .client() + .as_client() + .finalize_block(BlockId::number(11), None) + .unwrap(); + + // verify consensus is _not_ reached + let timeout = Some(Duration::from_millis(500)); + streams_empty_after_timeout(best_blocks, &net, &mut runtime, timeout); + streams_empty_after_timeout(signed_commitments, &net, &mut runtime, None); + + // 3rd good validator catches up and votes as well + let (best_blocks, signed_commitments) = + get_beefy_streams(&mut *net.lock(), &[BeefyKeyring::Alice]); + net.lock() + .peer(2) + .client() + .as_client() + .finalize_block(BlockId::number(11), None) + .unwrap(); + + // verify consensus is reached + wait_for_best_beefy_blocks(best_blocks, &net, &mut runtime, &[11]); + wait_for_beefy_signed_commitments(signed_commitments, &net, &mut runtime, &[11]); +} diff --git a/client/beefy/src/worker.rs b/client/beefy/src/worker.rs index 3f23638758eca..85674c09a278b 100644 --- a/client/beefy/src/worker.rs +++ b/client/beefy/src/worker.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use std::{collections::BTreeSet, fmt::Debug, marker::PhantomData, sync::Arc}; +use std::{collections::BTreeSet, fmt::Debug, marker::PhantomData, sync::Arc, time::Duration}; use codec::{Codec, Decode, Encode}; use futures::{future, FutureExt, StreamExt}; @@ -28,6 +28,7 @@ use sc_network_gossip::GossipEngine; use sp_api::BlockId; use sp_arithmetic::traits::AtLeast32Bit; +use sp_consensus::SyncOracle; use sp_runtime::{ generic::OpaqueDigestItemId, traits::{Block, Header, NumberFor}, @@ -35,7 +36,7 @@ use sp_runtime::{ }; use beefy_primitives::{ - crypto::{AuthorityId, Public, Signature}, + crypto::{AuthorityId, Signature}, known_payload_ids, BeefyApi, Commitment, ConsensusLog, MmrRootHash, Payload, SignedCommitment, ValidatorSet, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID, GENESIS_AUTHORITY_SET_ID, }; @@ -47,10 +48,11 @@ use crate::{ metric_inc, metric_set, metrics::Metrics, notification::{BeefyBestBlockSender, BeefySignedCommitmentSender}, - round, Client, + round::Rounds, + Client, }; -pub(crate) struct WorkerParams +pub(crate) struct WorkerParams where B: Block, { @@ -63,14 +65,16 @@ where pub gossip_validator: Arc>, pub min_block_delta: u32, pub metrics: Option, + pub sync_oracle: SO, } /// A BEEFY worker plays the BEEFY protocol -pub(crate) struct BeefyWorker +pub(crate) struct BeefyWorker where B: Block, BE: Backend, C: Client, + SO: SyncOracle + Send + Sync + Clone + 'static, { client: Arc, backend: Arc, @@ -81,26 +85,32 @@ where /// Min delta in block numbers between two blocks, BEEFY should vote on min_block_delta: u32, metrics: Option, - rounds: Option>>, + rounds: Option>, finality_notifications: FinalityNotifications, /// Best block we received a GRANDPA notification for - best_grandpa_block: NumberFor, + best_grandpa_block_header: ::Header, /// Best block a BEEFY voting round has been concluded for best_beefy_block: Option>, /// Used to keep RPC worker up to date on latest/best beefy beefy_best_block_sender: BeefyBestBlockSender, /// Validator set id for the last signed commitment last_signed_id: u64, + /// Handle to the sync oracle + sync_oracle: SO, // keep rustc happy _backend: PhantomData, + #[cfg(test)] + // behavior modifiers used in tests + test_res: tests::TestModifiers, } -impl BeefyWorker +impl BeefyWorker where B: Block + Codec, BE: Backend, C: Client, C::Api: BeefyApi, + SO: SyncOracle + Send + Sync + Clone + 'static, { /// Return a new BEEFY worker instance. /// @@ -108,7 +118,12 @@ where /// BEEFY pallet has been deployed on-chain. /// /// The BEEFY pallet is needed in order to keep track of the BEEFY authority set. - pub(crate) fn new(worker_params: WorkerParams) -> Self { + pub(crate) fn new( + worker_params: WorkerParams, + #[cfg(test)] + // behavior modifiers used in tests + test_res: tests::TestModifiers, + ) -> Self { let WorkerParams { client, backend, @@ -119,8 +134,13 @@ where gossip_validator, min_block_delta, metrics, + sync_oracle, } = worker_params; + let last_finalized_header = client + .expect_header(BlockId::number(client.info().finalized_number)) + .expect("latest block always has header available; qed."); + BeefyWorker { client: client.clone(), backend, @@ -128,200 +148,176 @@ where signed_commitment_sender, gossip_engine: Arc::new(Mutex::new(gossip_engine)), gossip_validator, - min_block_delta, + // always target at least one block better than current best beefy + min_block_delta: min_block_delta.max(1), metrics, rounds: None, finality_notifications: client.finality_notification_stream(), - best_grandpa_block: client.info().finalized_number, + best_grandpa_block_header: last_finalized_header, best_beefy_block: None, last_signed_id: 0, beefy_best_block_sender, + sync_oracle, _backend: PhantomData, + #[cfg(test)] + test_res, } } } -impl BeefyWorker +impl BeefyWorker where B: Block, BE: Backend, C: Client, C::Api: BeefyApi, + SO: SyncOracle + Send + Sync + Clone + 'static, { - /// Return `true`, if we should vote on block `number` - fn should_vote_on(&self, number: NumberFor) -> bool { - let best_beefy_block = if let Some(block) = self.best_beefy_block { - block - } else { - debug!(target: "beefy", "🥩 Missing best BEEFY block - won't vote for: {:?}", number); - return false - }; - - let target = vote_target(self.best_grandpa_block, best_beefy_block, self.min_block_delta); - - trace!(target: "beefy", "🥩 should_vote_on: #{:?}, next_block_to_vote_on: #{:?}", number, target); - - metric_set!(self, beefy_should_vote_on, target); - - number == target - } - - /// Return the current active validator set at header `header`. - /// - /// Note that the validator set could be `None`. This is the case if we don't find - /// a BEEFY authority set change and we can't fetch the authority set from the - /// BEEFY on-chain state. - /// - /// Such a failure is usually an indication that the BEEFY pallet has not been deployed (yet). - fn validator_set(&self, header: &B::Header) -> Option> { - let new = if let Some(new) = find_authorities_change::(header) { - Some(new) + /// Return `Some(number)` if we should be voting on block `number` now, + /// return `None` if there is no block we should vote on now. + fn current_vote_target(&self) -> Option> { + let rounds = if let Some(r) = &self.rounds { + r } else { - let at = BlockId::hash(header.hash()); - self.client.runtime_api().validator_set(&at).ok().flatten() + debug!(target: "beefy", "🥩 No voting round started"); + return None }; - trace!(target: "beefy", "🥩 active validator set: {:?}", new); - - new + let best_finalized = *self.best_grandpa_block_header.number(); + // `target` is guaranteed > `best_beefy` since `min_block_delta` is at least `1`. + let target = vote_target( + best_finalized, + self.best_beefy_block, + *rounds.session_start(), + self.min_block_delta, + ); + trace!( + target: "beefy", + "🥩 best beefy: #{:?}, best finalized: #{:?}, current_vote_target: {:?}", + self.best_beefy_block, + best_finalized, + target + ); + if let Some(target) = &target { + metric_set!(self, beefy_should_vote_on, target); + } + target } /// Verify `active` validator set for `block` against the key store /// - /// The critical case is, if we do have a public key in the key store which is not - /// part of the active validator set. + /// We want to make sure that we have _at least one_ key in our keystore that + /// is part of the validator set, that's because if there are no local keys + /// then we can't perform our job as a validator. /// /// Note that for a non-authority node there will be no keystore, and we will /// return an error and don't check. The error can usually be ignored. fn verify_validator_set( &self, block: &NumberFor, - active: &ValidatorSet, + active: &ValidatorSet, ) -> Result<(), error::Error> { - let active: BTreeSet<&Public> = active.validators().iter().collect(); + let active: BTreeSet<&AuthorityId> = active.validators().iter().collect(); let public_keys = self.key_store.public_keys()?; - let store: BTreeSet<&Public> = public_keys.iter().collect(); + let store: BTreeSet<&AuthorityId> = public_keys.iter().collect(); - let missing: Vec<_> = store.difference(&active).cloned().collect(); - - if !missing.is_empty() { - debug!(target: "beefy", "🥩 for block {:?} public key missing in validator set: {:?}", block, missing); + if store.intersection(&active).count() == 0 { + let msg = "no authority public key found in store".to_string(); + debug!(target: "beefy", "🥩 for block {:?} {}", block, msg); + Err(error::Error::Keystore(msg)) + } else { + Ok(()) } - - Ok(()) } - fn handle_finality_notification(&mut self, notification: FinalityNotification) { - trace!(target: "beefy", "🥩 Finality notification: {:?}", notification); - - // update best GRANDPA finalized block we have seen - self.best_grandpa_block = *notification.header.number(); - - if let Some(active) = self.validator_set(¬ification.header) { - // Authority set change or genesis set id triggers new voting rounds - // - // TODO: (grandpa-bridge-gadget#366) Enacting a new authority set will also - // implicitly 'conclude' the currently active BEEFY voting round by starting a - // new one. This should be replaced by proper round life-cycle handling. - if self.rounds.is_none() || - active.id() != self.rounds.as_ref().unwrap().validator_set_id() || - (active.id() == GENESIS_AUTHORITY_SET_ID && self.best_beefy_block.is_none()) - { - debug!(target: "beefy", "🥩 New active validator set id: {:?}", active); - metric_set!(self, beefy_validator_set_id, active.id()); - - // BEEFY should produce a signed commitment for each session - if active.id() != self.last_signed_id + 1 && active.id() != GENESIS_AUTHORITY_SET_ID - { - metric_inc!(self, beefy_skipped_sessions); - } - - if log_enabled!(target: "beefy", log::Level::Debug) { - // verify the new validator set - only do it if we're also logging the warning - let _ = self.verify_validator_set(notification.header.number(), &active); - } - - let id = active.id(); - self.rounds = Some(round::Rounds::new(active)); - - debug!(target: "beefy", "🥩 New Rounds for id: {:?}", id); - - self.best_beefy_block = Some(*notification.header.number()); + /// Set best BEEFY block to `block_num`. + /// + /// Also sends/updates the best BEEFY block hash to the RPC worker. + fn set_best_beefy_block(&mut self, block_num: NumberFor) { + if Some(block_num) > self.best_beefy_block { + // Try to get block hash ourselves. + let block_hash = match self.client.hash(block_num) { + Ok(h) => h, + Err(e) => { + error!(target: "beefy", "🥩 Failed to get hash for block number {}: {}", + block_num, e); + None + }, + }; + // Update RPC worker with new best BEEFY block hash. + block_hash.map(|hash| { self.beefy_best_block_sender - .notify(|| Ok::<_, ()>(notification.hash.clone())) - .expect("forwards closure result; the closure always returns Ok; qed."); - - // this metric is kind of 'fake'. Best BEEFY block should only be updated once we - // have a signed commitment for the block. Remove once the above TODO is done. - metric_set!(self, beefy_best_block, *notification.header.number()); - } + .notify(|| Ok::<_, ()>(hash)) + .expect("forwards closure result; the closure always returns Ok; qed.") + }); + // Set new best BEEFY block number. + self.best_beefy_block = Some(block_num); + metric_set!(self, beefy_best_block, block_num); + } else { + debug!(target: "beefy", "🥩 Can't set best beefy to older: {}", block_num); } + } - if self.should_vote_on(*notification.header.number()) { - let (validators, validator_set_id) = if let Some(rounds) = &self.rounds { - (rounds.validators(), rounds.validator_set_id()) - } else { - debug!(target: "beefy", "🥩 Missing validator set - can't vote for: {:?}", notification.header.hash()); - return - }; - let authority_id = if let Some(id) = self.key_store.authority_id(validators) { - debug!(target: "beefy", "🥩 Local authority id: {:?}", id); - id - } else { - debug!(target: "beefy", "🥩 Missing validator id - can't vote for: {:?}", notification.header.hash()); - return - }; - - let mmr_root = - if let Some(hash) = find_mmr_root_digest::(¬ification.header) { - hash - } else { - warn!(target: "beefy", "🥩 No MMR root digest found for: {:?}", notification.header.hash()); - return - }; - - let payload = Payload::new(known_payload_ids::MMR_ROOT_ID, mmr_root.encode()); - let commitment = Commitment { - payload, - block_number: notification.header.number(), - validator_set_id, - }; - let encoded_commitment = commitment.encode(); + /// Handle session changes by starting new voting round for mandatory blocks. + fn init_session_at(&mut self, active: ValidatorSet, session_start: NumberFor) { + debug!(target: "beefy", "🥩 New active validator set: {:?}", active); + metric_set!(self, beefy_validator_set_id, active.id()); + // BEEFY should produce a signed commitment for each session + if active.id() != self.last_signed_id + 1 && active.id() != GENESIS_AUTHORITY_SET_ID { + metric_inc!(self, beefy_skipped_sessions); + } - let signature = match self.key_store.sign(&authority_id, &*encoded_commitment) { - Ok(sig) => sig, - Err(err) => { - warn!(target: "beefy", "🥩 Error signing commitment: {:?}", err); - return - }, - }; + if log_enabled!(target: "beefy", log::Level::Debug) { + // verify the new validator set - only do it if we're also logging the warning + let _ = self.verify_validator_set(&session_start, &active); + } - trace!( - target: "beefy", - "🥩 Produced signature using {:?}, is_valid: {:?}", - authority_id, - BeefyKeystore::verify(&authority_id, &signature, &*encoded_commitment) - ); + let prev_validator_set = if let Some(r) = &self.rounds { + r.validator_set().clone() + } else { + // no previous rounds present use new validator set instead (genesis case) + active.clone() + }; + let id = active.id(); + self.rounds = Some(Rounds::new(session_start, active, prev_validator_set)); + info!(target: "beefy", "🥩 New Rounds for validator set id: {:?} with session_start {:?}", id, session_start); + } - let message = VoteMessage { commitment, id: authority_id, signature }; + fn handle_finality_notification(&mut self, notification: &FinalityNotification) { + trace!(target: "beefy", "🥩 Finality notification: {:?}", notification); + let number = *notification.header.number(); - let encoded_message = message.encode(); + // On start-up ignore old finality notifications that we're not interested in. + if number <= *self.best_grandpa_block_header.number() { + debug!(target: "beefy", "🥩 Got unexpected finality for old block #{:?}", number); + return + } - metric_inc!(self, beefy_votes_sent); + // update best GRANDPA finalized block we have seen + self.best_grandpa_block_header = notification.header.clone(); - debug!(target: "beefy", "🥩 Sent vote message: {:?}", message); + self.handle_finality(¬ification.header); + } - self.handle_vote( - (message.commitment.payload, *message.commitment.block_number), - (message.id, message.signature), - ); + fn handle_finality(&mut self, header: &B::Header) { + // Check for and handle potential new session. + if let Some(new_validator_set) = find_authorities_change::(header) { + self.init_session_at(new_validator_set, *header.number()); + } - self.gossip_engine.lock().gossip_message(topic::(), encoded_message, false); + // Vote if there's now a new vote target. + if let Some(target_number) = self.current_vote_target() { + self.do_vote(target_number); } } - fn handle_vote(&mut self, round: (Payload, NumberFor), vote: (Public, Signature)) { + fn handle_vote( + &mut self, + round: (Payload, NumberFor), + vote: (AuthorityId, Signature), + self_vote: bool, + ) { self.gossip_validator.note_round(round.1); let rounds = if let Some(rounds) = self.rounds.as_mut() { @@ -331,12 +327,12 @@ where return }; - let vote_added = rounds.add_vote(&round, vote); + if rounds.add_vote(&round, vote, self_vote) { + if let Some(signatures) = rounds.try_conclude(&round) { + self.gossip_validator.conclude_round(round.1); - if vote_added && rounds.is_done(&round) { - if let Some(signatures) = rounds.drop(&round) { // id is stored for skipped session metric calculation - self.last_signed_id = rounds.validator_set_id(); + self.last_signed_id = rounds.validator_set_id_for(round.1); let block_num = round.1; let commitment = Commitment { @@ -351,48 +347,167 @@ where info!(target: "beefy", "🥩 Round #{} concluded, committed: {:?}.", round.1, signed_commitment); - if self - .backend - .append_justification( - BlockId::Number(block_num), - ( - BEEFY_ENGINE_ID, - VersionedFinalityProof::V1(signed_commitment.clone()).encode(), - ), - ) - .is_err() - { - // just a trace, because until the round lifecycle is improved, we will - // conclude certain rounds multiple times. - trace!(target: "beefy", "🥩 Failed to append justification: {:?}", signed_commitment); + if let Err(e) = self.backend.append_justification( + BlockId::Number(block_num), + ( + BEEFY_ENGINE_ID, + VersionedFinalityProof::V1(signed_commitment.clone()).encode(), + ), + ) { + trace!(target: "beefy", "🥩 Error {:?} on appending justification: {:?}", e, signed_commitment); } self.signed_commitment_sender .notify(|| Ok::<_, ()>(signed_commitment)) .expect("forwards closure result; the closure always returns Ok; qed."); - self.best_beefy_block = Some(block_num); - if let Err(err) = self.client.hash(block_num).map(|h| { - if let Some(hash) = h { - self.beefy_best_block_sender - .notify(|| Ok::<_, ()>(hash)) - .expect("forwards closure result; the closure always returns Ok; qed."); - } - }) { - error!(target: "beefy", "🥩 Failed to get hash for block number {}: {}", - block_num, err); - } + self.set_best_beefy_block(block_num); - metric_set!(self, beefy_best_block, block_num); + // Vote if there's now a new vote target. + if let Some(target_number) = self.current_vote_target() { + self.do_vote(target_number); + } } } } + /// Create and gossip Signed Commitment for block number `target_number`. + /// + /// Also handle this self vote by calling `self.handle_vote()` for it. + fn do_vote(&mut self, target_number: NumberFor) { + trace!(target: "beefy", "🥩 Try voting on {}", target_number); + + // Most of the time we get here, `target` is actually `best_grandpa`, + // avoid asking `client` for header in that case. + let target_header = if target_number == *self.best_grandpa_block_header.number() { + self.best_grandpa_block_header.clone() + } else { + match self.client.expect_header(BlockId::Number(target_number)) { + Ok(h) => h, + Err(err) => { + debug!( + target: "beefy", + "🥩 Could not get header for block #{:?} (error: {:?}), skipping vote..", + target_number, + err + ); + return + }, + } + }; + let target_hash = target_header.hash(); + + let mmr_root = if let Some(hash) = self.extract_mmr_root_digest(&target_header) { + hash + } else { + warn!(target: "beefy", "🥩 No MMR root digest found for: {:?}", target_hash); + return + }; + let payload = Payload::new(known_payload_ids::MMR_ROOT_ID, mmr_root.encode()); + + let (validators, validator_set_id) = if let Some(rounds) = &self.rounds { + if !rounds.should_self_vote(&(payload.clone(), target_number)) { + debug!(target: "beefy", "🥩 Don't double vote for block number: {:?}", target_number); + return + } + (rounds.validators_for(target_number), rounds.validator_set_id_for(target_number)) + } else { + debug!(target: "beefy", "🥩 Missing validator set - can't vote for: {:?}", target_hash); + return + }; + let authority_id = if let Some(id) = self.key_store.authority_id(validators) { + debug!(target: "beefy", "🥩 Local authority id: {:?}", id); + id + } else { + debug!(target: "beefy", "🥩 Missing validator id - can't vote for: {:?}", target_hash); + return + }; + + let commitment = Commitment { payload, block_number: target_number, validator_set_id }; + let encoded_commitment = commitment.encode(); + + let signature = match self.key_store.sign(&authority_id, &*encoded_commitment) { + Ok(sig) => sig, + Err(err) => { + warn!(target: "beefy", "🥩 Error signing commitment: {:?}", err); + return + }, + }; + + trace!( + target: "beefy", + "🥩 Produced signature using {:?}, is_valid: {:?}", + authority_id, + BeefyKeystore::verify(&authority_id, &signature, &*encoded_commitment) + ); + + let message = VoteMessage { commitment, id: authority_id, signature }; + + let encoded_message = message.encode(); + + metric_inc!(self, beefy_votes_sent); + + debug!(target: "beefy", "🥩 Sent vote message: {:?}", message); + + self.handle_vote( + (message.commitment.payload, message.commitment.block_number), + (message.id, message.signature), + true, + ); + + self.gossip_engine.lock().gossip_message(topic::(), encoded_message, false); + } + + /// Wait for BEEFY runtime pallet to be available. + #[cfg(not(test))] + async fn wait_for_runtime_pallet(&mut self) { + self.client + .finality_notification_stream() + .take_while(|notif| { + let at = BlockId::hash(notif.header.hash()); + if let Some(active) = self.client.runtime_api().validator_set(&at).ok().flatten() { + if active.id() == GENESIS_AUTHORITY_SET_ID { + // When starting from genesis, there is no session boundary digest. + // Just initialize `rounds` to Block #1 as BEEFY mandatory block. + self.init_session_at(active, 1u32.into()); + } + // In all other cases, we just go without `rounds` initialized, meaning the + // worker won't vote until it witnesses a session change. + // Once we'll implement 'initial sync' (catch-up), the worker will be able to + // start voting right away. + self.handle_finality_notification(notif); + future::ready(false) + } else { + trace!(target: "beefy", "🥩 Finality notification: {:?}", notif); + trace!(target: "beefy", "🥩 Waiting for BEEFY pallet to become available..."); + future::ready(true) + } + }) + .for_each(|_| future::ready(())) + .await; + // get a new stream that provides _new_ notifications (from here on out) + self.finality_notifications = self.client.finality_notification_stream(); + } + + /// For tests don't use runtime pallet. Start rounds from block #1. + #[cfg(test)] + async fn wait_for_runtime_pallet(&mut self) { + let active = self.test_res.active_validators.clone(); + self.init_session_at(active, 1u32.into()); + } + + /// Main loop for BEEFY worker. + /// + /// Wait for BEEFY runtime pallet to be available, then start the main async loop + /// which is driven by finality notifications and gossiped votes. pub(crate) async fn run(mut self) { + info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block_header.number()); + self.wait_for_runtime_pallet().await; + let mut votes = Box::pin(self.gossip_engine.lock().messages_for(topic::()).filter_map( |notification| async move { debug!(target: "beefy", "🥩 Got vote message: {:?}", notification); - VoteMessage::, Public, Signature>::decode( + VoteMessage::, AuthorityId, Signature>::decode( &mut ¬ification.message[..], ) .ok() @@ -400,13 +515,18 @@ where )); loop { + while self.sync_oracle.is_major_syncing() { + debug!(target: "beefy", "Waiting for major sync to complete..."); + futures_timer::Delay::new(Duration::from_secs(5)).await; + } + let engine = self.gossip_engine.clone(); let gossip_engine = future::poll_fn(|cx| engine.lock().poll_unpin(cx)); futures::select! { notification = self.finality_notifications.next().fuse() => { if let Some(notification) = notification { - self.handle_finality_notification(notification); + self.handle_finality_notification(¬ification); } else { return; } @@ -416,6 +536,7 @@ where self.handle_vote( (vote.commitment.payload, vote.commitment.block_number), (vote.id, vote.signature), + false ); } else { return; @@ -428,20 +549,36 @@ where } } } + + /// Simple wrapper over mmr root extraction. + #[cfg(not(test))] + fn extract_mmr_root_digest(&self, header: &B::Header) -> Option { + find_mmr_root_digest::(header) + } + + /// For tests, have the option to modify mmr root. + #[cfg(test)] + fn extract_mmr_root_digest(&self, header: &B::Header) -> Option { + let mut mmr_root = find_mmr_root_digest::(header); + if self.test_res.corrupt_mmr_roots { + mmr_root.as_mut().map(|hash| *hash ^= MmrRootHash::random()); + } + mmr_root + } } /// Extract the MMR root hash from a digest in the given header, if it exists. -fn find_mmr_root_digest(header: &B::Header) -> Option +fn find_mmr_root_digest(header: &B::Header) -> Option where B: Block, - Id: Codec, { - header.digest().logs().iter().find_map(|log| { - match log.try_to::>(OpaqueDigestItemId::Consensus(&BEEFY_ENGINE_ID)) { - Some(ConsensusLog::MmrRoot(root)) => Some(root), - _ => None, - } - }) + let id = OpaqueDigestItemId::Consensus(&BEEFY_ENGINE_ID); + + let filter = |log: ConsensusLog| match log { + ConsensusLog::MmrRoot(root) => Some(root), + _ => None, + }; + header.digest().convert_first(|l| l.try_to(id).and_then(filter)) } /// Scan the `header` digest log for a BEEFY validator set change. Return either the new @@ -456,119 +593,402 @@ where ConsensusLog::AuthoritiesChange(validator_set) => Some(validator_set), _ => None, }; - header.digest().convert_first(|l| l.try_to(id).and_then(filter)) } -/// Calculate next block number to vote on -fn vote_target(best_grandpa: N, best_beefy: N, min_delta: u32) -> N +/// Calculate next block number to vote on. +/// +/// Return `None` if there is no voteable target yet. +fn vote_target( + best_grandpa: N, + best_beefy: Option, + session_start: N, + min_delta: u32, +) -> Option where N: AtLeast32Bit + Copy + Debug, { - let diff = best_grandpa.saturating_sub(best_beefy); - let diff = diff.saturated_into::(); - let target = best_beefy + min_delta.max(diff.next_power_of_two()).into(); - - trace!( - target: "beefy", - "🥩 vote target - diff: {:?}, next_power_of_two: {:?}, target block: #{:?}", - diff, - diff.next_power_of_two(), - target, - ); - - target + // if the mandatory block (session_start) does not have a beefy justification yet, + // we vote on it + let target = match best_beefy { + None => { + trace!( + target: "beefy", + "🥩 vote target - mandatory block: #{:?}", + session_start, + ); + session_start + }, + Some(bbb) if bbb < session_start => { + trace!( + target: "beefy", + "🥩 vote target - mandatory block: #{:?}", + session_start, + ); + session_start + }, + Some(bbb) => { + let diff = best_grandpa.saturating_sub(bbb) + 1u32.into(); + let diff = diff.saturated_into::() / 2; + let target = bbb + min_delta.max(diff.next_power_of_two()).into(); + + trace!( + target: "beefy", + "🥩 vote target - diff: {:?}, next_power_of_two: {:?}, target block: #{:?}", + diff, + diff.next_power_of_two(), + target, + ); + + target + }, + }; + + // Don't vote for targets until they've been finalized + // (`target` can be > `best_grandpa` when `min_delta` is big enough). + if target > best_grandpa { + None + } else { + Some(target) + } } #[cfg(test)] -mod tests { - use super::vote_target; +pub(crate) mod tests { + use super::*; + use crate::{ + keystore::tests::Keyring, + tests::{create_beefy_worker, get_beefy_streams, make_beefy_ids, BeefyTestNet}, + }; + + use futures::{executor::block_on, future::poll_fn, task::Poll}; + + use sc_client_api::HeaderBackend; + use sc_network::NetworkService; + use sc_network_test::{PeersFullClient, TestNetFactory}; + use sp_api::HeaderT; + use substrate_test_runtime_client::{ + runtime::{Block, Digest, DigestItem, Header, H256}, + Backend, + }; + + #[derive(Clone)] + pub struct TestModifiers { + pub active_validators: ValidatorSet, + pub corrupt_mmr_roots: bool, + } #[test] fn vote_on_min_block_delta() { - let t = vote_target(1u32, 0, 4); - assert_eq!(4, t); - let t = vote_target(2u32, 0, 4); - assert_eq!(4, t); - let t = vote_target(3u32, 0, 4); - assert_eq!(4, t); - let t = vote_target(4u32, 0, 4); - assert_eq!(4, t); - - let t = vote_target(4u32, 4, 4); - assert_eq!(8, t); - - let t = vote_target(10u32, 10, 4); - assert_eq!(14, t); - let t = vote_target(11u32, 10, 4); - assert_eq!(14, t); - let t = vote_target(12u32, 10, 4); - assert_eq!(14, t); - let t = vote_target(13u32, 10, 4); - assert_eq!(14, t); - - let t = vote_target(10u32, 10, 8); - assert_eq!(18, t); - let t = vote_target(11u32, 10, 8); - assert_eq!(18, t); - let t = vote_target(12u32, 10, 8); - assert_eq!(18, t); - let t = vote_target(13u32, 10, 8); - assert_eq!(18, t); + let t = vote_target(1u32, Some(1), 1, 4); + assert_eq!(None, t); + let t = vote_target(2u32, Some(1), 1, 4); + assert_eq!(None, t); + let t = vote_target(4u32, Some(2), 1, 4); + assert_eq!(None, t); + let t = vote_target(6u32, Some(2), 1, 4); + assert_eq!(Some(6), t); + + let t = vote_target(9u32, Some(4), 1, 4); + assert_eq!(Some(8), t); + + let t = vote_target(10u32, Some(10), 1, 8); + assert_eq!(None, t); + let t = vote_target(12u32, Some(10), 1, 8); + assert_eq!(None, t); + let t = vote_target(18u32, Some(10), 1, 8); + assert_eq!(Some(18), t); } #[test] fn vote_on_power_of_two() { - let t = vote_target(1008u32, 1000, 4); - assert_eq!(1008, t); + let t = vote_target(1008u32, Some(1000), 1, 4); + assert_eq!(Some(1004), t); - let t = vote_target(1016u32, 1000, 4); - assert_eq!(1016, t); + let t = vote_target(1016u32, Some(1000), 1, 4); + assert_eq!(Some(1008), t); - let t = vote_target(1032u32, 1000, 4); - assert_eq!(1032, t); + let t = vote_target(1032u32, Some(1000), 1, 4); + assert_eq!(Some(1016), t); - let t = vote_target(1064u32, 1000, 4); - assert_eq!(1064, t); + let t = vote_target(1064u32, Some(1000), 1, 4); + assert_eq!(Some(1032), t); - let t = vote_target(1128u32, 1000, 4); - assert_eq!(1128, t); + let t = vote_target(1128u32, Some(1000), 1, 4); + assert_eq!(Some(1064), t); - let t = vote_target(1256u32, 1000, 4); - assert_eq!(1256, t); + let t = vote_target(1256u32, Some(1000), 1, 4); + assert_eq!(Some(1128), t); - let t = vote_target(1512u32, 1000, 4); - assert_eq!(1512, t); + let t = vote_target(1512u32, Some(1000), 1, 4); + assert_eq!(Some(1256), t); - let t = vote_target(1024u32, 0, 4); - assert_eq!(1024, t); + let t = vote_target(1024u32, Some(1), 1, 4); + assert_eq!(Some(513), t); } #[test] fn vote_on_target_block() { - let t = vote_target(1008u32, 1002, 4); - assert_eq!(1010, t); - let t = vote_target(1010u32, 1002, 4); - assert_eq!(1010, t); - - let t = vote_target(1016u32, 1006, 4); - assert_eq!(1022, t); - let t = vote_target(1022u32, 1006, 4); - assert_eq!(1022, t); - - let t = vote_target(1032u32, 1012, 4); - assert_eq!(1044, t); - let t = vote_target(1044u32, 1012, 4); - assert_eq!(1044, t); - - let t = vote_target(1064u32, 1014, 4); - assert_eq!(1078, t); - let t = vote_target(1078u32, 1014, 4); - assert_eq!(1078, t); - - let t = vote_target(1128u32, 1008, 4); - assert_eq!(1136, t); - let t = vote_target(1136u32, 1008, 4); - assert_eq!(1136, t); + let t = vote_target(1008u32, Some(1002), 1, 4); + assert_eq!(Some(1006), t); + let t = vote_target(1010u32, Some(1002), 1, 4); + assert_eq!(Some(1006), t); + + let t = vote_target(1016u32, Some(1006), 1, 4); + assert_eq!(Some(1014), t); + let t = vote_target(1022u32, Some(1006), 1, 4); + assert_eq!(Some(1014), t); + + let t = vote_target(1032u32, Some(1012), 1, 4); + assert_eq!(Some(1028), t); + let t = vote_target(1044u32, Some(1012), 1, 4); + assert_eq!(Some(1028), t); + + let t = vote_target(1064u32, Some(1014), 1, 4); + assert_eq!(Some(1046), t); + let t = vote_target(1078u32, Some(1014), 1, 4); + assert_eq!(Some(1046), t); + + let t = vote_target(1128u32, Some(1008), 1, 4); + assert_eq!(Some(1072), t); + let t = vote_target(1136u32, Some(1008), 1, 4); + assert_eq!(Some(1072), t); + } + + #[test] + fn vote_on_mandatory_block() { + let t = vote_target(1008u32, Some(1002), 1004, 4); + assert_eq!(Some(1004), t); + let t = vote_target(1016u32, Some(1006), 1007, 4); + assert_eq!(Some(1007), t); + let t = vote_target(1064u32, Some(1014), 1063, 4); + assert_eq!(Some(1063), t); + let t = vote_target(1320u32, Some(1012), 1234, 4); + assert_eq!(Some(1234), t); + + let t = vote_target(1128u32, Some(1008), 1008, 4); + assert_eq!(Some(1072), t); + } + + #[test] + fn extract_authorities_change_digest() { + let mut header = Header::new( + 1u32.into(), + Default::default(), + Default::default(), + Default::default(), + Digest::default(), + ); + + // verify empty digest shows nothing + assert!(find_authorities_change::(&header).is_none()); + + let peers = &[Keyring::One, Keyring::Two]; + let id = 42; + let validator_set = ValidatorSet::new(make_beefy_ids(peers), id).unwrap(); + header.digest_mut().push(DigestItem::Consensus( + BEEFY_ENGINE_ID, + ConsensusLog::::AuthoritiesChange(validator_set.clone()).encode(), + )); + + // verify validator set is correctly extracted from digest + let extracted = find_authorities_change::(&header); + assert_eq!(extracted, Some(validator_set)); + } + + #[test] + fn extract_mmr_root_digest() { + let mut header = Header::new( + 1u32.into(), + Default::default(), + Default::default(), + Default::default(), + Digest::default(), + ); + + // verify empty digest shows nothing + assert!(find_mmr_root_digest::(&header).is_none()); + + let mmr_root_hash = H256::random(); + header.digest_mut().push(DigestItem::Consensus( + BEEFY_ENGINE_ID, + ConsensusLog::::MmrRoot(mmr_root_hash.clone()).encode(), + )); + + // verify validator set is correctly extracted from digest + let extracted = find_mmr_root_digest::(&header); + assert_eq!(extracted, Some(mmr_root_hash)); + } + + #[test] + fn should_vote_target() { + let keys = &[Keyring::Alice]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + let mut net = BeefyTestNet::new(1, 0); + net.peer(0).data.use_validator_set(&validator_set); + let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); + + // rounds not initialized -> should vote: `None` + assert_eq!(worker.current_vote_target(), None); + + let set_up = |worker: &mut BeefyWorker< + Block, + PeersFullClient, + Backend, + Arc>, + >, + best_grandpa: u64, + best_beefy: Option, + session_start: u64, + min_delta: u32| { + let grandpa_header = Header::new( + best_grandpa, + Default::default(), + Default::default(), + Default::default(), + Default::default(), + ); + worker.best_grandpa_block_header = grandpa_header; + worker.best_beefy_block = best_beefy; + worker.min_block_delta = min_delta; + worker.rounds = + Some(Rounds::new(session_start, validator_set.clone(), validator_set.clone())); + }; + + // under min delta + set_up(&mut worker, 1, Some(1), 1, 4); + assert_eq!(worker.current_vote_target(), None); + set_up(&mut worker, 5, Some(2), 1, 4); + assert_eq!(worker.current_vote_target(), None); + + // vote on min delta + set_up(&mut worker, 9, Some(4), 1, 4); + assert_eq!(worker.current_vote_target(), Some(8)); + set_up(&mut worker, 18, Some(10), 1, 8); + assert_eq!(worker.current_vote_target(), Some(18)); + + // vote on power of two + set_up(&mut worker, 1008, Some(1000), 1, 1); + assert_eq!(worker.current_vote_target(), Some(1004)); + set_up(&mut worker, 1016, Some(1000), 1, 2); + assert_eq!(worker.current_vote_target(), Some(1008)); + + // nothing new to vote on + set_up(&mut worker, 1000, Some(1000), 1, 1); + assert_eq!(worker.current_vote_target(), None); + + // vote on mandatory + set_up(&mut worker, 1008, None, 1000, 8); + assert_eq!(worker.current_vote_target(), Some(1000)); + set_up(&mut worker, 1008, Some(1000), 1001, 8); + assert_eq!(worker.current_vote_target(), Some(1001)); + } + + #[test] + fn keystore_vs_validator_set() { + let keys = &[Keyring::Alice]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + let mut net = BeefyTestNet::new(1, 0); + net.peer(0).data.use_validator_set(&validator_set); + let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); + + // keystore doesn't contain other keys than validators' + assert_eq!(worker.verify_validator_set(&1, &validator_set), Ok(())); + + // unknown `Bob` key + let keys = &[Keyring::Bob]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + let err_msg = "no authority public key found in store".to_string(); + let expected = Err(error::Error::Keystore(err_msg)); + assert_eq!(worker.verify_validator_set(&1, &validator_set), expected); + + // worker has no keystore + worker.key_store = None.into(); + let expected_err = Err(error::Error::Keystore("no Keystore".into())); + assert_eq!(worker.verify_validator_set(&1, &validator_set), expected_err); + } + + #[test] + fn setting_best_beefy_block() { + let keys = &[Keyring::Alice]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + let mut net = BeefyTestNet::new(1, 0); + net.peer(0).data.use_validator_set(&validator_set); + let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); + + let (mut best_block_streams, _) = get_beefy_streams(&mut net, keys); + let mut best_block_stream = best_block_streams.drain(..).next().unwrap(); + + // no 'best beefy block' + assert_eq!(worker.best_beefy_block, None); + block_on(poll_fn(move |cx| { + assert_eq!(best_block_stream.poll_next_unpin(cx), Poll::Pending); + Poll::Ready(()) + })); + + // unknown hash for block #1 + let (mut best_block_streams, _) = get_beefy_streams(&mut net, keys); + let mut best_block_stream = best_block_streams.drain(..).next().unwrap(); + worker.set_best_beefy_block(1); + assert_eq!(worker.best_beefy_block, Some(1)); + block_on(poll_fn(move |cx| { + assert_eq!(best_block_stream.poll_next_unpin(cx), Poll::Pending); + Poll::Ready(()) + })); + + // generate 2 blocks, try again expect success + let (mut best_block_streams, _) = get_beefy_streams(&mut net, keys); + let mut best_block_stream = best_block_streams.drain(..).next().unwrap(); + net.generate_blocks(2, 10, &validator_set); + + worker.set_best_beefy_block(2); + assert_eq!(worker.best_beefy_block, Some(2)); + block_on(poll_fn(move |cx| { + match best_block_stream.poll_next_unpin(cx) { + // expect Some(hash-of-block-2) + Poll::Ready(Some(hash)) => { + let block_num = net.peer(0).client().as_client().number(hash).unwrap(); + assert_eq!(block_num, Some(2)); + }, + v => panic!("unexpected value: {:?}", v), + } + Poll::Ready(()) + })); + } + + #[test] + fn setting_initial_session() { + let keys = &[Keyring::Alice]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + let mut net = BeefyTestNet::new(1, 0); + net.peer(0).data.use_validator_set(&validator_set); + let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); + + assert!(worker.rounds.is_none()); + + // verify setting the correct validator sets and boundary for genesis session + worker.init_session_at(validator_set.clone(), 1); + + let worker_rounds = worker.rounds.as_ref().unwrap(); + assert_eq!(worker_rounds.validator_set(), &validator_set); + assert_eq!(worker_rounds.session_start(), &1); + // in genesis case both current and prev validator sets are the same + assert_eq!(worker_rounds.validator_set_id_for(1), validator_set.id()); + assert_eq!(worker_rounds.validator_set_id_for(2), validator_set.id()); + + // new validator set + let keys = &[Keyring::Bob]; + let new_validator_set = ValidatorSet::new(make_beefy_ids(keys), 1).unwrap(); + + // verify setting the correct validator sets and boundary for non-genesis session + worker.init_session_at(new_validator_set.clone(), 11); + + let worker_rounds = worker.rounds.as_ref().unwrap(); + assert_eq!(worker_rounds.validator_set(), &new_validator_set); + assert_eq!(worker_rounds.session_start(), &11); + // mandatory block gets prev set, further blocks get new set + assert_eq!(worker_rounds.validator_set_id_for(11), validator_set.id()); + assert_eq!(worker_rounds.validator_set_id_for(12), new_validator_set.id()); + assert_eq!(worker_rounds.validator_set_id_for(13), new_validator_set.id()); } } diff --git a/frame/beefy/src/lib.rs b/frame/beefy/src/lib.rs index 4aa1d1337cd0a..744a06561e8c2 100644 --- a/frame/beefy/src/lib.rs +++ b/frame/beefy/src/lib.rs @@ -105,20 +105,20 @@ impl Pallet { } fn change_authorities(new: Vec, queued: Vec) { - // As in GRANDPA, we trigger a validator set change only if the the validator - // set has actually changed. - if new != Self::authorities() { - >::put(&new); - - let next_id = Self::validator_set_id() + 1u64; - >::put(next_id); - if let Some(validator_set) = ValidatorSet::::new(new, next_id) { - let log = DigestItem::Consensus( - BEEFY_ENGINE_ID, - ConsensusLog::AuthoritiesChange(validator_set).encode(), - ); - >::deposit_log(log); - } + // Always issue a change if `session` says that the validators have changed. + // Even if their session keys are the same as before, the underlying economic + // identities have changed. Furthermore, the digest below is used to signal + // BEEFY mandatory blocks. + >::put(&new); + + let next_id = Self::validator_set_id() + 1u64; + >::put(next_id); + if let Some(validator_set) = ValidatorSet::::new(new, next_id) { + let log = DigestItem::Consensus( + BEEFY_ENGINE_ID, + ConsensusLog::AuthoritiesChange(validator_set).encode(), + ); + >::deposit_log(log); } >::put(&queued); diff --git a/primitives/beefy/Cargo.toml b/primitives/beefy/Cargo.toml index 4aa53aff2c3cb..cf901f4a34fc6 100644 --- a/primitives/beefy/Cargo.toml +++ b/primitives/beefy/Cargo.toml @@ -4,8 +4,13 @@ version = "4.0.0-dev" authors = ["Parity Technologies "] edition = "2021" license = "Apache-2.0" +homepage = "https://substrate.io" repository = "https://github.com/paritytech/substrate" description = "Primitives for BEEFY protocol." +readme = "README.md" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] [dependencies] codec = { version = "3.0.0", package = "parity-scale-codec", default-features = false, features = ["derive"] } diff --git a/test-utils/runtime/Cargo.toml b/test-utils/runtime/Cargo.toml index 8c61cbbf8adbe..ad7dca5e08fb1 100644 --- a/test-utils/runtime/Cargo.toml +++ b/test-utils/runtime/Cargo.toml @@ -13,6 +13,7 @@ publish = false targets = ["x86_64-unknown-linux-gnu"] [dependencies] +beefy-primitives = { version = "4.0.0-dev", default-features = false, path = "../../primitives/beefy" } sp-application-crypto = { version = "6.0.0", default-features = false, path = "../../primitives/application-crypto" } sp-consensus-aura = { version = "0.10.0-dev", default-features = false, path = "../../primitives/consensus/aura" } sp-consensus-babe = { version = "0.10.0-dev", default-features = false, path = "../../primitives/consensus/babe" } @@ -65,6 +66,7 @@ default = [ "std", ] std = [ + "beefy-primitives/std", "sp-application-crypto/std", "sp-consensus-aura/std", "sp-consensus-babe/std", diff --git a/test-utils/runtime/src/lib.rs b/test-utils/runtime/src/lib.rs index 861d95efb3087..743652a0ee899 100644 --- a/test-utils/runtime/src/lib.rs +++ b/test-utils/runtime/src/lib.rs @@ -926,6 +926,12 @@ cfg_if! { } } + impl beefy_primitives::BeefyApi for RuntimeApi { + fn validator_set() -> Option> { + None + } + } + impl frame_system_rpc_runtime_api::AccountNonceApi for Runtime { fn account_nonce(_account: AccountId) -> Index { 0