Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Allow communication between subsystems and outside world
Browse files Browse the repository at this point in the history
  • Loading branch information
montekki committed May 30, 2020
1 parent 30bbc29 commit 564964c
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 35 deletions.
24 changes: 21 additions & 3 deletions overseer/examples/minimal-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! * Establishing message passing

use std::time::Duration;
use futures::{pending, executor};
use futures::{pending, pin_mut, executor, select, stream, FutureExt, StreamExt};
use futures_timer::Delay;
use kv_log_macro as log;

Expand Down Expand Up @@ -132,7 +132,25 @@ fn main() {
(SubsystemId::Subsystem2, Box::new(Subsystem2::new())),
];

let overseer = Overseer::new(subsystems, spawner);
overseer.run().await;
let timer_stream = stream::repeat(()).then(|_| async {
Delay::new(Duration::from_secs(1)).await;
});

let (overseer, mut handler) = Overseer::new(subsystems, spawner);
let overseer_fut = overseer.run().fuse();
let timer_stream = timer_stream;

pin_mut!(timer_stream);
pin_mut!(overseer_fut);

loop {
select! {
_ = overseer_fut => break,
_ = timer_stream.next() => {
handler.send_to_subsystem(SubsystemId::Subsystem1, 42usize).await.unwrap();
}
complete => break,
}
}
});
}
114 changes: 82 additions & 32 deletions overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ enum ToOverseer<M: Debug, I> {
/// If that `to` is present the message will be targetedly sent to the intended
/// receiver. The most obvious use case of this is communicating with children.
SubsystemMessage {
to: I,
to: Option<I>,
msg: M,
},
/// A message that wraps something the `Subsystem` is desiring to
Expand All @@ -143,28 +143,51 @@ enum ToOverseer<M: Debug, I> {
},
}

enum Event {
/// Some event from outer world.
enum Event<M, I> {
BlockImport,
BlockFinalized,
MsgToSubsystem {
msg: M,
to: I,
},
Stop,
}

/// Some message that is sent from one of the `Subsystem`s to the outside world.
pub enum OutboundMessage<M, I> {
SubsystemMessage {
msg: M,
from: I,
}
}

/// A handler used to communicate with the [`Overseer`].
///
/// [`Overseer`]: struct.Overseer.html
#[derive(Clone)]
pub struct OverseerHandler {
events_tx: mpsc::Sender<Event>,
pub struct OverseerHandler<M, I> {
events_tx: mpsc::Sender<Event<M, I>>,
outside_rx: mpsc::Receiver<OutboundMessage<M, I>>,
}

impl OverseerHandler {
impl<M, I> OverseerHandler<M, I> {
/// Inform the `Overseer` that that some block was imported.
pub async fn block_imported(&mut self) -> SubsystemResult<()> {
self.events_tx.send(Event::BlockImport).await?;

Ok(())
}

/// Send some message to one of the `Subsystem`s.
pub async fn send_to_subsystem(&mut self, to: I, msg: M) -> SubsystemResult<()> {
self.events_tx.send(Event::MsgToSubsystem {
msg,
to,
}).await?;

Ok(())
}

/// Inform the `Overseer` that that some block was finalized.
pub async fn block_finalized(&mut self) -> SubsystemResult<()> {
self.events_tx.send(Event::BlockFinalized).await?;
Expand All @@ -178,6 +201,10 @@ impl OverseerHandler {

Ok(())
}

pub async fn recv_msg(&mut self) -> Option<OutboundMessage<M, I>> {
self.outside_rx.next().await
}
}

impl<M: Debug, I: Debug> Debug for ToOverseer<M, I> {
Expand Down Expand Up @@ -273,7 +300,17 @@ impl<M: Debug, I> SubsystemContext<M, I> {
/// Send a direct message to some other `Subsystem` you know the `I`d of.
pub async fn send_msg(&mut self, to: I, msg: M) -> SubsystemResult<()> {
self.tx.send(ToOverseer::SubsystemMessage{
to,
to: Some(to),
msg,
}).await?;

Ok(())
}

/// Send a message to some entity that resides outside of the `Overseer`.
pub async fn send_msg_outside(&mut self, msg: M) -> SubsystemResult<()> {

This comment has been minimized.

Copy link
@rphmeier

rphmeier May 30, 2020

Contributor

How is the message addressed? Can you explain the use-case or the control flow a bit more?

Best example I think is the block-authorship Proposer fetching data from a BlockAuthorshipData subsystem. There are numerous examples from the guide of subsystem -> subsystem interaction.

Here are the main broad use-cases:

  • Subsystem posts notification to another subsystem
  • Subsystem requests data from another subsystem and receives response
  • Outside world requests data from a subsystem and receives response
self.tx.send(ToOverseer::SubsystemMessage {
to: None,
msg,
}).await?;

Expand Down Expand Up @@ -333,10 +370,10 @@ pub struct Overseer<M: Debug, S: Spawn, I> {
channel_capacity: usize,

/// Events that are sent to the overseer from the outside world
events_rx: mpsc::Receiver<Event>,
events_rx: mpsc::Receiver<Event<M, I>>,

/// A sender for the `events_rx`, used to return `OverseerHandler` to the user.
events_tx: mpsc::Sender<Event>,
/// A sender to send things to the outside
outside_tx: mpsc::Sender<OutboundMessage<M, I>>,
}

impl<M, S, I> Overseer<M, S, I>
Expand Down Expand Up @@ -370,33 +407,31 @@ where
/// ```
///
/// [`Subsystem`]: trait.Subsystem.html
pub fn new<T: IntoIterator<Item = (I, Box<dyn Subsystem<M, I> + Send>)>>(subsystems: T, s: S) -> Self {
pub fn new<T>(subsystems: T, s: S) -> (Self, OverseerHandler<M, I>)
where
T: IntoIterator<Item = (I, Box<dyn Subsystem<M, I> + Send>)> {
let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (outside_tx, outside_rx) = mpsc::channel(CHANNEL_CAPACITY);

let handler = OverseerHandler {
events_tx: events_tx.clone(),
outside_rx,
};

let mut this = Self {
subsystems: HashMap::new(),
s,
running_subsystems: FuturesUnordered::new(),
channel_capacity: CHANNEL_CAPACITY,
events_rx,
events_tx,
outside_tx,
};

for s in subsystems.into_iter() {
let _ = this.spawn(s);
}

this
}

/// Get the [`OverseerHandler`] to communicate with the overseer.
///
/// [`OverseerHandler`]: struct.OverseerHandler.html
pub fn handler(&mut self) -> OverseerHandler {
let events_tx = self.events_tx.clone();

OverseerHandler {
events_tx,
}
(this, handler)
}

// Stop the overseer.
Expand Down Expand Up @@ -430,8 +465,19 @@ where
let mut msgs = Vec::default();

while let Poll::Ready(Some(msg)) = poll!(&mut self.events_rx.next()) {
if let Event::Stop = msg {
return self.stop().await;
match msg {
Event::MsgToSubsystem { msg, to } => {
if let Some(subsystem) = self.subsystems.get_mut(&to) {
if let Some(ref mut i) = subsystem.instance {
let _ = i.tx.send(FromOverseer::Communication {
msg,
from: None,
}).await;
}
}
}
Event::Stop => return self.stop().await,
_ => ()
}
}

Expand All @@ -447,7 +493,7 @@ where
// Do the message dispatching be it broadcasting or direct messages.
for msg in msgs.into_iter() {
match msg.1 {
ToOverseer::SubsystemMessage{ to, msg: m } => {
ToOverseer::SubsystemMessage { to: Some(to), msg: m } => {
if let Some(subsystem) = self.subsystems.get_mut(&to) {
if let Some(ref mut i) = subsystem.instance {
let _ = i.tx.send(FromOverseer::Communication {
Expand All @@ -457,6 +503,12 @@ where
}
}
}
ToOverseer::SubsystemMessage { msg: m, .. } => {
let _ = self.outside_tx.send(OutboundMessage::SubsystemMessage {
msg: m,
from: msg.0,
}).await;
}
ToOverseer::SpawnJob { s, res } => {
let s = self.spawn_job(s);

Expand Down Expand Up @@ -641,8 +693,7 @@ mod tests {
(SubsystemId::Subsystem1, Box::new(TestSubsystem1(s1_tx))),
(SubsystemId::Subsystem2, Box::new(TestSubsystem2(s2_tx))),
];
let mut overseer = Overseer::new(subsystems, spawner);
let mut handler = overseer.handler();
let (overseer, mut handler) = Overseer::new(subsystems, spawner);
let overseer_fut = overseer.run().fuse();

pin_mut!(overseer_fut);
Expand Down Expand Up @@ -688,8 +739,7 @@ mod tests {
let subsystems: Vec<(SubsystemId, Box<dyn Subsystem<usize, SubsystemId> + Send>)> = vec![
(SubsystemId::Subsystem3, Box::new(TestSubsystem3(Some(tx)))),
];
let mut overseer = Overseer::new(subsystems, spawner);
let mut handler = overseer.handler();
let (overseer, mut handler) = Overseer::new(subsystems, spawner);
let overseer_fut = overseer.run().fuse();

let mut rx = rx.fuse();
Expand Down Expand Up @@ -722,7 +772,7 @@ mod tests {
(SubsystemId::Subsystem4, Box::new(TestSubsystem4)),
];

let overseer = Overseer::new(subsystems, spawner);
let (overseer, _) = Overseer::new(subsystems, spawner);
let overseer_fut = overseer.run().fuse();
pin_mut!(overseer_fut);

Expand Down

0 comments on commit 564964c

Please sign in to comment.