Skip to content

Commit

Permalink
store: Fix SQL query for aggregations with no dimensions
Browse files Browse the repository at this point in the history
When an aggregation only has a `count`, there are no dimensions and the SQL query for rollups would contain a trailing comma.

Fixes #5634
  • Loading branch information
YaroShkvorets committed Sep 10, 2024
1 parent b72621e commit c5640b1
Showing 1 changed file with 36 additions and 39 deletions.
75 changes: 36 additions & 39 deletions store/postgres/src/relational/rollup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,18 +332,16 @@ impl<'a> RollupSql<'a> {
Ok(IdType::String) | Ok(IdType::Int8) => "max(id)",
Err(_) => unreachable!("we make sure that the primary key has an id_type"),
};
write!(w, "select {max_id} as id, timestamp, ")?;
write!(w, "select {max_id} as id, timestamp")?;
if with_block {
write!(w, "$3, ")?;
write!(w, ", $3")?;
}
write_dims(self.dimensions, w)?;
comma_sep(self.aggregates, self.dimensions.is_empty(), w, |w, agg| {
agg.aggregate("id", w)
})?;
comma_sep(self.aggregates, w, |w, agg| agg.aggregate("id", w))?;
let secs = self.interval.as_duration().as_secs();
write!(
w,
" from (select id, date_bin('{secs}s', timestamp, 'epoch'::timestamptz) as timestamp, "
" from (select id, date_bin('{secs}s', timestamp, 'epoch'::timestamptz) as timestamp"
)?;
write_dims(self.dimensions, w)?;
let agg_srcs: Vec<&str> = {
Expand All @@ -358,9 +356,7 @@ impl<'a> RollupSql<'a> {
agg_srcs.dedup();
agg_srcs
};
comma_sep(agg_srcs, self.dimensions.is_empty(), w, |w, col: &str| {
write!(w, "\"{}\"", col)
})?;
comma_sep(agg_srcs, w, |w, col: &str| write!(w, "\"{}\"", col))?;
write!(
w,
" from {src_table} where {src_table}.timestamp >= $1 and {src_table}.timestamp < $2",
Expand All @@ -371,10 +367,7 @@ impl<'a> RollupSql<'a> {
" order by {src_table}.timestamp) data group by timestamp",
src_table = self.src_table
)?;
Ok(if !self.dimensions.is_empty() {
write!(w, ", ")?;
write_dims(self.dimensions, w)?;
})
Ok(write_dims(self.dimensions, w)?)
}

fn select(&self, w: &mut dyn fmt::Write) -> fmt::Result {
Expand All @@ -388,11 +381,11 @@ impl<'a> RollupSql<'a> {
fn insert_into(&self, w: &mut dyn fmt::Write) -> fmt::Result {
write!(
w,
"insert into {}(id, timestamp, block$, ",
"insert into {}(id, timestamp, block$",
self.agg_table.qualified_name
)?;
write_dims(self.dimensions, w)?;
comma_sep(self.aggregates, self.dimensions.is_empty(), w, |w, agg| {
comma_sep(self.aggregates, w, |w, agg| {
write!(w, "\"{}\"", agg.agg_column.name)
})?;
write!(w, ") ")
Expand All @@ -413,10 +406,10 @@ impl<'a> RollupSql<'a> {
/// for any group keys that appear in `bucket`
fn select_prev(&self, w: &mut dyn fmt::Write) -> fmt::Result {
write!(w, "select bucket.id, bucket.timestamp")?;
comma_sep(self.dimensions, false, w, |w, col| {
comma_sep(self.dimensions, w, |w, col| {
write!(w, "bucket.\"{}\"", col.name)
})?;
comma_sep(self.aggregates, false, w, |w, agg| agg.prev_agg(w))?;
comma_sep(self.aggregates, w, |w, agg| agg.prev_agg(w))?;
write!(w, " from bucket cross join lateral (")?;
write!(w, "select * from {} prev", self.agg_table.qualified_name)?;
write!(w, " where prev.timestamp < $1")?;
Expand All @@ -432,19 +425,14 @@ impl<'a> RollupSql<'a> {

fn select_combined(&self, w: &mut dyn fmt::Write) -> fmt::Result {
write!(w, "select id, timestamp")?;
comma_sep(self.dimensions, false, w, |w, col| {
write!(w, "\"{}\"", col.name)
})?;
comma_sep(self.aggregates, false, w, |w, agg| agg.combine("seq", w))?;
comma_sep(self.dimensions, w, |w, col| write!(w, "\"{}\"", col.name))?;
comma_sep(self.aggregates, w, |w, agg| agg.combine("seq", w))?;
write!(
w,
" from (select *, 1 as seq from prev union all select *, 2 as seq from bucket) u "
)?;
write!(w, " group by id, timestamp")?;
if !self.dimensions.is_empty() {
write!(w, ", ")?;
write_dims(self.dimensions, w)?;
}
write_dims(self.dimensions, w)?;
Ok(())
}

Expand Down Expand Up @@ -476,9 +464,9 @@ impl<'a> RollupSql<'a> {
self.select_cte(w)?;
write!(w, " ")?;
self.insert_into(w)?;
write!(w, "select id, timestamp, $3 as block$, ")?;
write!(w, "select id, timestamp, $3 as block$")?;
write_dims(self.dimensions, w)?;
comma_sep(self.aggregates, self.dimensions.is_empty(), w, |w, agg| {
comma_sep(self.aggregates, w, |w, agg| {
write!(w, "\"{}\"", agg.agg_column.name)
})?;
write!(w, " from combined")
Expand All @@ -495,20 +483,12 @@ impl<'a> RollupSql<'a> {

/// Write the elements in `list` separated by commas into `w`. The list
/// elements are written by calling `out` with each of them.
fn comma_sep<T, F>(
list: impl IntoIterator<Item = T>,
mut first: bool,
w: &mut dyn fmt::Write,
out: F,
) -> fmt::Result
fn comma_sep<T, F>(list: impl IntoIterator<Item = T>, w: &mut dyn fmt::Write, out: F) -> fmt::Result
where
F: Fn(&mut dyn fmt::Write, T) -> fmt::Result,
{
for elem in list {
if !first {
write!(w, ", ")?;
}
first = false;
write!(w, ", ")?;
out(w, elem)?;
}
Ok(())
Expand All @@ -517,7 +497,7 @@ where
/// Write the names of the columns in `dimensions` into `w` as a
/// comma-separated list of quoted column names.
fn write_dims(dimensions: &[&Column], w: &mut dyn fmt::Write) -> fmt::Result {
comma_sep(dimensions, true, w, |w, col| write!(w, "\"{}\"", col.name))
comma_sep(dimensions, w, |w, col| write!(w, "\"{}\"", col.name))
}

#[cfg(test)]
Expand Down Expand Up @@ -592,6 +572,12 @@ mod tests {
total_count: Int8! @aggregate(fn: "count", cumulative: true)
total_sum: BigDecimal! @aggregate(fn: "sum", arg: "amount", cumulative: true)
}
type CountOnly @aggregation(intervals: ["day"], source: "Data") {
id: Int8!
timestamp: Timestamp!
count: Int8! @aggregate(fn: "count")
}
"#;

const STATS_HOUR_SQL: &str = r#"\
Expand Down Expand Up @@ -664,6 +650,14 @@ mod tests {
select id, timestamp, $3 as block$, "count", "sum", "total_count", "total_sum" from combined
"#;

const COUNT_ONLY_SQL: &str = r#"\
insert into "sgd007"."count_only_day"(id, timestamp, block$, "count") \
select max(id) as id, timestamp, $3, count(*) as "count" \
from (select id, date_bin('86400s', timestamp, 'epoch'::timestamptz) as timestamp from "sgd007"."data" \
where "sgd007"."data".timestamp >= $1 and "sgd007"."data".timestamp < $2 \
order by "sgd007"."data".timestamp) data \
group by timestamp"#;

#[track_caller]
fn rollup_for<'a>(layout: &'a Layout, table_name: &str) -> &'a Rollup {
layout
Expand All @@ -679,7 +673,7 @@ mod tests {
let site = Arc::new(make_dummy_site(hash, nsp, "rollup".to_string()));
let catalog = Catalog::for_tests(site.clone(), BTreeSet::new()).unwrap();
let layout = Layout::new(site, &schema, catalog).unwrap();
assert_eq!(5, layout.rollups.len());
assert_eq!(6, layout.rollups.len());

// Intervals are non-decreasing
assert!(layout.rollups[0].interval <= layout.rollups[1].interval);
Expand All @@ -698,5 +692,8 @@ mod tests {

let lifetime = rollup_for(&layout, "lifetime_day");
check_eqv(LIFETIME_SQL, &lifetime.insert_sql);

let count_only = rollup_for(&layout, "count_only_day");
check_eqv(COUNT_ONLY_SQL, &count_only.insert_sql);
}
}

0 comments on commit c5640b1

Please sign in to comment.