Skip to content

Commit

Permalink
Add packet ack configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
madninja committed Dec 6, 2023
1 parent 2eeddb5 commit 1937bc0
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 62 deletions.
83 changes: 55 additions & 28 deletions src/message_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ impl<T: PartialEq + MessageHash> Deref for CacheMessage<T> {
}
}

pub enum PopFront<'a> {
Duration(Duration),
Ack(&'a [u8]),
}

impl<T: PartialEq + MessageHash> MessageCache<T> {
pub fn new(max_messages: u16) -> Self {
let waiting = VecDeque::new();
Expand All @@ -69,15 +64,18 @@ impl<T: PartialEq + MessageHash> MessageCache<T> {

/// Returns the index of the first matching message in the cache or None if
/// not present
pub fn index_of(&self, message: &T) -> Option<usize> {
self.cache.iter().position(|m| m.message == *message)
pub fn index_of<P>(&self, pred: P) -> Option<usize>
where
P: Fn(&T) -> bool,
{
self.cache.iter().position(|entry| pred(&entry.message))
}

/// Promotes the given message to the back of the queue, effectively
/// recreating an LRU cache. Returns true if a cache hit was found
pub fn tag(&mut self, message: T, received: Instant) -> bool {
let result = self
.index_of(&message)
.index_of(|msg| *msg == message)
.and_then(|index| self.cache.remove(index))
.is_some();
self.push_back(message, received);
Expand All @@ -100,30 +98,30 @@ impl<T: PartialEq + MessageHash> MessageCache<T> {
self.cache.push_front(cache_message);
}

pub fn pop_front(&mut self, args: PopFront) -> (usize, Option<CacheMessage<T>>) {
pub fn pop_front(&mut self, duration: Duration) -> (usize, Option<CacheMessage<T>>) {
let mut dropped = 0;
let mut front = None;
while let Some(msg) = self.cache.pop_front() {
match args {
PopFront::Duration(duration) => {
if msg.hold_time() <= duration {
front = Some(msg);
break;
}
}
PopFront::Ack(ack) => {
if msg.hash() == ack {
front = self.cache.pop_front();
break;
}
}
};
// held for too long or acked, count as dropped and move on
if msg.hold_time() <= duration {
front = Some(msg);
break;
}
dropped += 1;
}
(dropped, front)
}

/// Removes all items from the cache up to and including the given index.
///
/// The index is bounds checked and an index beyond the length of the cache
/// is ignored
pub fn remove_to(&mut self, index: usize) {
if index >= self.len() {
return;
}
self.cache = self.cache.split_off(index + 1);
}

/// Returns a reference to the first (and oldest/first to be removed)
/// message in the cache
pub fn peek_front(&self) -> Option<&CacheMessage<T>> {
Expand All @@ -141,7 +139,8 @@ impl<T: PartialEq + MessageHash> MessageCache<T> {

#[cfg(test)]
mod test {
use super::MessageCache;
use super::{Instant, MessageCache};
use sha2::{Digest, Sha256};

#[test]
fn test_cache_tagging() {
Expand All @@ -161,8 +160,36 @@ mod test {

// Third tag should evict the least recently used entry (2)
assert!(!cache.tag_now(vec![3]));
assert_eq!(Some(0), cache.index_of(&vec![1u8]));
assert_eq!(Some(1), cache.index_of(&vec![3u8]));
assert!(cache.index_of(&vec![2u8]).is_none());
assert_eq!(Some(0), cache.index_of(|msg| msg.as_slice() == &[1u8]));
assert_eq!(Some(1), cache.index_of(|msg| msg.as_slice() == &[3u8]));
assert!(cache.index_of(|msg| msg.as_slice() == &[2u8]).is_none());
}

#[test]
fn test_remove_to() {
let mut cache = MessageCache::<Vec<u8>>::new(5);
cache.push_back(vec![1], Instant::now());
cache.push_back(vec![2], Instant::now());
cache.push_back(vec![3], Instant::now());

let ack = Sha256::digest(vec![2]).to_vec();

// Find entry by hash as an example
let ack_index = cache.index_of(|msg| Sha256::digest(msg).to_vec() == ack);
assert_eq!(Some(1), ack_index);
// Can't find non existing
assert_eq!(None, cache.index_of(|_| false));

// remove and check inclusion of remove_to
cache.remove_to(1);
assert_eq!(1, cache.len());

// remove past last index
cache.remove_to(5);
assert_eq!(1, cache.len());

// remove last element
cache.remove_to(0);
assert!(cache.is_empty());
}
}
42 changes: 25 additions & 17 deletions src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use semtech_udp::{
push_data::{self, CRC},
CodingRate, DataRate, Modulation,
};
use sha2::{Digest, Sha256};
use std::{
convert::TryFrom,
fmt,
Expand All @@ -17,7 +18,10 @@ use std::{
};

#[derive(Debug, Clone, PartialEq)]
pub struct PacketUp(PacketRouterPacketUpV1);
pub struct PacketUp {
packet: PacketRouterPacketUpV1,
pub(crate) hash: Vec<u8>,
}

#[derive(Debug, Clone)]
pub struct PacketDown(PacketRouterPacketDownV1);
Expand All @@ -26,18 +30,19 @@ impl Deref for PacketUp {
type Target = PacketRouterPacketUpV1;

fn deref(&self) -> &Self::Target {
&self.0
&self.packet
}
}

impl From<PacketUp> for PacketRouterPacketUpV1 {
fn from(value: PacketUp) -> Self {
value.0
value.packet
}
}

impl From<&PacketUp> for PacketRouterPacketUpV1 {
fn from(value: &PacketUp) -> Self {
value.0.clone()
value.packet.clone()
}
}

Expand All @@ -51,12 +56,12 @@ impl fmt::Display for PacketUp {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_fmt(format_args!(
"@{} us, {:.2} MHz, {:?}, snr: {}, rssi: {}, len: {}",
self.0.timestamp,
self.0.frequency,
self.0.datarate(),
self.0.snr,
self.0.rssi,
self.0.payload.len()
self.packet.timestamp,
self.packet.frequency,
self.packet.datarate(),
self.packet.snr,
self.packet.rssi,
self.packet.payload.len()
))
}
}
Expand All @@ -66,15 +71,15 @@ impl TryFrom<PacketUp> for poc_lora::LoraWitnessReportReqV1 {
fn try_from(value: PacketUp) -> Result<Self> {
let report = poc_lora::LoraWitnessReportReqV1 {
data: vec![],
tmst: value.0.timestamp as u32,
tmst: value.packet.timestamp as u32,
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(Error::from)?
.as_nanos() as u64,
signal: value.0.rssi * 10,
snr: (value.0.snr * 10.0) as i32,
frequency: value.0.frequency as u64,
datarate: value.0.datarate,
signal: value.packet.rssi * 10,
snr: (value.packet.snr * 10.0) as i32,
frequency: value.packet.frequency as u64,
datarate: value.packet.datarate,
pub_key: vec![],
signature: vec![],
};
Expand Down Expand Up @@ -106,7 +111,10 @@ impl PacketUp {
gateway: gateway.into(),
signature: vec![],
};
Ok(Self(packet))
Ok(Self {
hash: Sha256::digest(&packet.payload).to_vec(),
packet,
})
}

pub fn is_potential_beacon(&self) -> bool {
Expand All @@ -132,7 +140,7 @@ impl PacketUp {
}

pub fn payload(&self) -> &[u8] {
&self.0.payload
&self.packet.payload
}

pub fn parse_header(payload: &[u8]) -> Result<MHDR> {
Expand Down
43 changes: 33 additions & 10 deletions src/packet_router/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
gateway,
message_cache::{CacheMessage, MessageCache, MessageHash, PopFront},
service::{packet_router::PacketRouterService, Reconnect},
message_cache::{CacheMessage, MessageCache, MessageHash},
service::{packet_router::PacketRouterService, AckTimer, Reconnect},
sync, Base64, PacketUp, PublicKey, Result, Settings,
};
use futures::TryFutureExt;
Expand All @@ -13,7 +13,6 @@ use serde::Serialize;
use sha2::{Digest, Sha256};
use std::{ops::Deref, time::Instant as StdInstant};
use tokio::time::Duration;

use tracing::{debug, info, warn};

const STORE_GC_INTERVAL: Duration = Duration::from_secs(60);
Expand Down Expand Up @@ -57,6 +56,7 @@ pub struct PacketRouter {
transmit: gateway::MessageSender,
service: PacketRouterService,
reconnect: Reconnect,
ack_timer: AckTimer,
store: MessageCache<PacketUp>,
}

Expand All @@ -73,16 +73,21 @@ impl PacketRouter {
transmit: gateway::MessageSender,
) -> Self {
let router_settings = &settings.router;
let service =
PacketRouterService::new(router_settings.uri.clone(), settings.keypair.clone());
let service = PacketRouterService::new(
router_settings.uri.clone(),
router_settings.ack_timeout(),
settings.keypair.clone(),
);
let store = MessageCache::new(router_settings.queue);
let reconnect = Reconnect::default();
let ack_timer = AckTimer::new(router_settings.ack_timeout());
Self {
service,
transmit,
messages,
store,
reconnect,
ack_timer,
}
}

Expand All @@ -102,6 +107,7 @@ impl PacketRouter {
self.service.disconnect();
warn!("router disconnected");
self.reconnect.update_next_time(true);
self.ack_timer.update_next_time(false);
},
Some(Message::Status(tx_resp)) => {
let status = RouterStatus {
Expand All @@ -116,6 +122,13 @@ impl PacketRouter {
_ = self.reconnect.wait() => {
let reconnect_result = self.handle_reconnect().await;
self.reconnect.update_next_time(reconnect_result.is_err());
self.ack_timer.update_next_time(reconnect_result.is_ok());
},
_ = self.ack_timer.wait() => {
warn!("no packet acks received");
let reconnect_result = self.handle_reconnect().await;
self.reconnect.update_next_time(reconnect_result.is_err());
self.ack_timer.update_next_time(reconnect_result.is_ok());
},
router_message = self.service.recv() => match router_message {
Ok(envelope_down_v1::Data::Packet(message)) => self.handle_downlink(message).await,
Expand All @@ -130,11 +143,16 @@ impl PacketRouter {
self.service.disconnect();
}
self.reconnect.update_next_time(session_result.is_err());
self.ack_timer.update_next_time(session_result.is_ok());
},
Ok(envelope_down_v1::Data::PacketAck(message)) => {
self.handle_packet_ack(message).await;
self.ack_timer.update_next_time(true);
},
Ok(envelope_down_v1::Data::PacketAck(message)) => self.handle_packet_ack(message).await,
Err(err) => {
warn!(?err, "router error");
self.reconnect.update_next_time(true);
self.ack_timer.update_next_time(false);
},
}
}
Expand Down Expand Up @@ -164,7 +182,14 @@ impl PacketRouter {
}

async fn handle_packet_ack(&mut self, message: PacketRouterPacketAckV1) {
self.store.pop_front(PopFront::Ack(&message.payload_hash));
if message.payload_hash.is_empty() {
// Empty ack is just a heartbeat and is ignored
return;
}
if let Some(index) = self.store.index_of(|msg| msg.hash == message.payload_hash) {
self.store.remove_to(index);
debug!(removed = index, "removed acked packets");
}
}

async fn handle_session_offer(&mut self, message: PacketRouterSessionOfferV1) -> Result {
Expand All @@ -175,9 +200,7 @@ impl PacketRouter {
}

async fn send_waiting_packets(&mut self) -> Result {
while let (removed, Some(packet)) =
self.store.pop_front(PopFront::Duration(STORE_GC_INTERVAL))
{
while let (removed, Some(packet)) = self.store.pop_front(STORE_GC_INTERVAL) {
if removed > 0 {
info!(removed, "discarded queued packets");
}
Expand Down
32 changes: 32 additions & 0 deletions src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,35 @@ impl Reconnect {
self.next_time = Instant::now() + backoff;
}
}

pub struct AckTimer {
next_time: Instant,
timeout: Duration,
}

impl AckTimer {
pub fn new(timeout: Duration) -> Self {
Self {
next_time: Instant::now() + timeout,
timeout,
}
}

pub async fn wait(&self) {
if self.next_time >= Instant::now() {
time::sleep_until(self.next_time).await
} else {
std::future::pending().await
}
}

pub fn update_next_time(&mut self, active: bool) {
// timeout is 0 if the ack timer is not requested. Active means the
// connection is open and acks are to be expected
self.next_time = if self.timeout.as_secs() > 0 && active {
Instant::now() + self.timeout
} else {
Instant::now() - self.timeout
};
}
}
Loading

0 comments on commit 1937bc0

Please sign in to comment.