Skip to content

Commit

Permalink
fix: capture additional errors that shouldn't be reported to Sentry
Browse files Browse the repository at this point in the history
Avoids capturing errors for the following:
- Unable to send to a client
- Invalid json text sent to server
- DynamoDB Internal Server Error that should be retried

Closes #87
  • Loading branch information
bbangert committed Jan 11, 2019
1 parent 7db476a commit e32cc5c
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 95 deletions.
15 changes: 12 additions & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,9 @@ where
| Err(Error(ErrorKind::PongTimeout, _))
| Err(Error(ErrorKind::RepeatUaidDisconnect, _))
| Err(Error(ErrorKind::ExcessivePing, _))
| Err(Error(ErrorKind::InvalidStateTransition(_, _), _)) => None,
| Err(Error(ErrorKind::InvalidStateTransition(_, _), _))
| Err(Error(ErrorKind::InvalidClientMessage(_), _))
| Err(Error(ErrorKind::SendError, _)) => None,
Err(e) => Some(e),
}
};
Expand Down Expand Up @@ -732,7 +734,10 @@ where
} = **send;
if !smessages.is_empty() {
let item = smessages.remove(0);
let ret = data.ws.start_send(item).chain_err(|| "unable to send")?;
let ret = data
.ws
.start_send(item)
.chain_err(|| ErrorKind::SendError)?;
match ret {
AsyncSink::Ready => true,
AsyncSink::NotReady(returned) => {
Expand Down Expand Up @@ -843,7 +848,11 @@ where
let channel_id =
Uuid::parse_str(&channel_id_str).chain_err(|| "Invalid channelID")?;
if channel_id.to_hyphenated().to_string() != channel_id_str {
return Err("Bad UUID format, use lower case, dashed format".into());
return Err(ErrorKind::InvalidClientMessage(format!(
"Invalid UUID format, not lower-case/dashed: {}",
channel_id
))
.into());
}

let uaid = webpush.uaid;
Expand Down
187 changes: 97 additions & 90 deletions src/db/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use cadence::{Counted, StatsdClient};
use chrono::Utc;
use futures::{future, Future};
use futures_backoff::retry_if;
use matches::matches;
use rusoto_dynamodb::{
AttributeValue, DeleteItemError, DeleteItemInput, DeleteItemOutput, DynamoDb, GetItemError,
GetItemInput, GetItemOutput, ListTablesInput, ListTablesOutput, PutItemError, PutItemInput,
Expand All @@ -23,6 +22,25 @@ use crate::errors::*;
use crate::protocol::Notification;
use crate::util::timing::sec_since_epoch;

macro_rules! retryable_error {
($name:ident, $type:ty, $property:ident) => {
fn $name(err: &$type) -> bool {
match err {
$property::InternalServerError(_) | $property::ProvisionedThroughputExceeded(_) => {
true
}
_ => false,
}
}
};
}

retryable_error!(retryable_query_error, QueryError, QueryError);
retryable_error!(retryable_delete_error, DeleteItemError, DeleteItemError);
retryable_error!(retryable_getitem_error, GetItemError, GetItemError);
retryable_error!(retryable_putitem_error, PutItemError, PutItemError);
retryable_error!(retryable_updateitem_error, UpdateItemError, UpdateItemError);

#[derive(Default)]
pub struct FetchMessageResponse {
pub timestamp: Option<u64>,
Expand Down Expand Up @@ -73,47 +91,45 @@ pub fn fetch_messages(
};

let metrics = Rc::clone(metrics);
retry_if(
move || ddb.query(input.clone()),
|err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_)),
)
.chain_err(|| "Error fetching messages")
.and_then(move |output| {
let mut notifs: Vec<DynamoDbNotification> = output.items.map_or_else(Vec::new, |items| {
debug!("Got response of: {:?}", items);
items
retry_if(move || ddb.query(input.clone()), retryable_query_error)
.chain_err(|| ErrorKind::MessageFetch)
.and_then(move |output| {
let mut notifs: Vec<DynamoDbNotification> =
output.items.map_or_else(Vec::new, |items| {
debug!("Got response of: {:?}", items);
items
.into_iter()
.inspect(|i| debug!("Item: {:?}", i))
.filter_map(|item| {
let item2 = item.clone();
ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| {
conversion_err(&metrics, e, item2, "serde_dynamodb_from_hashmap")
})
})
.collect()
});
if notifs.is_empty() {
return Ok(Default::default());
}

// Load the current_timestamp from the subscription registry entry which is
// the first DynamoDbNotification and remove it from the vec.
let timestamp = notifs.remove(0).current_timestamp;
// Convert any remaining DynamoDbNotifications to Notification's
let messages = notifs
.into_iter()
.inspect(|i| debug!("Item: {:?}", i))
.filter_map(|item| {
let item2 = item.clone();
ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| {
conversion_err(&metrics, e, item2, "serde_dynamodb_from_hashmap")
.filter_map(|ddb_notif| {
let ddb_notif2 = ddb_notif.clone();
ok_or_inspect(ddb_notif.into_notif(), |e| {
conversion_err(&metrics, e, ddb_notif2, "into_notif")
})
})
.collect()
});
if notifs.is_empty() {
return Ok(Default::default());
}

// Load the current_timestamp from the subscription registry entry which is
// the first DynamoDbNotification and remove it from the vec.
let timestamp = notifs.remove(0).current_timestamp;
// Convert any remaining DynamoDbNotifications to Notification's
let messages = notifs
.into_iter()
.filter_map(|ddb_notif| {
let ddb_notif2 = ddb_notif.clone();
ok_or_inspect(ddb_notif.into_notif(), |e| {
conversion_err(&metrics, e, ddb_notif2, "into_notif")
})
.collect();
Ok(FetchMessageResponse {
timestamp,
messages,
})
.collect();
Ok(FetchMessageResponse {
timestamp,
messages,
})
})
}

pub fn fetch_timestamp_messages(
Expand Down Expand Up @@ -143,36 +159,33 @@ pub fn fetch_timestamp_messages(
};

let metrics = Rc::clone(metrics);
retry_if(
move || ddb.query(input.clone()),
|err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_)),
)
.chain_err(|| "Error fetching messages")
.and_then(move |output| {
let messages = output.items.map_or_else(Vec::new, |items| {
debug!("Got response of: {:?}", items);
items
.into_iter()
.filter_map(|item| {
let item2 = item.clone();
ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| {
conversion_err(&metrics, e, item2, "serde_dynamodb_from_hashmap")
retry_if(move || ddb.query(input.clone()), retryable_query_error)
.chain_err(|| ErrorKind::MessageFetch)
.and_then(move |output| {
let messages = output.items.map_or_else(Vec::new, |items| {
debug!("Got response of: {:?}", items);
items
.into_iter()
.filter_map(|item| {
let item2 = item.clone();
ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| {
conversion_err(&metrics, e, item2, "serde_dynamodb_from_hashmap")
})
})
})
.filter_map(|ddb_notif: DynamoDbNotification| {
let ddb_notif2 = ddb_notif.clone();
ok_or_inspect(ddb_notif.into_notif(), |e| {
conversion_err(&metrics, e, ddb_notif2, "into_notif")
.filter_map(|ddb_notif: DynamoDbNotification| {
let ddb_notif2 = ddb_notif.clone();
ok_or_inspect(ddb_notif.into_notif(), |e| {
conversion_err(&metrics, e, ddb_notif2, "into_notif")
})
})
})
.collect()
});
let timestamp = messages.iter().filter_map(|m| m.sortkey_timestamp).max();
Ok(FetchMessageResponse {
timestamp,
messages,
.collect()
});
let timestamp = messages.iter().filter_map(|m| m.sortkey_timestamp).max();
Ok(FetchMessageResponse {
timestamp,
messages,
})
})
})
}

pub fn drop_user(
Expand All @@ -187,7 +200,7 @@ pub fn drop_user(
};
retry_if(
move || ddb.delete_item(input.clone()),
|err: &DeleteItemError| matches!(err, &DeleteItemError::ProvisionedThroughputExceeded(_)),
retryable_delete_error,
)
.chain_err(|| "Error dropping user")
}
Expand All @@ -203,11 +216,8 @@ pub fn get_uaid(
key: ddb_item! { uaid: s => uaid.to_simple().to_string() },
..Default::default()
};
retry_if(
move || ddb.get_item(input.clone()),
|err: &GetItemError| matches!(err, &GetItemError::ProvisionedThroughputExceeded(_)),
)
.chain_err(|| "Error fetching user")
retry_if(move || ddb.get_item(input.clone()), retryable_getitem_error)
.chain_err(|| "Error fetching user")
}

pub fn register_user(
Expand Down Expand Up @@ -246,7 +256,7 @@ pub fn register_user(
..Default::default()
})
},
|err: &PutItemError| matches!(err, &PutItemError::ProvisionedThroughputExceeded(_)),
retryable_putitem_error,
)
.chain_err(|| "Error storing user record")
}
Expand Down Expand Up @@ -276,7 +286,7 @@ pub fn update_user_message_month(
ddb.update_item(update_item.clone())
.and_then(|_| future::ok(()))
},
|err: &UpdateItemError| matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)),
retryable_updateitem_error,
)
.chain_err(|| "Error updating user message month")
}
Expand All @@ -296,22 +306,19 @@ pub fn all_channels(
..Default::default()
};

retry_if(
move || ddb.get_item(input.clone()),
|err: &GetItemError| matches!(err, &GetItemError::ProvisionedThroughputExceeded(_)),
)
.and_then(|output| {
let channels = output
.item
.and_then(|item| {
serde_dynamodb::from_hashmap(item)
.ok()
.and_then(|notif: DynamoDbNotification| notif.chids)
})
.unwrap_or_else(HashSet::new);
future::ok(channels)
})
.or_else(|_err| future::ok(HashSet::new()))
retry_if(move || ddb.get_item(input.clone()), retryable_getitem_error)
.and_then(|output| {
let channels = output
.item
.and_then(|item| {
serde_dynamodb::from_hashmap(item)
.ok()
.and_then(|notif: DynamoDbNotification| notif.chids)
})
.unwrap_or_else(HashSet::new);
future::ok(channels)
})
.or_else(|_err| future::ok(HashSet::new()))
}

pub fn save_channels(
Expand Down Expand Up @@ -342,7 +349,7 @@ pub fn save_channels(
ddb.update_item(update_item.clone())
.and_then(|_| future::ok(()))
},
|err: &UpdateItemError| matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)),
retryable_updateitem_error,
)
.chain_err(|| "Error saving channels")
}
Expand Down Expand Up @@ -370,7 +377,7 @@ pub fn unregister_channel_id(

retry_if(
move || ddb.update_item(update_item.clone()),
|err: &UpdateItemError| matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)),
retryable_updateitem_error,
)
.chain_err(|| "Error unregistering channel")
}
Expand Down
13 changes: 13 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,19 @@ error_chain! {
description("invalid state transition")
display("invalid state transition, from: {}, to: {}", from, to)
}

InvalidClientMessage(text: String) {
description("invalid json text")
display("invalid json: {}", text)
}

MessageFetch {
description("server error fetching messages")
}

SendError {
description("unable to send to client")
}
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -889,11 +889,15 @@ where
match msg {
Message::Text(ref s) => {
trace!("text message {}", s);
let msg = s.parse().chain_err(|| "invalid json text")?;
let msg = s
.parse()
.chain_err(|| ErrorKind::InvalidClientMessage(s.to_owned()))?;
return Ok(Some(msg).into());
}

Message::Binary(_) => return Err("binary messages not accepted".into()),
Message::Binary(_) => {
return Err(ErrorKind::InvalidClientMessage("binary content".to_string()).into())
}

// sending a pong is already managed by lower layers, just go to
// the next message
Expand Down

0 comments on commit e32cc5c

Please sign in to comment.