Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inverted_index): implement apply for SstIndexApplier #3088

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ operator = { path = "src/operator" }
partition = { path = "src/partition" }
plugins = { path = "src/plugins" }
promql = { path = "src/promql" }
puffin = { path = "src/puffin" }
query = { path = "src/query" }
script = { path = "src/script" }
servers = { path = "src/servers" }
Expand Down
9 changes: 6 additions & 3 deletions src/index/src/inverted_index/search/index_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

mod predicates_apply;

use std::collections::BTreeSet;

use async_trait::async_trait;
pub use predicates_apply::PredicatesIndexApplier;

Expand All @@ -24,15 +26,16 @@ use crate::inverted_index::format::reader::InvertedIndexReader;
///
/// Applier instances are reusable and work with various `InvertedIndexReader` instances,
/// avoiding repeated compilation of fixed predicates such as regex patterns.
#[mockall::automock]
#[async_trait]
pub trait IndexApplier {
/// Applies the predefined predicates to the data read by the given index reader, returning
/// an ordered set of relevant indices (e.g., post IDs, group IDs, row IDs).
async fn apply(
async fn apply<'a>(
&self,
context: SearchContext,
reader: &mut dyn InvertedIndexReader,
) -> Result<Vec<usize>>;
reader: &mut (dyn InvertedIndexReader + 'a),
) -> Result<BTreeSet<usize>>;

/// Returns the memory usage of the applier.
fn memory_usage(&self) -> usize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeSet;
use std::mem::size_of;

use async_trait::async_trait;
Expand Down Expand Up @@ -43,11 +44,11 @@ pub struct PredicatesIndexApplier {
impl IndexApplier for PredicatesIndexApplier {
/// Applies all `FstApplier`s to the data in the inverted index reader, intersecting the individual
/// bitmaps obtained for each index to result in a final set of indices.
async fn apply(
async fn apply<'a>(
&self,
context: SearchContext,
reader: &mut dyn InvertedIndexReader,
) -> Result<Vec<usize>> {
reader: &mut (dyn InvertedIndexReader + 'a),
) -> Result<BTreeSet<usize>> {
let metadata = reader.metadata().await?;

let mut bitmap = Self::bitmap_full_range(&metadata);
Expand All @@ -60,7 +61,7 @@ impl IndexApplier for PredicatesIndexApplier {
let Some(meta) = metadata.metas.get(name) else {
match context.index_not_found_strategy {
IndexNotFoundStrategy::ReturnEmpty => {
return Ok(vec![]);
return Ok(BTreeSet::default());
}
IndexNotFoundStrategy::Ignore => {
continue;
Expand Down Expand Up @@ -209,7 +210,7 @@ mod tests {
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
assert_eq!(indices, vec![0, 2, 4, 6]);
assert_eq!(indices, BTreeSet::from_iter([0, 2, 4, 6]));

// An index reader with a single tag "tag-0" but without value "tag-0_value-0"
let mut mock_reader = MockInvertedIndexReader::new();
Expand Down Expand Up @@ -263,7 +264,7 @@ mod tests {
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
assert_eq!(indices, vec![0, 4, 6]);
assert_eq!(indices, BTreeSet::from_iter([0, 4, 6]));
}

#[tokio::test]
Expand All @@ -281,7 +282,7 @@ mod tests {
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
assert_eq!(indices, vec![0, 1, 2, 3, 4, 5, 6, 7]); // full range to scan
assert_eq!(indices, BTreeSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7])); // full range to scan
}

#[tokio::test]
Expand Down Expand Up @@ -353,7 +354,7 @@ mod tests {
)
.await
.unwrap();
assert_eq!(indices, vec![0, 1, 2, 3, 4, 5, 6, 7]);
assert_eq!(indices, BTreeSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7]));
}

#[test]
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ num_cpus = "1.13"
object-store.workspace = true
parquet = { workspace = true, features = ["async"] }
paste.workspace = true
pin-project.workspace = true
prometheus.workspace = true
prost.workspace = true
puffin.workspace = true
regex = "1.5"
serde = { version = "1.0", features = ["derive"] }
serde_json.workspace = true
Expand Down
12 changes: 4 additions & 8 deletions src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@

use std::sync::Arc;

use object_store::{util, ObjectStore};
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;

use crate::error::{DeleteSstSnafu, Result};
use crate::read::Source;
use crate::sst::file::{FileHandle, FileId};
use crate::sst::location;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;

Expand Down Expand Up @@ -61,7 +62,7 @@ impl AccessLayer {

/// Deletes a SST file with given file id.
pub(crate) async fn delete_sst(&self, file_id: FileId) -> Result<()> {
let path = self.sst_file_path(&file_id.as_parquet());
let path = location::sst_file_path(&self.region_dir, file_id);
self.object_store
.delete(&path)
.await
Expand All @@ -81,12 +82,7 @@ impl AccessLayer {
metadata: RegionMetadataRef,
source: Source,
) -> ParquetWriter {
let path = self.sst_file_path(&file_id.as_parquet());
let path = location::sst_file_path(&self.region_dir, file_id);
ParquetWriter::new(path, metadata, source, self.object_store.clone())
}

/// Returns the `file_path` for the `file_name` in the object store.
fn sst_file_path(&self, file_name: &str) -> String {
util::join_path(&self.region_dir, file_name)
}
}
33 changes: 32 additions & 1 deletion src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,33 @@ pub enum Error {
source: datatypes::error::Error,
location: Location,
},

#[snafu(display("Failed to apply index"))]
ApplyIndex {
#[snafu(source)]
source: index::inverted_index::error::Error,
location: Location,
},

#[snafu(display("Failed to read puffin metadata"))]
PuffinReadMetadata {
#[snafu(source)]
source: puffin::error::Error,
location: Location,
},

#[snafu(display("Failed to read puffin blob"))]
PuffinReadBlob {
#[snafu(source)]
source: puffin::error::Error,
location: Location,
},

#[snafu(display("Blob type not found, blob_type: {blob_type}"))]
PuffinBlobTypeNotFound {
blob_type: String,
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -477,6 +504,7 @@ impl ErrorExt for Error {
| RegionCorrupted { .. }
| CreateDefault { .. }
| InvalidParquet { .. }
| PuffinBlobTypeNotFound { .. }
| UnexpectedReplay { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }
Expand Down Expand Up @@ -522,8 +550,11 @@ impl ErrorExt for Error {
JsonOptions { .. } => StatusCode::InvalidArguments,
EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
ArrowReader { .. } => StatusCode::StorageUnavailable,
BuildIndexApplier { source, .. } => source.status_code(),
ConvertValue { source, .. } => source.status_code(),
BuildIndexApplier { source, .. } | ApplyIndex { source, .. } => source.status_code(),
PuffinReadMetadata { source, .. } | PuffinReadBlob { source, .. } => {
source.status_code()
}
}
}

Expand Down
48 changes: 48 additions & 0 deletions src/mito2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub const STAGE_LABEL: &str = "stage";
pub const TYPE_LABEL: &str = "type";
/// Reason to flush.
pub const FLUSH_REASON: &str = "reason";
/// File type label.
pub const FILE_TYPE_LABEL: &str = "file_type";

lazy_static! {
/// Global write buffer size in bytes.
Expand Down Expand Up @@ -143,4 +145,50 @@ lazy_static! {
&[TYPE_LABEL]
)
.unwrap();
// ------- End of cache metrics.

// Index metrics.
/// Timer of index application.
pub static ref INDEX_APPLY_COST_TIME: Histogram = register_histogram!(
"index_apply_cost_time",
zhongzc marked this conversation as resolved.
Show resolved Hide resolved
"index apply cost time",
)
.unwrap();
/// Gauge of index apply memory usage.
pub static ref INDEX_APPLY_MEMORY_USAGE: IntGauge = register_int_gauge!(
"index_apply_memory_usage",
"index apply memory usage",
)
.unwrap();
/// Counter of bytes read and written by index-related IO operations.
pub static ref INDEX_IO_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!(
"index_io_bytes_total",
"index io bytes total",
&[TYPE_LABEL, FILE_TYPE_LABEL]
)
.unwrap();
/// Counter of read bytes on puffin files.
pub static ref INDEX_PUFFIN_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
.with_label_values(&["read", "puffin"]);

/// Counter of index-related IO operations, e.g. read, write, seek and flush.
pub static ref INDEX_IO_OP_TOTAL: IntCounterVec = register_int_counter_vec!(
"index_io_op_total",
"index io op total",
&[TYPE_LABEL, FILE_TYPE_LABEL]
)
.unwrap();
/// Counter of read operations on puffin files.
pub static ref INDEX_PUFFIN_READ_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["read", "puffin"]);
/// Counter of seek operations on puffin files.
pub static ref INDEX_PUFFIN_SEEK_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["seek", "puffin"]);
/// Counter of write operations on puffin files.
pub static ref INDEX_PUFFIN_WRITE_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["write", "puffin"]);
/// Counter of flush operations on puffin files.
pub static ref INDEX_PUFFIN_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["flush", "puffin"]);
// ------- End of index metrics.
}
3 changes: 2 additions & 1 deletion src/mito2/src/sst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

pub mod file;
pub mod file_purger;
mod index;
pub mod index;
pub mod location;
pub mod parquet;
pub(crate) mod version;
9 changes: 7 additions & 2 deletions src/mito2/src/sst/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use common_time::Timestamp;
use object_store::util::join_path;
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use store_api::storage::RegionId;
use uuid::Uuid;

use crate::sst::file_purger::{FilePurgerRef, PurgeRequest};
use crate::sst::location;

/// Type to store SST level.
pub type Level = u8;
Expand Down Expand Up @@ -57,6 +57,11 @@ impl FileId {
pub fn as_parquet(&self) -> String {
format!("{}{}", self, ".parquet")
}

/// Builds the complete puffin file name for this file id by appending the
/// `.puffin` extension to its `Display` representation.
pub fn as_puffin(&self) -> String {
    format!("{self}.puffin")
}
}

impl fmt::Display for FileId {
Expand Down Expand Up @@ -131,7 +136,7 @@ impl FileHandle {

/// Returns the complete file path of the file.
pub fn file_path(&self, file_dir: &str) -> String {
join_path(file_dir, &self.file_id().as_parquet())
location::sst_file_path(file_dir, self.file_id())
}

/// Returns the time range of the file.
Expand Down
10 changes: 4 additions & 6 deletions src/mito2/src/sst/file_purger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,13 @@ impl FilePurger for LocalFilePurger {
mod tests {
use common_test_util::temp_dir::create_temp_dir;
use object_store::services::Fs;
use object_store::{util, ObjectStore};
use object_store::ObjectStore;

use super::*;
use crate::access_layer::AccessLayer;
use crate::schedule::scheduler::{LocalScheduler, Scheduler};
use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange};
use crate::sst::location;

#[tokio::test]
async fn test_file_purge() {
Expand All @@ -119,7 +120,7 @@ mod tests {
let object_store = ObjectStore::new(builder).unwrap().finish();
let sst_file_id = FileId::random();
let sst_dir = "table1";
let path = util::join_path(sst_dir, &sst_file_id.as_parquet());
let path = location::sst_file_path(sst_dir, sst_file_id);

object_store.write(&path, vec![0; 4096]).await.unwrap();

Expand All @@ -145,9 +146,6 @@ mod tests {

scheduler.stop(true).await.unwrap();

assert!(!object_store
.is_exist(&format!("{}/{}", sst_dir, sst_file_id.as_parquet()))
.await
.unwrap());
assert!(!object_store.is_exist(&path).await.unwrap());
}
}
3 changes: 3 additions & 0 deletions src/mito2/src/sst/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@

pub mod applier;
mod codec;
mod store;

const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1";
zhongzc marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading