Skip to content

Commit

Permalink
Session key for packet_router (#438)
Browse files Browse the repository at this point in the history
* Session key for packet_router
* sign session_init with gateway key
* Indicate session_capable in register
* Add session key log
  • Loading branch information
madninja committed Aug 8, 2023
1 parent f4884b5 commit d7fa801
Show file tree
Hide file tree
Showing 8 changed files with 361 additions and 172 deletions.
392 changes: 247 additions & 145 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ impl TryFrom<RouterRes> for crate::packet_router::RouterStatus {
Ok(Self {
uri: http::Uri::from_str(&value.uri)?,
connected: value.connected,
session_key: helium_crypto::PublicKey::try_from(value.session_key).ok(),
})
}
}
4 changes: 4 additions & 0 deletions src/api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ impl Api for LocalServer {
Ok(Response::new(RouterRes {
uri: router_status.uri.to_string(),
connected: router_status.connected,
session_key: router_status
.session_key
.map(|k| k.to_vec())
.unwrap_or_default(),
}))
}

Expand Down
9 changes: 6 additions & 3 deletions src/gateway.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
beaconer, packet, packet_router, region_watcher, sync, PacketDown, PacketUp, RegionParams,
Result, Settings,
beaconer, packet, packet_router, region_watcher, sync, PacketDown, PacketUp, PublicKey,
RegionParams, Result, Settings,
};
use beacon::Beacon;
use lorawan::PHYPayload;
Expand Down Expand Up @@ -60,6 +60,7 @@ impl MessageSender {
}

pub struct Gateway {
public_key: PublicKey,
messages: MessageReceiver,
uplinks: packet_router::MessageSender,
beacons: beaconer::MessageSender,
Expand All @@ -79,7 +80,9 @@ impl Gateway {
beacons: beaconer::MessageSender,
) -> Result<Self> {
let region_params = region_watcher::current_value(&region_watch);
let public_key = settings.keypair.public_key().clone();
let gateway = Gateway {
public_key,
messages,
uplinks,
beacons,
Expand Down Expand Up @@ -136,7 +139,7 @@ impl Gateway {
info!(%mac, %addr, "disconnected packet forwarder")
}
Event::PacketReceived(rxpk, _gateway_mac) => {
match PacketUp::from_rxpk(rxpk, self.region_params.region) {
match PacketUp::from_rxpk(rxpk, &self.public_key, self.region_params.region) {
Ok(packet) if packet.is_potential_beacon() => {
self.handle_potential_beacon(packet).await;
}
Expand Down
11 changes: 11 additions & 0 deletions src/keypair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ pub fn save_to_file(keypair: &Keypair, path: &str) -> io::Result<()> {
Ok(())
}

pub fn mk_session_keypair() -> Keypair {
let keypair = helium_crypto::Keypair::generate(
KeyTag {
network: Network::MainNet,
key_type: KeyType::Ed25519,
},
&mut OsRng,
);
keypair.into()
}

macro_rules! uri_error {
($format:expr) => {
error::DecodeError::keypair_uri(format!($format))
Expand Down
6 changes: 3 additions & 3 deletions src/packet.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{error::DecodeError, Error, Region, Result};
use crate::{error::DecodeError, Error, PublicKey, Region, Result};
use helium_proto::services::{
poc_lora,
router::{PacketRouterPacketDownV1, PacketRouterPacketUpV1},
Expand Down Expand Up @@ -88,7 +88,7 @@ impl TryFrom<PacketUp> for poc_lora::LoraWitnessReportReqV1 {
}

impl PacketUp {
pub fn from_rxpk(rxpk: push_data::RxPk, region: Region) -> Result<Self> {
pub fn from_rxpk(rxpk: push_data::RxPk, gateway: &PublicKey, region: Region) -> Result<Self> {
if rxpk.get_crc_status() != &CRC::OK {
return Err(DecodeError::invalid_crc());
}
Expand All @@ -105,7 +105,7 @@ impl PacketUp {
snr: rxpk.get_snr(),
region: region.into(),
hold_time: 0,
gateway: vec![],
gateway: gateway.into(),
signature: vec![],
};
Ok(Self(packet))
Expand Down
93 changes: 81 additions & 12 deletions src/packet_router/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
use crate::{
gateway,
keypair::mk_session_keypair,
message_cache::{CacheMessage, MessageCache},
service::packet_router::PacketRouterService,
sign, sync, Base64, Keypair, PacketUp, Result, Settings,
};
use exponential_backoff::Backoff;
use futures::TryFutureExt;
use helium_proto::{
services::router::{PacketRouterPacketDownV1, PacketRouterPacketUpV1},
services::router::{
envelope_down_v1, envelope_up_v1, PacketRouterPacketDownV1, PacketRouterPacketUpV1,
PacketRouterSessionInitV1, PacketRouterSessionOfferV1,
},
Message as ProtoMessage,
};
use serde::Serialize;
use std::{sync::Arc, time::Instant as StdInstant};
use tokio::time::{self, Duration, Instant};

use tracing::{debug, info, warn};

const STORE_GC_INTERVAL: Duration = Duration::from_secs(60);
Expand All @@ -34,6 +40,7 @@ pub struct RouterStatus {
#[serde(with = "http_serde::uri")]
pub uri: http::Uri,
pub connected: bool,
pub session_key: Option<helium_crypto::PublicKey>,
}

pub type MessageSender = sync::MessageSender<Message>;
Expand All @@ -58,6 +65,7 @@ pub struct PacketRouter {
transmit: gateway::MessageSender,
service: PacketRouterService,
reconnect_retry: u32,
session_key: Option<Arc<Keypair>>,
keypair: Arc<Keypair>,
store: MessageCache<PacketUp>,
}
Expand All @@ -75,6 +83,7 @@ impl PacketRouter {
Self {
service,
keypair: settings.keypair.clone(),
session_key: None,
transmit,
messages,
store,
Expand Down Expand Up @@ -105,13 +114,15 @@ impl PacketRouter {
message = self.messages.recv() => match message {
Some(Message::Uplink{packet, received}) =>
if self.handle_uplink(packet, received).await.is_err() {
self.disconnect();
warn!("router disconnected");
reconnect_sleep = self.next_connect(&reconnect_backoff, true);
},
Some(Message::Status(tx_resp)) => {
let status = RouterStatus {
uri: self.service.uri.clone(),
connected: self.service.is_connected(),
session_key: self.session_key.as_ref().map(|keypair| keypair.public_key().to_owned()),
};
tx_resp.send(status)
}
Expand All @@ -120,8 +131,9 @@ impl PacketRouter {
_ = time::sleep_until(reconnect_sleep) => {
reconnect_sleep = self.handle_reconnect(&reconnect_backoff).await;
},
downlink = self.service.recv() => match downlink {
Ok(Some(message)) => self.handle_downlink(message).await,
router_message = self.service.recv() => match router_message {
Ok(Some(envelope_down_v1::Data::Packet(message))) => self.handle_downlink(message).await,
Ok(Some(envelope_down_v1::Data::SessionOffer(message))) => self.handle_session_offer(message).await,
Ok(None) => {
warn!("router disconnected");
reconnect_sleep = self.next_connect(&reconnect_backoff, true)
Expand Down Expand Up @@ -154,9 +166,11 @@ impl PacketRouter {
info!("connecting");
let inc_retry = match self.service.reconnect().await {
Ok(_) => {
// Do not send waiting packets here since we wait for a sesson
// offer
info!("connected");
self.reconnect_retry = RECONNECT_BACKOFF_RETRIES;
self.send_waiting_packets().await.is_err()
false
}
Err(err) => {
warn!(%err, "failed to connect");
Expand All @@ -169,7 +183,9 @@ impl PacketRouter {
async fn handle_uplink(&mut self, uplink: PacketUp, received: StdInstant) -> Result {
self.store.push_back(uplink, received);
if self.service.is_connected() {
self.send_waiting_packets().await?;
if let Some(session_key) = &self.session_key {
self.send_waiting_packets(session_key.clone()).await?;
}
}
Ok(())
}
Expand All @@ -178,12 +194,43 @@ impl PacketRouter {
self.transmit.downlink(message.into()).await;
}

async fn send_waiting_packets(&mut self) -> Result {
async fn handle_session_offer(&mut self, message: PacketRouterSessionOfferV1) {
info!("received session offer");
let disconnect = match mk_session_key_init(self.keypair.clone(), &message)
.and_then(|(session_key, session_init)| {
self.service.send(session_init).map_ok(|_| session_key)
})
.await
{
Ok(session_key) => {
self.session_key = Some(session_key.clone());
info!(session_key = %session_key.public_key(),"initialized session");
self.send_waiting_packets(session_key.clone())
.inspect_err(|err| warn!(%err, "failed to send queued packets"))
.await
.is_err()
}
Err(err) => {
warn!(%err, "failed to initialize session");
true
}
};
if disconnect {
self.disconnect();
}
}

fn disconnect(&mut self) {
self.service.disconnect();
self.session_key = None;
}

async fn send_waiting_packets(&mut self, keypair: Arc<Keypair>) -> Result {
while let (removed, Some(packet)) = self.store.pop_front(STORE_GC_INTERVAL) {
if removed > 0 {
info!(removed, "discarded queued packets");
}
if let Err(err) = self.send_packet(&packet).await {
if let Err(err) = self.send_packet(&packet, keypair.clone()).await {
warn!(%err, "failed to send uplink");
self.store.push_front(packet);
return Err(err);
Expand All @@ -192,22 +239,44 @@ impl PacketRouter {
Ok(())
}

async fn send_packet(&mut self, packet: &CacheMessage<PacketUp>) -> Result {
async fn send_packet(
&mut self,
packet: &CacheMessage<PacketUp>,
keypair: Arc<Keypair>,
) -> Result {
debug!(packet_hash = packet.hash().to_b64(), "sending packet");

let uplink = mk_uplink(packet, self.keypair.clone()).await?;
let uplink = mk_uplink(packet, keypair).await?;
self.service.send(uplink).await
}
}

pub async fn mk_uplink(
packet: &CacheMessage<PacketUp>,
keypair: Arc<Keypair>,
) -> Result<PacketRouterPacketUpV1> {
) -> Result<envelope_up_v1::Data> {
use std::ops::Deref;
let mut uplink: PacketRouterPacketUpV1 = packet.deref().into();
uplink.hold_time = packet.hold_time().as_millis() as u64;
uplink.gateway = keypair.public_key().into();
uplink.signature = sign(keypair, uplink.encode_to_vec()).await?;
Ok(uplink)
let envelope = envelope_up_v1::Data::Packet(uplink);
Ok(envelope)
}

pub async fn mk_session_key_init(
keypair: Arc<Keypair>,
offer: &PacketRouterSessionOfferV1,
) -> Result<(Arc<Keypair>, envelope_up_v1::Data)> {
let session_keypair = Arc::new(mk_session_keypair());
let session_key = session_keypair.public_key();

let mut session_init = PacketRouterSessionInitV1 {
gateway: keypair.public_key().into(),
session_key: session_key.into(),
nonce: offer.nonce.clone(),
signature: vec![],
};
session_init.signature = sign(keypair, session_init.encode_to_vec()).await?;
let envelope = envelope_up_v1::Data::SessionInit(session_init);
Ok((session_keypair, envelope))
}
17 changes: 8 additions & 9 deletions src/service/packet_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use helium_proto::{
services::{
router::{
envelope_down_v1, envelope_up_v1, EnvelopeDownV1, EnvelopeUpV1, PacketRouterClient,
PacketRouterPacketDownV1, PacketRouterPacketUpV1, PacketRouterRegisterV1,
PacketRouterRegisterV1,
},
Channel, Endpoint,
},
Expand Down Expand Up @@ -73,21 +73,19 @@ impl PacketRouterConduit {
Ok(Self { tx, rx })
}

async fn recv(&mut self) -> Result<Option<PacketRouterPacketDownV1>> {
async fn recv(&mut self) -> Result<Option<envelope_down_v1::Data>> {
match self.rx.message().await {
Ok(Some(msg)) => match msg.data {
Some(envelope_down_v1::Data::Packet(packet)) => Ok(Some(packet)),
Some(data) => Ok(Some(data)),
None => Err(DecodeError::invalid_envelope()),
},
Ok(None) => Ok(None),
Err(err) => Err(err.into()),
}
}

async fn send(&mut self, msg: PacketRouterPacketUpV1) -> Result {
let msg = EnvelopeUpV1 {
data: Some(envelope_up_v1::Data::Packet(msg)),
};
async fn send(&mut self, msg: envelope_up_v1::Data) -> Result {
let msg = EnvelopeUpV1 { data: Some(msg) };
Ok(self.tx.send(msg).await?)
}

Expand All @@ -99,6 +97,7 @@ impl PacketRouterConduit {
.as_millis() as u64,
gateway: keypair.public_key().into(),
signature: vec![],
session_capable: true,
};
msg.signature = sign(keypair.clone(), msg.encode_to_vec()).await?;
let msg = EnvelopeUpV1 {
Expand All @@ -117,7 +116,7 @@ impl PacketRouterService {
}
}

pub async fn send(&mut self, msg: PacketRouterPacketUpV1) -> Result {
pub async fn send(&mut self, msg: envelope_up_v1::Data) -> Result {
if self.conduit.is_none() {
self.connect().await?;
}
Expand All @@ -131,7 +130,7 @@ impl PacketRouterService {
}
}

pub async fn recv(&mut self) -> Result<Option<PacketRouterPacketDownV1>> {
pub async fn recv(&mut self) -> Result<Option<envelope_down_v1::Data>> {
// Since recv is usually called from a select loop we don't try a
// connect every time it is called since the rate for attempted
// connections in failure setups would be as high as the loop rate of
Expand Down

0 comments on commit d7fa801

Please sign in to comment.