From 5d396bd6d7e17b26bc127512d6d879098a6d49d6 Mon Sep 17 00:00:00 2001 From: Jeremyhi Date: Mon, 1 Jul 2024 14:42:31 +0800 Subject: [PATCH] feat: forgot collect flownode clusterinfo handler (#4236) * feat: forgot collect flownode clusterinfo handler * fix: unit test * fix: filter stale heartbeat --- src/meta-srv/src/handler.rs | 47 +++- .../src/handler/collect_stats_handler.rs | 212 ++++++++++++++-- .../src/handler/extract_stat_handler.rs | 54 ++++ .../src/handler/persist_stats_handler.rs | 233 ------------------ src/meta-srv/src/metasrv/builder.rs | 10 +- 5 files changed, 293 insertions(+), 263 deletions(-) create mode 100644 src/meta-srv/src/handler/extract_stat_handler.rs delete mode 100644 src/meta-srv/src/handler/persist_stats_handler.rs diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 93d99b682500..07a32cc2947f 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -43,13 +43,13 @@ use crate::service::mailbox::{ pub mod check_leader_handler; pub mod collect_cluster_info_handler; pub mod collect_stats_handler; +pub mod extract_stat_handler; pub mod failure_handler; pub mod filter_inactive_region_stats; pub mod keep_lease_handler; pub mod mailbox_handler; pub mod node_stat; pub mod on_leader_start_handler; -pub mod persist_stats_handler; pub mod publish_heartbeat_handler; pub mod region_lease_handler; pub mod response_header_handler; @@ -439,10 +439,16 @@ mod tests { use tokio::sync::mpsc; use crate::handler::check_leader_handler::CheckLeaderHandler; + use crate::handler::collect_cluster_info_handler::{ + CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler, + CollectFrontendClusterInfoHandler, + }; use crate::handler::collect_stats_handler::CollectStatsHandler; + use crate::handler::extract_stat_handler::ExtractStatHandler; + use crate::handler::filter_inactive_region_stats::FilterInactiveRegionStatsHandler; + use crate::handler::keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler}; use crate::handler::mailbox_handler::MailboxHandler; use crate::handler::on_leader_start_handler::OnLeaderStartHandler; - use crate::handler::persist_stats_handler::PersistStatsHandler; use crate::handler::response_header_handler::ResponseHeaderHandler; use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher}; use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef}; @@ -516,20 +522,39 @@ mod tests { async fn test_handler_name() { let group = HeartbeatHandlerGroup::default(); group.add_handler(ResponseHeaderHandler).await; + group.add_handler(DatanodeKeepLeaseHandler).await; + group.add_handler(FlownodeKeepLeaseHandler).await; group.add_handler(CheckLeaderHandler).await; group.add_handler(OnLeaderStartHandler).await; - group.add_handler(CollectStatsHandler).await; + group.add_handler(ExtractStatHandler).await; + group.add_handler(CollectDatanodeClusterInfoHandler).await; + group.add_handler(CollectFrontendClusterInfoHandler).await; + group.add_handler(CollectFlownodeClusterInfoHandler).await; group.add_handler(MailboxHandler).await; - group.add_handler(PersistStatsHandler::default()).await; + group.add_handler(FilterInactiveRegionStatsHandler).await; + group.add_handler(CollectStatsHandler::default()).await; let handlers = group.handlers.read().await; - assert_eq!(6, handlers.len()); - assert_eq!("ResponseHeaderHandler", handlers[0].handler.name()); - assert_eq!("CheckLeaderHandler", handlers[1].handler.name()); - assert_eq!("OnLeaderStartHandler", handlers[2].handler.name()); - assert_eq!("CollectStatsHandler", handlers[3].handler.name()); - assert_eq!("MailboxHandler", handlers[4].handler.name()); - assert_eq!("PersistStatsHandler", handlers[5].handler.name()); + assert_eq!(12, handlers.len()); + + let names = [ + "ResponseHeaderHandler", + "DatanodeKeepLeaseHandler", + "FlownodeKeepLeaseHandler", + "CheckLeaderHandler", + "OnLeaderStartHandler", + "ExtractStatHandler", + "CollectDatanodeClusterInfoHandler", + "CollectFrontendClusterInfoHandler", + "CollectFlownodeClusterInfoHandler", + "MailboxHandler", + "FilterInactiveRegionStatsHandler", + "CollectStatsHandler", + ]; + + for (handler, name) in handlers.iter().zip(names.into_iter()) { + assert_eq!(handler.name, name); + } } } diff --git a/src/meta-srv/src/handler/collect_stats_handler.rs b/src/meta-srv/src/handler/collect_stats_handler.rs index d68f1e3b38cf..14df81f028be 100644 --- a/src/meta-srv/src/handler/collect_stats_handler.rs +++ b/src/meta-srv/src/handler/collect_stats_handler.rs @@ -12,15 +12,64 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; + use api::v1::meta::{HeartbeatRequest, Role}; +use common_meta::rpc::store::PutRequest; use common_telemetry::warn; +use dashmap::DashMap; +use snafu::ResultExt; -use super::node_stat::Stat; -use crate::error::Result; +use crate::error::{self, Result}; +use crate::handler::node_stat::Stat; use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; +use crate::key::{DatanodeStatKey, DatanodeStatValue}; use crate::metasrv::Context; -pub struct CollectStatsHandler; +const MAX_CACHED_STATS_PER_KEY: usize = 10; + +#[derive(Default)] +struct EpochStats { + stats: Vec, + epoch: Option, +} + +impl EpochStats { + #[inline] + fn drain_all(&mut self) -> Vec { + self.stats.drain(..).collect() + } + + #[inline] + fn clear_stats(&mut self) { + self.stats.clear(); + } + + #[inline] + fn push_stat(&mut self, stat: Stat) { + self.stats.push(stat); + } + + #[inline] + fn len(&self) -> usize { + self.stats.len() + } + + #[inline] + fn epoch(&self) -> Option { + self.epoch + } + + #[inline] + fn set_epoch(&mut self, epoch: u64) { + self.epoch = Some(epoch); + } +} + +#[derive(Default)] +pub struct CollectStatsHandler { + stats_cache: DashMap, +} #[async_trait::async_trait] impl HeartbeatHandler for CollectStatsHandler { @@ -30,25 +79,158 @@ impl HeartbeatHandler for CollectStatsHandler { async fn handle( &self, - req: &HeartbeatRequest, - _ctx: &mut Context, + _req: &HeartbeatRequest, + ctx: &mut Context, acc: &mut HeartbeatAccumulator, ) -> Result { - if req.mailbox_message.is_some() { - // If the heartbeat is a mailbox message, it may have no other valid information, - // so we don't need to collect stats. + let Some(current_stat) = acc.stat.take() else { return Ok(HandleControl::Continue); - } + }; - match Stat::try_from(req.clone()) { - Ok(stat) => { - let _ = acc.stat.insert(stat); - } - Err(err) => { - warn!(err; "Incomplete heartbeat data: {:?}", req); + let key = current_stat.stat_key(); + let mut entry = self.stats_cache.entry(key).or_default(); + + let key: Vec = key.into(); + let epoch_stats = entry.value_mut(); + + let refresh = if let Some(epoch) = epoch_stats.epoch() { + match current_stat.node_epoch.cmp(&epoch) { + Ordering::Greater => { + // This node may have been redeployed. + epoch_stats.clear_stats(); + epoch_stats.set_epoch(current_stat.node_epoch); + epoch_stats.push_stat(current_stat); + true + } + Ordering::Equal => { + epoch_stats.push_stat(current_stat); + false + } + Ordering::Less => { + warn!("Ignore stale heartbeat: {:?}", current_stat); + false + } } + } else { + epoch_stats.set_epoch(current_stat.node_epoch); + epoch_stats.push_stat(current_stat); + // If the epoch is empty, it indicates that the current node sending the heartbeat + // for the first time to the current meta leader, so it is necessary to save + // the data to the KV store as soon as possible. + true }; + if !refresh && epoch_stats.len() < MAX_CACHED_STATS_PER_KEY { + return Ok(HandleControl::Continue); + } + + let value: Vec = DatanodeStatValue { + stats: epoch_stats.drain_all(), + } + .try_into()?; + let put = PutRequest { + key, + value, + ..Default::default() + }; + + let _ = ctx + .in_memory + .put(put) + .await + .context(error::KvBackendSnafu)?; + Ok(HandleControl::Continue) } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_meta::key::TableMetadataManager; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::sequence::SequenceBuilder; + + use super::*; + use crate::cluster::MetaPeerClientBuilder; + use crate::handler::{HeartbeatMailbox, Pushers}; + use crate::key::DatanodeStatKey; + use crate::service::store::cached_kv::LeaderCachedKvBackend; + + #[tokio::test] + async fn test_handle_datanode_stats() { + let in_memory = Arc::new(MemoryKvBackend::new()); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader( + kv_backend.clone(), + )); + let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build(); + let mailbox = HeartbeatMailbox::create(Pushers::default(), seq); + let meta_peer_client = MetaPeerClientBuilder::default() + .election(None) + .in_memory(in_memory.clone()) + .build() + .map(Arc::new) + // Safety: all required fields set at initialization + .unwrap(); + let ctx = Context { + server_addr: "127.0.0.1:0000".to_string(), + in_memory, + kv_backend: kv_backend.clone(), + leader_cached_kv_backend, + meta_peer_client, + mailbox, + election: None, + is_infancy: false, + table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), + }; + + let handler = CollectStatsHandler::default(); + handle_request_many_times(ctx.clone(), &handler, 1).await; + + let key = DatanodeStatKey { + cluster_id: 3, + node_id: 101, + }; + let key: Vec = key.into(); + let res = ctx.in_memory.get(&key).await.unwrap(); + let kv = res.unwrap(); + let key: DatanodeStatKey = kv.key.clone().try_into().unwrap(); + assert_eq!(3, key.cluster_id); + assert_eq!(101, key.node_id); + let val: DatanodeStatValue = kv.value.try_into().unwrap(); + // first new stat must be set in kv store immediately + assert_eq!(1, val.stats.len()); + assert_eq!(1, val.stats[0].region_num); + + handle_request_many_times(ctx.clone(), &handler, 10).await; + + let key: Vec = key.into(); + let res = ctx.in_memory.get(&key).await.unwrap(); + let kv = res.unwrap(); + let val: DatanodeStatValue = kv.value.try_into().unwrap(); + // refresh every 10 stats + assert_eq!(10, val.stats.len()); + } + + async fn handle_request_many_times( + mut ctx: Context, + handler: &CollectStatsHandler, + loop_times: i32, + ) { + let req = HeartbeatRequest::default(); + for i in 1..=loop_times { + let mut acc = HeartbeatAccumulator { + stat: Some(Stat { + cluster_id: 3, + id: 101, + region_num: i as _, + ..Default::default() + }), + ..Default::default() + }; + handler.handle(&req, &mut ctx, &mut acc).await.unwrap(); + } + } +} diff --git a/src/meta-srv/src/handler/extract_stat_handler.rs b/src/meta-srv/src/handler/extract_stat_handler.rs new file mode 100644 index 000000000000..8124ec833677 --- /dev/null +++ b/src/meta-srv/src/handler/extract_stat_handler.rs @@ -0,0 +1,54 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::{HeartbeatRequest, Role}; +use common_telemetry::warn; + +use super::node_stat::Stat; +use crate::error::Result; +use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; +use crate::metasrv::Context; + +pub struct ExtractStatHandler; + +#[async_trait::async_trait] +impl HeartbeatHandler for ExtractStatHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Datanode + } + + async fn handle( + &self, + req: &HeartbeatRequest, + _ctx: &mut Context, + acc: &mut HeartbeatAccumulator, + ) -> Result { + if req.mailbox_message.is_some() { + // If the heartbeat is a mailbox message, it may have no other valid information, + // so we don't need to collect stats. + return Ok(HandleControl::Continue); + } + + match Stat::try_from(req.clone()) { + Ok(stat) => { + let _ = acc.stat.insert(stat); + } + Err(err) => { + warn!(err; "Incomplete heartbeat data: {:?}", req); + } + }; + + Ok(HandleControl::Continue) + } +} diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs deleted file mode 100644 index faa16c67419f..000000000000 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ /dev/null @@ -1,233 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::cmp::Ordering; - -use api::v1::meta::{HeartbeatRequest, Role}; -use common_meta::rpc::store::PutRequest; -use common_telemetry::warn; -use dashmap::DashMap; -use snafu::ResultExt; - -use crate::error::{self, Result}; -use crate::handler::node_stat::Stat; -use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; -use crate::key::{DatanodeStatKey, DatanodeStatValue}; -use crate::metasrv::Context; - -const MAX_CACHED_STATS_PER_KEY: usize = 10; - -#[derive(Default)] -struct EpochStats { - stats: Vec, - epoch: Option, -} - -impl EpochStats { - #[inline] - fn drain_all(&mut self) -> Vec { - self.stats.drain(..).collect() - } - - #[inline] - fn clear(&mut self) { - self.stats.clear(); - } - - #[inline] - fn push(&mut self, stat: Stat) { - self.stats.push(stat); - } - - #[inline] - fn len(&self) -> usize { - self.stats.len() - } - - #[inline] - fn epoch(&self) -> Option { - self.epoch - } - - #[inline] - fn set_epoch(&mut self, epoch: u64) { - self.epoch = Some(epoch); - } -} - -#[derive(Default)] -pub struct PersistStatsHandler { - stats_cache: DashMap, -} - -#[async_trait::async_trait] -impl HeartbeatHandler for PersistStatsHandler { - fn is_acceptable(&self, role: Role) -> bool { - role == Role::Datanode - } - - async fn handle( - &self, - _req: &HeartbeatRequest, - ctx: &mut Context, - acc: &mut HeartbeatAccumulator, - ) -> Result { - let Some(current_stat) = acc.stat.take() else { - return Ok(HandleControl::Continue); - }; - - let key = current_stat.stat_key(); - let mut entry = self.stats_cache.entry(key).or_default(); - - let key: Vec = key.into(); - let epoch_stats = entry.value_mut(); - - let refresh = if let Some(epoch) = epoch_stats.epoch() { - match current_stat.node_epoch.cmp(&epoch) { - Ordering::Greater => { - // This node may have been redeployed. - epoch_stats.set_epoch(current_stat.node_epoch); - epoch_stats.clear(); - true - } - Ordering::Less => { - warn!("Ignore stale heartbeat: {:?}", current_stat); - false - } - Ordering::Equal => false, - } - } else { - epoch_stats.set_epoch(current_stat.node_epoch); - // If the epoch is empty, it indicates that the current node sending the heartbeat - // for the first time to the current meta leader, so it is necessary to persist - // the data to the KV store as soon as possible. - true - }; - - epoch_stats.push(current_stat); - - if !refresh && epoch_stats.len() < MAX_CACHED_STATS_PER_KEY { - return Ok(HandleControl::Continue); - } - - let value: Vec = DatanodeStatValue { - stats: epoch_stats.drain_all(), - } - .try_into()?; - let put = PutRequest { - key, - value, - ..Default::default() - }; - - let _ = ctx - .in_memory - .put(put) - .await - .context(error::KvBackendSnafu)?; - - Ok(HandleControl::Continue) - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use common_meta::key::TableMetadataManager; - use common_meta::kv_backend::memory::MemoryKvBackend; - use common_meta::sequence::SequenceBuilder; - - use super::*; - use crate::cluster::MetaPeerClientBuilder; - use crate::handler::{HeartbeatMailbox, Pushers}; - use crate::key::DatanodeStatKey; - use crate::service::store::cached_kv::LeaderCachedKvBackend; - - #[tokio::test] - async fn test_handle_datanode_stats() { - let in_memory = Arc::new(MemoryKvBackend::new()); - let kv_backend = Arc::new(MemoryKvBackend::new()); - let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader( - kv_backend.clone(), - )); - let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build(); - let mailbox = HeartbeatMailbox::create(Pushers::default(), seq); - let meta_peer_client = MetaPeerClientBuilder::default() - .election(None) - .in_memory(in_memory.clone()) - .build() - .map(Arc::new) - // Safety: all required fields set at initialization - .unwrap(); - let ctx = Context { - server_addr: "127.0.0.1:0000".to_string(), - in_memory, - kv_backend: kv_backend.clone(), - leader_cached_kv_backend, - meta_peer_client, - mailbox, - election: None, - is_infancy: false, - table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), - }; - - let handler = PersistStatsHandler::default(); - handle_request_many_times(ctx.clone(), &handler, 1).await; - - let key = DatanodeStatKey { - cluster_id: 3, - node_id: 101, - }; - let key: Vec = key.into(); - let res = ctx.in_memory.get(&key).await.unwrap(); - let kv = res.unwrap(); - let key: DatanodeStatKey = kv.key.clone().try_into().unwrap(); - assert_eq!(3, key.cluster_id); - assert_eq!(101, key.node_id); - let val: DatanodeStatValue = kv.value.try_into().unwrap(); - // first new stat must be set in kv store immediately - assert_eq!(1, val.stats.len()); - assert_eq!(1, val.stats[0].region_num); - - handle_request_many_times(ctx.clone(), &handler, 10).await; - - let key: Vec = key.into(); - let res = ctx.in_memory.get(&key).await.unwrap(); - let kv = res.unwrap(); - let val: DatanodeStatValue = kv.value.try_into().unwrap(); - // refresh every 10 stats - assert_eq!(10, val.stats.len()); - } - - async fn handle_request_many_times( - mut ctx: Context, - handler: &PersistStatsHandler, - loop_times: i32, - ) { - let req = HeartbeatRequest::default(); - for i in 1..=loop_times { - let mut acc = HeartbeatAccumulator { - stat: Some(Stat { - cluster_id: 3, - id: 101, - region_num: i as _, - ..Default::default() - }), - ..Default::default() - }; - handler.handle(&req, &mut ctx, &mut acc).await.unwrap(); - } - } -} diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 3c039fc136b9..c8d351b3e63a 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -48,15 +48,16 @@ use crate::flow_meta_alloc::FlowPeerAllocator; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::handler::check_leader_handler::CheckLeaderHandler; use crate::handler::collect_cluster_info_handler::{ - CollectDatanodeClusterInfoHandler, CollectFrontendClusterInfoHandler, + CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler, + CollectFrontendClusterInfoHandler, }; use crate::handler::collect_stats_handler::CollectStatsHandler; +use crate::handler::extract_stat_handler::ExtractStatHandler; use crate::handler::failure_handler::RegionFailureHandler; use crate::handler::filter_inactive_region_stats::FilterInactiveRegionStatsHandler; use crate::handler::keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler}; use crate::handler::mailbox_handler::MailboxHandler; use crate::handler::on_leader_start_handler::OnLeaderStartHandler; -use crate::handler::persist_stats_handler::PersistStatsHandler; use crate::handler::publish_heartbeat_handler::PublishHeartbeatHandler; use crate::handler::region_lease_handler::RegionLeaseHandler; use crate::handler::response_header_handler::ResponseHeaderHandler; @@ -383,9 +384,10 @@ impl MetasrvBuilder { group.add_handler(FlownodeKeepLeaseHandler).await; group.add_handler(CheckLeaderHandler).await; group.add_handler(OnLeaderStartHandler).await; - group.add_handler(CollectStatsHandler).await; + group.add_handler(ExtractStatHandler).await; group.add_handler(CollectDatanodeClusterInfoHandler).await; group.add_handler(CollectFrontendClusterInfoHandler).await; + group.add_handler(CollectFlownodeClusterInfoHandler).await; group.add_handler(MailboxHandler).await; group.add_handler(region_lease_handler).await; group.add_handler(FilterInactiveRegionStatsHandler).await; @@ -395,7 +397,7 @@ impl MetasrvBuilder { if let Some(publish_heartbeat_handler) = publish_heartbeat_handler { group.add_handler(publish_heartbeat_handler).await; } - group.add_handler(PersistStatsHandler::default()).await; + group.add_handler(CollectStatsHandler::default()).await; group } };