diff --git a/prdoc/pr_5469.prdoc b/prdoc/pr_5469.prdoc new file mode 100644 index 000000000000..1e6aa3c0c072 --- /dev/null +++ b/prdoc/pr_5469.prdoc @@ -0,0 +1,11 @@ +title: Syncing strategy refactoring + +doc: + - audience: Node Dev + description: | + Mostly internal changes to syncing strategies that is a step towards making them configurable/extensible in the + future. It is unlikely that external developers will need to change their code. + +crates: + - name: sc-network-sync + bump: major diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 4b6ccb085834..86c1a7abf744 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -33,7 +33,7 @@ use crate::{ }, strategy::{ warp::{EncodedProof, WarpProofRequest, WarpSyncConfig}, - StrategyKey, SyncingAction, SyncingConfig, SyncingStrategy, + PolkadotSyncingStrategy, StrategyKey, SyncingAction, SyncingConfig, SyncingStrategy, }, types::{ BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent, @@ -189,7 +189,7 @@ pub struct Peer { pub struct SyncingEngine { /// Syncing strategy. - strategy: SyncingStrategy, + strategy: PolkadotSyncingStrategy, /// Blockchain client. client: Arc, @@ -389,7 +389,8 @@ where ); // Initialize syncing strategy. - let strategy = SyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?; + let strategy = + PolkadotSyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?; let block_announce_protocol_name = block_announce_config.protocol_name().clone(); let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000); @@ -697,7 +698,7 @@ where number, ) }, - // Nothing to do, this is handled internally by `SyncingStrategy`. + // Nothing to do, this is handled internally by `PolkadotSyncingStrategy`. SyncingAction::Finished => {}, } } diff --git a/substrate/client/network/sync/src/strategy.rs b/substrate/client/network/sync/src/strategy.rs index ad3a9461c93b..f8d6976bbaa0 100644 --- a/substrate/client/network/sync/src/strategy.rs +++ b/substrate/client/network/sync/src/strategy.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -//! [`SyncingStrategy`] is a proxy between [`crate::engine::SyncingEngine`] +//! [`PolkadotSyncingStrategy`] is a proxy between [`crate::engine::SyncingEngine`] //! and specific syncing algorithms. pub mod chain_sync; @@ -29,7 +29,7 @@ use crate::{ types::{BadPeer, OpaqueStateRequest, OpaqueStateResponse, SyncStatus}, LOG_TARGET, }; -use chain_sync::{ChainSync, ChainSyncAction, ChainSyncMode}; +use chain_sync::{ChainSync, ChainSyncMode}; use log::{debug, error, info}; use prometheus_endpoint::Registry; use sc_client_api::{BlockBackend, ProofProvider}; @@ -59,6 +59,108 @@ fn chain_sync_mode(sync_mode: SyncMode) -> ChainSyncMode { } } +/// Syncing strategy for syncing engine to use +pub trait SyncingStrategy: Send +where + B: BlockT, +{ + /// Notify syncing state machine that a new sync peer has connected. + fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor); + + /// Notify that a sync peer has disconnected. + fn remove_peer(&mut self, peer_id: &PeerId); + + /// Submit a validated block announcement. + /// + /// Returns new best hash & best number of the peer if they are updated. + #[must_use] + fn on_validated_block_announce( + &mut self, + is_best: bool, + peer_id: PeerId, + announce: &BlockAnnounce, + ) -> Option<(B::Hash, NumberFor)>; + + /// Configure an explicit fork sync request in case external code has detected that there is a + /// stale fork missing. + /// + /// Note that this function should not be used for recent blocks. + /// Sync should be able to download all the recent forks normally. + /// + /// Passing empty `peers` set effectively removes the sync request. + fn set_sync_fork_request(&mut self, peers: Vec, hash: &B::Hash, number: NumberFor); + + /// Request extra justification. + fn request_justification(&mut self, hash: &B::Hash, number: NumberFor); + + /// Clear extra justification requests. + fn clear_justification_requests(&mut self); + + /// Report a justification import (successful or not). + fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool); + + /// Process block response. + fn on_block_response( + &mut self, + peer_id: PeerId, + key: StrategyKey, + request: BlockRequest, + blocks: Vec>, + ); + + /// Process state response. + fn on_state_response( + &mut self, + peer_id: PeerId, + key: StrategyKey, + response: OpaqueStateResponse, + ); + + /// Process warp proof response. + fn on_warp_proof_response( + &mut self, + peer_id: &PeerId, + key: StrategyKey, + response: EncodedProof, + ); + + /// A batch of blocks that have been processed, with or without errors. + /// + /// Call this when a batch of blocks that have been processed by the import queue, with or + /// without errors. + fn on_blocks_processed( + &mut self, + imported: usize, + count: usize, + results: Vec<(Result>, BlockImportError>, B::Hash)>, + ); + + /// Notify a syncing strategy that a block has been finalized. + fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor); + + /// Inform sync about a new best imported block. + fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor); + + // Are we in major sync mode? + fn is_major_syncing(&self) -> bool; + + /// Get the number of peers known to the syncing strategy. + fn num_peers(&self) -> usize; + + /// Returns the current sync status. + fn status(&self) -> SyncStatus; + + /// Get the total number of downloaded blocks. + fn num_downloaded_blocks(&self) -> usize; + + /// Get an estimate of the number of parallel sync requests. + fn num_sync_requests(&self) -> usize; + + /// Get actions that should be performed by the owner on the strategy's behalf + #[must_use] + fn actions(&mut self) -> Result>, ClientError>; +} + /// Syncing configuration containing data for all strategies. #[derive(Clone, Debug)] pub struct SyncingConfig { @@ -104,7 +206,7 @@ pub enum SyncingAction { number: NumberFor, justifications: Justifications, }, - /// Strategy finished. Nothing to do, this is handled by `SyncingStrategy`. + /// Strategy finished. Nothing to do, this is handled by `PolkadotSyncingStrategy`. Finished, } @@ -140,26 +242,8 @@ impl From> for SyncingAction { } } -impl From> for SyncingAction { - fn from(action: ChainSyncAction) -> Self { - match action { - ChainSyncAction::SendBlockRequest { peer_id, request } => - SyncingAction::SendBlockRequest { peer_id, key: StrategyKey::ChainSync, request }, - ChainSyncAction::SendStateRequest { peer_id, request } => - SyncingAction::SendStateRequest { peer_id, key: StrategyKey::ChainSync, request }, - ChainSyncAction::CancelRequest { peer_id } => - SyncingAction::CancelRequest { peer_id, key: StrategyKey::ChainSync }, - ChainSyncAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer), - ChainSyncAction::ImportBlocks { origin, blocks } => - SyncingAction::ImportBlocks { origin, blocks }, - ChainSyncAction::ImportJustifications { peer_id, hash, number, justifications } => - SyncingAction::ImportJustifications { peer_id, hash, number, justifications }, - } - } -} - -/// Proxy to specific syncing strategies. -pub struct SyncingStrategy { +/// Proxy to specific syncing strategies used in Polkadot. +pub struct PolkadotSyncingStrategy { /// Initial syncing configuration. config: SyncingConfig, /// Client used by syncing strategies. @@ -171,11 +255,11 @@ pub struct SyncingStrategy { /// `ChainSync` strategy.` chain_sync: Option>, /// Connected peers and their best blocks used to seed a new strategy when switching to it in - /// [`SyncingStrategy::proceed_to_next`]. + /// `PolkadotSyncingStrategy::proceed_to_next`. peer_best_blocks: HashMap)>, } -impl SyncingStrategy +impl SyncingStrategy for PolkadotSyncingStrategy where B: BlockT, Client: HeaderBackend @@ -186,46 +270,7 @@ where + Sync + 'static, { - /// Initialize a new syncing strategy. - pub fn new( - config: SyncingConfig, - client: Arc, - warp_sync_config: Option>, - ) -> Result { - if let SyncMode::Warp = config.mode { - let warp_sync_config = warp_sync_config - .expect("Warp sync configuration must be supplied in warp sync mode."); - let warp_sync = WarpSync::new(client.clone(), warp_sync_config); - Ok(Self { - config, - client, - warp: Some(warp_sync), - state: None, - chain_sync: None, - peer_best_blocks: Default::default(), - }) - } else { - let chain_sync = ChainSync::new( - chain_sync_mode(config.mode), - client.clone(), - config.max_parallel_downloads, - config.max_blocks_per_request, - config.metrics_registry.as_ref(), - std::iter::empty(), - )?; - Ok(Self { - config, - client, - warp: None, - state: None, - chain_sync: Some(chain_sync), - peer_best_blocks: Default::default(), - }) - } - } - - /// Notify that a new peer has connected. - pub fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { + fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { self.peer_best_blocks.insert(peer_id, (best_hash, best_number)); self.warp.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number)); @@ -233,8 +278,7 @@ where self.chain_sync.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number)); } - /// Notify that a peer has disconnected. - pub fn remove_peer(&mut self, peer_id: &PeerId) { + fn remove_peer(&mut self, peer_id: &PeerId) { self.warp.as_mut().map(|s| s.remove_peer(peer_id)); self.state.as_mut().map(|s| s.remove_peer(peer_id)); self.chain_sync.as_mut().map(|s| s.remove_peer(peer_id)); @@ -242,10 +286,7 @@ where self.peer_best_blocks.remove(peer_id); } - /// Submit a validated block announcement. - /// - /// Returns new best hash & best number of the peer if they are updated. - pub fn on_validated_block_announce( + fn on_validated_block_announce( &mut self, is_best: bool, peer_id: PeerId, @@ -278,46 +319,35 @@ where new_best } - /// Configure an explicit fork sync request in case external code has detected that there is a - /// stale fork missing. - pub fn set_sync_fork_request( - &mut self, - peers: Vec, - hash: &B::Hash, - number: NumberFor, - ) { + fn set_sync_fork_request(&mut self, peers: Vec, hash: &B::Hash, number: NumberFor) { // Fork requests are only handled by `ChainSync`. if let Some(ref mut chain_sync) = self.chain_sync { chain_sync.set_sync_fork_request(peers.clone(), hash, number); } } - /// Request extra justification. - pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { + fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { // Justifications can only be requested via `ChainSync`. if let Some(ref mut chain_sync) = self.chain_sync { chain_sync.request_justification(hash, number); } } - /// Clear extra justification requests. - pub fn clear_justification_requests(&mut self) { + fn clear_justification_requests(&mut self) { // Justification requests can only be cleared by `ChainSync`. if let Some(ref mut chain_sync) = self.chain_sync { chain_sync.clear_justification_requests(); } } - /// Report a justification import (successful or not). - pub fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool) { + fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool) { // Only `ChainSync` is interested in justification import. if let Some(ref mut chain_sync) = self.chain_sync { chain_sync.on_justification_import(hash, number, success); } } - /// Process block response. - pub fn on_block_response( + fn on_block_response( &mut self, peer_id: PeerId, key: StrategyKey, @@ -329,7 +359,7 @@ where } else if let (StrategyKey::ChainSync, Some(ref mut chain_sync)) = (key, &mut self.chain_sync) { - chain_sync.on_block_response(peer_id, request, blocks); + chain_sync.on_block_response(peer_id, key, request, blocks); } else { error!( target: LOG_TARGET, @@ -340,8 +370,7 @@ where } } - /// Process state response. - pub fn on_state_response( + fn on_state_response( &mut self, peer_id: PeerId, key: StrategyKey, @@ -352,7 +381,7 @@ where } else if let (StrategyKey::ChainSync, Some(ref mut chain_sync)) = (key, &mut self.chain_sync) { - chain_sync.on_state_response(peer_id, response); + chain_sync.on_state_response(peer_id, key, response); } else { error!( target: LOG_TARGET, @@ -363,8 +392,7 @@ where } } - /// Process warp proof response. - pub fn on_warp_proof_response( + fn on_warp_proof_response( &mut self, peer_id: &PeerId, key: StrategyKey, @@ -382,8 +410,7 @@ where } } - /// A batch of blocks have been processed, with or without errors. - pub fn on_blocks_processed( + fn on_blocks_processed( &mut self, imported: usize, count: usize, @@ -397,24 +424,21 @@ where } } - /// Notify a syncing strategy that a block has been finalized. - pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor) { + fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor) { // Only `ChainSync` is interested in block finalization notifications. if let Some(ref mut chain_sync) = self.chain_sync { chain_sync.on_block_finalized(hash, number); } } - /// Inform sync about a new best imported block. - pub fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor) { + fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor) { // This is relevant to `ChainSync` only. if let Some(ref mut chain_sync) = self.chain_sync { chain_sync.update_chain_info(best_hash, best_number); } } - // Are we in major sync mode? - pub fn is_major_syncing(&self) -> bool { + fn is_major_syncing(&self) -> bool { self.warp.is_some() || self.state.is_some() || match self.chain_sync { @@ -423,13 +447,11 @@ where } } - /// Get the number of peers known to the syncing strategy. - pub fn num_peers(&self) -> usize { + fn num_peers(&self) -> usize { self.peer_best_blocks.len() } - /// Returns the current sync status. - pub fn status(&self) -> SyncStatus { + fn status(&self) -> SyncStatus { // This function presumes that strategies are executed serially and must be refactored // once we have parallel strategies. if let Some(ref warp) = self.warp { @@ -443,21 +465,17 @@ where } } - /// Get the total number of downloaded blocks. - pub fn num_downloaded_blocks(&self) -> usize { + fn num_downloaded_blocks(&self) -> usize { self.chain_sync .as_ref() .map_or(0, |chain_sync| chain_sync.num_downloaded_blocks()) } - /// Get an estimate of the number of parallel sync requests. - pub fn num_sync_requests(&self) -> usize { + fn num_sync_requests(&self) -> usize { self.chain_sync.as_ref().map_or(0, |chain_sync| chain_sync.num_sync_requests()) } - /// Get actions that should be performed by the owner on the strategy's behalf - #[must_use] - pub fn actions(&mut self) -> Result>, ClientError> { + fn actions(&mut self) -> Result>, ClientError> { // This function presumes that strategies are executed serially and must be refactored once // we have parallel strategies. let actions: Vec<_> = if let Some(ref mut warp) = self.warp { @@ -465,7 +483,7 @@ where } else if let Some(ref mut state) = self.state { state.actions().map(Into::into).collect() } else if let Some(ref mut chain_sync) = self.chain_sync { - chain_sync.actions().map(Into::into).collect() + chain_sync.actions()? } else { unreachable!("At least one syncing strategy is always active; qed") }; @@ -476,6 +494,56 @@ where Ok(actions) } +} + +impl PolkadotSyncingStrategy +where + B: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ + /// Initialize a new syncing strategy. + pub fn new( + config: SyncingConfig, + client: Arc, + warp_sync_config: Option>, + ) -> Result { + if let SyncMode::Warp = config.mode { + let warp_sync_config = warp_sync_config + .expect("Warp sync configuration must be supplied in warp sync mode."); + let warp_sync = WarpSync::new(client.clone(), warp_sync_config); + Ok(Self { + config, + client, + warp: Some(warp_sync), + state: None, + chain_sync: None, + peer_best_blocks: Default::default(), + }) + } else { + let chain_sync = ChainSync::new( + chain_sync_mode(config.mode), + client.clone(), + config.max_parallel_downloads, + config.max_blocks_per_request, + config.metrics_registry.as_ref(), + std::iter::empty(), + )?; + Ok(Self { + config, + client, + warp: None, + state: None, + chain_sync: Some(chain_sync), + peer_best_blocks: Default::default(), + }) + } + } /// Proceed with the next strategy if the active one finished. pub fn proceed_to_next(&mut self) -> Result<(), ClientError> { diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index cca83a5055cb..a8ba5558d1bc 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -35,7 +35,8 @@ use crate::{ strategy::{ disconnected_peers::DisconnectedPeers, state_sync::{ImportResult, StateSync, StateSyncProvider}, - warp::{WarpSyncPhase, WarpSyncProgress}, + warp::{EncodedProof, WarpSyncPhase, WarpSyncProgress}, + StrategyKey, SyncingAction, SyncingStrategy, }, types::{BadPeer, OpaqueStateRequest, OpaqueStateResponse, SyncState, SyncStatus}, LOG_TARGET, @@ -197,28 +198,6 @@ struct GapSync { target: NumberFor, } -/// Action that the parent of [`ChainSync`] should perform after reporting a network or block event. -#[derive(Debug)] -pub enum ChainSyncAction { - /// Send block request to peer. Always implies dropping a stale block request to the same peer. - SendBlockRequest { peer_id: PeerId, request: BlockRequest }, - /// Send state request to peer. - SendStateRequest { peer_id: PeerId, request: OpaqueStateRequest }, - /// Drop stale request. - CancelRequest { peer_id: PeerId }, - /// Peer misbehaved. Disconnect, report it and cancel the block request to it. - DropPeer(BadPeer), - /// Import blocks. - ImportBlocks { origin: BlockOrigin, blocks: Vec> }, - /// Import justifications. - ImportJustifications { - peer_id: PeerId, - hash: B::Hash, - number: NumberFor, - justifications: Justifications, - }, -} - /// Sync operation mode. #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub enum ChainSyncMode { @@ -233,6 +212,75 @@ pub enum ChainSyncMode { }, } +/// All the data we have about a Peer that we are trying to sync with +#[derive(Debug, Clone)] +pub(crate) struct PeerSync { + /// Peer id of this peer. + pub peer_id: PeerId, + /// The common number is the block number that is a common point of + /// ancestry for both our chains (as far as we know). + pub common_number: NumberFor, + /// The hash of the best block that we've seen for this peer. + pub best_hash: B::Hash, + /// The number of the best block that we've seen for this peer. + pub best_number: NumberFor, + /// The state of syncing this peer is in for us, generally categories + /// into `Available` or "busy" with something as defined by `PeerSyncState`. + pub state: PeerSyncState, +} + +impl PeerSync { + /// Update the `common_number` iff `new_common > common_number`. + fn update_common_number(&mut self, new_common: NumberFor) { + if self.common_number < new_common { + trace!( + target: LOG_TARGET, + "Updating peer {} common number from={} => to={}.", + self.peer_id, + self.common_number, + new_common, + ); + self.common_number = new_common; + } + } +} + +struct ForkTarget { + number: NumberFor, + parent_hash: Option, + peers: HashSet, +} + +/// The state of syncing between a Peer and ourselves. +/// +/// Generally two categories, "busy" or `Available`. If busy, the enum +/// defines what we are busy with. +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub(crate) enum PeerSyncState { + /// Available for sync requests. + Available, + /// Searching for ancestors the Peer has in common with us. + AncestorSearch { start: NumberFor, current: NumberFor, state: AncestorSearchState }, + /// Actively downloading new blocks, starting from the given Number. + DownloadingNew(NumberFor), + /// Downloading a stale block with given Hash. Stale means that it is a + /// block with a number that is lower than our best number. It might be + /// from a fork and not necessarily already imported. + DownloadingStale(B::Hash), + /// Downloading justification for given block hash. + DownloadingJustification(B::Hash), + /// Downloading state. + DownloadingState, + /// Actively downloading block history after warp sync. + DownloadingGap(NumberFor), +} + +impl PeerSyncState { + pub fn is_available(&self) -> bool { + matches!(self, Self::Available) + } +} + /// The main data structure which contains all the state for a chains /// active syncing strategy. pub struct ChainSync { @@ -280,77 +328,563 @@ pub struct ChainSync { /// Gap download process. gap_sync: Option>, /// Pending actions. - actions: Vec>, + actions: Vec>, /// Prometheus metrics. metrics: Option, } -/// All the data we have about a Peer that we are trying to sync with -#[derive(Debug, Clone)] -pub(crate) struct PeerSync { - /// Peer id of this peer. - pub peer_id: PeerId, - /// The common number is the block number that is a common point of - /// ancestry for both our chains (as far as we know). - pub common_number: NumberFor, - /// The hash of the best block that we've seen for this peer. - pub best_hash: B::Hash, - /// The number of the best block that we've seen for this peer. - pub best_number: NumberFor, - /// The state of syncing this peer is in for us, generally categories - /// into `Available` or "busy" with something as defined by `PeerSyncState`. - pub state: PeerSyncState, -} +impl SyncingStrategy for ChainSync +where + B: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ + fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { + match self.add_peer_inner(peer_id, best_hash, best_number) { + Ok(Some(request)) => self.actions.push(SyncingAction::SendBlockRequest { + peer_id, + key: StrategyKey::ChainSync, + request, + }), + Ok(None) => {}, + Err(bad_peer) => self.actions.push(SyncingAction::DropPeer(bad_peer)), + } + } + + fn remove_peer(&mut self, peer_id: &PeerId) { + self.blocks.clear_peer_download(peer_id); + if let Some(gap_sync) = &mut self.gap_sync { + gap_sync.blocks.clear_peer_download(peer_id) + } + + if let Some(state) = self.peers.remove(peer_id) { + if !state.state.is_available() { + if let Some(bad_peer) = + self.disconnected_peers.on_disconnect_during_request(*peer_id) + { + self.actions.push(SyncingAction::DropPeer(bad_peer)); + } + } + } + + self.extra_justifications.peer_disconnected(peer_id); + self.allowed_requests.set_all(); + self.fork_targets.retain(|_, target| { + target.peers.remove(peer_id); + !target.peers.is_empty() + }); + if let Some(metrics) = &self.metrics { + metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX)); + } + + let blocks = self.ready_blocks(); + + if !blocks.is_empty() { + self.validate_and_queue_blocks(blocks, false); + } + } + + fn on_validated_block_announce( + &mut self, + is_best: bool, + peer_id: PeerId, + announce: &BlockAnnounce, + ) -> Option<(B::Hash, NumberFor)> { + let number = *announce.header.number(); + let hash = announce.header.hash(); + let parent_status = + self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown); + let known_parent = parent_status != BlockStatus::Unknown; + let ancient_parent = parent_status == BlockStatus::InChainPruned; + + let known = self.is_known(&hash); + let peer = if let Some(peer) = self.peers.get_mut(&peer_id) { + peer + } else { + error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}"); + return Some((hash, number)) + }; + + if let PeerSyncState::AncestorSearch { .. } = peer.state { + trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id); + return None + } + + let peer_info = is_best.then(|| { + // update their best block + peer.best_number = number; + peer.best_hash = hash; + + (hash, number) + }); + + // If the announced block is the best they have and is not ahead of us, our common number + // is either one further ahead or it's the one they just announced, if we know about it. + if is_best { + if known && self.best_queued_number >= number { + self.update_peer_common_number(&peer_id, number); + } else if announce.header.parent_hash() == &self.best_queued_hash || + known_parent && self.best_queued_number >= number + { + self.update_peer_common_number(&peer_id, number.saturating_sub(One::one())); + } + } + self.allowed_requests.add(&peer_id); + + // known block case + if known || self.is_already_downloading(&hash) { + trace!(target: LOG_TARGET, "Known block announce from {}: {}", peer_id, hash); + if let Some(target) = self.fork_targets.get_mut(&hash) { + target.peers.insert(peer_id); + } + return peer_info + } + + if ancient_parent { + trace!( + target: LOG_TARGET, + "Ignored ancient block announced from {}: {} {:?}", + peer_id, + hash, + announce.header, + ); + return peer_info + } + + if self.status().state == SyncState::Idle { + trace!( + target: LOG_TARGET, + "Added sync target for block announced from {}: {} {:?}", + peer_id, + hash, + announce.summary(), + ); + self.fork_targets + .entry(hash) + .or_insert_with(|| { + if let Some(metrics) = &self.metrics { + metrics.fork_targets.inc(); + } + + ForkTarget { + number, + parent_hash: Some(*announce.header.parent_hash()), + peers: Default::default(), + } + }) + .peers + .insert(peer_id); + } + + peer_info + } + + // The implementation is similar to `on_validated_block_announce` with unknown parent hash. + fn set_sync_fork_request( + &mut self, + mut peers: Vec, + hash: &B::Hash, + number: NumberFor, + ) { + if peers.is_empty() { + peers = self + .peers + .iter() + // Only request blocks from peers who are ahead or on a par. + .filter(|(_, peer)| peer.best_number >= number) + .map(|(id, _)| *id) + .collect(); + + debug!( + target: LOG_TARGET, + "Explicit sync request for block {hash:?} with no peers specified. \ + Syncing from these peers {peers:?} instead.", + ); + } else { + debug!( + target: LOG_TARGET, + "Explicit sync request for block {hash:?} with {peers:?}", + ); + } + + if self.is_known(hash) { + debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}"); + return + } + + trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}"); + for peer_id in &peers { + if let Some(peer) = self.peers.get_mut(peer_id) { + if let PeerSyncState::AncestorSearch { .. } = peer.state { + continue + } + + if number > peer.best_number { + peer.best_number = number; + peer.best_hash = *hash; + } + self.allowed_requests.add(peer_id); + } + } + + self.fork_targets + .entry(*hash) + .or_insert_with(|| { + if let Some(metrics) = &self.metrics { + metrics.fork_targets.inc(); + } + + ForkTarget { number, peers: Default::default(), parent_hash: None } + }) + .peers + .extend(peers); + } + + fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { + let client = &self.client; + self.extra_justifications + .schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block)) + } + + fn clear_justification_requests(&mut self) { + self.extra_justifications.reset(); + } + + fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool) { + let finalization_result = if success { Ok((hash, number)) } else { Err(()) }; + self.extra_justifications + .try_finalize_root((hash, number), finalization_result, true); + self.allowed_requests.set_all(); + } + + fn on_block_response( + &mut self, + peer_id: PeerId, + key: StrategyKey, + request: BlockRequest, + blocks: Vec>, + ) { + if key != StrategyKey::ChainSync { + error!( + target: LOG_TARGET, + "`on_block_response()` called with unexpected key {key:?} for chain sync", + ); + debug_assert!(false); + } + let block_response = BlockResponse:: { id: request.id, blocks }; + + let blocks_range = || match ( + block_response + .blocks + .first() + .and_then(|b| b.header.as_ref().map(|h| h.number())), + block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), + ) { + (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), + (Some(first), Some(_)) => format!(" ({})", first), + _ => Default::default(), + }; + trace!( + target: LOG_TARGET, + "BlockResponse {} from {} with {} blocks {}", + block_response.id, + peer_id, + block_response.blocks.len(), + blocks_range(), + ); + + let res = if request.fields == BlockAttributes::JUSTIFICATION { + self.on_block_justification(peer_id, block_response) + } else { + self.on_block_data(&peer_id, Some(request), block_response) + }; + + if let Err(bad_peer) = res { + self.actions.push(SyncingAction::DropPeer(bad_peer)); + } + } + + fn on_state_response( + &mut self, + peer_id: PeerId, + key: StrategyKey, + response: OpaqueStateResponse, + ) { + if key != StrategyKey::ChainSync { + error!( + target: LOG_TARGET, + "`on_state_response()` called with unexpected key {key:?} for chain sync", + ); + debug_assert!(false); + } + if let Err(bad_peer) = self.on_state_data(&peer_id, response) { + self.actions.push(SyncingAction::DropPeer(bad_peer)); + } + } + + fn on_warp_proof_response( + &mut self, + _peer_id: &PeerId, + _key: StrategyKey, + _response: EncodedProof, + ) { + error!( + target: LOG_TARGET, + "`on_warp_proof_response()` called for chain sync strategy", + ); + debug_assert!(false); + } + + fn on_blocks_processed( + &mut self, + imported: usize, + count: usize, + results: Vec<(Result>, BlockImportError>, B::Hash)>, + ) { + trace!(target: LOG_TARGET, "Imported {imported} of {count}"); + + let mut has_error = false; + for (_, hash) in &results { + if self.queue_blocks.remove(hash) { + if let Some(metrics) = &self.metrics { + metrics.queued_blocks.dec(); + } + } + self.blocks.clear_queued(hash); + if let Some(gap_sync) = &mut self.gap_sync { + gap_sync.blocks.clear_queued(hash); + } + } + for (result, hash) in results { + if has_error { + break + } + + has_error |= result.is_err(); + + match result { + Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => + if let Some(peer) = peer_id { + self.update_peer_common_number(&peer, number); + }, + Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => { + if aux.clear_justification_requests { + trace!( + target: LOG_TARGET, + "Block imported clears all pending justification requests {number}: {hash:?}", + ); + self.clear_justification_requests(); + } + + if aux.needs_justification { + trace!( + target: LOG_TARGET, + "Block imported but requires justification {number}: {hash:?}", + ); + self.request_justification(&hash, number); + } + + if aux.bad_justification { + if let Some(ref peer) = peer_id { + warn!("💔 Sent block with bad justification to import"); + self.actions.push(SyncingAction::DropPeer(BadPeer( + *peer, + rep::BAD_JUSTIFICATION, + ))); + } + } + + if let Some(peer) = peer_id { + self.update_peer_common_number(&peer, number); + } + let state_sync_complete = + self.state_sync.as_ref().map_or(false, |s| s.target_hash() == hash); + if state_sync_complete { + info!( + target: LOG_TARGET, + "State sync is complete ({} MiB), restarting block sync.", + self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)), + ); + self.state_sync = None; + self.mode = ChainSyncMode::Full; + self.restart(); + } + let gap_sync_complete = + self.gap_sync.as_ref().map_or(false, |s| s.target == number); + if gap_sync_complete { + info!( + target: LOG_TARGET, + "Block history download is complete." + ); + self.gap_sync = None; + } + }, + Err(BlockImportError::IncompleteHeader(peer_id)) => + if let Some(peer) = peer_id { + warn!( + target: LOG_TARGET, + "💔 Peer sent block with incomplete header to import", + ); + self.actions + .push(SyncingAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER))); + self.restart(); + }, + Err(BlockImportError::VerificationFailed(peer_id, e)) => { + let extra_message = peer_id + .map_or_else(|| "".into(), |peer| format!(" received from ({peer})")); + + warn!( + target: LOG_TARGET, + "💔 Verification failed for block {hash:?}{extra_message}: {e:?}", + ); + + if let Some(peer) = peer_id { + self.actions + .push(SyncingAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL))); + } + + self.restart(); + }, + Err(BlockImportError::BadBlock(peer_id)) => + if let Some(peer) = peer_id { + warn!( + target: LOG_TARGET, + "💔 Block {hash:?} received from peer {peer} has been blacklisted", + ); + self.actions.push(SyncingAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK))); + }, + Err(BlockImportError::MissingState) => { + // This may happen if the chain we were requesting upon has been discarded + // in the meantime because other chain has been finalized. + // Don't mark it as bad as it still may be synced if explicitly requested. + trace!(target: LOG_TARGET, "Obsolete block {hash:?}"); + }, + e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => { + warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err()); + self.state_sync = None; + self.restart(); + }, + Err(BlockImportError::Cancelled) => {}, + }; + } + + self.allowed_requests.set_all(); + } + + fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor) { + let client = &self.client; + let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| { + is_descendent_of(&**client, base, block) + }); + + if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode { + if self.state_sync.is_none() { + if !self.peers.is_empty() && self.queue_blocks.is_empty() { + self.attempt_state_sync(*hash, number, *skip_proofs); + } else { + self.pending_state_sync_attempt.replace((*hash, number, *skip_proofs)); + } + } + } + + if let Err(err) = r { + warn!( + target: LOG_TARGET, + "💔 Error cleaning up pending extra justification data requests: {err}", + ); + } + } + + fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor) { + self.on_block_queued(best_hash, best_number); + } + + fn is_major_syncing(&self) -> bool { + self.status().state.is_major_syncing() + } + + fn num_peers(&self) -> usize { + self.peers.len() + } + + fn status(&self) -> SyncStatus { + let median_seen = self.median_seen(); + let best_seen_block = + median_seen.and_then(|median| (median > self.best_queued_number).then_some(median)); + let sync_state = if let Some(target) = median_seen { + // A chain is classified as downloading if the provided best block is + // more than `MAJOR_SYNC_BLOCKS` behind the best block or as importing + // if the same can be said about queued blocks. + let best_block = self.client.info().best_number; + if target > best_block && target - best_block > MAJOR_SYNC_BLOCKS.into() { + // If target is not queued, we're downloading, otherwise importing. + if target > self.best_queued_number { + SyncState::Downloading { target } + } else { + SyncState::Importing { target } + } + } else { + SyncState::Idle + } + } else { + SyncState::Idle + }; + + let warp_sync_progress = self.gap_sync.as_ref().map(|gap_sync| WarpSyncProgress { + phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number), + total_bytes: 0, + }); -impl PeerSync { - /// Update the `common_number` iff `new_common > common_number`. - fn update_common_number(&mut self, new_common: NumberFor) { - if self.common_number < new_common { - trace!( - target: LOG_TARGET, - "Updating peer {} common number from={} => to={}.", - self.peer_id, - self.common_number, - new_common, - ); - self.common_number = new_common; + SyncStatus { + state: sync_state, + best_seen_block, + num_peers: self.peers.len() as u32, + queued_blocks: self.queue_blocks.len() as u32, + state_sync: self.state_sync.as_ref().map(|s| s.progress()), + warp_sync: warp_sync_progress, } } -} -struct ForkTarget { - number: NumberFor, - parent_hash: Option, - peers: HashSet, -} + fn num_downloaded_blocks(&self) -> usize { + self.downloaded_blocks + } -/// The state of syncing between a Peer and ourselves. -/// -/// Generally two categories, "busy" or `Available`. If busy, the enum -/// defines what we are busy with. -#[derive(Copy, Clone, Eq, PartialEq, Debug)] -pub(crate) enum PeerSyncState { - /// Available for sync requests. - Available, - /// Searching for ancestors the Peer has in common with us. - AncestorSearch { start: NumberFor, current: NumberFor, state: AncestorSearchState }, - /// Actively downloading new blocks, starting from the given Number. - DownloadingNew(NumberFor), - /// Downloading a stale block with given Hash. Stale means that it is a - /// block with a number that is lower than our best number. It might be - /// from a fork and not necessarily already imported. - DownloadingStale(B::Hash), - /// Downloading justification for given block hash. - DownloadingJustification(B::Hash), - /// Downloading state. - DownloadingState, - /// Actively downloading block history after warp sync. - DownloadingGap(NumberFor), -} + fn num_sync_requests(&self) -> usize { + self.fork_targets + .values() + .filter(|f| f.number <= self.best_queued_number) + .count() + } -impl PeerSyncState { - pub fn is_available(&self) -> bool { - matches!(self, Self::Available) + fn actions(&mut self) -> Result>, ClientError> { + if !self.peers.is_empty() && self.queue_blocks.is_empty() { + if let Some((hash, number, skip_proofs)) = self.pending_state_sync_attempt.take() { + self.attempt_state_sync(hash, number, skip_proofs); + } + } + + let block_requests = self.block_requests().into_iter().map(|(peer_id, request)| { + SyncingAction::SendBlockRequest { peer_id, key: StrategyKey::ChainSync, request } + }); + self.actions.extend(block_requests); + + let justification_requests = + self.justification_requests().into_iter().map(|(peer_id, request)| { + SyncingAction::SendBlockRequest { peer_id, key: StrategyKey::ChainSync, request } + }); + self.actions.extend(justification_requests); + + let state_request = self.state_request().into_iter().map(|(peer_id, request)| { + SyncingAction::SendStateRequest { peer_id, key: StrategyKey::ChainSync, request } + }); + self.actions.extend(state_request); + + Ok(std::mem::take(&mut self.actions)) } } @@ -414,73 +948,6 @@ where Ok(sync) } - /// Returns the current sync status. - pub fn status(&self) -> SyncStatus { - let median_seen = self.median_seen(); - let best_seen_block = - median_seen.and_then(|median| (median > self.best_queued_number).then_some(median)); - let sync_state = if let Some(target) = median_seen { - // A chain is classified as downloading if the provided best block is - // more than `MAJOR_SYNC_BLOCKS` behind the best block or as importing - // if the same can be said about queued blocks. - let best_block = self.client.info().best_number; - if target > best_block && target - best_block > MAJOR_SYNC_BLOCKS.into() { - // If target is not queued, we're downloading, otherwise importing. - if target > self.best_queued_number { - SyncState::Downloading { target } - } else { - SyncState::Importing { target } - } - } else { - SyncState::Idle - } - } else { - SyncState::Idle - }; - - let warp_sync_progress = self.gap_sync.as_ref().map(|gap_sync| WarpSyncProgress { - phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number), - total_bytes: 0, - }); - - SyncStatus { - state: sync_state, - best_seen_block, - num_peers: self.peers.len() as u32, - queued_blocks: self.queue_blocks.len() as u32, - state_sync: self.state_sync.as_ref().map(|s| s.progress()), - warp_sync: warp_sync_progress, - } - } - - /// Get an estimate of the number of parallel sync requests. - pub fn num_sync_requests(&self) -> usize { - self.fork_targets - .values() - .filter(|f| f.number <= self.best_queued_number) - .count() - } - - /// Get the total number of downloaded blocks. - pub fn num_downloaded_blocks(&self) -> usize { - self.downloaded_blocks - } - - /// Get the number of peers known to the syncing state machine. - pub fn num_peers(&self) -> usize { - self.peers.len() - } - - /// Notify syncing state machine that a new sync peer has connected. - pub fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { - match self.add_peer_inner(peer_id, best_hash, best_number) { - Ok(Some(request)) => - self.actions.push(ChainSyncAction::SendBlockRequest { peer_id, request }), - Ok(None) => {}, - Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)), - } - } - #[must_use] fn add_peer_inner( &mut self, @@ -550,138 +1017,53 @@ where peer_id, best_hash, best_number - ); - - ( - PeerSyncState::AncestorSearch { - current: common_best, - start: self.best_queued_number, - state: AncestorSearchState::ExponentialBackoff(One::one()), - }, - Some(ancestry_request::(common_best)), - ) - }; - - self.allowed_requests.add(&peer_id); - self.peers.insert( - peer_id, - PeerSync { - peer_id, - common_number: Zero::zero(), - best_hash, - best_number, - state, - }, - ); - - Ok(req) - }, - Ok(BlockStatus::Queued) | - Ok(BlockStatus::InChainWithState) | - Ok(BlockStatus::InChainPruned) => { - debug!( - target: LOG_TARGET, - "New peer {peer_id} with known best hash {best_hash} ({best_number}).", - ); - self.peers.insert( - peer_id, - PeerSync { - peer_id, - common_number: std::cmp::min(self.best_queued_number, best_number), - best_hash, - best_number, - state: PeerSyncState::Available, - }, - ); - self.allowed_requests.add(&peer_id); - Ok(None) - }, - } - } - - /// Inform sync about a new best imported block. - pub fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor) { - self.on_block_queued(best_hash, best_number); - } - - /// Request extra justification. - pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { - let client = &self.client; - self.extra_justifications - .schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block)) - } - - /// Clear extra justification requests. - pub fn clear_justification_requests(&mut self) { - self.extra_justifications.reset(); - } - - /// Configure an explicit fork sync request in case external code has detected that there is a - /// stale fork missing. - /// - /// Note that this function should not be used for recent blocks. - /// Sync should be able to download all the recent forks normally. - /// - /// Passing empty `peers` set effectively removes the sync request. - // The implementation is similar to `on_validated_block_announce` with unknown parent hash. - pub fn set_sync_fork_request( - &mut self, - mut peers: Vec, - hash: &B::Hash, - number: NumberFor, - ) { - if peers.is_empty() { - peers = self - .peers - .iter() - // Only request blocks from peers who are ahead or on a par. - .filter(|(_, peer)| peer.best_number >= number) - .map(|(id, _)| *id) - .collect(); - - debug!( - target: LOG_TARGET, - "Explicit sync request for block {hash:?} with no peers specified. \ - Syncing from these peers {peers:?} instead.", - ); - } else { - debug!( - target: LOG_TARGET, - "Explicit sync request for block {hash:?} with {peers:?}", - ); - } - - if self.is_known(hash) { - debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}"); - return; - } - - trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}"); - for peer_id in &peers { - if let Some(peer) = self.peers.get_mut(peer_id) { - if let PeerSyncState::AncestorSearch { .. } = peer.state { - continue; - } - - if number > peer.best_number { - peer.best_number = number; - peer.best_hash = *hash; - } - self.allowed_requests.add(peer_id); - } - } + ); - self.fork_targets - .entry(*hash) - .or_insert_with(|| { - if let Some(metrics) = &self.metrics { - metrics.fork_targets.inc(); - } + ( + PeerSyncState::AncestorSearch { + current: common_best, + start: self.best_queued_number, + state: AncestorSearchState::ExponentialBackoff(One::one()), + }, + Some(ancestry_request::(common_best)), + ) + }; - ForkTarget { number, peers: Default::default(), parent_hash: None } - }) - .peers - .extend(peers); + self.allowed_requests.add(&peer_id); + self.peers.insert( + peer_id, + PeerSync { + peer_id, + common_number: Zero::zero(), + best_hash, + best_number, + state, + }, + ); + + Ok(req) + }, + Ok(BlockStatus::Queued) | + Ok(BlockStatus::InChainWithState) | + Ok(BlockStatus::InChainPruned) => { + debug!( + target: LOG_TARGET, + "New peer {peer_id} with known best hash {best_hash} ({best_number}).", + ); + self.peers.insert( + peer_id, + PeerSync { + peer_id, + common_number: std::cmp::min(self.best_queued_number, best_number), + best_hash, + best_number, + state: PeerSyncState::Available, + }, + ); + self.allowed_requests.add(&peer_id); + Ok(None) + }, + } } /// Submit a block response for processing. @@ -857,8 +1239,9 @@ where state: next_state, }; let request = ancestry_request::(next_num); - self.actions.push(ChainSyncAction::SendBlockRequest { + self.actions.push(SyncingAction::SendBlockRequest { peer_id: *peer_id, + key: StrategyKey::ChainSync, request, }); return Ok(()); @@ -965,240 +1348,45 @@ where // We only request one justification at a time let justification = if let Some(block) = response.blocks.into_iter().next() { - if hash != block.hash { - warn!( - target: LOG_TARGET, - "💔 Invalid block justification provided by {}: requested: {:?} got: {:?}", - peer_id, - hash, - block.hash, - ); - return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION)); - } - - block - .justifications - .or_else(|| legacy_justification_mapping(block.justification)) - } else { - // we might have asked the peer for a justification on a block that we assumed it - // had but didn't (regardless of whether it had a justification for it or not). - trace!( - target: LOG_TARGET, - "Peer {peer_id:?} provided empty response for justification request {hash:?}", - ); - - None - }; - - if let Some((peer_id, hash, number, justifications)) = - self.extra_justifications.on_response(peer_id, justification) - { - self.actions.push(ChainSyncAction::ImportJustifications { - peer_id, - hash, - number, - justifications, - }); - return Ok(()); - } - } - - Ok(()) - } - - /// Report a justification import (successful or not). - pub fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool) { - let finalization_result = if success { Ok((hash, number)) } else { Err(()) }; - self.extra_justifications - .try_finalize_root((hash, number), finalization_result, true); - self.allowed_requests.set_all(); - } - - /// Notify sync that a block has been finalized. - pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor) { - let client = &self.client; - let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| { - is_descendent_of(&**client, base, block) - }); - - if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode { - if self.state_sync.is_none() { - if !self.peers.is_empty() && self.queue_blocks.is_empty() { - self.attempt_state_sync(*hash, number, *skip_proofs); - } else { - self.pending_state_sync_attempt.replace((*hash, number, *skip_proofs)); - } - } - } - - if let Err(err) = r { - warn!( - target: LOG_TARGET, - "💔 Error cleaning up pending extra justification data requests: {err}", - ); - } - } - - fn attempt_state_sync( - &mut self, - finalized_hash: B::Hash, - finalized_number: NumberFor, - skip_proofs: bool, - ) { - let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect(); - heads.sort(); - let median = heads[heads.len() / 2]; - if finalized_number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median { - if let Ok(Some(header)) = self.client.header(finalized_hash) { - log::debug!( - target: LOG_TARGET, - "Starting state sync for #{finalized_number} ({finalized_hash})", - ); - self.state_sync = - Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs)); - self.allowed_requests.set_all(); - } else { - log::error!( - target: LOG_TARGET, - "Failed to start state sync: header for finalized block \ - #{finalized_number} ({finalized_hash}) is not available", - ); - debug_assert!(false); - } - } - } - - /// Submit a validated block announcement. - /// - /// Returns new best hash & best number of the peer if they are updated. - #[must_use] - pub fn on_validated_block_announce( - &mut self, - is_best: bool, - peer_id: PeerId, - announce: &BlockAnnounce, - ) -> Option<(B::Hash, NumberFor)> { - let number = *announce.header.number(); - let hash = announce.header.hash(); - let parent_status = - self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown); - let known_parent = parent_status != BlockStatus::Unknown; - let ancient_parent = parent_status == BlockStatus::InChainPruned; - - let known = self.is_known(&hash); - let peer = if let Some(peer) = self.peers.get_mut(&peer_id) { - peer - } else { - error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}"); - return Some((hash, number)); - }; - - if let PeerSyncState::AncestorSearch { .. } = peer.state { - trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id); - return None; - } - - let peer_info = is_best.then(|| { - // update their best block - peer.best_number = number; - peer.best_hash = hash; - - (hash, number) - }); - - // If the announced block is the best they have and is not ahead of us, our common number - // is either one further ahead or it's the one they just announced, if we know about it. - if is_best { - if known && self.best_queued_number >= number { - self.update_peer_common_number(&peer_id, number); - } else if announce.header.parent_hash() == &self.best_queued_hash || - known_parent && self.best_queued_number >= number - { - self.update_peer_common_number(&peer_id, number.saturating_sub(One::one())); - } - } - self.allowed_requests.add(&peer_id); - - // known block case - if known || self.is_already_downloading(&hash) { - trace!(target: LOG_TARGET, "Known block announce from {}: {}", peer_id, hash); - if let Some(target) = self.fork_targets.get_mut(&hash) { - target.peers.insert(peer_id); - } - return peer_info; - } - - if ancient_parent { - trace!( - target: LOG_TARGET, - "Ignored ancient block announced from {}: {} {:?}", - peer_id, - hash, - announce.header, - ); - return peer_info; - } - - if self.status().state == SyncState::Idle { - trace!( - target: LOG_TARGET, - "Added sync target for block announced from {}: {} {:?}", - peer_id, - hash, - announce.summary(), - ); - self.fork_targets - .entry(hash) - .or_insert_with(|| { - if let Some(metrics) = &self.metrics { - metrics.fork_targets.inc(); - } - - ForkTarget { - number, - parent_hash: Some(*announce.header.parent_hash()), - peers: Default::default(), - } - }) - .peers - .insert(peer_id); - } - - peer_info - } - - /// Notify that a sync peer has disconnected. - pub fn remove_peer(&mut self, peer_id: &PeerId) { - self.blocks.clear_peer_download(peer_id); - if let Some(gap_sync) = &mut self.gap_sync { - gap_sync.blocks.clear_peer_download(peer_id) - } - - if let Some(state) = self.peers.remove(peer_id) { - if !state.state.is_available() { - if let Some(bad_peer) = - self.disconnected_peers.on_disconnect_during_request(*peer_id) - { - self.actions.push(ChainSyncAction::DropPeer(bad_peer)); + if hash != block.hash { + warn!( + target: LOG_TARGET, + "💔 Invalid block justification provided by {}: requested: {:?} got: {:?}", + peer_id, + hash, + block.hash, + ); + return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION)); } - } - } - self.extra_justifications.peer_disconnected(peer_id); - self.allowed_requests.set_all(); - self.fork_targets.retain(|_, target| { - target.peers.remove(peer_id); - !target.peers.is_empty() - }); - if let Some(metrics) = &self.metrics { - metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX)); - } + block + .justifications + .or_else(|| legacy_justification_mapping(block.justification)) + } else { + // we might have asked the peer for a justification on a block that we assumed it + // had but didn't (regardless of whether it had a justification for it or not). + trace!( + target: LOG_TARGET, + "Peer {peer_id:?} provided empty response for justification request {hash:?}", + ); - let blocks = self.ready_blocks(); + None + }; - if !blocks.is_empty() { - self.validate_and_queue_blocks(blocks, false); + if let Some((peer_id, hash, number, justifications)) = + self.extra_justifications.on_response(peer_id, justification) + { + self.actions.push(SyncingAction::ImportJustifications { + peer_id, + hash, + number, + justifications, + }); + return Ok(()); + } } + + Ok(()) } /// Returns the median seen block number. @@ -1272,7 +1460,7 @@ where .set(self.queue_blocks.len().try_into().unwrap_or(u64::MAX)); } - self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks: new_blocks }) + self.actions.push(SyncingAction::ImportBlocks { origin, blocks: new_blocks }) } fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor) { @@ -1351,7 +1539,10 @@ where PeerSyncState::DownloadingGap(_) | PeerSyncState::DownloadingState => { // Cancel a request first, as `add_peer` may generate a new request. - self.actions.push(ChainSyncAction::CancelRequest { peer_id }); + self.actions.push(SyncingAction::CancelRequest { + peer_id, + key: StrategyKey::ChainSync, + }); self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number); }, PeerSyncState::DownloadingJustification(_) => { @@ -1467,53 +1658,6 @@ where .collect() } - /// Submit blocks received in a response. - pub fn on_block_response( - &mut self, - peer_id: PeerId, - request: BlockRequest, - blocks: Vec>, - ) { - let block_response = BlockResponse:: { id: request.id, blocks }; - - let blocks_range = || match ( - block_response - .blocks - .first() - .and_then(|b| b.header.as_ref().map(|h| h.number())), - block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), - ) { - (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), - (Some(first), Some(_)) => format!(" ({})", first), - _ => Default::default(), - }; - trace!( - target: LOG_TARGET, - "BlockResponse {} from {} with {} blocks {}", - block_response.id, - peer_id, - block_response.blocks.len(), - blocks_range(), - ); - - let res = if request.fields == BlockAttributes::JUSTIFICATION { - self.on_block_justification(peer_id, block_response) - } else { - self.on_block_data(&peer_id, Some(request), block_response) - }; - - if let Err(bad_peer) = res { - self.actions.push(ChainSyncAction::DropPeer(bad_peer)); - } - } - - /// Submit a state received in a response. - pub fn on_state_response(&mut self, peer_id: PeerId, response: OpaqueStateResponse) { - if let Err(bad_peer) = self.on_state_data(&peer_id, response) { - self.actions.push(ChainSyncAction::DropPeer(bad_peer)); - } - } - /// Get justification requests scheduled by sync to be sent out. fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { let peers = &mut self.peers; @@ -1751,7 +1895,7 @@ where state: Some(state), }; debug!(target: LOG_TARGET, "State download is complete. Import is queued"); - self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks: vec![block] }); + self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] }); Ok(()) }, ImportResult::Continue => Ok(()), @@ -1762,181 +1906,39 @@ where } } - /// A batch of blocks have been processed, with or without errors. - /// - /// Call this when a batch of blocks have been processed by the import - /// queue, with or without errors. - pub fn on_blocks_processed( + fn attempt_state_sync( &mut self, - imported: usize, - count: usize, - results: Vec<(Result>, BlockImportError>, B::Hash)>, + finalized_hash: B::Hash, + finalized_number: NumberFor, + skip_proofs: bool, ) { - trace!(target: LOG_TARGET, "Imported {imported} of {count}"); - - let mut has_error = false; - for (_, hash) in &results { - if self.queue_blocks.remove(hash) { - if let Some(metrics) = &self.metrics { - metrics.queued_blocks.dec(); - } - } - self.blocks.clear_queued(hash); - if let Some(gap_sync) = &mut self.gap_sync { - gap_sync.blocks.clear_queued(hash); - } - } - for (result, hash) in results { - if has_error { - break; - } - - has_error |= result.is_err(); - - match result { - Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => { - if let Some(peer) = peer_id { - self.update_peer_common_number(&peer, number); - } - }, - Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => { - if aux.clear_justification_requests { - trace!( - target: LOG_TARGET, - "Block imported clears all pending justification requests {number}: {hash:?}", - ); - self.clear_justification_requests(); - } - - if aux.needs_justification { - trace!( - target: LOG_TARGET, - "Block imported but requires justification {number}: {hash:?}", - ); - self.request_justification(&hash, number); - } - - if aux.bad_justification { - if let Some(ref peer) = peer_id { - warn!("💔 Sent block with bad justification to import"); - self.actions.push(ChainSyncAction::DropPeer(BadPeer( - *peer, - rep::BAD_JUSTIFICATION, - ))); - } - } - - if let Some(peer) = peer_id { - self.update_peer_common_number(&peer, number); - } - let state_sync_complete = - self.state_sync.as_ref().map_or(false, |s| s.target_hash() == hash); - if state_sync_complete { - info!( - target: LOG_TARGET, - "State sync is complete ({} MiB), restarting block sync.", - self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)), - ); - self.state_sync = None; - self.mode = ChainSyncMode::Full; - self.restart(); - } - let gap_sync_complete = - self.gap_sync.as_ref().map_or(false, |s| s.target == number); - if gap_sync_complete { - info!( - target: LOG_TARGET, - "Block history download is complete." - ); - self.gap_sync = None; - } - }, - Err(BlockImportError::IncompleteHeader(peer_id)) => - if let Some(peer) = peer_id { - warn!( - target: LOG_TARGET, - "💔 Peer sent block with incomplete header to import", - ); - self.actions - .push(ChainSyncAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER))); - self.restart(); - }, - Err(BlockImportError::VerificationFailed(peer_id, e)) => { - let extra_message = peer_id - .map_or_else(|| "".into(), |peer| format!(" received from ({peer})")); - - warn!( - target: LOG_TARGET, - "💔 Verification failed for block {hash:?}{extra_message}: {e:?}", - ); - - if let Some(peer) = peer_id { - self.actions - .push(ChainSyncAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL))); - } - - self.restart(); - }, - Err(BlockImportError::BadBlock(peer_id)) => - if let Some(peer) = peer_id { - warn!( - target: LOG_TARGET, - "💔 Block {hash:?} received from peer {peer} has been blacklisted", - ); - self.actions.push(ChainSyncAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK))); - }, - Err(BlockImportError::MissingState) => { - // This may happen if the chain we were requesting upon has been discarded - // in the meantime because other chain has been finalized. - // Don't mark it as bad as it still may be synced if explicitly requested. - trace!(target: LOG_TARGET, "Obsolete block {hash:?}"); - }, - e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => { - warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err()); - self.state_sync = None; - self.restart(); - }, - Err(BlockImportError::Cancelled) => {}, - }; - } - - self.allowed_requests.set_all(); - } - - /// Get pending actions to perform. - #[must_use] - pub fn actions(&mut self) -> impl Iterator> { - if !self.peers.is_empty() && self.queue_blocks.is_empty() { - if let Some((hash, number, skip_proofs)) = self.pending_state_sync_attempt.take() { - self.attempt_state_sync(hash, number, skip_proofs); + let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect(); + heads.sort(); + let median = heads[heads.len() / 2]; + if finalized_number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median { + if let Ok(Some(header)) = self.client.header(finalized_hash) { + log::debug!( + target: LOG_TARGET, + "Starting state sync for #{finalized_number} ({finalized_hash})", + ); + self.state_sync = + Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs)); + self.allowed_requests.set_all(); + } else { + log::error!( + target: LOG_TARGET, + "Failed to start state sync: header for finalized block \ + #{finalized_number} ({finalized_hash}) is not available", + ); + debug_assert!(false); } } - - let block_requests = self - .block_requests() - .into_iter() - .map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request }); - self.actions.extend(block_requests); - - let justification_requests = self - .justification_requests() - .into_iter() - .map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request }); - self.actions.extend(justification_requests); - - let state_request = self - .state_request() - .into_iter() - .map(|(peer_id, request)| ChainSyncAction::SendStateRequest { peer_id, request }); - self.actions.extend(state_request); - - std::mem::take(&mut self.actions).into_iter() } /// A version of `actions()` that doesn't schedule extra requests. For testing only. #[cfg(test)] #[must_use] - fn take_actions(&mut self) -> impl Iterator> { + fn take_actions(&mut self) -> impl Iterator> { std::mem::take(&mut self.actions).into_iter() } } diff --git a/substrate/client/network/sync/src/strategy/chain_sync/test.rs b/substrate/client/network/sync/src/strategy/chain_sync/test.rs index 39d0c8f8d4d6..59436f387db6 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync/test.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync/test.rs @@ -128,10 +128,10 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { // we wil send block requests to these peers // for these blocks we don't know about - let actions = sync.actions().collect::>(); + let actions = sync.actions().unwrap(); assert_eq!(actions.len(), 2); assert!(actions.iter().all(|action| match action { - ChainSyncAction::SendBlockRequest { peer_id, .. } => + SyncingAction::SendBlockRequest { peer_id, .. } => peer_id == &peer_id1 || peer_id == &peer_id2, _ => false, })); @@ -162,15 +162,15 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { sync.restart(); // which should make us cancel and send out again block requests to the first two peers - let actions = sync.actions().collect::>(); + let actions = sync.actions().unwrap(); assert_eq!(actions.len(), 4); let mut cancelled_first = HashSet::new(); assert!(actions.iter().all(|action| match action { - ChainSyncAction::CancelRequest { peer_id, .. } => { + SyncingAction::CancelRequest { peer_id, .. } => { cancelled_first.insert(peer_id); peer_id == &peer_id1 || peer_id == &peer_id2 }, - ChainSyncAction::SendBlockRequest { peer_id, .. } => { + SyncingAction::SendBlockRequest { peer_id, .. } => { assert!(cancelled_first.remove(peer_id)); peer_id == &peer_id1 || peer_id == &peer_id2 }, @@ -329,7 +329,7 @@ fn do_ancestor_search_when_common_block_to_best_queued_gap_is_to_big() { assert_eq!(actions.len(), 1); assert!(matches!( &actions[0], - ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize, + SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize, )); best_block_num += max_blocks_to_request as u32; @@ -476,7 +476,7 @@ fn can_sync_huge_fork() { } else { assert_eq!(actions.len(), 1); match &actions[0] { - ChainSyncAction::SendBlockRequest { peer_id: _, request } => request.clone(), + SyncingAction::SendBlockRequest { peer_id: _, request, key: _ } => request.clone(), action @ _ => panic!("Unexpected action: {action:?}"), } }; @@ -508,7 +508,7 @@ fn can_sync_huge_fork() { assert_eq!(actions.len(), 1); assert!(matches!( &actions[0], - ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == sync.max_blocks_per_request as usize + SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == sync.max_blocks_per_request as usize )); best_block_num += sync.max_blocks_per_request as u32; @@ -610,7 +610,7 @@ fn syncs_fork_without_duplicate_requests() { } else { assert_eq!(actions.len(), 1); match &actions[0] { - ChainSyncAction::SendBlockRequest { peer_id: _, request } => request.clone(), + SyncingAction::SendBlockRequest { peer_id: _, request, key: _ } => request.clone(), action @ _ => panic!("Unexpected action: {action:?}"), } }; @@ -646,7 +646,7 @@ fn syncs_fork_without_duplicate_requests() { assert_eq!(actions.len(), 1); assert!(matches!( &actions[0], - ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize + SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize )); best_block_num += max_blocks_to_request as u32; @@ -839,10 +839,10 @@ fn sync_restart_removes_block_but_not_justification_requests() { let actions = sync.take_actions().collect::>(); for action in actions.iter() { match action { - ChainSyncAction::CancelRequest { peer_id } => { + SyncingAction::CancelRequest { peer_id, key: _ } => { pending_responses.remove(&peer_id); }, - ChainSyncAction::SendBlockRequest { peer_id, .. } => { + SyncingAction::SendBlockRequest { peer_id, .. } => { // we drop obsolete response, but don't register a new request, it's checked in // the `assert!` below pending_responses.remove(&peer_id); @@ -852,7 +852,7 @@ fn sync_restart_removes_block_but_not_justification_requests() { } assert!(actions.iter().any(|action| { match action { - ChainSyncAction::SendBlockRequest { peer_id, .. } => peer_id == &peers[0], + SyncingAction::SendBlockRequest { peer_id, .. } => peer_id == &peers[0], _ => false, } })); @@ -943,7 +943,7 @@ fn request_across_forks() { assert_eq!(actions.len(), 1); assert!(matches!( &actions[0], - ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 7_usize + SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 7_usize )); assert_eq!(sync.best_queued_number, 107); assert_eq!(sync.best_queued_hash, block.hash()); @@ -988,7 +988,7 @@ fn request_across_forks() { assert_eq!(actions.len(), 1); assert!(matches!( &actions[0], - ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 1_usize + SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 1_usize )); assert!(sync.is_known(&block.header.parent_hash())); }