Skip to content

Commit

Permalink
Fix floor of nullable
Browse files Browse the repository at this point in the history
  • Loading branch information
cswinter committed Aug 3, 2024
1 parent de0228c commit 8524102
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 21 deletions.
1 change: 1 addition & 0 deletions src/engine/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ mod merge_keep;
mod merge_partitioned;
mod nonzero_compact;
mod nonzero_indices;
mod null_to_i64;
mod null_to_val;
mod null_to_vec;
mod null_vec;
Expand Down
49 changes: 49 additions & 0 deletions src/engine/operators/null_to_i64.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use crate::engine::*;

// Take a null count and expands it into a nullable vec of the same length with arbitrary type and all values set to null
#[derive(Debug)]
pub struct NullToI64 {
pub input: BufferRef<Any>,
pub output: BufferRef<i64>,

pub batch_size: usize,
}

impl<'a> VecOperator<'a> for NullToI64 {
fn execute(&mut self, _: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError> {
let len = scratchpad.get_any(self.input).len();
if self.batch_size > len {
let mut output = scratchpad.get_mut(self.output);
output.truncate(len);
}
Ok(())
}

fn init(&mut self, _: usize, batch_size: usize, scratchpad: &mut Scratchpad<'a>) {
self.batch_size = batch_size;
scratchpad.set(self.output, vec![I64_NULL; batch_size]);
}

fn inputs(&self) -> Vec<BufferRef<Any>> {
vec![self.input.any()]
}
fn inputs_mut(&mut self) -> Vec<&mut usize> {
vec![&mut self.input.i]
}
fn outputs(&self) -> Vec<BufferRef<Any>> {
vec![self.output.any()]
}
fn can_stream_input(&self, _: usize) -> bool {
true
}
fn can_stream_output(&self, _: usize) -> bool {
true
}
fn allocates(&self) -> bool {
true
}

fn display_op(&self, _: bool) -> String {
format!("{} expand as <i64>", self.input)
}
}
13 changes: 9 additions & 4 deletions src/engine/operators/vector_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use super::merge_keep::*;
use super::merge_partitioned::MergePartitioned;
use super::nonzero_compact::{NonzeroCompact, NonzeroCompactNullable};
use super::nonzero_indices::{NonzeroIndices, NonzeroNonnullIndices};
use super::null_to_i64::NullToI64;
use super::null_to_val::NullToVal;
use super::null_to_vec::NullToVec;
use super::null_vec::NullVec;
Expand Down Expand Up @@ -1335,10 +1336,14 @@ pub mod operator {
}
}
} else if input.tag == EncodingType::Null {
reify_types! {
"null_to_vec";
output: NullablePrimitive;
Ok(Box::new(NullToVec { input: input.any(), output, batch_size: 0 }))
if output.tag == EncodingType::I64 {
Ok(Box::new(NullToI64 { input: input.any(), output: output.i64()?, batch_size: 0 }))
} else {
reify_types! {
"null_to_vec";
output: NullablePrimitive;
Ok(Box::new(NullToVec { input: input.any(), output, batch_size: 0 }))
}
}
} else if input.tag.is_constant() {
assert!(
Expand Down
13 changes: 13 additions & 0 deletions src/engine/planning/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,19 @@ fn propagate_nullability(operation: &QueryPlan, bp: &mut BufferProvider) -> Rewr
];
Rewrite::ReplaceWith(ops)
}
Floor { input, floor } => {
if floor.is_nullable() {
let floor_non_null = bp.named_buffer("floor_non_null", floor.tag.non_nullable());
let mut ops = vec![Floor {
input: input.forget_nullability(),
floor: floor_non_null,
}];
ops.extend(combine_nulls(bp, input, input, floor_non_null, floor));
Rewrite::ReplaceWith(ops)
} else {
Rewrite::None
}
}
And { lhs, rhs, and } if and.is_nullable() => {
let and_non_null = bp.named_buffer("and_non_null", and.tag.non_nullable());
let mut ops = vec![And {
Expand Down
20 changes: 10 additions & 10 deletions src/engine/planning/query_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ pub enum QueryPlan {
nullable: TypedBufferRef,
},
Floor {
input: BufferRef<of64>,
#[output]
floor: BufferRef<i64>,
input: TypedBufferRef,
#[output(t = "base=i64;null=input")]
floor: TypedBufferRef,
},
/// Converts NullableI64, NullableStr, or NullableF64 into a representation where nulls are encoded as part
/// of the data (i64 with i64::MAX representing null for NullableI64, Option<&str> for NullableStr, and special NaN value F64_NULL representing null for NullableF64).
Expand Down Expand Up @@ -799,7 +799,7 @@ impl Function2 {
Function2 {
factory: Box::new(|_, lhs, _| lhs),
input_type_signatures: vec![(BasicType::Null, t)],
type_out: Type::unencoded(t).mutable(),
type_out: Type::unencoded(BasicType::Null).mutable(),
encoding_invariance: false,
}
}
Expand All @@ -808,7 +808,7 @@ impl Function2 {
Function2 {
factory: Box::new(|_, _, rhs| rhs),
input_type_signatures: vec![(t, BasicType::Null)],
type_out: Type::unencoded(t).mutable(),
type_out: Type::unencoded(BasicType::Null).mutable(),
encoding_invariance: false,
}
}
Expand Down Expand Up @@ -1372,11 +1372,11 @@ impl QueryPlan {
Func1Type::Floor => {
let decoded = t.codec.decode(plan, planner);
match t.decoded {
BasicType::Integer => (decoded, t),
BasicType::Float => (planner.floor(decoded.f64()?).into(), t),
BasicType::Integer | BasicType::NullableInteger | BasicType::Null => (decoded, t),
BasicType::Float | BasicType::NullableFloat => (planner.floor(decoded), t),
_ => bail!(
QueryError::TypeError,
"Found floor({:?}), expected floor(float)",
"Found floor({:?}), expected floor(float|integer|null)",
&t
),
}
Expand Down Expand Up @@ -1551,7 +1551,7 @@ fn encoding_range(plan: &TypedBufferRef, qp: &QueryPlanner) -> Option<(i64, i64)
let max = p1.max(p2).max(p3).max(p4);
Some((min, max))
}
Floor { input, .. } => encoding_range(&input.into(), qp),
Floor { input, .. } => encoding_range(&input, qp),
ScalarI64 { value, .. } => Some((value, value)),
ScalarF64 { value, .. } => Some((value.floor() as i64, value.ceil() as i64)),
ref plan => {
Expand Down Expand Up @@ -1904,7 +1904,7 @@ pub(super) fn prepare<'a>(
QueryPlan::GetNullMap { nullable, present } => {
operator::get_null_map(nullable.nullable_any()?, present)
}
QueryPlan::Floor { input, floor } => operator::floor(input, floor),
QueryPlan::Floor { input, floor } => operator::floor(input.f64()?, floor.i64()?),
QueryPlan::FuseNulls { nullable, fused } => operator::fuse_nulls(nullable, fused)?,
QueryPlan::FuseIntNulls {
offset,
Expand Down
27 changes: 20 additions & 7 deletions tests/query_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1900,13 +1900,14 @@ fn test_float_greater_than_int() {
);
}

// #[test]
// fn test_missing_count() {
// test_query_ec(
// "SELECT COUNT(0) AS count FROM _meta_tables WHERE \"name\" = 'geistesblitz_dashboard'",
// &[vec![Int(0)]],
// );
// }
#[ignore]
#[test]
fn test_missing_count() {
test_query_ec(
"SELECT COUNT(0) AS count FROM _meta_tables WHERE \"name\" = 'geistesblitz_dashboard'",
&[vec![Int(0)]],
);
}

#[test]
fn test_floor1() {
Expand Down Expand Up @@ -1938,6 +1939,18 @@ fn test_floor2() {
);
}

#[test]
fn test_floor3() {
test_query_ec(
"SELECT MIN(id), MAX(id), FLOOR(nullable_float * 1e-30) FROM default",
&[
vec![Int(2), Int(6), Int(0)],
vec![Int(9), Int(9), Int(1)],
vec![Int(0), Int(8), Null],
],
);
}

#[test]
fn test_negative_constant() {
test_query_ec(
Expand Down

0 comments on commit 8524102

Please sign in to comment.