Skip to content

Commit

Permalink
refactor: add 'fallback_to_local_compaction' region option
Browse files Browse the repository at this point in the history
  • Loading branch information
zyy17 committed Aug 16, 2024
1 parent c8de8b8 commit 3a0551f
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 3 deletions.
17 changes: 16 additions & 1 deletion src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use snafu::{Location, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use table::predicate::Predicate;
Expand All @@ -47,6 +47,7 @@ use crate::compaction::compactor::{CompactionRegion, DefaultCompactor};
use crate::compaction::picker::{new_picker, CompactionTask};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::Error::RemoteCompaction;
use crate::error::{
CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
TimeRangePredicateOverflowSnafu,
Expand Down Expand Up @@ -314,6 +315,20 @@ impl CompactionScheduler {
return Ok(());
}
Err(e) => {
if !current_version
.options
.compaction
.fallback_to_local_compaction()
{
error!(e; "Failed to schedule remote compaction job for region {}", region_id);
return Err(RemoteCompaction {
region_id,
job_id: None,
reason: e.reason,
location: Location::default(),
});
}

error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

// Return the waiters back to the caller for local compaction.
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,14 +757,14 @@ pub enum Error {
},

#[snafu(display(
"Failed to remotely compact region {} by job {} due to {}",
"Failed to remotely compact region {} by job {:?} due to {}",
region_id,
job_id,
reason
))]
RemoteCompaction {
region_id: RegionId,
job_id: JobId,
job_id: Option<JobId>,
reason: String,
#[snafu(implicit)]
location: Location,
Expand Down
14 changes: 14 additions & 0 deletions src/mito2/src/region/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ impl CompactionOptions {
CompactionOptions::Twcs(opts) => opts.remote_compaction,
}
}

pub(crate) fn fallback_to_local_compaction(&self) -> bool {
match self {
CompactionOptions::Twcs(opts) => opts.fallback_to_local_compaction,
}
}
}

impl Default for CompactionOptions {
Expand Down Expand Up @@ -201,6 +207,9 @@ pub struct TwcsOptions {
/// Whether to use remote compaction.
#[serde_as(as = "DisplayFromStr")]
pub remote_compaction: bool,
/// Whether to fall back to local compaction if remote compaction fails.
#[serde_as(as = "DisplayFromStr")]
pub fallback_to_local_compaction: bool,
}

with_prefix!(prefix_twcs "compaction.twcs.");
Expand Down Expand Up @@ -228,6 +237,7 @@ impl Default for TwcsOptions {
max_inactive_window_files: 1,
time_window: None,
remote_compaction: false,
fallback_to_local_compaction: true,
}
}
}
Expand Down Expand Up @@ -590,6 +600,7 @@ mod tests {
("compaction.twcs.time_window", "2h"),
("compaction.type", "twcs"),
("compaction.twcs.remote_compaction", "false"),
("compaction.twcs.fallback_to_local_compaction", "true"),
("storage", "S3"),
("append_mode", "false"),
("index.inverted_index.ignore_column_ids", "1,2,3"),
Expand All @@ -614,6 +625,7 @@ mod tests {
max_inactive_window_files: 3,
time_window: Some(Duration::from_secs(3600 * 2)),
remote_compaction: false,
fallback_to_local_compaction: true,
}),
storage: Some("S3".to_string()),
append_mode: false,
Expand Down Expand Up @@ -645,6 +657,7 @@ mod tests {
max_inactive_window_files: usize::MAX,
time_window: Some(Duration::from_secs(3600 * 2)),
remote_compaction: false,
fallback_to_local_compaction: true,
}),
storage: Some("S3".to_string()),
append_mode: false,
Expand Down Expand Up @@ -710,6 +723,7 @@ mod tests {
max_inactive_window_files: 7,
time_window: Some(Duration::from_secs(3600 * 2)),
remote_compaction: false,
fallback_to_local_compaction: true,
}),
storage: Some("S3".to_string()),
append_mode: false,
Expand Down

0 comments on commit 3a0551f

Please sign in to comment.