From c5640b15ac7a6eb6a5a07dddb29680c6dbfa057b Mon Sep 17 00:00:00 2001 From: Yaro Shkvorets Date: Tue, 10 Sep 2024 13:02:17 -0400 Subject: [PATCH] store: Fix SQL query for aggregations with no dimensions When an aggregation only has a `count`, there are no dimensions and the SQL query for rollups would contain a trailing comma. Fixes https://github.com/graphprotocol/graph-node/issues/5634 --- store/postgres/src/relational/rollup.rs | 75 ++++++++++++------------- 1 file changed, 36 insertions(+), 39 deletions(-) diff --git a/store/postgres/src/relational/rollup.rs b/store/postgres/src/relational/rollup.rs index 89aa22675a3..7a55bf20a75 100644 --- a/store/postgres/src/relational/rollup.rs +++ b/store/postgres/src/relational/rollup.rs @@ -332,18 +332,16 @@ impl<'a> RollupSql<'a> { Ok(IdType::String) | Ok(IdType::Int8) => "max(id)", Err(_) => unreachable!("we make sure that the primary key has an id_type"), }; - write!(w, "select {max_id} as id, timestamp, ")?; + write!(w, "select {max_id} as id, timestamp")?; if with_block { - write!(w, "$3, ")?; + write!(w, ", $3")?; } write_dims(self.dimensions, w)?; - comma_sep(self.aggregates, self.dimensions.is_empty(), w, |w, agg| { - agg.aggregate("id", w) - })?; + comma_sep(self.aggregates, w, |w, agg| agg.aggregate("id", w))?; let secs = self.interval.as_duration().as_secs(); write!( w, - " from (select id, date_bin('{secs}s', timestamp, 'epoch'::timestamptz) as timestamp, " + " from (select id, date_bin('{secs}s', timestamp, 'epoch'::timestamptz) as timestamp" )?; write_dims(self.dimensions, w)?; let agg_srcs: Vec<&str> = { @@ -358,9 +356,7 @@ impl<'a> RollupSql<'a> { agg_srcs.dedup(); agg_srcs }; - comma_sep(agg_srcs, self.dimensions.is_empty(), w, |w, col: &str| { - write!(w, "\"{}\"", col) - })?; + comma_sep(agg_srcs, w, |w, col: &str| write!(w, "\"{}\"", col))?; write!( w, " from {src_table} where {src_table}.timestamp >= $1 and {src_table}.timestamp < $2", @@ -371,10 +367,7 @@ impl<'a> RollupSql<'a> { " order by {src_table}.timestamp) data group by timestamp", src_table = self.src_table )?; - Ok(if !self.dimensions.is_empty() { - write!(w, ", ")?; - write_dims(self.dimensions, w)?; - }) + Ok(write_dims(self.dimensions, w)?) } fn select(&self, w: &mut dyn fmt::Write) -> fmt::Result { @@ -388,11 +381,11 @@ impl<'a> RollupSql<'a> { fn insert_into(&self, w: &mut dyn fmt::Write) -> fmt::Result { write!( w, - "insert into {}(id, timestamp, block$, ", + "insert into {}(id, timestamp, block$", self.agg_table.qualified_name )?; write_dims(self.dimensions, w)?; - comma_sep(self.aggregates, self.dimensions.is_empty(), w, |w, agg| { + comma_sep(self.aggregates, w, |w, agg| { write!(w, "\"{}\"", agg.agg_column.name) })?; write!(w, ") ") @@ -413,10 +406,10 @@ impl<'a> RollupSql<'a> { /// for any group keys that appear in `bucket` fn select_prev(&self, w: &mut dyn fmt::Write) -> fmt::Result { write!(w, "select bucket.id, bucket.timestamp")?; - comma_sep(self.dimensions, false, w, |w, col| { + comma_sep(self.dimensions, w, |w, col| { write!(w, "bucket.\"{}\"", col.name) })?; - comma_sep(self.aggregates, false, w, |w, agg| agg.prev_agg(w))?; + comma_sep(self.aggregates, w, |w, agg| agg.prev_agg(w))?; write!(w, " from bucket cross join lateral (")?; write!(w, "select * from {} prev", self.agg_table.qualified_name)?; write!(w, " where prev.timestamp < $1")?; @@ -432,19 +425,14 @@ impl<'a> RollupSql<'a> { fn select_combined(&self, w: &mut dyn fmt::Write) -> fmt::Result { write!(w, "select id, timestamp")?; - comma_sep(self.dimensions, false, w, |w, col| { - write!(w, "\"{}\"", col.name) - })?; - comma_sep(self.aggregates, false, w, |w, agg| agg.combine("seq", w))?; + comma_sep(self.dimensions, w, |w, col| write!(w, "\"{}\"", col.name))?; + comma_sep(self.aggregates, w, |w, agg| agg.combine("seq", w))?; write!( w, " from (select *, 1 as seq from prev union all select *, 2 as seq from bucket) u " )?; write!(w, " group by id, timestamp")?; - if !self.dimensions.is_empty() { - write!(w, ", ")?; - write_dims(self.dimensions, w)?; - } + write_dims(self.dimensions, w)?; Ok(()) } @@ -476,9 +464,9 @@ impl<'a> RollupSql<'a> { self.select_cte(w)?; write!(w, " ")?; self.insert_into(w)?; - write!(w, "select id, timestamp, $3 as block$, ")?; + write!(w, "select id, timestamp, $3 as block$")?; write_dims(self.dimensions, w)?; - comma_sep(self.aggregates, self.dimensions.is_empty(), w, |w, agg| { + comma_sep(self.aggregates, w, |w, agg| { write!(w, "\"{}\"", agg.agg_column.name) })?; write!(w, " from combined") @@ -495,20 +483,12 @@ impl<'a> RollupSql<'a> { /// Write the elements in `list` separated by commas into `w`. The list /// elements are written by calling `out` with each of them. -fn comma_sep( - list: impl IntoIterator, - mut first: bool, - w: &mut dyn fmt::Write, - out: F, -) -> fmt::Result +fn comma_sep(list: impl IntoIterator, w: &mut dyn fmt::Write, out: F) -> fmt::Result where F: Fn(&mut dyn fmt::Write, T) -> fmt::Result, { for elem in list { - if !first { - write!(w, ", ")?; - } - first = false; + write!(w, ", ")?; out(w, elem)?; } Ok(()) @@ -517,7 +497,7 @@ where /// Write the names of the columns in `dimensions` into `w` as a /// comma-separated list of quoted column names. fn write_dims(dimensions: &[&Column], w: &mut dyn fmt::Write) -> fmt::Result { - comma_sep(dimensions, true, w, |w, col| write!(w, "\"{}\"", col.name)) + comma_sep(dimensions, w, |w, col| write!(w, "\"{}\"", col.name)) } #[cfg(test)] @@ -592,6 +572,12 @@ mod tests { total_count: Int8! @aggregate(fn: "count", cumulative: true) total_sum: BigDecimal! @aggregate(fn: "sum", arg: "amount", cumulative: true) } + + type CountOnly @aggregation(intervals: ["day"], source: "Data") { + id: Int8! + timestamp: Timestamp! + count: Int8! @aggregate(fn: "count") + } "#; const STATS_HOUR_SQL: &str = r#"\ @@ -664,6 +650,14 @@ mod tests { select id, timestamp, $3 as block$, "count", "sum", "total_count", "total_sum" from combined "#; + const COUNT_ONLY_SQL: &str = r#"\ + insert into "sgd007"."count_only_day"(id, timestamp, block$, "count") \ + select max(id) as id, timestamp, $3, count(*) as "count" \ + from (select id, date_bin('86400s', timestamp, 'epoch'::timestamptz) as timestamp from "sgd007"."data" \ + where "sgd007"."data".timestamp >= $1 and "sgd007"."data".timestamp < $2 \ + order by "sgd007"."data".timestamp) data \ + group by timestamp"#; + #[track_caller] fn rollup_for<'a>(layout: &'a Layout, table_name: &str) -> &'a Rollup { layout @@ -679,7 +673,7 @@ mod tests { let site = Arc::new(make_dummy_site(hash, nsp, "rollup".to_string())); let catalog = Catalog::for_tests(site.clone(), BTreeSet::new()).unwrap(); let layout = Layout::new(site, &schema, catalog).unwrap(); - assert_eq!(5, layout.rollups.len()); + assert_eq!(6, layout.rollups.len()); // Intervals are non-decreasing assert!(layout.rollups[0].interval <= layout.rollups[1].interval); @@ -698,5 +692,8 @@ mod tests { let lifetime = rollup_for(&layout, "lifetime_day"); check_eqv(LIFETIME_SQL, &lifetime.insert_sql); + + let count_only = rollup_for(&layout, "count_only_day"); + check_eqv(COUNT_ONLY_SQL, &count_only.insert_sql); } }