Skip to content

Commit

Permalink
feat: emit metrics for any failed notif conversions
Browse files Browse the repository at this point in the history
Closes #33
  • Loading branch information
pjenvey committed Jul 20, 2018
1 parent d9d940a commit ed16b64
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 21 deletions.
1 change: 0 additions & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ where
&connected_at,
uaid.as_ref(),
&data.srv.opts.router_url,
&data.srv.metrics,
));
transition!(AwaitProcessHello {
response,
Expand Down
68 changes: 56 additions & 12 deletions src/db/commands.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashSet;
use std::fmt::Display;
use std::rc::Rc;
use std::result::Result as StdResult;
use uuid::Uuid;
Expand Down Expand Up @@ -49,6 +50,7 @@ pub fn list_tables(

pub fn fetch_messages(
ddb: Rc<Box<DynamoDb>>,
metrics: &Rc<StatsdClient>,
table_name: &str,
uaid: &Uuid,
limit: u32,
Expand All @@ -66,19 +68,22 @@ pub fn fetch_messages(
..Default::default()
};

let metrics = Rc::clone(metrics);
let cond = |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_));
retry_if(move || ddb.query(&input), cond)
.chain_err(|| "Error fetching messages")
.and_then(|output| {
.and_then(move |output| {
let mut notifs: Vec<DynamoDbNotification> =
output.items.map_or_else(Vec::new, |items| {
debug!("Got response of: {:?}", items);
// TODO: Capture translation errors and report them as we shouldn't
// have corrupt data
items
.into_iter()
.inspect(|i| debug!("Item: {:?}", i))
.filter_map(|item| serde_dynamodb::from_hashmap(item).ok())
.filter_map(|item| {
ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| {
conversion_err(&metrics, e, "serde_dynamodb_from_hashmap")
})
})
.collect()
});
if notifs.is_empty() {
Expand All @@ -89,10 +94,13 @@ pub fn fetch_messages(
// the first DynamoDbNotification and remove it from the vec.
let timestamp = notifs.remove(0).current_timestamp;
// Convert any remaining DynamoDbNotifications to Notification's
// TODO: Capture translation errors and report them as we shouldn't have corrupt data
let messages = notifs
.into_iter()
.filter_map(|ddb_notif| ddb_notif.into_notif().ok())
.filter_map(|ddb_notif| {
ok_or_inspect(ddb_notif.into_notif(), |e| {
conversion_err(&metrics, e, "into_notif")
})
})
.collect();
Ok(FetchMessageResponse {
timestamp,
Expand All @@ -103,6 +111,7 @@ pub fn fetch_messages(

pub fn fetch_timestamp_messages(
ddb: Rc<Box<DynamoDb>>,
metrics: &Rc<StatsdClient>,
table_name: &str,
uaid: &Uuid,
timestamp: Option<u64>,
Expand All @@ -126,17 +135,25 @@ pub fn fetch_timestamp_messages(
..Default::default()
};

let metrics = Rc::clone(metrics);
let cond = |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_));
retry_if(move || ddb.query(&input), cond)
.chain_err(|| "Error fetching messages")
.and_then(|output| {
.and_then(move |output| {
let messages = output.items.map_or_else(Vec::new, |items| {
debug!("Got response of: {:?}", items);
// TODO: Capture translation errors and report them as we shouldn't have corrupt data
items
.into_iter()
.filter_map(|item| serde_dynamodb::from_hashmap(item).ok())
.filter_map(|ddb_notif: DynamoDbNotification| ddb_notif.into_notif().ok())
.filter_map(|item| {
ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| {
conversion_err(&metrics, e, "serde_dynamodb_from_hashmap")
})
})
.filter_map(|ddb_notif: DynamoDbNotification| {
ok_or_inspect(ddb_notif.into_notif(), |e| {
conversion_err(&metrics, e, "into_notif")
})
})
.collect()
});
let timestamp = messages.iter().filter_map(|m| m.sortkey_timestamp).max();
Expand Down Expand Up @@ -334,13 +351,13 @@ pub fn unregister_channel_id(

pub fn lookup_user(
ddb: Rc<Box<DynamoDb>>,
metrics: &Rc<StatsdClient>,
uaid: &Uuid,
connected_at: &u64,
router_url: &str,
router_table_name: &str,
message_table_names: &[String],
current_message_month: &str,
metrics: &StatsdClient,
) -> MyFuture<(HelloResponse, Option<DynamoDbUser>)> {
let response = get_uaid(ddb.clone(), uaid, router_table_name);
// Prep all these for the move into the static closure capture
Expand All @@ -350,7 +367,7 @@ pub fn lookup_user(
let messages_tables = message_table_names.to_vec();
let connected_at = *connected_at;
let router_url = router_url.to_string();
let metrics = metrics.clone();
let metrics = Rc::clone(metrics);
let response = response.and_then(move |data| -> MyFuture<_> {
let mut hello_response = HelloResponse {
message_month: cur_month.clone(),
Expand Down Expand Up @@ -417,3 +434,30 @@ fn handle_user_result(
user.connected_at = connected_at;
Ok(user)
}

/// Like Result::ok, convert from Result<T, E> to Option<T> but applying a
/// function to the Err value
fn ok_or_inspect<T, E, F>(result: StdResult<T, E>, op: F) -> Option<T>
where
F: FnOnce(E),
{
match result {
Ok(t) => Some(t),
Err(e) => {
op(e);
None
}
}
}

/// Log/metric errors during conversions to Notification
fn conversion_err<E>(metrics: &StatsdClient, err: E, name: &'static str)
where
E: Display,
{
error!("Failed {} conversion: {}", name, err);
metrics
.incr_with_tags("ua.notification_read.error")
.with_tag("conversion", name)
.send();
}
16 changes: 10 additions & 6 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,14 @@ pub enum RegisterResponse {

pub struct DynamoStorage {
ddb: Rc<Box<DynamoDb>>,
metrics: Rc<StatsdClient>,
router_table_name: String,
message_table_names: Vec<String>,
pub current_message_month: String,
}

impl DynamoStorage {
pub fn from_opts(opts: &ServerOptions) -> Result<Self> {
pub fn from_opts(opts: &ServerOptions, metrics: StatsdClient) -> Result<Self> {
let ddb: Box<DynamoDb> = if let Ok(endpoint) = env::var("AWS_LOCAL_DYNAMODB") {
Box::new(DynamoDbClient::new(
RequestDispatcher::default(),
Expand All @@ -83,7 +84,7 @@ impl DynamoStorage {
};
let ddb = Rc::new(ddb);

let mut message_table_names = list_message_tables(ddb.clone(), &opts._message_table_name)
let mut message_table_names = list_message_tables(&ddb, &opts._message_table_name)
.map_err(|_| "Failed to locate message tables")?;
message_table_names.sort_unstable();
let current_message_month = message_table_names
Expand All @@ -93,6 +94,7 @@ impl DynamoStorage {

Ok(Self {
ddb,
metrics: Rc::new(metrics),
router_table_name: opts._router_table_name.clone(),
message_table_names,
current_message_month,
Expand Down Expand Up @@ -135,18 +137,17 @@ impl DynamoStorage {
connected_at: &u64,
uaid: Option<&Uuid>,
router_url: &str,
metrics: &StatsdClient,
) -> impl Future<Item = HelloResponse, Error = Error> {
let response: MyFuture<(HelloResponse, Option<DynamoDbUser>)> = if let Some(uaid) = uaid {
commands::lookup_user(
self.ddb.clone(),
&self.metrics,
&uaid,
connected_at,
router_url,
&self.router_table_name,
&self.message_table_names,
&self.current_message_month,
metrics,
)
} else {
Box::new(future::ok((
Expand Down Expand Up @@ -339,6 +340,7 @@ impl DynamoStorage {
let response: MyFuture<FetchMessageResponse> = if include_topic {
Box::new(commands::fetch_messages(
self.ddb.clone(),
&self.metrics,
table_name,
uaid,
11 as u32,
Expand All @@ -349,6 +351,7 @@ impl DynamoStorage {
let uaid = *uaid;
let table_name = table_name.to_string();
let ddb = self.ddb.clone();
let metrics = Rc::clone(&self.metrics);

response.and_then(move |resp| -> MyFuture<_> {
// Return now from this future if we have messages
Expand All @@ -370,6 +373,7 @@ impl DynamoStorage {
if resp.messages.is_empty() || resp.timestamp.is_some() {
Box::new(commands::fetch_timestamp_messages(
ddb,
&metrics,
table_name.as_ref(),
&uaid,
timestamp,
Expand All @@ -394,11 +398,11 @@ impl DynamoStorage {
}
}

pub fn list_message_tables(ddb: Rc<Box<DynamoDb>>, prefix: &str) -> Result<Vec<String>> {
pub fn list_message_tables(ddb: &Rc<Box<DynamoDb>>, prefix: &str) -> Result<Vec<String>> {
let mut names: Vec<String> = Vec::new();
let mut start_key = None;
loop {
let result = commands::list_tables(ddb.clone(), start_key).wait()?;
let result = commands::list_tables(Rc::clone(ddb), start_key).wait()?;
start_key = result.last_evaluated_table_name;
if let Some(table_names) = result.table_names {
names.extend(table_names);
Expand Down
6 changes: 4 additions & 2 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,15 +301,17 @@ impl Server {
} else {
BroadcastChangeTracker::new(Vec::new())
};
let metrics = metrics_from_opts(opts)?;

let srv = Rc::new(Server {
opts: opts.clone(),
broadcaster: RefCell::new(broadcaster),
ddb: DynamoStorage::from_opts(opts)?,
ddb: DynamoStorage::from_opts(opts, metrics.clone())?,
uaids: RefCell::new(HashMap::new()),
open_connections: Cell::new(0),
handle: core.handle(),
tls_acceptor: tls::configure(opts),
metrics: metrics_from_opts(opts)?,
metrics,
});
let addr = SocketAddr::from(([0, 0, 0, 0], srv.opts.port));
let ws_listener = TcpListener::bind(&addr, &srv.handle)?;
Expand Down

0 comments on commit ed16b64

Please sign in to comment.