Skip to content

Commit

Permalink
refactor: migrate region failover implementation to region migration
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jun 20, 2024
1 parent d0c898e commit 9e5f220
Show file tree
Hide file tree
Showing 16 changed files with 863 additions and 622 deletions.
2 changes: 1 addition & 1 deletion src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ pub mod flow;
pub mod schema_name;
pub mod table_info;
pub mod table_name;
pub mod view_info;
pub mod table_route;
#[cfg(any(test, feature = "testing"))]
pub mod test_utils;
mod tombstone;
pub(crate) mod txn_helper;
pub mod view_info;

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Debug;
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/failure_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub(crate) struct PhiAccrualFailureDetector {
last_heartbeat_millis: Option<i64>,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct PhiAccrualFailureDetectorOptions {
pub threshold: f32,
Expand Down
167 changes: 51 additions & 116 deletions src/meta-srv/src/handler/failure_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,48 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod runner;

use std::sync::Arc;

use api::v1::meta::{HeartbeatRequest, Role};
use async_trait::async_trait;
use common_meta::RegionIdent;

use crate::error::Result;
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::failure_handler::runner::{FailureDetectControl, FailureDetectRunner};
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::{Context, ElectionRef};
use crate::procedure::region_failover::RegionFailoverManager;

/// Heartbeat payload collected from a single datanode: the regions it hosts
/// plus the timestamp of the heartbeat.
/// NOTE(review): this definition sits on the removed side of the diff; the
/// added side imports a replacement `DatanodeHeartbeat` from
/// `crate::region::supervisor` (see the `use` line nearby) — confirm before reuse.
pub(crate) struct DatanodeHeartbeat {
// Identities of all regions reported by this datanode in one heartbeat.
region_idents: Vec<RegionIdent>,
// Heartbeat timestamp in milliseconds (fed to the phi-accrual failure detector).
heartbeat_time: i64,
}
use crate::metasrv::Context;
use crate::region::supervisor::{DatanodeHeartbeat, HeartbeatSender, RegionSupervisor};

pub struct RegionFailureHandler {
failure_detect_runner: FailureDetectRunner,
heartbeat_sender: HeartbeatSender,
}

impl RegionFailureHandler {
pub(crate) async fn try_new(
election: Option<ElectionRef>,
region_failover_manager: Arc<RegionFailoverManager>,
failure_detector_options: PhiAccrualFailureDetectorOptions,
) -> Result<Self> {
region_failover_manager.try_start()?;

let mut failure_detect_runner = FailureDetectRunner::new(
election,
region_failover_manager.clone(),
failure_detector_options,
);
failure_detect_runner.start().await;

Ok(Self {
failure_detect_runner,
})
/// Builds the handler and hands the supervisor off to a background task.
///
/// The supervisor's event loop runs detached on the tokio runtime; the
/// handler keeps only the sending half of the channel, used later to
/// forward datanode heartbeats to the supervisor.
pub(crate) fn new(mut region_supervisor: RegionSupervisor) -> Self {
    // Grab the channel endpoint before the supervisor is moved into the task.
    let sender = region_supervisor.heartbeat_sender();
    // Detached task: the supervisor loop lives as long as the runtime does.
    tokio::spawn(async move {
        region_supervisor.run().await;
    });
    Self {
        heartbeat_sender: sender,
    }
}
}

Expand All @@ -66,73 +41,72 @@ impl HeartbeatHandler for RegionFailureHandler {
async fn handle(
&self,
_: &HeartbeatRequest,
ctx: &mut Context,
_ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
if ctx.is_infancy {
self.failure_detect_runner
.send_control(FailureDetectControl::Purge)
.await;
}

let Some(stat) = acc.stat.as_ref() else {
return Ok(HandleControl::Continue);
};

let heartbeat = DatanodeHeartbeat {
region_idents: stat
.region_stats
.iter()
.map(|x| {
let region_id = x.id;
RegionIdent {
cluster_id: stat.cluster_id,
datanode_id: stat.id,
table_id: region_id.table_id(),
region_number: region_id.region_number(),
engine: x.engine.clone(),
}
})
.collect(),
heartbeat_time: stat.timestamp_millis,
};

self.failure_detect_runner.send_heartbeat(heartbeat).await;
self.heartbeat_sender
.send(DatanodeHeartbeat::from(stat))
.await;

Ok(HandleControl::Continue)
}
}

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::HeartbeatRequest;
use common_catalog::consts::default_engine;
use common_meta::key::MAINTENANCE_KEY;
use common_meta::peer::Peer;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use tokio::sync::oneshot;

use super::*;
use crate::handler::failure_handler::RegionFailureHandler;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::builder::MetasrvBuilder;
use crate::test_util::create_region_failover_manager;
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::test_util::TestingEnv;
use crate::region::supervisor::{Event, RegionSupervisor};
use crate::selector::test_utils::{new_test_selector_context, RandomNodeSelector};

/// Assembles a `RegionSupervisor` wired to an in-memory testing environment.
fn new_test_supervisor() -> RegionSupervisor {
    let env = TestingEnv::new();
    let backend = env.kv_backend();
    let migration_manager = Arc::new(RegionMigrationManager::new(
        env.procedure_manager().clone(),
        env.context_factory(),
    ));
    // A single-node random selector is enough for these tests.
    let node_selector = Arc::new(RandomNodeSelector::new(vec![Peer::empty(1)]));

    RegionSupervisor::new(
        Default::default(),
        Duration::from_secs(1),
        new_test_selector_context(),
        node_selector,
        migration_manager,
        backend,
    )
}

#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn test_handle_heartbeat() {
let region_failover_manager = create_region_failover_manager();
let failure_detector_options = PhiAccrualFailureDetectorOptions::default();
let handler =
RegionFailureHandler::try_new(None, region_failover_manager, failure_detector_options)
.await
.unwrap();

let supervisor = new_test_supervisor();
let sender = supervisor.sender();
let handler = RegionFailureHandler::new(supervisor);
let req = &HeartbeatRequest::default();

let builder = MetasrvBuilder::new();
let metasrv = builder.build().await.unwrap();
let mut ctx = metasrv.new_ctx();
ctx.is_infancy = false;

let acc = &mut HeartbeatAccumulator::default();
fn new_region_stat(region_id: u64) -> RegionStat {
RegionStat {
Expand All @@ -153,48 +127,9 @@ mod tests {
});

handler.handle(req, &mut ctx, acc).await.unwrap();

let dump = handler.failure_detect_runner.dump().await;
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 3);

// infancy makes heartbeats re-accumulated
ctx.is_infancy = true;
acc.stat = None;
handler.handle(req, &mut ctx, acc).await.unwrap();
let dump = handler.failure_detect_runner.dump().await;
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 0);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_maintenance_mode() {
let region_failover_manager = create_region_failover_manager();
let kv_backend = region_failover_manager.create_context().kv_backend.clone();
let _handler = RegionFailureHandler::try_new(
None,
region_failover_manager.clone(),
PhiAccrualFailureDetectorOptions::default(),
)
.await
.unwrap();

let kv_req = common_meta::rpc::store::PutRequest {
key: Vec::from(MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
};
let _ = kv_backend.put(kv_req.clone()).await.unwrap();
assert_matches!(
region_failover_manager.is_maintenance_mode().await,
Ok(true)
);

let _ = kv_backend
.delete(MAINTENANCE_KEY.as_bytes(), false)
.await
.unwrap();
assert_matches!(
region_failover_manager.is_maintenance_mode().await,
Ok(false)
);
let (tx, rx) = oneshot::channel();
sender.send(Event::Dump(tx)).await.unwrap();
let detector = rx.await.unwrap();
assert_eq!(detector.iter().collect::<Vec<_>>().len(), 3);
}
}
Loading

0 comments on commit 9e5f220

Please sign in to comment.