Skip to content

Commit

Permalink
chore: rename HeartbeatSender to HeartbeatAcceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jun 24, 2024
1 parent 2b45d9e commit c948c76
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 13 deletions.
12 changes: 6 additions & 6 deletions src/meta-srv/src/handler/failure_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ use async_trait::async_trait;
use crate::error::Result;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::region::supervisor::{DatanodeHeartbeat, HeartbeatSender, RegionSupervisor};
use crate::region::supervisor::{DatanodeHeartbeat, HeartbeatAcceptor, RegionSupervisor};

pub struct RegionFailureHandler {
heartbeat_sender: HeartbeatSender,
heartbeat_acceptor: HeartbeatAcceptor,
}

impl RegionFailureHandler {
pub(crate) fn new(mut region_supervisor: RegionSupervisor) -> Self {
let heartbeat_sender = region_supervisor.heartbeat_sender();
let heartbeat_acceptor = region_supervisor.heartbeat_acceptor();
tokio::spawn(async move { region_supervisor.run().await });
Self { heartbeat_sender }
Self { heartbeat_acceptor }
}
}

Expand All @@ -48,8 +48,8 @@ impl HeartbeatHandler for RegionFailureHandler {
return Ok(HandleControl::Continue);
};

self.heartbeat_sender
.send(DatanodeHeartbeat::from(stat))
self.heartbeat_acceptor
.accept(DatanodeHeartbeat::from(stat))
.await;

Ok(HandleControl::Continue)
Expand Down
20 changes: 13 additions & 7 deletions src/meta-srv/src/region/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,18 @@ pub type RegionSupervisorRef = Arc<RegionSupervisor>;
/// The default tick interval.
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);

/// The [`RegionSupervisor`] is used to detect Region failures
/// and initiate Region failover upon detection, ensuring uninterrupted region service.
pub struct RegionSupervisor {
/// Used to detect the failure of regions.
failure_detector: RegionFailureDetector,
/// The interval of tick
tick_interval: Duration,
/// Receives [Event]s.
receiver: Receiver<Event>,
/// [Event] Sender.
sender: Sender<Event>,
/// The context of [`SelectorRef`]
selector_context: SelectorContext,
/// Candidate node selector.
selector: SelectorRef,
Expand All @@ -159,12 +163,14 @@ pub struct RegionSupervisor {
table_metadata_manager: TableMetadataManagerRef,
}

pub(crate) struct HeartbeatSender {
/// [`HeartbeatAcceptor`] forwards heartbeats to [`RegionSupervisor`].
pub(crate) struct HeartbeatAcceptor {
sender: Sender<Event>,
}

impl HeartbeatSender {
pub(crate) async fn send(&self, heartbeat: DatanodeHeartbeat) {
impl HeartbeatAcceptor {
/// Accepts heartbeats from datanodes.
pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) {
if let Err(e) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await {
error!(e; "RegionSupervisor is stop receiving heartbeat");
}
Expand Down Expand Up @@ -202,14 +208,14 @@ impl RegionSupervisor {
}
}

/// Returns the [HeartbeatSender].
pub(crate) fn heartbeat_sender(&self) -> HeartbeatSender {
HeartbeatSender {
/// Returns the [`HeartbeatAcceptor`].
pub(crate) fn heartbeat_acceptor(&self) -> HeartbeatAcceptor {
HeartbeatAcceptor {
sender: self.sender.clone(),
}
}

/// Returns the [RegionSupervisorTicker].
/// Returns the [`RegionSupervisorTicker`].
pub(crate) fn ticker(&self) -> RegionSupervisorTickerRef {
Arc::new(RegionSupervisorTicker {
tick_interval: self.tick_interval,
Expand Down

0 comments on commit c948c76

Please sign in to comment.