diff --git a/src/gateway.rs b/src/gateway.rs index 9be0bef3..80b5dd6b 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -164,10 +164,18 @@ impl Gateway { async fn handle_uplink(&mut self, packet: PacketUp, received: Instant) { if self.region_params.is_unknown() { - info!(downlink_mac = %self.downlink_mac, uplink = %packet, "ignored uplink, no region"); + info!( + downlink_mac = %self.downlink_mac, + uplink = %packet, + region = %self.region_params, + "ignored uplink"); return; } - info!(downlink_mac = %self.downlink_mac, uplink = %packet, "received uplink"); + info!( + downlink_mac = %self.downlink_mac, + uplink = %packet, + region = %self.region_params, + "received uplink"); self.uplinks.uplink(packet, received).await; } diff --git a/src/message_cache.rs b/src/message_cache.rs index 7e3a0db5..0e875027 100644 --- a/src/message_cache.rs +++ b/src/message_cache.rs @@ -9,19 +9,19 @@ pub struct MessageCache { max_messages: u16, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct CacheMessage { received: Instant, message: T, } impl CacheMessage { - pub fn hold_time(&self) -> Duration { - self.received.elapsed() + pub fn new(message: T, received: Instant) -> Self { + Self { message, received } } - pub fn into_inner(self) -> T { - self.message + pub fn hold_time(&self) -> Duration { + self.received.elapsed() } } @@ -42,13 +42,31 @@ impl MessageCache { } } + /// Pushes a given at the end of the cache. The message is tagged with the + /// given received time which can be used to calculate hold time of a + /// packet. + /// + /// Pushing a packet onto the back of a full cache will cause the oldest + /// (first) message in the cache to be dropped. pub fn push_back(&mut self, message: T, received: Instant) { - self.waiting.push_back(CacheMessage { message, received }); + self.waiting.push_back(CacheMessage::new(message, received)); if self.len() > self.max_messages as usize { self.waiting.pop_front(); } } + /// Pushes a CacheMessage back on the front of the queue. This is useful to + /// push a packet back at the front after a failed delivery attempt. + /// + /// Pushing to the front of a full cache will cause the given message to not + /// be added. + pub fn push_front(&mut self, cache_message: CacheMessage) { + if self.len() > self.max_messages as usize { + return; + } + self.waiting.push_front(cache_message); + } + pub fn pop_front(&mut self, duration: Duration) -> (usize, Option>) { let mut dropped = 0; let mut front = None; diff --git a/src/packet.rs b/src/packet.rs index 4846acbc..f9a10778 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -36,6 +36,11 @@ impl From for PacketRouterPacketUpV1 { value.0 } } +impl From<&PacketUp> for PacketRouterPacketUpV1 { + fn from(value: &PacketUp) -> Self { + value.0.clone() + } +} impl From for PacketDown { fn from(value: PacketRouterPacketDownV1) -> Self { diff --git a/src/packet_router/mod.rs b/src/packet_router/mod.rs index 2edd2233..27b98b82 100644 --- a/src/packet_router/mod.rs +++ b/src/packet_router/mod.rs @@ -101,7 +101,10 @@ impl PacketRouter { }, message = self.messages.recv() => match message { Some(Message::Uplink{packet, received}) => - self.handle_uplink(packet, received).await, + if self.handle_uplink(packet, received).await.is_err() { + warn!("router disconnected"); + reconnect_sleep = self.next_connect(&reconnect_backoff, true); + }, Some(Message::Status(tx_resp)) => { let status = RouterStatus { uri: self.service.uri.clone(), @@ -116,67 +119,89 @@ impl PacketRouter { }, downlink = self.service.recv() => match downlink { Ok(Some(message)) => self.handle_downlink(message).await, - Ok(None) => warn!("router disconnected"), - Err(err) => warn!("router error {:?}", err), + Ok(None) => { + warn!("router disconnected"); + reconnect_sleep = self.next_connect(&reconnect_backoff, true) + }, + Err(err) => { + warn!(?err, "router error"); + reconnect_sleep = self.next_connect(&reconnect_backoff, true) + }, } } } } + fn next_connect(&mut self, reconnect_backoff: &Backoff, inc_retry: bool) -> Instant { + if inc_retry { + if self.reconnect_retry == RECONNECT_BACKOFF_RETRIES { + self.reconnect_retry = 0; + } else { + self.reconnect_retry += 1; + } + } + let backoff = reconnect_backoff + .next(self.reconnect_retry) + .unwrap_or(RECONNECT_BACKOFF_MAX_WAIT); + info!(seconds = backoff.as_secs(), "next connect"); + Instant::now() + backoff + } + async fn handle_reconnect(&mut self, reconnect_backoff: &Backoff) -> Instant { - info!("reconnecting"); - match self.service.reconnect().await { + info!("connecting"); + let inc_retry = match self.service.reconnect().await { Ok(_) => { - info!("reconnected"); + info!("connected"); self.reconnect_retry = RECONNECT_BACKOFF_RETRIES; - self.send_waiting_packets().await + self.send_waiting_packets().await.is_err() } Err(err) => { - warn!(%err, "failed to reconnect"); - if self.reconnect_retry == RECONNECT_BACKOFF_RETRIES { - self.reconnect_retry = 0; - } else { - self.reconnect_retry += 1; - } + warn!(%err, "failed to connect"); + true } - } - Instant::now() - + reconnect_backoff - .next(self.reconnect_retry) - .unwrap_or(RECONNECT_BACKOFF_MAX_WAIT) + }; + self.next_connect(reconnect_backoff, inc_retry) } - async fn handle_uplink(&mut self, uplink: PacketUp, received: StdInstant) { + async fn handle_uplink(&mut self, uplink: PacketUp, received: StdInstant) -> Result { self.store.push_back(uplink, received); - self.send_waiting_packets().await; + if self.service.is_connected() { + self.send_waiting_packets().await?; + } + Ok(()) } async fn handle_downlink(&mut self, message: PacketRouterPacketDownV1) { self.transmit.downlink(message.into()).await; } - async fn send_waiting_packets(&mut self) { + async fn send_waiting_packets(&mut self) -> Result { while let (removed, Some(packet)) = self.store.pop_front(STORE_GC_INTERVAL) { if removed > 0 { - info!("discarded {removed} queued packets"); + info!(removed, "discarded queued packets"); } - if let Err(err) = self.send_packet(packet).await { - warn!(%err, "failed to send uplink") + if let Err(err) = self.send_packet(&packet).await { + warn!(%err, "failed to send uplink"); + self.store.push_front(packet); + return Err(err); } } + Ok(()) } pub async fn mk_uplink( &self, - packet: CacheMessage, + packet: &CacheMessage, ) -> Result { - let mut uplink: PacketRouterPacketUpV1 = packet.into_inner().into(); + use std::ops::Deref; + let mut uplink: PacketRouterPacketUpV1 = packet.deref().into(); + uplink.hold_time = packet.hold_time().as_millis() as u64; uplink.gateway = self.keypair.public_key().into(); uplink.signature = uplink.sign(self.keypair.clone()).await?; Ok(uplink) } - async fn send_packet(&mut self, packet: CacheMessage) -> Result { + async fn send_packet(&mut self, packet: &CacheMessage) -> Result { debug!(packet_hash = packet.hash().to_b64(), "sending packet"); let uplink = self.mk_uplink(packet).await?;