From 3ccfdab0cab64f387a0d5e64b1b8bfb713f0ec3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Sat, 9 Nov 2019 09:11:24 +0100 Subject: [PATCH] grandpa: remove the periodic block announcer (#4062) * grandpa: remove the periodic block announcer * grandpa: remove periodic block announcer test --- .../finality-grandpa/src/communication/mod.rs | 13 +- .../src/communication/periodic.rs | 142 ------------------ .../src/communication/tests.rs | 76 ---------- 3 files changed, 3 insertions(+), 228 deletions(-) diff --git a/core/finality-grandpa/src/communication/mod.rs b/core/finality-grandpa/src/communication/mod.rs index 4cc772fe9d4a0..247f9efd2df18 100644 --- a/core/finality-grandpa/src/communication/mod.rs +++ b/core/finality-grandpa/src/communication/mod.rs @@ -276,7 +276,6 @@ pub(crate) struct NetworkBridge> { service: N, validator: Arc>, neighbor_sender: periodic::NeighborPacketSender, - announce_sender: periodic::BlockAnnounceSender, } impl> NetworkBridge { @@ -341,10 +340,9 @@ impl> NetworkBridge { } let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(service.clone()); - let (announce_job, announce_sender) = periodic::block_announce_worker(service.clone()); let reporting_job = report_stream.consume(service.clone()); - let bridge = NetworkBridge { service, validator, neighbor_sender, announce_sender }; + let bridge = NetworkBridge { service, validator, neighbor_sender }; let startup_work = futures::future::lazy(move || { // lazily spawn these jobs onto their own tasks. the lazy future has access @@ -352,8 +350,6 @@ impl> NetworkBridge { let mut executor = tokio_executor::DefaultExecutor::current(); executor.spawn(Box::new(rebroadcast_job.select(on_exit.clone()).then(|_| Ok(())))) .expect("failed to spawn grandpa rebroadcast job task"); - executor.spawn(Box::new(announce_job.select(on_exit.clone()).then(|_| Ok(())))) - .expect("failed to spawn grandpa block announce job task"); executor.spawn(Box::new(reporting_job.select(on_exit.clone()).then(|_| Ok(())))) .expect("failed to spawn grandpa reporting job task"); Ok(()) @@ -470,7 +466,6 @@ impl> NetworkBridge { network: self.service.clone(), locals, sender: tx, - announce_sender: self.announce_sender.clone(), has_voted, }; @@ -676,7 +671,6 @@ impl> Clone for NetworkBridge { service: self.service.clone(), validator: Arc::clone(&self.validator), neighbor_sender: self.neighbor_sender.clone(), - announce_sender: self.announce_sender.clone(), } } } @@ -723,7 +717,6 @@ struct OutgoingMessages> { set_id: SetIdNumber, locals: Option<(AuthorityPair, AuthorityId)>, sender: mpsc::UnboundedSender>, - announce_sender: periodic::BlockAnnounceSender, network: N, has_voted: HasVoted, } @@ -781,8 +774,8 @@ impl> Sink for OutgoingMessages "block" => ?target_hash, "round" => ?self.round, "set_id" => ?self.set_id, ); - // send the target block hash to the background block announcer - self.announce_sender.send(target_hash, Vec::new()); + // announce the block we voted on to our peers. + self.network.announce(target_hash, Vec::new()); // propagate the message to peers let topic = round_topic::(self.round, self.set_id); diff --git a/core/finality-grandpa/src/communication/periodic.rs b/core/finality-grandpa/src/communication/periodic.rs index 81c18891d03b5..9dd662ce43461 100644 --- a/core/finality-grandpa/src/communication/periodic.rs +++ b/core/finality-grandpa/src/communication/periodic.rs @@ -16,7 +16,6 @@ //! Periodic rebroadcast of neighbor packets. -use std::collections::VecDeque; use std::time::{Instant, Duration}; use codec::Encode; @@ -32,11 +31,6 @@ use super::{gossip::{NeighborPacket, GossipMessage}, Network}; // how often to rebroadcast, if no other const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60); -/// The number of block hashes that we have previously voted on that we should -/// keep around for announcement. The current value should be enough for 3 -/// rounds assuming we have prevoted and precommited on different blocks. -const LATEST_VOTED_BLOCKS_TO_ANNOUNCE: usize = 6; - fn rebroadcast_instant() -> Instant { Instant::now() + REBROADCAST_AFTER } @@ -114,139 +108,3 @@ pub(super) fn neighbor_packet_worker(net: N) -> ( (work, NeighborPacketSender(tx)) } - -/// A background worker for performing block announcements. -struct BlockAnnouncer { - net: N, - block_rx: mpsc::UnboundedReceiver<(B::Hash, Vec)>, - latest_voted_blocks: VecDeque, - reannounce_after: Duration, - delay: Delay, -} - -/// A background worker for announcing block hashes to peers. The worker keeps -/// track of `LATEST_VOTED_BLOCKS_TO_ANNOUNCE` and periodically announces these -/// blocks to all peers if no new blocks to announce are noted (i.e. presumably -/// GRANDPA progress is stalled). -pub(super) fn block_announce_worker>(net: N) -> ( - impl Future, - BlockAnnounceSender, -) { - block_announce_worker_aux(net, REBROADCAST_AFTER) -} - -#[cfg(test)] -pub(super) fn block_announce_worker_with_delay>( - net: N, - reannounce_after: Duration, -) -> ( - impl Future, - BlockAnnounceSender, -) { - block_announce_worker_aux(net, reannounce_after) -} - -fn block_announce_worker_aux>( - net: N, - reannounce_after: Duration, -) -> ( - impl Future, - BlockAnnounceSender, -) { - let latest_voted_blocks = VecDeque::with_capacity(LATEST_VOTED_BLOCKS_TO_ANNOUNCE); - - let (block_tx, block_rx) = mpsc::unbounded(); - - let announcer = BlockAnnouncer { - net, - block_rx, - latest_voted_blocks, - reannounce_after, - delay: Delay::new(Instant::now() + reannounce_after), - }; - - (announcer, BlockAnnounceSender(block_tx)) -} - - -impl BlockAnnouncer { - fn note_block(&mut self, block: B::Hash) -> bool { - if !self.latest_voted_blocks.contains(&block) { - if self.latest_voted_blocks.len() >= LATEST_VOTED_BLOCKS_TO_ANNOUNCE { - self.latest_voted_blocks.pop_front(); - } - - self.latest_voted_blocks.push_back(block); - - true - } else { - false - } - } - - fn reset_delay(&mut self) { - self.delay.reset(Instant::now() + self.reannounce_after); - } -} - -impl> Future for BlockAnnouncer { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll { - // note any new blocks to announce and announce them - loop { - match self.block_rx.poll().expect("unbounded receivers do not error; qed") { - Async::Ready(None) => return Ok(Async::Ready(())), - Async::Ready(Some(block)) => { - if self.note_block(block.0) { - self.net.announce(block.0, block.1); - self.reset_delay(); - } - }, - Async::NotReady => break, - } - } - - // check the reannouncement delay timer, has to be done in a loop - // because it needs to be polled after re-scheduling. - loop { - match self.delay.poll() { - Err(e) => { - warn!(target: "afg", "Error in periodic block announcer timer: {:?}", e); - self.reset_delay(); - }, - // after the delay fires announce all blocks that we have - // stored. note that this only happens if we don't receive any - // new blocks above for the duration of `reannounce_after`. - Ok(Async::Ready(())) => { - self.reset_delay(); - - debug!( - target: "afg", - "Re-announcing latest voted blocks due to lack of progress: {:?}", - self.latest_voted_blocks, - ); - - for block in self.latest_voted_blocks.iter() { - self.net.announce(*block, Vec::new()); - } - }, - Ok(Async::NotReady) => return Ok(Async::NotReady), - } - } - } -} - -/// A sender used to send block hashes to announce to a background job. -#[derive(Clone)] -pub(super) struct BlockAnnounceSender(mpsc::UnboundedSender<(B::Hash, Vec)>); - -impl BlockAnnounceSender { - /// Send a block hash for the background worker to announce. - pub fn send(&self, block: B::Hash, associated_data: Vec) { - if let Err(err) = self.0.unbounded_send((block, associated_data)) { - debug!(target: "afg", "Failed to send block to background announcer: {:?}", err); - } - } -} diff --git a/core/finality-grandpa/src/communication/tests.rs b/core/finality-grandpa/src/communication/tests.rs index af6d842be3c02..e8b399aef39b6 100644 --- a/core/finality-grandpa/src/communication/tests.rs +++ b/core/finality-grandpa/src/communication/tests.rs @@ -504,79 +504,3 @@ fn peer_with_higher_view_leads_to_catch_up_request() { current_thread::block_on_all(test).unwrap(); } - -#[test] -fn periodically_reannounce_voted_blocks_on_stall() { - use futures::try_ready; - use std::collections::HashSet; - use std::sync::Arc; - use std::time::Duration; - use parking_lot::Mutex; - - let (tester, net) = make_test_network(); - let (announce_worker, announce_sender) = super::periodic::block_announce_worker_with_delay( - net, - Duration::from_secs(1), - ); - - let hashes = Arc::new(Mutex::new(Vec::new())); - - fn wait_all(tester: Tester, hashes: &[Hash]) -> impl Future { - struct WaitAll { - remaining_hashes: Arc>>, - events_fut: Box>, - } - - impl Future for WaitAll { - type Item = Tester; - type Error = (); - - fn poll(&mut self) -> Poll { - let tester = try_ready!(self.events_fut.poll()); - - if self.remaining_hashes.lock().is_empty() { - return Ok(Async::Ready(tester)); - } - - let remaining_hashes = self.remaining_hashes.clone(); - self.events_fut = Box::new(tester.filter_network_events(move |event| match event { - Event::Announce(h) => - remaining_hashes.lock().remove(&h) || panic!("unexpected announce"), - _ => false, - })); - - self.poll() - } - } - - WaitAll { - remaining_hashes: Arc::new(Mutex::new(hashes.iter().cloned().collect())), - events_fut: Box::new(futures::future::ok(tester)), - } - } - - let test = tester - .and_then(move |tester| { - current_thread::spawn(announce_worker); - Ok(tester) - }) - .and_then(|tester| { - // announce 12 blocks - for _ in 0..=12 { - let hash = Hash::random(); - hashes.lock().push(hash); - announce_sender.send(hash, Vec::new()); - } - - // we should see an event for each of those announcements - wait_all(tester, &hashes.lock()) - }) - .and_then(|tester| { - // after a period of inactivity we should see the last - // `LATEST_VOTED_BLOCKS_TO_ANNOUNCE` being rebroadcast - wait_all(tester, &hashes.lock()[7..=12]) - }); - - let mut runtime = current_thread::Runtime::new().unwrap(); - runtime.block_on(test).unwrap(); -}