Skip to content

Commit

Permalink
service/src/lib.rs: Register network event stream for authority disc (p…
Browse files Browse the repository at this point in the history
…aritytech#678)

* service/src/lib.rs: Register network event stream for authority disc

Previously one would create a sender and receiver channel pair, pass the
sender to the build_network_future through the service builder and
funnel network events returned from polling the network service into the
sender to be consumed by the authority discovery module owning the
receiver.

With recent changes it is now possible to register an event_stream
with the network service directly, thus one does not need to make the
detour through the build_network_future.

This commit is an adjusted clone of one targeting the Substrate
repository.

* service/src/lib.rs: Fix futures::stream imports

* [TMP] *: Replace polkadot-upstream with feature branch

* Revert "[TMP] *: Replace polkadot-upstream with feature branch"

This reverts commit 0c947b04ab80488bfca16c5aeac9657b77a93a44.
  • Loading branch information
mxinden authored and gavofyork committed Jan 6, 2020
1 parent c3661f0 commit eac2ae7
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

pub mod chain_spec;

use futures01::sync::mpsc;
use futures::{FutureExt, TryFutureExt, task::{Spawn, SpawnError, FutureObj}};
use client::LongestChain;
use std::sync::Arc;
Expand Down Expand Up @@ -280,8 +279,12 @@ pub fn new_full<Runtime, Dispatch, Extrinsic>(config: Configuration)
Dispatch: NativeExecutionDispatch + 'static,
Extrinsic: RuntimeExtrinsic,
{
use sc_network::DhtEvent;
use futures::{compat::Stream01CompatExt, stream::StreamExt};
use sc_network::Event;
use futures01::Stream;
use futures::{
compat::Stream01CompatExt,
stream::StreamExt,
};

let is_collator = config.custom.collating_for.is_some();
let is_authority = config.roles.is_authority() && !is_collator;
Expand All @@ -305,19 +308,11 @@ pub fn new_full<Runtime, Dispatch, Extrinsic>(config: Configuration)

let (builder, mut import_setup, inherent_data_providers) = new_full_start!(config, Runtime, Dispatch);

// Dht event channel from the network to the authority discovery module. Use
// bounded channel to ensure back-pressure. Authority discovery is triggering one
// event per authority within the current authority set. This estimates the
// authority set size to be somewhere below 10 000 thereby setting the channel
// buffer size to 10 000.
let (dht_event_tx, dht_event_rx) = mpsc::channel::<DhtEvent>(10000);

let service = builder
.with_network_protocol(|config| Ok(PolkadotProtocol::new(config.custom.collating_for.clone())))?
.with_finality_proof_provider(|client, backend|
Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, client)) as _)
)?
.with_dht_event_tx(dht_event_tx)?
.build()?;

let (block_import, link_half, babe_link) = import_setup.take()
Expand Down Expand Up @@ -441,15 +436,20 @@ pub fn new_full<Runtime, Dispatch, Extrinsic>(config: Configuration)
service.spawn_essential_task(babe);

if authority_discovery_enabled {
let future03_dht_event_rx = dht_event_rx.compat()
let network = service.network();
let dht_event_stream = network.event_stream().filter_map(|e| match e {
Event::Dht(e) => Some(e),
_ => None,
});
let future03_dht_event_stream = dht_event_stream.compat()
.map(|x| x.expect("<mpsc::channel::Receiver as Stream> never returns an error; qed"))
.boxed();
let authority_discovery = authority_discovery::AuthorityDiscovery::new(
service.client(),
service.network(),
network,
sentry_nodes,
service.keystore(),
future03_dht_event_rx,
future03_dht_event_stream,
);
let future01_authority_discovery = authority_discovery.map(|x| Ok(x)).compat();

Expand Down

0 comments on commit eac2ae7

Please sign in to comment.