diff --git a/Cargo.lock b/Cargo.lock index 3d1e5f1c..b46b2245 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2907,7 +2907,9 @@ dependencies = [ "color-eyre", "futures", "orb-messages", + "pin-project", "prost", + "thiserror", "tokio", "tokio-serial", "tracing", @@ -2922,6 +2924,7 @@ dependencies = [ "clap", "color-eyre", "crc32fast", + "futures", "image", "orb-mcu-interface", "tokio", @@ -3152,18 +3155,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.3" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.3" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", diff --git a/mcu-interface/Cargo.toml b/mcu-interface/Cargo.toml index fe7f346b..c9a4b440 100644 --- a/mcu-interface/Cargo.toml +++ b/mcu-interface/Cargo.toml @@ -14,12 +14,14 @@ rust-version.workspace = true async-trait = "0.1.77" can-rs = { path = "../can", features = ["isotp"] } color-eyre.workspace = true +futures.workspace = true orb-messages.workspace = true prost = "0.12.3" -tokio.workspace = true +pin-project = "1.1.5" +thiserror.workspace = true tokio-serial = "5.4.1" +tokio.workspace = true tracing.workspace = true -futures.workspace = true [package.metadata.orb] unsupported_targets = [ diff --git a/mcu-interface/examples/log_messages.rs b/mcu-interface/examples/log_messages.rs index b563977b..c7dfb146 100644 --- a/mcu-interface/examples/log_messages.rs +++ b/mcu-interface/examples/log_messages.rs @@ -19,8 +19,9 @@ async fn main() -> Result<()> { .init(); let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel(); - let _iface = CanRawMessaging::new(String::from("can0"), Device::Security, msg_tx) - .wrap_err("failed to create messaging interface")?; + let (iface, task_handle) = + CanRawMessaging::new(String::from("can0"), Device::Security, msg_tx) + .wrap_err("failed to create messaging interface")?; let recv_fut = async { while let Some(msg) = msg_rx.recv().await { @@ -29,8 +30,13 @@ async fn main() -> Result<()> { }; tokio::select! { - () = recv_fut => Ok(()), - result = tokio::signal::ctrl_c() => { println!("ctrl-c detected"); result.wrap_err("failed to listen for ctrl-c")} - + () = recv_fut => (), + result = tokio::signal::ctrl_c() => { + println!("ctrl-c detected"); + result.wrap_err("failed to listen for ctrl-c")?; + } } + + drop(iface); + task_handle.await.wrap_err("can task terminated uncleanly") } diff --git a/mcu-interface/src/can/canfd.rs b/mcu-interface/src/can/canfd.rs index 14ce0d0a..8e82416b 100644 --- a/mcu-interface/src/can/canfd.rs +++ b/mcu-interface/src/can/canfd.rs @@ -6,6 +6,7 @@ use color_eyre::eyre::{eyre, Context, Result}; use futures::FutureExt as _; use orb_messages::CommonAckError; use prost::Message; +use std::panic::AssertUnwindSafe; use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; @@ -18,7 +19,7 @@ use crate::{ MessagingInterface, }; -use super::ACK_RX_TIMEOUT; +use super::{CanTaskHandle, CanTaskJoinError, CanTaskPanic, ACK_RX_TIMEOUT}; pub struct CanRawMessaging { stream: FrameStream, @@ -31,12 +32,15 @@ pub struct CanRawMessaging { impl CanRawMessaging { /// CanRawMessaging opens a CAN stream filtering messages addressed only to the Jetson - /// and start listening for incoming messages in a new blocking thread + /// and start listening for incoming messages in a new blocking thread. + /// + /// Returns a handle to join on the blocking thread and retrieve any errors it + /// produces. pub fn new( bus: String, can_node: Device, new_message_queue: mpsc::UnboundedSender, - ) -> Result { + ) -> Result<(Self, CanTaskHandle)> { // open socket let stream = FrameStream::::build() .nonblocking(false) @@ -55,18 +59,42 @@ impl CanRawMessaging { let (ack_tx, ack_rx) = mpsc::unbounded_channel(); let (kill_tx, kill_rx) = oneshot::channel(); + let (task_join_tx, task_join_rx) = oneshot::channel(); + let task_join_rx = CanTaskHandle(task_join_rx); let stream_copy = stream.try_clone()?; - tokio::task::spawn_blocking(move || { - can_rx(stream_copy, can_node, ack_tx, new_message_queue, kill_rx) + // We directly spawn a thread instead of tokio::task::spawn_blocking, + // for two reaasons: + // + // 1. Under normal conditions, this closure runs forever, and tokio + // advises only using spawn_blocking for operations that "eventually + // finish on their own" + // 2. tokio::main will not return until all tasks are completed. And + // unlike regular async tasks, blocking tasks cannot be cancelled. + // `kill_tx` partially solves this, but I think its just better to + // decouple tokio from this task. + std::thread::spawn(move || { + let result: Result<(), CanTaskJoinError> = + match std::panic::catch_unwind(AssertUnwindSafe(|| { + can_rx(stream_copy, can_node, ack_tx, new_message_queue, kill_rx) + })) { + Ok(Ok(())) => Ok(()), + Ok(Err(err)) => Err(CanTaskJoinError::Err(err)), + Err(panic) => Err(CanTaskPanic::new(panic).into()), + }; + debug!(result=?result, "raw can_rx task terminated"); + task_join_tx.send(result) }); - Ok(Self { - stream, - ack_num_lsb: AtomicU16::new(0), - ack_queue: ack_rx, - can_node, - _kill_tx: kill_tx, - }) + Ok(( + Self { + stream, + ack_num_lsb: AtomicU16::new(0), + ack_queue: ack_rx, + can_node, + _kill_tx: kill_tx, + }, + task_join_rx, + )) } async fn wait_ack(&mut self, expected_ack_number: u32) -> Result { @@ -129,6 +157,7 @@ fn can_rx( Err(oneshot::error::TryRecvError::Empty) => (), } + trace!("reading from raw"); match stream.recv(&mut frame, 0) { Ok(_nbytes) => { break; diff --git a/mcu-interface/src/can/isotp.rs b/mcu-interface/src/can/isotp.rs index df92d05a..8d5d02a0 100644 --- a/mcu-interface/src/can/isotp.rs +++ b/mcu-interface/src/can/isotp.rs @@ -4,6 +4,7 @@ use futures::FutureExt as _; use orb_messages::CommonAckError; use prost::Message; use std::io::{Read, Write}; +use std::panic::AssertUnwindSafe; use std::sync::atomic::{AtomicU16, Ordering}; use tokio::sync::{mpsc, oneshot}; use tokio::time::timeout; @@ -13,12 +14,13 @@ use can_rs::isotp::addr::CanIsotpAddr; use can_rs::isotp::stream::IsotpStream; use can_rs::{Id, CAN_DATA_LEN}; +use crate::can::CanTaskPanic; use crate::{ create_ack, handle_main_mcu_message, handle_sec_mcu_message, McuPayload, MessagingInterface, }; -use super::ACK_RX_TIMEOUT; +use super::{CanTaskHandle, CanTaskJoinError, ACK_RX_TIMEOUT}; /// ISO-TP addressing scheme /// 11-bit standard ID @@ -92,6 +94,9 @@ impl CanIsoTpMessaging { /// pairs of addresses, one for transmission of ISO-TP messages and one for reception. /// A blocking thread is created for listening to new incoming messages. /// + /// Returns a handle to join on the blocking thread and retrieve any errors it + /// produces. + /// /// One pair of addresses _should_ be uniquely used on the bus to prevent misinterpretation of /// transmitted messages. /// If a pair of addresses is used by several programs, they must ensure one, and only one, @@ -101,7 +106,7 @@ impl CanIsoTpMessaging { local: IsoTpNodeIdentifier, remote: IsoTpNodeIdentifier, new_message_queue: mpsc::UnboundedSender, - ) -> Result { + ) -> Result<(Self, CanTaskHandle)> { let (tx_stdid_src, tx_stdid_dst) = create_pair(local, remote)?; debug!("Sending on 0x{:x}->0x{:x}", tx_stdid_src, tx_stdid_dst); @@ -119,17 +124,40 @@ impl CanIsoTpMessaging { let (ack_tx, ack_rx) = mpsc::unbounded_channel(); let (kill_tx, kill_rx) = oneshot::channel(); - // spawn CAN receiver - tokio::task::spawn_blocking(move || { - can_rx(bus, remote, local, ack_tx, new_message_queue, kill_rx) + let (task_join_tx, task_join_rx) = oneshot::channel(); + let task_join_rx = CanTaskHandle(task_join_rx); + // We directly spawn a thread instead of tokio::task::spawn_blocking, + // for two reaasons: + // + // 1. Under normal conditions, this closure runs forever, and tokio + // advises only using spawn_blocking for operations that "eventually + // finish on their own" + // 2. tokio::main will not return until all tasks are completed. And + // unlike regular async tasks, blocking tasks cannot be cancelled. + // `kill_tx` partially solves this, but I think its just better to + // decouple tokio from this task. + std::thread::spawn(move || { + let result: Result<(), CanTaskJoinError> = + match std::panic::catch_unwind(AssertUnwindSafe(|| { + can_rx(bus, remote, local, ack_tx, new_message_queue, kill_rx) + })) { + Ok(Ok(())) => Ok(()), + Ok(Err(err)) => Err(CanTaskJoinError::Err(err)), + Err(panic) => Err(CanTaskPanic::new(panic).into()), + }; + debug!(result=?result, "isotp can_rx task terminated"); + task_join_tx.send(result) }); - Ok(CanIsoTpMessaging { - stream: tx_isotp_stream, - ack_num_lsb: AtomicU16::new(0), - ack_queue: ack_rx, - _kill_tx: kill_tx, - }) + Ok(( + CanIsoTpMessaging { + stream: tx_isotp_stream, + ack_num_lsb: AtomicU16::new(0), + ack_queue: ack_rx, + _kill_tx: kill_tx, + }, + task_join_rx, + )) } async fn wait_ack(&mut self, expected_ack_number: u32) -> Result { @@ -206,6 +234,7 @@ fn can_rx( Err(oneshot::error::TryRecvError::Empty) => (), } + trace!("reading from isotp"); let buffer = match rx_isotp_stream.read(&mut buffer) { Ok(_) => buffer, Err(e) => { diff --git a/mcu-interface/src/can/mod.rs b/mcu-interface/src/can/mod.rs index 42e82295..89b017df 100644 --- a/mcu-interface/src/can/mod.rs +++ b/mcu-interface/src/can/mod.rs @@ -1,6 +1,80 @@ -use std::time::Duration; +use std::{any::Any, task::Poll, time::Duration}; + +use pin_project::pin_project; +use tokio::sync::oneshot; pub mod canfd; pub mod isotp; const ACK_RX_TIMEOUT: Duration = Duration::from_millis(1500); + +pub type CanTaskResult = Result<(), CanTaskJoinError>; + +/// Handle that can be used to detect errors in the can receive task. +/// +/// Note that dropping this handle doesn't kill the task. +#[pin_project] +#[derive(Debug)] +pub struct CanTaskHandle(#[pin] oneshot::Receiver); + +impl std::future::Future for CanTaskHandle { + type Output = CanTaskResult; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll { + let rx = self.project().0; + rx.poll(cx).map(|recv| match recv { + Ok(Ok(())) | Err(oneshot::error::RecvError { .. }) => Ok(()), + Ok(Err(err)) => Err(err), + }) + } +} + +impl CanTaskHandle { + /// Blocks until the task is complete. + /// + /// It is recommended to simply .await instead, since `CanTaskHandle` implements + /// `Future`. + pub fn join(self) -> CanTaskResult { + match self.0.blocking_recv() { + Ok(Ok(())) | Err(oneshot::error::RecvError { .. }) => Ok(()), + Ok(Err(err)) => Err(err), + } + } +} + +#[derive(thiserror::Error, Debug)] +pub enum CanTaskJoinError { + #[error(transparent)] + Panic(#[from] CanTaskPanic), + #[error(transparent)] + Err(#[from] color_eyre::Report), +} + +#[derive(thiserror::Error)] +#[error("panic in thread used to receive from canbus")] +// Mutex is there to make it implement Sync without using `unsafe` +pub struct CanTaskPanic(std::sync::Mutex>); + +impl CanTaskPanic { + fn new(err: Box) -> Self { + Self(std::sync::Mutex::new(err)) + } + + /// Returns the object with which the task panicked. + /// + /// You can pass this into [`std::panic::resume_unwind()`] to propagate the + /// panic. + pub fn into_panic(self) -> Box { + self.0.into_inner().expect("infallible") + } +} + +impl std::fmt::Debug for CanTaskPanic { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple(std::any::type_name::()) + .finish() + } +} diff --git a/mcu-util/Cargo.toml b/mcu-util/Cargo.toml index 5d619807..e5c83ea2 100644 --- a/mcu-util/Cargo.toml +++ b/mcu-util/Cargo.toml @@ -20,6 +20,7 @@ orb-mcu-interface.path = "../mcu-interface" tokio.workspace = true tracing-subscriber.workspace = true tracing.workspace = true +futures.workspace = true [package.metadata.orb] unsupported_targets = [ diff --git a/mcu-util/src/main.rs b/mcu-util/src/main.rs index 71c18fe5..0e054723 100644 --- a/mcu-util/src/main.rs +++ b/mcu-util/src/main.rs @@ -122,16 +122,15 @@ enum SecureElement { } async fn execute(args: Args) -> Result<()> { - let mut orb = Orb::new().await?; + let (mut orb, orb_tasks) = Orb::new().await?; match args.subcmd { SubCommand::Info => { let orb_info = orb.get_info().await?; debug!("{:?}", orb_info); println!("{:#}", orb_info); - Ok(()) } - SubCommand::Reboot(mcu) => orb.borrow_mut_mcu(mcu).reboot(None).await, + SubCommand::Reboot(mcu) => orb.borrow_mut_mcu(mcu).reboot(None).await?, SubCommand::Dump(DumpOpts { mcu, duration, @@ -139,20 +138,20 @@ async fn execute(args: Args) -> Result<()> { }) => { orb.borrow_mut_mcu(mcu) .dump(duration.map(Duration::from_secs), logs_only) - .await + .await? } SubCommand::Stress(StressOpts { duration, mcu }) => { orb.borrow_mut_mcu(mcu) .stress_test(duration.map(Duration::from_secs)) - .await + .await? } SubCommand::Image(Image::Switch(mcu)) => { - orb.borrow_mut_mcu(mcu).switch_images().await + orb.borrow_mut_mcu(mcu).switch_images().await? } SubCommand::Image(Image::Update(opts)) => { orb.borrow_mut_mcu(opts.mcu) .update_firmware(&opts.path, opts.can_fd) - .await + .await? } SubCommand::HardwareRevision { filename } => { let hw_str = orb.get_revision().await?; @@ -170,16 +169,27 @@ async fn execute(args: Args) -> Result<()> { })?; } } - Ok(()) } SubCommand::SecureElement(opts) => match opts { SecureElement::PowerCycle => { orb.borrow_mut_sec_board() .power_cycle_secure_element() - .await + .await? } }, } + + // Kills the tasks + drop(orb); + // Timeout because tasks might never check the kill signal because they are busy + // waiting to receive another message. In the event we timeout, most likely there + // have been no errors. + // TODO: We need to make the synchronous actually use nonblocking code to make the + // timeout unecessary + match tokio::time::timeout(Duration::from_millis(100), orb_tasks.join()).await { + Ok(result) => result, + Err(tokio::time::error::Elapsed { .. }) => Ok(()), + } } #[tokio::main] diff --git a/mcu-util/src/orb/main_board.rs b/mcu-util/src/orb/main_board.rs index 6a48af62..ffe7864c 100644 --- a/mcu-util/src/orb/main_board.rs +++ b/mcu-util/src/orb/main_board.rs @@ -1,10 +1,9 @@ use async_trait::async_trait; use color_eyre::eyre::{eyre, Result, WrapErr as _}; -use std::ops::Sub; use std::time::Duration; use tokio::sync::mpsc; use tokio::time; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use orb_mcu_interface::can::canfd::CanRawMessaging; use orb_mcu_interface::can::isotp::{CanIsoTpMessaging, IsoTpNodeIdentifier}; @@ -17,6 +16,8 @@ use crate::orb::revision::OrbRevision; use crate::orb::{dfu, BatteryStatus}; use crate::orb::{Board, OrbInfo}; +use super::BoardTaskHandles; + const REBOOT_DELAY: u32 = 3; pub struct MainBoard { @@ -44,15 +45,15 @@ impl MainBoardBuilder { } } - pub async fn build(self) -> Result { - let mut canfd_iface = CanRawMessaging::new( + pub async fn build(self) -> Result<(MainBoard, BoardTaskHandles)> { + let (mut canfd_iface, raw_can_task_handle) = CanRawMessaging::new( String::from("can0"), Device::Main, self.message_queue_tx.clone(), ) .wrap_err("Failed to create CanRawMessaging for MainBoard")?; - let isotp_iface = CanIsoTpMessaging::new( + let (isotp_iface, isotp_can_task_handle) = CanIsoTpMessaging::new( String::from("can0"), IsoTpNodeIdentifier::JetsonApp7, IsoTpNodeIdentifier::MainMcu, @@ -75,12 +76,18 @@ impl MainBoardBuilder { )) .await?; - Ok(MainBoard { - canfd_iface, - isotp_iface, - serial_iface, - message_queue_rx: self.message_queue_rx, - }) + Ok(( + MainBoard { + canfd_iface, + isotp_iface, + serial_iface, + message_queue_rx: self.message_queue_rx, + }, + BoardTaskHandles { + raw: raw_can_task_handle, + isotp: isotp_can_task_handle, + }, + )) } } @@ -386,46 +393,69 @@ impl MainBoardInfo { error!("error asking for battery status: {e}"); } - let mut now = std::time::Instant::now(); - let mut timeout = std::time::Duration::from_secs(2); + match tokio::time::timeout( + Duration::from_secs(2), + self.listen_for_board_info(main_board), + ) + .await + { + Err(tokio::time::error::Elapsed { .. }) => { + warn!("Timeout waiting on main board info"); + is_err = true; + } + Ok(()) => { + debug!("Got main board info"); + } + } + + if is_err { + Ok(self) + } else { + Err(self) + } + } + + /// Mutates `self` while listening for board info messages. + /// + /// Does not terminate until all board info is populated. + async fn listen_for_board_info(&mut self, main_board: &mut MainBoard) { let mut battery_status = BatteryStatus { percentage: None, voltage_mv: None, is_charging: None, }; + loop { - if let Ok(Some(McuPayload::FromMain(main_mcu_payload))) = - tokio::time::timeout(timeout, main_board.message_queue_rx.recv()).await - { - match main_mcu_payload { - main_messaging::mcu_to_jetson::Payload::Versions(v) => { - self.fw_versions = Some(v); - } - main_messaging::mcu_to_jetson::Payload::Hardware(h) => { - self.hw_version = Some(OrbRevision(h)); - } - main_messaging::mcu_to_jetson::Payload::BatteryCapacity(b) => { - battery_status.percentage = Some(b.percentage); - } - main_messaging::mcu_to_jetson::Payload::BatteryVoltage(b) => { - battery_status.voltage_mv = Some( - (b.battery_cell1_mv - + b.battery_cell2_mv - + b.battery_cell3_mv - + b.battery_cell4_mv) - as u32, - ); - } - main_messaging::mcu_to_jetson::Payload::BatteryIsCharging(b) => { - battery_status.is_charging = Some(b.battery_is_charging); - } - _ => {} + let Some(mcu_payload) = main_board.message_queue_rx.recv().await else { + warn!("main board queue is closed"); + return; + }; + let McuPayload::FromMain(main_mcu_payload) = mcu_payload else { + unreachable!("should always be a message from the main board") + }; + + match main_mcu_payload { + main_messaging::mcu_to_jetson::Payload::Versions(v) => { + self.fw_versions = Some(v); } - timeout = timeout.sub(now.elapsed()); - now = std::time::Instant::now(); - } else { - error!("Timeout waiting on main board info"); - return Err(self); + main_messaging::mcu_to_jetson::Payload::Hardware(h) => { + self.hw_version = Some(OrbRevision(h)); + } + main_messaging::mcu_to_jetson::Payload::BatteryCapacity(b) => { + battery_status.percentage = Some(b.percentage); + } + main_messaging::mcu_to_jetson::Payload::BatteryVoltage(b) => { + battery_status.voltage_mv = Some( + (b.battery_cell1_mv + + b.battery_cell2_mv + + b.battery_cell3_mv + + b.battery_cell4_mv) as u32, + ); + } + main_messaging::mcu_to_jetson::Payload::BatteryIsCharging(b) => { + battery_status.is_charging = Some(b.battery_is_charging); + } + _ => {} } if self.battery_status.is_none() @@ -441,7 +471,7 @@ impl MainBoardInfo { && self.fw_versions.is_some() && self.battery_status.is_some() { - return if is_err { Err(self) } else { Ok(self) }; + return; } } } diff --git a/mcu-util/src/orb/mod.rs b/mcu-util/src/orb/mod.rs index 35022938..3c4414c0 100644 --- a/mcu-util/src/orb/mod.rs +++ b/mcu-util/src/orb/mod.rs @@ -2,8 +2,10 @@ use std::fmt::{Display, Formatter}; use std::time::Duration; use async_trait::async_trait; -use color_eyre::eyre::Result; +use color_eyre::eyre::{Context, Result}; +use futures::FutureExt; +use orb_mcu_interface::can::CanTaskHandle; use orb_mcu_interface::orb_messages::mcu_main as main_messaging; use orb_mcu_interface::orb_messages::mcu_sec as sec_messaging; @@ -57,16 +59,22 @@ pub struct Orb { } impl Orb { - pub async fn new() -> Result { - let main_board = MainBoard::builder().build().await?; - let sec_board = SecurityBoard::builder().build().await?; + pub async fn new() -> Result<(Self, OrbTaskHandles)> { + let (main_board, main_task_handle) = MainBoard::builder().build().await?; + let (sec_board, sec_task_handle) = SecurityBoard::builder().build().await?; let info = OrbInfo::default(); - Ok(Self { - main_board, - sec_board, - info, - }) + Ok(( + Self { + main_board, + sec_board, + info, + }, + OrbTaskHandles { + main: main_task_handle, + sec: sec_task_handle, + }, + )) } pub fn borrow_mut_mcu(&mut self, mcu: crate::Mcu) -> &mut dyn Board { @@ -252,3 +260,39 @@ impl Display for OrbInfo { Ok(()) } } + +#[derive(Debug)] +pub struct BoardTaskHandles { + pub raw: CanTaskHandle, + pub isotp: CanTaskHandle, +} + +impl BoardTaskHandles { + pub async fn join(self) -> color_eyre::Result<()> { + let ((), ()) = tokio::try_join!( + self.raw.map(|e| e.wrap_err("raw can task terminated")), + self.isotp.map(|e| e.wrap_err("isotp can task terminated")), + )?; + Ok(()) + } +} + +#[derive(Debug)] +pub struct OrbTaskHandles { + pub main: BoardTaskHandles, + pub sec: BoardTaskHandles, +} + +impl OrbTaskHandles { + pub async fn join(self) -> color_eyre::Result<()> { + let ((), ()) = tokio::try_join!( + self.main + .join() + .map(|e| e.wrap_err("main board task(s) terminated")), + self.sec + .join() + .map(|e| e.wrap_err("sec board task(s) terminated")) + )?; + Ok(()) + } +} diff --git a/mcu-util/src/orb/security_board.rs b/mcu-util/src/orb/security_board.rs index 1ce62a93..074c74c1 100644 --- a/mcu-util/src/orb/security_board.rs +++ b/mcu-util/src/orb/security_board.rs @@ -1,10 +1,9 @@ use async_trait::async_trait; use color_eyre::eyre::{eyre, Result, WrapErr as _}; -use std::ops::Sub; use std::time::Duration; use tokio::sync::mpsc; use tokio::time; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use orb_mcu_interface::can::canfd::CanRawMessaging; use orb_mcu_interface::can::isotp::{CanIsoTpMessaging, IsoTpNodeIdentifier}; @@ -17,6 +16,8 @@ use crate::orb::dfu::BlockIterator; use crate::orb::{dfu, BatteryStatus}; use crate::orb::{Board, OrbInfo}; +use super::BoardTaskHandles; + const REBOOT_DELAY: u32 = 3; pub struct SecurityBoard { @@ -41,15 +42,15 @@ impl SecurityBoardBuilder { } } - pub async fn build(self) -> Result { - let mut canfd_iface = CanRawMessaging::new( + pub async fn build(self) -> Result<(SecurityBoard, BoardTaskHandles)> { + let (mut canfd_iface, raw_can_task) = CanRawMessaging::new( String::from("can0"), Device::Security, self.message_queue_tx.clone(), ) .wrap_err("Failed to create CanRawMessaging for SecurityBoard")?; - let isotp_iface = CanIsoTpMessaging::new( + let (isotp_iface, isotp_can_task) = CanIsoTpMessaging::new( String::from("can0"), IsoTpNodeIdentifier::JetsonApp7, IsoTpNodeIdentifier::SecurityMcu, @@ -70,11 +71,17 @@ impl SecurityBoardBuilder { )) .await?; - Ok(SecurityBoard { - canfd_iface, - isotp_iface, - message_queue_rx: self.message_queue_rx, - }) + Ok(( + SecurityBoard { + canfd_iface, + isotp_iface, + message_queue_rx: self.message_queue_rx, + }, + BoardTaskHandles { + raw: raw_can_task, + isotp: isotp_can_task, + }, + )) } } @@ -397,34 +404,57 @@ impl SecurityBoardInfo { error!("Failed to fetch battery status: {:?}", e); } - let mut now = std::time::Instant::now(); - let mut timeout = std::time::Duration::from_secs(2); + match tokio::time::timeout( + Duration::from_secs(2), + self.listen_for_board_info(sec_board), + ) + .await + { + Err(tokio::time::error::Elapsed { .. }) => { + warn!("Timeout waiting on security board info"); + is_err = true; + } + Ok(()) => { + debug!("Got security board info"); + } + } + + if is_err { + Ok(self) + } else { + Err(self) + } + } + + /// Mutates `self` while listening for board info messages. + /// + /// Does not terminate until all board info is populated. + async fn listen_for_board_info(&mut self, sec_board: &mut SecurityBoard) { let mut battery_status = BatteryStatus { percentage: None, voltage_mv: None, is_charging: None, }; loop { - if let Ok(Some(McuPayload::FromSec(sec_mcu_payload))) = - tokio::time::timeout(timeout, sec_board.message_queue_rx.recv()).await - { - match sec_mcu_payload { - security_messaging::sec_to_jetson::Payload::Versions(v) => { - self.fw_versions = Some(v); - } - security_messaging::sec_to_jetson::Payload::BatteryStatus(b) => { - battery_status.percentage = Some(b.percentage as u32); - battery_status.voltage_mv = Some(b.voltage_mv as u32); - battery_status.is_charging = - Some(b.state == (BatteryState::Charging as i32)); - } - _ => {} + let Some(mcu_payload) = sec_board.message_queue_rx.recv().await else { + warn!("security board queue is closed"); + return; + }; + let McuPayload::FromSec(sec_mcu_payload) = mcu_payload else { + unreachable!("should always be a message from the security board") + }; + + match sec_mcu_payload { + security_messaging::sec_to_jetson::Payload::Versions(v) => { + self.fw_versions = Some(v); } - timeout = timeout.sub(now.elapsed()); - now = std::time::Instant::now(); - } else { - error!("Timeout waiting on security board info"); - return Err(self); + security_messaging::sec_to_jetson::Payload::BatteryStatus(b) => { + battery_status.percentage = Some(b.percentage as u32); + battery_status.voltage_mv = Some(b.voltage_mv as u32); + battery_status.is_charging = + Some(b.state == (BatteryState::Charging as i32)); + } + _ => {} } if self.battery_status.is_none() @@ -437,7 +467,7 @@ impl SecurityBoardInfo { // check that all fields are set in BoardInfo if self.fw_versions.is_some() && self.battery_status.is_some() { - return if is_err { Err(self) } else { Ok(self) }; + return; } } }