Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A0-1591: Remove unnecessary abstraction layers in network #741

Merged
merged 2 commits into from
Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions finality-aleph/src/network/io.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,42 @@
use futures::channel::mpsc;

use crate::network::{
manager::NetworkData, ConnectionManagerIO, Data, Multiaddress, NetworkServiceIO as NetworkIo,
SessionManagerIO,
manager::{DataInSession, VersionedAuthentication},
ConnectionManagerIO, Data, Multiaddress, NetworkServiceIO as NetworkIO, SessionManagerIO,
};

type NetworkServiceIO<D, M> = NetworkIo<NetworkData<D, M>, M>;
type AuthenticationNetworkIO<D, M> = NetworkIO<VersionedAuthentication<M>, DataInSession<D>, M>;

pub fn setup<D: Data, M: Multiaddress + 'static>() -> (
maciejnems marked this conversation as resolved.
Show resolved Hide resolved
ConnectionManagerIO<D, M>,
NetworkServiceIO<D, M>,
AuthenticationNetworkIO<D, M>,
SessionManagerIO<D>,
) {
// Prepare and start the network
let (commands_for_network, commands_from_io) = mpsc::unbounded();
let (data_for_network, data_from_user) = mpsc::unbounded();
let (messages_for_network, messages_from_user) = mpsc::unbounded();
maciejnems marked this conversation as resolved.
Show resolved Hide resolved
let (commands_for_service, commands_from_user) = mpsc::unbounded();
let (messages_for_service, commands_from_manager) = mpsc::unbounded();
let (data_for_user, data_from_network) = mpsc::unbounded();
let (messages_for_user, messages_from_network) = mpsc::unbounded();

let connection_io = ConnectionManagerIO::new(
commands_for_network,
data_for_network,
messages_for_network,
commands_from_user,
commands_from_manager,
data_from_network,
messages_from_network,
);
let channels_for_network =
NetworkServiceIO::new(messages_from_user, messages_for_user, commands_from_io);
let channels_for_network = NetworkIO::new(
data_from_user,
messages_from_user,
data_for_user,
messages_for_user,
commands_from_io,
);
let channels_for_session_manager =
SessionManagerIO::new(commands_for_service, messages_for_service);

Expand Down
11 changes: 4 additions & 7 deletions finality-aleph/src/network/manager/compatibility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use codec::{Decode, Encode, Error as CodecError, Input as CodecInput};
use log::warn;

use crate::{
network::{
manager::{DiscoveryMessage, NetworkData},
Data, Multiaddress,
},
network::{manager::DiscoveryMessage, Multiaddress},
Version,
};

Expand All @@ -26,13 +23,13 @@ pub enum VersionedAuthentication<M: Multiaddress> {
V1(DiscoveryMessage<M>),
}

impl<D: Data, M: Multiaddress> TryInto<NetworkData<D, M>> for VersionedAuthentication<M> {
impl<M: Multiaddress> TryInto<DiscoveryMessage<M>> for VersionedAuthentication<M> {
type Error = Error;

fn try_into(self) -> Result<NetworkData<D, M>, Self::Error> {
fn try_into(self) -> Result<DiscoveryMessage<M>, Self::Error> {
use VersionedAuthentication::*;
match self {
V1(message) => Ok(NetworkData::Meta(message)),
V1(message) => Ok(message),
Other(v, _) => Err(Error::UnknownVersion(v)),
}
}
Expand Down
50 changes: 16 additions & 34 deletions finality-aleph/src/network/manager/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use log::{debug, info, trace};
use crate::{
network::{
manager::{Authentication, SessionHandler},
DataCommand, Multiaddress,
Multiaddress,
},
NodeIndex, SessionId,
};
Expand Down Expand Up @@ -40,20 +40,6 @@ pub struct Discovery<M: Multiaddress> {
_phantom: PhantomData<M>,
}

type DiscoveryCommand<M> = (
DiscoveryMessage<M>,
DataCommand<<M as Multiaddress>::PeerId>,
);

fn authentication_broadcast<M: Multiaddress>(
authentication: Authentication<M>,
) -> DiscoveryCommand<M> {
(
DiscoveryMessage::AuthenticationBroadcast(authentication),
DataCommand::Broadcast,
)
}

impl<M: Multiaddress> Discovery<M> {
/// Create a new discovery handler with the given response/broadcast cooldown.
pub fn new(cooldown: Duration) -> Self {
Expand All @@ -68,7 +54,7 @@ impl<M: Multiaddress> Discovery<M> {
pub fn discover_authorities(
&mut self,
handler: &SessionHandler<M>,
) -> Option<DiscoveryCommand<M>> {
) -> Option<DiscoveryMessage<M>> {
let authentication = match handler.authentication() {
Some(authentication) => authentication,
None => return None,
Expand All @@ -77,7 +63,7 @@ impl<M: Multiaddress> Discovery<M> {
let missing_authorities = handler.missing_nodes();
let node_count = handler.node_count();
info!(target: "aleph-network", "{}/{} authorities known for session {}.", node_count.0-missing_authorities.len(), node_count.0, handler.session_id().0);
Some(authentication_broadcast(authentication))
Some(DiscoveryMessage::AuthenticationBroadcast(authentication))
}

/// Checks the authentication using the handler and returns the addresses we should be
Expand All @@ -104,7 +90,7 @@ impl<M: Multiaddress> Discovery<M> {
&mut self,
authentication: Authentication<M>,
handler: &mut SessionHandler<M>,
) -> (Vec<M>, Option<DiscoveryCommand<M>>) {
) -> (Vec<M>, Option<DiscoveryMessage<M>>) {
debug!(target: "aleph-network", "Handling broadcast with authentication {:?}.", authentication);
let addresses = self.handle_authentication(authentication.clone(), handler);
if addresses.is_empty() {
Expand All @@ -116,7 +102,10 @@ impl<M: Multiaddress> Discovery<M> {
}
trace!(target: "aleph-network", "Rebroadcasting {:?}.", authentication);
self.last_broadcast.insert(node_id, Instant::now());
(addresses, Some(authentication_broadcast(authentication)))
(
addresses,
Some(DiscoveryMessage::AuthenticationBroadcast(authentication)),
)
}

/// Analyzes the provided message and returns all the new multiaddresses we should
Expand All @@ -126,7 +115,7 @@ impl<M: Multiaddress> Discovery<M> {
&mut self,
message: DiscoveryMessage<M>,
handler: &mut SessionHandler<M>,
) -> (Vec<M>, Option<DiscoveryCommand<M>>) {
) -> (Vec<M>, Option<DiscoveryMessage<M>>) {
use DiscoveryMessage::*;
match message {
AuthenticationBroadcast(authentication) => {
Expand All @@ -150,7 +139,6 @@ mod tests {
network::{
manager::SessionHandler,
mock::{crypto_basics, MockMultiaddress, MockPeerId},
DataCommand,
},
SessionId,
};
Expand Down Expand Up @@ -216,10 +204,7 @@ mod tests {
let message = discovery.discover_authorities(handler);
assert_eq!(
message.expect("there is a discovery message"),
(
DiscoveryMessage::AuthenticationBroadcast(handler.authentication().unwrap()),
DataCommand::Broadcast
)
DiscoveryMessage::AuthenticationBroadcast(handler.authentication().unwrap()),
);
}
}
Expand All @@ -241,10 +226,9 @@ mod tests {
handler,
);
assert_eq!(addresses, authentication.0.addresses());
assert!(matches!(command, Some((
assert!(matches!(command, Some(
DiscoveryMessage::AuthenticationBroadcast(rebroadcast_authentication),
DataCommand::Broadcast,
)) if rebroadcast_authentication == authentication));
) if rebroadcast_authentication == authentication));
}

#[tokio::test]
Expand All @@ -256,10 +240,9 @@ mod tests {
&mut non_validator,
);
assert_eq!(addresses, authentication.0.addresses());
assert!(matches!(command, Some((
assert!(matches!(command, Some(
DiscoveryMessage::AuthenticationBroadcast(rebroadcast_authentication),
DataCommand::Broadcast,
)) if rebroadcast_authentication == authentication));
) if rebroadcast_authentication == authentication));
}

#[tokio::test]
Expand Down Expand Up @@ -292,10 +275,9 @@ mod tests {
handler,
);
assert_eq!(addresses, authentication.0.addresses());
assert!(matches!(command, Some((
assert!(matches!(command, Some(
DiscoveryMessage::AuthenticationBroadcast(rebroadcast_authentication),
DataCommand::Broadcast,
)) if rebroadcast_authentication == authentication));
) if rebroadcast_authentication == authentication));
}

#[tokio::test]
Expand Down
13 changes: 0 additions & 13 deletions finality-aleph/src/network/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,3 @@ impl<D: Data> Encode for DataInSession<D> {
self.session_id.encode_to(dest);
}
}

impl<D: Data, M: Multiaddress> From<DataInSession<D>> for NetworkData<D, M> {
fn from(data: DataInSession<D>) -> Self {
NetworkData::Data(data.data, data.session_id)
}
}

/// The data that should be sent to the network service.
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub enum NetworkData<D: Data, M: Multiaddress> {
Meta(DiscoveryMessage<M>),
Data(D, SessionId),
}
Loading