Skip to content

Commit

Permalink
[FLINK-35829][table-planner] The requireWatermark property of StreamP…
Browse files Browse the repository at this point in the history
…hysicalWindowTableFunction needs to check the window's time type

This closes #25083
  • Loading branch information
xuyangzhong committed Jul 13, 2024
1 parent 41a1409 commit 7f96af3
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class StreamPhysicalWindowTableFunction(
extends CommonPhysicalWindowTableFunction(cluster, traitSet, inputRel, outputRowType, windowing)
with StreamPhysicalRel {

override def requireWatermark: Boolean = true
override def requireWatermark: Boolean = windowing.isRowtime

override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new StreamPhysicalWindowTableFunction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,34 @@ Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, wi
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testRowtimeWindowTVFWithMiniBatch">
<Resource name="sql">
<![CDATA[
SELECT *
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], window_start=[$6], window_end=[$7], window_time=[$8])
+- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($4), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, window_time])
+- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])])
+- MiniBatchAssigner(interval=[5000ms], mode=[RowTime])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
Expand Down Expand Up @@ -407,17 +435,17 @@ Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, wi
]]>
</Resource>
</TestCase>
<TestCase name="testTumbleTVF">
<TestCase name="testTumbleTVFProctime">
<Resource name="sql">
<![CDATA[
SELECT *
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], window_start=[$6], window_end=[$7], window_time=[$8])
+- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($4), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($5), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
Expand All @@ -426,15 +454,15 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], wind
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, window_time])
+- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])])
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS window_time])
+- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[15 min])])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testTumbleTVFProctime">
<TestCase name="testProctimeWindowTVFWithMiniBatch">
<Resource name="sql">
<![CDATA[
SELECT *
Expand All @@ -455,6 +483,34 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], wind
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS window_time])
+- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[15 min])])
+- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testTumbleTVF">
<Resource name="sql">
<![CDATA[
SELECT *
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], window_start=[$6], window_end=[$7], window_time=[$8])
+- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($4), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, window_time])
+- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package org.apache.flink.table.planner.plan.stream.sql

import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.planner.utils.TableTestBase

import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.jupiter.api.Test

import java.time.Duration

/** Tests for window table-valued function. */
class WindowTableFunctionTest extends TableTestBase {

Expand Down Expand Up @@ -322,4 +325,38 @@ class WindowTableFunctionTest extends TableTestBase {

}

@Test
def testProctimeWindowTVFWithMiniBatch(): Unit = {
enableMiniBatch()
val sql =
"""
|SELECT *
|FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))
|""".stripMargin
util.verifyRelPlan(sql)
}

@Test
def testRowtimeWindowTVFWithMiniBatch(): Unit = {
enableMiniBatch()
val sql =
"""
|SELECT *
|FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
|""".stripMargin
util.verifyRelPlan(sql)
}

private def enableMiniBatch(): Unit = {
util.tableConfig.set(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED,
java.lang.Boolean.TRUE)
util.tableConfig.set(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE,
java.lang.Long.valueOf(5L))
util.tableConfig.set(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
Duration.ofSeconds(5L))
}

}

0 comments on commit 7f96af3

Please sign in to comment.