Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Switch to lighter weight health check #662

Merged
merged 6 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions autopush-common/src/db/bigtable/bigtable_client/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ pub enum BigTableError {
/// General Pool builder errors.
#[error("Pool Error: {0}")]
Pool(String),

#[error("BigTable config error: {0}")]
Config(String),
}

impl ReportableError for BigTableError {
Expand Down Expand Up @@ -144,6 +147,7 @@ impl ReportableError for BigTableError {
BigTableError::Recycle => "storage.bigtable.error.recycle",
BigTableError::Pool(_) => "storage.bigtable.error.pool",
BigTableError::GRPC(_) => "storage.bigtable.error.grpc",
BigTableError::Config(_) => "storage.bigtable.error.config",
};
Some(err)
}
Expand Down
40 changes: 19 additions & 21 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use cadence::{CountedExt, StatsdClient};
use futures_util::StreamExt;
use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin::DropRowRangeRequest;
use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin_grpc::BigtableTableAdminClient;
use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest;
use google_cloud_rust_raw::bigtable::v2::bigtable::{PingAndWarmRequest, ReadRowsRequest};
use google_cloud_rust_raw::bigtable::v2::bigtable_grpc::BigtableClient;
use google_cloud_rust_raw::bigtable::v2::data::{RowFilter, RowFilter_Chain};
use google_cloud_rust_raw::bigtable::v2::{bigtable, data};
Expand Down Expand Up @@ -761,49 +761,47 @@ impl BigTableClientImpl {
#[derive(Clone)]
pub struct BigtableDb {
pub(super) conn: BigtableClient,
pub(super) metadata: Metadata,
pub(super) health_metadata: Metadata,
instance_name: String,
}

impl BigtableDb {
pub fn new(channel: Channel, metadata: &Metadata) -> Self {
pub fn new(channel: Channel, health_metadata: &Metadata, instance_name: &str) -> Self {
Self {
conn: BigtableClient::new(channel),
metadata: metadata.clone(),
health_metadata: health_metadata.clone(),
instance_name: instance_name.to_owned(),
}
}

/// Perform a simple connectivity check. This should return no actual results
/// but should verify that the connection is valid. We use this for the
/// Recycle check as well, so it has to be fairly low in the implementation
/// stack.
///
pub async fn health_check(
&mut self,
table_name: &str,
metrics: Arc<StatsdClient>,
) -> DbResult<bool> {
// Create a request that is GRPC valid, but does not point to a valid row.
let mut req = read_row_request(table_name, "NOT FOUND");
let mut filter = data::RowFilter::default();
filter.set_block_all_filter(true);
req.set_filter(filter);

let r = retry_policy(RETRY_COUNT)
/// "instance_name" is the "projects/{project}/instances/{instance}" portion of
/// the tablename.
///
pub async fn health_check(&mut self, metrics: Arc<StatsdClient>) -> DbResult<bool> {
let req = PingAndWarmRequest {
name: self.instance_name.clone(),
..Default::default()
};
// PingAndWarmResponse does not implement a stream since it does not return data.
let _r = retry_policy(RETRY_COUNT)
.retry_if(
|| async {
self.conn
.read_rows_opt(&req, call_opts(self.metadata.clone()))
.ping_and_warm_opt(&req, call_opts(self.health_metadata.clone()))
},
retryable_error(metrics.clone()),
)
.await
.map_err(|e| DbError::General(format!("BigTable connectivity error: {:?}", e)))?;

let (v, _stream) = r.into_future().await;
// Since this should return no rows (with the row key set to a value that shouldn't exist)
// the first component of the tuple should be None.
debug!("🉑 health check");
Ok(v.is_none())
Ok(true)
}
}

Expand Down Expand Up @@ -1321,7 +1319,7 @@ impl DbClient for BigTableClientImpl {
self.pool
.get()
.await?
.health_check(&self.settings.table_name, self.metrics.clone())
.health_check(self.metrics.clone())
.await
}

Expand Down
72 changes: 70 additions & 2 deletions autopush-common/src/db/bigtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,28 @@ pub struct BigTableDbSettings {
pub retry_count: usize,
}

// Used by test, but we don't want available for release.
#[allow(clippy::derivable_impls)]
#[cfg(test)]
impl Default for BigTableDbSettings {
fn default() -> Self {
Self {
table_name: Default::default(),
router_family: Default::default(),
message_family: Default::default(),
message_topic_family: Default::default(),
database_pool_max_size: Default::default(),
database_pool_create_timeout: Default::default(),
database_pool_wait_timeout: Default::default(),
database_pool_recycle_timeout: Default::default(),
database_pool_connection_ttl: Default::default(),
database_pool_max_idle: Default::default(),
route_to_leader: Default::default(),
retry_count: Default::default(),
}
}
}

impl BigTableDbSettings {
pub fn metadata(&self) -> Result<Metadata, BigTableError> {
MetadataBuilder::with_prefix(&self.table_name)
Expand All @@ -93,13 +115,20 @@ impl BigTableDbSettings {
.map_err(BigTableError::GRPC)
}

pub fn health_metadata(&self) -> Result<Metadata, BigTableError> {
MetadataBuilder::with_prefix(&self.table_name)
.routing_param("name", &self.get_instance_name()?)
.route_to_leader(self.route_to_leader)
.build()
.map_err(BigTableError::GRPC)
}

pub fn admin_metadata(&self) -> Result<Metadata, BigTableError> {
// Admin calls use a slightly different routing param and a truncated prefix
// See https://github.com/googleapis/google-cloud-cpp/issues/190#issuecomment-370520185
let Some(admin_prefix) = self.table_name.split_once("/tables/").map(|v| v.0) else {
return Err(BigTableError::Admin(
return Err(BigTableError::Config(
"Invalid table name specified".to_owned(),
None,
));
};
MetadataBuilder::with_prefix(admin_prefix)
Expand All @@ -108,6 +137,16 @@ impl BigTableDbSettings {
.build()
.map_err(BigTableError::GRPC)
}

pub fn get_instance_name(&self) -> Result<String, BigTableError> {
let parts: Vec<&str> = self.table_name.split('/').collect();
if parts.len() < 4 || parts[0] != "projects" || parts[2] != "instances" {
return Err(BigTableError::Config(
"Invalid table name specified. Cannot parse instance".to_owned(),
));
}
Ok(parts[0..4].join("/"))
}
}

impl TryFrom<&str> for BigTableDbSettings {
Expand Down Expand Up @@ -138,4 +177,33 @@ mod tests {
);
Ok(())
}
#[test]
fn test_get_instance() -> Result<(), super::BigTableError> {
let settings = super::BigTableDbSettings {
table_name: "projects/foo/instances/bar/tables/gorp".to_owned(),
..Default::default()
};
let res = settings.get_instance_name()?;
assert_eq!(res.as_str(), "projects/foo/instances/bar");

let settings = super::BigTableDbSettings {
table_name: "projects/foo/".to_owned(),
..Default::default()
};
assert!(settings.get_instance_name().is_err());

let settings = super::BigTableDbSettings {
table_name: "protect/foo/instances/bar/tables/gorp".to_owned(),
..Default::default()
};
assert!(settings.get_instance_name().is_err());

let settings = super::BigTableDbSettings {
table_name: "project/foo/instance/bar/tables/gorp".to_owned(),
..Default::default()
};
assert!(settings.get_instance_name().is_err());

Ok(())
}
}
8 changes: 6 additions & 2 deletions autopush-common/src/db/bigtable/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,11 @@ impl Manager for BigtableClientManager {
/// `BigtableClient` is the most atomic we can go.
async fn create(&self) -> Result<BigtableDb, DbError> {
debug!("🏊 Create a new pool entry.");
let entry = BigtableDb::new(self.get_channel()?, &self.settings.metadata()?);
let entry = BigtableDb::new(
self.get_channel()?,
&self.settings.health_metadata()?,
&self.settings.get_instance_name()?,
);
debug!("🏊 Bigtable connection acquired");
Ok(entry)
}
Expand Down Expand Up @@ -216,7 +220,7 @@ impl Manager for BigtableClientManager {
// note, this changes to `blocks_in_conditions` for 1.76+
#[allow(clippy::blocks_in_conditions)]
if !client
.health_check(&self.settings.table_name, self.metrics.clone())
.health_check(self.metrics.clone())
.await
.map_err(|e| {
debug!("🏊 Recycle requested (health). {:?}", e);
Expand Down