Skip to content

Commit

Permalink
feat: use real peer addr
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jun 24, 2024
1 parent 9212c06 commit 321cecf
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 31 deletions.
10 changes: 9 additions & 1 deletion src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,13 @@ pub enum Error {
#[snafu(source(from(common_config::error::Error, Box::new)))]
source: Box<common_config::error::Error>,
},

#[snafu(display("Region: {} leader peer is not found", region_id))]
RegionLeaderNotFound {
#[snafu(implicit)]
location: Location,
region_id: RegionId,
},
}

impl Error {
Expand Down Expand Up @@ -931,7 +938,8 @@ impl ErrorExt for Error {
| Error::RegionOpeningRace { .. }
| Error::RegionRouteNotFound { .. }
| Error::MigrationAbort { .. }
| Error::MigrationRunning { .. } => StatusCode::Unexpected,
| Error::MigrationRunning { .. }
| Error::RegionLeaderNotFound { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::SaveClusterInfo { source, .. }
| Error::InvalidClusterInfoFormat { source, .. } => source.status_code(),
Expand Down
95 changes: 65 additions & 30 deletions src/meta-srv/src/region/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ use std::time::Duration;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef, MAINTENANCE_KEY};
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_meta::{ClusterId, DatanodeId};
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use common_time::util::current_time_millis;
use snafu::ResultExt;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};

Expand Down Expand Up @@ -256,25 +257,14 @@ impl RegionSupervisor {
.into_iter()
.collect::<Vec<_>>();

let table_ids = match self
let table_routes = match self
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.batch_get(&table_ids)
.batch_get_physical_table_routes(&table_ids)
.await
.context(error::TableMetadataManagerSnafu)
{
Ok(table_routes) => table_ids
.into_iter()
.zip(table_routes)
.flat_map(|(table_id, route)| {
if route.is_some() {
Some(table_id)
} else {
None
}
})
.collect::<HashSet<_>>(),
Ok(table_routes) => table_routes,
Err(err) => {
error!(err; "Failed to retrieves table routes: {table_ids:?}");
return;
Expand All @@ -289,25 +279,71 @@ impl RegionSupervisor {
.collect::<Vec<_>>()
);
for (cluster_id, datanode_id, region_id) in regions {
if table_ids.contains(&region_id.table_id()) {
if let Err(err) = self.do_failover(cluster_id, datanode_id, region_id).await {
error!(err; "Failed to execute region failover for region: {region_id}, datanode: {datanode_id}");
} else {
match table_routes.get(&region_id.table_id()) {
Some(route) => {
match self
.handle_region_failure(
cluster_id,
datanode_id,
region_id,
&route.region_routes,
)
.await
{
Ok(_) => {
self.failure_detector
.remove(&(cluster_id, datanode_id, region_id))
}
Err(err) => {
error!(err; "Failed to execute region failover for region: {region_id}, datanode: {datanode_id}");
}
}
}
None => {
info!(
"Skipping to execute region failover for region: {}, target table: {} is not exists",
region_id,
region_id.table_id()
);
self.failure_detector
.remove(&(cluster_id, datanode_id, region_id));
}
} else {
info!(
"Skipping to execute region failover for region: {}, target table:{} is not exists",
region_id,
region_id.table_id()
);
self.failure_detector
.remove(&(cluster_id, datanode_id, region_id));
}
}
}

/// Triggers a failover for `region_id` using the leader peer recorded in
/// `region_routes` as the migration source.
///
/// The leader peer is taken from the first route entry for `region_id` that
/// has a leader set, so the peer handed to `do_failover` is the one stored in
/// the route rather than one rebuilt from `datanode_id` alone.
///
/// # Errors
///
/// * `RegionLeaderNotFound` if no route entry for `region_id` has a leader peer.
/// * `Unexpected` if the recorded leader's id differs from `datanode_id`.
/// * Any error returned by `do_failover`.
async fn handle_region_failure(
    &self,
    cluster_id: ClusterId,
    datanode_id: DatanodeId,
    region_id: RegionId,
    region_routes: &[RegionRoute],
) -> Result<()> {
    // Only routes belonging to the failed region are considered; entries
    // without a leader are skipped.
    let current_leader = region_routes
        .iter()
        .filter(|route| route.region.id == region_id)
        .find_map(|route| route.leader_peer.clone())
        .context(error::RegionLeaderNotFoundSnafu { region_id })?;

    // Refuse to fail over when the route's leader is not the datanode that
    // was reported as failed.
    ensure!(
        current_leader.id == datanode_id,
        error::UnexpectedSnafu {
            violated: format!(
                "Region leader peer is changed, expected: Datanode {}, actual: Datanode {}",
                datanode_id, current_leader.id
            )
        }
    );

    self.do_failover(cluster_id, current_leader, region_id).await
}

pub(crate) async fn is_maintenance_mode(&self) -> Result<bool> {
self.kv_backend
.exists(MAINTENANCE_KEY.as_bytes())
Expand All @@ -318,7 +354,7 @@ impl RegionSupervisor {
async fn do_failover(
&self,
cluster_id: ClusterId,
datanode_id: DatanodeId,
from_peer: Peer,
region_id: RegionId,
) -> Result<()> {
let task = self.region_migration_manager.tracker().get(region_id);
Expand All @@ -345,8 +381,7 @@ impl RegionSupervisor {
let task = RegionMigrationProcedureTask {
cluster_id,
region_id,
// TODO(weny): use real addr.
from_peer: Peer::new(datanode_id, String::new()),
from_peer,
to_peer,
replay_timeout: Duration::from_secs(60),
};
Expand Down

0 comments on commit 321cecf

Please sign in to comment.