refactor: migrate region failover implementation to region migration #4172

Merged · 19 commits · Jun 25, 2024
Changes from 13 commits

5 changes: 5 additions & 0 deletions src/common/meta/src/wal_options_allocator.rs

@@ -87,6 +87,11 @@ impl WalOptionsAllocator {
             }
         }
     }
+
+    /// Returns true if it's the remote WAL.
+    pub fn is_remote_wal(&self) -> bool {
+        matches!(&self, WalOptionsAllocator::Kafka(_))
+    }
 }
 
 /// Allocates a wal options for each region. The allocated wal options is encoded immediately.
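
The new `is_remote_wal` predicate is a one-line `matches!` over the allocator enum. A minimal self-contained sketch of the pattern (the `RaftEngine` variant and the `KafkaTopicPool` stand-in are assumptions for illustration, not the real definitions):

```rust
/// Stand-in for the real Kafka topic allocator held by the `Kafka` variant.
struct KafkaTopicPool;

enum WalOptionsAllocator {
    RaftEngine,
    Kafka(KafkaTopicPool),
}

impl WalOptionsAllocator {
    /// Returns true if it's the remote WAL.
    fn is_remote_wal(&self) -> bool {
        matches!(self, WalOptionsAllocator::Kafka(_))
    }
}

fn main() {
    let local = WalOptionsAllocator::RaftEngine;
    let remote = WalOptionsAllocator::Kafka(KafkaTopicPool);
    assert!(!local.is_remote_wal());
    assert!(remote.is_remote_wal());
}
```
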
10 changes: 9 additions & 1 deletion src/meta-srv/src/error.rs

@@ -840,6 +840,13 @@ pub enum Error {
         #[snafu(source(from(common_config::error::Error, Box::new)))]
         source: Box<common_config::error::Error>,
     },
+
+    #[snafu(display("Region: {} leader peer is not found", region_id))]
+    RegionLeaderNotFound {
+        #[snafu(implicit)]
+        location: Location,
+        region_id: RegionId,
+    },
 }
 
 impl Error {
@@ -931,7 +938,8 @@ impl ErrorExt for Error {
             | Error::RegionOpeningRace { .. }
             | Error::RegionRouteNotFound { .. }
             | Error::MigrationAbort { .. }
-            | Error::MigrationRunning { .. } => StatusCode::Unexpected,
+            | Error::MigrationRunning { .. }
+            | Error::RegionLeaderNotFound { .. } => StatusCode::Unexpected,
             Error::TableNotFound { .. } => StatusCode::TableNotFound,
             Error::SaveClusterInfo { source, .. }
             | Error::InvalidClusterInfoFormat { source, .. } => source.status_code(),
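
For the new variant, snafu generates a `RegionLeaderNotFoundSnafu` context selector, so a call site can turn a missing leader peer into this error with `OptionExt::context`. A hedged sketch of such a call site (the `leader_peer` helper and its signature are hypothetical; assumes snafu 0.8 for `#[snafu(implicit)]`, and uses `u64` where the real code uses `RegionId`):

```rust
use snafu::{Location, OptionExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Region: {} leader peer is not found", region_id))]
    RegionLeaderNotFound {
        #[snafu(implicit)]
        location: Location,
        region_id: u64, // the real code uses store_api's RegionId
    },
}

/// Hypothetical call site: a missing leader peer becomes the new error.
fn leader_peer(region_id: u64, leader: Option<&str>) -> Result<String, Error> {
    leader
        .map(str::to_string)
        .context(RegionLeaderNotFoundSnafu { region_id })
}

fn main() {
    let err = leader_peer(4242, None).unwrap_err();
    println!("{err}"); // Region: 4242 leader peer is not found
}
```
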
4 changes: 2 additions & 2 deletions src/meta-srv/src/failure_detector.rs

@@ -63,7 +63,7 @@ pub(crate) struct PhiAccrualFailureDetector {
     last_heartbeat_millis: Option<i64>,
 }
 
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
 #[serde(default)]
 pub struct PhiAccrualFailureDetectorOptions {
     pub threshold: f32,
@@ -195,7 +195,7 @@ fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 {
 /// It is capped by the number of samples specified in `max_sample_size`.
 ///
 /// The stats (mean, variance, std_deviation) are not defined for empty HeartbeatHistory.
-#[derive(Clone)]
+#[derive(Debug, Clone)]
 struct HeartbeatHistory {
     /// Number of samples to use for calculation of mean and standard deviation of inter-arrival
     /// times.
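
For context on the type being tweaked here: a phi accrual detector scores heartbeat lateness as φ = -log10(1 - CDF(Δt)) against the observed inter-arrival distribution, declaring failure once φ crosses `threshold`. A rough sketch using the logistic approximation to the normal CDF common to Akka-style detectors (an assumption — the real `phi` body in this file may differ):

```rust
/// Phi accrual score: -log10(1 - CDF(time_diff)) for a normal distribution
/// with the observed mean and standard deviation of heartbeat intervals.
fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 {
    let y = (time_diff as f64 - mean) / std_deviation;
    let e = (-y * (1.5976 + 0.070566 * y * y)).exp();
    if time_diff as f64 > mean {
        -(e / (1.0 + e)).log10()
    } else {
        -(1.0 - 1.0 / (1.0 + e)).log10()
    }
}

fn main() {
    // Heartbeats normally arrive every ~1000 ms with a little jitter;
    // the longer we wait past the mean, the higher phi climbs.
    for waited in [1000, 2000, 4000] {
        println!("waited {waited} ms -> phi = {:.2}", phi(waited, 1000.0, 100.0));
    }
}
```
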
144 changes: 27 additions & 117 deletions src/meta-srv/src/handler/failure_handler.rs

@@ -12,48 +12,25 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-mod runner;
-
-use std::sync::Arc;
-
 use api::v1::meta::{HeartbeatRequest, Role};
 use async_trait::async_trait;
-use common_meta::RegionIdent;
+use common_telemetry::info;
 
 use crate::error::Result;
-use crate::failure_detector::PhiAccrualFailureDetectorOptions;
-use crate::handler::failure_handler::runner::{FailureDetectControl, FailureDetectRunner};
 use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
-use crate::metasrv::{Context, ElectionRef};
-use crate::procedure::region_failover::RegionFailoverManager;
-
-pub(crate) struct DatanodeHeartbeat {
-    region_idents: Vec<RegionIdent>,
-    heartbeat_time: i64,
-}
+use crate::metasrv::Context;
+use crate::region::supervisor::{DatanodeHeartbeat, HeartbeatAcceptor, RegionSupervisor};
 
 pub struct RegionFailureHandler {
-    failure_detect_runner: FailureDetectRunner,
+    heartbeat_acceptor: HeartbeatAcceptor,
 }
 
 impl RegionFailureHandler {
-    pub(crate) async fn try_new(
-        election: Option<ElectionRef>,
-        region_failover_manager: Arc<RegionFailoverManager>,
-        failure_detector_options: PhiAccrualFailureDetectorOptions,
-    ) -> Result<Self> {
-        region_failover_manager.try_start()?;
-
-        let mut failure_detect_runner = FailureDetectRunner::new(
-            election,
-            region_failover_manager.clone(),
-            failure_detector_options,
-        );
-        failure_detect_runner.start().await;
-
-        Ok(Self {
-            failure_detect_runner,
-        })
+    pub(crate) fn new(mut region_supervisor: RegionSupervisor) -> Self {
+        let heartbeat_acceptor = region_supervisor.heartbeat_acceptor();
+        info!("Starting region supervisor");
+        tokio::spawn(async move { region_supervisor.run().await });
+        Self { heartbeat_acceptor }
     }
 }
 
@@ -66,73 +43,45 @@ impl HeartbeatHandler for RegionFailureHandler {
     async fn handle(
         &self,
         _: &HeartbeatRequest,
-        ctx: &mut Context,
+        _ctx: &mut Context,
         acc: &mut HeartbeatAccumulator,
     ) -> Result<HandleControl> {
-        if ctx.is_infancy {
-            self.failure_detect_runner
-                .send_control(FailureDetectControl::Purge)
-                .await;
-        }
-
         let Some(stat) = acc.stat.as_ref() else {
            return Ok(HandleControl::Continue);
         };
 
-        let heartbeat = DatanodeHeartbeat {
-            region_idents: stat
-                .region_stats
-                .iter()
-                .map(|x| {
-                    let region_id = x.id;
-                    RegionIdent {
-                        cluster_id: stat.cluster_id,
-                        datanode_id: stat.id,
-                        table_id: region_id.table_id(),
-                        region_number: region_id.region_number(),
-                        engine: x.engine.clone(),
-                    }
-                })
-                .collect(),
-            heartbeat_time: stat.timestamp_millis,
-        };
-
-        self.failure_detect_runner.send_heartbeat(heartbeat).await;
+        self.heartbeat_acceptor
+            .accept(DatanodeHeartbeat::from(stat))
+            .await;
 
         Ok(HandleControl::Continue)
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::assert_matches::assert_matches;
-
     use api::v1::meta::HeartbeatRequest;
     use common_catalog::consts::default_engine;
-    use common_meta::key::MAINTENANCE_KEY;
     use store_api::region_engine::RegionRole;
     use store_api::storage::RegionId;
+    use tokio::sync::oneshot;
 
     use super::*;
-    use crate::handler::failure_handler::RegionFailureHandler;
     use crate::handler::node_stat::{RegionStat, Stat};
     use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
     use crate::metasrv::builder::MetasrvBuilder;
-    use crate::test_util::create_region_failover_manager;
+    use crate::region::supervisor::tests::new_test_supervisor;
+    use crate::region::supervisor::Event;
 
-    #[tokio::test(flavor = "multi_thread")]
+    #[tokio::test]
     async fn test_handle_heartbeat() {
-        let region_failover_manager = create_region_failover_manager();
-        let failure_detector_options = PhiAccrualFailureDetectorOptions::default();
-        let handler =
-            RegionFailureHandler::try_new(None, region_failover_manager, failure_detector_options)
-                .await
-                .unwrap();
-
+        let supervisor = new_test_supervisor();
+        let sender = supervisor.sender();
+        let handler = RegionFailureHandler::new(supervisor);
         let req = &HeartbeatRequest::default();
 
         let builder = MetasrvBuilder::new();
         let metasrv = builder.build().await.unwrap();
         let mut ctx = metasrv.new_ctx();
-        ctx.is_infancy = false;
 
         let acc = &mut HeartbeatAccumulator::default();
         fn new_region_stat(region_id: u64) -> RegionStat {
             RegionStat {
@@ -153,48 +102,9 @@ mod tests {
         });
 
         handler.handle(req, &mut ctx, acc).await.unwrap();
-
-        let dump = handler.failure_detect_runner.dump().await;
-        assert_eq!(dump.iter().collect::<Vec<_>>().len(), 3);
-
-        // infancy makes heartbeats re-accumulated
-        ctx.is_infancy = true;
-        acc.stat = None;
-        handler.handle(req, &mut ctx, acc).await.unwrap();
-        let dump = handler.failure_detect_runner.dump().await;
-        assert_eq!(dump.iter().collect::<Vec<_>>().len(), 0);
-    }
-
-    #[tokio::test(flavor = "multi_thread")]
-    async fn test_maintenance_mode() {
-        let region_failover_manager = create_region_failover_manager();
-        let kv_backend = region_failover_manager.create_context().kv_backend.clone();
-        let _handler = RegionFailureHandler::try_new(
-            None,
-            region_failover_manager.clone(),
-            PhiAccrualFailureDetectorOptions::default(),
-        )
-        .await
-        .unwrap();
-
-        let kv_req = common_meta::rpc::store::PutRequest {
-            key: Vec::from(MAINTENANCE_KEY),
-            value: vec![],
-            prev_kv: false,
-        };
-        let _ = kv_backend.put(kv_req.clone()).await.unwrap();
-        assert_matches!(
-            region_failover_manager.is_maintenance_mode().await,
-            Ok(true)
-        );
-
-        let _ = kv_backend
-            .delete(MAINTENANCE_KEY.as_bytes(), false)
-            .await
-            .unwrap();
-        assert_matches!(
-            region_failover_manager.is_maintenance_mode().await,
-            Ok(false)
-        );
+        let (tx, rx) = oneshot::channel();
+        sender.send(Event::Dump(tx)).await.unwrap();
+        let detector = rx.await.unwrap();
+        assert_eq!(detector.iter().collect::<Vec<_>>().len(), 3);
    }
 }
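
The rewritten test inspects the supervisor's failure detectors by sending an `Event::Dump` carrying a oneshot reply channel — the actor-style request/response over tokio channels that `RegionSupervisor` runs on. A minimal self-contained sketch of that pattern (the `Event` variants and the state held here are illustrative, not the real ones):

```rust
use tokio::sync::{mpsc, oneshot};

/// Messages the supervisor loop accepts. `Dump` carries a one-shot reply
/// channel, mirroring the `Event::Dump(tx)` used by the new test.
enum Event {
    HeartbeatArrived(String),
    Dump(oneshot::Sender<Vec<String>>),
}

/// A toy supervisor: a single task owns the state and is driven by a channel.
async fn run(mut rx: mpsc::Receiver<Event>) {
    let mut seen = Vec::new();
    while let Some(event) = rx.recv().await {
        match event {
            Event::HeartbeatArrived(node) => seen.push(node),
            Event::Dump(tx) => {
                // Reply with a snapshot; ignore a dropped requester.
                let _ = tx.send(seen.clone());
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = mpsc::channel(8);
    tokio::spawn(run(receiver));

    sender.send(Event::HeartbeatArrived("dn-1".into())).await.unwrap();

    // Request a snapshot of internal state, exactly like the test's Dump event.
    let (tx, rx) = oneshot::channel();
    sender.send(Event::Dump(tx)).await.unwrap();
    assert_eq!(rx.await.unwrap(), vec!["dn-1".to_string()]);
}
```
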