Skip to content

Commit

Permalink
feat: forgot collect flownode clusterinfo handler (#4236)
Browse files Browse the repository at this point in the history
* feat: forgot collect flownode clusterinfo handler

* fix: unit test

* fix: filter stale heartbeat
  • Loading branch information
fengjiachun committed Jul 1, 2024
1 parent fe2c5c3 commit 5d396bd
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 263 deletions.
47 changes: 36 additions & 11 deletions src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ use crate::service::mailbox::{
pub mod check_leader_handler;
pub mod collect_cluster_info_handler;
pub mod collect_stats_handler;
pub mod extract_stat_handler;
pub mod failure_handler;
pub mod filter_inactive_region_stats;
pub mod keep_lease_handler;
pub mod mailbox_handler;
pub mod node_stat;
pub mod on_leader_start_handler;
pub mod persist_stats_handler;
pub mod publish_heartbeat_handler;
pub mod region_lease_handler;
pub mod response_header_handler;
Expand Down Expand Up @@ -439,10 +439,16 @@ mod tests {
use tokio::sync::mpsc;

use crate::handler::check_leader_handler::CheckLeaderHandler;
use crate::handler::collect_cluster_info_handler::{
CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
CollectFrontendClusterInfoHandler,
};
use crate::handler::collect_stats_handler::CollectStatsHandler;
use crate::handler::extract_stat_handler::ExtractStatHandler;
use crate::handler::filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
use crate::handler::keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
use crate::handler::mailbox_handler::MailboxHandler;
use crate::handler::on_leader_start_handler::OnLeaderStartHandler;
use crate::handler::persist_stats_handler::PersistStatsHandler;
use crate::handler::response_header_handler::ResponseHeaderHandler;
use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher};
use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};
Expand Down Expand Up @@ -516,20 +522,39 @@ mod tests {
async fn test_handler_name() {
let group = HeartbeatHandlerGroup::default();
group.add_handler(ResponseHeaderHandler).await;
group.add_handler(DatanodeKeepLeaseHandler).await;
group.add_handler(FlownodeKeepLeaseHandler).await;
group.add_handler(CheckLeaderHandler).await;
group.add_handler(OnLeaderStartHandler).await;
group.add_handler(CollectStatsHandler).await;
group.add_handler(ExtractStatHandler).await;
group.add_handler(CollectDatanodeClusterInfoHandler).await;
group.add_handler(CollectFrontendClusterInfoHandler).await;
group.add_handler(CollectFlownodeClusterInfoHandler).await;
group.add_handler(MailboxHandler).await;
group.add_handler(PersistStatsHandler::default()).await;
group.add_handler(FilterInactiveRegionStatsHandler).await;
group.add_handler(CollectStatsHandler::default()).await;

let handlers = group.handlers.read().await;

assert_eq!(6, handlers.len());
assert_eq!("ResponseHeaderHandler", handlers[0].handler.name());
assert_eq!("CheckLeaderHandler", handlers[1].handler.name());
assert_eq!("OnLeaderStartHandler", handlers[2].handler.name());
assert_eq!("CollectStatsHandler", handlers[3].handler.name());
assert_eq!("MailboxHandler", handlers[4].handler.name());
assert_eq!("PersistStatsHandler", handlers[5].handler.name());
assert_eq!(12, handlers.len());

let names = [
"ResponseHeaderHandler",
"DatanodeKeepLeaseHandler",
"FlownodeKeepLeaseHandler",
"CheckLeaderHandler",
"OnLeaderStartHandler",
"ExtractStatHandler",
"CollectDatanodeClusterInfoHandler",
"CollectFrontendClusterInfoHandler",
"CollectFlownodeClusterInfoHandler",
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
];

for (handler, name) in handlers.iter().zip(names.into_iter()) {
assert_eq!(handler.name, name);
}
}
}
212 changes: 197 additions & 15 deletions src/meta-srv/src/handler/collect_stats_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,64 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;

use api::v1::meta::{HeartbeatRequest, Role};
use common_meta::rpc::store::PutRequest;
use common_telemetry::warn;
use dashmap::DashMap;
use snafu::ResultExt;

use super::node_stat::Stat;
use crate::error::Result;
use crate::error::{self, Result};
use crate::handler::node_stat::Stat;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::key::{DatanodeStatKey, DatanodeStatValue};
use crate::metasrv::Context;

pub struct CollectStatsHandler;
const MAX_CACHED_STATS_PER_KEY: usize = 10;

#[derive(Default)]
struct EpochStats {
stats: Vec<Stat>,
epoch: Option<u64>,
}

impl EpochStats {
#[inline]
fn drain_all(&mut self) -> Vec<Stat> {
self.stats.drain(..).collect()
}

#[inline]
fn clear_stats(&mut self) {
self.stats.clear();
}

#[inline]
fn push_stat(&mut self, stat: Stat) {
self.stats.push(stat);
}

#[inline]
fn len(&self) -> usize {
self.stats.len()
}

#[inline]
fn epoch(&self) -> Option<u64> {
self.epoch
}

#[inline]
fn set_epoch(&mut self, epoch: u64) {
self.epoch = Some(epoch);
}
}

#[derive(Default)]
pub struct CollectStatsHandler {
stats_cache: DashMap<DatanodeStatKey, EpochStats>,
}

#[async_trait::async_trait]
impl HeartbeatHandler for CollectStatsHandler {
Expand All @@ -30,25 +79,158 @@ impl HeartbeatHandler for CollectStatsHandler {

async fn handle(
&self,
req: &HeartbeatRequest,
_ctx: &mut Context,
_req: &HeartbeatRequest,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
if req.mailbox_message.is_some() {
// If the heartbeat is a mailbox message, it may have no other valid information,
// so we don't need to collect stats.
let Some(current_stat) = acc.stat.take() else {
return Ok(HandleControl::Continue);
}
};

match Stat::try_from(req.clone()) {
Ok(stat) => {
let _ = acc.stat.insert(stat);
}
Err(err) => {
warn!(err; "Incomplete heartbeat data: {:?}", req);
let key = current_stat.stat_key();
let mut entry = self.stats_cache.entry(key).or_default();

let key: Vec<u8> = key.into();
let epoch_stats = entry.value_mut();

let refresh = if let Some(epoch) = epoch_stats.epoch() {
match current_stat.node_epoch.cmp(&epoch) {
Ordering::Greater => {
// This node may have been redeployed.
epoch_stats.clear_stats();
epoch_stats.set_epoch(current_stat.node_epoch);
epoch_stats.push_stat(current_stat);
true
}
Ordering::Equal => {
epoch_stats.push_stat(current_stat);
false
}
Ordering::Less => {
warn!("Ignore stale heartbeat: {:?}", current_stat);
false
}
}
} else {
epoch_stats.set_epoch(current_stat.node_epoch);
epoch_stats.push_stat(current_stat);
// If the epoch is empty, it indicates that the current node sending the heartbeat
// for the first time to the current meta leader, so it is necessary to save
// the data to the KV store as soon as possible.
true
};

if !refresh && epoch_stats.len() < MAX_CACHED_STATS_PER_KEY {
return Ok(HandleControl::Continue);
}

let value: Vec<u8> = DatanodeStatValue {
stats: epoch_stats.drain_all(),
}
.try_into()?;
let put = PutRequest {
key,
value,
..Default::default()
};

let _ = ctx
.in_memory
.put(put)
.await
.context(error::KvBackendSnafu)?;

Ok(HandleControl::Continue)
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::sequence::SequenceBuilder;

use super::*;
use crate::cluster::MetaPeerClientBuilder;
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::key::DatanodeStatKey;
use crate::service::store::cached_kv::LeaderCachedKvBackend;

#[tokio::test]
async fn test_handle_datanode_stats() {
let in_memory = Arc::new(MemoryKvBackend::new());
let kv_backend = Arc::new(MemoryKvBackend::new());
let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
kv_backend.clone(),
));
let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
let meta_peer_client = MetaPeerClientBuilder::default()
.election(None)
.in_memory(in_memory.clone())
.build()
.map(Arc::new)
// Safety: all required fields set at initialization
.unwrap();
let ctx = Context {
server_addr: "127.0.0.1:0000".to_string(),
in_memory,
kv_backend: kv_backend.clone(),
leader_cached_kv_backend,
meta_peer_client,
mailbox,
election: None,
is_infancy: false,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
};

let handler = CollectStatsHandler::default();
handle_request_many_times(ctx.clone(), &handler, 1).await;

let key = DatanodeStatKey {
cluster_id: 3,
node_id: 101,
};
let key: Vec<u8> = key.into();
let res = ctx.in_memory.get(&key).await.unwrap();
let kv = res.unwrap();
let key: DatanodeStatKey = kv.key.clone().try_into().unwrap();
assert_eq!(3, key.cluster_id);
assert_eq!(101, key.node_id);
let val: DatanodeStatValue = kv.value.try_into().unwrap();
// first new stat must be set in kv store immediately
assert_eq!(1, val.stats.len());
assert_eq!(1, val.stats[0].region_num);

handle_request_many_times(ctx.clone(), &handler, 10).await;

let key: Vec<u8> = key.into();
let res = ctx.in_memory.get(&key).await.unwrap();
let kv = res.unwrap();
let val: DatanodeStatValue = kv.value.try_into().unwrap();
// refresh every 10 stats
assert_eq!(10, val.stats.len());
}

async fn handle_request_many_times(
mut ctx: Context,
handler: &CollectStatsHandler,
loop_times: i32,
) {
let req = HeartbeatRequest::default();
for i in 1..=loop_times {
let mut acc = HeartbeatAccumulator {
stat: Some(Stat {
cluster_id: 3,
id: 101,
region_num: i as _,
..Default::default()
}),
..Default::default()
};
handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
}
}
}
54 changes: 54 additions & 0 deletions src/meta-srv/src/handler/extract_stat_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::meta::{HeartbeatRequest, Role};
use common_telemetry::warn;

use super::node_stat::Stat;
use crate::error::Result;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;

pub struct ExtractStatHandler;

#[async_trait::async_trait]
impl HeartbeatHandler for ExtractStatHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
}

async fn handle(
&self,
req: &HeartbeatRequest,
_ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
if req.mailbox_message.is_some() {
// If the heartbeat is a mailbox message, it may have no other valid information,
// so we don't need to collect stats.
return Ok(HandleControl::Continue);
}

match Stat::try_from(req.clone()) {
Ok(stat) => {
let _ = acc.stat.insert(stat);
}
Err(err) => {
warn!(err; "Incomplete heartbeat data: {:?}", req);
}
};

Ok(HandleControl::Continue)
}
}
Loading

0 comments on commit 5d396bd

Please sign in to comment.