From f5beededc280d35fbf683de7b04c1ba371447fc8 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 15 Aug 2024 20:06:49 +0800 Subject: [PATCH] feat(WIP): render reduce --- src/flow/src/compute/render/reduce.rs | 73 ++++++++++++++++++++- src/flow/src/expr.rs | 24 +++++++ src/flow/src/expr/relation/func.rs | 94 ++++++++++++++++++++++++++- src/flow/src/utils.rs | 2 +- 4 files changed, 188 insertions(+), 5 deletions(-) diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index 884a1e2e4cfd..a4b36a5e95e7 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::ops::Range; use datatypes::data_type::ConcreteDataType; @@ -89,7 +89,6 @@ impl<'referred, 'df> Context<'referred, 'df> { // TODO(discord9): better way to schedule future run let scheduler = self.compute_state.get_scheduler(); - let scheduler_inner = scheduler.clone(); let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff>(Self::REDUCE_BATCH); @@ -99,6 +98,8 @@ impl<'referred, 'df> Context<'referred, 'df> { input.collection.into_inner(), out_send_port, move |_ctx, recv, send| { + let now = *(now.borrow()); + let arrange = arrange_handler_inner.clone(); // mfp only need to passively receive updates from recvs let src_data = recv .take_inner() @@ -106,11 +107,65 @@ impl<'referred, 'df> Context<'referred, 'df> { .flat_map(|v| v.into_iter()) .collect_vec(); + let mut key_to_many_vals = BTreeMap::::new(); for batch in src_data { - let key_val_batches = + let (key_batch, val_batch) = batch_split_by_key_val(&batch, &key_val_plan, &err_collector); + err_collector.run(|| { + ensure!( + key_batch.row_count() == val_batch.row_count(), + InternalSnafu { + reason: "Key and val batch should have the same row count" + } + ); + + for row_idx in 0..key_batch.row_count() { + let key_row = key_batch.get_row(row_idx).unwrap(); + let val_row = val_batch.slice(row_idx, 1); + let val_batch = key_to_many_vals.entry(Row::new(key_row)).or_default(); + val_batch.append_batch(val_row)?; + } + + Ok(()) + }); + todo!() } + + // write lock the arrange for the rest of the function body + // to prevent wired race condition + let arrange = arrange.write(); + + for (key, val_batch) in key_to_many_vals { + err_collector.run(|| -> Result<(), _> { + let (accums, _, _) = arrange.get(now, &key).unwrap_or_default(); + let accum_list = from_accum_values_to_live_accums( + accums.unpack(), + accum_plan.simple_aggrs.len(), + )?; + + let mut accum_output = AccumOutput::new(); + for AggrWithIndex { + expr, + input_idx, + output_idx, + } in accum_plan.simple_aggrs.iter() + { + let cur_old_accum = accum_list[*output_idx].clone(); + let cur_input = val_batch.batch()[*input_idx].clone(); + + let (output, new_accum) = + expr.func.eval_batch(cur_old_accum, cur_input, None)?; + + accum_output.insert_accum(*output_idx, new_accum); + accum_output.insert_output(*output_idx, output); + } + + todo!("update arrange and output"); + }); + } + + todo!("send output"); }, ); @@ -249,6 +304,18 @@ impl<'referred, 'df> Context<'referred, 'df> { } } +fn from_accum_values_to_live_accums( + accums: Vec, + len: usize, +) -> Result>, EvalError> { + let accum_ranges = from_val_to_slice_idx(accums.first().cloned(), len)?; + let mut accum_list = vec![]; + for range in accum_ranges.iter() { + accum_list.push(accums[range.clone()].to_vec()); + } + Ok(accum_list) +} + /// All arrange(aka state) used in reduce operator pub struct ReduceArrange { /// The output arrange of reduce operator diff --git a/src/flow/src/expr.rs b/src/flow/src/expr.rs index 450bbffa8267..792a6d2b194d 100644 --- a/src/flow/src/expr.rs +++ b/src/flow/src/expr.rs @@ -25,6 +25,7 @@ mod signature; use arrow::array::BooleanArray; use datatypes::prelude::{ConcreteDataType, DataType}; +use datatypes::value::Value; use datatypes::vectors::VectorRef; pub(crate) use df_func::{DfScalarFunction, RawDfScalarFn}; pub(crate) use error::{EvalError, InvalidArgumentSnafu}; @@ -49,6 +50,12 @@ pub struct Batch { diffs: Option, } +impl Default for Batch { + fn default() -> Self { + Self::empty() + } +} + impl Batch { pub fn empty() -> Self { Self { @@ -77,6 +84,23 @@ impl Batch { self.row_count } + pub fn column_count(&self) -> usize { + self.batch.len() + } + + pub fn get_row(&self, idx: usize) -> Result, EvalError> { + ensure!( + idx < self.row_count, + InvalidArgumentSnafu { + reason: format!( + "Expect row index to be less than {}, found {}", + self.row_count, idx + ) + } + ); + Ok(self.batch.iter().map(|v| v.get(idx)).collect_vec()) + } + /// Slices the `Batch`, returning a new `Batch`. /// /// # Panics diff --git a/src/flow/src/expr/relation/func.rs b/src/flow/src/expr/relation/func.rs index 868d83b43f02..a7f1d6722643 100644 --- a/src/flow/src/expr/relation/func.rs +++ b/src/flow/src/expr/relation/func.rs @@ -18,15 +18,17 @@ use std::sync::OnceLock; use datatypes::prelude::ConcreteDataType; use datatypes::value::Value; +use datatypes::vectors::VectorRef; use serde::{Deserialize, Serialize}; use smallvec::smallvec; -use snafu::{IntoError, OptionExt}; +use snafu::{ensure, IntoError, OptionExt}; use strum::{EnumIter, IntoEnumIterator}; use crate::error::{DatafusionSnafu, Error, InvalidQuerySnafu}; use crate::expr::error::EvalError; use crate::expr::relation::accum::{Accum, Accumulator}; use crate::expr::signature::{GenericFn, Signature}; +use crate::expr::InvalidArgumentSnafu; use crate::repr::Diff; /// Aggregate functions that can be applied to a group of rows. @@ -131,6 +133,96 @@ impl AggregateFunc { let res = accum.eval(self)?; Ok((res, accum.into_state())) } + + /// return output value and new accumulator state + pub fn eval_batch( + &self, + accum: A, + vector: VectorRef, + diff: Option, + ) -> Result<(Value, Vec), EvalError> + where + A: IntoIterator, + { + let mut accum = accum.into_iter().peekable(); + + let mut accum = if accum.peek().is_none() { + Accum::new_accum(self)? + } else { + Accum::try_from_iter(self, &mut accum)? + }; + + let vector_diff = VectorDiff::try_new(vector, diff)?; + + accum.update_batch(self, vector_diff)?; + + let res = accum.eval(self)?; + Ok((res, accum.into_state())) + } +} + +struct VectorDiff { + vector: VectorRef, + diff: Option, +} + +impl VectorDiff { + fn len(&self) -> usize { + self.vector.len() + } + + fn try_new(vector: VectorRef, diff: Option) -> Result { + ensure!( + diff.as_ref() + .map_or(true, |diff| diff.len() == vector.len()), + InvalidArgumentSnafu { + reason: "Length of vector and diff should be the same" + } + ); + Ok(Self { vector, diff }) + } +} + +impl IntoIterator for VectorDiff { + type Item = (Value, Diff); + type IntoIter = VectorDiffIter; + + fn into_iter(self) -> Self::IntoIter { + VectorDiffIter { + vector: self.vector, + diff: self.diff, + idx: 0, + } + } +} + +struct VectorDiffIter { + vector: VectorRef, + diff: Option, + idx: usize, +} + +impl std::iter::Iterator for VectorDiffIter { + type Item = (Value, Diff); + + fn next(&mut self) -> Option { + if self.idx >= self.vector.len() { + return None; + } + let value = self.vector.get(self.idx); + let diff = if let Some(diff) = self.diff.as_ref() { + if let Ok(diff_at) = diff.get(self.idx).try_into() { + diff_at + } else { + return None; + } + } else { + 1 + }; + + self.idx += 1; + Some((value, diff)) + } } /// Generate signature for each aggregate function diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs index 778fde49c9a3..269c53fa84aa 100644 --- a/src/flow/src/utils.rs +++ b/src/flow/src/utils.rs @@ -513,7 +513,7 @@ pub type ArrangeReader<'a> = tokio::sync::RwLockReadGuard<'a, Arrangement>; pub type ArrangeWriter<'a> = tokio::sync::RwLockWriteGuard<'a, Arrangement>; /// A handler to the inner Arrangement, can be cloned and shared, useful for query it's inner state -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ArrangeHandler { inner: Arc>, }