Skip to content

Commit

Permalink
feat: filter reads by latest cell (#599)
Browse files Browse the repository at this point in the history
and generally add the exact GC policy of the column family to all reads

also fix User::version from writing to DynamoDB (it's Bigtable only)

Issue: SYNC-4094
  • Loading branch information
pjenvey committed Feb 6, 2024
1 parent aa5773d commit e012955
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 54 deletions.
133 changes: 79 additions & 54 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,16 @@ pub struct BigTableClientImpl {
pool: BigTablePool,
}

fn timestamp_filter() -> Result<data::RowFilter, error::BigTableError> {
/// Return a a RowFilter matching the GC policy of the router Column Family
fn router_gc_policy_filter() -> data::RowFilter {
let mut latest_cell_filter = data::RowFilter::default();
latest_cell_filter.set_cells_per_column_limit_filter(1);
latest_cell_filter
}

/// Return a chain of RowFilters matching the GC policy of the message Column
/// Families
fn message_gc_policy_filter() -> Result<Vec<data::RowFilter>, error::BigTableError> {
let mut timestamp_filter = data::RowFilter::default();
let bt_now: i64 = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
Expand All @@ -84,7 +93,7 @@ fn timestamp_filter() -> Result<data::RowFilter, error::BigTableError> {
range_filter.set_start_timestamp_micros(bt_now * 1000);
timestamp_filter.set_timestamp_range_filter(range_filter);

Ok(timestamp_filter)
Ok(vec![router_gc_policy_filter(), timestamp_filter])
}

/// Escape bytes for RE values
Expand All @@ -109,12 +118,9 @@ fn escape_bytes(bytes: &[u8]) -> Vec<u8> {
vec
}

/// Return a RowFilter limiting to a match of the specified `version`'s column
/// value
fn version_filter(version: &Uuid) -> data::RowFilter {
let mut router_filter_chain = RowFilter_Chain::default();
let mut filter_set: RepeatedField<RowFilter> = RepeatedField::default();

/// Return a chain of RowFilters limiting to a match of the specified
/// `version`'s column value
fn version_filter(version: &Uuid) -> Vec<data::RowFilter> {
let mut family_filter = data::RowFilter::default();
family_filter.set_family_name_regex_filter(format!("^{ROUTER_FAMILY}$"));

Expand All @@ -124,26 +130,28 @@ fn version_filter(version: &Uuid) -> data::RowFilter {
let mut value_filter = data::RowFilter::default();
value_filter.set_value_regex_filter(escape_bytes(version.as_bytes()));

filter_set.push(family_filter);
filter_set.push(cq_filter);
filter_set.push(value_filter);

router_filter_chain.set_filters(filter_set);
let mut router_filter = RowFilter::default();
router_filter.set_chain(router_filter_chain);
router_filter
vec![family_filter, cq_filter, value_filter]
}

/// Return a newly generated `version` column `Cell`
fn new_version_cell(timestamp: SystemTime) -> cell::Cell {
cell::Cell {
qualifier: "version".to_owned(),
value: Uuid::new_v4().into_bytes().to_vec(),
value: Uuid::new_v4().into(),
timestamp,
..Default::default()
}
}

/// Return a RowFilter chain from multiple RowFilters
fn filter_chain(filters: impl Into<RepeatedField<RowFilter>>) -> RowFilter {
let mut chain = RowFilter_Chain::default();
chain.set_filters(filters.into());
let mut filter = RowFilter::default();
filter.set_chain(chain);
filter
}

/// Return a ReadRowsRequest against table for a given row key
fn read_row_request(table_name: &str, row_key: &str) -> bigtable::ReadRowsRequest {
let mut req = bigtable::ReadRowsRequest::default();
Expand Down Expand Up @@ -431,6 +439,7 @@ impl BigTableClientImpl {
}

/// Delete all cell data from the specified columns with the optional time range.
#[allow(unused)]
async fn delete_cells(
&self,
row_key: &str,
Expand Down Expand Up @@ -638,8 +647,9 @@ impl DbClient for BigTableClientImpl {
let row = self.user_to_row(user);

// Only add when the user doesn't already exist
let mut filter = RowFilter::default();
filter.set_row_key_regex_filter(format!("^{}$", row.row_key).into_bytes());
let mut row_key_filter = RowFilter::default();
row_key_filter.set_row_key_regex_filter(format!("^{}$", row.row_key).into_bytes());
let filter = filter_chain(vec![router_gc_policy_filter(), row_key_filter]);

if self.check_and_mutate_row(row, filter, false).await? {
return Err(DbError::Conditional);
Expand All @@ -654,8 +664,13 @@ impl DbClient for BigTableClientImpl {
let Some(ref version) = user.version else {
return Err(DbError::General("Expected a user version field".to_owned()));
};

let mut filters = vec![router_gc_policy_filter()];
filters.extend(version_filter(version));
let filter = filter_chain(filters);

Ok(self
.check_and_mutate_row(self.user_to_row(user), version_filter(version), true)
.check_and_mutate_row(self.user_to_row(user), filter, true)
.await?)
}

Expand All @@ -666,19 +681,21 @@ impl DbClient for BigTableClientImpl {
};

trace!("🉑 Found a record for {}", row_key);
let version = row
.take_required_cell("version")?
.value
.try_into()
.map_err(|e| DbError::Serialization(format!("Could not deserialize version: {e:?}")))?;
let mut result = User {
uaid: *uaid,
connected_at: to_u64(
row.take_required_cell("connected_at")?.value,
"connected_at",
)?,
router_type: to_string(row.take_required_cell("router_type")?.value, "router_type")?,
version: Some(Uuid::from_bytes(version)),
version: Some(
row.take_required_cell("version")?
.value
.try_into()
.map_err(|e| {
DbError::Serialization(format!("Could not deserialize version: {e:?}"))
})?,
),
..Default::default()
};

Expand Down Expand Up @@ -761,23 +778,17 @@ impl DbClient for BigTableClientImpl {
let row_key = uaid.simple().to_string();
let mut req = self.read_row_request(&row_key);

let mut filter_set: RepeatedField<RowFilter> = RepeatedField::default();

let mut family_filter = data::RowFilter::default();
family_filter.set_family_name_regex_filter(format!("^{ROUTER_FAMILY}$"));

let mut cq_filter = data::RowFilter::default();
cq_filter.set_column_qualifier_regex_filter("^chid:.*$".as_bytes().to_vec());

filter_set.push(family_filter);
filter_set.push(cq_filter);

let mut filter_chain = RowFilter_Chain::default();
filter_chain.set_filters(filter_set);

let mut filter = data::RowFilter::default();
filter.set_chain(filter_chain);
req.set_filter(filter);
req.set_filter(filter_chain(vec![
router_gc_policy_filter(),
family_filter,
cq_filter,
]));

let mut rows = self.read_rows(req).await?;
let mut result = HashSet::new();
Expand Down Expand Up @@ -835,7 +846,10 @@ impl DbClient for BigTableClientImpl {
};

let mut req = self.check_and_mutate_row_request(&row_key);
req.set_predicate_filter(version_filter(version));

let mut filters = vec![router_gc_policy_filter()];
filters.extend(version_filter(version));
req.set_predicate_filter(filter_chain(filters));
req.set_true_mutations(self.get_delete_mutations(ROUTER_FAMILY, &["node_id"], None)?);

Ok(self.check_and_mutate(req).await?)
Expand Down Expand Up @@ -993,7 +1007,7 @@ impl DbClient for BigTableClientImpl {
rows.set_row_ranges(row_ranges);
req.set_rows(rows);

req.set_filter(timestamp_filter()?);
req.set_filter(filter_chain(message_gc_policy_filter()?));
if limit > 0 {
trace!("🉑 Setting limit to {limit}");
req.set_rows_limit(limit as i64);
Expand Down Expand Up @@ -1059,7 +1073,7 @@ impl DbClient for BigTableClientImpl {
// therefore run two filters, one to fetch the candidate IDs
// and another to fetch the content of the messages.
*/
req.set_filter(timestamp_filter()?);
req.set_filter(filter_chain(message_gc_policy_filter()?));
if limit > 0 {
req.set_rows_limit(limit as i64);
}
Expand Down Expand Up @@ -1128,7 +1142,7 @@ mod tests {
use uuid;

use super::*;
use crate::{db::DbSettings, util::ms_since_epoch};
use crate::{db::DbSettings, test_support::gen_test_uaid, util::ms_since_epoch};

const TEST_USER: &str = "DEADBEEF-0000-0000-0000-0123456789AB";
const TEST_CHID: &str = "DECAFBAD-0000-0000-0000-0123456789AB";
Expand Down Expand Up @@ -1393,16 +1407,8 @@ mod tests {

#[actix_rt::test]
async fn read_cells_family_id() -> DbResult<()> {
// let uaid = Uuid::parse_str(TEST_USER).unwrap();
// generate a somewhat random test UAID to prevent possible false test fails
// if the account is deleted before this test completes.
let uaid = {
let temp = Uuid::new_v4().to_string();
let mut parts: Vec<&str> = temp.split('-').collect();
parts[0] = "DEADBEEF";
Uuid::parse_str(&parts.join("-")).unwrap()
};
let client = new_client().unwrap();
let uaid = gen_test_uaid();
client.remove_user(&uaid).await.unwrap();

let qualifier = "foo".to_owned();
Expand All @@ -1426,12 +1432,10 @@ mod tests {
client.remove_user(&uaid).await
}

/*
// XXX: uncomment after the uaid clashing fix
#[actix_rt::test]
async fn add_user_existing() {
let client = new_client().unwrap();
let uaid = Uuid::parse_str(TEST_USER).unwrap();
let uaid = gen_test_uaid();
let user = User {
uaid,
..Default::default()
Expand All @@ -1442,5 +1446,26 @@ mod tests {
let err = client.add_user(&user).await.unwrap_err();
assert!(matches!(err, DbError::Conditional));
}
*/

#[actix_rt::test]
async fn version_check() {
let client = new_client().unwrap();
let uaid = gen_test_uaid();
let user = User {
uaid,
..Default::default()
};
client.remove_user(&uaid).await.unwrap();

client.add_user(&user).await.unwrap();
let user = client.get_user(&uaid).await.unwrap().unwrap();
assert!(client.update_user(&user).await.unwrap());

let fetched = client.get_user(&uaid).await.unwrap().unwrap();
assert_ne!(user.version, fetched.version);
// should now fail w/ a stale version
assert!(!client.update_user(&user).await.unwrap());

client.remove_user(&uaid).await.unwrap();
}
}
1 change: 1 addition & 0 deletions autopush-common/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ pub struct User {
#[serde(skip_serializing_if = "Option::is_none")]
pub current_timestamp: Option<u64>,
/// UUID4 version number for optimistic locking of updates on Bigtable
#[serde(skip_serializing)]
pub version: Option<Uuid>,
}

Expand Down
1 change: 1 addition & 0 deletions autopush-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod middleware;
pub mod notification;
pub mod sentry;
pub mod tags;
pub mod test_support;

#[macro_use]
pub mod util;
11 changes: 11 additions & 0 deletions autopush-common/src/test_support.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use uuid::Uuid;

/// Generate a UAID that is prefixed with the test-identification ID "DEADBEEF".
/// Note: It's absolutely possible that this might cause a conflict with valid UAIDs, but
/// the risk is reasonably small, and we could limit pruning to whenever we had
/// accidentally run the test script against production.
pub fn gen_test_uaid() -> Uuid {
let temp = Uuid::new_v4();
let (_, d2, d3, d4) = temp.as_fields();
Uuid::from_fields(0xdeadbeef, d2, d3, d4)
}

0 comments on commit e012955

Please sign in to comment.