diff --git a/.circleci/config.yml b/.circleci/config.yml index f4d8e09b..5ef6e129 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -92,10 +92,18 @@ jobs: name: Rust tests command: cargo test - run: - name: Integration tests + name: Integration tests (Autopush Legacy) command: | cd tests py.test -v + - run: + name: Integration tests (Autoconnect) + environment: + CONNECTION_BINARY: autoconnect + CONNECTION_SETTINGS_PREFIX: autoconnect__ + command: | + cd tests + py.test -v || true # currently has failures - save_cache: name: Save Python cache key: python-v1-{{ checksum "tests/requirements.txt" }}-{{ checksum "tests/test_integration_all_rust.py"}} diff --git a/Cargo.lock b/Cargo.lock index 2baae95f..2eb3a2d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -577,6 +577,7 @@ dependencies = [ "fernet", "lazy_static", "mozsvc-common", + "reqwest 0.11.16", "serde", "serde_derive", "slog", @@ -622,7 +623,8 @@ dependencies = [ "autoconnect_settings", "autoconnect_ws_sm", "futures 0.3.27", - "mockall 0.11.4", + "mockall", + "sentry", "serde_json", "slog-scope", "strum", @@ -634,11 +636,15 @@ dependencies = [ name = "autoconnect_ws_sm" version = "1.66.0" dependencies = [ + "actix-rt", + "actix-web", "actix-ws", "autoconnect_common", "autoconnect_settings", "autopush_common", "cadence", + "mockall", + "reqwest 0.11.16", "slog-scope", "thiserror", "tokio 1.27.0", @@ -669,7 +675,7 @@ dependencies = [ "jsonwebtoken", "lazy_static", "log", - "mockall 0.8.3", + "mockall", "mockito", "openssl", "rand 0.8.5", @@ -777,7 +783,7 @@ dependencies = [ "hyper 0.14.25", "lazy_static", "log", - "mockall 0.8.3", + "mockall", "mockito", "mozsvc-common", "openssl", @@ -1447,12 +1453,6 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198" -[[package]] -name = "difflib" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" - [[package]] name = "digest" version = "0.8.1" @@ -1537,12 +1537,6 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bb454f0228b18c7f4c3b0ebbee346ed9c52e7443b0999cd543ff3571205701d" -[[package]] -name = "downcast" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" - [[package]] name = "dtoa" version = "0.4.8" @@ -1707,15 +1701,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "float-cmp" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" -dependencies = [ - "num-traits", -] - [[package]] name = "fnv" version = "1.0.7" @@ -2807,26 +2792,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41cabea45a7fc0e37093f4f30a5e2b62602253f91791c057d5f0470c63260c3d" dependencies = [ "cfg-if 0.1.10", - "downcast 0.10.0", + "downcast", "fragile 1.2.2", "lazy_static", - "mockall_derive 0.8.3", - "predicates 1.0.8", - "predicates-tree", -] - -[[package]] -name = "mockall" -version = "0.11.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c84490118f2ee2d74570d114f3d0493cbf02790df303d2707606c3e14e07c96" -dependencies = [ - "cfg-if 1.0.0", - "downcast 0.11.0", - "fragile 2.0.0", - "lazy_static", - "mockall_derive 0.11.4", - "predicates 2.1.5", + "mockall_derive", + "predicates", "predicates-tree", ] @@ -2842,18 +2812,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "mockall_derive" -version = "0.11.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ce75669015c4f47b289fd4d4f56e894e4c96003ffdf3ac51313126f94c6cbb" -dependencies = [ - "cfg-if 1.0.0", - "proc-macro2 1.0.54", - "quote 1.0.26", - "syn 1.0.109", -] - [[package]] name = "mockito" version = "0.31.1" @@ -3294,21 +3252,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f49cfaf7fdaa3bfacc6fa3e7054e65148878354a5cfddcf661df4c851f8021df" dependencies = [ "difference", - "float-cmp 0.8.0", - "normalize-line-endings", - "predicates-core", - "regex", -] - -[[package]] -name = "predicates" -version = "2.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd" -dependencies = [ - "difflib", - "float-cmp 0.9.0", - "itertools", + "float-cmp", "normalize-line-endings", "predicates-core", "regex", diff --git a/Cargo.toml b/Cargo.toml index 76bd677c..216fbc0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ httparse = "1.3" hyper = "0.14" lazy_static = "1.4" log = { version = "0.4", features = ["max_level_debug", "release_max_level_info"] } +mockall = "0.8.3" # 0.9+ requires reworking tests mozsvc-common = "0.2" openssl = "0.10" rand = "0.8" diff --git a/autoconnect/autoconnect-common/src/test_support.rs b/autoconnect/autoconnect-common/src/test_support.rs index 8f7501e0..ec1eab45 100644 --- a/autoconnect/autoconnect-common/src/test_support.rs +++ b/autoconnect/autoconnect-common/src/test_support.rs @@ -1,6 +1,6 @@ use uuid::Uuid; -use autopush_common::db::{mock::MockDbClient, HelloResponse}; +use autopush_common::db::{mock::MockDbClient, User}; pub const UA: &str = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/110.0"; @@ -15,20 +15,37 @@ pub const HELLO: &str = r#"{"messageType": "hello", "use_webpush": true}"#; pub const HELLO_AGAIN: &str = r#"{"messageType": "hello", "use_webpush": true, "uaid": "deadbeef-0000-0000-deca-fbad00000000"}"#; +pub const CURRENT_MONTH: &str = "message_2018_06"; + /// Return a simple MockDbClient that responds to hello (once) with a new uaid. pub fn hello_db() -> MockDbClient { - hello_again_db(uuid::Uuid::new_v4()) + let mut db = MockDbClient::new(); + db.expect_message_table() + .times(1) + .return_const(CURRENT_MONTH.to_owned()); + db } /// Return a simple MockDbClient that responds to hello (once) with the /// specified uaid. pub fn hello_again_db(uaid: Uuid) -> MockDbClient { let mut db = MockDbClient::new(); - db.expect_hello().times(1).return_once(move |_, _, _, _| { - Ok(HelloResponse { - uaid: Some(uaid), + db.expect_get_user().times(1).return_once(move |_| { + Ok(Some(User { + uaid, + current_month: Some(CURRENT_MONTH.to_owned()), ..Default::default() - }) + })) }); + db.expect_message_table() + .times(1) + .return_const(CURRENT_MONTH.to_owned()); + db.expect_update_user().times(1).return_once(|_| Ok(())); + db.expect_fetch_messages() + .times(1) + .return_once(|_, _| Ok(Default::default())); + db.expect_fetch_timestamp_messages() + .times(1) + .return_once(|_, _, _| Ok(Default::default())); db } diff --git a/autoconnect/autoconnect-settings/Cargo.toml b/autoconnect/autoconnect-settings/Cargo.toml index 08ba80e7..46a595bf 100644 --- a/autoconnect/autoconnect-settings/Cargo.toml +++ b/autoconnect/autoconnect-settings/Cargo.toml @@ -10,6 +10,7 @@ config.workspace = true fernet.workspace = true lazy_static.workspace = true mozsvc-common.workspace = true +reqwest.workspace = true serde.workspace = true serde_derive.workspace = true slog.workspace = true diff --git a/autoconnect/autoconnect-settings/src/app_state.rs b/autoconnect/autoconnect-settings/src/app_state.rs index dd238555..94a99df1 100644 --- a/autoconnect/autoconnect-settings/src/app_state.rs +++ b/autoconnect/autoconnect-settings/src/app_state.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use cadence::StatsdClient; use fernet::{Fernet, MultiFernet}; @@ -17,6 +17,7 @@ pub struct AppState { /// Handle to the data storage object pub db: Box, pub metrics: Arc, + pub http: reqwest::Client, /// Encryption object for the endpoint URL pub fernet: MultiFernet, @@ -63,6 +64,10 @@ impl AppState { StorageType::DynamoDb => Box::new(DdbClientImpl::new(metrics.clone(), &db_settings)?), StorageType::INVALID => panic!("Invalid Storage type. Check {}_DB_DSN.", ENV_PREFIX), }; + let http = reqwest::Client::builder() + .timeout(Duration::from_secs(1)) + .build() + .unwrap_or_else(|e| panic!("Error while building reqwest::Client: {}", e)); let router_url = settings.router_url(); let endpoint_url = settings.endpoint_url(); @@ -70,6 +75,7 @@ impl AppState { Ok(Self { db, metrics, + http, fernet, clients: Arc::new(ClientRegistry::default()), settings, diff --git a/autoconnect/autoconnect-web/src/lib.rs b/autoconnect/autoconnect-web/src/lib.rs index 606d6c06..75bd68d2 100644 --- a/autoconnect/autoconnect-web/src/lib.rs +++ b/autoconnect/autoconnect-web/src/lib.rs @@ -8,13 +8,12 @@ extern crate slog_scope; pub mod client; pub mod dockerflow; pub mod metrics; +pub mod routes; #[cfg(test)] mod test; use actix_web::web; -use autoconnect_ws::ws_handler; - /// Requires import of the `config` function also in this module to use. #[macro_export] macro_rules! build_app { @@ -38,12 +37,14 @@ macro_rules! build_app { pub fn config(cfg: &mut web::ServiceConfig) { cfg // Websocket Handler - .route("/", web::get().to(ws_handler)) - // TODO: Internode Message handler - //.service(web::resource("/push/{uaid}").route(web::push().to(autoconnect_web::route::InterNode::put)) + .route("/", web::get().to(autoconnect_ws::ws_handler)) + .service(web::resource("/push/{uaid}").route(web::put().to(crate::routes::push_route))) + .service( + web::resource("/notif/{uaid}").route(web::put().to(crate::routes::check_storage_route)), + ) .service(web::resource("/status").route(web::get().to(dockerflow::status_route))) .service(web::resource("/health").route(web::get().to(dockerflow::health_route))) - .service(web::resource("/v1/err").route(web::get().to(dockerflow::log_check))) + .service(web::resource("/v1/err/crit").route(web::get().to(dockerflow::log_check))) // standardized .service(web::resource("/__error__").route(web::get().to(dockerflow::log_check))) // Dockerflow diff --git a/autoconnect/autoconnect-web/src/route.rs b/autoconnect/autoconnect-web/src/route.rs deleted file mode 100644 index 54b4419d..00000000 --- a/autoconnect/autoconnect-web/src/route.rs +++ /dev/null @@ -1,35 +0,0 @@ -/// Handle incoming notifications from endpoints. -/// - -use actix_web::{ - FromRequest, HttpRequest, HttpResponse, - dev::{Payload, PayloadStream}, - web::{Data, Json}, -}; -use reqwest::StatusCode; -use serde_json::json; -use uuid::Uuid; - -use autopush_common::error::{ApiError, ApiErrorKind}; - - -use autoconnect_settings::options::AppState; - -pub struct InterNodeArgs { - pub uaid: Uuid -} - -impl FromRequest for InterNodeArgs { - type Error = ApiError; - -} - -pub struct InterNode { - -} - -impl InterNode { - pub fn put(state: Data) -> HttpResponse { - - } -} diff --git a/autoconnect/autoconnect-web/src/routes.rs b/autoconnect/autoconnect-web/src/routes.rs new file mode 100644 index 00000000..974d98cd --- /dev/null +++ b/autoconnect/autoconnect-web/src/routes.rs @@ -0,0 +1,42 @@ +/// Handle incoming notifications from autoendpoint +use actix_web::{web, HttpResponse}; +use uuid::Uuid; + +use autoconnect_settings::AppState; +use autopush_common::notification::Notification; + +/// Deliver a Push notification directly to a connected client +pub async fn push_route( + uaid: web::Path, + notif: web::Json, + state: web::Data, +) -> HttpResponse { + trace!( + "push_route, uaid: {} channel_id: {}", + uaid, + notif.channel_id + ); + let result = state + .clients + .notify(uaid.into_inner(), notif.into_inner()) + .await; + if result.is_ok() { + HttpResponse::Ok().finish() + } else { + HttpResponse::NotFound().body("Client not available") + } +} + +/// Notify a connected client to check storage for new notifications +pub async fn check_storage_route( + uaid: web::Path, + state: web::Data, +) -> HttpResponse { + trace!("check_storage_route, uaid: {}", uaid); + let result = state.clients.check_storage(uaid.into_inner()).await; + if result.is_ok() { + HttpResponse::Ok().finish() + } else { + HttpResponse::NotFound().body("Client not available") + } +} diff --git a/autoconnect/autoconnect-web/src/test.rs b/autoconnect/autoconnect-web/src/test.rs index f06c852f..a92f0bef 100644 --- a/autoconnect/autoconnect-web/src/test.rs +++ b/autoconnect/autoconnect-web/src/test.rs @@ -124,6 +124,7 @@ pub async fn malformed_webpush_message() { panic!("Expected Close(Some(..)) not {:#?}", item); }; assert_eq!(close_reason.code, actix_http::ws::CloseCode::Error); + assert_eq!(close_reason.description.unwrap(), "Json"); assert!(framed.next().await.is_none()); } diff --git a/autoconnect/autoconnect-ws/Cargo.toml b/autoconnect/autoconnect-ws/Cargo.toml index ef2236f0..3a6bc4d6 100644 --- a/autoconnect/autoconnect-ws/Cargo.toml +++ b/autoconnect/autoconnect-ws/Cargo.toml @@ -12,13 +12,14 @@ actix-rt.workspace = true actix-web.workspace = true actix-ws.workspace = true futures.workspace = true +mockall.workspace = true serde_json.workspace = true +sentry.workspace = true slog-scope.workspace = true thiserror.workspace = true tokio.workspace = true async-trait = "0.1" -mockall = "0.11" strum = { version = "0.24", features = ["derive"] } autoconnect_common.workspace = true diff --git a/autoconnect/autoconnect-ws/autoconnect-ws-sm/Cargo.toml b/autoconnect/autoconnect-ws/autoconnect-ws-sm/Cargo.toml index 200be60f..f36cdf9c 100644 --- a/autoconnect/autoconnect-ws/autoconnect-ws-sm/Cargo.toml +++ b/autoconnect/autoconnect-ws/autoconnect-ws-sm/Cargo.toml @@ -7,8 +7,10 @@ authors.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +actix-web.workspace = true actix-ws.workspace = true cadence.workspace = true +reqwest.workspace = true slog-scope.workspace = true uuid.workspace = true thiserror.workspace = true @@ -18,6 +20,8 @@ autoconnect_settings.workspace = true autopush_common.workspace = true [dev-dependencies] +actix-rt.workspace = true +mockall.workspace = true tokio.workspace = true autoconnect_common = { workspace = true, features = ["test-support"] } diff --git a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/error.rs b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/error.rs index 5e68703a..5d6e621b 100644 --- a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/error.rs +++ b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/error.rs @@ -11,6 +11,12 @@ pub enum SMError { #[error("Invalid WebPush message: {0}")] InvalidMessage(String), + #[error("Internal error: {0}")] + Internal(String), + + #[error("Reqwest error: {0}")] + Reqwest(#[from] reqwest::Error), + #[error("UAID dropped")] UaidReset, diff --git a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/mod.rs b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/mod.rs index 4082ec81..b4c362b0 100644 --- a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/mod.rs +++ b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/mod.rs @@ -1,12 +1,14 @@ -use std::{fmt, sync::Arc}; +use std::{fmt, mem, sync::Arc}; -use cadence::Timed; +use actix_web::rt; +use cadence::{CountedExt, Timed}; use uuid::Uuid; use autoconnect_common::protocol::ServerMessage; + use autoconnect_settings::AppState; use autopush_common::{ - db::{HelloResponse, User}, + db::{error::DbResult, User, USER_RECORD_VERSION}, notification::Notification, util::{ms_since_epoch, user_agent::UserAgentInfo}, }; @@ -16,25 +18,36 @@ use crate::error::SMError; mod on_client_msg; mod on_server_notif; -/// TODO: more docs in this module +/// A WebPush Client that's successfully identified itself to the server via a +/// Hello message. +/// +/// The `webpush_ws` handler feeds input from both the WebSocket connection +/// (`ClientMessage`) and the `ClientRegistry` (`ServerNotification`) +/// triggered by autoendpoint to this type's `on_client_msg` and +/// `on_server_notif` methods whose impls reside in their own modules. +/// +/// Note the `check_storage` method (in the `on_server_notif` module) is +/// triggered by both a `ServerNotification` and also the `new` constructor pub struct WebPushClient { /// Push User Agent identifier. Each Push client recieves a unique UAID pub uaid: Uuid, /// Unique, local (to each autoconnect instance) identifier pub uid: Uuid, /// The User Agent information block derived from the User-Agent header - ua_info: UserAgentInfo, + pub ua_info: UserAgentInfo, //broadcast_subs: BroadcastSubs, flags: ClientFlags, ack_state: AckState, /// Count of messages sent from storage (for enforcing - /// `settings.msg_limit`). Resets to 0 when storage is emptied. - #[allow(dead_code)] + /// `settings.msg_limit`). Resets to 0 when storage is emptied sent_from_storage: u32, - /// Exists when we didn't register this new user during Hello - deferred_user_registration: Option, + /// Exists for new User records: these are not written to the db during + /// Hello, instead they're lazily added to the db on their first Register + /// message + deferred_add_user: Option, + /// WebPush Session Statistics stats: SessionStatistics, /// Timestamp of when the UA connected (used by database lookup, thus u64) @@ -51,13 +64,11 @@ impl fmt::Debug for WebPushClient { .field("uaid", &self.uaid) .field("uid", &self.uid) .field("ua_info", &self.ua_info) + //.field("broadcast_subs", &self.broadcast_subs) .field("flags", &self.flags) .field("ack_state", &self.ack_state) .field("sent_from_storage", &self.sent_from_storage) - .field( - "deferred_user_registration", - &self.deferred_user_registration, - ) + .field("deferred_add_user", &self.deferred_add_user) .field("stats", &self.stats) .field("connected_at", &self.connected_at) .field("last_ping", &self.last_ping) @@ -71,9 +82,14 @@ impl WebPushClient { ua: String, flags: ClientFlags, connected_at: u64, - deferred_user_registration: Option, + deferred_add_user: Option, app_state: Arc, ) -> Result<(Self, Vec), SMError> { + trace!("WebPushClient::new"); + let stats = SessionStatistics { + existing_uaid: deferred_add_user.is_none(), + ..Default::default() + }; let mut client = WebPushClient { uaid, uid: Uuid::new_v4(), @@ -82,35 +98,144 @@ impl WebPushClient { ack_state: Default::default(), sent_from_storage: Default::default(), connected_at, - deferred_user_registration, + deferred_add_user, last_ping: Default::default(), - stats: Default::default(), + stats, app_state, }; + let smsgs = if client.flags.check_storage { client.check_storage().await? } else { vec![] }; + debug!( + "WebPushClient::new: Initial check_storage smsgs count: {}", + smsgs.len() + ); Ok((client, smsgs)) } - pub fn shutdown(&mut self) { - // TODO: logging - let now = ms_since_epoch(); - let elapsed = (now - self.connected_at) / 1_000; + /// Cleanup after the session has ended + pub fn shutdown(&mut self, reason: Option) { + trace!("WebPushClient::shutdown"); + self.save_and_notify_unacked_direct_notifs(); + + let ua_info = &self.ua_info; + let stats = &self.stats; + let elapsed_sec = (ms_since_epoch() - self.connected_at) / 1_000; self.app_state .metrics - .time_with_tags("ua.connection.lifespan", elapsed) - .with_tag("ua_os_family", &self.ua_info.metrics_os) - .with_tag("ua_browser_family", &self.ua_info.metrics_browser) + .time_with_tags("ua.connection.lifespan", elapsed_sec) + .with_tag("ua_os_family", &ua_info.metrics_os) + .with_tag("ua_browser_family", &ua_info.metrics_browser) .send(); - // TODO: save unacked notifs, logging + // Log out the final stats message + info!("Session"; + "uaid_hash" => self.uaid.as_simple().to_string(), + "uaid_reset" => self.flags.reset_uaid, + "existing_uaid" => stats.existing_uaid, + "connection_type" => "webpush", + "ua_name" => &ua_info.browser_name, + "ua_os_family" => &ua_info.metrics_os, + "ua_os_ver" => &ua_info.os_version, + "ua_browser_family" => &ua_info.metrics_browser, + "ua_browser_ver" => &ua_info.browser_version, + "ua_category" => &ua_info.category, + "connection_time" => elapsed_sec, + "direct_acked" => stats.direct_acked, + "direct_storage" => stats.direct_storage, + "stored_retrieved" => stats.stored_retrieved, + "stored_acked" => stats.stored_acked, + "nacks" => stats.nacks, + "registers" => stats.registers, + "unregisters" => stats.unregisters, + "disconnect_reason" => reason.unwrap_or_else(|| "".to_owned()), + ); + } + + /// Save any Direct unAck'd messages to the db (on shutdown) + /// + /// Direct messages are solely stored in memory until Ack'd by the Client, + /// so on shutdown, any not Ack'd are stored in the db to not be lost + fn save_and_notify_unacked_direct_notifs(&mut self) { + let mut notifs = mem::take(&mut self.ack_state.unacked_direct_notifs); + trace!( + "WebPushClient::save_and_notify_unacked_direct_notifs len: {}", + notifs.len() + ); + if notifs.is_empty() { + return; + } + + self.stats.direct_storage += notifs.len() as i32; + // TODO: clarify this comment re the Python version + // Ensure we don't store these as legacy by setting a 0 as the + // sortkey_timestamp. This ensures the Python side doesn't mark it as + // legacy during conversion and still get the correct default us_time + // when saving + for notif in &mut notifs { + notif.sortkey_timestamp = Some(0); + } + + let app_state = Arc::clone(&self.app_state); + let uaid = self.uaid; + let connected_at = self.connected_at; + rt::spawn(async move { + app_state.db.save_messages(&uaid, notifs).await?; + debug!("Finished saving unacked direct notifs, checking for reconnect"); + let Some(user) = app_state.db.get_user(&uaid).await? else { + return Err(SMError::Internal(format!("User not found for unacked direct notifs: {uaid}"))); + }; + if connected_at == user.connected_at { + return Ok(()); + } + if let Some(node_id) = user.node_id { + app_state + .http + .put(&format!("{}/notif/{}", node_id, uaid.as_simple())) + .send() + .await? + .error_for_status()?; + } + Ok(()) + }); + } +} + +/// Ensure an existing user's record is valid, returning its `ClientFlags` +/// +/// Somewhat similar to autoendpoint's `validate_webpush_user` function. When a +/// User record is invalid it will be dropped from the db and `None` will be +/// returned. +pub async fn process_existing_user( + app_state: &Arc, + user: &User, +) -> DbResult> { + if user.current_month.is_none() { + app_state + .metrics + .incr_with_tags("ua.expiration") + .with_tag("errno", "104") + .send(); + app_state.db.remove_user(&user.uaid).await?; + return Ok(None); } + // TODO: (but probably not) drop the user if their current_month is not in + // the db's list of message_tables + + let flags = ClientFlags { + check_storage: true, + reset_uaid: user + .record_version + .map_or(true, |rec_ver| rec_ver < USER_RECORD_VERSION), + rotate_message_table: user.current_month.as_deref() != Some(app_state.db.message_table()), + ..Default::default() + }; + Ok(Some(flags)) } -#[allow(dead_code)] #[derive(Debug)] pub struct ClientFlags { /// Whether check_storage queries for topic (not "timestamped") messages @@ -124,17 +249,6 @@ pub struct ClientFlags { rotate_message_table: bool, } -impl ClientFlags { - pub fn from_hello(hello_response: &HelloResponse) -> Self { - Self { - check_storage: hello_response.check_storage, - reset_uaid: hello_response.reset_uaid, - rotate_message_table: hello_response.rotate_message_table, - ..Default::default() - } - } -} - impl Default for ClientFlags { fn default() -> Self { Self { @@ -147,27 +261,31 @@ impl Default for ClientFlags { } } -#[allow(dead_code)] +/// WebPush Session Statistics +/// +/// Tracks statistics about the session that are logged when the session's +/// closed #[derive(Debug, Default)] pub struct SessionStatistics { - /// Number of acknowledged messages that were sent directly (not vai storage) + /// Number of acknowledged messages that were sent directly (not via storage) direct_acked: i32, - /// number of messages sent to storage + /// Number of messages sent to storage direct_storage: i32, - /// number of messages taken from storage + /// Number of messages taken from storage stored_retrieved: i32, - /// number of message pulled from storage and acknowledged + /// Number of message pulled from storage and acknowledged stored_acked: i32, - /// number of messages total that are not acknowledged. + /// Number of messages total that are not acknowledged. nacks: i32, - /// number of unregister requests made + /// Number of unregister requests unregisters: i32, - /// number of register requests made + /// Number of register requests registers: i32, + /// Whether this uaid was previously registered + existing_uaid: bool, } /// Record of Notifications sent to the Client. -#[allow(dead_code)] #[derive(Debug, Default)] struct AckState { /// List of unAck'd directly sent (never stored) notifications @@ -180,6 +298,8 @@ struct AckState { } impl AckState { + /// Whether the Client has outstanding notifications sent to it that it has + /// yet to Ack fn unacked_notifs(&self) -> bool { !self.unacked_stored_notifs.is_empty() || !self.unacked_direct_notifs.is_empty() } @@ -192,11 +312,14 @@ mod tests { use uuid::Uuid; use autoconnect_common::{ - protocol::{ClientMessage, ServerMessage}, + protocol::{ClientMessage, ServerMessage, ServerNotification}, test_support::{DUMMY_UAID, UA}, }; use autoconnect_settings::AppState; - use autopush_common::util::ms_since_epoch; + use autopush_common::{ + db::{client::FetchMessageResponse, mock::MockDbClient}, + util::{ms_since_epoch, sec_since_epoch}, + }; use super::WebPushClient; @@ -213,10 +336,72 @@ mod tests { .unwrap() } - #[tokio::test] + #[actix_rt::test] async fn webpush_ping() { let (mut client, _) = wpclient(DUMMY_UAID, Default::default()).await; let pong = client.on_client_msg(ClientMessage::Ping).await.unwrap(); assert!(matches!(pong.as_slice(), [ServerMessage::Ping])); } + + #[actix_rt::test] + async fn expired_increments_storage() { + let mut db = MockDbClient::new(); + let mut seq = mockall::Sequence::new(); + let timestamp = sec_since_epoch(); + // No topic messages + db.expect_fetch_messages() + .times(1) + .in_sequence(&mut seq) + .return_once(move |_, _| { + Ok(FetchMessageResponse { + timestamp: None, + messages: vec![], + }) + }); + // Return expired notifs (default ttl of 0) + db.expect_fetch_timestamp_messages() + .times(1) + .in_sequence(&mut seq) + .return_once(move |_, _, _| { + Ok(FetchMessageResponse { + timestamp: Some(timestamp), + messages: vec![Default::default(), Default::default()], + }) + }); + // EOF + db.expect_fetch_timestamp_messages() + .times(1) + .in_sequence(&mut seq) + .withf(move |_, ts, _| ts == &Some(timestamp)) + .return_once(|_, _, _| { + Ok(FetchMessageResponse { + timestamp: None, + messages: vec![], + }) + }); + // Ensure increment_storage's called to advance the timestamp messages + // despite check_storage returning nothing (all filtered out as + // expired) + db.expect_increment_storage() + .times(1) + .in_sequence(&mut seq) + .withf(move |_, ts| ts == ×tamp) + .return_once(|_, _| Ok(())); + + // No check_storage called here (via default ClientFlags) + let (mut client, _) = wpclient( + DUMMY_UAID, + AppState { + db: db.into_boxed_arc(), + ..Default::default() + }, + ) + .await; + + let smsgs = client + .on_server_notif(ServerNotification::CheckStorage) + .await + .unwrap(); + assert!(smsgs.is_empty()) + } } diff --git a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/on_client_msg.rs b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/on_client_msg.rs index d7289127..c1161f74 100644 --- a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/on_client_msg.rs +++ b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/on_client_msg.rs @@ -10,6 +10,8 @@ use super::WebPushClient; use crate::error::SMError; impl WebPushClient { + /// Handle a WebPush `ClientMessage` sent from the user agent over the + /// WebSocket for this user pub async fn on_client_msg( &mut self, msg: ClientMessage, @@ -25,18 +27,18 @@ impl WebPushClient { Ok(vec![self.unregister(channel_id, code).await?]) } ClientMessage::BroadcastSubscribe { broadcasts } => Ok(self - .broadcast_subscribe(broadcasts) - .await? + .broadcast_subscribe(broadcasts)? .map_or_else(Vec::new, |smsg| vec![smsg])), ClientMessage::Ack { updates } => self.ack(&updates).await, ClientMessage::Nack { code, .. } => { - self.nack(code).await?; + self.nack(code); Ok(vec![]) } - ClientMessage::Ping => Ok(vec![self.ping().await?]), + ClientMessage::Ping => Ok(vec![self.ping()?]), } } + /// Register a new Push subscription async fn register( &mut self, channel_id_str: String, @@ -55,26 +57,26 @@ impl WebPushClient { ))); } - let smsg = match self.do_register(&channel_id, key).await { + let (status, push_endpoint) = match self.do_register(&channel_id, key).await { Ok(endpoint) => { let _ = self.app_state.metrics.incr("ua.command.register"); self.stats.registers += 1; - ServerMessage::Register { - channel_id, - status: 200, - push_endpoint: endpoint, - } + (200, endpoint) + } + Err(SMError::MakeEndpoint(msg)) => { + error!("WebPushClient::register make_endpoint failed: {}", msg); + (400, "Failed to generate endpoint".to_owned()) } Err(e) => { error!("WebPushClient::register failed: {}", e); - ServerMessage::Register { - channel_id, - status: 500, - push_endpoint: "".to_owned(), - } + (500, "".to_owned()) } }; - Ok(smsg) + Ok(ServerMessage::Register { + channel_id, + status, + push_endpoint, + }) } async fn do_register( @@ -82,13 +84,13 @@ impl WebPushClient { channel_id: &Uuid, key: Option, ) -> Result { - if let Some(user) = &self.deferred_user_registration { - trace!( - "💬WebPushClient::register: User not yet registered... {:?}", + if let Some(user) = &self.deferred_add_user { + debug!( + "💬WebPushClient::register: User not yet registered: {}", &user.uaid ); self.app_state.db.add_user(user).await?; - self.deferred_user_registration = None; + self.deferred_add_user = None; } let endpoint = make_endpoint( @@ -106,6 +108,7 @@ impl WebPushClient { Ok(endpoint) } + /// Unregister an existing Push subscription async fn unregister( &mut self, channel_id: Uuid, @@ -124,7 +127,7 @@ impl WebPushClient { .db .remove_channel(&self.uaid, &channel_id) .await; - let smsg = match result { + let status = match result { Ok(_) => { self.app_state .metrics @@ -132,70 +135,154 @@ impl WebPushClient { .with_tag("code", &code.unwrap_or(200).to_string()) .send(); self.stats.unregisters += 1; - ServerMessage::Unregister { - channel_id, - status: 200, - } + 200 } Err(e) => { error!("WebPushClient::unregister failed: {}", e); - ServerMessage::Unregister { - channel_id, - status: 500, - } + 500 } }; - Ok(smsg) + Ok(ServerMessage::Unregister { channel_id, status }) } - async fn broadcast_subscribe( + /// Subscribe to a new set of Broadcasts + fn broadcast_subscribe( &mut self, _broadcasts: HashMap, ) -> Result, SMError> { + trace!("WebPushClient:broadcast_subscribe"); unimplemented!(); } - async fn ack(&mut self, _updates: &[ClientAck]) -> Result, SMError> { - // TODO: - self.maybe_post_process_acks().await + /// Acknowledge receipt of one or more Push Notifications + async fn ack(&mut self, updates: &[ClientAck]) -> Result, SMError> { + trace!("WebPushClient:ack"); + let _ = self.app_state.metrics.incr("ua.command.ack"); + + for notif in updates { + let pos = self + .ack_state + .unacked_direct_notifs + .iter() + .position(|n| n.channel_id == notif.channel_id && n.version == notif.version); + if let Some(pos) = pos { + debug!("Ack (Direct)"; + "channel_id" => notif.channel_id.as_hyphenated().to_string(), + "version" => ¬if.version + ); + self.ack_state.unacked_direct_notifs.remove(pos); + self.stats.direct_acked += 1; + continue; + }; + + let pos = self + .ack_state + .unacked_stored_notifs + .iter() + .position(|n| n.channel_id == notif.channel_id && n.version == notif.version); + if let Some(pos) = pos { + debug!( + "Ack (Stored)"; + "channel_id" => notif.channel_id.as_hyphenated().to_string(), + "version" => ¬if.version + ); + let n = &self.ack_state.unacked_stored_notifs[pos]; + // Topic/legacy messages have no sortkey_timestamp + if n.sortkey_timestamp.is_none() { + debug!( + "WebPushClient:ack removing Stored, sort_key: {}", + &n.sort_key() + ); + self.app_state + .db + .remove_message(&self.uaid, &n.sort_key()) + .await?; + } + self.ack_state.unacked_stored_notifs.remove(pos); + self.stats.stored_acked += 1; + continue; + }; + } + + if self.ack_state.unacked_notifs() { + // Wait for the Client to Ack all notifications before further + // processing + Ok(vec![]) + } else { + self.post_process_all_acked().await + } } - async fn nack(&mut self, _code: Option) -> Result<(), SMError> { - unimplemented!(); + /// Negative Acknowledgement (a Client error occurred) of one or more Push + /// Notifications + fn nack(&mut self, code: Option) { + trace!("WebPushClient:nack"); + // only metric codes expected from the client (or 0) + let code = code + .and_then(|code| (301..=303).contains(&code).then_some(code)) + .unwrap_or(0); + self.app_state + .metrics + .incr_with_tags("ua.command.nack") + .with_tag("code", &code.to_string()) + .send(); + self.stats.nacks += 1; } - async fn ping(&mut self) -> Result { + /// Handle a WebPush Ping + /// + /// Note this is the WebPush Protocol level's Ping: this differs from the + /// lower level WebSocket Ping frame (handled by the `webpush_ws` handler). + fn ping(&mut self) -> Result { + trace!("WebPushClient:ping"); // TODO: why is this 45 vs the comment describing a minute? and 45 // should be a setting // Clients shouldn't ping > than once per minute or we disconnect them if sec_since_epoch() - self.last_ping >= 45 { trace!("🏓WebPushClient Got a WebPush Ping, sending WebPush Pong"); + self.last_ping = sec_since_epoch(); Ok(ServerMessage::Ping) } else { Err(SMError::ExcessivePing) } } - async fn maybe_post_process_acks(&mut self) -> Result, SMError> { - if self.ack_state.unacked_notifs() { - // Waiting for the Client to Ack all notifications it's been sent - // before further processing - return Ok(vec![]); + /// Post process the Client succesfully Ack'ing all Push Notifications it's + /// been sent. + /// + /// TODO: more docs + async fn post_process_all_acked(&mut self) -> Result, SMError> { + trace!("▶️ WebPushClient:post_process_all_acked"); + let flags = &self.flags; + if flags.check_storage { + if flags.increment_storage { + debug!("▶️ WebPushClient:post_process_all_acked check_storage && increment_storage"); + self.increment_storage().await?; + } + + debug!("▶️ WebPushClient:post_process_all_acked check_storage"); + let smsgs = self.check_storage_loop().await?; + if !smsgs.is_empty() { + debug_assert!(self.flags.check_storage); + // More outgoing notifications: send them out and go back to + // waiting for the Client to Ack them all before further + // processing + return Ok(smsgs); + } + // Otherwise check_storage is finished + debug_assert!(!self.flags.check_storage); + debug_assert!(!self.flags.increment_storage); } - // TODO: + // All Ack'd and finished checking/incrementing storage + debug_assert!(!self.ack_state.unacked_notifs()); let flags = &self.flags; - if flags.check_storage && flags.increment_storage { - trace!("WebPushClient:maybe_post_process_acks check_storage && increment_storage"); - unimplemented!() - } else if flags.check_storage { - trace!("WebPushClient:maybe_post_process_acks check_storage"); - self.check_storage().await - } else if flags.rotate_message_table { - trace!("WebPushClient:maybe_post_process_acks rotate_message_table"); + if flags.rotate_message_table { + // TODO: probably remove entirely + debug!("▶️ WebPushClient:post_process_all_acked rotate_message_table"); unimplemented!() } else if flags.reset_uaid { - trace!("WebPushClient:maybe_post_process_acks reset_uaid"); + debug!("▶️ WebPushClient:post_process_all_acked reset_uaid"); self.app_state.db.remove_user(&self.uaid).await?; Err(SMError::UaidReset) } else { diff --git a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/on_server_notif.rs b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/on_server_notif.rs index 40be8e01..253338eb 100644 --- a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/on_server_notif.rs +++ b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/on_server_notif.rs @@ -1,39 +1,300 @@ +use actix_web::rt; +use cadence::{Counted, CountedExt}; + use autoconnect_common::protocol::{ServerMessage, ServerNotification}; -use autopush_common::notification::Notification; +use autopush_common::{ + db::CheckStorageResponse, notification::Notification, util::sec_since_epoch, +}; use super::WebPushClient; use crate::error::SMError; impl WebPushClient { + /// Handle a `ServerNotification` for this user + /// + /// `ServerNotification::Disconnect` is emitted by the same autoconnect + /// node recieving it when a User has logged into that same node twice to + /// "Ghost" (disconnect) the first user's session for its second session. + /// + /// Other variants are emitted by autoendpoint pub async fn on_server_notif( &mut self, snotif: ServerNotification, ) -> Result, SMError> { match snotif { + ServerNotification::Notification(notif) => Ok(vec![self.notif(notif)?]), ServerNotification::CheckStorage => self.check_storage().await, - ServerNotification::Notification(notif) => Ok(vec![self.notif(notif).await?]), ServerNotification::Disconnect => Err(SMError::Ghost), } } - /// Move queued push notifications to unacked_direct_notifs (to be stored - /// in the db) + /// After disconnecting from the `ClientRegistry`, moves any queued Direct + /// Push Notifications to unacked_direct_notifs (to be stored in the db on + /// `shutdown`) pub fn on_server_notif_shutdown(&mut self, snotif: ServerNotification) { + trace!("WebPushClient::on_server_notif_shutdown"); if let ServerNotification::Notification(notif) = snotif { self.ack_state.unacked_direct_notifs.push(notif); } } - pub(super) async fn check_storage(&mut self) -> Result, SMError> { - // TODO: - Ok(vec![]) - } - - async fn notif(&mut self, notif: Notification) -> Result { + /// Send a Direct Push Notification to this user + fn notif(&mut self, notif: Notification) -> Result { trace!("WebPushClient::notif Sending a direct notif"); if notif.ttl != 0 { self.ack_state.unacked_direct_notifs.push(notif.clone()); } + self.emit_send_metrics(¬if, "Direct"); Ok(ServerMessage::Notification(notif)) } + + /// Top level read of Push Notifications from storage + /// + /// Initializes the top level `check_storage` and `include_topic` flags and + /// runs `check_storage_loop` + pub(super) async fn check_storage(&mut self) -> Result, SMError> { + trace!("🗄️ WebPushClient::check_storage"); + self.flags.check_storage = true; + self.flags.include_topic = true; + self.check_storage_loop().await + } + + /// Loop the read of Push Notifications from storage + /// + /// Loops until any unexpired Push Notifications are read or there's no + /// more Notifications in storage + pub(super) async fn check_storage_loop(&mut self) -> Result, SMError> { + trace!("🗄️ WebPushClient::check_storage_loop"); + while self.flags.check_storage { + let smsgs = self.check_storage_advance().await?; + if !smsgs.is_empty() { + self.check_msg_limit().await?; + return Ok(smsgs); + } + } + // No more notifications (check_storage = false). Despite returning no + // notifs we may have advanced through expired timestamp messages and + // need to increment_storage to mark them as deleted + if self.flags.increment_storage { + debug!("🗄️ WebPushClient::check_storage_loop increment_storage"); + self.increment_storage().await?; + } + Ok(vec![]) + } + + /// Read a chunk (max count 10 returned) of Notifications from storage + /// + /// This filters out expired Notifications and may return an empty result + /// set when there's still pending Notifications to be read: so it should + /// be called in a loop to advance through all Notification records + async fn check_storage_advance(&mut self) -> Result, SMError> { + trace!("🗄️ WebPushClient::check_storage_advance"); + let CheckStorageResponse { + include_topic, + mut messages, + timestamp, + } = self.do_check_storage().await?; + + debug!( + "🗄️ WebPushClient::check_storage_advance \ + include_topic: {} -> {} \ + unacked_stored_highest: {:?} -> {:?}", + self.flags.include_topic, + include_topic, + self.ack_state.unacked_stored_highest, + timestamp + ); + self.flags.include_topic = include_topic; + self.ack_state.unacked_stored_highest = timestamp; + + if messages.is_empty() { + trace!("🗄️ WebPushClient::check_storage_advance finished"); + self.flags.check_storage = false; + self.sent_from_storage = 0; + return Ok(vec![]); + } + + // Filter out TTL expired messages + let now_sec = sec_since_epoch(); + messages.retain(|n| { + if !n.expired(now_sec) { + return true; + } + // TODO: A batch remove_messages would be nicer + if n.sortkey_timestamp.is_none() { + self.spawn_remove_message(n.sort_key()); + } + false + }); + + self.flags.increment_storage = !include_topic && timestamp.is_some(); + + if messages.is_empty() { + trace!("🗄️ WebPushClient::check_storage_advance empty response (filtered expired)"); + return Ok(vec![]); + } + + self.ack_state + .unacked_stored_notifs + .extend(messages.iter().cloned()); + let smsgs: Vec<_> = messages + .into_iter() + .inspect(|msg| { + trace!("🗄️ WebPushClient::check_storage_advance Sending stored"); + self.emit_send_metrics(msg, "Stored") + }) + .map(ServerMessage::Notification) + .collect(); + + let count = smsgs.len() as u32; + debug!( + "🗄️ WebPushClient::check_storage_advance: sent_from_storage: {}, +{}", + self.sent_from_storage, count + ); + self.sent_from_storage += count; + Ok(smsgs) + } + + /// Read a chunk (max count 10 returned) of Notifications from storage + /// + /// TODO: document topic vs timestamp messages + async fn do_check_storage(&self) -> Result { + trace!("🗄️ WebPushClient::do_check_storage"); + let timestamp = self.ack_state.unacked_stored_highest; + let topic_resp = if self.flags.include_topic { + trace!("🗄️ WebPushClient::do_check_storage: fetch_messages"); + self.app_state.db.fetch_messages(&self.uaid, 11).await? + } else { + Default::default() + }; + if !topic_resp.messages.is_empty() { + trace!( + "🗄️ WebPushClient::do_check_storage: Topic message returns: {:#?}", + topic_resp.messages + ); + self.app_state + .metrics + .count_with_tags( + "notification.message.retrieved", + topic_resp.messages.len() as i64, + ) + .with_tag("topic", "true") + .send(); + return Ok(CheckStorageResponse { + include_topic: true, + messages: topic_resp.messages, + timestamp: topic_resp.timestamp, + }); + } + + let timestamp = if self.flags.include_topic { + topic_resp.timestamp + } else { + timestamp + }; + trace!( + "🗄️ WebPushClient::do_check_storage: fetch_timestamp_messages timestamp: {:?}", + timestamp + ); + let timestamp_resp = self + .app_state + .db + .fetch_timestamp_messages(&self.uaid, timestamp, 10) + .await?; + if !timestamp_resp.messages.is_empty() { + trace!( + "🗄️ WebPushClient::do_check_storage: Timestamp message returns: {:#?}", + timestamp_resp.messages + ); + self.app_state + .metrics + .count_with_tags( + "notification.message.retrieved", + timestamp_resp.messages.len() as i64, + ) + .with_tag("topic", "false") + .send(); + } + + Ok(CheckStorageResponse { + include_topic: false, + messages: timestamp_resp.messages, + // If we didn't get a timestamp off the last query, use the + // original value if passed one + timestamp: timestamp_resp.timestamp.or(timestamp), + }) + } + + /// Update the user's last message read timestamp (for timestamp messages) + /// TODO: more docs + pub(super) async fn increment_storage(&mut self) -> Result<(), SMError> { + trace!( + "🗄️ WebPushClient::increment_storage: unacked_stored_highest: {:?}", + self.ack_state.unacked_stored_highest + ); + let Some(timestamp) = self.ack_state.unacked_stored_highest else { + return Err(SMError::Internal("increment_storage w/ no unacked_stored_highest".to_owned())); + }; + self.app_state + .db + .increment_storage(&self.uaid, timestamp) + .await?; + self.flags.increment_storage = false; + Ok(()) + } + + /// Ensure this user hasn't exceeded the maximum allowed number of messages + /// read from storage (`Settings::msg_limit`) + /// + /// Drops the user record and returns the `SMError::UaidReset` error if + /// they have + async fn check_msg_limit(&mut self) -> Result<(), SMError> { + trace!( + "WebPushClient::check_msg_limit: sent_from_storage: {} msg_limit: {}", + self.sent_from_storage, + self.app_state.settings.msg_limit + ); + if self.sent_from_storage > self.app_state.settings.msg_limit { + // Exceeded the max limit of stored messages: drop the user to + // trigger a re-register + self.app_state.db.remove_user(&self.uaid).await?; + return Err(SMError::UaidReset); + } + Ok(()) + } + + /// Spawn a background task to remove a message from storage + fn spawn_remove_message(&self, sort_key: String) { + let db = self.app_state.db.clone(); + let uaid = self.uaid; + rt::spawn(async move { + if db.remove_message(&uaid, &sort_key).await.is_ok() { + debug!( + "Deleted expired message without sortkey_timestamp, sort_key: {}", + sort_key + ); + } + }); + } + + /// Emit metrics for a Notification to be sent to the user + fn emit_send_metrics(&self, notif: &Notification, source: &'static str) { + let metrics = &self.app_state.metrics; + let ua_info = &self.ua_info; + metrics + .incr_with_tags("ua.notification.sent") + .with_tag("source", source) + .with_tag("topic", ¬if.topic.is_some().to_string()) + .with_tag("os", &ua_info.metrics_os) + // TODO: include `internal` if meta is set + .send(); + metrics + .count_with_tags( + "ua.message_data", + notif.data.as_ref().map_or(0, |data| data.len() as i64), + ) + .with_tag("source", source) + .with_tag("os", &ua_info.metrics_os) + .send(); + } } diff --git a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs index ff8d31cd..e8d15c7a 100644 --- a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs +++ b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs @@ -1,14 +1,18 @@ use std::{fmt, sync::Arc}; use cadence::CountedExt; +use uuid::Uuid; use autoconnect_common::protocol::{ClientMessage, ServerMessage}; use autoconnect_settings::AppState; -use autopush_common::util::ms_since_epoch; +use autopush_common::{ + db::{error::DbResult, User}, + util::ms_since_epoch, +}; use crate::{ error::SMError, - identified::{ClientFlags, WebPushClient}, + identified::{process_existing_user, ClientFlags, WebPushClient}, }; /// Represents a Client waiting for (or yet to process) a Hello message @@ -32,6 +36,10 @@ impl UnidentifiedClient { UnidentifiedClient { ua, app_state } } + /// Handle a WebPush `ClientMessage` sent from the user agent over the + /// WebSocket for this user + /// + /// Anything but a Hello message is rejected as an Error pub async fn on_client_msg( self, msg: ClientMessage, @@ -47,39 +55,29 @@ impl UnidentifiedClient { r#"Expected messageType="hello", "use_webpush": true"#.to_owned() )); }; - trace!("👋UnidentifiedClient::on_client_msg Hello {:?}", uaid); + debug!( + "👋UnidentifiedClient::on_client_msg Hello from uaid?: {:?}", + uaid + ); - let connected_at = ms_since_epoch(); let uaid = uaid .as_deref() - .map(uuid::Uuid::try_parse) + .map(Uuid::try_parse) .transpose() .map_err(|_| SMError::InvalidMessage("Invalid uaid".to_owned()))?; - let defer_registration = uaid.is_none(); - let hello_response = self - .app_state - .db - .hello( - connected_at, - uaid.as_ref(), - &self.app_state.router_url, - defer_registration, - ) - .await?; - trace!( - "💬UnidentifiedClient::on_client_msg Hello! uaid: {:?} user_is_registered: {}", - hello_response.uaid, - hello_response.deferred_user_registration.is_none() + let GetOrCreateUser { + user, + existing_user, + flags, + } = self.get_or_create_user(uaid).await?; + let uaid = user.uaid; + debug!( + "💬UnidentifiedClient::on_client_msg Hello! uaid: {} existing_user: {}", + uaid, existing_user, ); - let Some(uaid) = hello_response.uaid else { - trace!("💬UnidentifiedClient::on_client_msg AlreadyConnected {:?}", hello_response.uaid); - return Err(SMError::AlreadyConnected); - }; - let _ = self.app_state.metrics.incr("ua.command.hello"); - let flags = ClientFlags::from_hello(&hello_response); // TODO: broadcasts //let desired_broadcasts = Broadcast::from_hasmap(broadcasts.unwrap_or_default()); //let (initialized_subs, broadcasts) = app_state.broadcast_init(&desired_broadcasts); @@ -87,8 +85,8 @@ impl UnidentifiedClient { uaid, self.ua, flags, - connected_at, - hello_response.deferred_user_registration, + user.connected_at, + (!existing_user).then_some(user), self.app_state, ) .await?; @@ -102,6 +100,58 @@ impl UnidentifiedClient { let smsgs = std::iter::once(smsg).chain(check_storage_smsgs); Ok((wpclient, smsgs)) } + + /// Lookup a User or return a new User record if the lookup failed + async fn get_or_create_user(&self, uaid: Option) -> DbResult { + trace!("❓UnidentifiedClient::get_or_create_user"); + let connected_at = ms_since_epoch(); + + if let Some(uaid) = uaid { + // NOTE: previously a user would be dropped when + // serde_dynamodb::from_hashmap failed (but this now occurs inside + // the db impl) + let maybe_user = self.app_state.db.get_user(&uaid).await?; + if let Some(mut user) = maybe_user { + let maybe_flags = process_existing_user(&self.app_state, &user).await?; + if let Some(flags) = maybe_flags { + user.node_id = Some(self.app_state.router_url.to_owned()); + user.connected_at = connected_at; + user.set_last_connect(); + self.app_state.db.update_user(&user).await?; + return Ok(GetOrCreateUser { + user, + existing_user: true, + flags, + }); + } + } + // NOTE: when the client's specified a uaid but get_user returns + // None (or process_existing_user dropped the user record due to it + // being invalid) we're now deferring the db.add_user call (a + // change from the previous state machine impl) + } + + // TODO: NOTE: A new User doesn't get a `set_last_connect()` (matching + // the previous impl) + let user = User { + current_month: Some(self.app_state.db.message_table().to_owned()), + node_id: Some(self.app_state.router_url.to_owned()), + connected_at, + ..Default::default() + }; + Ok(GetOrCreateUser { + user, + existing_user: false, + flags: Default::default(), + }) + } +} + +/// Result of a User lookup for a Hello message +struct GetOrCreateUser { + user: User, + existing_user: bool, + flags: ClientFlags, } #[cfg(test)] @@ -110,10 +160,9 @@ mod tests { use autoconnect_common::{ protocol::ClientMessage, - test_support::{hello_again_db, DUMMY_CHID, DUMMY_UAID, UA}, + test_support::{hello_again_db, hello_db, DUMMY_CHID, DUMMY_UAID, UA}, }; use autoconnect_settings::AppState; - use autopush_common::db::{mock::MockDbClient, HelloResponse, User}; use crate::error::SMError; @@ -129,7 +178,7 @@ mod tests { let result = client.on_client_msg(ClientMessage::Ping).await; assert!(matches!(result, Err(SMError::InvalidMessage(_)))); - let client = UnidentifiedClient::new(UA.to_owned(), Default::default()); + let client = uclient(Default::default()); let result = client .on_client_msg(ClientMessage::Register { channel_id: DUMMY_CHID.to_string(), @@ -156,22 +205,9 @@ mod tests { #[tokio::test] async fn hello_new_user() { - let mut db = MockDbClient::new(); - // Ensure no write to the db - db.expect_hello() - .withf(|_, _, _, defer_registration| defer_registration == &true) - .return_once(move |_, _, _, _| { - Ok(HelloResponse { - uaid: Some(DUMMY_UAID), - deferred_user_registration: Some(User { - uaid: DUMMY_UAID, - ..Default::default() - }), - ..Default::default() - }) - }); let client = uclient(AppState { - db: db.into_boxed_arc(), + // Simple hello_db ensures no writes to the db + db: hello_db().into_boxed_arc(), ..Default::default() }); let msg = ClientMessage::Hello { diff --git a/autoconnect/autoconnect-ws/src/error.rs b/autoconnect/autoconnect-ws/src/error.rs index 999afa2b..c5c13c14 100644 --- a/autoconnect/autoconnect-ws/src/error.rs +++ b/autoconnect/autoconnect-ws/src/error.rs @@ -1,11 +1,12 @@ use actix_ws::CloseCode; -use autoconnect_ws_sm::SMError; +use autoconnect_ws_sm::{SMError, WebPushClient}; +// TODO: WSError should likely include a backtrace /// WebPush WebSocket Handler Errors #[derive(Debug, strum::AsRefStr, thiserror::Error)] pub enum WSError { - #[error("State machine error: {0}")] + #[error("State error: {0}")] SM(#[from] SMError), #[error("Couldn't parse WebSocket message JSON: {0}")] @@ -28,9 +29,6 @@ pub enum WSError { #[error("ClientRegistry unexpectedly disconnected")] RegistryDisconnected, - - #[error("ClientRegistry disconnect unexpectedly failed (Client not connected)")] - RegistryNotConnected, } impl WSError { @@ -52,4 +50,39 @@ impl WSError { pub fn close_description(&self) -> &str { self.as_ref() } + + /// Whether this error is reported to sentry + pub fn is_sentry_event(&self) -> bool { + !matches!(self, WSError::Json(_)) + } + + /// Create a sentry Event from this Error with `WebPushClient` information + pub fn to_sentry_event(&self, client: &WebPushClient) -> sentry::protocol::Event<'static> { + let mut event = sentry::event_from_error(self); + // TODO: + //event.exception.last_mut().unwrap().stacktrace = + // sentry::integrations::backtrace::backtrace_to_stacktrace(&self.backtrace); + + event.user = Some(sentry::User { + id: Some(client.uaid.as_simple().to_string()), + ..Default::default() + }); + let ua_info = client.ua_info.clone(); + event + .tags + .insert("ua_name".to_owned(), ua_info.browser_name); + event + .tags + .insert("ua_os_family".to_owned(), ua_info.metrics_os); + event + .tags + .insert("ua_os_ver".to_owned(), ua_info.os_version); + event + .tags + .insert("ua_browser_family".to_owned(), ua_info.metrics_browser); + event + .tags + .insert("ua_browser_ver".to_owned(), ua_info.browser_version); + event + } } diff --git a/autoconnect/autoconnect-ws/src/handler.rs b/autoconnect/autoconnect-ws/src/handler.rs index 0cab1811..1bf5e6c4 100644 --- a/autoconnect/autoconnect-ws/src/handler.rs +++ b/autoconnect/autoconnect-ws/src/handler.rs @@ -29,6 +29,7 @@ pub fn spawn_webpush_ws( let close_reason = webpush_ws(client, &mut session, msg_stream) .await .unwrap_or_else(|e| { + error!("spawn_webpush_ws: Error: {}", e); Some(CloseReason { code: e.close_code(), description: Some(e.close_description().to_owned()), @@ -57,47 +58,38 @@ async fn webpush_ws( mut msg_stream: impl futures::Stream> + Unpin, ) -> Result, WSError> { + // NOTE: UnidentifiedClient doesn't require shutdown/cleanup, so its + // Error's propagated. We don't propagate Errors afterwards to handle + // shutdown/cleanup of WebPushClient let (mut client, smsgs) = unidentified_ws(client, &mut msg_stream).await?; // Client now identified: add them to the registry to recieve ServerNotifications - // TODO: should return a scopeguard to ensure disconnect() always called let mut snotif_stream = client .app_state .clients .connect(client.uaid, client.uid) .await; - // Then send their Hello response and any initial notifications from storage - for smsg in smsgs { - trace!( - "webpush_ws: New WebPushClient, ServerMessage -> session: {:#?}", - smsg - ); - // TODO: Ensure these added to "unacked_stored_notifs" - session - .text(smsg) - .await - // TODO: try! (?) here dictates a scopeguard/other recovery to - // cleanup the clients entry - .map_err(|_| WSError::StreamClosed)?; - } - - let result = identified_ws(&mut client, session, &mut msg_stream, &mut snotif_stream).await; + let result = identified_ws(&mut client, smsgs, session, msg_stream, &mut snotif_stream).await; - client + // Ignore disconnect Errors (Client wasn't connected) + let _ = client .app_state .clients .disconnect(&client.uaid, &client.uid) - .await - .map_err(|_| WSError::RegistryNotConnected)?; + .await; + snotif_stream.close(); while let Some(snotif) = snotif_stream.next().await { client.on_server_notif_shutdown(snotif); } - client.shutdown(); - - // TODO: report errors to sentry + client.shutdown(result.as_ref().err().map(|e| e.to_string())); + if let Err(ref e) = result { + if e.is_sentry_event() { + sentry::capture_event(e.to_sentry_event(&client)); + } + } result } @@ -119,7 +111,7 @@ async fn unidentified_ws( Ok(None) => return Err(WSError::StreamClosed), Err(_) => return Err(WSError::HandshakeTimeout), }; - trace!("unidentified_ws: Handshake msg: {:#?}", msg); + trace!("unidentified_ws: Handshake msg: {:?}", msg); let client_msg = match msg { Message::Text(ref bytestring) => bytestring.parse()?, @@ -135,11 +127,21 @@ async fn unidentified_ws( /// TODO: docs async fn identified_ws( client: &mut WebPushClient, + smsgs: impl IntoIterator, session: &mut impl Session, - msg_stream: &mut (impl futures::Stream> - + Unpin), + mut msg_stream: impl futures::Stream> + + Unpin, snotif_stream: &mut mpsc::UnboundedReceiver, ) -> Result, WSError> { + // Send the Hello response and any initial notifications from storage + for smsg in smsgs { + trace!( + "identified_ws: New WebPushClient, ServerMessage -> session: {:#?}", + smsg + ); + session.text(smsg).await?; + } + let mut ping_interval = interval(client.app_state.settings.auto_ping_interval); ping_interval.tick().await; let close_reason = loop { @@ -170,10 +172,7 @@ async fn identified_ws( }; for smsg in client.on_client_msg(client_msg).await? { trace!("identified_ws: msg_stream, ServerMessage -> session {:#?}", smsg); - session - .text(smsg) - .await - .map_err(|_| WSError::StreamClosed)?; + session.text(smsg).await?; } }, @@ -184,10 +183,7 @@ async fn identified_ws( }; for smsg in client.on_server_notif(snotif).await? { trace!("identified_ws: snotif_stream, ServerMessage -> session {:#?}", smsg); - session - .text(smsg) - .await - .map_err(|_| WSError::StreamClosed)?; + session.text(smsg).await?; } } diff --git a/autoconnect/src/main.rs b/autoconnect/src/main.rs index 687ae547..6a2b4f91 100644 --- a/autoconnect/src/main.rs +++ b/autoconnect/src/main.rs @@ -15,7 +15,10 @@ use std::sync::RwLock; use autoconnect_settings::{AppState, Settings}; use autoconnect_web::{build_app, client::ClientChannels, config}; -use autopush_common::errors::{ApcErrorKind, Result}; +use autopush_common::{ + errors::{ApcErrorKind, Result}, + logging, +}; mod middleware; @@ -52,6 +55,8 @@ async fn main() -> Result<()> { } let settings = Settings::with_env_and_config_files(&filenames).map_err(ApcErrorKind::ConfigError)?; + logging::init_logging(!settings.human_logs).expect("Logging failed to initialize"); + debug!("Starting up..."); //TODO: Eventually this will match between the various storage engines that // we support. For now, it's just the one, DynamoDB. @@ -81,11 +86,15 @@ async fn main() -> Result<()> { }); let port = settings.port; + let router_port = settings.router_port; let app_state = AppState::from_settings(settings)?; let _client_channels: ClientChannels = Arc::new(RwLock::new(HashMap::new())); - info!("Starting autoconnect on port {:?}", port); - let srv = HttpServer::new(move || { + info!( + "Starting autoconnect on port {} (router_port: {})", + port, router_port + ); + HttpServer::new(move || { let app = build_app!(app_state); // TODO: should live in build_app! app.wrap(crate::middleware::sentry::SentryWrapper::new( @@ -94,10 +103,11 @@ async fn main() -> Result<()> { )) }) .bind(("0.0.0.0", port))? - .run(); - - info!("Server starting, port: {}", port); - srv.await.map_err(|e| e.into()).map(|v| { + .bind(("0.0.0.0", router_port))? + .run() + .await + .map_err(|e| e.into()) + .map(|v| { info!("Shutting down autoconnect"); v }) diff --git a/autopush-common/Cargo.toml b/autopush-common/Cargo.toml index 34ed1289..d3f3730e 100644 --- a/autopush-common/Cargo.toml +++ b/autopush-common/Cargo.toml @@ -23,6 +23,7 @@ httparse.workspace = true hyper.workspace = true lazy_static.workspace = true log.workspace = true +mockall.workspace = true openssl.workspace = true rand.workspace = true regex.workspace = true @@ -57,7 +58,6 @@ async-trait = "0.1" futures-backoff = "0.1.0" mozsvc-common = "0.2" woothee = "0.13" -mockall = "0.8.3" # 0.9+ requires reworking tests [dev-dependencies] mockito = "0.31" diff --git a/autopush-common/src/db/client.rs b/autopush-common/src/db/client.rs index a1023ee1..9d2e33cf 100644 --- a/autopush-common/src/db/client.rs +++ b/autopush-common/src/db/client.rs @@ -61,6 +61,9 @@ pub trait DbClient: Send + Sync { /// Save a message to the message table async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()>; + /// Save multiple messages to the message table + async fn save_messages(&self, uaid: &Uuid, messages: Vec) -> DbResult<()>; + /// Fetch stored messages for a user async fn fetch_messages(&self, uaid: &Uuid, limit: usize) -> DbResult; @@ -72,6 +75,9 @@ pub trait DbClient: Send + Sync { limit: usize, ) -> DbResult; + /// Update the last read timestamp for a user + async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()>; + /// Delete a notification async fn remove_message(&self, uaid: &Uuid, sort_key: &str) -> DbResult<()>; diff --git a/autopush-common/src/db/dynamodb/mod.rs b/autopush-common/src/db/dynamodb/mod.rs index a2ac9d6b..4b1bb784 100644 --- a/autopush-common/src/db/dynamodb/mod.rs +++ b/autopush-common/src/db/dynamodb/mod.rs @@ -6,8 +6,9 @@ use std::sync::Arc; use crate::db::client::DbClient; use crate::db::dynamodb::retry::{ - retry_policy, retryable_delete_error, retryable_describe_table_error, retryable_getitem_error, - retryable_putitem_error, retryable_updateitem_error, + retry_policy, retryable_batchwriteitem_error, retryable_delete_error, + retryable_describe_table_error, retryable_getitem_error, retryable_putitem_error, + retryable_updateitem_error, }; use crate::db::error::{DbError, DbResult}; use crate::db::{ @@ -17,14 +18,14 @@ use crate::notification::Notification; use crate::util::sec_since_epoch; use async_trait::async_trait; -// use crate::db::dynamodb::{ddb_item, hashmap, val}; use cadence::{CountedExt, StatsdClient}; use chrono::Utc; use rusoto_core::credential::StaticProvider; use rusoto_core::{HttpClient, Region, RusotoError}; use rusoto_dynamodb::{ - AttributeValue, DeleteItemInput, DescribeTableError, DescribeTableInput, DynamoDb, - DynamoDbClient, GetItemInput, PutItemInput, QueryInput, UpdateItemInput, + AttributeValue, BatchWriteItemInput, DeleteItemInput, DescribeTableError, DescribeTableInput, + DynamoDb, DynamoDbClient, GetItemInput, PutItemInput, PutRequest, QueryInput, UpdateItemInput, + WriteRequest, }; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -510,6 +511,35 @@ impl DbClient for DdbClientImpl { }) } + async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> { + let expiry = sec_since_epoch() + 2 * MAX_EXPIRY; + let attr_values = hashmap! { + ":timestamp".to_string() => val!(N => timestamp.to_string()), + ":expiry".to_string() => val!(N => expiry), + }; + let update_input = UpdateItemInput { + key: ddb_item! { + uaid: s => uaid.as_simple().to_string(), + chidmessageid: s => " ".to_string() + }, + update_expression: Some( + "SET current_timestamp = :timestamp, expiry = :expiry".to_string(), + ), + expression_attribute_values: Some(attr_values), + table_name: self.settings.message_table.clone(), + ..Default::default() + }; + + retry_policy() + .retry_if( + || self.db_client.update_item(update_input.clone()), + retryable_updateitem_error(self.metrics.clone()), + ) + .await?; + + Ok(()) + } + async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> { let topic = message.topic.is_some().to_string(); let input = PutItemInput { @@ -532,6 +562,37 @@ impl DbClient for DdbClientImpl { Ok(()) } + async fn save_messages(&self, uaid: &Uuid, messages: Vec) -> DbResult<()> { + let put_items: Vec = messages + .into_iter() + .filter_map(|n| { + // eventually include `internal` if `meta` defined. + self.metrics + .incr_with_tags("notification.message.stored") + .with_tag("topic", &n.topic.is_some().to_string()) + .send(); + serde_dynamodb::to_hashmap(&NotificationRecord::from_notif(uaid, n)) + .ok() + .map(|hm| WriteRequest { + put_request: Some(PutRequest { item: hm }), + delete_request: None, + }) + }) + .collect(); + let batch_input = BatchWriteItemInput { + request_items: hashmap! { self.settings.message_table.clone() => put_items }, + ..Default::default() + }; + + retry_policy() + .retry_if( + || self.db_client.batch_write_item(batch_input.clone()), + retryable_batchwriteitem_error(self.metrics.clone()), + ) + .await?; + Ok(()) + } + async fn remove_message(&self, uaid: &Uuid, sort_key: &str) -> DbResult<()> { let input = DeleteItemInput { table_name: self.settings.message_table.clone(), @@ -676,7 +737,7 @@ impl DbClient for DdbClientImpl { } /// Indicate whether this last_connect falls in the current month -fn has_connected_this_month(user: &User) -> bool { +pub(crate) fn has_connected_this_month(user: &User) -> bool { user.last_connect.map_or(false, |v| { let pat = Utc::now().format("%Y%m").to_string(); v.to_string().starts_with(&pat) diff --git a/autopush-common/src/db/dynamodb/retry.rs b/autopush-common/src/db/dynamodb/retry.rs index 63b1d162..2dd0f92d 100644 --- a/autopush-common/src/db/dynamodb/retry.rs +++ b/autopush-common/src/db/dynamodb/retry.rs @@ -2,7 +2,8 @@ use again::RetryPolicy; use cadence::{CountedExt, StatsdClient}; use rusoto_core::RusotoError; use rusoto_dynamodb::{ - DeleteItemError, DescribeTableError, GetItemError, PutItemError, UpdateItemError, + BatchWriteItemError, DeleteItemError, DescribeTableError, GetItemError, PutItemError, + UpdateItemError, }; use std::sync::Arc; @@ -30,6 +31,11 @@ retryable_error!(retryable_getitem_error, GetItemError, "get_item"); retryable_error!(retryable_updateitem_error, UpdateItemError, "update_item"); retryable_error!(retryable_putitem_error, PutItemError, "put_item"); retryable_error!(retryable_delete_error, DeleteItemError, "delete_item"); +retryable_error!( + retryable_batchwriteitem_error, + BatchWriteItemError, + "batch_write_item" +); // DescribeTableError does not have a ProvisionedThroughputExceeded variant pub fn retryable_describe_table_error( diff --git a/autopush-common/src/db/error.rs b/autopush-common/src/db/error.rs index 1d0caacb..97e78735 100644 --- a/autopush-common/src/db/error.rs +++ b/autopush-common/src/db/error.rs @@ -1,6 +1,7 @@ use rusoto_core::RusotoError; use rusoto_dynamodb::{ - DeleteItemError, DescribeTableError, GetItemError, PutItemError, QueryError, UpdateItemError, + BatchWriteItemError, DeleteItemError, DescribeTableError, GetItemError, PutItemError, + QueryError, UpdateItemError, }; use thiserror::Error; @@ -20,6 +21,9 @@ pub enum DbError { #[error("Database error while performing DeleteItem")] DdbDeleteItem(#[from] RusotoError), + #[error("Database error while performing BatchWriteItem")] + DdbBatchWriteItem(#[from] RusotoError), + #[error("Database error while performing DescribeTable")] DdbDescribeTable(#[from] RusotoError), diff --git a/autopush-common/src/db/mock.rs b/autopush-common/src/db/mock.rs index ad782bcf..edea67c3 100644 --- a/autopush-common/src/db/mock.rs +++ b/autopush-common/src/db/mock.rs @@ -38,6 +38,12 @@ mockall::mock! { fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()>; + fn save_messages( + &self, + uaid: &Uuid, + messages: Vec, + ) -> DbResult<()>; + fn fetch_messages(&self, uaid: &Uuid, limit: usize) -> DbResult; fn fetch_timestamp_messages( @@ -47,6 +53,8 @@ mockall::mock! { limit: usize, ) -> DbResult; + fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()>; + fn remove_message(&self, uaid: &Uuid, sort_key: &str) -> DbResult<()>; fn hello(&self, connected_at: u64, uaid: Option, router_url: &str, defer_registration: bool) -> DbResult; @@ -113,6 +121,10 @@ impl DbClient for Arc { Arc::as_ref(self).save_message(uaid, message) } + async fn save_messages(&self, uaid: &Uuid, messages: Vec) -> DbResult<()> { + Arc::as_ref(self).save_messages(uaid, messages) + } + async fn fetch_messages(&self, uaid: &Uuid, limit: usize) -> DbResult { Arc::as_ref(self).fetch_messages(uaid, limit) } @@ -126,6 +138,10 @@ impl DbClient for Arc { Arc::as_ref(self).fetch_timestamp_messages(uaid, timestamp, limit) } + async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> { + Arc::as_ref(self).increment_storage(uaid, timestamp) + } + async fn remove_message(&self, uaid: &Uuid, sort_key: &str) -> DbResult<()> { Arc::as_ref(self).remove_message(uaid, sort_key) } diff --git a/autopush-common/src/db/mod.rs b/autopush-common/src/db/mod.rs index 7d877943..0de181ad 100644 --- a/autopush-common/src/db/mod.rs +++ b/autopush-common/src/db/mod.rs @@ -19,7 +19,7 @@ use serde::Serializer; use serde_derive::{Deserialize, Serialize}; use uuid::Uuid; -use crate::db::util::generate_last_connect; +use crate::db::{dynamodb::has_connected_this_month, util::generate_last_connect}; pub mod client; pub mod dynamodb; @@ -38,7 +38,7 @@ use crate::util::timing::{ms_since_epoch, sec_since_epoch}; use models::{NotificationHeaders, RangeKey}; const MAX_EXPIRY: u64 = 2_592_000; -const USER_RECORD_VERSION: u8 = 1; +pub const USER_RECORD_VERSION: u8 = 1; /// The maximum TTL for channels, 30 days pub const MAX_CHANNEL_TTL: u64 = 30 * 24 * 60 * 60; @@ -287,6 +287,17 @@ impl Default for User { } } +impl User { + pub fn set_last_connect(&mut self) { + self.last_connect = if has_connected_this_month(self) { + None + } else { + Some(generate_last_connect()) + } + } +} + +/// TODO: Accurate? This is the record in the Db. /// The outbound message record. /// This is different that the stored `Notification` #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)] diff --git a/tests/test_integration_all_rust.py b/tests/test_integration_all_rust.py index b53dac84..39521f73 100644 --- a/tests/test_integration_all_rust.py +++ b/tests/test_integration_all_rust.py @@ -66,6 +66,11 @@ MP_CONNECTION_PORT = 9052 MP_ROUTER_PORT = 9072 +CONNECTION_BINARY = os.environ.get("CONNECTION_BINARY", "autopush_rs") +CONNECTION_SETTINGS_PREFIX = os.environ.get( + "CONNECTION_SETTINGS_PREFIX", "autopush__" +) + CN_SERVER: Optional[subprocess.Popen] = None CN_MP_SERVER: Optional[subprocess.Popen] = None EP_SERVER: Optional[subprocess.Popen] = None @@ -101,7 +106,6 @@ def get_free_port() -> int: endpoint_port=ENDPOINT_PORT, router_port=ROUTER_PORT, endpoint_scheme="http", - statsd_host="", router_tablename=ROUTER_TABLE, message_tablename=MESSAGE_TABLE, crypto_key="[{}]".format(CRYPTO_KEY), @@ -111,6 +115,15 @@ def get_free_port() -> int: max_connections=5000, human_logs="true", msg_limit=MSG_LIMIT, + # new autoconnect + db_dsn="http://127.0.0.1:8000", + db_settings=json.dumps( + dict( + router_table=ROUTER_TABLE, + message_table=MESSAGE_TABLE, + current_message_month=MESSAGE_TABLE, + ) + ), ) """Connection Megaphone Config: @@ -240,7 +253,6 @@ def register(self, chid: Optional[str] = None, key=None, status=200): self.channels[chid] = result["pushEndpoint"] return result - def unregister(self, chid): msg = json.dumps(dict(messageType="unregister", channelID=chid)) log.debug("Send: %s", msg) @@ -249,7 +261,6 @@ def unregister(self, chid): log.debug("Recv: %s", result) return result - def delete_notification(self, channel, message=None, status=204): messages = self.messages[channel] if not message: @@ -362,7 +373,6 @@ def get_broadcast(self, timeout=1): # pragma: nocover finally: self.ws.settimeout(orig_timeout) - def ping(self): log.debug("Send: %s", "{}") self.ws.send("{}") @@ -456,7 +466,7 @@ def process_logs(testcase): conn_count = sum(queue.qsize() for queue in CN_QUEUES) endpoint_count = sum(queue.qsize() for queue in EP_QUEUES) - print_lines_in_queues(CN_QUEUES, "AUTOPUSH: ") + print_lines_in_queues(CN_QUEUES, f"{CONNECTION_BINARY.upper()}: ") print_lines_in_queues(EP_QUEUES, "AUTOENDPOINT: ") if not STRICT_LOG_COUNTS: @@ -593,7 +603,7 @@ def setup_mock_server(): MOCK_SERVER_THREAD = Thread( target=app.run, kwargs=dict(port=MOCK_SERVER_PORT, debug=True) ) - MOCK_SERVER_THREAD.setDaemon(True) + MOCK_SERVER_THREAD.daemon = True MOCK_SERVER_THREAD.start() # Sentry API mock @@ -615,10 +625,10 @@ def setup_connection_server(connection_binary): CONNECTION_CONFIG["hostname"] = parsed.hostname CONNECTION_CONFIG["port"] = parsed.port CONNECTION_CONFIG["endpoint_scheme"] = parsed.scheme - write_config_to_env(CONNECTION_CONFIG, "autopush__") + write_config_to_env(CONNECTION_CONFIG, CONNECTION_SETTINGS_PREFIX) return else: - write_config_to_env(CONNECTION_CONFIG, "autopush__") + write_config_to_env(CONNECTION_CONFIG, CONNECTION_SETTINGS_PREFIX) cmd = [connection_binary] CN_SERVER = subprocess.Popen( cmd, @@ -648,10 +658,10 @@ def setup_megaphone_server(connection_binary): if url is not None: parsed = urlparse(url) MEGAPHONE_CONFIG["endpoint_port"] = parsed.port - write_config_to_env(MEGAPHONE_CONFIG, "autopush__") + write_config_to_env(MEGAPHONE_CONFIG, CONNECTION_SETTINGS_PREFIX) return else: - write_config_to_env(MEGAPHONE_CONFIG, "autopush__") + write_config_to_env(MEGAPHONE_CONFIG, CONNECTION_SETTINGS_PREFIX) cmd = [connection_binary] CN_MP_SERVER = subprocess.Popen(cmd, shell=True, env=os.environ) @@ -708,7 +718,7 @@ def setup_module(): setup_mock_server() os.environ["RUST_LOG"] = RUST_LOG - connection_binary = get_rust_binary_path("autopush_rs") + connection_binary = get_rust_binary_path(CONNECTION_BINARY) setup_connection_server(connection_binary) setup_megaphone_server(connection_binary) setup_endpoint_server()