Skip to content

Commit

Permalink
feat: introduce end_median for dual. (#663)
Browse files Browse the repository at this point in the history
`end_median` is an optional database config argument for `dual`. Like
`median`, it filters UAIDs for migration, but by the last octet rather than
the first. (Note: UAIDs that match against `median` are still migrated.)

This is an attempt to reduce hot tablet entries during migration phases.
It is recommended that `end_median` be slowly ramped up instead of
`median`.

Closes SYNC-4198
  • Loading branch information
jrconlin committed Mar 19, 2024
1 parent 133bcb5 commit 38d6a6d
Showing 1 changed file with 63 additions and 16 deletions.
79 changes: 63 additions & 16 deletions autopush-common/src/db/dual/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub struct DualClientImpl {
/// Hex value to use to specify the first byte of the median offset.
/// e.g. "0a" will include all UUIDs whose first byte is up to and including "0a"
median: Option<u8>,
/// Ending octet to use for more distributed account migration
end_median: Option<u8>,
metrics: Arc<StatsdClient>,
}

Expand All @@ -53,10 +55,15 @@ pub struct DualDbSettings {
/// as well as account and channel additions/deletions.
#[serde(default = "default_true")]
write_to_secondary: bool,
/// Hex value to use to specify the first byte of the median offset.
/// Hex value to specify the first byte of the median offset.
/// e.g. "0a" will include all UUIDs whose first byte is up to and including "0a"
#[serde(default)]
median: Option<String>,
/// Hex value to specify the last byte of the median offset to include.
/// This value is "OR"ed with "median" to produce a more distributed set of
/// uaids to migrate.
#[serde(default)]
end_median: Option<String>,
}

impl DualClientImpl {
Expand Down Expand Up @@ -95,6 +102,25 @@ impl DualClientImpl {
} else {
None
};
// determine which uaids to move based on the last byte of their UAID.
// This should reduce the hot table problem.
let end_median = if let Some(end_median) = db_settings.end_median {
let end_median = hex::decode(end_median).map_err(|e| {
DbError::General(format!(
"Could not parse end_median string. Please use a valid Hex identifier: {:?}",
e,
))
})?[0];
debug!(
"⚖ Setting end_median to {:02} ({})",
hex::encode([end_median]),
&end_median
);
Some(end_median)
} else {
None
};

let primary = BigTableClientImpl::new(metrics.clone(), &db_settings.primary)?;
let secondary = DdbClientImpl::new(metrics.clone(), &db_settings.secondary)?;
debug!("⚖ Got primary and secondary");
Expand All @@ -104,11 +130,16 @@ impl DualClientImpl {
"median",
&median.map_or_else(|| "None".to_owned(), |m| m.to_string()),
)
.with_tag(
"end_median",
&end_median.map_or_else(|| "None".to_owned(), |m| m.to_string()),
)
.send();
Ok(Self {
primary,
secondary: secondary.clone(),
median,
end_median,
write_to_secondary: db_settings.write_to_secondary,
metrics,
})
Expand All @@ -123,19 +154,26 @@ impl DualClientImpl {
/// Wrapper functions to allow us to change which data store system actually manages the
/// user allocation routing table.
impl DualClientImpl {
/// Decide whether the given UAID falls inside the migration window.
///
/// A UAID qualifies when either configured bound admits it:
/// * `median` is set and the first byte of the UAID is `<=` that value, or
/// * `end_median` is set and the last byte of the UAID is `<=` that value.
///
/// Returns `false` when neither bound is configured.
fn should_migrate(&self, uaid: &Uuid) -> bool {
    let octets = uaid.as_bytes();
    // Each bound is optional; an unset bound never admits a UAID on its own.
    let leading_match = self
        .median
        .map_or(false, |limit| octets.first() <= Some(&limit));
    let trailing_match = self
        .end_median
        .map_or(false, |limit| octets.last() <= Some(&limit));
    // The two bounds are OR-ed: matching either one is sufficient.
    leading_match || trailing_match
}
/// Route and assign a user to the appropriate back end based on the defined
/// allowance
/// Returns the dbclient to use and whether or not it's the primary database.
async fn allot<'a>(&'a self, uaid: &Uuid) -> DbResult<(Box<&'a dyn DbClient>, bool)> {
let target: (Box<&'a dyn DbClient>, bool) = if let Some(median) = self.median {
if uaid.as_bytes()[0] <= median {
debug!("⚖ Routing user to Bigtable");
(Box::new(&self.primary), true)
} else {
(Box::new(&self.secondary), false)
}
} else {
let target: (Box<&'a dyn DbClient>, bool) = if self.should_migrate(uaid) {
debug!("⚖ Routing user to Bigtable");
(Box::new(&self.primary), true)
} else {
(Box::new(&self.secondary), false)
};
debug!("⚖ alloting to {}", target.0.name());
Ok(target)
Expand Down Expand Up @@ -476,7 +514,7 @@ mod test {
use serde_json::json;
use std::str::FromStr;

fn test_args(median: Option<&str>) -> String {
fn test_args(median: Option<&str>, end_median: Option<&str>) -> String {
json!({
"primary": {
"dsn": "grpc://bigtable.googleapis.com", // Note that this is the general endpoint.
Expand All @@ -494,6 +532,7 @@ mod test {
}).to_string(),
},
"median": median.to_owned(),
"end_median": end_median.to_owned(),
"write_to_secondary": false,
})
.to_string()
Expand All @@ -503,7 +542,7 @@ mod test {
/// documentation for how the db_settings argument should be structured
#[test]
fn arg_parsing() -> DbResult<()> {
let arg_str = test_args(None);
let arg_str = test_args(None, None);
// the output string looks like:
/*
"{\"primary\":{\"db_settings\":\"{\\\"message_family\\\":\\\"message\\\",\\\"router_family\\\":\\\"router\\\",\\\"table_name\\\":\\\"projects/some-project/instances/some-instance/tables/some-table\\\"}\",\"dsn\":\"grpc://bigtable.googleapis.com\"},\"secondary\":{\"db_settings\":\"{\\\"message_table\\\":\\\"test_message\\\",\\\"router_table\\\":\\\"test_router\\\"}\",\"dsn\":\"http://localhost:8000/\"}}"
Expand Down Expand Up @@ -531,24 +570,32 @@ mod test {

#[actix_rt::test]
async fn allocation() -> DbResult<()> {
let arg_str = test_args(Some("0A"));
let arg_str = test_args(Some("0A"), Some("88"));
let metrics = Arc::new(StatsdClient::builder("", NopMetricSink).build());
let dual_settings = DbSettings {
dsn: Some("dual".to_owned()),
db_settings: arg_str,
};
let dual = DualClientImpl::new(metrics, &dual_settings)?;

// Should be included.
let low_uaid = Uuid::from_str("04DDDDDD-2040-4b4d-be3d-a340fc2d15a6").unwrap();
// Should be excluded.
let hi_uaid = Uuid::from_str("0BDDDDDD-2040-4b4d-be3d-a340fc2d15a6").unwrap();
// Should be included (Note: high end median)
let low_uaid = Uuid::from_str("04DDDDDD-1234-1234-1234-0000000000CC").unwrap();
let (result, is_primary) = dual.allot(&low_uaid).await?;
assert_eq!(result.name(), dual.primary.name());
assert!(is_primary);

// Should be excluded (Note: high end_median)
let hi_uaid = Uuid::from_str("0BDDDDDD-1234-1234-1234-0000000000CC").unwrap();
let (result, is_primary) = dual.allot(&hi_uaid).await?;
assert_eq!(result.name(), dual.secondary.name());
assert!(!is_primary);

// Should be included (Note: high median with low end median)
let hi_end_uaid = Uuid::from_str("0BDDDDDD-1234-1234-1234-000000000080").unwrap();
let (result, is_primary) = dual.allot(&hi_end_uaid).await?;
assert_eq!(result.name(), dual.primary.name());
assert!(is_primary);

Ok(())
}
}

0 comments on commit 38d6a6d

Please sign in to comment.