From aec6245ae98ed5291dde8203704c1a5128b84f84 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 24 Jun 2024 06:57:36 +0000 Subject: [PATCH] chore: rename `HeartbeatSender` to `HeartbeatAccepter` --- src/meta-srv/src/handler/failure_handler.rs | 12 ++++++------ src/meta-srv/src/region/supervisor.rs | 20 +++++++++++++------- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index ba7fabffaf8e..4c18f03330e8 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -18,17 +18,17 @@ use async_trait::async_trait; use crate::error::Result; use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; -use crate::region::supervisor::{DatanodeHeartbeat, HeartbeatSender, RegionSupervisor}; +use crate::region::supervisor::{DatanodeHeartbeat, HeartbeatAccepter, RegionSupervisor}; pub struct RegionFailureHandler { - heartbeat_sender: HeartbeatSender, + heartbeat_accepter: HeartbeatAccepter, } impl RegionFailureHandler { pub(crate) fn new(mut region_supervisor: RegionSupervisor) -> Self { - let heartbeat_sender = region_supervisor.heartbeat_sender(); + let heartbeat_accepter = region_supervisor.heartbeat_accepter(); tokio::spawn(async move { region_supervisor.run().await }); - Self { heartbeat_sender } + Self { heartbeat_accepter } } } @@ -48,8 +48,8 @@ impl HeartbeatHandler for RegionFailureHandler { return Ok(HandleControl::Continue); }; - self.heartbeat_sender - .send(DatanodeHeartbeat::from(stat)) + self.heartbeat_accepter + .accept(DatanodeHeartbeat::from(stat)) .await; Ok(HandleControl::Continue) diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index 4159124158c7..0f27f8eb44df 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -141,6 +141,8 @@ pub type RegionSupervisorRef = Arc; /// The default tick interval. pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1); +/// The [`RegionSupervisor`] is used to detect Region failures +/// and initiate Region failover upon detection, ensuring uninterrupted region service. pub struct RegionSupervisor { /// Used to detect the failure of regions. failure_detector: RegionFailureDetector, @@ -148,7 +150,9 @@ pub struct RegionSupervisor { tick_interval: Duration, /// Receives [Event]s. receiver: Receiver, + /// [Event] Sender. sender: Sender, + /// The context of [`SelectorRef`] selector_context: SelectorContext, /// Candidate node selector. selector: SelectorRef, @@ -159,12 +163,14 @@ pub struct RegionSupervisor { table_metadata_manager: TableMetadataManagerRef, } -pub(crate) struct HeartbeatSender { +/// [`HeartbeatAccepter`] forwards heartbeats to [`RegionSupervisor`]. +pub(crate) struct HeartbeatAccepter { sender: Sender, } -impl HeartbeatSender { - pub(crate) async fn send(&self, heartbeat: DatanodeHeartbeat) { +impl HeartbeatAccepter { + /// Accepts heartbeats from datanodes. + pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) { if let Err(e) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await { error!(e; "RegionSupervisor is stop receiving heartbeat"); } @@ -202,14 +208,14 @@ impl RegionSupervisor { } } - /// Returns the [HeartbeatSender]. - pub(crate) fn heartbeat_sender(&self) -> HeartbeatSender { - HeartbeatSender { + /// Returns the [`HeartbeatAccepter`]. + pub(crate) fn heartbeat_accepter(&self) -> HeartbeatAccepter { + HeartbeatAccepter { sender: self.sender.clone(), } } - /// Returns the [RegionSupervisorTicker]. + /// Returns the [`RegionSupervisorTicker`]. pub(crate) fn ticker(&self) -> RegionSupervisorTickerRef { Arc::new(RegionSupervisorTicker { tick_interval: self.tick_interval,