diff --git a/Cargo.toml b/Cargo.toml index b2efb2b9..a61b7992 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,6 @@ prometheus = { version = "0.13", default-features = false } prost = "0.12" rand = "0.8" regex = "1" -reqwest = { version = "0.11", features = ["json", "native-tls-vendored"] } semver = "1.0" serde = "1.0" serde_derive = "1.0" @@ -54,6 +53,7 @@ env_logger = "0.10" fail = { version = "0.4", features = ["failpoints"] } proptest = "1" proptest-derive = "0.3" +reqwest = { version = "0.11", features = ["json", "native-tls-vendored"] } rstest = "0.18.2" serde_json = "1" serial_test = "0.5.0" diff --git a/proto-build/Cargo.toml b/proto-build/Cargo.toml index 45826e5f..36e0302c 100644 --- a/proto-build/Cargo.toml +++ b/proto-build/Cargo.toml @@ -12,4 +12,4 @@ edition = "2021" [dependencies] glob = "0.3" -tonic-build = { version = "0.10", features = ["cleanup-markdown"] } \ No newline at end of file +tonic-build = { version = "0.10", features = ["cleanup-markdown"] } diff --git a/src/common/errors.rs b/src/common/errors.rs index 59f55776..5e7f6303 100644 --- a/src/common/errors.rs +++ b/src/common/errors.rs @@ -51,13 +51,9 @@ pub enum Error { #[error("gRPC error: {0}")] Grpc(#[from] tonic::transport::Error), /// Wraps a `reqwest::Error`. - #[error("http error: {0}")] - Http(#[from] reqwest::Error), /// Wraps a `grpcio::Error`. #[error("gRPC api error: {0}")] GrpcAPI(#[from] tonic::Status), - #[error("Http request failed: unknown respond {0}")] - UnknownHttpRespond(String), /// Wraps a `grpcio::Error`. #[error("url error: {0}")] Url(#[from] tonic::codegen::http::uri::InvalidUri), @@ -113,6 +109,8 @@ pub enum Error { inner: Box, success_keys: Vec>, }, + #[error("Keyspace not found: {0}")] + KeyspaceNotFound(String), } impl From for Error { diff --git a/src/mock.rs b/src/mock.rs index 35245541..0a3fb83c 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -14,6 +14,7 @@ use derive_new::new; use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::pd::RetryClient; +use crate::proto::keyspacepb; use crate::proto::metapb::RegionEpoch; use crate::proto::metapb::{self}; use crate::region::RegionId; @@ -215,7 +216,7 @@ impl PdClient for MockPdClient { async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {} - async fn get_keyspace_id(&self, _keyspace: &str) -> Result { + async fn load_keyspace(&self, _keyspace: &str) -> Result { unimplemented!() } } diff --git a/src/pd/client.rs b/src/pd/client.rs index 2caac29b..ba36c0ef 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -14,6 +14,7 @@ use crate::kv::codec; use crate::pd::retry::RetryClientTrait; use crate::pd::Cluster; use crate::pd::RetryClient; +use crate::proto::keyspacepb; use crate::proto::kvrpcpb; use crate::proto::metapb; use crate::region::RegionId; @@ -65,7 +66,7 @@ pub trait PdClient: Send + Sync + 'static { async fn update_safepoint(self: Arc, safepoint: u64) -> Result; - async fn get_keyspace_id(&self, keyspace: &str) -> Result; + async fn load_keyspace(&self, keyspace: &str) -> Result; /// In transactional API, `key` is in raw format async fn store_for_key(self: Arc, key: &Key) -> Result { @@ -270,8 +271,8 @@ impl PdClient for PdRpcClient { self.region_cache.invalidate_region_cache(ver_id).await } - async fn get_keyspace_id(&self, keyspace: &str) -> Result { - self.pd.get_keyspace_id(keyspace).await + async fn load_keyspace(&self, keyspace: &str) -> Result { + self.pd.load_keyspace(keyspace).await } } diff --git a/src/pd/cluster.rs b/src/pd/cluster.rs index 4ad841c2..e74a78fb 100644 --- a/src/pd/cluster.rs +++ b/src/pd/cluster.rs @@ -15,6 +15,7 @@ use tonic::Request; use super::timestamp::TimestampOracle; use crate::internal_err; +use crate::proto::keyspacepb; use crate::proto::pdpb; use crate::Error; use crate::Result; @@ -25,7 +26,7 @@ use crate::Timestamp; pub struct Cluster { id: u64, client: pdpb::pd_client::PdClient, - endpoint: String, + keyspace_client: keyspacepb::keyspace_client::KeyspaceClient, members: pdpb::GetMembersResponse, tso: TimestampOracle, } @@ -94,16 +95,18 @@ impl Cluster { req.send(&mut self.client, timeout).await } - pub async fn get_keyspace_id(&self, keyspace: &str) -> Result { - let resp = - reqwest::get(format!("{}/pd/api/v2/keyspaces/{keyspace}", self.endpoint)).await?; - let body = resp.json::().await?; - let keyspace_id = body - .get("id") - .ok_or_else(|| Error::UnknownHttpRespond(body.to_string()))? - .as_u64() - .ok_or_else(|| Error::UnknownHttpRespond(body.to_string()))?; - Ok(keyspace_id as u32) + pub async fn load_keyspace( + &mut self, + keyspace: &str, + timeout: Duration, + ) -> Result { + let mut req = pd_request!(self.id, keyspacepb::LoadKeyspaceRequest); + req.name = keyspace.to_owned(); + let resp = req.send(&mut self.keyspace_client, timeout).await?; + let keyspace = resp + .keyspace + .ok_or_else(|| Error::KeyspaceNotFound(keyspace.to_owned()))?; + Ok(keyspace) } } @@ -123,13 +126,13 @@ impl Connection { timeout: Duration, ) -> Result { let members = self.validate_endpoints(endpoints, timeout).await?; - let (client, endpoint, members) = self.try_connect_leader(&members, timeout).await?; + let (client, keyspace_client, members) = self.try_connect_leader(&members, timeout).await?; let id = members.header.as_ref().unwrap().cluster_id; let tso = TimestampOracle::new(id, &client)?; let cluster = Cluster { id, client, - endpoint, + keyspace_client, members, tso, }; @@ -140,13 +143,13 @@ impl Connection { pub async fn reconnect(&self, cluster: &mut Cluster, timeout: Duration) -> Result<()> { warn!("updating pd client"); let start = Instant::now(); - let (client, endpoint, members) = + let (client, keyspace_client, members) = self.try_connect_leader(&cluster.members, timeout).await?; let tso = TimestampOracle::new(cluster.id, &client)?; *cluster = Cluster { id: cluster.id, client, - endpoint, + keyspace_client, members, tso, }; @@ -169,7 +172,7 @@ impl Connection { return Err(internal_err!("duplicated PD endpoint {}", ep)); } - let (_, resp) = match self.connect(ep, timeout).await { + let (_, _, resp) = match self.connect(ep, timeout).await { Ok(resp) => resp, // Ignore failed PD node. Err(e) => { @@ -211,16 +214,27 @@ impl Connection { &self, addr: &str, _timeout: Duration, - ) -> Result<(pdpb::pd_client::PdClient, pdpb::GetMembersResponse)> { + ) -> Result<( + pdpb::pd_client::PdClient, + keyspacepb::keyspace_client::KeyspaceClient, + pdpb::GetMembersResponse, + )> { let mut client = self .security_mgr .connect(addr, pdpb::pd_client::PdClient::::new) .await?; + let keyspace_client = self + .security_mgr + .connect( + addr, + keyspacepb::keyspace_client::KeyspaceClient::::new, + ) + .await?; let resp: pdpb::GetMembersResponse = client .get_members(pdpb::GetMembersRequest::default()) .await? .into_inner(); - Ok((client, resp)) + Ok((client, keyspace_client, resp)) } async fn try_connect( @@ -228,10 +242,14 @@ impl Connection { addr: &str, cluster_id: u64, timeout: Duration, - ) -> Result<(pdpb::pd_client::PdClient, pdpb::GetMembersResponse)> { - let (client, r) = self.connect(addr, timeout).await?; + ) -> Result<( + pdpb::pd_client::PdClient, + keyspacepb::keyspace_client::KeyspaceClient, + pdpb::GetMembersResponse, + )> { + let (client, keyspace_client, r) = self.connect(addr, timeout).await?; Connection::validate_cluster_id(addr, &r, cluster_id)?; - Ok((client, r)) + Ok((client, keyspace_client, r)) } fn validate_cluster_id( @@ -258,7 +276,7 @@ impl Connection { timeout: Duration, ) -> Result<( pdpb::pd_client::PdClient, - String, + keyspacepb::keyspace_client::KeyspaceClient, pdpb::GetMembersResponse, )> { let previous_leader = previous.leader.as_ref().unwrap(); @@ -274,7 +292,7 @@ impl Connection { { for ep in &m.client_urls { match self.try_connect(ep.as_str(), cluster_id, timeout).await { - Ok((_, r)) => { + Ok((_, _, r)) => { resp = Some(r); break 'outer; } @@ -290,10 +308,10 @@ impl Connection { if let Some(resp) = resp { let leader = resp.leader.as_ref().unwrap(); for ep in &leader.client_urls { - if let Ok((client, members)) = + if let Ok((client, keyspace_client, members)) = self.try_connect(ep.as_str(), cluster_id, timeout).await { - return Ok((client, ep.to_string(), members)); + return Ok((client, keyspace_client, members)); } } } @@ -306,18 +324,12 @@ type GrpcResult = std::result::Result; #[async_trait] trait PdMessage: Sized { + type Client: Send; type Response: PdResponse; - async fn rpc( - req: Request, - client: &mut pdpb::pd_client::PdClient, - ) -> GrpcResult; + async fn rpc(req: Request, client: &mut Self::Client) -> GrpcResult; - async fn send( - self, - client: &mut pdpb::pd_client::PdClient, - timeout: Duration, - ) -> Result { + async fn send(self, client: &mut Self::Client, timeout: Duration) -> Result { let mut req = self.into_request(); req.set_timeout(timeout); let response = Self::rpc(req, client).await?; @@ -332,64 +344,64 @@ trait PdMessage: Sized { #[async_trait] impl PdMessage for pdpb::GetRegionRequest { + type Client = pdpb::pd_client::PdClient; type Response = pdpb::GetRegionResponse; - async fn rpc( - req: Request, - client: &mut pdpb::pd_client::PdClient, - ) -> GrpcResult { + async fn rpc(req: Request, client: &mut Self::Client) -> GrpcResult { Ok(client.get_region(req).await?.into_inner()) } } #[async_trait] impl PdMessage for pdpb::GetRegionByIdRequest { + type Client = pdpb::pd_client::PdClient; type Response = pdpb::GetRegionResponse; - async fn rpc( - req: Request, - client: &mut pdpb::pd_client::PdClient, - ) -> GrpcResult { + async fn rpc(req: Request, client: &mut Self::Client) -> GrpcResult { Ok(client.get_region_by_id(req).await?.into_inner()) } } #[async_trait] impl PdMessage for pdpb::GetStoreRequest { + type Client = pdpb::pd_client::PdClient; type Response = pdpb::GetStoreResponse; - async fn rpc( - req: Request, - client: &mut pdpb::pd_client::PdClient, - ) -> GrpcResult { + async fn rpc(req: Request, client: &mut Self::Client) -> GrpcResult { Ok(client.get_store(req).await?.into_inner()) } } #[async_trait] impl PdMessage for pdpb::GetAllStoresRequest { + type Client = pdpb::pd_client::PdClient; type Response = pdpb::GetAllStoresResponse; - async fn rpc( - req: Request, - client: &mut pdpb::pd_client::PdClient, - ) -> GrpcResult { + async fn rpc(req: Request, client: &mut Self::Client) -> GrpcResult { Ok(client.get_all_stores(req).await?.into_inner()) } } #[async_trait] impl PdMessage for pdpb::UpdateGcSafePointRequest { + type Client = pdpb::pd_client::PdClient; type Response = pdpb::UpdateGcSafePointResponse; - async fn rpc( - req: Request, - client: &mut pdpb::pd_client::PdClient, - ) -> GrpcResult { + async fn rpc(req: Request, client: &mut Self::Client) -> GrpcResult { Ok(client.update_gc_safe_point(req).await?.into_inner()) } } +#[async_trait] +impl PdMessage for keyspacepb::LoadKeyspaceRequest { + type Client = keyspacepb::keyspace_client::KeyspaceClient; + type Response = keyspacepb::LoadKeyspaceResponse; + + async fn rpc(req: Request, client: &mut Self::Client) -> GrpcResult { + Ok(client.load_keyspace(req).await?.into_inner()) + } +} + trait PdResponse { fn header(&self) -> &pdpb::ResponseHeader; } @@ -417,3 +429,9 @@ impl PdResponse for pdpb::UpdateGcSafePointResponse { self.header.as_ref().unwrap() } } + +impl PdResponse for keyspacepb::LoadKeyspaceResponse { + fn header(&self) -> &pdpb::ResponseHeader { + self.header.as_ref().unwrap() + } +} diff --git a/src/pd/retry.rs b/src/pd/retry.rs index 718cfc35..c9ccf1e1 100644 --- a/src/pd/retry.rs +++ b/src/pd/retry.rs @@ -13,6 +13,7 @@ use tokio::time::sleep; use crate::pd::Cluster; use crate::pd::Connection; +use crate::proto::keyspacepb; use crate::proto::metapb; use crate::proto::pdpb::Timestamp; use crate::proto::pdpb::{self}; @@ -46,7 +47,7 @@ pub trait RetryClientTrait { async fn update_safepoint(self: Arc, safepoint: u64) -> Result; - async fn get_keyspace_id(&self, keyspace: &str) -> Result; + async fn load_keyspace(&self, keyspace: &str) -> Result; } /// Client for communication with a PD cluster. Has the facility to reconnect to the cluster. pub struct RetryClient { @@ -200,9 +201,9 @@ impl RetryClientTrait for RetryClient { }) } - async fn get_keyspace_id(&self, keyspace: &str) -> Result { - retry_mut!(self, "get_keyspace_id", |cluster| async { - cluster.get_keyspace_id(keyspace).await + async fn load_keyspace(&self, keyspace: &str) -> Result { + retry_mut!(self, "load_keyspace", |cluster| async { + cluster.load_keyspace(keyspace, self.timeout).await }) } } diff --git a/src/raw/client.rs b/src/raw/client.rs index 9029f905..9f081dff 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -111,8 +111,10 @@ impl Client { Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), enable_codec).await?); let api_version = match config.keyspace { Some(keyspace) => { - let keyspace_id = rpc.get_keyspace_id(&keyspace).await?; - APIVersion::V2 { keyspace_id } + let keyspace = rpc.load_keyspace(&keyspace).await?; + APIVersion::V2 { + keyspace_id: keyspace.id, + } } None => APIVersion::V1, }; diff --git a/src/region_cache.rs b/src/region_cache.rs index bbf8921f..3a5c0a25 100644 --- a/src/region_cache.rs +++ b/src/region_cache.rs @@ -282,6 +282,7 @@ mod test { use super::RegionCache; use crate::common::Error; use crate::pd::RetryClientTrait; + use crate::proto::keyspacepb; use crate::proto::metapb::RegionEpoch; use crate::proto::metapb::{self}; use crate::region::RegionId; @@ -347,8 +348,8 @@ mod test { todo!() } - async fn get_keyspace_id(&self, _keyspace: &str) -> Result { - todo!() + async fn load_keyspace(&self, _keyspace: &str) -> Result { + unimplemented!() } } diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 0b94c697..f2e9d067 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -111,8 +111,10 @@ impl Client { let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), true).await?); let api_version = match config.keyspace { Some(keyspace) => { - let keyspace_id = pd.get_keyspace_id(&keyspace).await?; - APIVersion::V2 { keyspace_id } + let keyspace = pd.load_keyspace(&keyspace).await?; + APIVersion::V2 { + keyspace_id: keyspace.id, + } } None => APIVersion::V1, };