Skip to content

Commit

Permalink
[FLINK-35804][table-planner] Fix incorrect calc merge during decorrel…
Browse files Browse the repository at this point in the history
…ate phase

This closes #25084

Co-authored-by: zhaorongsheng <zhaorongsheng@users.noreply.github.com>
  • Loading branch information
lincoln-lil and zhaorongsheng committed Jul 15, 2024
1 parent 7f96af3 commit acc3486
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.calcite.sql2rel;

import org.apache.flink.table.planner.plan.rules.logical.FlinkFilterProjectTransposeRule;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -125,9 +127,9 @@
* Copied to fix calcite issues. FLINK modifications are at lines
*
* <ol>
* <li>Was changed within FLINK-29280, FLINK-28682: Line 216 ~ 223
* <li>Should be removed after fix of FLINK-29540: Line 289 ~ 295
* <li>Should be removed after fix of FLINK-29540: Line 307 ~ 313
* <li>Was changed within FLINK-29280, FLINK-28682, FLINK-35804: Line 218 ~ 225, Line 273 ~ 288
* <li>Should be removed after fix of FLINK-29540: Line 293 ~ 299
* <li>Should be removed after fix of FLINK-29540: Line 311 ~ 317
* </ol>
*/
public class RelDecorrelator implements ReflectiveVisitor {
Expand Down Expand Up @@ -268,20 +270,22 @@ protected RelNode decorrelate(RelNode root) {
.FilterIntoJoinRuleConfig.class)
.toRule())
.addRuleInstance(
CoreRules.FILTER_PROJECT_TRANSPOSE
.config
.withRelBuilderFactory(f)
.as(FilterProjectTransposeRule.Config.class)
.withOperandFor(
Filter.class,
filter ->
!RexUtil.containsCorrelation(
filter.getCondition()),
Project.class,
project -> true)
.withCopyFilter(true)
.withCopyProject(true)
.toRule())
// ----- FLINK MODIFICATION BEGIN -----
FlinkFilterProjectTransposeRule.build(
CoreRules.FILTER_PROJECT_TRANSPOSE
.config
.withRelBuilderFactory(f)
.as(FilterProjectTransposeRule.Config.class)
.withOperandFor(
Filter.class,
filter ->
!RexUtil.containsCorrelation(
filter.getCondition()),
Project.class,
project -> true)
.withCopyFilter(true)
.withCopyProject(true)))
// ----- FLINK MODIFICATION END -----
.addRuleInstance(
FilterCorrelateRule.Config.DEFAULT
.withRelBuilderFactory(f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public class FlinkFilterProjectTransposeRule extends FilterProjectTransposeRule

public static final RelOptRule INSTANCE = new FlinkFilterProjectTransposeRule(Config.DEFAULT);

public static FlinkFilterProjectTransposeRule build(Config config) {
return new FlinkFilterProjectTransposeRule(config);
}

protected FlinkFilterProjectTransposeRule(Config config) {
super(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,37 @@ LogicalProject(a=[$0])
<![CDATA[
Calc(select=[a], where=[>(random_udf(b), 10)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testCalcMergeWithCorrelate">
<Resource name="sql">
<![CDATA[
SELECT a, r FROM (
SELECT a, random_udf(b) r FROM (
select a, b, c1 FROM MyTable, LATERAL TABLE(str_split(c)) AS T(c1)
) t
)
WHERE r > 10
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], r=[$1])
+- LogicalFilter(condition=[>($1, 10)])
+- LogicalProject(a=[$0], r=[random_udf($1)])
+- LogicalProject(a=[$0], b=[$1], c1=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+- LogicalTableFunctionScan(invocation=[str_split($cor0.c)], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, r], where=[>(r, 10)])
+- Calc(select=[a, random_udf(b) AS r])
+- Correlate(invocation=[str_split($cor0.c)], correlate=[table(str_split($cor0.c))], select=[a,b,c,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,37 @@ LogicalProject(a=[$0])
<![CDATA[
Calc(select=[a], where=[>(random_udf(b), 10)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testCalcMergeWithCorrelate">
<Resource name="sql">
<![CDATA[
SELECT a, r FROM (
SELECT a, random_udf(b) r FROM (
select a, b, c1 FROM MyTable, LATERAL TABLE(str_split(c)) AS T(c1)
) t
)
WHERE r > 10
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], r=[$1])
+- LogicalFilter(condition=[>($1, 10)])
+- LogicalProject(a=[$0], r=[random_udf($1)])
+- LogicalProject(a=[$0], b=[$1], c1=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+- LogicalTableFunctionScan(invocation=[str_split($cor0.c)], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, r], where=[>(r, 10)])
+- Calc(select=[a, random_udf(b) AS r])
+- Correlate(invocation=[str_split($cor0.c)], correlate=[table(str_split($cor0.c))], select=[a,b,c,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.planner.plan.utils.MyPojo
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.{JavaTableFunc1, StringSplit}
import org.apache.flink.table.planner.utils.TableTestBase

import org.assertj.core.api.Assertions.assertThatExceptionOfType
Expand Down Expand Up @@ -207,4 +208,19 @@ class CalcTest extends TableTestBase {
val sqlQuery = "SELECT a FROM (SELECT a, b FROM MyTable) t WHERE random_udf(b) > 10"
util.verifyRelPlan(sqlQuery)
}

@Test
def testCalcMergeWithCorrelate(): Unit = {
util.addTemporarySystemFunction("str_split", new StringSplit())
val sqlQuery =
"""
|SELECT a, r FROM (
| SELECT a, random_udf(b) r FROM (
| select a, b, c1 FROM MyTable, LATERAL TABLE(str_split(c)) AS T(c1)
| ) t
|)
|WHERE r > 10
|""".stripMargin
util.verifyRelPlan(sqlQuery)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.planner.plan.utils.MyPojo
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.{JavaTableFunc1, StringSplit}
import org.apache.flink.table.planner.utils.TableTestBase

import org.assertj.core.api.Assertions.assertThatExceptionOfType
Expand Down Expand Up @@ -201,4 +202,19 @@ class CalcTest extends TableTestBase {
val sqlQuery = "SELECT a FROM (SELECT a, b FROM MyTable) t WHERE random_udf(b) > 10"
util.verifyRelPlan(sqlQuery)
}

@Test
def testCalcMergeWithCorrelate(): Unit = {
util.addTemporarySystemFunction("str_split", new StringSplit())
val sqlQuery =
"""
|SELECT a, r FROM (
| SELECT a, random_udf(b) r FROM (
| select a, b, c1 FROM MyTable, LATERAL TABLE(str_split(c)) AS T(c1)
| ) t
|)
|WHERE r > 10
|""".stripMargin
util.verifyRelPlan(sqlQuery)
}
}

0 comments on commit acc3486

Please sign in to comment.