Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: add secondary_write for channels #597

Merged
merged 21 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
8d0dcd6
bug: add secondary_write for channels
jrconlin Feb 1, 2024
456070c
f ignore result of secondary adds
jrconlin Feb 1, 2024
df9be28
f r's
jrconlin Feb 1, 2024
a25487f
Merge branch 'master' into bug/SYNC-4121_secondary
jrconlin Feb 3, 2024
502cab7
Merge branch 'master' into bug/SYNC-4121_secondary
jrconlin Feb 5, 2024
fb17baa
Merge branch 'master' into bug/SYNC-4121_secondary
jrconlin Feb 5, 2024
e3c6984
Merge branch 'master' into bug/SYNC-4121_secondary
jrconlin Feb 6, 2024
09d6810
f add comments
jrconlin Feb 6, 2024
56135fc
f k use `write_to_secondary` for all functions
jrconlin Feb 7, 2024
10970a2
f force write for all `remove_*` functions.
jrconlin Feb 7, 2024
1be1cd4
f add force for increment_storage
jrconlin Feb 7, 2024
cbd5303
f add metrics for ignored errors
jrconlin Feb 8, 2024
7403707
Merge branch 'master' into bug/SYNC-4121_secondary
pjenvey Feb 8, 2024
eaf5a3a
Merge branch 'master' into bug/SYNC-4121_secondary
jrconlin Feb 8, 2024
915f9f0
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Feb 9, 2024
741be97
Merge branch 'bug/SYNC-4121_secondary' of github.com:mozilla-services…
jrconlin Feb 9, 2024
6d3569e
f remove secondary channel check (since it's no longer needed)
jrconlin Feb 9, 2024
e12da21
f fix typos
jrconlin Feb 9, 2024
963cd3b
Merge branch 'master' into bug/SYNC-4121_secondary
jrconlin Feb 12, 2024
f3ea8d8
Merge branch 'master' into bug/SYNC-4121_secondary
jrconlin Feb 13, 2024
603abeb
Merge branch 'master' into bug/SYNC-4121_secondary
jrconlin Feb 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion autoconnect/autoconnect-settings/src/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl AppState {
);
}
let crypto_key = &crypto_key[1..crypto_key.len() - 1];
debug!("Fernet keys: {:?}", &crypto_key);
debug!("🔐 Fernet keys: {:?}", &crypto_key);
let fernets: Vec<Fernet> = crypto_key
.split(',')
.map(|s| s.trim().to_string())
Expand Down
4 changes: 2 additions & 2 deletions autoendpoint/src/extractors/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl FromRequest for Subscription {
async move {
// Collect token info and server state
let token_info = TokenInfo::extract(&req).await?;
trace!("Token info: {:?}", &token_info);
trace!("🔐 Token info: {:?}", &token_info);
let app_state: Data<AppState> =
Data::extract(&req).await.expect("No server state found");
let metrics = Metrics::from(&app_state);
Expand All @@ -84,7 +84,7 @@ impl FromRequest for Subscription {
.fernet
.decrypt(&repad_base64(&token_info.token))
.map_err(|e| {
error!("fernet: {:?}", e);
error!("🔐 fernet: {:?}", e);
ApiErrorKind::InvalidToken
})?;

Expand Down
2 changes: 1 addition & 1 deletion autoendpoint/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl Settings {
let keys = &self.crypto_keys.replace(['"', ' '], "");
let fernets = Self::read_list_from_str(keys, "Invalid AUTOEND_CRYPTO_KEYS")
.map(|key| {
debug!("Fernet keys: {:?}", &key);
debug!("🔐 Fernet keys: {:?}", &key);
Fernet::new(key).expect("Invalid AUTOEND_CRYPTO_KEYS")
})
.collect();
Expand Down
1 change: 1 addition & 0 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ impl BigtableDb {
let (v, _stream) = r.into_future().await;
// Since this should return no rows (with the row key set to a value that shouldn't exist)
// the first component of the tuple should be None.
debug!("🉑 health check");
Ok(v.is_none())
}
}
Expand Down
157 changes: 138 additions & 19 deletions autopush-common/src/db/dual/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::collections::HashSet;
use std::sync::Arc;

use async_trait::async_trait;
use cadence::StatsdClient;
use cadence::{CountedExt, StatsdClient};
use serde::Deserialize;
use serde_json::from_str;
use uuid::Uuid;
Expand All @@ -27,30 +27,47 @@ use super::StorageType;

#[derive(Clone)]
pub struct DualClientImpl {
/// The primary data store, which will always be Bigtable.
primary: BigTableClientImpl,
/// The secondary data store, which will always be DynamoDB.
secondary: DdbClientImpl,
/// Write changes to the secondary, including messages and updates
/// as well as account and channel additions/deletions.
write_to_secondary: bool,
/// Hex value to use to specify the first byte of the median offset.
/// e.g. "0a" will start from include all UUIDs upto and including "0a"
median: Option<u8>,
metrics: Arc<StatsdClient>,
}

fn default_true() -> bool {
true
}

#[derive(Clone, Debug, Deserialize)]
pub struct DualDbSettings {
/// The primary data store, which will always be Bigtable.
primary: DbSettings,
/// The secondary data store, which will always be DynamoDB.
secondary: DbSettings,
#[serde(default)]
/// Write changes to the secondary, including messages and updates
/// as well as account and channel additions/deletions.
#[serde(default = "default_true")]
write_to_secondary: bool,
/// Hex value to use to specify the first byte of the median offset.
/// e.g. "0a" will start from include all UUIDs upto and including "0a"
#[serde(default)]
median: Option<String>,
}

impl DualClientImpl {
pub fn new(metrics: Arc<StatsdClient>, settings: &DbSettings) -> DbResult<Self> {
// Not really sure we need the dsn here.
info!("Trying: {:?}", settings.db_settings);
info!("Trying: {:?}", settings.db_settings);
let db_settings: DualDbSettings = from_str(&settings.db_settings).map_err(|e| {
DbError::General(format!("Could not parse DualDBSettings string {:?}", e))
})?;
debug!("settings: {:?}", &db_settings.median);
debug!("settings: {:?}", &db_settings);
if StorageType::from_dsn(&db_settings.primary.dsn) != StorageType::BigTable {
return Err(DbError::General(
"Invalid primary DSN specified (must be BigTable type)".to_owned(),
Expand Down Expand Up @@ -87,6 +104,7 @@ impl DualClientImpl {
secondary: secondary.clone(),
median,
write_to_secondary: db_settings.write_to_secondary,
metrics,
})
}
}
Expand All @@ -110,6 +128,10 @@ impl DualClientImpl {
} else {
(Box::new(&self.primary), true)
};
self.metrics
.incr_with_tags("database.dual.allot")
.with_tag("target", &target.0.name())
.send();
debug!("⚖ alloting to {}", target.0.name());
Ok(target)
}
Expand All @@ -119,22 +141,37 @@ impl DualClientImpl {
impl DbClient for DualClientImpl {
async fn add_user(&self, user: &User) -> DbResult<()> {
let (target, is_primary) = self.allot(&user.uaid).await?;
if is_primary && self.write_to_secondary {
let _ = self.secondary.add_user(user).await?;
}
debug!("⚖ adding user to {}...", target.name());
let result = target.add_user(user).await?;
if is_primary && self.write_to_secondary {
let _ = self.secondary.add_user(user).await.map_err(|e| {
error!("⚖ Error: {:?}", e);
self.metrics
.incr_with_tags("database.dual.error")
.with_tag("func", "add_user")
.send();
e
});
}
debug!("⚖ User added...");
Ok(result)
}

async fn update_user(&self, user: &User) -> DbResult<bool> {
// If the UAID is in the allowance, move them to the new data store
let (target, is_primary) = self.allot(&user.uaid).await?;
let result = target.update_user(user).await?;
if is_primary && self.write_to_secondary {
let _ = self.secondary.add_user(user).await?;
let _ = self.secondary.add_user(user).await.map_err(|e| {
error!("⚡ Error: {:?}", e);
self.metrics
.incr_with_tags("database.dual.error")
.with_tag("func", "remove_user")
jrconlin marked this conversation as resolved.
Show resolved Hide resolved
.send();
e
});
}
target.update_user(user).await
Ok(result)
}

async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> {
Expand Down Expand Up @@ -165,21 +202,56 @@ impl DbClient for DualClientImpl {
if is_primary {
// try removing the user from the old store, just in case.
// leaving them could cause false reporting later.
let _ = self.secondary.remove_user(uaid).await;
let _ = self.secondary.remove_user(uaid).await.map_err(|e| {
debug!("⚖ Secondary remove_user error {:?}", e);
self.metrics
.incr_with_tags("database.dual.error")
.with_tag("func", "remove_user")
.send();
e
});
}
Ok(result)
}

async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> {
debug!("⚖ getting target");
let (target, _) = self.allot(uaid).await?;
let (target, is_primary) = self.allot(uaid).await?;
debug!("⚖ Adding channel to {}", target.name());
target.add_channel(uaid, channel_id).await
let result = target.add_channel(uaid, channel_id).await;
if is_primary && self.write_to_secondary {
let _ = self
.secondary
.add_channel(uaid, channel_id)
.await
.map_err(|e| {
self.metrics
.incr_with_tags("database.dual.allot")
jrconlin marked this conversation as resolved.
Show resolved Hide resolved
.with_tag("func", "add_channel")
.send();
e
});
}
result
}

async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()> {
let (target, _) = self.allot(uaid).await?;
target.add_channels(uaid, channels).await
let (target, is_primary) = self.allot(uaid).await?;
let result = target.add_channels(uaid, channels.clone()).await;
if is_primary && self.write_to_secondary {
let _ = self
.secondary
.add_channels(uaid, channels)
.await
.map_err(|e| {
self.metrics
.incr_with_tags("database.dual.allot")
jrconlin marked this conversation as resolved.
Show resolved Hide resolved
.with_tag("func", "add_channels")
.send();
e
});
}
result
}

async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> {
Expand All @@ -195,8 +267,21 @@ impl DbClient for DualClientImpl {
async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<bool> {
let (target, is_primary) = self.allot(uaid).await?;
let result = target.remove_channel(uaid, channel_id).await?;
// Always remove the channel
if is_primary {
let _ = self.secondary.remove_channel(uaid, channel_id).await?;
let _ = self
.secondary
.remove_channel(uaid, channel_id)
.await
.map_err(|e| {
debug!("⚖ Secondary remove_channel error: {:?}", e);
self.metrics
.incr_with_tags("database.dual.error")
.with_tag("func", "remove_channel")
.send();

e
});
}
Ok(result)
}
Expand All @@ -212,11 +297,21 @@ impl DbClient for DualClientImpl {
let mut result = target
.remove_node_id(uaid, node_id, connected_at, version)
.await?;
// Always remove the node_id.
if is_primary {
result = self
.secondary
.remove_node_id(uaid, node_id, connected_at, version)
.await?
.await
.unwrap_or_else(|e| {
debug!("⚖ Secondary remove_node_id error: {:?}", e);
self.metrics
.incr_with_tags("database.dual.error")
.with_tag("func", "remove_node_id")
.send();

false
})
|| result;
}
Ok(result)
Expand All @@ -233,8 +328,21 @@ impl DbClient for DualClientImpl {
async fn remove_message(&self, uaid: &Uuid, sort_key: &str) -> DbResult<()> {
let (target, is_primary) = self.allot(uaid).await?;
let result = target.remove_message(uaid, sort_key).await?;
// Always remove the message
if is_primary {
let _ = self.primary.remove_message(uaid, sort_key).await?;
// this will be increasingly chatty as we wind down dynamodb.
let _ = self
.secondary
.remove_message(uaid, sort_key)
.await
.map_err(|e| {
debug!("⚖ Secondary remove_message error: {:?}", e);
self.metrics
.incr_with_tags("database.dual.error")
.with_tag("func", "remove_message")
.send();
e
});
}
Ok(result)
}
Expand Down Expand Up @@ -281,8 +389,19 @@ impl DbClient for DualClientImpl {

async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> {
let (target, is_primary) = self.allot(uaid).await?;
if is_primary && self.write_to_secondary {
let _ = self.secondary.increment_storage(uaid, timestamp).await?;
if is_primary {
let _ = self
.secondary
.increment_storage(uaid, timestamp)
.await
.map_err(|e| {
debug!("⚖ Secondary increment_storage error: {:?}", e);
self.metrics
.incr_with_tags("database.dual.error")
.with_tag("func", "increment_storage")
.send();
e
});
}
target.increment_storage(uaid, timestamp).await
}
Expand Down
1 change: 1 addition & 0 deletions autopush-common/src/db/dynamodb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ impl DbClient for DdbClientImpl {
.map_err(|e| DbError::General(format!("DynamoDB health check failure: {:?}", e)))?;
if let Some(names) = result.table_names {
// We found at least one table that matches the message_table
debug!("dynamodb ok");
return Ok(!names.is_empty());
}
// Huh, we couldn't find a message table? That's a failure.
Expand Down