feat(WIP): render reduce
discord9 committed Aug 15, 2024
1 parent b90e947 commit f5beede
Showing 4 changed files with 188 additions and 5 deletions.
73 changes: 70 additions & 3 deletions src/flow/src/compute/render/reduce.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use std::ops::Range;

use datatypes::data_type::ConcreteDataType;
@@ -89,7 +89,6 @@ impl<'referred, 'df> Context<'referred, 'df> {

        // TODO(discord9): better way to schedule future run
        let scheduler = self.compute_state.get_scheduler();
        let scheduler_inner = scheduler.clone();

        let (out_send_port, out_recv_port) =
            self.df.make_edge::<_, Toff<Batch>>(Self::REDUCE_BATCH);
@@ -99,18 +98,74 @@
            input.collection.into_inner(),
            out_send_port,
            move |_ctx, recv, send| {
                let now = *(now.borrow());
                let arrange = arrange_handler_inner.clone();
                // mfp only needs to passively receive updates from recvs
                let src_data = recv
                    .take_inner()
                    .into_iter()
                    .flat_map(|v| v.into_iter())
                    .collect_vec();

                let mut key_to_many_vals = BTreeMap::<Row, Batch>::new();
                for batch in src_data {
                    let key_val_batches =
                    let (key_batch, val_batch) =
                        batch_split_by_key_val(&batch, &key_val_plan, &err_collector);
                    err_collector.run(|| {
                        ensure!(
                            key_batch.row_count() == val_batch.row_count(),
                            InternalSnafu {
                                reason: "Key and val batch should have the same row count"
                            }
                        );

                        for row_idx in 0..key_batch.row_count() {
                            let key_row = key_batch.get_row(row_idx).unwrap();
                            let val_row = val_batch.slice(row_idx, 1);
                            let val_batch = key_to_many_vals.entry(Row::new(key_row)).or_default();
                            val_batch.append_batch(val_row)?;
                        }

                        Ok(())
                    });

                    todo!()
                }

                // write lock the arrange for the rest of the function body
                // to prevent weird race conditions
                let arrange = arrange.write();

                for (key, val_batch) in key_to_many_vals {
                    err_collector.run(|| -> Result<(), _> {
                        let (accums, _, _) = arrange.get(now, &key).unwrap_or_default();
                        let accum_list = from_accum_values_to_live_accums(
                            accums.unpack(),
                            accum_plan.simple_aggrs.len(),
                        )?;

                        let mut accum_output = AccumOutput::new();
                        for AggrWithIndex {
                            expr,
                            input_idx,
                            output_idx,
                        } in accum_plan.simple_aggrs.iter()
                        {
                            let cur_old_accum = accum_list[*output_idx].clone();
                            let cur_input = val_batch.batch()[*input_idx].clone();

                            let (output, new_accum) =
                                expr.func.eval_batch(cur_old_accum, cur_input, None)?;

                            accum_output.insert_accum(*output_idx, new_accum);
                            accum_output.insert_output(*output_idx, output);
                        }

                        todo!("update arrange and output");
                    });
                }

                todo!("send output");
            },
        );

@@ -249,6 +304,18 @@ impl<'referred, 'df> Context<'referred, 'df> {
    }
}

fn from_accum_values_to_live_accums(
    accums: Vec<Value>,
    len: usize,
) -> Result<Vec<Vec<Value>>, EvalError> {
    let accum_ranges = from_val_to_slice_idx(accums.first().cloned(), len)?;
    let mut accum_list = vec![];
    for range in accum_ranges.iter() {
        accum_list.push(accums[range.clone()].to_vec());
    }
    Ok(accum_list)
}

/// All arrange(aka state) used in reduce operator
pub struct ReduceArrange {
    /// The output arrange of reduce operator
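A note on the shape of the batch-reduce path added above: each incoming batch is split into key columns and value columns, the value rows are grouped per key, and each key's grouped values are then folded into that key's stored accumulator state. The sketch below illustrates that group-then-fold pattern with plain standard-library types only; SumAccum, update_batch, and the i64 payloads are hypothetical stand-ins, not the flow crate's actual Batch/Row/Accum types.

use std::collections::BTreeMap;

// Hypothetical accumulator: a running sum per key (stand-in for accumulator state).
#[derive(Default)]
struct SumAccum {
    total: i64,
}

impl SumAccum {
    // Fold one key's batch of values into the accumulator and
    // return the new output value for that key.
    fn update_batch(&mut self, vals: &[i64]) -> i64 {
        self.total += vals.iter().sum::<i64>();
        self.total
    }
}

fn main() {
    // Incoming (key, value) rows, analogous to the key/val batches
    // produced by batch_split_by_key_val in the operator above.
    let src_rows = vec![("a", 1), ("b", 10), ("a", 2), ("b", 20)];

    // 1. Group value rows by key (mirrors key_to_many_vals).
    let mut key_to_many_vals: BTreeMap<&str, Vec<i64>> = BTreeMap::new();
    for (key, val) in src_rows {
        key_to_many_vals.entry(key).or_default().push(val);
    }

    // 2. For each key, load (or default) its accumulator state and fold the
    //    whole value batch into it, yielding one aggregate output per key.
    let mut arrange: BTreeMap<&str, SumAccum> = BTreeMap::new();
    for (key, vals) in key_to_many_vals {
        let accum = arrange.entry(key).or_default();
        let output = accum.update_batch(&vals);
        println!("key={key} output={output}");
    }
}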
24 changes: 24 additions & 0 deletions src/flow/src/expr.rs
@@ -25,6 +25,7 @@ mod signature;

use arrow::array::BooleanArray;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
pub(crate) use df_func::{DfScalarFunction, RawDfScalarFn};
pub(crate) use error::{EvalError, InvalidArgumentSnafu};
@@ -49,6 +50,12 @@ pub struct Batch {
    diffs: Option<VectorRef>,
}

impl Default for Batch {
    fn default() -> Self {
        Self::empty()
    }
}

impl Batch {
    pub fn empty() -> Self {
        Self {
@@ -77,6 +84,23 @@ impl Batch {
        self.row_count
    }

    pub fn column_count(&self) -> usize {
        self.batch.len()
    }

    pub fn get_row(&self, idx: usize) -> Result<Vec<Value>, EvalError> {
        ensure!(
            idx < self.row_count,
            InvalidArgumentSnafu {
                reason: format!(
                    "Expect row index to be less than {}, found {}",
                    self.row_count, idx
                )
            }
        );
        Ok(self.batch.iter().map(|v| v.get(idx)).collect_vec())
    }

    /// Slices the `Batch`, returning a new `Batch`.
    ///
    /// # Panics
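The new get_row helper reads one logical row out of the column-major Batch: it bounds-checks the index and then takes position idx from every column vector. A minimal standalone illustration of that column-major row extraction, using plain Vecs instead of the real VectorRef/Value types (the Columns type below is a hypothetical stand-in):

// Column-major storage: each inner Vec is one column (stand-in for VectorRef).
struct Columns {
    cols: Vec<Vec<i64>>,
    row_count: usize,
}

impl Columns {
    // Extract row `idx` by taking the idx-th element of every column,
    // mirroring the bounds check plus per-column get in Batch::get_row.
    fn get_row(&self, idx: usize) -> Result<Vec<i64>, String> {
        if idx >= self.row_count {
            return Err(format!(
                "Expect row index to be less than {}, found {}",
                self.row_count, idx
            ));
        }
        Ok(self.cols.iter().map(|col| col[idx]).collect())
    }
}

fn main() {
    let batch = Columns {
        cols: vec![vec![1, 2, 3], vec![10, 20, 30]],
        row_count: 3,
    };
    assert_eq!(batch.get_row(1).unwrap(), vec![2, 20]);
    assert!(batch.get_row(3).is_err());
}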
94 changes: 93 additions & 1 deletion src/flow/src/expr/relation/func.rs
@@ -18,15 +18,17 @@ use std::sync::OnceLock;

use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use serde::{Deserialize, Serialize};
use smallvec::smallvec;
use snafu::{IntoError, OptionExt};
use snafu::{ensure, IntoError, OptionExt};
use strum::{EnumIter, IntoEnumIterator};

use crate::error::{DatafusionSnafu, Error, InvalidQuerySnafu};
use crate::expr::error::EvalError;
use crate::expr::relation::accum::{Accum, Accumulator};
use crate::expr::signature::{GenericFn, Signature};
use crate::expr::InvalidArgumentSnafu;
use crate::repr::Diff;

/// Aggregate functions that can be applied to a group of rows.
@@ -131,6 +133,96 @@ impl AggregateFunc {
        let res = accum.eval(self)?;
        Ok((res, accum.into_state()))
    }

    /// return output value and new accumulator state
    pub fn eval_batch<A>(
        &self,
        accum: A,
        vector: VectorRef,
        diff: Option<VectorRef>,
    ) -> Result<(Value, Vec<Value>), EvalError>
    where
        A: IntoIterator<Item = Value>,
    {
        let mut accum = accum.into_iter().peekable();

        let mut accum = if accum.peek().is_none() {
            Accum::new_accum(self)?
        } else {
            Accum::try_from_iter(self, &mut accum)?
        };

        let vector_diff = VectorDiff::try_new(vector, diff)?;

        accum.update_batch(self, vector_diff)?;

        let res = accum.eval(self)?;
        Ok((res, accum.into_state()))
    }
}

struct VectorDiff {
    vector: VectorRef,
    diff: Option<VectorRef>,
}

impl VectorDiff {
    fn len(&self) -> usize {
        self.vector.len()
    }

    fn try_new(vector: VectorRef, diff: Option<VectorRef>) -> Result<Self, EvalError> {
        ensure!(
            diff.as_ref()
                .map_or(true, |diff| diff.len() == vector.len()),
            InvalidArgumentSnafu {
                reason: "Length of vector and diff should be the same"
            }
        );
        Ok(Self { vector, diff })
    }
}

impl IntoIterator for VectorDiff {
    type Item = (Value, Diff);
    type IntoIter = VectorDiffIter;

    fn into_iter(self) -> Self::IntoIter {
        VectorDiffIter {
            vector: self.vector,
            diff: self.diff,
            idx: 0,
        }
    }
}

struct VectorDiffIter {
    vector: VectorRef,
    diff: Option<VectorRef>,
    idx: usize,
}

impl std::iter::Iterator for VectorDiffIter {
    type Item = (Value, Diff);

    fn next(&mut self) -> Option<Self::Item> {
        if self.idx >= self.vector.len() {
            return None;
        }
        let value = self.vector.get(self.idx);
        let diff = if let Some(diff) = self.diff.as_ref() {
            if let Ok(diff_at) = diff.get(self.idx).try_into() {
                diff_at
            } else {
                return None;
            }
        } else {
            1
        };

        self.idx += 1;
        Some((value, diff))
    }
}

/// Generate signature for each aggregate function
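VectorDiff pairs each value in a vector with a per-row diff (an insertion/retraction count); when no diff column is supplied, every row counts as +1. A minimal sketch of that pairing over plain slices, where the i64 values and Diff = i64 are stand-ins for the real Value/Diff types:

type Diff = i64;

// Pair each value with its diff, defaulting to +1 when no diff column is
// present, mirroring what VectorDiffIter::next does row by row.
fn zip_with_diff(values: &[i64], diffs: Option<&[Diff]>) -> Vec<(i64, Diff)> {
    values
        .iter()
        .enumerate()
        .map(|(idx, value)| {
            let diff = diffs.map_or(1, |d| d[idx]);
            (*value, diff)
        })
        .collect()
}

fn main() {
    // Without a diff column every row counts as an insertion (+1).
    assert_eq!(zip_with_diff(&[5, 6], None), vec![(5, 1), (6, 1)]);
    // With a diff column, rows can also carry retractions (-1).
    assert_eq!(
        zip_with_diff(&[5, 6], Some(&[1, -1])),
        vec![(5, 1), (6, -1)]
    );
}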
2 changes: 1 addition & 1 deletion src/flow/src/utils.rs
@@ -513,7 +513,7 @@ pub type ArrangeReader<'a> = tokio::sync::RwLockReadGuard<'a, Arrangement>;
pub type ArrangeWriter<'a> = tokio::sync::RwLockWriteGuard<'a, Arrangement>;

/// A handler to the inner Arrangement, can be cloned and shared, useful for querying its inner state
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ArrangeHandler {
    inner: Arc<RwLock<Arrangement>>,
}
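Deriving Clone on ArrangeHandler is cheap because the handle only wraps an Arc: a clone bumps the reference count, and every clone still points at the same shared arrangement, which is what lets the reduce closure capture its own copy. A small illustration of that shared-handle behaviour; the Handle type below is hypothetical, and it uses std::sync::RwLock so the sketch stays synchronous, whereas the real type wraps a tokio::sync::RwLock.

use std::sync::{Arc, RwLock};

// A cloneable handle to shared state, analogous to ArrangeHandler.
#[derive(Debug, Clone)]
struct Handle {
    inner: Arc<RwLock<Vec<i64>>>,
}

fn main() {
    let h1 = Handle {
        inner: Arc::new(RwLock::new(Vec::new())),
    };
    // Cloning only bumps the Arc's reference count; both handles
    // still point at the same underlying state.
    let h2 = h1.clone();
    h1.inner.write().unwrap().push(42);
    assert_eq!(*h2.inner.read().unwrap(), vec![42]);
}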
