Skip to content

Commit

Permalink
refactor: expose DatanodeBuilder::build_object_store_manager() and …
Browse files Browse the repository at this point in the history
…`MitoConfig::sanitize()` (#4212)

* refactor: expose DatanodeBuilder::build_object_store_manager()

* refactor: expose MitoConfig::sanitize()
  • Loading branch information
zyy17 committed Jul 1, 2024
1 parent 6a634f8 commit fe2c5c3
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 18 deletions.
33 changes: 16 additions & 17 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use store_api::storage::RegionId;
use tokio::fs;
use tokio::sync::Notify;

use crate::config::{DatanodeOptions, RegionEngineConfig};
use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
self, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu,
MissingNodeIdSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu,
Expand Down Expand Up @@ -269,6 +269,20 @@ impl DatanodeBuilder {
})
}

/// Builds [ObjectStoreManager] from [StorageConfig].
pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
let default_name = cfg.store.name();
let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
for store in &cfg.providers {
object_store_manager.add(
store.name(),
store::new_object_store(store.clone(), &cfg.data_home).await?,
);
}
Ok(Arc::new(object_store_manager))
}

#[cfg(test)]
/// Open all regions belong to this datanode.
async fn initialize_region_server(
Expand Down Expand Up @@ -328,7 +342,7 @@ impl DatanodeBuilder {
table_provider_factory,
);

let object_store_manager = Self::build_object_store_manager(opts).await?;
let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
let engines = Self::build_store_engines(opts, object_store_manager).await?;
for engine in engines {
region_server.register_engine(engine);
Expand Down Expand Up @@ -431,21 +445,6 @@ impl DatanodeBuilder {
.context(OpenLogStoreSnafu)
.map(Arc::new)
}

/// Builds [ObjectStoreManager]
async fn build_object_store_manager(opts: &DatanodeOptions) -> Result<ObjectStoreManagerRef> {
let object_store =
store::new_object_store(opts.storage.store.clone(), &opts.storage.data_home).await?;
let default_name = opts.storage.store.name();
let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
for store in &opts.storage.providers {
object_store_manager.add(
store.name(),
store::new_object_store(store.clone(), &opts.storage.data_home).await?,
);
}
Ok(Arc::new(object_store_manager))
}
}

/// Open all regions belong to this datanode.
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl MitoConfig {
/// Sanitize incorrect configurations.
///
/// Returns an error if there is a configuration that unable to sanitize.
pub(crate) fn sanitize(&mut self, data_home: &str) -> Result<()> {
pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
// Use default value if `num_workers` is 0.
if self.num_workers == 0 {
self.num_workers = divide_num_cpus(2);
Expand Down

0 comments on commit fe2c5c3

Please sign in to comment.