Skip to content

Commit

Permalink
use keyspacepb
Browse files Browse the repository at this point in the history
  • Loading branch information
andylokandy committed Dec 18, 2023
1 parent bab9704 commit f518d0e
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 74 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ prometheus = { version = "0.13", default-features = false }
prost = "0.12"
rand = "0.8"
regex = "1"
reqwest = { version = "0.11", features = ["json", "native-tls-vendored"] }
semver = "1.0"
serde = "1.0"
serde_derive = "1.0"
Expand All @@ -54,6 +53,7 @@ env_logger = "0.10"
fail = { version = "0.4", features = ["failpoints"] }
proptest = "1"
proptest-derive = "0.3"
reqwest = { version = "0.11", features = ["json", "native-tls-vendored"] }
rstest = "0.18.2"
serde_json = "1"
serial_test = "0.5.0"
Expand Down
2 changes: 1 addition & 1 deletion proto-build/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ edition = "2021"

[dependencies]
glob = "0.3"
tonic-build = { version = "0.10", features = ["cleanup-markdown"] }
tonic-build = { version = "0.10", features = ["cleanup-markdown"] }
6 changes: 2 additions & 4 deletions src/common/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,9 @@ pub enum Error {
#[error("gRPC error: {0}")]
Grpc(#[from] tonic::transport::Error),
/// Wraps a `reqwest::Error`.
#[error("http error: {0}")]
Http(#[from] reqwest::Error),
/// Wraps a `grpcio::Error`.
#[error("gRPC api error: {0}")]
GrpcAPI(#[from] tonic::Status),
#[error("Http request failed: unknown respond {0}")]
UnknownHttpRespond(String),
/// Wraps a `grpcio::Error`.
#[error("url error: {0}")]
Url(#[from] tonic::codegen::http::uri::InvalidUri),
Expand Down Expand Up @@ -113,6 +109,8 @@ pub enum Error {
inner: Box<Error>,
success_keys: Vec<Vec<u8>>,
},
#[error("Keyspace not found: {0}")]
KeyspaceNotFound(String),
}

impl From<crate::proto::errorpb::Error> for Error {
Expand Down
3 changes: 2 additions & 1 deletion src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use derive_new::new;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::pd::RetryClient;
use crate::proto::keyspacepb;
use crate::proto::metapb::RegionEpoch;
use crate::proto::metapb::{self};
use crate::region::RegionId;
Expand Down Expand Up @@ -215,7 +216,7 @@ impl PdClient for MockPdClient {

async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}

async fn get_keyspace_id(&self, _keyspace: &str) -> Result<u32> {
async fn load_keyspace(&self, _keyspace: &str) -> Result<keyspacepb::KeyspaceMeta> {
unimplemented!()
}
}
7 changes: 4 additions & 3 deletions src/pd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::kv::codec;
use crate::pd::retry::RetryClientTrait;
use crate::pd::Cluster;
use crate::pd::RetryClient;
use crate::proto::keyspacepb;
use crate::proto::kvrpcpb;
use crate::proto::metapb;
use crate::region::RegionId;
Expand Down Expand Up @@ -65,7 +66,7 @@ pub trait PdClient: Send + Sync + 'static {

async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool>;

async fn get_keyspace_id(&self, keyspace: &str) -> Result<u32>;
async fn load_keyspace(&self, keyspace: &str) -> Result<keyspacepb::KeyspaceMeta>;

/// In transactional API, `key` is in raw format
async fn store_for_key(self: Arc<Self>, key: &Key) -> Result<RegionStore> {
Expand Down Expand Up @@ -270,8 +271,8 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
self.region_cache.invalidate_region_cache(ver_id).await
}

async fn get_keyspace_id(&self, keyspace: &str) -> Result<u32> {
self.pd.get_keyspace_id(keyspace).await
async fn load_keyspace(&self, keyspace: &str) -> Result<keyspacepb::KeyspaceMeta> {
self.pd.load_keyspace(keyspace).await
}
}

Expand Down
126 changes: 72 additions & 54 deletions src/pd/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tonic::Request;

use super::timestamp::TimestampOracle;
use crate::internal_err;
use crate::proto::keyspacepb;
use crate::proto::pdpb;
use crate::Error;
use crate::Result;
Expand All @@ -25,7 +26,7 @@ use crate::Timestamp;
pub struct Cluster {
id: u64,
client: pdpb::pd_client::PdClient<Channel>,
endpoint: String,
keyspace_client: keyspacepb::keyspace_client::KeyspaceClient<Channel>,
members: pdpb::GetMembersResponse,
tso: TimestampOracle,
}
Expand Down Expand Up @@ -94,16 +95,18 @@ impl Cluster {
req.send(&mut self.client, timeout).await
}

pub async fn get_keyspace_id(&self, keyspace: &str) -> Result<u32> {
let resp =
reqwest::get(format!("{}/pd/api/v2/keyspaces/{keyspace}", self.endpoint)).await?;
let body = resp.json::<serde_json::Value>().await?;
let keyspace_id = body
.get("id")
.ok_or_else(|| Error::UnknownHttpRespond(body.to_string()))?
.as_u64()
.ok_or_else(|| Error::UnknownHttpRespond(body.to_string()))?;
Ok(keyspace_id as u32)
pub async fn load_keyspace(
&mut self,
keyspace: &str,
timeout: Duration,
) -> Result<keyspacepb::KeyspaceMeta> {
let mut req = pd_request!(self.id, keyspacepb::LoadKeyspaceRequest);
req.name = keyspace.to_owned();
let resp = req.send(&mut self.keyspace_client, timeout).await?;
let keyspace = resp
.keyspace
.ok_or_else(|| Error::KeyspaceNotFound(keyspace.to_owned()))?;
Ok(keyspace)
}
}

Expand All @@ -123,13 +126,13 @@ impl Connection {
timeout: Duration,
) -> Result<Cluster> {
let members = self.validate_endpoints(endpoints, timeout).await?;
let (client, endpoint, members) = self.try_connect_leader(&members, timeout).await?;
let (client, keyspace_client, members) = self.try_connect_leader(&members, timeout).await?;
let id = members.header.as_ref().unwrap().cluster_id;
let tso = TimestampOracle::new(id, &client)?;
let cluster = Cluster {
id,
client,
endpoint,
keyspace_client,
members,
tso,
};
Expand All @@ -140,13 +143,13 @@ impl Connection {
pub async fn reconnect(&self, cluster: &mut Cluster, timeout: Duration) -> Result<()> {
warn!("updating pd client");
let start = Instant::now();
let (client, endpoint, members) =
let (client, keyspace_client, members) =
self.try_connect_leader(&cluster.members, timeout).await?;
let tso = TimestampOracle::new(cluster.id, &client)?;
*cluster = Cluster {
id: cluster.id,
client,
endpoint,
keyspace_client,
members,
tso,
};
Expand All @@ -169,7 +172,7 @@ impl Connection {
return Err(internal_err!("duplicated PD endpoint {}", ep));
}

let (_, resp) = match self.connect(ep, timeout).await {
let (_, _, resp) = match self.connect(ep, timeout).await {
Ok(resp) => resp,
// Ignore failed PD node.
Err(e) => {
Expand Down Expand Up @@ -211,27 +214,42 @@ impl Connection {
&self,
addr: &str,
_timeout: Duration,
) -> Result<(pdpb::pd_client::PdClient<Channel>, pdpb::GetMembersResponse)> {
) -> Result<(
pdpb::pd_client::PdClient<Channel>,
keyspacepb::keyspace_client::KeyspaceClient<Channel>,
pdpb::GetMembersResponse,
)> {
let mut client = self
.security_mgr
.connect(addr, pdpb::pd_client::PdClient::<Channel>::new)
.await?;
let keyspace_client = self
.security_mgr
.connect(
addr,
keyspacepb::keyspace_client::KeyspaceClient::<Channel>::new,
)
.await?;
let resp: pdpb::GetMembersResponse = client
.get_members(pdpb::GetMembersRequest::default())
.await?
.into_inner();
Ok((client, resp))
Ok((client, keyspace_client, resp))
}

async fn try_connect(
&self,
addr: &str,
cluster_id: u64,
timeout: Duration,
) -> Result<(pdpb::pd_client::PdClient<Channel>, pdpb::GetMembersResponse)> {
let (client, r) = self.connect(addr, timeout).await?;
) -> Result<(
pdpb::pd_client::PdClient<Channel>,
keyspacepb::keyspace_client::KeyspaceClient<Channel>,
pdpb::GetMembersResponse,
)> {
let (client, keyspace_client, r) = self.connect(addr, timeout).await?;
Connection::validate_cluster_id(addr, &r, cluster_id)?;
Ok((client, r))
Ok((client, keyspace_client, r))
}

fn validate_cluster_id(
Expand All @@ -258,7 +276,7 @@ impl Connection {
timeout: Duration,
) -> Result<(
pdpb::pd_client::PdClient<Channel>,
String,
keyspacepb::keyspace_client::KeyspaceClient<Channel>,
pdpb::GetMembersResponse,
)> {
let previous_leader = previous.leader.as_ref().unwrap();
Expand All @@ -274,7 +292,7 @@ impl Connection {
{
for ep in &m.client_urls {
match self.try_connect(ep.as_str(), cluster_id, timeout).await {
Ok((_, r)) => {
Ok((_, _, r)) => {
resp = Some(r);
break 'outer;
}
Expand All @@ -290,10 +308,10 @@ impl Connection {
if let Some(resp) = resp {
let leader = resp.leader.as_ref().unwrap();
for ep in &leader.client_urls {
if let Ok((client, members)) =
if let Ok((client, keyspace_client, members)) =
self.try_connect(ep.as_str(), cluster_id, timeout).await
{
return Ok((client, ep.to_string(), members));
return Ok((client, keyspace_client, members));
}
}
}
Expand All @@ -306,18 +324,12 @@ type GrpcResult<T> = std::result::Result<T, tonic::Status>;

#[async_trait]
trait PdMessage: Sized {
type Client: Send;
type Response: PdResponse;

async fn rpc(
req: Request<Self>,
client: &mut pdpb::pd_client::PdClient<Channel>,
) -> GrpcResult<Self::Response>;
async fn rpc(req: Request<Self>, client: &mut Self::Client) -> GrpcResult<Self::Response>;

async fn send(
self,
client: &mut pdpb::pd_client::PdClient<Channel>,
timeout: Duration,
) -> Result<Self::Response> {
async fn send(self, client: &mut Self::Client, timeout: Duration) -> Result<Self::Response> {
let mut req = self.into_request();
req.set_timeout(timeout);
let response = Self::rpc(req, client).await?;
Expand All @@ -332,64 +344,64 @@ trait PdMessage: Sized {

#[async_trait]
impl PdMessage for pdpb::GetRegionRequest {
type Client = pdpb::pd_client::PdClient<Channel>;
type Response = pdpb::GetRegionResponse;

async fn rpc(
req: Request<Self>,
client: &mut pdpb::pd_client::PdClient<Channel>,
) -> GrpcResult<Self::Response> {
async fn rpc(req: Request<Self>, client: &mut Self::Client) -> GrpcResult<Self::Response> {
Ok(client.get_region(req).await?.into_inner())
}
}

#[async_trait]
impl PdMessage for pdpb::GetRegionByIdRequest {
type Client = pdpb::pd_client::PdClient<Channel>;
type Response = pdpb::GetRegionResponse;

async fn rpc(
req: Request<Self>,
client: &mut pdpb::pd_client::PdClient<Channel>,
) -> GrpcResult<Self::Response> {
async fn rpc(req: Request<Self>, client: &mut Self::Client) -> GrpcResult<Self::Response> {
Ok(client.get_region_by_id(req).await?.into_inner())
}
}

#[async_trait]
impl PdMessage for pdpb::GetStoreRequest {
type Client = pdpb::pd_client::PdClient<Channel>;
type Response = pdpb::GetStoreResponse;

async fn rpc(
req: Request<Self>,
client: &mut pdpb::pd_client::PdClient<Channel>,
) -> GrpcResult<Self::Response> {
async fn rpc(req: Request<Self>, client: &mut Self::Client) -> GrpcResult<Self::Response> {
Ok(client.get_store(req).await?.into_inner())
}
}

#[async_trait]
impl PdMessage for pdpb::GetAllStoresRequest {
type Client = pdpb::pd_client::PdClient<Channel>;
type Response = pdpb::GetAllStoresResponse;

async fn rpc(
req: Request<Self>,
client: &mut pdpb::pd_client::PdClient<Channel>,
) -> GrpcResult<Self::Response> {
async fn rpc(req: Request<Self>, client: &mut Self::Client) -> GrpcResult<Self::Response> {
Ok(client.get_all_stores(req).await?.into_inner())
}
}

#[async_trait]
impl PdMessage for pdpb::UpdateGcSafePointRequest {
type Client = pdpb::pd_client::PdClient<Channel>;
type Response = pdpb::UpdateGcSafePointResponse;

async fn rpc(
req: Request<Self>,
client: &mut pdpb::pd_client::PdClient<Channel>,
) -> GrpcResult<Self::Response> {
async fn rpc(req: Request<Self>, client: &mut Self::Client) -> GrpcResult<Self::Response> {
Ok(client.update_gc_safe_point(req).await?.into_inner())
}
}

#[async_trait]
impl PdMessage for keyspacepb::LoadKeyspaceRequest {
type Client = keyspacepb::keyspace_client::KeyspaceClient<Channel>;
type Response = keyspacepb::LoadKeyspaceResponse;

async fn rpc(req: Request<Self>, client: &mut Self::Client) -> GrpcResult<Self::Response> {
Ok(client.load_keyspace(req).await?.into_inner())
}
}

trait PdResponse {
fn header(&self) -> &pdpb::ResponseHeader;
}
Expand Down Expand Up @@ -417,3 +429,9 @@ impl PdResponse for pdpb::UpdateGcSafePointResponse {
self.header.as_ref().unwrap()
}
}

impl PdResponse for keyspacepb::LoadKeyspaceResponse {
fn header(&self) -> &pdpb::ResponseHeader {
self.header.as_ref().unwrap()
}
}
Loading

0 comments on commit f518d0e

Please sign in to comment.