Commit

fix: update object store crate to 0.10.2 (#870)
Co-authored-by: parmesant <anant.v09@protonmail.com>
parmesant committed Aug 5, 2024
1 parent 225d640 commit e581c10
Showing 9 changed files with 497 additions and 384 deletions.
353 changes: 219 additions & 134 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions server/Cargo.toml
@@ -10,15 +10,15 @@ build = "build.rs"
[dependencies]
### apache arrow/datafusion dependencies
# arrow = "51.0.0"
arrow-schema = { version = "51.0.0", features = ["serde"] }
arrow-array = { version = "51.0.0" }
arrow-json = "51.0.0"
arrow-ipc = { version = "51.0.0", features = ["zstd"] }
arrow-select = "51.0.0"
datafusion = "37.1.0"
object_store = { version = "0.9.1", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up
parquet = "51.0.0"
arrow-flight = { version = "51.0.0", features = [ "tls" ] }
arrow-schema = { version = "52.1.0", features = ["serde"] }
arrow-array = { version = "52.1.0" }
arrow-json = "52.1.0"
arrow-ipc = { version = "52.1.0", features = ["zstd"] }
arrow-select = "52.1.0"
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "a64df83502821f18067fb4ff65dd217815b305c9" }
object_store = { version = "0.10.2", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up
parquet = "52.1.0"
arrow-flight = { version = "52.1.0", features = [ "tls" ] }
tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] }
tonic-web = "0.11.0"
tower-http = { version = "0.4.4", features = ["cors"] }
@@ -62,7 +62,7 @@ hex = "0.4"
hostname = "0.4.0"
http = "0.2.7"
humantime-serde = "1.1"
itertools = "0.12.1"
itertools = "0.13.0"
log = "0.4"
num_cpus = "1.15"
once_cell = "1.17.1"
2 changes: 1 addition & 1 deletion server/src/handlers/http/llm.rs
@@ -95,7 +95,7 @@ pub async fn make_llm_request(body: web::Json<AiPrompt>) -> Result<HttpResponse,
let stream_name = &body.stream;
let schema = STREAM_INFO.schema(stream_name)?;
let filtered_schema = schema
.all_fields()
.flattened_fields()
.into_iter()
.map(Field::from)
.collect_vec();
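The one-line change above replaces the all_fields() call with flattened_fields(), following the arrow-schema 52.x API. A minimal sketch of what the newer accessor returns, assuming the stream schema is a plain arrow-schema 52.x Schema; the fields used here are made up for illustration only:

use arrow_schema::{DataType, Field, Fields, Schema};

fn main() {
    // A schema with one nested struct column, purely illustrative.
    let address = Field::new(
        "address",
        DataType::Struct(Fields::from(vec![
            Field::new("city", DataType::Utf8, true),
            Field::new("zip", DataType::Utf8, true),
        ])),
        true,
    );
    let schema = Schema::new(vec![Field::new("id", DataType::Int64, false), address]);

    // Unlike fields(), which stops at the top level, flattened_fields() also
    // reaches the nested city and zip fields, giving the handler one flat list.
    for field in schema.flattened_fields() {
        println!("{}", field.name());
    }
}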
1 change: 1 addition & 0 deletions server/src/hottier.rs
@@ -396,6 +396,7 @@ impl HotTierManager {
Ok(file_processed)
}

#[allow(dead_code)]
///delete the files for the date range given from the hot tier directory for the stream
/// update the used and available size in the hot tier metadata
pub async fn delete_files_from_hot_tier(
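The only change in hottier.rs is the #[allow(dead_code)] attribute, which silences the unused-function lint for the single item it annotates while leaving the code compiled and available. A tiny standalone sketch; the function name below is made up and is not Parseable code:

// Without the attribute, rustc warns that this uncalled function is dead code.
#[allow(dead_code)]
fn delete_stale_files() {
    // intentionally unused for now; the lint is suppressed only for this item
}

fn main() {
    println!("delete_stale_files compiles but is never called");
}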
10 changes: 5 additions & 5 deletions server/src/query.rs
@@ -190,7 +190,7 @@ impl TableScanVisitor {
}
}

impl TreeNodeVisitor for TableScanVisitor {
impl TreeNodeVisitor<'_> for TableScanVisitor {
type Node = LogicalPlan;

fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion, DataFusionError> {
@@ -232,27 +232,27 @@ fn transform(
_start_time_filter =
PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
.binary_expr(Expr::Column(Column::new(
Some(table.table_name.to_owned_reference()),
Some(table.table_name.to_owned()),
time_partition.clone(),
)));
_end_time_filter =
PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
.binary_expr(Expr::Column(Column::new(
Some(table.table_name.to_owned_reference()),
Some(table.table_name.to_owned()),
time_partition,
)));
}
None => {
_start_time_filter =
PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
.binary_expr(Expr::Column(Column::new(
Some(table.table_name.to_owned_reference()),
Some(table.table_name.to_owned()),
event::DEFAULT_TIMESTAMP_KEY,
)));
_end_time_filter =
PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
.binary_expr(Expr::Column(Column::new(
Some(table.table_name.to_owned_reference()),
Some(table.table_name.to_owned()),
event::DEFAULT_TIMESTAMP_KEY,
)));
}
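Two mechanical migrations show up in query.rs: TreeNodeVisitor takes a lifetime parameter in newer DataFusion (elided as '_ here because the visitor stores no borrows), and table.table_name.to_owned_reference() gives way to plain to_owned(). The following is a conceptual sketch of why the visitor lifetime exists, written against a made-up Visitor trait rather than the real DataFusion one (whose f_down returns Result<TreeNodeRecursion, DataFusionError>, as the hunk above shows): the parameter lets an implementation keep references into the plan it walks.

// Made-up trait, only to illustrate the lifetime parameter.
trait Visitor<'n> {
    type Node;
    fn f_down(&mut self, node: &'n Self::Node);
}

// A visitor that borrows from the visited nodes needs the lifetime to say
// how long those borrows remain valid.
struct NameCollector<'n> {
    names: Vec<&'n str>,
}

impl<'n> Visitor<'n> for NameCollector<'n> {
    type Node = String;
    fn f_down(&mut self, node: &'n Self::Node) {
        self.names.push(node.as_str());
    }
}

fn main() {
    let plan_nodes = vec!["table_a".to_string(), "table_b".to_string()];
    let mut collector = NameCollector { names: Vec::new() };
    for node in &plan_nodes {
        collector.f_down(node);
    }
    // TableScanVisitor above clones table names instead of borrowing them,
    // which is why the elided TreeNodeVisitor<'_> form is sufficient there.
    assert_eq!(collector.names, vec!["table_a", "table_b"]);
}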
307 changes: 158 additions & 149 deletions server/src/query/filter_optimizer.rs
@@ -1,149 +1,158 @@
/*
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use std::{collections::HashMap, sync::Arc};

use datafusion::{
common::{DFField, DFSchema},
logical_expr::{Filter, LogicalPlan, Projection},
optimizer::{optimize_children, OptimizerRule},
prelude::{lit, or, Column, Expr},
scalar::ScalarValue,
};

/// Rewrites logical plan for source using projection and filter
pub struct FilterOptimizerRule {
pub column: String,
pub literals: Vec<String>,
}

// Try to add filter node on table scan
// As every table supports projection push down
// we try to directly add projection for column directly to table
// To preserve the orignal projection we must add a projection node with orignal projection
impl OptimizerRule for FilterOptimizerRule {
fn try_optimize(
&self,
plan: &datafusion::logical_expr::LogicalPlan,
config: &dyn datafusion::optimizer::OptimizerConfig,
) -> datafusion::error::Result<Option<datafusion::logical_expr::LogicalPlan>> {
// if there are no patterns then the rule cannot be performed
let Some(filter_expr) = self.expr() else {
return Ok(None);
};

if let LogicalPlan::Filter(filter) = plan {
if filter.predicate == filter_expr {
return Ok(None);
}
}

if let LogicalPlan::TableScan(table) = plan {
if table.projection.is_none()
|| table
.filters
.iter()
.any(|expr| self.contains_valid_tag_filter(expr))
{
return Ok(None);
}

let mut table = table.clone();
let schema = &table.source.schema();
let orignal_projection = table.projected_schema.clone();

// add filtered column projection to table
if !table
.projected_schema
.has_column_with_unqualified_name(&self.column)
{
let tags_index = schema.index_of(&self.column)?;
let tags_field = schema.field(tags_index);
// modify source table projection to include tags
let mut df_schema = table.projected_schema.fields().clone();
df_schema.push(DFField::new(
Some(table.table_name.clone()),
tags_field.name(),
tags_field.data_type().clone(),
tags_field.is_nullable(),
));

table.projected_schema =
Arc::new(DFSchema::new_with_metadata(df_schema, HashMap::default())?);
if let Some(projection) = &mut table.projection {
projection.push(tags_index)
}
}

let filter = LogicalPlan::Filter(Filter::try_new(
filter_expr,
Arc::new(LogicalPlan::TableScan(table)),
)?);
let plan = LogicalPlan::Projection(Projection::new_from_schema(
Arc::new(filter),
orignal_projection,
));

return Ok(Some(plan));
}

// If we didn't find anything then recurse as normal and build the result.
optimize_children(self, plan, config)
}

fn name(&self) -> &str {
"parseable_read_filter"
}
}

impl FilterOptimizerRule {
fn expr(&self) -> Option<Expr> {
let mut patterns = self.literals.iter().map(|literal| {
Expr::Column(Column::from_name(&self.column)).like(lit(format!("%{}%", literal)))
});

let mut filter_expr = patterns.next()?;
for expr in patterns {
filter_expr = or(filter_expr, expr)
}

Some(filter_expr)
}

fn contains_valid_tag_filter(&self, expr: &Expr) -> bool {
match expr {
Expr::Like(like) => {
let matches_column = match &*like.expr {
Expr::Column(column) => column.name == self.column,
_ => return false,
};

let matches_pattern = match &*like.pattern {
Expr::Literal(ScalarValue::Utf8(Some(literal))) => {
let literal = literal.trim_matches('%');
self.literals.iter().any(|x| x == literal)
}
_ => false,
};

matches_column && matches_pattern && !like.negated
}
_ => false,
}
}
}
// /*
// * Parseable Server (C) 2022 - 2024 Parseable, Inc.
// *
// * This program is free software: you can redistribute it and/or modify
// * it under the terms of the GNU Affero General Public License as
// * published by the Free Software Foundation, either version 3 of the
// * License, or (at your option) any later version.
// *
// * This program is distributed in the hope that it will be useful,
// * but WITHOUT ANY WARRANTY; without even the implied warranty of
// * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// * GNU Affero General Public License for more details.
// *
// * You should have received a copy of the GNU Affero General Public License
// * along with this program. If not, see <http://www.gnu.org/licenses/>.
// *
// */
// use std::{collections::HashMap, sync::Arc};

// use arrow_schema::Field;
// use datafusion::{
// common::DFSchema,
// logical_expr::{Filter, LogicalPlan, Projection},
// optimizer::{optimize_children, OptimizerRule},
// prelude::{lit, or, Column, Expr},
// scalar::ScalarValue,
// };

// /// Rewrites logical plan for source using projection and filter
// pub struct FilterOptimizerRule {
// pub column: String,
// pub literals: Vec<String>,
// }

// // Try to add filter node on table scan
// // As every table supports projection push down
// // we try to directly add projection for column directly to table
// // To preserve the orignal projection we must add a projection node with orignal projection
// impl OptimizerRule for FilterOptimizerRule {
// fn try_optimize(
// &self,
// plan: &datafusion::logical_expr::LogicalPlan,
// config: &dyn datafusion::optimizer::OptimizerConfig,
// ) -> datafusion::error::Result<Option<datafusion::logical_expr::LogicalPlan>> {
// // if there are no patterns then the rule cannot be performed
// let Some(filter_expr) = self.expr() else {
// return Ok(None);
// };

// if let LogicalPlan::Filter(filter) = plan {
// if filter.predicate == filter_expr {
// return Ok(None);
// }
// }

// if let LogicalPlan::TableScan(table) = plan {
// if table.projection.is_none()
// || table
// .filters
// .iter()
// .any(|expr| self.contains_valid_tag_filter(expr))
// {
// return Ok(None);
// }

// let mut table = table.clone();
// let schema = &table.source.schema();
// let orignal_projection = table.projected_schema.clone();

// // add filtered column projection to table
// if !table
// .projected_schema
// .has_column_with_unqualified_name(&self.column)
// {
// let tags_index = schema.index_of(&self.column)?;
// let tags_field = schema.field(tags_index);
// // modify source table projection to include tags
// let df_schema = table.projected_schema.fields().clone();

// // from datafusion 37.1.0 -> 40.0.0
// // `DFField` has been removed
// // `DFSchema.new_with_metadata()` has changed
// // it requires `qualified_fields`(`Vec<(Option<TableReference>, Arc<Field>)>`) instead of `fields`
// // hence, use `DFSchema::from_unqualified_fields()` for relatively unchanged code

// df_schema.to_vec().push(Arc::new(Field::new(
// tags_field.name(),
// tags_field.data_type().clone(),
// tags_field.is_nullable(),
// )));

// table.projected_schema =
// Arc::new(DFSchema::from_unqualified_fields(df_schema, HashMap::default())?);
// if let Some(projection) = &mut table.projection {
// projection.push(tags_index)
// }
// }

// let filter = LogicalPlan::Filter(Filter::try_new(
// filter_expr,
// Arc::new(LogicalPlan::TableScan(table)),
// )?);
// let plan = LogicalPlan::Projection(Projection::new_from_schema(
// Arc::new(filter),
// orignal_projection,
// ));

// return Ok(Some(plan));
// }

// // If we didn't find anything then recurse as normal and build the result.

// // TODO: replace `optimize_children()` since it will be removed
// // But it is not being used anywhere, so might as well just let it be for now
// optimize_children(self, plan, config)
// }

// fn name(&self) -> &str {
// "parseable_read_filter"
// }
// }

// impl FilterOptimizerRule {
// fn expr(&self) -> Option<Expr> {
// let mut patterns = self.literals.iter().map(|literal| {
// Expr::Column(Column::from_name(&self.column)).like(lit(format!("%{}%", literal)))
// });

// let mut filter_expr = patterns.next()?;
// for expr in patterns {
// filter_expr = or(filter_expr, expr)
// }

// Some(filter_expr)
// }

// fn contains_valid_tag_filter(&self, expr: &Expr) -> bool {
// match expr {
// Expr::Like(like) => {
// let matches_column = match &*like.expr {
// Expr::Column(column) => column.name == self.column,
// _ => return false,
// };

// let matches_pattern = match &*like.pattern {
// Expr::Literal(ScalarValue::Utf8(Some(literal))) => {
// let literal = literal.trim_matches('%');
// self.literals.iter().any(|x| x == literal)
// }
// _ => false,
// };

// matches_column && matches_pattern && !like.negated
// }
// _ => false,
// }
// }
// }
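The inline notes in the commented-out draft above pin down the breaking changes between datafusion 37.1.0 and 40.0.0: DFField is gone, and DFSchema::new_with_metadata() now expects qualified fields, with DFSchema::from_unqualified_fields() as the closest drop-in. A hedged sketch of the schema-extension step against that newer API follows; the helper name extend_with_column is an assumption for illustration, not Parseable code. One caveat: the draft's df_schema.to_vec().push(...) pushes onto a temporary vector, so the extra field would be dropped, whereas collecting into an owned Vec first keeps it.

use std::collections::HashMap;
use std::sync::Arc;

use arrow_schema::{Field, Fields};
use datafusion::common::DFSchema;
use datafusion::error::Result;

// Sketch only: rebuild a projected schema with one extra unqualified column,
// the way the draft above intends to re-add the tags column to the scan.
fn extend_with_column(projected: &DFSchema, extra: &Field) -> Result<DFSchema> {
    // fields() now yields arrow FieldRefs (DFField no longer exists); collect
    // them into an owned Vec so the pushed field is actually retained.
    let mut fields: Vec<Arc<Field>> = projected.fields().to_vec();
    fields.push(Arc::new(extra.clone()));
    DFSchema::from_unqualified_fields(Fields::from(fields), HashMap::default())
}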