Skip to content

Commit

Permalink
feat: able to handle concurrent region edit requests
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelScofield committed Aug 15, 2024
1 parent 93be81c commit 7f083ed
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 32 deletions.
8 changes: 5 additions & 3 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ mod create_test;
#[cfg(test)]
mod drop_test;
#[cfg(test)]
mod edit_region_test;
#[cfg(test)]
mod filter_deleted_test;
#[cfg(test)]
mod flush_test;
Expand Down Expand Up @@ -88,7 +90,7 @@ use crate::manifest::action::RegionEdit;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanParallism, ScanRegion, Scanner};
use crate::region::RegionUsage;
use crate::request::WorkerRequest;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::wal::entry_distributor::{
build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
};
Expand Down Expand Up @@ -196,11 +198,11 @@ impl MitoEngine {
);

let (tx, rx) = oneshot::channel();
let request = WorkerRequest::EditRegion {
let request = WorkerRequest::EditRegion(RegionEditRequest {
region_id,
edit,
tx,
};
});
self.inner
.workers
.submit_to_worker(region_id, request)
Expand Down
123 changes: 123 additions & 0 deletions src/mito2/src/engine/edit_region_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use object_store::ObjectStore;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;

use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::manifest::action::RegionEdit;
use crate::region::MitoRegionRef;
use crate::sst::file::{FileId, FileMeta};
use crate::test_util::{CreateRequestBuilder, TestEnv};

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_edit_region_concurrently() {
const EDITS_PER_TASK: usize = 10;
let tasks_count = 10;

// A task that creates SST files and edits the region with them.
struct Task {
region: MitoRegionRef,
ssts: Vec<FileMeta>,
}

impl Task {
async fn create_ssts(&mut self, object_store: &ObjectStore) {
for _ in 0..EDITS_PER_TASK {
let file = FileMeta {
region_id: self.region.region_id,
file_id: FileId::random(),
level: 0,
..Default::default()
};
object_store
.write(
&format!("{}/{}.parquet", self.region.region_dir(), file.file_id),
b"x".as_slice(),
)
.await
.unwrap();
self.ssts.push(file);
}
}

async fn edit_region(self, engine: MitoEngine) {
for sst in self.ssts {
let edit = RegionEdit {
files_to_add: vec![sst],
files_to_remove: vec![],
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
};
engine
.edit_region(self.region.region_id, edit)
.await
.unwrap();
}
}
}

let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
engine
.handle_request(
region_id,
RegionRequest::Create(CreateRequestBuilder::new().build()),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();

let mut tasks = Vec::with_capacity(tasks_count);
let object_store = env.get_object_store().unwrap();
for _ in 0..tasks_count {
let mut task = Task {
region: region.clone(),
ssts: Vec::new(),
};
task.create_ssts(&object_store).await;
tasks.push(task);
}

let mut join_set = JoinSet::new();
// This semaphore is used to coordinate the tasks, making them started at roughly the same time.
let s = Arc::new(Semaphore::new(0));
for task in tasks {
join_set.spawn({
let s = s.clone();
let engine = engine.clone();
async move {
let _permit = s.acquire().await.unwrap();
task.edit_region(engine).await;
}
});
}
s.add_permits(tasks_count);
while join_set.join_next().await.is_some() {}

assert_eq!(
region.version().ssts.levels()[0].files.len(),
tasks_count * EDITS_PER_TASK
);
}
8 changes: 8 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Region {} is busy", region_id))]
RegionBusy {
region_id: RegionId,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -973,6 +980,7 @@ impl ErrorExt for Error {
| FulltextFinish { source, .. }
| ApplyFulltextIndex { source, .. } => source.status_code(),
DecodeStats { .. } | StatsNotPresent { .. } => StatusCode::Internal,
RegionBusy { .. } => StatusCode::RegionBusy,
}
}

Expand Down
15 changes: 10 additions & 5 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,11 +494,7 @@ pub(crate) enum WorkerRequest {
Stop,

/// Use [RegionEdit] to edit a region directly.
EditRegion {
region_id: RegionId,
edit: RegionEdit,
tx: Sender<Result<()>>,
},
EditRegion(RegionEditRequest),
}

impl WorkerRequest {
Expand Down Expand Up @@ -762,6 +758,15 @@ pub(crate) struct RegionChangeResult {
pub(crate) result: Result<()>,
}

/// Request to edit a region directly.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
pub(crate) region_id: RegionId,
pub(crate) edit: RegionEdit,
/// The sender to notify the result to the region engine.
pub(crate) tx: Sender<Result<()>>,
}

/// Notifies the regin the result of editing region.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
Expand Down
14 changes: 7 additions & 7 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::Wal;
use crate::worker::handle_manifest::RegionEditQueues;

/// Identifier for a worker.
pub(crate) type WorkerId = u32;
Expand Down Expand Up @@ -441,6 +442,7 @@ impl<S: LogStore> WorkerStarter<S> {
flush_receiver: self.flush_receiver,
stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&id_string]),
region_count: REGION_COUNT.with_label_values(&[&id_string]),
region_edit_queues: RegionEditQueues::default(),
};
let handle = common_runtime::spawn_global(async move {
worker_thread.run().await;
Expand Down Expand Up @@ -629,6 +631,8 @@ struct RegionWorkerLoop<S> {
stalled_count: IntGauge,
/// Gauge of regions in the worker.
region_count: IntGauge,
/// Queues for region edit requests.
region_edit_queues: RegionEditQueues,
}

impl<S: LogStore> RegionWorkerLoop<S> {
Expand Down Expand Up @@ -727,12 +731,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
WorkerRequest::SetReadonlyGracefully { region_id, sender } => {
self.set_readonly_gracefully(region_id, sender).await;
}
WorkerRequest::EditRegion {
region_id,
edit,
tx,
} => {
self.handle_region_edit(region_id, edit, tx).await;
WorkerRequest::EditRegion(request) => {
self.handle_region_edit(request).await;
}
// We receive a stop signal, but we still want to process remaining
// requests. The worker thread will then check the running flag and
Expand Down Expand Up @@ -824,7 +824,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
BackgroundNotify::RegionChange(req) => self.handle_manifest_region_change_result(req),
BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req),
BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
}
}

Expand Down
91 changes: 74 additions & 17 deletions src/mito2/src/worker/handle_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,89 @@
//!
//! It updates the manifest and applies the changes to the region in background.

use std::collections::{HashMap, VecDeque};

use common_telemetry::{info, warn};
use snafu::ensure;
use store_api::storage::RegionId;
use tokio::sync::oneshot::Sender;

use crate::error::{InvalidRequestSnafu, RegionNotFoundSnafu, Result};
use crate::error::{InvalidRequestSnafu, RegionBusySnafu, RegionNotFoundSnafu, Result};
use crate::manifest::action::{
RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
};
use crate::region::{MitoRegionRef, RegionState};
use crate::request::{
BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditResult, TruncateResult,
WorkerRequest,
BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditRequest, RegionEditResult,
TruncateResult, WorkerRequest,
};
use crate::worker::RegionWorkerLoop;

pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;

/// A queue for temporary store region edit requests, if the region is in the "Editing" state.
/// When the current region edit request is completed, the next (if there exists) request in the
/// queue will be processed.
/// Everything is done in the region worker loop.
pub(crate) struct RegionEditQueue {
region_id: RegionId,
requests: VecDeque<RegionEditRequest>,
}

impl RegionEditQueue {
const QUEUE_MAX_LEN: usize = 128;

fn new(region_id: RegionId) -> Self {
Self {
region_id,
requests: VecDeque::new(),
}
}

fn enqueue(&mut self, request: RegionEditRequest) {
if self.requests.len() > Self::QUEUE_MAX_LEN {
let _ = request.tx.send(
RegionBusySnafu {
region_id: self.region_id,
}
.fail(),
);
return;
};
self.requests.push_back(request);
}

fn dequeue(&mut self) -> Option<RegionEditRequest> {
self.requests.pop_front()
}
}

impl<S> RegionWorkerLoop<S> {
/// Handles region edit request.
pub(crate) async fn handle_region_edit(
&self,
region_id: RegionId,
edit: RegionEdit,
sender: Sender<Result<()>>,
) {
let region = match self.regions.writable_region(region_id) {
Ok(region) => region,
Err(e) => {
let _ = sender.send(Err(e));
return;
}
pub(crate) async fn handle_region_edit(&mut self, request: RegionEditRequest) {
let region_id = request.region_id;
let Some(region) = self.regions.get_region(region_id) else {
let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
return;
};

if !region.is_writable() {
if region.state() == RegionState::Editing {
self.region_edit_queues
.entry(region_id)
.or_insert_with(|| RegionEditQueue::new(region_id))
.enqueue(request);
} else {
let _ = request.tx.send(RegionBusySnafu { region_id }.fail());
}
return;
}

let RegionEditRequest {
region_id: _,
edit,
tx: sender,
} = request;

// Marks the region as editing.
if let Err(e) = region.set_editing() {
let _ = sender.send(Err(e));
Expand Down Expand Up @@ -79,7 +130,7 @@ impl<S> RegionWorkerLoop<S> {
}

/// Handles region edit result.
pub(crate) fn handle_region_edit_result(&self, edit_result: RegionEditResult) {
pub(crate) async fn handle_region_edit_result(&mut self, edit_result: RegionEditResult) {
let region = match self.regions.get_region(edit_result.region_id) {
Some(region) => region,
None => {
Expand All @@ -104,6 +155,12 @@ impl<S> RegionWorkerLoop<S> {
region.switch_state_to_writable(RegionState::Editing);

let _ = edit_result.sender.send(edit_result.result);

if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id) {
if let Some(request) = edit_queue.dequeue() {
self.handle_region_edit(request).await;
}
}
}

/// Writes truncate action to the manifest and then applies it to the region in background.
Expand Down

0 comments on commit 7f083ed

Please sign in to comment.