Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not attempt to reconnect on every packet received #406

Merged
merged 2 commits into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,18 @@ impl Gateway {

async fn handle_uplink(&mut self, packet: PacketUp, received: Instant) {
if self.region_params.is_unknown() {
info!(downlink_mac = %self.downlink_mac, uplink = %packet, "ignored uplink, no region");
info!(
downlink_mac = %self.downlink_mac,
uplink = %packet,
region = %self.region_params,
"ignored uplink");
return;
}
info!(downlink_mac = %self.downlink_mac, uplink = %packet, "received uplink");
info!(
downlink_mac = %self.downlink_mac,
uplink = %packet,
region = %self.region_params,
"received uplink");
self.uplinks.uplink(packet, received).await;
}

Expand Down
30 changes: 24 additions & 6 deletions src/message_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ pub struct MessageCache<T> {
max_messages: u16,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct CacheMessage<T> {
received: Instant,
message: T,
}

impl<T> CacheMessage<T> {
pub fn hold_time(&self) -> Duration {
self.received.elapsed()
pub fn new(message: T, received: Instant) -> Self {
Self { message, received }
}

pub fn into_inner(self) -> T {
self.message
pub fn hold_time(&self) -> Duration {
self.received.elapsed()
}
}

Expand All @@ -42,13 +42,31 @@ impl<T> MessageCache<T> {
}
}

/// Pushes a given at the end of the cache. The message is tagged with the
/// given received time which can be used to calculate hold time of a
/// packet.
///
/// Pushing a packet onto the back of a full cache will cause the oldest
/// (first) message in the cache to be dropped.
pub fn push_back(&mut self, message: T, received: Instant) {
self.waiting.push_back(CacheMessage { message, received });
self.waiting.push_back(CacheMessage::new(message, received));
if self.len() > self.max_messages as usize {
self.waiting.pop_front();
}
}

/// Pushes a CacheMessage back on the front of the queue. This is useful to
/// push a packet back at the front after a failed delivery attempt.
///
/// Pushing to the front of a full cache will cause the given message to not
/// be added.
pub fn push_front(&mut self, cache_message: CacheMessage<T>) {
if self.len() > self.max_messages as usize {
return;
}
self.waiting.push_front(cache_message);
}

pub fn pop_front(&mut self, duration: Duration) -> (usize, Option<CacheMessage<T>>) {
let mut dropped = 0;
let mut front = None;
Expand Down
5 changes: 5 additions & 0 deletions src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ impl From<PacketUp> for PacketRouterPacketUpV1 {
value.0
}
}
impl From<&PacketUp> for PacketRouterPacketUpV1 {
fn from(value: &PacketUp) -> Self {
value.0.clone()
}
}

impl From<PacketRouterPacketDownV1> for PacketDown {
fn from(value: PacketRouterPacketDownV1) -> Self {
Expand Down
79 changes: 52 additions & 27 deletions src/packet_router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ impl PacketRouter {
},
message = self.messages.recv() => match message {
Some(Message::Uplink{packet, received}) =>
self.handle_uplink(packet, received).await,
if self.handle_uplink(packet, received).await.is_err() {
warn!("router disconnected");
reconnect_sleep = self.next_connect(&reconnect_backoff, true);
},
Some(Message::Status(tx_resp)) => {
let status = RouterStatus {
uri: self.service.uri.clone(),
Expand All @@ -116,67 +119,89 @@ impl PacketRouter {
},
downlink = self.service.recv() => match downlink {
Ok(Some(message)) => self.handle_downlink(message).await,
Ok(None) => warn!("router disconnected"),
Err(err) => warn!("router error {:?}", err),
Ok(None) => {
warn!("router disconnected");
reconnect_sleep = self.next_connect(&reconnect_backoff, true)
},
Err(err) => {
warn!(?err, "router error");
reconnect_sleep = self.next_connect(&reconnect_backoff, true)
},
}
}
}
}

fn next_connect(&mut self, reconnect_backoff: &Backoff, inc_retry: bool) -> Instant {
if inc_retry {
if self.reconnect_retry == RECONNECT_BACKOFF_RETRIES {
self.reconnect_retry = 0;
} else {
self.reconnect_retry += 1;
}
}
let backoff = reconnect_backoff
.next(self.reconnect_retry)
.unwrap_or(RECONNECT_BACKOFF_MAX_WAIT);
info!(seconds = backoff.as_secs(), "next connect");
Instant::now() + backoff
}

async fn handle_reconnect(&mut self, reconnect_backoff: &Backoff) -> Instant {
info!("reconnecting");
match self.service.reconnect().await {
info!("connecting");
let inc_retry = match self.service.reconnect().await {
Ok(_) => {
info!("reconnected");
info!("connected");
self.reconnect_retry = RECONNECT_BACKOFF_RETRIES;
self.send_waiting_packets().await
self.send_waiting_packets().await.is_err()
}
Err(err) => {
warn!(%err, "failed to reconnect");
if self.reconnect_retry == RECONNECT_BACKOFF_RETRIES {
self.reconnect_retry = 0;
} else {
self.reconnect_retry += 1;
}
warn!(%err, "failed to connect");
true
}
}
Instant::now()
+ reconnect_backoff
.next(self.reconnect_retry)
.unwrap_or(RECONNECT_BACKOFF_MAX_WAIT)
};
self.next_connect(reconnect_backoff, inc_retry)
}

async fn handle_uplink(&mut self, uplink: PacketUp, received: StdInstant) {
async fn handle_uplink(&mut self, uplink: PacketUp, received: StdInstant) -> Result {
self.store.push_back(uplink, received);
self.send_waiting_packets().await;
if self.service.is_connected() {
self.send_waiting_packets().await?;
}
Ok(())
}

async fn handle_downlink(&mut self, message: PacketRouterPacketDownV1) {
self.transmit.downlink(message.into()).await;
}

async fn send_waiting_packets(&mut self) {
async fn send_waiting_packets(&mut self) -> Result {
while let (removed, Some(packet)) = self.store.pop_front(STORE_GC_INTERVAL) {
if removed > 0 {
info!("discarded {removed} queued packets");
info!(removed, "discarded queued packets");
}
if let Err(err) = self.send_packet(packet).await {
warn!(%err, "failed to send uplink")
if let Err(err) = self.send_packet(&packet).await {
warn!(%err, "failed to send uplink");
self.store.push_front(packet);
return Err(err);
}
}
Ok(())
}

pub async fn mk_uplink(
&self,
packet: CacheMessage<PacketUp>,
packet: &CacheMessage<PacketUp>,
) -> Result<PacketRouterPacketUpV1> {
let mut uplink: PacketRouterPacketUpV1 = packet.into_inner().into();
use std::ops::Deref;
let mut uplink: PacketRouterPacketUpV1 = packet.deref().into();
uplink.hold_time = packet.hold_time().as_millis() as u64;
uplink.gateway = self.keypair.public_key().into();
uplink.signature = uplink.sign(self.keypair.clone()).await?;
Ok(uplink)
}

async fn send_packet(&mut self, packet: CacheMessage<PacketUp>) -> Result {
async fn send_packet(&mut self, packet: &CacheMessage<PacketUp>) -> Result {
debug!(packet_hash = packet.hash().to_b64(), "sending packet");

let uplink = self.mk_uplink(packet).await?;
Expand Down