Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
impl Stream for Jobs<Spawner, Job>
Browse files Browse the repository at this point in the history
This turns out to be relatively complicated and requires some
unsafe code, so we'll want either detailed review, or to choose
to revert this commit.
  • Loading branch information
coriolinus committed Jul 13, 2020
1 parent fa595c2 commit 940c111
Showing 1 changed file with 56 additions and 12 deletions.
68 changes: 56 additions & 12 deletions node/subsystem/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use futures::{
future::Either,
prelude::*,
select,
task::{Spawn, SpawnError, SpawnExt},
stream::Stream,
task::{self, Spawn, SpawnError, SpawnExt},
};
use futures_timer::Delay;
use keystore::KeyStorePtr;
Expand All @@ -42,6 +43,7 @@ use sp_core::Pair;
use std::{
collections::HashMap,
convert::{TryFrom, TryInto},
marker::Unpin,
pin::Pin,
time::Duration,
};
Expand Down Expand Up @@ -296,7 +298,7 @@ impl<ToJob: ToJobTrait> JobHandle<ToJob> {
/// Jobs are instantiated and killed automatically on appropriate overseer messages.
/// Other messages are passed along to and from the job via the overseer to other
/// subsystems.
pub trait JobTrait {
pub trait JobTrait: Unpin {
/// Message type to the job. Typically a subset of AllMessages.
type ToJob: 'static + ToJobTrait + Send;
/// Message type from the job. Typically a subset of AllMessages.
Expand Down Expand Up @@ -413,25 +415,67 @@ impl<Spawner: Spawn, Job: JobTrait> Jobs<Spawner, Job> {
Ok(())
}

/// Get the next message from any of the underlying jobs.
async fn next(&mut self) -> Option<Job::FromJob> {
self.outgoing_msgs.next().await.and_then(|(e, _)| match e {
StreamYield::Item(e) => Some(e),
_ => None,
})
/// Get the pin projection for the `outgoing_msgs` field
///
/// From the [pin docs]:
///
/// > It is actually up to the author of the data structure to decide whether the pinned
/// > projection for a particular field turns `Pin<&mut Struct>` into `Pin<&mut Field>`
/// > or `&mut Field`. There are some constraints, though, and the most important constraint
/// > is _consistency_: every field can be _either_ projected to a pinned reference, _or_
/// > have pinning removed as part of the projection. If both are done for the same field,
/// > that will likely be unsound!
///
/// In this case, pinning is structural.
///
/// ## Considerations
///
/// 1. The struct must only be `Unpin` if all the structural fields are `Unpin`: ✔
/// 2. The destructor of the struct must not move structural fields out of its argument: ✔
/// 3. Uphold the `Drop` guarantee: once the struct is pinned, the memory which contains
/// the content is not overwritten or deallocated without calling the content's destructors.
/// I.e. you may not free or reuse the storage without calling `drop`. ✔
/// 4. You must not offer any other operations (i.e. `take`) which could lead to data being
/// moved out of the structural fields when your type is pinned: ✔
///
/// [pin docs]: https://doc.rust-lang.org/std/pin/index.html#projections-and-structural-pinning
fn pin_get_outgoing_msgs(self: Pin<&mut Self>) -> Pin<&mut StreamUnordered<mpsc::Receiver<Job::FromJob>>> {
// This is ok because `self.outgoing_msgs` is pinned when `self` is.
unsafe { self.map_unchecked_mut(|s| &mut s.outgoing_msgs) }

This comment has been minimized.

Copy link
@rphmeier

rphmeier Jul 13, 2020

Contributor

Would really prefer to avoid unsafe code if possible. The next() API is preferable to that.

This comment has been minimized.

Copy link
@rphmeier

rphmeier Jul 13, 2020

Contributor

However I can also suggest an alternative that might work: have a method on Jobs which returns an &mut impl Stream. So the Stream implementation doesn't need to be on Jobs itself, which is where the unsafe complexity seems to come from.

}
}

// Note that on drop, we don't have the chance to gracefully spin down each of the remaining handles;
// we just abort them all. Still better than letting them dangle.
impl<Spawner, Job: JobTrait> Drop for Jobs<Spawner, Job> {
fn drop(&mut self) {
for job_handle in self.running.values() {
job_handle.abort_handle.abort();
// `new_unchecked` is ok because we know this value is never used again after being dropped
inner_drop(unsafe { Pin::new_unchecked(self) });
fn inner_drop<Spawner, Job: JobTrait>(slf: Pin<&mut Jobs<Spawner, Job>>) {
for job_handle in slf.running.values() {
job_handle.abort_handle.abort();
}
}
}
}

impl<Spawner, Job> Stream for Jobs<Spawner, Job>
where
Spawner: Spawn,
Job: JobTrait,
{
type Item = Job::FromJob;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Option<Self::Item>> {
self.pin_get_outgoing_msgs()
.poll_next(cx)
.map(|opt| opt.and_then(|(stream_yield, _)| match stream_yield {
StreamYield::Item(msg) => Some(msg),
StreamYield::Finished(_) => None,
}))
}
}

/// A basic implementation of a subsystem.
///
/// This struct is responsible for handling message traffic between
Expand All @@ -447,7 +491,7 @@ pub struct JobManager<Spawner, Context, Job: JobTrait> {

impl<Spawner, Context, Job> JobManager<Spawner, Context, Job>
where
Spawner: Spawn + Clone + Send,
Spawner: Spawn + Clone + Send + Unpin,
Context: SubsystemContext,
Job: JobTrait,
Job::RunArgs: Clone,
Expand Down Expand Up @@ -552,7 +596,7 @@ where

impl<Spawner, Context, Job> Subsystem<Context> for JobManager<Spawner, Context, Job>
where
Spawner: Spawn + Send + Clone + 'static,
Spawner: Spawn + Send + Clone + Unpin + 'static,
Context: SubsystemContext,
<Context as SubsystemContext>::Message: Into<Job::ToJob>,
Job: JobTrait + Send,
Expand Down

0 comments on commit 940c111

Please sign in to comment.