Skip to content

Commit

Permalink
feat: register & deregister region failure detectors actively (#4223)
Browse files Browse the repository at this point in the history
* feat: Use DATANODE_LEASE_SECS from distributed_time_constants for heartbeat pause duration

* feat: introduce `RegionFailureDetectorController` to manage region failure detectors

* feat: add `RegionFailureDetectorController` to `DdlContext`

* feat: add `region_failure_detector_controller` to `Context` in region migration

* feat: register region failure detectors during rollback region migration procedure

* feat: deregister region failure detectors during drop table procedure

* feat: register region failure detectors during create table procedure

* fix: update meta config

* chore: apply suggestions from CR

* chore: avoid cloning

* chore: rename

* chore: reduce the size of the test

* chore: apply suggestions from CR

* chore: move channel initialization into `RegionSupervisor::channel`

* chore: minor refactor

* chore: rename ident
  • Loading branch information
WenyXu committed Jul 1, 2024
1 parent 214fd38 commit 6a634f8
Show file tree
Hide file tree
Showing 26 changed files with 448 additions and 138 deletions.
2 changes: 1 addition & 1 deletion config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@
| `failure_detector` | -- | -- | -- |
| `failure_detector.threshold` | Float | `8.0` | -- |
| `failure_detector.min_std_deviation` | String | `100ms` | -- |
| `failure_detector.acceptable_heartbeat_pause` | String | `3000ms` | -- |
| `failure_detector.acceptable_heartbeat_pause` | String | `10000ms` | -- |
| `failure_detector.first_heartbeat_estimate` | String | `1000ms` | -- |
| `datanode` | -- | -- | Datanode options. |
| `datanode.client` | -- | -- | Datanode client options. |
Expand Down
2 changes: 1 addition & 1 deletion config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ max_metadata_value_size = "1500KiB"
[failure_detector]
threshold = 8.0
min_std_deviation = "100ms"
acceptable_heartbeat_pause = "3000ms"
acceptable_heartbeat_pause = "10000ms"
first_heartbeat_estimate = "1000ms"

## Datanode options.
Expand Down
3 changes: 2 additions & 1 deletion src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef};
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::{DdlContext, ProcedureExecutorRef};
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
Expand Down Expand Up @@ -559,6 +559,7 @@ impl StartCommand {
flow_metadata_manager,
flow_metadata_allocator,
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
},
procedure_manager,
true,
Expand Down
55 changes: 53 additions & 2 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use common_telemetry::tracing_context::W3cTrace;
use store_api::storage::{RegionNumber, TableId};
use store_api::storage::{RegionId, RegionNumber, TableId};

use crate::cache_invalidator::CacheInvalidatorRef;
use crate::ddl::flow_meta::FlowMetadataAllocatorRef;
Expand All @@ -30,7 +30,7 @@ use crate::peer::PeerLookupServiceRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
use crate::ClusterId;
use crate::{ClusterId, DatanodeId};

pub mod alter_logical_tables;
pub mod alter_table;
Expand Down Expand Up @@ -102,6 +102,33 @@ pub struct TableMetadata {
pub region_wal_options: HashMap<RegionNumber, String>,
}

pub type RegionFailureDetectorControllerRef = Arc<dyn RegionFailureDetectorController>;

pub type DetectingRegion = (ClusterId, DatanodeId, RegionId);

/// Used for actively registering Region failure detectors.
///
/// Ensuring the Region Supervisor can detect Region failures without relying on the first heartbeat from the datanode.
#[async_trait::async_trait]
pub trait RegionFailureDetectorController: Send + Sync {
/// Registers failure detectors for the given identifiers.
async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>);

/// Deregisters failure detectors for the given identifiers.
async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>);
}

/// A noop implementation of [`RegionFailureDetectorController`].
#[derive(Debug, Clone)]
pub struct NoopRegionFailureDetectorControl;

#[async_trait::async_trait]
impl RegionFailureDetectorController for NoopRegionFailureDetectorControl {
async fn register_failure_detectors(&self, _detecting_regions: Vec<DetectingRegion>) {}

async fn deregister_failure_detectors(&self, _detecting_regions: Vec<DetectingRegion>) {}
}

/// The context of ddl.
#[derive(Clone)]
pub struct DdlContext {
Expand All @@ -121,4 +148,28 @@ pub struct DdlContext {
pub flow_metadata_allocator: FlowMetadataAllocatorRef,
/// look up peer by id.
pub peer_lookup_service: PeerLookupServiceRef,
/// controller of region failure detector.
pub region_failure_detector_controller: RegionFailureDetectorControllerRef,
}

impl DdlContext {
/// Notifies the RegionSupervisor to register failure detector of new created regions.
///
/// The datanode may crash without sending a heartbeat that contains information about newly created regions,
/// which may prevent the RegionSupervisor from detecting failures in these newly created regions.
pub async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
self.region_failure_detector_controller
.register_failure_detectors(detecting_regions)
.await;
}

/// Notifies the RegionSupervisor to remove failure detectors.
///
/// Once the regions were dropped, subsequent heartbeats no longer include these regions.
/// Therefore, we should remove the failure detectors for these dropped regions.
async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
self.region_failure_detector_controller
.deregister_failure_detectors(detecting_regions)
.await;
}
}
16 changes: 14 additions & 2 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;

use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path};
use crate::ddl::utils::{
add_peer_context_if_needed, convert_region_routes_to_detecting_regions, handle_retry_error,
region_storage_path,
};
use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
use crate::error::{self, Result};
use crate::key::table_name::TableNameKey;
Expand Down Expand Up @@ -265,16 +268,25 @@ impl CreateTableProcedure {
/// - Failed to create table metadata.
async fn on_create_metadata(&mut self) -> Result<Status> {
let table_id = self.table_id();
let cluster_id = self.creator.data.cluster_id;
let manager = &self.context.table_metadata_manager;

let raw_table_info = self.table_info().clone();
// Safety: the region_wal_options must be allocated.
let region_wal_options = self.region_wal_options()?.clone();
// Safety: the table_route must be allocated.
let table_route = TableRouteValue::Physical(self.table_route()?.clone());
let physical_table_route = self.table_route()?.clone();
let detecting_regions = convert_region_routes_to_detecting_regions(
cluster_id,
&physical_table_route.region_routes,
);
let table_route = TableRouteValue::Physical(physical_table_route);
manager
.create_table_metadata(raw_table_info, table_route, region_wal_options)
.await?;
self.context
.register_failure_detectors(detecting_regions)
.await;
info!("Created table metadata for table {table_id}");

self.creator.opening_regions.clear();
Expand Down
4 changes: 4 additions & 0 deletions src/common/meta/src/ddl/drop_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::ddl::DdlContext;
use crate::error::Result;
use crate::key::table_name::TableNameValue;
use crate::lock_key::{CatalogLock, SchemaLock};
use crate::ClusterId;

pub struct DropDatabaseProcedure {
/// The context of procedure runtime.
Expand All @@ -53,6 +54,7 @@ pub(crate) enum DropTableTarget {

/// Context of [DropDatabaseProcedure] execution.
pub(crate) struct DropDatabaseContext {
cluster_id: ClusterId,
catalog: String,
schema: String,
drop_if_exists: bool,
Expand Down Expand Up @@ -85,6 +87,7 @@ impl DropDatabaseProcedure {
Self {
runtime_context: context,
context: DropDatabaseContext {
cluster_id: 0,
catalog,
schema,
drop_if_exists,
Expand All @@ -105,6 +108,7 @@ impl DropDatabaseProcedure {
Ok(Self {
runtime_context,
context: DropDatabaseContext {
cluster_id: 0,
catalog,
schema,
drop_if_exists,
Expand Down
3 changes: 3 additions & 0 deletions src/common/meta/src/ddl/drop_database/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ mod tests {
// It always starts from Logical
let mut state = DropDatabaseCursor::new(DropTableTarget::Logical);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
Expand Down Expand Up @@ -256,6 +257,7 @@ mod tests {
// It always starts from Logical
let mut state = DropDatabaseCursor::new(DropTableTarget::Logical);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
Expand Down Expand Up @@ -284,6 +286,7 @@ mod tests {
let ddl_context = new_ddl_context(node_manager);
let mut state = DropDatabaseCursor::new(DropTableTarget::Physical);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
Expand Down
11 changes: 9 additions & 2 deletions src/common/meta/src/ddl/drop_database/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ impl State for DropDatabaseExecutor {
async fn next(
&mut self,
ddl_ctx: &DdlContext,
_ctx: &mut DropDatabaseContext,
ctx: &mut DropDatabaseContext,
) -> Result<(Box<dyn State>, Status)> {
self.register_dropping_regions(ddl_ctx)?;
let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true);
let executor =
DropTableExecutor::new(ctx.cluster_id, self.table_name.clone(), self.table_id, true);
// Deletes metadata for table permanently.
let table_route_value = TableRouteValue::new(
self.table_id,
Expand Down Expand Up @@ -186,6 +187,7 @@ mod tests {
DropTableTarget::Physical,
);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
Expand All @@ -198,6 +200,7 @@ mod tests {
}
// Execute again
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
Expand Down Expand Up @@ -238,6 +241,7 @@ mod tests {
DropTableTarget::Logical,
);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
Expand All @@ -250,6 +254,7 @@ mod tests {
}
// Execute again
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
Expand Down Expand Up @@ -339,6 +344,7 @@ mod tests {
DropTableTarget::Physical,
);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
Expand Down Expand Up @@ -368,6 +374,7 @@ mod tests {
DropTableTarget::Physical,
);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
Expand Down
2 changes: 2 additions & 0 deletions src/common/meta/src/ddl/drop_database/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ mod tests {
.unwrap();
let mut state = DropDatabaseRemoveMetadata;
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: true,
Expand All @@ -144,6 +145,7 @@ mod tests {
// Schema not exists
let mut state = DropDatabaseRemoveMetadata;
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: true,
Expand Down
3 changes: 3 additions & 0 deletions src/common/meta/src/ddl/drop_database/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ mod tests {
let ddl_context = new_ddl_context(node_manager);
let mut step = DropDatabaseStart;
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: false,
Expand All @@ -104,6 +105,7 @@ mod tests {
let ddl_context = new_ddl_context(node_manager);
let mut state = DropDatabaseStart;
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: true,
Expand All @@ -126,6 +128,7 @@ mod tests {
.unwrap();
let mut state = DropDatabaseStart;
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: false,
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ impl DropTableData {

fn build_executor(&self) -> DropTableExecutor {
DropTableExecutor::new(
self.cluster_id,
self.task.table_name(),
self.task.table_id,
self.task.drop_if_exists,
Expand Down
Loading

0 comments on commit 6a634f8

Please sign in to comment.