Skip to content

Commit

Permalink
Don’t reconnect immediately on router failure
Browse files Browse the repository at this point in the history
This change will recalculate the reconnect timer based on the fact that there was an error during the router.

If an error happens during a fresh connection the reconnect timer will be set to a short time using `next_connect` which will not try to connect immediately. The subseequent timer firing will end up reconnecting the router and sending the messages
  • Loading branch information
madninja committed Apr 7, 2023
1 parent d5f843b commit c792aa9
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 34 deletions.
13 changes: 11 additions & 2 deletions src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,19 @@ impl Gateway {

async fn handle_uplink(&mut self, packet: PacketUp, received: Instant) {
if self.region_params.is_unknown() {
info!(downlink_mac = %self.downlink_mac, uplink = %packet, "ignored uplink, no region");
info!(
downlink_mac = %self.downlink_mac,
uplink = %packet,
region = %self.region_params,
"ignored uplink");
return;
}
info!(downlink_mac = %self.downlink_mac, uplink = %packet, "received uplink");
info!(
downlink_mac =
%self.downlink_mac,
uplink = %packet,
region = %self.region_params,
"received uplink");
self.uplinks.uplink(packet, received).await;
}

Expand Down
30 changes: 24 additions & 6 deletions src/message_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ pub struct MessageCache<T> {
max_messages: u16,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct CacheMessage<T> {
received: Instant,
message: T,
}

impl<T> CacheMessage<T> {
pub fn hold_time(&self) -> Duration {
self.received.elapsed()
pub fn new(message: T, received: Instant) -> Self {
Self { message, received }
}

pub fn into_inner(self) -> T {
self.message
pub fn hold_time(&self) -> Duration {
self.received.elapsed()
}
}

Expand All @@ -42,13 +42,31 @@ impl<T> MessageCache<T> {
}
}

/// Pushes a given at the end of the cache. The message is tagged with the
/// given received time which can be used to calculate hold time of a
/// packet.
///
/// Pushing a packet onto the back of a full cache will cause the oldest
/// (first) message in the cache to be dropped.
pub fn push_back(&mut self, message: T, received: Instant) {
self.waiting.push_back(CacheMessage { message, received });
self.waiting.push_back(CacheMessage::new(message, received));
if self.len() > self.max_messages as usize {
self.waiting.pop_front();
}
}

/// Pushes a CacheMessage back on the front of the queue. This is useful to
/// push a packet back at the front after a failed delivery attempt.
///
/// Pushing to the front of a full cache will cause the given message to not
/// be added.
pub fn push_front(&mut self, cache_message: CacheMessage<T>) {
if self.len() > self.max_messages as usize {
return;
}
self.waiting.push_front(cache_message);
}

pub fn pop_front(&mut self, duration: Duration) -> (usize, Option<CacheMessage<T>>) {
let mut dropped = 0;
let mut front = None;
Expand Down
5 changes: 5 additions & 0 deletions src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ impl From<PacketUp> for PacketRouterPacketUpV1 {
value.0
}
}
impl From<&PacketUp> for PacketRouterPacketUpV1 {
fn from(value: &PacketUp) -> Self {
value.0.clone()
}
}

impl From<PacketRouterPacketDownV1> for PacketDown {
fn from(value: PacketRouterPacketDownV1) -> Self {
Expand Down
70 changes: 44 additions & 26 deletions src/packet_router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ impl PacketRouter {
},
message = self.messages.recv() => match message {
Some(Message::Uplink{packet, received}) =>
self.handle_uplink(packet, received).await,
if self.handle_uplink(packet, received).await.is_err() {
reconnect_sleep = self.next_connect(&reconnect_backoff, true);
},
Some(Message::Status(tx_resp)) => {
let status = RouterStatus {
uri: self.service.uri.clone(),
Expand All @@ -116,73 +118,89 @@ impl PacketRouter {
},
downlink = self.service.recv() => match downlink {
Ok(Some(message)) => self.handle_downlink(message).await,
Ok(None) => warn!("router disconnected"),
Ok(None) => {
warn!("router disconnected");
reconnect_sleep = self.next_connect(&reconnect_backoff, true)
},
Err(err) => {
warn!("router error {:?}", err);
reconnect_sleep = self.handle_reconnect(&reconnect_backoff).await;
warn!(?err, "router error");
reconnect_sleep = self.next_connect(&reconnect_backoff, true)
},
}
}
}
}

fn next_connect(&mut self, reconnect_backoff: &Backoff, inc_retry: bool) -> Instant {
if inc_retry {
if self.reconnect_retry == RECONNECT_BACKOFF_RETRIES {
self.reconnect_retry = 0;
} else {
self.reconnect_retry += 1;
}
}
let backoff = reconnect_backoff
.next(self.reconnect_retry)
.unwrap_or(RECONNECT_BACKOFF_MAX_WAIT);
info!(seconds = backoff.as_secs(), "next connect");
Instant::now() + backoff
}

async fn handle_reconnect(&mut self, reconnect_backoff: &Backoff) -> Instant {
info!("connecting");
match self.service.reconnect().await {
let inc_retry = match self.service.reconnect().await {
Ok(_) => {
info!("connected");
self.reconnect_retry = RECONNECT_BACKOFF_RETRIES;
self.send_waiting_packets().await
self.send_waiting_packets().await.is_err()
}
Err(err) => {
warn!(%err, "failed to connect");
if self.reconnect_retry == RECONNECT_BACKOFF_RETRIES {
self.reconnect_retry = 0;
} else {
self.reconnect_retry += 1;
}
true
}
}
let backoff = reconnect_backoff
.next(self.reconnect_retry)
.unwrap_or(RECONNECT_BACKOFF_MAX_WAIT);
info!(?backoff, "next reconnect");
Instant::now() + backoff
};
self.next_connect(reconnect_backoff, inc_retry)
}

async fn handle_uplink(&mut self, uplink: PacketUp, received: StdInstant) {
async fn handle_uplink(&mut self, uplink: PacketUp, received: StdInstant) -> Result {
self.store.push_back(uplink, received);
if self.service.is_connected() {
self.send_waiting_packets().await;
self.send_waiting_packets().await?;
}
Ok(())
}

async fn handle_downlink(&mut self, message: PacketRouterPacketDownV1) {
self.transmit.downlink(message.into()).await;
}

async fn send_waiting_packets(&mut self) {
async fn send_waiting_packets(&mut self) -> Result {
while let (removed, Some(packet)) = self.store.pop_front(STORE_GC_INTERVAL) {
if removed > 0 {
info!("discarded {removed} queued packets");
info!(removed, "discarded queued packets");
}
if let Err(err) = self.send_packet(packet).await {
warn!(%err, "failed to send uplink")
if let Err(err) = self.send_packet(&packet).await {
warn!(%err, "failed to send uplink");
self.store.push_front(packet);
return Err(err);
}
}
Ok(())
}

pub async fn mk_uplink(
&self,
packet: CacheMessage<PacketUp>,
packet: &CacheMessage<PacketUp>,
) -> Result<PacketRouterPacketUpV1> {
let mut uplink: PacketRouterPacketUpV1 = packet.into_inner().into();
use std::ops::Deref;
let mut uplink: PacketRouterPacketUpV1 = packet.deref().into();
uplink.hold_time = packet.hold_time().as_millis() as u64;
uplink.gateway = self.keypair.public_key().into();
uplink.signature = uplink.sign(self.keypair.clone()).await?;
Ok(uplink)
}

async fn send_packet(&mut self, packet: CacheMessage<PacketUp>) -> Result {
async fn send_packet(&mut self, packet: &CacheMessage<PacketUp>) -> Result {
debug!(packet_hash = packet.hash().to_b64(), "sending packet");

let uplink = self.mk_uplink(packet).await?;
Expand Down

0 comments on commit c792aa9

Please sign in to comment.