Skip to content

Commit

Permalink
feat: lookup peer addr
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jun 25, 2024
1 parent da52771 commit a819107
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 10 deletions.
16 changes: 15 additions & 1 deletion src/common/meta/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ use crate::kv_backend::KvBackendRef;
use crate::node_manager::{
Datanode, DatanodeRef, Flownode, FlownodeRef, NodeManager, NodeManagerRef,
};
use crate::peer::{Peer, StandalonePeerLookupService};
use crate::peer::{Peer, PeerLookupService, StandalonePeerLookupService};
use crate::region_keeper::MemoryRegionKeeper;
use crate::sequence::SequenceBuilder;
use crate::wal_options_allocator::WalOptionsAllocator;
use crate::{ClusterId, DatanodeId, FlownodeId};

#[async_trait::async_trait]
pub trait MockDatanodeHandler: Sync + Send + Clone {
Expand Down Expand Up @@ -183,3 +184,16 @@ pub fn new_ddl_context_with_kv_backend(
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
}
}

pub struct NoopPeerLookupService;

#[async_trait::async_trait]
impl PeerLookupService for NoopPeerLookupService {
async fn datanode(&self, _cluster_id: ClusterId, id: DatanodeId) -> Result<Option<Peer>> {
Ok(Some(Peer::empty(id)))
}

async fn flownode(&self, _cluster_id: ClusterId, id: FlownodeId) -> Result<Option<Peer>> {
Ok(Some(Peer::empty(id)))
}
}
9 changes: 9 additions & 0 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ pub enum Error {
peer_id: u64,
},

#[snafu(display("Failed to lookup peer: {}", peer_id))]
LookupPeer {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
peer_id: u64,
},

#[snafu(display("Another migration procedure is running for region: {}", region_id))]
MigrationRunning {
#[snafu(implicit)]
Expand Down Expand Up @@ -980,6 +988,7 @@ impl ErrorExt for Error {
}

Error::Other { source, .. } => source.status_code(),
Error::LookupPeer { source, .. } => source.status_code(),
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ impl MetasrvBuilder {
server_addr: options.server_addr.clone(),
},
));
let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone()));
let ddl_manager = Arc::new(
DdlManager::try_new(
DdlContext {
Expand All @@ -292,9 +293,7 @@ impl MetasrvBuilder {
table_metadata_allocator: table_metadata_allocator.clone(),
flow_metadata_manager: flow_metadata_manager.clone(),
flow_metadata_allocator: flow_metadata_allocator.clone(),
peer_lookup_service: Arc::new(MetaPeerLookupService::new(
meta_peer_client.clone(),
)),
peer_lookup_service: peer_lookup_service.clone(),
},
procedure_manager.clone(),
true,
Expand Down Expand Up @@ -326,6 +325,7 @@ impl MetasrvBuilder {
selector.clone(),
region_migration_manager.clone(),
leader_cached_kv_backend.clone() as _,
peer_lookup_service,
);
let region_supervisor_ticker = region_supervisor.ticker();
(
Expand Down
26 changes: 20 additions & 6 deletions src/meta-srv/src/region/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ use std::time::Duration;

use common_meta::key::MAINTENANCE_KEY;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::peer::PeerLookupServiceRef;
use common_meta::{ClusterId, DatanodeId};
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use common_time::util::current_time_millis;
use error::Error::{MigrationRunning, TableRouteNotFound};
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::{interval, MissedTickBehavior};
Expand Down Expand Up @@ -159,6 +159,8 @@ pub struct RegionSupervisor {
region_migration_manager: RegionMigrationManagerRef,
// TODO(weny): find a better way
kv_backend: KvBackendRef,
/// Peer lookup service
peer_lookup: PeerLookupServiceRef,
}

/// [`HeartbeatAcceptor`] forwards heartbeats to [`RegionSupervisor`].
Expand Down Expand Up @@ -191,6 +193,7 @@ impl RegionSupervisor {
selector: SelectorRef,
region_migration_manager: RegionMigrationManagerRef,
kv_backend: KvBackendRef,
peer_lookup: PeerLookupServiceRef,
) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(1024);
Self {
Expand All @@ -202,6 +205,7 @@ impl RegionSupervisor {
selector,
region_migration_manager,
kv_backend,
peer_lookup,
}
}

Expand Down Expand Up @@ -311,13 +315,20 @@ impl RegionSupervisor {
)
.await?;
let to_peer = peers.remove(0);
let from_peer = self
.peer_lookup
.datanode(cluster_id, datanode_id)
.await
.context(error::LookupPeerSnafu {
peer_id: datanode_id,
})?
.context(error::PeerUnavailableSnafu {
peer_id: datanode_id,
})?;
let task = RegionMigrationProcedureTask {
cluster_id,
region_id,
from_peer: Peer {
id: datanode_id,
addr: String::new(),
},
from_peer,
to_peer,
replay_timeout: Duration::from_secs(60),
};
Expand Down Expand Up @@ -374,6 +385,7 @@ pub(crate) mod tests {
use std::time::Duration;

use common_meta::peer::Peer;
use common_meta::test_util::NoopPeerLookupService;
use common_time::util::current_time_millis;
use rand::Rng;
use store_api::storage::RegionId;
Expand All @@ -397,6 +409,7 @@ pub(crate) mod tests {
context_factory,
));
let kv_backend = env.kv_backend();
let peer_lookup = Arc::new(NoopPeerLookupService);

RegionSupervisor::new(
Default::default(),
Expand All @@ -405,6 +418,7 @@ pub(crate) mod tests {
selector,
region_migration_manager,
kv_backend,
peer_lookup,
)
}

Expand Down

0 comments on commit a819107

Please sign in to comment.