Skip to content

Commit

Permalink
Add packet ack configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
madninja committed Dec 6, 2023
1 parent 2eeddb5 commit 2a108ad
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 73 deletions.
23 changes: 12 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 55 additions & 28 deletions src/message_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ impl<T: PartialEq + MessageHash> Deref for CacheMessage<T> {
}
}

pub enum PopFront<'a> {
Duration(Duration),
Ack(&'a [u8]),
}

impl<T: PartialEq + MessageHash> MessageCache<T> {
pub fn new(max_messages: u16) -> Self {
let waiting = VecDeque::new();
Expand All @@ -69,15 +64,18 @@ impl<T: PartialEq + MessageHash> MessageCache<T> {

/// Returns the index of the first matching message in the cache or None if
/// not present
pub fn index_of(&self, message: &T) -> Option<usize> {
self.cache.iter().position(|m| m.message == *message)
pub fn index_of<P>(&self, pred: P) -> Option<usize>
where
P: Fn(&T) -> bool,
{
self.cache.iter().position(|entry| pred(&entry.message))
}

/// Promotes the given message to the back of the queue, effectively
/// recreating an LRU cache. Returns true if a cache hit was found
pub fn tag(&mut self, message: T, received: Instant) -> bool {
let result = self
.index_of(&message)
.index_of(|msg| *msg == message)
.and_then(|index| self.cache.remove(index))
.is_some();
self.push_back(message, received);
Expand All @@ -100,30 +98,30 @@ impl<T: PartialEq + MessageHash> MessageCache<T> {
self.cache.push_front(cache_message);
}

pub fn pop_front(&mut self, args: PopFront) -> (usize, Option<CacheMessage<T>>) {
pub fn pop_front(&mut self, duration: Duration) -> (usize, Option<CacheMessage<T>>) {
let mut dropped = 0;
let mut front = None;
while let Some(msg) = self.cache.pop_front() {
match args {
PopFront::Duration(duration) => {
if msg.hold_time() <= duration {
front = Some(msg);
break;
}
}
PopFront::Ack(ack) => {
if msg.hash() == ack {
front = self.cache.pop_front();
break;
}
}
};
// held for too long or acked, count as dropped and move on
if msg.hold_time() <= duration {
front = Some(msg);
break;
}
dropped += 1;
}
(dropped, front)
}

/// Removes all items from the cache up to and including the given index.
///
/// The index is bounds checked and an index beyond the length of the cache
/// is ignored
pub fn remove_to(&mut self, index: usize) {
if index >= self.len() {
return;
}
self.cache = self.cache.split_off(index + 1);
}

/// Returns a reference to the first (and oldest/first to be removed)
/// message in the cache
pub fn peek_front(&self) -> Option<&CacheMessage<T>> {
Expand All @@ -141,7 +139,8 @@ impl<T: PartialEq + MessageHash> MessageCache<T> {

#[cfg(test)]
mod test {
use super::MessageCache;
use super::{Instant, MessageCache};
use sha2::{Digest, Sha256};

#[test]
fn test_cache_tagging() {
Expand All @@ -161,8 +160,36 @@ mod test {

// Third tag should evict the least recently used entry (2)
assert!(!cache.tag_now(vec![3]));
assert_eq!(Some(0), cache.index_of(&vec![1u8]));
assert_eq!(Some(1), cache.index_of(&vec![3u8]));
assert!(cache.index_of(&vec![2u8]).is_none());
assert_eq!(Some(0), cache.index_of(|msg| msg.as_slice() == &[1u8]));
assert_eq!(Some(1), cache.index_of(|msg| msg.as_slice() == &[3u8]));
assert!(cache.index_of(|msg| msg.as_slice() == &[2u8]).is_none());
}

#[test]
fn test_remove_to() {
let mut cache = MessageCache::<Vec<u8>>::new(5);
cache.push_back(vec![1], Instant::now());
cache.push_back(vec![2], Instant::now());
cache.push_back(vec![3], Instant::now());

let ack = Sha256::digest(vec![2]).to_vec();

// Find entry by hash as an example
let ack_index = cache.index_of(|msg| Sha256::digest(msg).to_vec() == ack);
assert_eq!(Some(1), ack_index);
// Can't find non existing
assert_eq!(None, cache.index_of(|_| false));

// remove and check inclusion of remove_to
cache.remove_to(1);
assert_eq!(1, cache.len());

// remove past last index
cache.remove_to(5);
assert_eq!(1, cache.len());

// remove last element
cache.remove_to(0);
assert!(cache.is_empty());
}
}
42 changes: 25 additions & 17 deletions src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use semtech_udp::{
push_data::{self, CRC},
CodingRate, DataRate, Modulation,
};
use sha2::{Digest, Sha256};
use std::{
convert::TryFrom,
fmt,
Expand All @@ -17,7 +18,10 @@ use std::{
};

#[derive(Debug, Clone, PartialEq)]
pub struct PacketUp(PacketRouterPacketUpV1);
pub struct PacketUp {
packet: PacketRouterPacketUpV1,
pub(crate) hash: Vec<u8>,
}

#[derive(Debug, Clone)]
pub struct PacketDown(PacketRouterPacketDownV1);
Expand All @@ -26,18 +30,19 @@ impl Deref for PacketUp {
type Target = PacketRouterPacketUpV1;

fn deref(&self) -> &Self::Target {
&self.0
&self.packet
}
}

impl From<PacketUp> for PacketRouterPacketUpV1 {
fn from(value: PacketUp) -> Self {
value.0
value.packet
}
}

impl From<&PacketUp> for PacketRouterPacketUpV1 {
fn from(value: &PacketUp) -> Self {
value.0.clone()
value.packet.clone()
}
}

Expand All @@ -51,12 +56,12 @@ impl fmt::Display for PacketUp {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_fmt(format_args!(
"@{} us, {:.2} MHz, {:?}, snr: {}, rssi: {}, len: {}",
self.0.timestamp,
self.0.frequency,
self.0.datarate(),
self.0.snr,
self.0.rssi,
self.0.payload.len()
self.packet.timestamp,
self.packet.frequency,
self.packet.datarate(),
self.packet.snr,
self.packet.rssi,
self.packet.payload.len()
))
}
}
Expand All @@ -66,15 +71,15 @@ impl TryFrom<PacketUp> for poc_lora::LoraWitnessReportReqV1 {
fn try_from(value: PacketUp) -> Result<Self> {
let report = poc_lora::LoraWitnessReportReqV1 {
data: vec![],
tmst: value.0.timestamp as u32,
tmst: value.packet.timestamp as u32,
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(Error::from)?
.as_nanos() as u64,
signal: value.0.rssi * 10,
snr: (value.0.snr * 10.0) as i32,
frequency: value.0.frequency as u64,
datarate: value.0.datarate,
signal: value.packet.rssi * 10,
snr: (value.packet.snr * 10.0) as i32,
frequency: value.packet.frequency as u64,
datarate: value.packet.datarate,
pub_key: vec![],
signature: vec![],
};
Expand Down Expand Up @@ -106,7 +111,10 @@ impl PacketUp {
gateway: gateway.into(),
signature: vec![],
};
Ok(Self(packet))
Ok(Self {
hash: Sha256::digest(&packet.payload).to_vec(),
packet,
})
}

pub fn is_potential_beacon(&self) -> bool {
Expand All @@ -132,7 +140,7 @@ impl PacketUp {
}

pub fn payload(&self) -> &[u8] {
&self.0.payload
&self.packet.payload
}

pub fn parse_header(payload: &[u8]) -> Result<MHDR> {
Expand Down
Loading

0 comments on commit 2a108ad

Please sign in to comment.