Skip to content

Commit

Permalink
refactor: retrieve table routes in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jun 24, 2024
1 parent e698cc6 commit 117ea03
Showing 1 changed file with 46 additions and 15 deletions.
61 changes: 46 additions & 15 deletions src/meta-srv/src/region/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::Duration;
Expand Down Expand Up @@ -247,6 +248,39 @@ impl RegionSupervisor {
return;
}
}

let table_ids = regions
.iter()
.map(|(_, _, region_id)| region_id.table_id())
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();

let table_ids = match self
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.batch_get(&table_ids)
.await
.context(error::TableMetadataManagerSnafu)
{
Ok(table_routes) => table_ids
.into_iter()
.zip(table_routes)
.flat_map(|(table_id, route)| {
if route.is_some() {
Some(table_id)
} else {
None
}
})
.collect::<HashSet<_>>(),
Err(err) => {
error!(err; "Failed to retrieves table routes: {table_ids:?}");
return;
}
};

warn!(
"Detects region failures: {:?}",
regions
Expand All @@ -255,9 +289,19 @@ impl RegionSupervisor {
.collect::<Vec<_>>()
);
for (cluster_id, datanode_id, region_id) in regions {
if let Err(err) = self.do_failover(cluster_id, datanode_id, region_id).await {
error!(err; "Failed to execute region failover for region: {region_id}, datanode: {datanode_id}");
if table_ids.contains(&region_id.table_id()) {
if let Err(err) = self.do_failover(cluster_id, datanode_id, region_id).await {
error!(err; "Failed to execute region failover for region: {region_id}, datanode: {datanode_id}");
} else {
self.failure_detector
.remove(&(cluster_id, datanode_id, region_id));
}
} else {
info!(
"Skipping to execute region failover for region: {}, target table:{} is not exists",
region_id,
region_id.table_id()
);
self.failure_detector
.remove(&(cluster_id, datanode_id, region_id));
}
Expand All @@ -277,7 +321,6 @@ impl RegionSupervisor {
datanode_id: DatanodeId,
region_id: RegionId,
) -> Result<()> {
// TODO(weny): skip the failover if meta is under maintenance mode.
let task = self.region_migration_manager.tracker().get(region_id);
match task {
Some(task) => {
Expand All @@ -287,18 +330,6 @@ impl RegionSupervisor {
);
}
None => {
if self
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get(region_id.table_id())
.await
.context(error::TableMetadataManagerSnafu)?
.is_none()
{
return Ok(());
}

let mut peers = self
.selector
.select(
Expand Down

0 comments on commit 117ea03

Please sign in to comment.