diff --git a/src/client.rs b/src/client.rs index 655385842..849f50320 100644 --- a/src/client.rs +++ b/src/client.rs @@ -344,7 +344,6 @@ where &connected_at, uaid.as_ref(), &data.srv.opts.router_url, - &data.srv.metrics, )); transition!(AwaitProcessHello { response, diff --git a/src/db/commands.rs b/src/db/commands.rs index 609cfe8c7..ca8807ee5 100644 --- a/src/db/commands.rs +++ b/src/db/commands.rs @@ -1,4 +1,5 @@ use std::collections::HashSet; +use std::fmt::Display; use std::rc::Rc; use std::result::Result as StdResult; use uuid::Uuid; @@ -49,6 +50,7 @@ pub fn list_tables( pub fn fetch_messages( ddb: Rc>, + metrics: &Rc, table_name: &str, uaid: &Uuid, limit: u32, @@ -66,19 +68,22 @@ pub fn fetch_messages( ..Default::default() }; + let metrics = Rc::clone(metrics); let cond = |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_)); retry_if(move || ddb.query(&input), cond) .chain_err(|| "Error fetching messages") - .and_then(|output| { + .and_then(move |output| { let mut notifs: Vec = output.items.map_or_else(Vec::new, |items| { debug!("Got response of: {:?}", items); - // TODO: Capture translation errors and report them as we shouldn't - // have corrupt data items .into_iter() .inspect(|i| debug!("Item: {:?}", i)) - .filter_map(|item| serde_dynamodb::from_hashmap(item).ok()) + .filter_map(|item| { + ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| { + conversion_err(&metrics, e, "serde_dynamodb_from_hashmap") + }) + }) .collect() }); if notifs.is_empty() { @@ -89,10 +94,13 @@ pub fn fetch_messages( // the first DynamoDbNotification and remove it from the vec. let timestamp = notifs.remove(0).current_timestamp; // Convert any remaining DynamoDbNotifications to Notification's - // TODO: Capture translation errors and report them as we shouldn't have corrupt data let messages = notifs .into_iter() - .filter_map(|ddb_notif| ddb_notif.into_notif().ok()) + .filter_map(|ddb_notif| { + ok_or_inspect(ddb_notif.into_notif(), |e| { + conversion_err(&metrics, e, "into_notif") + }) + }) .collect(); Ok(FetchMessageResponse { timestamp, @@ -103,6 +111,7 @@ pub fn fetch_messages( pub fn fetch_timestamp_messages( ddb: Rc>, + metrics: &Rc, table_name: &str, uaid: &Uuid, timestamp: Option, @@ -126,17 +135,25 @@ pub fn fetch_timestamp_messages( ..Default::default() }; + let metrics = Rc::clone(metrics); let cond = |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_)); retry_if(move || ddb.query(&input), cond) .chain_err(|| "Error fetching messages") - .and_then(|output| { + .and_then(move |output| { let messages = output.items.map_or_else(Vec::new, |items| { debug!("Got response of: {:?}", items); - // TODO: Capture translation errors and report them as we shouldn't have corrupt data items .into_iter() - .filter_map(|item| serde_dynamodb::from_hashmap(item).ok()) - .filter_map(|ddb_notif: DynamoDbNotification| ddb_notif.into_notif().ok()) + .filter_map(|item| { + ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| { + conversion_err(&metrics, e, "serde_dynamodb_from_hashmap") + }) + }) + .filter_map(|ddb_notif: DynamoDbNotification| { + ok_or_inspect(ddb_notif.into_notif(), |e| { + conversion_err(&metrics, e, "into_notif") + }) + }) .collect() }); let timestamp = messages.iter().filter_map(|m| m.sortkey_timestamp).max(); @@ -334,13 +351,13 @@ pub fn unregister_channel_id( pub fn lookup_user( ddb: Rc>, + metrics: &Rc, uaid: &Uuid, connected_at: &u64, router_url: &str, router_table_name: &str, message_table_names: &[String], current_message_month: &str, - metrics: &StatsdClient, ) -> MyFuture<(HelloResponse, Option)> { let response = get_uaid(ddb.clone(), uaid, router_table_name); // Prep all these for the move into the static closure capture @@ -350,7 +367,7 @@ pub fn lookup_user( let messages_tables = message_table_names.to_vec(); let connected_at = *connected_at; let router_url = router_url.to_string(); - let metrics = metrics.clone(); + let metrics = Rc::clone(metrics); let response = response.and_then(move |data| -> MyFuture<_> { let mut hello_response = HelloResponse { message_month: cur_month.clone(), @@ -417,3 +434,30 @@ fn handle_user_result( user.connected_at = connected_at; Ok(user) } + +/// Like Result::ok, convert from Result to Option but applying a +/// function to the Err value +fn ok_or_inspect(result: StdResult, op: F) -> Option +where + F: FnOnce(E), +{ + match result { + Ok(t) => Some(t), + Err(e) => { + op(e); + None + } + } +} + +/// Log/metric errors during conversions to Notification +fn conversion_err(metrics: &StatsdClient, err: E, name: &'static str) +where + E: Display, +{ + error!("Failed {} conversion: {}", name, err); + metrics + .incr_with_tags("ua.notification_read.error") + .with_tag("conversion", name) + .send(); +} diff --git a/src/db/mod.rs b/src/db/mod.rs index 1f2f88fff..59910b289 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -62,13 +62,14 @@ pub enum RegisterResponse { pub struct DynamoStorage { ddb: Rc>, + metrics: Rc, router_table_name: String, message_table_names: Vec, pub current_message_month: String, } impl DynamoStorage { - pub fn from_opts(opts: &ServerOptions) -> Result { + pub fn from_opts(opts: &ServerOptions, metrics: StatsdClient) -> Result { let ddb: Box = if let Ok(endpoint) = env::var("AWS_LOCAL_DYNAMODB") { Box::new(DynamoDbClient::new( RequestDispatcher::default(), @@ -83,7 +84,7 @@ impl DynamoStorage { }; let ddb = Rc::new(ddb); - let mut message_table_names = list_message_tables(ddb.clone(), &opts._message_table_name) + let mut message_table_names = list_message_tables(&ddb, &opts._message_table_name) .map_err(|_| "Failed to locate message tables")?; message_table_names.sort_unstable(); let current_message_month = message_table_names @@ -93,6 +94,7 @@ impl DynamoStorage { Ok(Self { ddb, + metrics: Rc::new(metrics), router_table_name: opts._router_table_name.clone(), message_table_names, current_message_month, @@ -135,18 +137,17 @@ impl DynamoStorage { connected_at: &u64, uaid: Option<&Uuid>, router_url: &str, - metrics: &StatsdClient, ) -> impl Future { let response: MyFuture<(HelloResponse, Option)> = if let Some(uaid) = uaid { commands::lookup_user( self.ddb.clone(), + &self.metrics, &uaid, connected_at, router_url, &self.router_table_name, &self.message_table_names, &self.current_message_month, - metrics, ) } else { Box::new(future::ok(( @@ -339,6 +340,7 @@ impl DynamoStorage { let response: MyFuture = if include_topic { Box::new(commands::fetch_messages( self.ddb.clone(), + &self.metrics, table_name, uaid, 11 as u32, @@ -349,6 +351,7 @@ impl DynamoStorage { let uaid = *uaid; let table_name = table_name.to_string(); let ddb = self.ddb.clone(); + let metrics = Rc::clone(&self.metrics); response.and_then(move |resp| -> MyFuture<_> { // Return now from this future if we have messages @@ -370,6 +373,7 @@ impl DynamoStorage { if resp.messages.is_empty() || resp.timestamp.is_some() { Box::new(commands::fetch_timestamp_messages( ddb, + &metrics, table_name.as_ref(), &uaid, timestamp, @@ -394,11 +398,11 @@ impl DynamoStorage { } } -pub fn list_message_tables(ddb: Rc>, prefix: &str) -> Result> { +pub fn list_message_tables(ddb: &Rc>, prefix: &str) -> Result> { let mut names: Vec = Vec::new(); let mut start_key = None; loop { - let result = commands::list_tables(ddb.clone(), start_key).wait()?; + let result = commands::list_tables(Rc::clone(ddb), start_key).wait()?; start_key = result.last_evaluated_table_name; if let Some(table_names) = result.table_names { names.extend(table_names); diff --git a/src/server/mod.rs b/src/server/mod.rs index a21070bcf..db82a4989 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -301,15 +301,17 @@ impl Server { } else { BroadcastChangeTracker::new(Vec::new()) }; + let metrics = metrics_from_opts(opts)?; + let srv = Rc::new(Server { opts: opts.clone(), broadcaster: RefCell::new(broadcaster), - ddb: DynamoStorage::from_opts(opts)?, + ddb: DynamoStorage::from_opts(opts, metrics.clone())?, uaids: RefCell::new(HashMap::new()), open_connections: Cell::new(0), handle: core.handle(), tls_acceptor: tls::configure(opts), - metrics: metrics_from_opts(opts)?, + metrics, }); let addr = SocketAddr::from(([0, 0, 0, 0], srv.opts.port)); let ws_listener = TcpListener::bind(&addr, &srv.handle)?;