From e8cf6dbf439acf01e1336efff2e6fda8aec469b8 Mon Sep 17 00:00:00 2001 From: timorl Date: Fri, 18 Nov 2022 10:32:19 +0100 Subject: [PATCH 1/2] Remove unnecessary abstraction layers in network --- finality-aleph/src/network/io.rs | 21 +- .../src/network/manager/compatibility.rs | 11 +- .../src/network/manager/discovery.rs | 50 ++-- finality-aleph/src/network/manager/mod.rs | 13 - finality-aleph/src/network/manager/service.rs | 222 ++++++++---------- finality-aleph/src/network/mock.rs | 43 ++-- finality-aleph/src/network/mod.rs | 14 +- finality-aleph/src/network/service.rs | 222 +++++++----------- 8 files changed, 254 insertions(+), 342 deletions(-) diff --git a/finality-aleph/src/network/io.rs b/finality-aleph/src/network/io.rs index 815285f800..868115214b 100644 --- a/finality-aleph/src/network/io.rs +++ b/finality-aleph/src/network/io.rs @@ -1,33 +1,42 @@ use futures::channel::mpsc; use crate::network::{ - manager::NetworkData, ConnectionManagerIO, Data, Multiaddress, NetworkServiceIO as NetworkIo, - SessionManagerIO, + manager::{DataInSession, VersionedAuthentication}, + ConnectionManagerIO, Data, Multiaddress, NetworkServiceIO as NetworkIO, SessionManagerIO, }; -type NetworkServiceIO = NetworkIo, M>; +type AuthenticationNetworkIO = NetworkIO, DataInSession, M>; pub fn setup() -> ( ConnectionManagerIO, - NetworkServiceIO, + AuthenticationNetworkIO, SessionManagerIO, ) { // Prepare and start the network let (commands_for_network, commands_from_io) = mpsc::unbounded(); + let (data_for_network, data_from_user) = mpsc::unbounded(); let (messages_for_network, messages_from_user) = mpsc::unbounded(); let (commands_for_service, commands_from_user) = mpsc::unbounded(); let (messages_for_service, commands_from_manager) = mpsc::unbounded(); + let (data_for_user, data_from_network) = mpsc::unbounded(); let (messages_for_user, messages_from_network) = mpsc::unbounded(); let connection_io = ConnectionManagerIO::new( commands_for_network, + data_for_network, messages_for_network, commands_from_user, commands_from_manager, + data_from_network, messages_from_network, ); - let channels_for_network = - NetworkServiceIO::new(messages_from_user, messages_for_user, commands_from_io); + let channels_for_network = NetworkIO::new( + data_from_user, + messages_from_user, + data_for_user, + messages_for_user, + commands_from_io, + ); let channels_for_session_manager = SessionManagerIO::new(commands_for_service, messages_for_service); diff --git a/finality-aleph/src/network/manager/compatibility.rs b/finality-aleph/src/network/manager/compatibility.rs index 5afa8bd306..8e40afac4e 100644 --- a/finality-aleph/src/network/manager/compatibility.rs +++ b/finality-aleph/src/network/manager/compatibility.rs @@ -7,10 +7,7 @@ use codec::{Decode, Encode, Error as CodecError, Input as CodecInput}; use log::warn; use crate::{ - network::{ - manager::{DiscoveryMessage, NetworkData}, - Data, Multiaddress, - }, + network::{manager::DiscoveryMessage, Multiaddress}, Version, }; @@ -26,13 +23,13 @@ pub enum VersionedAuthentication { V1(DiscoveryMessage), } -impl TryInto> for VersionedAuthentication { +impl TryInto> for VersionedAuthentication { type Error = Error; - fn try_into(self) -> Result, Self::Error> { + fn try_into(self) -> Result, Self::Error> { use VersionedAuthentication::*; match self { - V1(message) => Ok(NetworkData::Meta(message)), + V1(message) => Ok(message), Other(v, _) => Err(Error::UnknownVersion(v)), } } diff --git a/finality-aleph/src/network/manager/discovery.rs b/finality-aleph/src/network/manager/discovery.rs index 77edad14cc..4a4f1bb833 100644 --- a/finality-aleph/src/network/manager/discovery.rs +++ b/finality-aleph/src/network/manager/discovery.rs @@ -10,7 +10,7 @@ use log::{debug, info, trace}; use crate::{ network::{ manager::{Authentication, SessionHandler}, - DataCommand, Multiaddress, + Multiaddress, }, NodeIndex, SessionId, }; @@ -40,20 +40,6 @@ pub struct Discovery { _phantom: PhantomData, } -type DiscoveryCommand = ( - DiscoveryMessage, - DataCommand<::PeerId>, -); - -fn authentication_broadcast( - authentication: Authentication, -) -> DiscoveryCommand { - ( - DiscoveryMessage::AuthenticationBroadcast(authentication), - DataCommand::Broadcast, - ) -} - impl Discovery { /// Create a new discovery handler with the given response/broadcast cooldown. pub fn new(cooldown: Duration) -> Self { @@ -68,7 +54,7 @@ impl Discovery { pub fn discover_authorities( &mut self, handler: &SessionHandler, - ) -> Option> { + ) -> Option> { let authentication = match handler.authentication() { Some(authentication) => authentication, None => return None, @@ -77,7 +63,7 @@ impl Discovery { let missing_authorities = handler.missing_nodes(); let node_count = handler.node_count(); info!(target: "aleph-network", "{}/{} authorities known for session {}.", node_count.0-missing_authorities.len(), node_count.0, handler.session_id().0); - Some(authentication_broadcast(authentication)) + Some(DiscoveryMessage::AuthenticationBroadcast(authentication)) } /// Checks the authentication using the handler and returns the addresses we should be @@ -104,7 +90,7 @@ impl Discovery { &mut self, authentication: Authentication, handler: &mut SessionHandler, - ) -> (Vec, Option>) { + ) -> (Vec, Option>) { debug!(target: "aleph-network", "Handling broadcast with authentication {:?}.", authentication); let addresses = self.handle_authentication(authentication.clone(), handler); if addresses.is_empty() { @@ -116,7 +102,10 @@ impl Discovery { } trace!(target: "aleph-network", "Rebroadcasting {:?}.", authentication); self.last_broadcast.insert(node_id, Instant::now()); - (addresses, Some(authentication_broadcast(authentication))) + ( + addresses, + Some(DiscoveryMessage::AuthenticationBroadcast(authentication)), + ) } /// Analyzes the provided message and returns all the new multiaddresses we should @@ -126,7 +115,7 @@ impl Discovery { &mut self, message: DiscoveryMessage, handler: &mut SessionHandler, - ) -> (Vec, Option>) { + ) -> (Vec, Option>) { use DiscoveryMessage::*; match message { AuthenticationBroadcast(authentication) => { @@ -150,7 +139,6 @@ mod tests { network::{ manager::SessionHandler, mock::{crypto_basics, MockMultiaddress, MockPeerId}, - DataCommand, }, SessionId, }; @@ -216,10 +204,7 @@ mod tests { let message = discovery.discover_authorities(handler); assert_eq!( message.expect("there is a discovery message"), - ( - DiscoveryMessage::AuthenticationBroadcast(handler.authentication().unwrap()), - DataCommand::Broadcast - ) + DiscoveryMessage::AuthenticationBroadcast(handler.authentication().unwrap()), ); } } @@ -241,10 +226,9 @@ mod tests { handler, ); assert_eq!(addresses, authentication.0.addresses()); - assert!(matches!(command, Some(( + assert!(matches!(command, Some( DiscoveryMessage::AuthenticationBroadcast(rebroadcast_authentication), - DataCommand::Broadcast, - )) if rebroadcast_authentication == authentication)); + ) if rebroadcast_authentication == authentication)); } #[tokio::test] @@ -256,10 +240,9 @@ mod tests { &mut non_validator, ); assert_eq!(addresses, authentication.0.addresses()); - assert!(matches!(command, Some(( + assert!(matches!(command, Some( DiscoveryMessage::AuthenticationBroadcast(rebroadcast_authentication), - DataCommand::Broadcast, - )) if rebroadcast_authentication == authentication)); + ) if rebroadcast_authentication == authentication)); } #[tokio::test] @@ -292,10 +275,9 @@ mod tests { handler, ); assert_eq!(addresses, authentication.0.addresses()); - assert!(matches!(command, Some(( + assert!(matches!(command, Some( DiscoveryMessage::AuthenticationBroadcast(rebroadcast_authentication), - DataCommand::Broadcast, - )) if rebroadcast_authentication == authentication)); + ) if rebroadcast_authentication == authentication)); } #[tokio::test] diff --git a/finality-aleph/src/network/manager/mod.rs b/finality-aleph/src/network/manager/mod.rs index d7ff87df50..35ab274ec3 100644 --- a/finality-aleph/src/network/manager/mod.rs +++ b/finality-aleph/src/network/manager/mod.rs @@ -77,16 +77,3 @@ impl Encode for DataInSession { self.session_id.encode_to(dest); } } - -impl From> for NetworkData { - fn from(data: DataInSession) -> Self { - NetworkData::Data(data.data, data.session_id) - } -} - -/// The data that should be sent to the network service. -#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)] -pub enum NetworkData { - Meta(DiscoveryMessage), - Data(D, SessionId), -} diff --git a/finality-aleph/src/network/manager/service.rs b/finality-aleph/src/network/manager/service.rs index fba4ebd456..247d3c3e23 100644 --- a/finality-aleph/src/network/manager/service.rs +++ b/finality-aleph/src/network/manager/service.rs @@ -16,10 +16,10 @@ use crate::{ crypto::{AuthorityPen, AuthorityVerifier}, network::{ manager::{ - Connections, Discovery, DiscoveryMessage, NetworkData, SessionHandler, - SessionHandlerError, + Connections, DataInSession, Discovery, DiscoveryMessage, SessionHandler, + SessionHandlerError, VersionedAuthentication, }, - ConnectionCommand, Data, DataCommand, Multiaddress, NetworkIdentity, PeerId, + AddressedData, ConnectionCommand, Data, Multiaddress, NetworkIdentity, PeerId, }, MillisecsPerBlock, NodeIndex, SessionId, SessionPeriod, STATUS_REPORT_INTERVAL, }; @@ -110,18 +110,19 @@ impl Config { } } -type MessageForNetwork = (NetworkData, DataCommand<::PeerId>); - -pub struct ServiceActions { +/// Actions that the service wants to take as the result of some information. Might contain a +/// command for connecting to or disconnecting from some peers or a message to broadcast for +/// discovery purposes. +pub struct ServiceActions { maybe_command: Option>, - maybe_data: Option>, + maybe_message: Option>, } -impl ServiceActions { +impl ServiceActions { fn noop() -> Self { ServiceActions { maybe_command: None, - maybe_data: None, + maybe_message: None, } } } @@ -186,29 +187,19 @@ impl Service { Self::delete_reserved(self.connections.remove_session(session_id)) } - fn network_message( - (message, command): (DiscoveryMessage, DataCommand), - ) -> MessageForNetwork { - (NetworkData::Meta(message), command) - } - fn discover_authorities( &mut self, session_id: &SessionId, - ) -> Option> { + ) -> Option> { self.sessions.get_mut(session_id).and_then( |Session { handler, discovery, .. - }| { - discovery - .discover_authorities(handler) - .map(Self::network_message) - }, + }| { discovery.discover_authorities(handler) }, ) } /// Returns all the network messages that should be sent as part of discovery at this moment. - pub fn discovery(&mut self) -> Vec> { + pub fn discovery(&mut self) -> Vec> { let sessions: Vec<_> = self.sessions.keys().cloned().collect(); sessions .iter() @@ -231,7 +222,7 @@ impl Service { addresses: Vec, ) -> Result< ( - Option>, + Option>, mpsc::UnboundedReceiver, ), SessionHandlerError, @@ -261,23 +252,18 @@ impl Service { async fn update_validator_session( &mut self, pre_session: PreValidatorSession, - ) -> Result< - ( - ServiceActions, - mpsc::UnboundedReceiver, - ), - SessionHandlerError, - > { + ) -> Result<(ServiceActions, mpsc::UnboundedReceiver), SessionHandlerError> + { let addresses = self.addresses(); let session = match self.sessions.get_mut(&pre_session.session_id) { Some(session) => session, None => { - let (maybe_data, data_from_network) = + let (maybe_message, data_from_network) = self.start_validator_session(pre_session, addresses).await?; return Ok(( ServiceActions { maybe_command: None, - maybe_data, + maybe_message, }, data_from_network, )); @@ -309,7 +295,7 @@ impl Service { Ok(( ServiceActions { maybe_command, - maybe_data: self.discover_authorities(&session_id), + maybe_message: self.discover_authorities(&session_id), }, data_from_network, )) @@ -319,7 +305,7 @@ impl Service { &mut self, pre_session: PreValidatorSession, result_for_user: Option>>, - ) -> Result, SessionHandlerError> { + ) -> Result, SessionHandlerError> { match self.update_validator_session(pre_session.clone()).await { Ok((actions, data_from_network)) => { if let Some(result_for_user) = result_for_user { @@ -393,12 +379,11 @@ impl Service { } /// Handle a session command. - /// Returns a command possibly changing what we should stay connected to and a list of data to - /// be sent over the network. + /// Returns actions the service wants to take or an error if the session command is invalid. pub async fn on_command( &mut self, command: SessionCommand, - ) -> Result, SessionHandlerError> { + ) -> Result, SessionHandlerError> { use SessionCommand::*; match command { StartValidator(session_id, verifier, node_id, pen, result_for_user) => { @@ -421,7 +406,7 @@ impl Service { } Stop(session_id) => Ok(ServiceActions { maybe_command: self.finish_session(session_id), - maybe_data: None, + maybe_message: None, }), } } @@ -430,26 +415,26 @@ impl Service { /// Returns a list of data to be sent over the network. pub fn on_user_message( &self, - message: D, + data: D, session_id: SessionId, recipient: Recipient, - ) -> Vec> { + ) -> Vec, ::PeerId>> { if let Some(handler) = self .sessions .get(&session_id) .map(|session| &session.handler) { - let to_send = NetworkData::Data(message, session_id); + let to_send = DataInSession { data, session_id }; match recipient { Recipient::Everyone => (0..handler.node_count().0) .map(NodeIndex) .flat_map(|node_id| handler.peer_id(&node_id)) - .map(|peer_id| (to_send.clone(), DataCommand::SendTo(peer_id))) + .map(|peer_id| (to_send.clone(), peer_id)) .collect(), Recipient::Node(node_id) => handler .peer_id(&node_id) .into_iter() - .map(|peer_id| (to_send.clone(), DataCommand::SendTo(peer_id))) + .map(|peer_id| (to_send.clone(), peer_id)) .collect(), } } else { @@ -458,18 +443,17 @@ impl Service { } /// Handle a discovery message. - /// Returns a command possibly changing what we should stay connected to and a list of data to - /// be sent over the network. + /// Returns actions the service wants to take. pub fn on_discovery_message( &mut self, message: DiscoveryMessage, - ) -> ServiceActions { + ) -> ServiceActions { let session_id = message.session_id(); match self.sessions.get_mut(&session_id) { Some(Session { handler, discovery, .. }) => { - let (addresses, response) = discovery.handle_message(message, handler); + let (addresses, maybe_message) = discovery.handle_message(message, handler); let maybe_command = match !addresses.is_empty() && handler.is_validator() { true => { debug!(target: "aleph-network", "Adding addresses for session {:?} to reserved: {:?}", session_id, addresses); @@ -485,7 +469,7 @@ impl Service { }; ServiceActions { maybe_command, - maybe_data: response.map(Self::network_message), + maybe_message, } } None => { @@ -514,7 +498,7 @@ impl Service { /// the request. pub async fn retry_session_start( &mut self, - ) -> Result, SessionHandlerError> { + ) -> Result, SessionHandlerError> { let (pre_session, result_for_user) = match self.to_retry.pop() { Some(to_retry) => to_retry, None => return Ok(ServiceActions::noop()), @@ -614,10 +598,12 @@ impl Service { /// Input/output interface for the connectiona manager service. pub struct IO { commands_for_network: mpsc::UnboundedSender>, - messages_for_network: mpsc::UnboundedSender>, + data_for_network: mpsc::UnboundedSender, M::PeerId>>, + authentications_for_network: mpsc::UnboundedSender>, commands_from_user: mpsc::UnboundedReceiver>, messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>, - messages_from_network: mpsc::UnboundedReceiver>, + data_from_network: mpsc::UnboundedReceiver>, + authentications_from_network: mpsc::UnboundedReceiver>, } /// Errors that can happen during the network service operations. @@ -637,26 +623,36 @@ pub enum Error { impl IO { pub fn new( commands_for_network: mpsc::UnboundedSender>, - messages_for_network: mpsc::UnboundedSender>, + data_for_network: mpsc::UnboundedSender, M::PeerId>>, + authentications_for_network: mpsc::UnboundedSender>, commands_from_user: mpsc::UnboundedReceiver>, messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>, - messages_from_network: mpsc::UnboundedReceiver>, + data_from_network: mpsc::UnboundedReceiver>, + authentications_from_network: mpsc::UnboundedReceiver>, ) -> IO { IO { commands_for_network, - messages_for_network, + data_for_network, + authentications_for_network, commands_from_user, messages_from_user, - messages_from_network, + data_from_network, + authentications_from_network, } } - fn send_data(&self, to_send: MessageForNetwork) -> Result<(), Error> { - self.messages_for_network + fn send_data(&self, to_send: AddressedData, M::PeerId>) -> Result<(), Error> { + self.data_for_network .unbounded_send(to_send) .map_err(|_| Error::NetworkSend) } + fn send_authentication(&self, to_send: DiscoveryMessage) -> Result<(), Error> { + self.authentications_for_network + .unbounded_send(VersionedAuthentication::V1(to_send)) + .map_err(|_| Error::NetworkSend) + } + fn send_command(&self, to_send: ConnectionCommand) -> Result<(), Error> { self.commands_for_network .unbounded_send(to_send) @@ -667,30 +663,18 @@ impl IO { &self, ServiceActions { maybe_command, - maybe_data, - }: ServiceActions, + maybe_message, + }: ServiceActions, ) -> Result<(), Error> { if let Some(command) = maybe_command { self.send_command(command)?; } - if let Some(data) = maybe_data { - self.send_data(data)?; + if let Some(message) = maybe_message { + self.send_authentication(message)?; } Ok(()) } - fn on_network_message>( - &self, - service: &mut Service, - message: NetworkData, - ) -> Result<(), Error> { - use NetworkData::*; - match message { - Meta(message) => self.send(service.on_discovery_message(message)), - Data(data, session_id) => service.send_session_data(&session_id, data), - } - } - /// Run the connection manager service with this IO. pub async fn run>( mut self, @@ -726,10 +710,10 @@ impl IO { None => return Err(Error::MessageChannel), } }, - maybe_message = self.messages_from_network.next() => { - trace!(target: "aleph-network", "Manager received a message from network"); - match maybe_message { - Some(message) => if let Err(e) = self.on_network_message(&mut service, message) { + maybe_data = self.data_from_network.next() => { + trace!(target: "aleph-network", "Manager received some data from network"); + match maybe_data { + Some(DataInSession{data, session_id}) => if let Err(e) = service.send_session_data(&session_id, data) { match e { Error::UserSend => trace!(target: "aleph-network", "Failed to send to user in session."), Error::NoSession => trace!(target: "aleph-network", "Received message for unknown session."), @@ -739,6 +723,16 @@ impl IO { None => return Err(Error::NetworkChannel), } }, + maybe_authentication = self.authentications_from_network.next() => { + trace!(target: "aleph-network", "Manager received an authentication from network"); + match maybe_authentication { + Some(authentication) => match authentication.try_into() { + Ok(message) => self.send(service.on_discovery_message(message))?, + Err(e) => warn!(target: "aleph-network", "Error casting versioned authentication to discovery message: {:?}", e), + }, + None => return Err(Error::NetworkChannel), + } + }, _ = maintenance.tick() => { debug!(target: "aleph-network", "Manager starts maintenence"); match service.retry_session_start().await { @@ -746,7 +740,7 @@ impl IO { Err(e) => warn!(target: "aleph-network", "Retry failed to update handler: {:?}", e), } for to_send in service.discovery() { - self.send_data(to_send)?; + self.send_authentication(to_send)?; } }, _ = status_ticker.tick() => { @@ -766,9 +760,9 @@ mod tests { use super::{Config, Error, Service, ServiceActions, SessionCommand}; use crate::{ network::{ - manager::{DiscoveryMessage, NetworkData}, + manager::{DataInSession, DiscoveryMessage}, mock::{crypto_basics, MockNetworkIdentity}, - ConnectionCommand, DataCommand, + ConnectionCommand, }, Recipient, SessionId, }; @@ -792,13 +786,13 @@ mod tests { let session_id = SessionId(43); let ServiceActions { maybe_command, - maybe_data, + maybe_message, } = service .on_command(SessionCommand::StartNonvalidator(session_id, verifier)) .await .unwrap(); assert!(maybe_command.is_none()); - assert!(maybe_data.is_none()); + assert!(maybe_message.is_none()); assert_eq!( service.send_session_data(&session_id, -43), Err(Error::NoSession) @@ -814,7 +808,7 @@ mod tests { let (result_for_user, result_from_service) = oneshot::channel(); let ServiceActions { maybe_command, - maybe_data, + maybe_message, } = service .on_command(SessionCommand::StartValidator( session_id, @@ -826,10 +820,7 @@ mod tests { .await .unwrap(); assert!(maybe_command.is_none()); - assert_eq!( - maybe_data.expect("there is a message").1, - DataCommand::Broadcast - ); + assert!(maybe_message.is_some()); let _data_from_network = result_from_service.await.unwrap(); assert_eq!(service.send_session_data(&session_id, -43), Ok(())); } @@ -843,7 +834,7 @@ mod tests { let (result_for_user, result_from_service) = oneshot::channel(); let ServiceActions { maybe_command, - maybe_data, + maybe_message, } = service .on_command(SessionCommand::StartValidator( session_id, @@ -855,22 +846,19 @@ mod tests { .await .unwrap(); assert!(maybe_command.is_none()); - assert_eq!( - maybe_data.expect("there is a message").1, - DataCommand::Broadcast - ); + assert!(maybe_message.is_some()); assert_eq!(service.send_session_data(&session_id, -43), Ok(())); let mut data_from_network = result_from_service.await.unwrap(); assert_eq!(data_from_network.next().await, Some(-43)); let ServiceActions { maybe_command, - maybe_data, + maybe_message, } = service .on_command(SessionCommand::Stop(session_id)) .await .unwrap(); assert!(maybe_command.is_none()); - assert!(maybe_data.is_none()); + assert!(maybe_message.is_none()); assert_eq!( service.send_session_data(&session_id, -43), Err(Error::NoSession) @@ -896,37 +884,28 @@ mod tests { .unwrap(); let mut other_service = build(); let (node_id, pen) = validator_data[1].clone(); - let ServiceActions { maybe_data, .. } = other_service + let ServiceActions { maybe_message, .. } = other_service .on_command(SessionCommand::StartValidator( session_id, verifier, node_id, pen, None, )) .await .unwrap(); - let broadcast = match maybe_data { - Some((NetworkData::Meta(broadcast), DataCommand::Broadcast)) => broadcast, - maybe_data => panic!( - "Expected discovery massage broadcast, got: {:?}", - maybe_data - ), - }; - let addresses = match &broadcast { + let message = maybe_message.expect("there should be a discovery message"); + let addresses = match &message { DiscoveryMessage::AuthenticationBroadcast((auth_data, _)) => auth_data.addresses(), - _ => panic!("Expected an authentication broadcast, got {:?}", broadcast), + _ => panic!("Expected an authentication broadcast, got {:?}", message), }; let ServiceActions { maybe_command, - maybe_data, - } = service.on_discovery_message(broadcast); + maybe_message, + } = service.on_discovery_message(message); assert_eq!( maybe_command, Some(ConnectionCommand::AddReserved( addresses.into_iter().collect() )) ); - assert_eq!( - maybe_data.expect("there is a message").1, - DataCommand::Broadcast - ); + assert!(maybe_message.is_some()); } #[tokio::test] @@ -947,24 +926,23 @@ mod tests { .unwrap(); let mut other_service = build(); let (node_id, pen) = validator_data[1].clone(); - let ServiceActions { maybe_data, .. } = other_service + let ServiceActions { maybe_message, .. } = other_service .on_command(SessionCommand::StartValidator( session_id, verifier, node_id, pen, None, )) .await .unwrap(); - let broadcast = match maybe_data { - Some((NetworkData::Meta(broadcast), DataCommand::Broadcast)) => broadcast, - maybe_data => panic!( - "Expected discovery massage broadcast, got: {:?}", - maybe_data - ), - }; - service.on_discovery_message(broadcast); + let message = maybe_message.expect("there should be a discovery message"); + service.on_discovery_message(message); let messages = service.on_user_message(2137, session_id, Recipient::Everyone); assert_eq!(messages.len(), 1); - let (network_data, data_command) = &messages[0]; - assert!(matches!(data_command, DataCommand::SendTo(_))); - assert_eq!(network_data, &NetworkData::Data(2137, session_id)); + let (network_data, _) = &messages[0]; + assert_eq!( + network_data, + &DataInSession { + data: 2137, + session_id + } + ); } } diff --git a/finality-aleph/src/network/mock.rs b/finality-aleph/src/network/mock.rs index 615df5a350..885b5bbcea 100644 --- a/finality-aleph/src/network/mock.rs +++ b/finality-aleph/src/network/mock.rs @@ -20,8 +20,8 @@ use tokio::time::timeout; use crate::{ crypto::{AuthorityPen, AuthorityVerifier}, network::{ - manager::NetworkData, ConnectionCommand, DataCommand, Event, EventStream, Multiaddress, - Network, NetworkIdentity, NetworkSender, NetworkServiceIO as NetworkIO, PeerId, Protocol, + manager::VersionedAuthentication, AddressedData, ConnectionCommand, Event, EventStream, + Multiaddress, Network, NetworkIdentity, NetworkSender, NetworkServiceIO, PeerId, Protocol, }, AuthorityId, NodeIndex, }; @@ -155,27 +155,40 @@ impl Default for Channel { pub type MockEvent = Event; pub type MockData = Vec; -type MessageForUser = (NetworkData, DataCommand<::PeerId>); -type NetworkServiceIO = NetworkIO, M>; pub struct MockIO { - pub messages_for_user: mpsc::UnboundedSender>, - pub messages_from_user: mpsc::UnboundedReceiver>, - pub commands_for_manager: mpsc::UnboundedSender>, + pub messages_for_network: mpsc::UnboundedSender>, + pub data_for_network: mpsc::UnboundedSender>, + pub messages_from_network: mpsc::UnboundedReceiver>, + pub data_from_network: mpsc::UnboundedReceiver, + pub commands_for_network: mpsc::UnboundedSender>, } impl MockIO { - pub fn new() -> (MockIO, NetworkServiceIO) { - let (mock_messages_for_user, messages_from_user) = mpsc::unbounded(); - let (messages_for_user, mock_messages_from_user) = mpsc::unbounded(); - let (mock_commands_for_manager, commands_from_manager) = mpsc::unbounded(); + pub fn new() -> ( + MockIO, + NetworkServiceIO, MockData, M>, + ) { + let (messages_for_network, messages_from_user) = mpsc::unbounded(); + let (data_for_network, data_from_user) = mpsc::unbounded(); + let (messages_for_user, messages_from_network) = mpsc::unbounded(); + let (data_for_user, data_from_network) = mpsc::unbounded(); + let (commands_for_network, commands_from_manager) = mpsc::unbounded(); ( MockIO { - messages_for_user: mock_messages_for_user, - messages_from_user: mock_messages_from_user, - commands_for_manager: mock_commands_for_manager, + messages_for_network, + data_for_network, + messages_from_network, + data_from_network, + commands_for_network, }, - NetworkServiceIO::new(messages_from_user, messages_for_user, commands_from_manager), + NetworkServiceIO::new( + data_from_user, + messages_from_user, + data_for_user, + messages_for_user, + commands_from_manager, + ), ) } } diff --git a/finality-aleph/src/network/mod.rs b/finality-aleph/src/network/mod.rs index 2b30ef09e5..cdffb48152 100644 --- a/finality-aleph/src/network/mod.rs +++ b/finality-aleph/src/network/mod.rs @@ -37,8 +37,7 @@ pub use split::{split, Split}; #[cfg(test)] pub mod testing { pub use super::manager::{ - Authentication, DataInSession, DiscoveryMessage, NetworkData, SessionHandler, - VersionedAuthentication, + Authentication, DataInSession, DiscoveryMessage, SessionHandler, VersionedAuthentication, }; } @@ -156,14 +155,6 @@ pub trait RequestBlocks: Clone + Send + Sync + 'static { fn is_major_syncing(&self) -> bool; } -/// What do do with a specific piece of data. -/// Note that broadcast does not specify the protocol, as we only broadcast Generic messages in this sense. -#[derive(Debug, PartialEq, Eq, Clone)] -pub enum DataCommand { - Broadcast, - SendTo(PID), -} - /// Commands for manipulating the reserved peers set. #[derive(Debug, PartialEq, Eq)] pub enum ConnectionCommand { @@ -182,6 +173,9 @@ pub trait Data: Clone + Codec + Send + Sync + 'static {} impl Data for D {} +// In practice D: Data and P: PeerId, but we cannot require that in type aliases. +type AddressedData = (D, P); + /// A generic interface for sending and receiving data. #[async_trait::async_trait] pub trait DataNetwork: Send + Sync { diff --git a/finality-aleph/src/network/service.rs b/finality-aleph/src/network/service.rs index 142e4cfb4c..ada661713a 100644 --- a/finality-aleph/src/network/service.rs +++ b/finality-aleph/src/network/service.rs @@ -5,25 +5,21 @@ use std::{ }; use aleph_primitives::AuthorityId; -use codec::{Decode, Encode}; use futures::{channel::mpsc, StreamExt}; use log::{debug, error, info, trace, warn}; use sc_service::SpawnTaskHandle; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use tokio::time; -use super::manager::DataInSession; use crate::{ network::{ - manager::{NetworkData, VersionedAuthentication}, - ConnectionCommand, Data, DataCommand, Event, EventStream, Multiaddress, Network, + AddressedData, ConnectionCommand, Data, Event, EventStream, Multiaddress, Network, NetworkSender, Protocol, }, validator_network::Network as ValidatorNetwork, STATUS_REPORT_INTERVAL, }; -type MessageFromUser = (NetworkData, DataCommand<::PeerId>); /// A service managing all the direct interaction with the underlying network implementation. It /// handles: /// 1. Incoming network events @@ -31,41 +27,48 @@ type MessageFromUser = (NetworkData, DataCommand< /// 2. Various forms of (dis)connecting, keeping track of all currently connected nodes. /// 2. Commands from the network manager, modifying the reserved peer set. /// 3. Outgoing messages, sending them out, using 1.2. to broadcast. -/// For the time of transition from old validator network (called legacy here) to new tcp validator network -/// we need to support both networks here. To do that we rename legacy network methods to have prefix `legacy_`. -/// We also support two connection managers one for each network. +/// Currently this also handles the validator network for sending in-session data, but this is +/// likely to change in the future. pub struct Service< N: Network, D: Data, + VD: Data, A: Data + Multiaddress, - VN: ValidatorNetwork>, + VN: ValidatorNetwork, > { network: N, validator_network: VN, - messages_from_user: mpsc::UnboundedReceiver>, - messages_for_user: mpsc::UnboundedSender>, + data_from_user: mpsc::UnboundedReceiver>, + messages_from_user: mpsc::UnboundedReceiver, + data_for_user: mpsc::UnboundedSender, + messages_for_user: mpsc::UnboundedSender, commands_from_manager: mpsc::UnboundedReceiver>, authentication_connected_peers: HashSet, - authentication_peer_senders: - HashMap>>, + authentication_peer_senders: HashMap>, spawn_handle: SpawnTaskHandle, } /// Input/output channels for the network service. -pub struct IO { - pub messages_from_user: mpsc::UnboundedReceiver<(D, DataCommand)>, +pub struct IO { + pub data_from_user: mpsc::UnboundedReceiver>, + pub messages_from_user: mpsc::UnboundedReceiver, + pub data_for_user: mpsc::UnboundedSender, pub messages_for_user: mpsc::UnboundedSender, pub commands_from_manager: mpsc::UnboundedReceiver>, } -impl IO { +impl IO { pub fn new( - messages_from_user: mpsc::UnboundedReceiver<(D, DataCommand)>, + data_from_user: mpsc::UnboundedReceiver>, + messages_from_user: mpsc::UnboundedReceiver, + data_for_user: mpsc::UnboundedSender, messages_for_user: mpsc::UnboundedSender, commands_from_manager: mpsc::UnboundedReceiver>, - ) -> IO { + ) -> IO { IO { + data_from_user, messages_from_user, + data_for_user, messages_for_user, commands_from_manager, } @@ -81,20 +84,23 @@ enum SendError { impl< N: Network, D: Data, + VD: Data, A: Data + Multiaddress, - VN: ValidatorNetwork>, - > Service + VN: ValidatorNetwork, + > Service { pub fn new( network: N, validator_network: VN, spawn_handle: SpawnTaskHandle, - io: IO, A>, - ) -> Service { + io: IO, + ) -> Service { Service { network, validator_network, + data_from_user: io.data_from_user, messages_from_user: io.messages_from_user, + data_for_user: io.data_for_user, messages_for_user: io.messages_for_user, commands_from_manager: io.commands_from_manager, spawn_handle, @@ -107,7 +113,7 @@ impl< &mut self, peer: &N::PeerId, protocol: Protocol, - ) -> Option<&mut TracingUnboundedSender>> { + ) -> Option<&mut TracingUnboundedSender> { match protocol { Protocol::Authentication => self.authentication_peer_senders.get_mut(peer), } @@ -116,7 +122,7 @@ impl< fn peer_sender( &self, peer_id: N::PeerId, - mut receiver: TracingUnboundedReceiver>, + mut receiver: TracingUnboundedReceiver, protocol: Protocol, ) -> impl Future + Send + 'static { let network = self.network.clone(); @@ -149,7 +155,7 @@ impl< fn send_to_peer( &mut self, - data: VersionedAuthentication, + data: D, peer: N::PeerId, protocol: Protocol, ) -> Result<(), SendError> { @@ -171,14 +177,11 @@ impl< } } - fn broadcast(&mut self, data: VersionedAuthentication, protocol: Protocol) { + fn broadcast(&mut self, data: D, protocol: Protocol) { let peers = match protocol { - // Validator protocol will never broadcast. Protocol::Authentication => self.authentication_connected_peers.clone(), }; for peer in peers { - // We only broadcast authentication information in this sense, so we use the generic - // Protocol. if let Err(e) = self.send_to_peer(data.clone(), peer.clone(), protocol) { trace!(target: "aleph-network", "Failed to send broadcast to peer{:?}, {:?}", peer, e); } @@ -188,7 +191,7 @@ impl< fn handle_network_event( &mut self, event: Event, - ) -> Result<(), mpsc::TrySendError>> { + ) -> Result<(), mpsc::TrySendError> { use Event::*; match event { Connected(multiaddress) => { @@ -229,19 +232,12 @@ impl< Messages(messages) => { for (protocol, data) in messages.into_iter() { match protocol { - Protocol::Authentication => { - match VersionedAuthentication::::decode(&mut &data[..]) - .map(|a| a.try_into()) - { - Ok(Ok(data)) => self.messages_for_user.unbounded_send(data)?, - Ok(Err(e)) => { - warn!(target: "aleph-network", "Error decoding authentication protocol message: {}", e) - } - Err(e) => { - warn!(target: "aleph-network", "Error decoding authentication protocol message: {}", e) - } + Protocol::Authentication => match D::decode(&mut &data[..]) { + Ok(data) => self.messages_for_user.unbounded_send(data)?, + Err(e) => { + warn!(target: "aleph-network", "Error decoding authentication protocol message: {}", e) } - } + }, }; } } @@ -249,11 +245,8 @@ impl< Ok(()) } - fn handle_validator_network_data( - &mut self, - data: DataInSession, - ) -> Result<(), mpsc::TrySendError>> { - self.messages_for_user.unbounded_send(data.into()) + fn handle_validator_network_data(&mut self, data: VD) -> Result<(), mpsc::TrySendError> { + self.data_for_user.unbounded_send(data) } fn on_manager_command(&mut self, command: ConnectionCommand) { @@ -274,32 +267,6 @@ impl< } } - fn on_user_message(&mut self, data: NetworkData, command: DataCommand) { - use DataCommand::*; - - match data { - NetworkData::Meta(discovery_message) => { - let data: VersionedAuthentication = discovery_message.into(); - match command { - Broadcast => self.broadcast(data, Protocol::Authentication), - SendTo(_) => { - // We ignore this for now. Sending Meta messages to peer is an optimization done for the sake of tests. - } - } - } - NetworkData::Data(data, session_id) => { - match command { - Broadcast => { - // We ignore this for now. AlephBFT does not broadcast data. - } - SendTo(peer) => self - .validator_network - .send(DataInSession { data, session_id }, peer), - } - } - } - } - fn status_report(&self) { let mut status = String::from("Network status report: "); @@ -336,8 +303,15 @@ impl< error!(target: "aleph-network", "Validator network event stream ended."); } }, + maybe_data = self.data_from_user.next() => match maybe_data { + Some((data, peer_id)) => self.validator_network.send(data, peer_id), + None => { + error!(target: "aleph-network", "User data stream ended."); + return; + } + }, maybe_message = self.messages_from_user.next() => match maybe_message { - Some((data, command)) => self.on_user_message(data, command), + Some(message) => self.broadcast(message, Protocol::Authentication), None => { error!(target: "aleph-network", "User message stream ended."); return; @@ -367,19 +341,18 @@ mod tests { use sc_service::TaskManager; use tokio::{runtime::Handle, task::JoinHandle}; - use super::{ConnectionCommand, DataCommand, Service}; + use super::{ConnectionCommand, Service}; use crate::{ network::{ - manager::{DataInSession, SessionHandler, VersionedAuthentication}, + manager::{SessionHandler, VersionedAuthentication}, mock::{ crypto_basics, MockData, MockEvent, MockIO, MockMultiaddress as MockAuthMultiaddress, MockNetwork, MockPeerId as MockAuthPeerId, MockSenderError, }, - testing::{DiscoveryMessage, NetworkData}, + testing::DiscoveryMessage, NetworkIdentity, Protocol, }, - // session::SessionId, testing::mocks::validator_network::{ random_authority_id, MockMultiaddress, MockNetwork as MockValidatorNetwork, }, @@ -390,7 +363,7 @@ mod tests { pub service_handle: JoinHandle<()>, pub exit_tx: oneshot::Sender<()>, pub network: MockNetwork, - pub validator_network: MockValidatorNetwork>, + pub validator_network: MockValidatorNetwork, pub mock_io: MockIO, // `TaskManager` can't be dropped for `SpawnTaskHandle` to work _task_manager: TaskManager, @@ -457,16 +430,13 @@ mod tests { } } - fn message(i: u8) -> DataInSession { - DataInSession { - data: vec![i, i + 1, i + 2], - session_id: SessionId(1), - } + fn message(i: u8) -> MockData { + vec![i, i + 1, i + 2] } async fn authentication( multiaddresses: Vec, - ) -> DiscoveryMessage { + ) -> VersionedAuthentication { let crypto_basics = crypto_basics(1).await; let handler = SessionHandler::new( Some(crypto_basics.0[0].clone()), @@ -476,7 +446,9 @@ mod tests { ) .await .unwrap(); - DiscoveryMessage::AuthenticationBroadcast(handler.authentication().unwrap()) + VersionedAuthentication::V1(DiscoveryMessage::AuthenticationBroadcast( + handler.authentication().unwrap(), + )) } #[tokio::test] @@ -546,8 +518,8 @@ mod tests { let message = authentication(test_data.validator_network.identity().0).await; test_data .mock_io - .messages_for_user - .unbounded_send((NetworkData::Meta(message.clone()), DataCommand::Broadcast)) + .messages_for_network + .unbounded_send(message.clone()) .unwrap(); let broadcasted_messages = HashSet::<_>::from_iter( @@ -559,13 +531,11 @@ mod tests { .into_iter(), ); - let expected_messages = HashSet::from_iter(peer_ids.into_iter().map(|peer_id| { - ( - VersionedAuthentication::V1(message.clone()).encode(), - peer_id, - Protocol::Authentication, - ) - })); + let expected_messages = HashSet::from_iter( + peer_ids + .into_iter() + .map(|peer_id| (message.clone().encode(), peer_id, Protocol::Authentication)), + ); assert_eq!(broadcasted_messages, expected_messages); @@ -600,8 +570,8 @@ mod tests { let message = authentication(test_data.validator_network.identity().0).await; test_data .mock_io - .messages_for_user - .unbounded_send((NetworkData::Meta(message.clone()), DataCommand::Broadcast)) + .messages_for_network + .unbounded_send(message.clone()) .unwrap(); let broadcasted_messages = HashSet::<_>::from_iter( @@ -617,13 +587,7 @@ mod tests { peer_ids .into_iter() .take(opened_authorities_n) - .map(|peer_id| { - ( - VersionedAuthentication::V1(message.clone()).encode(), - peer_id, - Protocol::Authentication, - ) - }), + .map(|peer_id| (message.clone().encode(), peer_id, Protocol::Authentication)), ); assert_eq!(broadcasted_messages, expected_messages); @@ -641,8 +605,8 @@ mod tests { test_data .mock_io - .messages_for_user - .unbounded_send((message.clone().into(), DataCommand::SendTo(peer_id.clone()))) + .data_for_network + .unbounded_send((message.clone(), peer_id.clone())) .unwrap(); let expected = (message, peer_id); @@ -668,16 +632,14 @@ mod tests { test_data.validator_network.next.send(message.clone()); - let expected: NetworkData<_, _> = message.into(); - assert_eq!( test_data .mock_io - .messages_from_user + .data_from_network .next() .await .expect("Should receive message"), - expected, + message, ); test_data.cleanup().await @@ -709,21 +671,17 @@ mod tests { test_data .mock_io - .messages_for_user - .unbounded_send((NetworkData::Meta(message_1), DataCommand::Broadcast)) + .messages_for_network + .unbounded_send(message_1) .unwrap(); test_data .mock_io - .messages_for_user - .unbounded_send((NetworkData::Meta(message_2.clone()), DataCommand::Broadcast)) + .messages_for_network + .unbounded_send(message_2.clone()) .unwrap(); - let expected = ( - VersionedAuthentication::V1(message_2).encode(), - peer_id, - Protocol::Authentication, - ); + let expected = (message_2.encode(), peer_id, Protocol::Authentication); assert_eq!( test_data @@ -764,21 +722,17 @@ mod tests { test_data .mock_io - .messages_for_user - .unbounded_send((NetworkData::Meta(message_1), DataCommand::Broadcast)) + .messages_for_network + .unbounded_send(message_1) .unwrap(); test_data .mock_io - .messages_for_user - .unbounded_send((NetworkData::Meta(message_2.clone()), DataCommand::Broadcast)) + .messages_for_network + .unbounded_send(message_2.clone()) .unwrap(); - let expected = ( - VersionedAuthentication::V1(message_2).encode(), - peer_id, - Protocol::Authentication, - ); + let expected = (message_2.encode(), peer_id, Protocol::Authentication); assert_eq!( test_data @@ -805,19 +759,17 @@ mod tests { test_data.network.emit_event(MockEvent::Messages(vec![( Protocol::Authentication, - VersionedAuthentication::V1(message.clone()).encode().into(), + message.clone().encode().into(), )])); - let expected = NetworkData::Meta(message); - assert_eq!( test_data .mock_io - .messages_from_user + .messages_from_network .next() .await .expect("Should receive message"), - expected + message, ); test_data.cleanup().await @@ -832,7 +784,7 @@ mod tests { test_data .mock_io - .commands_for_manager + .commands_for_network .unbounded_send(ConnectionCommand::AddReserved( iter::once(multiaddress.clone()).collect(), )) @@ -861,7 +813,7 @@ mod tests { test_data .mock_io - .commands_for_manager + .commands_for_network .unbounded_send(ConnectionCommand::DelReserved( iter::once(peer_id.clone()).collect(), )) From 54a2bea726ca8921a942c70efec90c0761c5fb51 Mon Sep 17 00:00:00 2001 From: timorl Date: Mon, 21 Nov 2022 11:47:03 +0100 Subject: [PATCH 2/2] Test name made little sense now --- finality-aleph/src/network/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/finality-aleph/src/network/service.rs b/finality-aleph/src/network/service.rs index ada661713a..83f4ea22f4 100644 --- a/finality-aleph/src/network/service.rs +++ b/finality-aleph/src/network/service.rs @@ -596,7 +596,7 @@ mod tests { } #[tokio::test] - async fn test_validator_data_command_send_to() { + async fn test_send_validator_data() { let mut test_data = TestData::prepare().await; let peer_id = random_authority_id().await;