Skip to content

Commit

Permalink
feat: Route notifications to autopush connection servers (#167)
Browse files Browse the repository at this point in the history
* Generate a message ID for each notification

* Use a default TTL of 0 in NotificationHeaders

* Impl Serialize for Notification and NotificationHeaders

* Add a Router trait and stub WebPushRouter

* Use async_trait in Router and add RouterResponse

* Implement direct-route-to-node happy-path for WebPushRouter

* Replace serde Serialize notification impls with more direct impls

* Add DynamoStorage::remove_node_id and use in webpush router

The Router async functions may return futures that are not Send, so the
async_trait annotation is modified to not require Send.

* Add DynamoStorage::store_message and use in webpush router

* Add a UserNotFound error kind to the common code and use in get_uaid

* Try to notify the node after storing a message, if available

* Remove the `updates.client.deleted` metric

Based on metrics spreadsheet discussion.

* Add RouterType enum for easy matching on the router type

* Take notification by reference when routing

* Add Routers extractor and use in webpush_route

* Add endpoint_url setting and remove unused database settings

* Remove settings banner, as actix already logs the data as INFO

* Fix the Location header TODO in webpush router

* Add RouterError

* Fix incorrect usage of UserNotFound error (remove it)

* Return the router response from the HTTP handler

* Fix missing Clone impl (rebase error)

* Fix serialization of WebPush notifications

Notifications were serialized without required fields (due to
`skip_serializing` in the autopush-common code). While this is correct
when giving the notification to the UA, the connection server needs
these fields. We now perform this serialization separately from the
autopush-common Notification serialization.

* Add debug and trace level logging to the WebPush router

* Ignore local configs

* Fix wrong content encoding key in header map

WebPush expects "encoding" instead of "content_encoding"

* Return an error if there is no TTL value

* Fix NotificationHeader tests after requiring TTL

* Update errnos for RouterError and NoTTL

Closes #161
  • Loading branch information
AzureMarker committed Jul 13, 2020
1 parent 91d483a commit e73dff1
Show file tree
Hide file tree
Showing 18 changed files with 626 additions and 65 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ target
requirements.txt
test-requirements.txt
venv

# Local configs
*.local.toml
8 changes: 5 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions autoendpoint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ actix-http = "1.0"
actix-web = "2.0"
actix-rt = "1.0"
actix-cors = "0.2.0"
async-trait = "0.1.36"
autopush_common = { path = "../autopush-common" }
backtrace = "0.3"
base64 = "0.12.1"
Expand All @@ -22,6 +23,7 @@ jsonwebtoken = "7.1.1"
lazy_static = "1.4.0"
openssl = "0.10"
regex = "1.3"
reqwest = "0.10.6"
sentry = { version = "0.18", features = ["with_curl_transport"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Expand Down
35 changes: 27 additions & 8 deletions autoendpoint/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Error types and transformations

use crate::server::VapidError;
use crate::server::{RouterError, VapidError};
use actix_web::{
dev::{HttpResponseBuilder, ServiceResponse},
error::{PayloadError, ResponseError},
Expand Down Expand Up @@ -59,6 +59,9 @@ pub enum ApiErrorKind {
#[error(transparent)]
VapidError(#[from] VapidError),

#[error(transparent)]
Router(#[from] RouterError),

#[error(transparent)]
Uuid(#[from] uuid::Error),

Expand All @@ -84,13 +87,16 @@ pub enum ApiErrorKind {
#[error("{0}")]
InvalidEncryption(String),

#[error("Data payload must be smaller than {} bytes", .0)]
#[error("Data payload must be smaller than {0} bytes")]
PayloadTooLarge(usize),

/// Used if the API version given is not v1 or v2
#[error("Invalid API version")]
InvalidApiVersion,

#[error("Missing TTL value")]
NoTTL,

#[error("{0}")]
Internal(String),
}
Expand All @@ -100,11 +106,13 @@ impl ApiErrorKind {
pub fn status(&self) -> StatusCode {
match self {
ApiErrorKind::PayloadError(e) => e.status_code(),
ApiErrorKind::Router(e) => e.status(),

ApiErrorKind::Validation(_)
| ApiErrorKind::InvalidEncryption(_)
| ApiErrorKind::TokenHashValidation(_)
| ApiErrorKind::Uuid(_) => StatusCode::BAD_REQUEST,
| ApiErrorKind::Uuid(_)
| ApiErrorKind::NoTTL => StatusCode::BAD_REQUEST,

ApiErrorKind::NoUser | ApiErrorKind::NoSubscription => StatusCode::GONE,

Expand All @@ -124,16 +132,27 @@ impl ApiErrorKind {
/// Get the associated error number
pub fn errno(&self) -> Option<usize> {
match self {
ApiErrorKind::InvalidEncryption(_) => Some(110),
ApiErrorKind::VapidError(_)
| ApiErrorKind::TokenHashValidation(_)
| ApiErrorKind::Jwt(_) => Some(109),
ApiErrorKind::Router(e) => Some(e.errno()),

ApiErrorKind::InvalidToken => Some(102),

ApiErrorKind::NoUser => Some(103),
ApiErrorKind::NoSubscription => Some(106),

ApiErrorKind::PayloadError(PayloadError::Overflow)
| ApiErrorKind::PayloadTooLarge(_) => Some(104),

ApiErrorKind::NoSubscription => Some(106),

ApiErrorKind::VapidError(_)
| ApiErrorKind::TokenHashValidation(_)
| ApiErrorKind::Jwt(_) => Some(109),

ApiErrorKind::InvalidEncryption(_) => Some(110),

ApiErrorKind::NoTTL => Some(111),

ApiErrorKind::Internal(_) => Some(999),

_ => None,
}
}
Expand Down
1 change: 0 additions & 1 deletion autoendpoint/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
let _sentry_guard = configure_sentry();

// Run server...
debug!("{}", settings.banner());
let server = server::Server::with_settings(settings).expect("Could not start server");
info!("Server started");
server.await?;
Expand Down
1 change: 1 addition & 0 deletions autoendpoint/src/server/extractors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

pub mod notification;
pub mod notification_headers;
pub mod routers;
pub mod subscription;
pub mod token_info;
pub mod user;
99 changes: 97 additions & 2 deletions autoendpoint/src/server/extractors/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ use actix_web::web::Data;
use actix_web::{FromRequest, HttpRequest};
use autopush_common::util::sec_since_epoch;
use cadence::Counted;
use fernet::MultiFernet;
use futures::{future, FutureExt, StreamExt};
use std::collections::HashMap;
use uuid::Uuid;

/// Extracts notification data from `Subscription` and request data
#[derive(Clone, Debug)]
pub struct Notification {
pub message_id: String,
pub subscription: Subscription,
pub headers: NotificationHeaders,
pub timestamp: u64,
Expand Down Expand Up @@ -52,9 +57,17 @@ impl FromRequest for Notification {
};

let headers = NotificationHeaders::from_request(&req, data.is_some())?;
let timestamp = sec_since_epoch();
let message_id = Self::generate_message_id(
&state.fernet,
&subscription.user.uaid,
&subscription.channel_id,
headers.topic.as_deref(),
timestamp,
);

// Record the encoding if we have an encrypted payload
if let Some(encoding) = &headers.content_encoding {
if let Some(encoding) = &headers.encoding {
if data.is_some() {
state
.metrics
Expand All @@ -64,12 +77,94 @@ impl FromRequest for Notification {
}

Ok(Notification {
message_id,
subscription,
headers,
timestamp: sec_since_epoch(),
timestamp,
data,
})
}
.boxed_local()
}
}

/// Conversion into the shared `autopush_common` notification type, used when
/// handing the message off to common storage/delivery code.
impl From<Notification> for autopush_common::notification::Notification {
    fn from(n: Notification) -> Self {
        // `headers` is consumed by the final `.into()`, so copy the topic out
        // of it first.
        let topic = n.headers.topic.clone();

        Self {
            version: n.message_id,
            channel_id: n.subscription.channel_id,
            ttl: n.headers.ttl as u64,
            topic,
            timestamp: n.timestamp,
            // The sort key uses the same receive timestamp as the message.
            sortkey_timestamp: Some(n.timestamp),
            data: n.data,
            headers: Some(n.headers.into()),
        }
    }
}

impl Notification {
    /// Generate a message-id suitable for accessing the message
    ///
    /// For topic messages, a sort_key version of 01 is used, and the topic
    /// is included for reference:
    ///
    ///     Encrypted('01' : uaid.hex : channel_id.hex : topic)
    ///
    /// For non-topic messages, a sort_key version of 02 is used:
    ///
    ///     Encrypted('02' : uaid.hex : channel_id.hex : timestamp)
    fn generate_message_id(
        fernet: &MultiFernet,
        uaid: &Uuid,
        channel_id: &Uuid,
        topic: Option<&str>,
        timestamp: u64,
    ) -> String {
        // Both UUIDs are rendered in simple (dash-less hex) form.
        let (uaid, channel_id) = (uaid.to_simple_ref(), channel_id.to_simple_ref());

        let plaintext = match topic {
            Some(topic) => format!("01:{}:{}:{}", uaid, channel_id, topic),
            None => format!("02:{}:{}:{}", uaid, channel_id, timestamp),
        };

        fernet.encrypt(plaintext.as_bytes())
    }

    /// Serialize the notification for delivery to the connection server. Some
    /// fields in `autopush_common`'s `Notification` are marked with
    /// `#[serde(skip_serializing)]` so they are not shown to the UA. These
    /// fields are still required when delivering to the connection server, so
    /// we can't simply convert this notification type to that one and serialize
    /// via serde.
    pub fn serialize_for_delivery(&self) -> HashMap<&'static str, serde_json::Value> {
        // Fields that are always present, regardless of payload.
        let mut map: HashMap<&'static str, serde_json::Value> = vec![
            (
                "channelID",
                serde_json::to_value(&self.subscription.channel_id).unwrap(),
            ),
            ("version", serde_json::to_value(&self.message_id).unwrap()),
            ("ttl", serde_json::to_value(self.headers.ttl).unwrap()),
            ("topic", serde_json::to_value(&self.headers.topic).unwrap()),
            ("timestamp", serde_json::to_value(self.timestamp).unwrap()),
        ]
        .into_iter()
        .collect();

        // The payload and its encryption headers are only included when there
        // is an encrypted data payload to deliver.
        if let Some(data) = &self.data {
            map.insert("data", serde_json::to_value(data).unwrap());

            let headers: HashMap<_, _> = self.headers.clone().into();
            map.insert("headers", serde_json::to_value(&headers).unwrap());
        }

        map
    }
}
Loading

0 comments on commit e73dff1

Please sign in to comment.