Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update object store crate #870

Merged
merged 4 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
353 changes: 219 additions & 134 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ build = "build.rs"
[dependencies]
### apache arrow/datafusion dependencies
# arrow = "51.0.0"
arrow-schema = { version = "51.0.0", features = ["serde"] }
arrow-array = { version = "51.0.0" }
arrow-json = "51.0.0"
arrow-ipc = { version = "51.0.0", features = ["zstd"] }
arrow-select = "51.0.0"
datafusion = "37.1.0"
object_store = { version = "0.9.1", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up
parquet = "51.0.0"
arrow-flight = { version = "51.0.0", features = [ "tls" ] }
arrow-schema = { version = "52.1.0", features = ["serde"] }
arrow-array = { version = "52.1.0" }
arrow-json = "52.1.0"
arrow-ipc = { version = "52.1.0", features = ["zstd"] }
arrow-select = "52.1.0"
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "a64df83502821f18067fb4ff65dd217815b305c9" }
object_store = { version = "0.10.2", features = ["cloud", "aws"] } # must match the object_store version used by the pinned datafusion git revision
parquet = "52.1.0"
arrow-flight = { version = "52.1.0", features = [ "tls" ] }
tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] }
tonic-web = "0.11.0"
tower-http = { version = "0.4.4", features = ["cors"] }
Expand Down Expand Up @@ -62,7 +62,7 @@ hex = "0.4"
hostname = "0.4.0"
http = "0.2.7"
humantime-serde = "1.1"
itertools = "0.12.1"
itertools = "0.13.0"
log = "0.4"
num_cpus = "1.15"
once_cell = "1.17.1"
Expand Down
2 changes: 1 addition & 1 deletion server/src/handlers/http/llm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub async fn make_llm_request(body: web::Json<AiPrompt>) -> Result<HttpResponse,
let stream_name = &body.stream;
let schema = STREAM_INFO.schema(stream_name)?;
let filtered_schema = schema
.all_fields()
.flattened_fields()
.into_iter()
.map(Field::from)
.collect_vec();
Expand Down
1 change: 1 addition & 0 deletions server/src/hottier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ impl HotTierManager {
Ok(file_processed)
}

#[allow(dead_code)]
///delete the files for the date range given from the hot tier directory for the stream
/// update the used and available size in the hot tier metadata
pub async fn delete_files_from_hot_tier(
Expand Down
10 changes: 5 additions & 5 deletions server/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl TableScanVisitor {
}
}

impl TreeNodeVisitor for TableScanVisitor {
impl TreeNodeVisitor<'_> for TableScanVisitor {
type Node = LogicalPlan;

fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion, DataFusionError> {
Expand Down Expand Up @@ -232,27 +232,27 @@ fn transform(
_start_time_filter =
PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
.binary_expr(Expr::Column(Column::new(
Some(table.table_name.to_owned_reference()),
Some(table.table_name.to_owned()),
time_partition.clone(),
)));
_end_time_filter =
PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
.binary_expr(Expr::Column(Column::new(
Some(table.table_name.to_owned_reference()),
Some(table.table_name.to_owned()),
time_partition,
)));
}
None => {
_start_time_filter =
PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
.binary_expr(Expr::Column(Column::new(
Some(table.table_name.to_owned_reference()),
Some(table.table_name.to_owned()),
event::DEFAULT_TIMESTAMP_KEY,
)));
_end_time_filter =
PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
.binary_expr(Expr::Column(Column::new(
Some(table.table_name.to_owned_reference()),
Some(table.table_name.to_owned()),
event::DEFAULT_TIMESTAMP_KEY,
)));
}
Expand Down
307 changes: 158 additions & 149 deletions server/src/query/filter_optimizer.rs
Original file line number Diff line number Diff line change
@@ -1,149 +1,158 @@
/*
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use std::{collections::HashMap, sync::Arc};

use datafusion::{
common::{DFField, DFSchema},
logical_expr::{Filter, LogicalPlan, Projection},
optimizer::{optimize_children, OptimizerRule},
prelude::{lit, or, Column, Expr},
scalar::ScalarValue,
};

/// Rewrites the logical plan for a source using projection and filter
/// push-down: generates a `LIKE '%literal%'` filter over `column` for each
/// configured literal and pushes it toward the table scan.
pub struct FilterOptimizerRule {
// unqualified name of the column the generated filter targets
pub column: String,
// literal substrings matched via `LIKE '%literal%'`, OR-ed together
pub literals: Vec<String>,
}

// Try to add a filter node on top of the table scan.
// Since every table supports projection push-down,
// we add the filter column's projection directly to the table scan.
// To preserve the original projection we must add a projection node carrying the original projection.
// Pushes the configured LIKE filter down onto table scans. Every table
// source supports projection push-down, so the filter column's projection is
// added directly to the scan; a wrapping Projection node then restores the
// schema the caller originally projected.
impl OptimizerRule for FilterOptimizerRule {
    fn try_optimize(
        &self,
        plan: &datafusion::logical_expr::LogicalPlan,
        config: &dyn datafusion::optimizer::OptimizerConfig,
    ) -> datafusion::error::Result<Option<datafusion::logical_expr::LogicalPlan>> {
        // With no literal patterns configured there is nothing to rewrite.
        let Some(filter_expr) = self.expr() else {
            return Ok(None);
        };

        // Already rewritten: this exact filter is present, so stop here.
        if let LogicalPlan::Filter(filter) = plan {
            if filter.predicate == filter_expr {
                return Ok(None);
            }
        }

        if let LogicalPlan::TableScan(table) = plan {
            // Skip scans without a projection, and scans that already carry
            // an equivalent tag filter.
            let already_filtered = table
                .filters
                .iter()
                .any(|expr| self.contains_valid_tag_filter(expr));
            if table.projection.is_none() || already_filtered {
                return Ok(None);
            }

            let mut scan = table.clone();
            let source_schema = &scan.source.schema();
            let original_projection = scan.projected_schema.clone();

            // Make sure the filter column is part of the scan's projection.
            if !scan
                .projected_schema
                .has_column_with_unqualified_name(&self.column)
            {
                let column_index = source_schema.index_of(&self.column)?;
                let column_field = source_schema.field(column_index);
                // Extend the projected schema with the filter column.
                let mut projected_fields = scan.projected_schema.fields().clone();
                projected_fields.push(DFField::new(
                    Some(scan.table_name.clone()),
                    column_field.name(),
                    column_field.data_type().clone(),
                    column_field.is_nullable(),
                ));

                scan.projected_schema = Arc::new(DFSchema::new_with_metadata(
                    projected_fields,
                    HashMap::default(),
                )?);
                if let Some(projection) = &mut scan.projection {
                    projection.push(column_index)
                }
            }

            // Filter sits directly above the scan; the outer projection
            // restores the original projected schema for upstream nodes.
            let filter = LogicalPlan::Filter(Filter::try_new(
                filter_expr,
                Arc::new(LogicalPlan::TableScan(scan)),
            )?);
            let rewritten = LogicalPlan::Projection(Projection::new_from_schema(
                Arc::new(filter),
                original_projection,
            ));

            return Ok(Some(rewritten));
        }

        // Nothing matched at this node: recurse into children as usual.
        optimize_children(self, plan, config)
    }

    fn name(&self) -> &str {
        "parseable_read_filter"
    }
}

impl FilterOptimizerRule {
fn expr(&self) -> Option<Expr> {
let mut patterns = self.literals.iter().map(|literal| {
Expr::Column(Column::from_name(&self.column)).like(lit(format!("%{}%", literal)))
});

let mut filter_expr = patterns.next()?;
for expr in patterns {
filter_expr = or(filter_expr, expr)
}

Some(filter_expr)
}

fn contains_valid_tag_filter(&self, expr: &Expr) -> bool {
match expr {
Expr::Like(like) => {
let matches_column = match &*like.expr {
Expr::Column(column) => column.name == self.column,
_ => return false,
};

let matches_pattern = match &*like.pattern {
Expr::Literal(ScalarValue::Utf8(Some(literal))) => {
let literal = literal.trim_matches('%');
self.literals.iter().any(|x| x == literal)
}
_ => false,
};

matches_column && matches_pattern && !like.negated
}
_ => false,
}
}
}
// /*
// * Parseable Server (C) 2022 - 2024 Parseable, Inc.
// *
// * This program is free software: you can redistribute it and/or modify
// * it under the terms of the GNU Affero General Public License as
// * published by the Free Software Foundation, either version 3 of the
// * License, or (at your option) any later version.
// *
// * This program is distributed in the hope that it will be useful,
// * but WITHOUT ANY WARRANTY; without even the implied warranty of
// * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// * GNU Affero General Public License for more details.
// *
// * You should have received a copy of the GNU Affero General Public License
// * along with this program. If not, see <http://www.gnu.org/licenses/>.
// *
// */
// use std::{collections::HashMap, sync::Arc};

// use arrow_schema::Field;
// use datafusion::{
// common::DFSchema,
// logical_expr::{Filter, LogicalPlan, Projection},
// optimizer::{optimize_children, OptimizerRule},
// prelude::{lit, or, Column, Expr},
// scalar::ScalarValue,
// };

// /// Rewrites logical plan for source using projection and filter
// pub struct FilterOptimizerRule {
// pub column: String,
// pub literals: Vec<String>,
// }

// // Try to add filter node on table scan
// // As every table supports projection push down
// // we try to directly add projection for column directly to table
// // To preserve the original projection we must add a projection node with the original projection
// impl OptimizerRule for FilterOptimizerRule {
// fn try_optimize(
// &self,
// plan: &datafusion::logical_expr::LogicalPlan,
// config: &dyn datafusion::optimizer::OptimizerConfig,
// ) -> datafusion::error::Result<Option<datafusion::logical_expr::LogicalPlan>> {
// // if there are no patterns then the rule cannot be performed
// let Some(filter_expr) = self.expr() else {
// return Ok(None);
// };

// if let LogicalPlan::Filter(filter) = plan {
// if filter.predicate == filter_expr {
// return Ok(None);
// }
// }

// if let LogicalPlan::TableScan(table) = plan {
// if table.projection.is_none()
// || table
// .filters
// .iter()
// .any(|expr| self.contains_valid_tag_filter(expr))
// {
// return Ok(None);
// }

// let mut table = table.clone();
// let schema = &table.source.schema();
// let orignal_projection = table.projected_schema.clone();

// // add filtered column projection to table
// if !table
// .projected_schema
// .has_column_with_unqualified_name(&self.column)
// {
// let tags_index = schema.index_of(&self.column)?;
// let tags_field = schema.field(tags_index);
// // modify source table projection to include tags
// let df_schema = table.projected_schema.fields().clone();

// // from datafusion 37.1.0 -> 40.0.0
// // `DFField` has been removed
// // `DFSchema.new_with_metadata()` has changed
// // it requires `qualified_fields`(`Vec<(Option<TableReference>, Arc<Field>)>`) instead of `fields`
// // hence, use `DFSchema::from_unqualified_fields()` for relatively unchanged code

// df_schema.to_vec().push(Arc::new(Field::new(
// tags_field.name(),
// tags_field.data_type().clone(),
// tags_field.is_nullable(),
// )));

// table.projected_schema =
// Arc::new(DFSchema::from_unqualified_fields(df_schema, HashMap::default())?);
// if let Some(projection) = &mut table.projection {
// projection.push(tags_index)
// }
// }

// let filter = LogicalPlan::Filter(Filter::try_new(
// filter_expr,
// Arc::new(LogicalPlan::TableScan(table)),
// )?);
// let plan = LogicalPlan::Projection(Projection::new_from_schema(
// Arc::new(filter),
// orignal_projection,
// ));

// return Ok(Some(plan));
// }

// // If we didn't find anything then recurse as normal and build the result.

// // TODO: replace `optimize_children()` since it will be removed
// // But it is not being used anywhere, so might as well just let it be for now
// optimize_children(self, plan, config)
// }

// fn name(&self) -> &str {
// "parseable_read_filter"
// }
// }

// impl FilterOptimizerRule {
// fn expr(&self) -> Option<Expr> {
// let mut patterns = self.literals.iter().map(|literal| {
// Expr::Column(Column::from_name(&self.column)).like(lit(format!("%{}%", literal)))
// });

// let mut filter_expr = patterns.next()?;
// for expr in patterns {
// filter_expr = or(filter_expr, expr)
// }

// Some(filter_expr)
// }

// fn contains_valid_tag_filter(&self, expr: &Expr) -> bool {
// match expr {
// Expr::Like(like) => {
// let matches_column = match &*like.expr {
// Expr::Column(column) => column.name == self.column,
// _ => return false,
// };

// let matches_pattern = match &*like.pattern {
// Expr::Literal(ScalarValue::Utf8(Some(literal))) => {
// let literal = literal.trim_matches('%');
// self.literals.iter().any(|x| x == literal)
// }
// _ => false,
// };

// matches_column && matches_pattern && !like.negated
// }
// _ => false,
// }
// }
// }
Loading
Loading