diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java index 10c2764a8072a..8aed985a7b081 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java @@ -16,6 +16,8 @@ */ package org.apache.calcite.sql2rel; +import org.apache.flink.table.planner.plan.rules.logical.FlinkFilterProjectTransposeRule; + import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -125,9 +127,9 @@ * Copied to fix calcite issues. FLINK modifications are at lines * *
    - *
  1. Was changed within FLINK-29280, FLINK-28682: Line 216 ~ 223 - *
  2. Should be removed after fix of FLINK-29540: Line 289 ~ 295 - *
  3. Should be removed after fix of FLINK-29540: Line 307 ~ 313 + *
  4. Was changed within FLINK-29280, FLINK-28682, FLINK-35804: Line 218 ~ 225, Line 273 ~ 288 + *
  5. Should be removed after fix of FLINK-29540: Line 293 ~ 299 + *
  6. Should be removed after fix of FLINK-29540: Line 311 ~ 317 *
*/ public class RelDecorrelator implements ReflectiveVisitor { @@ -268,20 +270,22 @@ protected RelNode decorrelate(RelNode root) { .FilterIntoJoinRuleConfig.class) .toRule()) .addRuleInstance( - CoreRules.FILTER_PROJECT_TRANSPOSE - .config - .withRelBuilderFactory(f) - .as(FilterProjectTransposeRule.Config.class) - .withOperandFor( - Filter.class, - filter -> - !RexUtil.containsCorrelation( - filter.getCondition()), - Project.class, - project -> true) - .withCopyFilter(true) - .withCopyProject(true) - .toRule()) + // ----- FLINK MODIFICATION BEGIN ----- + FlinkFilterProjectTransposeRule.build( + CoreRules.FILTER_PROJECT_TRANSPOSE + .config + .withRelBuilderFactory(f) + .as(FilterProjectTransposeRule.Config.class) + .withOperandFor( + Filter.class, + filter -> + !RexUtil.containsCorrelation( + filter.getCondition()), + Project.class, + project -> true) + .withCopyFilter(true) + .withCopyProject(true))) + // ----- FLINK MODIFICATION END ----- .addRuleInstance( FilterCorrelateRule.Config.DEFAULT .withRelBuilderFactory(f) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterProjectTransposeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterProjectTransposeRule.java index fdca581b61263..54d6f277c8c6d 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterProjectTransposeRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterProjectTransposeRule.java @@ -39,6 +39,10 @@ public class FlinkFilterProjectTransposeRule extends FilterProjectTransposeRule public static final RelOptRule INSTANCE = new FlinkFilterProjectTransposeRule(Config.DEFAULT); + public static FlinkFilterProjectTransposeRule build(Config config) { + return new FlinkFilterProjectTransposeRule(config); + } + protected FlinkFilterProjectTransposeRule(Config config) { super(config); } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml index 0b04d0a792435..69692846ffd63 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml @@ -69,6 +69,37 @@ LogicalProject(a=[$0]) (random_udf(b), 10)]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + + + + + 10 +]]> + + + ($1, 10)]) + +- LogicalProject(a=[$0], r=[random_udf($1)]) + +- LogicalProject(a=[$0], b=[$1], c1=[$3]) + +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}]) + :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) + +- LogicalTableFunctionScan(invocation=[str_split($cor0.c)], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)]) +]]> + + + (r, 10)]) ++- Calc(select=[a, random_udf(b) AS r]) + +- Correlate(invocation=[str_split($cor0.c)], correlate=[table(str_split($cor0.c))], select=[a,b,c,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, VARCHAR(2147483647) EXPR$0)], joinType=[INNER]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml index 4397895cebcc0..215a6aa8eeac9 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml @@ -69,6 +69,37 @@ LogicalProject(a=[$0]) (random_udf(b), 10)]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + + + + + 10 +]]> + + + ($1, 10)]) + +- LogicalProject(a=[$0], r=[random_udf($1)]) + +- LogicalProject(a=[$0], b=[$1], c1=[$3]) + +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}]) + :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) + +- LogicalTableFunctionScan(invocation=[str_split($cor0.c)], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)]) +]]> + + + (r, 10)]) ++- Calc(select=[a, random_udf(b) AS r]) + +- Correlate(invocation=[str_split($cor0.c)], correlate=[table(str_split($cor0.c))], select=[a,b,c,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, VARCHAR(2147483647) EXPR$0)], joinType=[INNER]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala index abd196d34af2c..49bd11af1128b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala @@ -23,6 +23,7 @@ import org.apache.flink.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.planner.plan.utils.MyPojo import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.{JavaTableFunc1, StringSplit} import org.apache.flink.table.planner.utils.TableTestBase import org.assertj.core.api.Assertions.assertThatExceptionOfType @@ -207,4 +208,19 @@ class CalcTest extends TableTestBase { val sqlQuery = "SELECT a FROM (SELECT a, b FROM MyTable) t WHERE random_udf(b) > 10" util.verifyRelPlan(sqlQuery) } + + @Test + def testCalcMergeWithCorrelate(): Unit = { + util.addTemporarySystemFunction("str_split", new StringSplit()) + val sqlQuery = + """ + |SELECT a, r FROM ( + | SELECT a, random_udf(b) r FROM ( + | select a, b, c1 FROM MyTable, LATERAL TABLE(str_split(c)) AS T(c1) + | ) t + |) + |WHERE r > 10 + |""".stripMargin + util.verifyRelPlan(sqlQuery) + } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala index a0643c21e6266..1c62fc054e2ea 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala @@ -23,6 +23,7 @@ import org.apache.flink.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.planner.plan.utils.MyPojo import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.{JavaTableFunc1, StringSplit} import org.apache.flink.table.planner.utils.TableTestBase import org.assertj.core.api.Assertions.assertThatExceptionOfType @@ -201,4 +202,19 @@ class CalcTest extends TableTestBase { val sqlQuery = "SELECT a FROM (SELECT a, b FROM MyTable) t WHERE random_udf(b) > 10" util.verifyRelPlan(sqlQuery) } + + @Test + def testCalcMergeWithCorrelate(): Unit = { + util.addTemporarySystemFunction("str_split", new StringSplit()) + val sqlQuery = + """ + |SELECT a, r FROM ( + | SELECT a, random_udf(b) r FROM ( + | select a, b, c1 FROM MyTable, LATERAL TABLE(str_split(c)) AS T(c1) + | ) t + |) + |WHERE r > 10 + |""".stripMargin + util.verifyRelPlan(sqlQuery) + } }