perf(flow): use batch mode for flow (#4599)
* generic bundle trait

* feat: impl get/let

* fix: drop batch

* test: tumble batch

* feat: use batch eval flow

* fix: div use arrow::div not mul

* perf: not append batch

* perf: use bool mask for reduce (see the sketch after this list)

* perf: tiny opt

* perf: refactor slow path

* feat: opt if then

* fix: WIP

* perf: if then

* chore: use trace instead

* fix: reduce missing non-first batch

* perf: flow if then using interleave (see the sketch after this list)

* docs: add TODO

* perf: remove unnecessary eq

* chore: remove unused import

* fix: run_available no longer loop forever

* feat: blocking on high input buf

* chore: increase threshold

* chore: after rebase

* chore: per review

* chore: per review

* fix: allow empty values in reduce&test

* tests: more flow doc example tests

* chore: per review

* chore: per review
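
Two of the performance commits above lean on Arrow's compute kernels. "perf: flow if then using interleave" points at evaluating a batched `if .. then .. else` by gathering rows from the two pre-evaluated branch arrays. Below is a minimal sketch of that idea against arrow-rs; `if_then` is a hypothetical helper, not the actual flow code, and it ignores null masks:

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BooleanArray, Int64Array};
use arrow::compute::interleave;

/// Pick each output row from `a` where the mask is true, from `b` otherwise.
/// `interleave` gathers rows described as (source array, row index) pairs in
/// a single pass, avoiding a row-by-row branch in the evaluator.
fn if_then(mask: &BooleanArray, a: &ArrayRef, b: &ArrayRef) -> ArrayRef {
    let indices: Vec<(usize, usize)> = (0..mask.len())
        .map(|i| if mask.value(i) { (0, i) } else { (1, i) })
        .collect();
    interleave(&[a.as_ref(), b.as_ref()], &indices).expect("arrays share type and length")
}

fn main() {
    let mask = BooleanArray::from(vec![true, false, true]);
    let a: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let b: ArrayRef = Arc::new(Int64Array::from(vec![10, 20, 30]));
    println!("{:?}", if_then(&mask, &a, &b)); // rows 1, 20, 3
}
```

Arrow also ships a `zip` kernel covering the simple two-branch case; `interleave` generalizes to gathering from any number of source arrays. Similarly, "perf: use bool mask for reduce" suggests carrying a boolean mask per group and letting vectorized selection feed the aggregation, rather than copying each group's rows out one by one. Another illustrative sketch, not the committed code:

```rust
use arrow::array::{Array, BooleanArray, Int64Array};
use arrow::compute::{filter, sum};

/// Sum only the rows the group's mask selects; `filter` and `sum` are both
/// vectorized arrow kernels, so no per-row interpreter loop is needed.
fn masked_sum(values: &Int64Array, mask: &BooleanArray) -> Option<i64> {
    let selected = filter(values, mask).ok()?;
    sum(selected.as_any().downcast_ref::<Int64Array>()?)
}
```
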
discord9 committed Sep 11, 2024
1 parent f252599 commit a3d567f
Showing 20 changed files with 2,062 additions and 781 deletions.
350 changes: 184 additions & 166 deletions src/flow/src/adapter.rs

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/flow/src/adapter/flownode_impl.rs
@@ -23,7 +23,7 @@ use api::v1::region::InsertRequests;
use common_error::ext::BoxedError;
use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
use common_meta::node_manager::Flownode;
use common_telemetry::debug;
use common_telemetry::{debug, trace};
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
@@ -189,7 +189,7 @@ impl Flownode for FlowWorkerManager {
})
.try_collect()?;
if !fetch_order.iter().enumerate().all(|(i, &v)| i == v) {
debug!("Reordering columns: {:?}", fetch_order)
trace!("Reordering columns: {:?}", fetch_order)
}
fetch_order
};
97 changes: 45 additions & 52 deletions src/flow/src/adapter/node_context.rs
@@ -15,10 +15,10 @@
//! Node context, prone to change with every incoming requests

use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use common_telemetry::debug;
use common_telemetry::trace;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
@@ -27,9 +27,9 @@ use tokio::sync::{broadcast, mpsc, RwLock};
use crate::adapter::{FlowId, TableName, TableSource};
use crate::error::{Error, EvalSnafu, TableNotFoundSnafu};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
use crate::expr::{Batch, GlobalId};
use crate::metrics::METRIC_FLOW_INPUT_BUF_SIZE;
use crate::repr::{DiffRow, RelationDesc, BROADCAST_CAP, SEND_BUF_CAP};
use crate::repr::{DiffRow, RelationDesc, BATCH_SIZE, BROADCAST_CAP, SEND_BUF_CAP};

/// A context that holds the information of the dataflow
#[derive(Default, Debug)]
@@ -47,39 +47,36 @@ pub struct FlownodeContext {
///
/// and send it back to the client, since we are mocking the sink table as a client, we should use table name as the key
/// note that the sink receiver should only have one, and we are using broadcast as mpsc channel here
pub sink_receiver: BTreeMap<
TableName,
(
mpsc::UnboundedSender<DiffRow>,
mpsc::UnboundedReceiver<DiffRow>,
),
>,
pub sink_receiver:
BTreeMap<TableName, (mpsc::UnboundedSender<Batch>, mpsc::UnboundedReceiver<Batch>)>,
/// the schema of the table, query from metasrv or inferred from TypedPlan
pub schema: HashMap<GlobalId, RelationDesc>,
/// All the tables that have been registered in the worker
pub table_repr: IdToNameMap,
pub query_context: Option<Arc<QueryContext>>,
}

/// a simple broadcast sender with backpressure and unbound capacity
/// a simple broadcast sender with backpressure, bounded capacity and blocking on send when send buf is full
/// note that it wouldn't evict old data, so it's possible to block forever if the receiver is slow
///
/// receiver still use tokio broadcast channel, since only sender side need to know
/// backpressure and adjust dataflow running duration to avoid blocking
#[derive(Debug)]
pub struct SourceSender {
// TODO(discord9): make it all Vec<DiffRow>?
sender: broadcast::Sender<DiffRow>,
send_buf_tx: mpsc::Sender<Vec<DiffRow>>,
send_buf_rx: RwLock<mpsc::Receiver<Vec<DiffRow>>>,
sender: broadcast::Sender<Batch>,
send_buf_tx: mpsc::Sender<Batch>,
send_buf_rx: RwLock<mpsc::Receiver<Batch>>,
send_buf_row_cnt: AtomicUsize,
}

impl Default for SourceSender {
fn default() -> Self {
// TODO(discord9): the capacity is arbitrary, we can adjust it later, might also want to limit the max number of rows in send buf
let (send_buf_tx, send_buf_rx) = mpsc::channel(SEND_BUF_CAP);
Self {
// TODO(discord9): found a better way then increase this to prevent lagging and hence missing input data
sender: broadcast::Sender::new(BROADCAST_CAP * 2),
sender: broadcast::Sender::new(SEND_BUF_CAP),
send_buf_tx,
send_buf_rx: RwLock::new(send_buf_rx),
send_buf_row_cnt: AtomicUsize::new(0),
@@ -90,7 +87,7 @@ impl Default for SourceSender {
impl SourceSender {
/// max number of iterations to try flush send buf
const MAX_ITERATIONS: usize = 16;
pub fn get_receiver(&self) -> broadcast::Receiver<DiffRow> {
pub fn get_receiver(&self) -> broadcast::Receiver<Batch> {
self.sender.subscribe()
}

@@ -106,30 +103,27 @@ impl SourceSender {
break;
}
// TODO(discord9): send rows instead so it's just moving a point
if let Some(rows) = send_buf.recv().await {
let len = rows.len();
self.send_buf_row_cnt
.fetch_sub(len, std::sync::atomic::Ordering::SeqCst);
for row in rows {
self.sender
.send(row)
.map_err(|err| {
InternalSnafu {
reason: format!("Failed to send row, error = {:?}", err),
}
.build()
})
.with_context(|_| EvalSnafu)?;
row_cnt += 1;
}
if let Some(batch) = send_buf.recv().await {
let len = batch.row_count();
self.send_buf_row_cnt.fetch_sub(len, Ordering::SeqCst);
row_cnt += len;
self.sender
.send(batch)
.map_err(|err| {
InternalSnafu {
reason: format!("Failed to send row, error = {:?}", err),
}
.build()
})
.with_context(|_| EvalSnafu)?;
}
}
if row_cnt > 0 {
debug!("Send {} rows", row_cnt);
trace!("Source Flushed {} rows", row_cnt);
METRIC_FLOW_INPUT_BUF_SIZE.sub(row_cnt as _);
debug!(
"Remaining Send buf.len() = {}",
self.send_buf_rx.read().await.len()
trace!(
"Remaining Source Send buf.len() = {}",
METRIC_FLOW_INPUT_BUF_SIZE.get()
);
}

@@ -138,12 +132,23 @@

/// return number of rows it actual send(including what's in the buffer)
pub async fn send_rows(&self, rows: Vec<DiffRow>) -> Result<usize, Error> {
self.send_buf_tx.send(rows).await.map_err(|e| {
METRIC_FLOW_INPUT_BUF_SIZE.add(rows.len() as _);
while self.send_buf_row_cnt.load(Ordering::SeqCst) >= BATCH_SIZE * 4 {
tokio::task::yield_now().await;
}
// row count metrics is approx so relaxed order is ok
self.send_buf_row_cnt
.fetch_add(rows.len(), Ordering::SeqCst);
let batch = Batch::try_from_rows(rows.into_iter().map(|(row, _, _)| row).collect())
.context(EvalSnafu)?;
common_telemetry::trace!("Send one batch to worker with {} rows", batch.row_count());
self.send_buf_tx.send(batch).await.map_err(|e| {
crate::error::InternalSnafu {
reason: format!("Failed to send row, error = {:?}", e),
}
.build()
})?;

Ok(0)
}
}
@@ -159,8 +164,6 @@ impl FlownodeContext {
.with_context(|| TableNotFoundSnafu {
name: table_id.to_string(),
})?;

debug!("FlownodeContext::send: trying to send {} rows", rows.len());
sender.send_rows(rows).await
}

@@ -174,16 +177,6 @@
}
Ok(sum)
}

/// Return the sum number of rows in all send buf
/// TODO(discord9): remove this since we can't get correct row cnt anyway
pub async fn get_send_buf_size(&self) -> usize {
let mut sum = 0;
for sender in self.source_sender.values() {
sum += sender.send_buf_rx.read().await.len();
}
sum
}
}

impl FlownodeContext {
@@ -230,7 +223,7 @@ impl FlownodeContext {
pub fn add_sink_receiver(&mut self, table_name: TableName) {
self.sink_receiver
.entry(table_name)
.or_insert_with(mpsc::unbounded_channel::<DiffRow>);
.or_insert_with(mpsc::unbounded_channel);
}

pub fn get_source_by_global_id(&self, id: &GlobalId) -> Result<&SourceSender, Error> {
@@ -254,7 +247,7 @@
pub fn get_sink_by_global_id(
&self,
id: &GlobalId,
) -> Result<mpsc::UnboundedSender<DiffRow>, Error> {
) -> Result<mpsc::UnboundedSender<Batch>, Error> {
let table_name = self
.table_repr
.get_by_global_id(id)
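
The `send_rows` rewrite above is where the "blocking on high input buf" commit lands: instead of buffering without bound, the sender converts the rows into a `Batch`, then parks itself (cooperatively yielding to the runtime) while the buffered row count is at or above `BATCH_SIZE * 4`, so a slow flow worker back-pressures ingestion. A distilled sketch of the pattern, assuming a placeholder `BATCH_SIZE` (the real constant lives in `crate::repr`) and hypothetical names:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

use tokio::sync::mpsc;

const BATCH_SIZE: usize = 8192; // placeholder; the flow crate defines its own value

/// Spin (cooperatively) until the flush task has drained enough of the send
/// buffer, then account for the new rows and enqueue them.
async fn send_with_backpressure(
    buffered_rows: &AtomicUsize,
    tx: &mpsc::Sender<Vec<u64>>, // stand-in for mpsc::Sender<Batch>
    rows: Vec<u64>,
) {
    while buffered_rows.load(Ordering::SeqCst) >= BATCH_SIZE * 4 {
        tokio::task::yield_now().await; // give the consumer a chance to drain
    }
    buffered_rows.fetch_add(rows.len(), Ordering::SeqCst);
    tx.send(rows).await.expect("flush task alive");
}
```

Note the trade-off stated in the new doc comment on `SourceSender`: nothing is ever evicted, so a receiver that stops draining can block senders indefinitely.
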
34 changes: 18 additions & 16 deletions src/flow/src/adapter/worker.rs
@@ -27,7 +27,7 @@ use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
use crate::adapter::FlowId;
use crate::compute::{Context, DataflowState, ErrCollector};
use crate::error::{Error, FlowAlreadyExistSnafu, InternalSnafu, UnexpectedSnafu};
use crate::expr::GlobalId;
use crate::expr::{Batch, GlobalId};
use crate::plan::TypedPlan;
use crate::repr::{self, DiffRow};

@@ -89,6 +89,8 @@ impl<'subgraph> ActiveDataflowState<'subgraph> {
err_collector: self.err_collector.clone(),
input_collection: Default::default(),
local_scope: Default::default(),
input_collection_batch: Default::default(),
local_scope_batch: Default::default(),
}
}

@@ -156,13 +158,13 @@ impl WorkerHandle {
///
/// the returned error is unrecoverable, and the worker should be shutdown/rebooted
pub async fn run_available(&self, now: repr::Timestamp, blocking: bool) -> Result<(), Error> {
common_telemetry::debug!("Running available with blocking={}", blocking);
common_telemetry::trace!("Running available with blocking={}", blocking);
if blocking {
let resp = self
.itc_client
.call_with_resp(Request::RunAvail { now, blocking })
.await?;
common_telemetry::debug!("Running available with response={:?}", resp);
common_telemetry::trace!("Running available with response={:?}", resp);
Ok(())
} else {
self.itc_client
@@ -225,9 +227,9 @@ impl<'s> Worker<'s> {
flow_id: FlowId,
plan: TypedPlan,
sink_id: GlobalId,
sink_sender: mpsc::UnboundedSender<DiffRow>,
sink_sender: mpsc::UnboundedSender<Batch>,
source_ids: &[GlobalId],
src_recvs: Vec<broadcast::Receiver<DiffRow>>,
src_recvs: Vec<broadcast::Receiver<Batch>>,
// TODO(discord9): set expire duration for all arrangement and compare to sys timestamp instead
expire_after: Option<repr::Duration>,
create_if_not_exists: bool,
@@ -249,12 +251,12 @@
{
let mut ctx = cur_task_state.new_ctx(sink_id);
for (source_id, src_recv) in source_ids.iter().zip(src_recvs) {
let bundle = ctx.render_source(src_recv)?;
ctx.insert_global(*source_id, bundle);
let bundle = ctx.render_source_batch(src_recv)?;
ctx.insert_global_batch(*source_id, bundle);
}

let rendered = ctx.render_plan(plan)?;
ctx.render_unbounded_sink(rendered, sink_sender);
let rendered = ctx.render_plan_batch(plan)?;
ctx.render_unbounded_sink_batch(rendered, sink_sender);
}
self.task_states.insert(flow_id, cur_task_state);
Ok(Some(flow_id))
@@ -370,9 +372,9 @@ pub enum Request {
flow_id: FlowId,
plan: TypedPlan,
sink_id: GlobalId,
sink_sender: mpsc::UnboundedSender<DiffRow>,
sink_sender: mpsc::UnboundedSender<Batch>,
source_ids: Vec<GlobalId>,
src_recvs: Vec<broadcast::Receiver<DiffRow>>,
src_recvs: Vec<broadcast::Receiver<Batch>>,
expire_after: Option<repr::Duration>,
create_if_not_exists: bool,
err_collector: ErrCollector,
@@ -472,7 +474,7 @@ mod test {
use super::*;
use crate::expr::Id;
use crate::plan::Plan;
use crate::repr::{RelationType, Row};
use crate::repr::RelationType;

#[test]
fn drop_handle() {
@@ -497,8 +499,8 @@
});
let handle = rx.await.unwrap();
let src_ids = vec![GlobalId::User(1)];
let (tx, rx) = broadcast::channel::<DiffRow>(1024);
let (sink_tx, mut sink_rx) = mpsc::unbounded_channel::<DiffRow>();
let (tx, rx) = broadcast::channel::<Batch>(1024);
let (sink_tx, mut sink_rx) = mpsc::unbounded_channel::<Batch>();
let (flow_id, plan) = (
1,
TypedPlan {
@@ -523,9 +525,9 @@
handle.create_flow(create_reqs).await.unwrap(),
Some(flow_id)
);
tx.send((Row::empty(), 0, 0)).unwrap();
tx.send(Batch::empty()).unwrap();
handle.run_available(0, true).await.unwrap();
assert_eq!(sink_rx.recv().await.unwrap().0, Row::empty());
assert_eq!(sink_rx.recv().await.unwrap(), Batch::empty());
drop(handle);
worker_thread_handle.join().unwrap();
}
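
The updated test doubles as documentation for the channel topology after this change: a source fans `Batch`es out over a tokio broadcast channel (so several dataflows can subscribe to the same table), the worker runs the plan, and each sink drains an unbounded mpsc channel. A self-contained sketch of just that wiring, with a stand-in `Batch` type in place of `crate::expr::Batch`:

```rust
use tokio::sync::{broadcast, mpsc};

// Stand-in for crate::expr::Batch; broadcast requires Clone.
#[derive(Clone, Debug, PartialEq)]
struct Batch;

#[tokio::main]
async fn main() {
    // Source side: broadcast, one sender per source table, N subscribers.
    let (src_tx, _) = broadcast::channel::<Batch>(1024);
    let mut worker_rx = src_tx.subscribe();
    // Sink side: unbounded mpsc, one receiver per sink table.
    let (sink_tx, mut sink_rx) = mpsc::unbounded_channel::<Batch>();

    src_tx.send(Batch).unwrap();
    // A real worker would evaluate the dataflow here; this just forwards.
    sink_tx.send(worker_rx.recv().await.unwrap()).unwrap();
    assert_eq!(sink_rx.recv().await.unwrap(), Batch);
}
```
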
