Commit 8cbe716

refactor: migrate region failover implementation to region migration (#4172)
* refactor: migrate region failover implementation to region migration

* fix: use HEARTBEAT_INTERVAL_MILLIS as lease secs

* fix: return false if leader is downgraded

* fix: only remove failure detector after submitting procedure successfully

* feat: ignore dropped region

* refactor: retrieve table routes in batches

* refactor: disable region failover on local WAL implementation

* fix: move the guard into procedure

* feat: use real peer addr

* feat: use interval instead of sleep

* chore: rename `HeartbeatSender` to `HeartbeatAcceptor`

* chore: apply suggestions from CR

* chore: reduce duplicate code

* chore: apply suggestions from CR

* feat: lookup peer addr

* chore: add comments

* chore: apply suggestions from CR

* chore: apply suggestions from CR
WenyXu committed Jun 25, 2024
1 parent f5ac158 commit 8cbe716
Showing 19 changed files with 963 additions and 628 deletions.
src/common/meta/src/key.rs (2 changes: 1 addition & 1 deletion)

@@ -135,7 +135,7 @@ use crate::rpc::store::BatchDeleteRequest;
use crate::DatanodeId;

pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.]*";
-pub const MAINTENANCE_KEY: &str = "maintenance";
+pub const MAINTENANCE_KEY: &str = "__maintenance";

const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
src/common/meta/src/test_util.rs (16 changes: 15 additions & 1 deletion)

@@ -33,10 +33,11 @@ use crate::kv_backend::KvBackendRef;
use crate::node_manager::{
Datanode, DatanodeRef, Flownode, FlownodeRef, NodeManager, NodeManagerRef,
};
-use crate::peer::{Peer, StandalonePeerLookupService};
+use crate::peer::{Peer, PeerLookupService, StandalonePeerLookupService};
use crate::region_keeper::MemoryRegionKeeper;
use crate::sequence::SequenceBuilder;
use crate::wal_options_allocator::WalOptionsAllocator;
+use crate::{ClusterId, DatanodeId, FlownodeId};

#[async_trait::async_trait]
pub trait MockDatanodeHandler: Sync + Send + Clone {
@@ -183,3 +184,16 @@ pub fn new_ddl_context_with_kv_backend(
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
}
}

+pub struct NoopPeerLookupService;
+
+#[async_trait::async_trait]
+impl PeerLookupService for NoopPeerLookupService {
+    async fn datanode(&self, _cluster_id: ClusterId, id: DatanodeId) -> Result<Option<Peer>> {
+        Ok(Some(Peer::empty(id)))
+    }
+
+    async fn flownode(&self, _cluster_id: ClusterId, id: FlownodeId) -> Result<Option<Peer>> {
+        Ok(Some(Peer::empty(id)))
+    }
+}
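
A minimal sketch of how this no-op lookup service might be exercised (hypothetical, not part of the diff; it only inspects the returned Peer's id and assumes the id arguments are plain integers):

#[tokio::test]
async fn test_noop_peer_lookup_service() {
    // The no-op service resolves any id to an address-less peer, which is
    // enough for tests that do not care about real peer addresses.
    let svc = NoopPeerLookupService;
    let peer = svc.datanode(0, 42).await.unwrap().unwrap();
    assert_eq!(peer.id, 42);
}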
src/common/meta/src/wal_options_allocator.rs (5 changes: 5 additions & 0 deletions)

@@ -87,6 +87,11 @@ impl WalOptionsAllocator {
}
}
}

+    /// Returns true if it's the remote WAL.
+    pub fn is_remote_wal(&self) -> bool {
+        matches!(&self, WalOptionsAllocator::Kafka(_))
+    }
}

/// Allocates a wal options for each region. The allocated wal options is encoded immediately.
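Given the commit note about disabling region failover on the local WAL implementation, a plausible use of this new predicate when wiring up metasrv might look like the following (a hedged sketch; the surrounding variable names are illustrative, not taken from this commit):

// Illustrative only: enable the region failure handler solely on a remote
// (Kafka) WAL, since local-WAL data cannot be replayed on another datanode.
let region_failure_handler = if wal_options_allocator.is_remote_wal() {
    Some(RegionFailureHandler::new(region_supervisor))
} else {
    None
};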
src/meta-srv/src/error.rs (9 changes: 9 additions & 0 deletions)

@@ -39,6 +39,14 @@ pub enum Error {
peer_id: u64,
},

#[snafu(display("Failed to lookup peer: {}", peer_id))]
LookupPeer {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
peer_id: u64,
},

#[snafu(display("Another migration procedure is running for region: {}", region_id))]
MigrationRunning {
#[snafu(implicit)]
@@ -972,6 +980,7 @@ impl ErrorExt for Error {
}

Error::Other { source, .. } => source.status_code(),
+Error::LookupPeer { source, .. } => source.status_code(),
}
}
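
For reference, a hedged sketch of a call site for this variant (snafu derives a LookupPeerSnafu context selector from the definition above; the surrounding names are illustrative, not taken from this commit):

use snafu::ResultExt;

// Illustrative only: convert a common_meta lookup failure into
// Error::LookupPeer, attaching the peer id used in the display message.
let peer = peer_lookup_service
    .datanode(cluster_id, peer_id)
    .await
    .context(LookupPeerSnafu { peer_id })?;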

src/meta-srv/src/failure_detector.rs (4 changes: 2 additions & 2 deletions)

@@ -63,7 +63,7 @@ pub(crate) struct PhiAccrualFailureDetector {
last_heartbeat_millis: Option<i64>,
}

-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct PhiAccrualFailureDetectorOptions {
pub threshold: f32,
@@ -195,7 +195,7 @@ fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 {
/// It is capped by the number of samples specified in `max_sample_size`.
///
/// The stats (mean, variance, std_deviation) are not defined for empty HeartbeatHistory.
-#[derive(Clone)]
+#[derive(Debug, Clone)]
struct HeartbeatHistory {
/// Number of samples to use for calculation of mean and standard deviation of inter-arrival
/// times.
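As background for the phi function named in the hunk header above: a phi-accrual detector converts the time since the last heartbeat into a suspicion level, and a datanode is suspected failed once phi crosses the configured threshold. A common formulation (the Akka-style logistic approximation of the normal CDF; the exact constants in this crate may differ) is:

// Sketch of a phi-accrual computation (illustrative, not this crate's code).
fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 {
    let y = (time_diff as f64 - mean) / std_deviation;
    // Logistic approximation of the cumulative normal distribution.
    let e = (-y * (1.5976 + 0.070566 * y * y)).exp();
    if time_diff as f64 > mean {
        -(e / (1.0 + e)).log10()
    } else {
        -(1.0 - 1.0 / (1.0 + e)).log10()
    }
}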
src/meta-srv/src/handler/failure_handler.rs (144 changes: 27 additions & 117 deletions)

@@ -12,48 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-mod runner;
-
-use std::sync::Arc;

use api::v1::meta::{HeartbeatRequest, Role};
use async_trait::async_trait;
-use common_meta::RegionIdent;
+use common_telemetry::info;

use crate::error::Result;
-use crate::failure_detector::PhiAccrualFailureDetectorOptions;
-use crate::handler::failure_handler::runner::{FailureDetectControl, FailureDetectRunner};
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
-use crate::metasrv::{Context, ElectionRef};
-use crate::procedure::region_failover::RegionFailoverManager;
-
-pub(crate) struct DatanodeHeartbeat {
-    region_idents: Vec<RegionIdent>,
-    heartbeat_time: i64,
-}
+use crate::metasrv::Context;
+use crate::region::supervisor::{DatanodeHeartbeat, HeartbeatAcceptor, RegionSupervisor};

pub struct RegionFailureHandler {
-    failure_detect_runner: FailureDetectRunner,
+    heartbeat_acceptor: HeartbeatAcceptor,
}

impl RegionFailureHandler {
-    pub(crate) async fn try_new(
-        election: Option<ElectionRef>,
-        region_failover_manager: Arc<RegionFailoverManager>,
-        failure_detector_options: PhiAccrualFailureDetectorOptions,
-    ) -> Result<Self> {
-        region_failover_manager.try_start()?;
-
-        let mut failure_detect_runner = FailureDetectRunner::new(
-            election,
-            region_failover_manager.clone(),
-            failure_detector_options,
-        );
-        failure_detect_runner.start().await;
-
-        Ok(Self {
-            failure_detect_runner,
-        })
+    pub(crate) fn new(mut region_supervisor: RegionSupervisor) -> Self {
+        let heartbeat_acceptor = region_supervisor.heartbeat_acceptor();
+        info!("Starting region supervisor");
+        common_runtime::spawn_bg(async move { region_supervisor.run().await });
+        Self { heartbeat_acceptor }
}
}

@@ -66,73 +43,45 @@ impl HeartbeatHandler for RegionFailureHandler {
async fn handle(
&self,
_: &HeartbeatRequest,
-        ctx: &mut Context,
+        _ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
-        if ctx.is_infancy {
-            self.failure_detect_runner
-                .send_control(FailureDetectControl::Purge)
-                .await;
-        }

let Some(stat) = acc.stat.as_ref() else {
return Ok(HandleControl::Continue);
};

-        let heartbeat = DatanodeHeartbeat {
-            region_idents: stat
-                .region_stats
-                .iter()
-                .map(|x| {
-                    let region_id = x.id;
-                    RegionIdent {
-                        cluster_id: stat.cluster_id,
-                        datanode_id: stat.id,
-                        table_id: region_id.table_id(),
-                        region_number: region_id.region_number(),
-                        engine: x.engine.clone(),
-                    }
-                })
-                .collect(),
-            heartbeat_time: stat.timestamp_millis,
-        };
-
-        self.failure_detect_runner.send_heartbeat(heartbeat).await;
+        self.heartbeat_acceptor
+            .accept(DatanodeHeartbeat::from(stat))
+            .await;

Ok(HandleControl::Continue)
}
}

#[cfg(test)]
mod tests {
-    use std::assert_matches::assert_matches;

use api::v1::meta::HeartbeatRequest;
use common_catalog::consts::default_engine;
-    use common_meta::key::MAINTENANCE_KEY;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
+    use tokio::sync::oneshot;

use super::*;
use crate::handler::failure_handler::RegionFailureHandler;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::builder::MetasrvBuilder;
-    use crate::test_util::create_region_failover_manager;
+    use crate::region::supervisor::tests::new_test_supervisor;
+    use crate::region::supervisor::Event;

-    #[tokio::test(flavor = "multi_thread")]
+    #[tokio::test]
async fn test_handle_heartbeat() {
-        let region_failover_manager = create_region_failover_manager();
-        let failure_detector_options = PhiAccrualFailureDetectorOptions::default();
-        let handler =
-            RegionFailureHandler::try_new(None, region_failover_manager, failure_detector_options)
-                .await
-                .unwrap();
-
+        let supervisor = new_test_supervisor();
+        let sender = supervisor.sender();
+        let handler = RegionFailureHandler::new(supervisor);
let req = &HeartbeatRequest::default();

let builder = MetasrvBuilder::new();
let metasrv = builder.build().await.unwrap();
let mut ctx = metasrv.new_ctx();
-        ctx.is_infancy = false;

let acc = &mut HeartbeatAccumulator::default();
fn new_region_stat(region_id: u64) -> RegionStat {
RegionStat {
@@ -153,48 +102,9 @@ mod tests {
});

handler.handle(req, &mut ctx, acc).await.unwrap();

-        let dump = handler.failure_detect_runner.dump().await;
-        assert_eq!(dump.iter().collect::<Vec<_>>().len(), 3);
-
-        // infancy makes heartbeats re-accumulated
-        ctx.is_infancy = true;
-        acc.stat = None;
-        handler.handle(req, &mut ctx, acc).await.unwrap();
-        let dump = handler.failure_detect_runner.dump().await;
-        assert_eq!(dump.iter().collect::<Vec<_>>().len(), 0);
-    }
-
-    #[tokio::test(flavor = "multi_thread")]
-    async fn test_maintenance_mode() {
-        let region_failover_manager = create_region_failover_manager();
-        let kv_backend = region_failover_manager.create_context().kv_backend.clone();
-        let _handler = RegionFailureHandler::try_new(
-            None,
-            region_failover_manager.clone(),
-            PhiAccrualFailureDetectorOptions::default(),
-        )
-        .await
-        .unwrap();
-
-        let kv_req = common_meta::rpc::store::PutRequest {
-            key: Vec::from(MAINTENANCE_KEY),
-            value: vec![],
-            prev_kv: false,
-        };
-        let _ = kv_backend.put(kv_req.clone()).await.unwrap();
-        assert_matches!(
-            region_failover_manager.is_maintenance_mode().await,
-            Ok(true)
-        );
-
-        let _ = kv_backend
-            .delete(MAINTENANCE_KEY.as_bytes(), false)
-            .await
-            .unwrap();
-        assert_matches!(
-            region_failover_manager.is_maintenance_mode().await,
-            Ok(false)
-        );
+        let (tx, rx) = oneshot::channel();
+        sender.send(Event::Dump(tx)).await.unwrap();
+        let detector = rx.await.unwrap();
+        assert_eq!(detector.iter().collect::<Vec<_>>().len(), 3);
}
}
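
To make the new control flow easier to follow, here is a hedged sketch of the supervisor/acceptor pair the handler now relies on (the real definitions live in crate::region::supervisor; the channel shape, the HeartbeatArrived variant, and the FailureDetectorContainer type are assumptions, though Event::Dump appears in the test above):

use tokio::sync::{mpsc, oneshot};

// Assumed event type: heartbeats flow in; Dump snapshots the failure detectors.
pub enum Event {
    HeartbeatArrived(DatanodeHeartbeat),
    Dump(oneshot::Sender<FailureDetectorContainer>),
}

// The acceptor is a cheap handle over the sending half of the channel; the
// supervisor owns the receiving half and runs in a background task.
pub struct HeartbeatAcceptor {
    tx: mpsc::Sender<Event>,
}

impl HeartbeatAcceptor {
    pub async fn accept(&self, heartbeat: DatanodeHeartbeat) {
        // A failed send is ignored; the supervisor may be shutting down.
        let _ = self.tx.send(Event::HeartbeatArrived(heartbeat)).await;
    }
}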