This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 670ef41ed59dbf7069e313e5b0f8b373bd4245b4 Author: Jark Wu <j...@apache.org> AuthorDate: Tue Apr 21 00:04:12 2020 +0800 [FLINK-17169][table-blink] Fix allowLateness shouldn't affect producing updates of emit strategy This closes #11797 --- .../planner/plan/utils/WindowEmitStrategy.scala | 2 +- .../plan/stream/sql/agg/WindowAggregateTest.xml | 272 ++++++++++++--------- .../plan/stream/sql/agg/WindowAggregateTest.scala | 28 +++ .../operators/window/WindowOperatorBuilder.java | 2 - 4 files changed, 191 insertions(+), 113 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala index eff8a48..07b30f4 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala @@ -63,7 +63,7 @@ class WindowEmitStrategy( def produceUpdates: Boolean = { if (isEventTime) { - allowLateness > 0 || earlyFireDelayEnabled || lateFireDelayEnabled + earlyFireDelayEnabled || lateFireDelayEnabled } else { earlyFireDelayEnabled } diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml index 13553e4..2459697 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml @@ -134,6 +134,32 @@ Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3]) ]]> </Resource> </TestCase> + <TestCase name="testWindowAggregateWithLateFire"> + <Resource name="sql"> + <![CDATA[ +SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt +FROM MyTable +GROUP BY TUMBLE(`rowtime`, INTERVAL '1' SECOND) +]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(EXPR$0=[TUMBLE_START($0)], cnt=[$1]) ++- LogicalAggregate(group=[{0}], cnt=[COUNT()]) + +- LogicalProject($f0=[$TUMBLE($4, 1000:INTERVAL SECOND)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[w$start AS EXPR$0, cnt], changelogMode=[I,UA]) ++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 1000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], emit=[late delay 5000 millisecond], changelogMode=[I,UA]) + +- Exchange(distribution=[single], changelogMode=[I]) + +- Calc(select=[rowtime], changelogMode=[I]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I]) +]]> + </Resource> + </TestCase> <TestCase name="testHopWindowWithProctime"> <Resource name="sql"> <![CDATA[ @@ -179,86 +205,6 @@ GroupWindowAggregate(window=[TumblingGroupWindow('w$, proctime, 3024000000)], se ]]> </Resource> </TestCase> - <TestCase name="testTumbleFunNotInGroupBy"> - <Resource name="sql"> - <![CDATA[ -SELECT weightedAvg(c, a) FROM - (SELECT a, b, c, - TUMBLE_START(rowtime, INTERVAL '15' MINUTE) as ping_start - FROM MyTable - GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE)) AS t1 -GROUP BY b - ]]> - </Resource> - <Resource name="planBefore"> - <![CDATA[ -LogicalProject(EXPR$0=[$1]) -+- LogicalAggregate(group=[{0}], EXPR$0=[weightedAvg($1, $2)]) - +- LogicalProject(b=[$1], c=[$2], a=[$0]) - +- LogicalAggregate(group=[{0, 1, 2, 3}]) - +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$TUMBLE($4, 900000:INTERVAL MINUTE)]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) -]]> - </Resource> - <Resource name="planAfter"> - <![CDATA[ -Calc(select=[EXPR$0]) -+- GroupAggregate(groupBy=[b], select=[b, weightedAvg(c, a) AS EXPR$0]) - +- Exchange(distribution=[hash[b]]) - +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow('w$, rowtime, 900000)], select=[a, b, c]) - +- Exchange(distribution=[hash[a, b, c]]) - +- Calc(select=[a, b, c, rowtime]) - +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) -]]> - </Resource> - </TestCase> - <TestCase name="testMultiHopWindows"> - <Resource name="sql"> - <![CDATA[ -SELECT - HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' HOUR), - HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' HOUR), - count(*), - sum(c) -FROM MyTable -GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' HOUR) -UNION ALL -SELECT - HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' DAY), - HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' DAY), - count(*), - sum(c) -FROM MyTable -GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' DAY) - ]]> - </Resource> - <Resource name="planBefore"> - <![CDATA[ -LogicalUnion(all=[true]) -:- LogicalProject(EXPR$0=[HOP_START($0)], EXPR$1=[HOP_END($0)], EXPR$2=[$1], EXPR$3=[$2]) -: +- LogicalAggregate(group=[{0}], EXPR$2=[COUNT()], EXPR$3=[SUM($1)]) -: +- LogicalProject($f0=[HOP($4, 60000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], c=[$2]) -: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) -+- LogicalProject(EXPR$0=[HOP_START($0)], EXPR$1=[HOP_END($0)], EXPR$2=[$1], EXPR$3=[$2]) - +- LogicalAggregate(group=[{0}], EXPR$2=[COUNT()], EXPR$3=[SUM($1)]) - +- LogicalProject($f0=[HOP($4, 60000:INTERVAL MINUTE, 86400000:INTERVAL DAY)], c=[$2]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) -]]> - </Resource> - <Resource name="planAfter"> - <![CDATA[ -Union(all=[true], union=[EXPR$0, EXPR$1, EXPR$2, EXPR$3]) -:- Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, EXPR$2, EXPR$3]) -: +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 3600000, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$2, SUM(c) AS EXPR$3, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) -: +- Exchange(distribution=[single], reuse_id=[1]) -: +- Calc(select=[rowtime, c]) -: +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) -+- Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, EXPR$2, EXPR$3]) - +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 86400000, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$2, SUM(c) AS EXPR$3, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) - +- Reused(reference_id=[1]) -]]> - </Resource> - </TestCase> <TestCase name="testMultiHopWindowsJoin"> <Resource name="sql"> <![CDATA[ @@ -473,6 +419,40 @@ GroupWindowAggregate(window=[SessionGroupWindow('w$, $f2, 60000)], select=[SUM(a ]]> </Resource> </TestCase> + <TestCase name="testTumbleFunAndRegularAggFunInGroupBy"> + <Resource name="sql"> + <![CDATA[ +SELECT weightedAvg(c, a) FROM + (SELECT a, b, c, count(*) d, + TUMBLE_START(rowtime, INTERVAL '15' MINUTE) as ping_start + FROM MyTable + GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE)) AS t1 +GROUP BY b, d, ping_start + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(EXPR$0=[$3]) ++- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[weightedAvg($3, $4)]) + +- LogicalProject(b=[$1], d=[$4], ping_start=[TUMBLE_START($3)], c=[$2], a=[$0]) + +- LogicalAggregate(group=[{0, 1, 2, 3}], d=[COUNT()]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$TUMBLE($4, 900000:INTERVAL MINUTE)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[EXPR$0]) ++- GroupAggregate(groupBy=[b, d, ping_start], select=[b, d, ping_start, weightedAvg(c, a) AS EXPR$0]) + +- Exchange(distribution=[hash[b, d, ping_start]]) + +- Calc(select=[b, d, w$start AS ping_start, c, a]) + +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[a, b, c, COUNT(*) AS d, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) + +- Exchange(distribution=[hash[a, b, c]]) + +- Calc(select=[a, b, c, rowtime]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) +]]> + </Resource> + </TestCase> <TestCase name="testTumbleFunction"> <Resource name="sql"> <![CDATA[ @@ -537,57 +517,103 @@ Calc(select=[EXPR$0]) ]]> </Resource> </TestCase> - <TestCase name="testTumblingWindowWithProctime"> + <TestCase name="testTumbleFunNotInGroupBy"> <Resource name="sql"> - <![CDATA[select sum(a), max(b) from MyTable1 group by TUMBLE(c, INTERVAL '1' SECOND)]]> + <![CDATA[ +SELECT weightedAvg(c, a) FROM + (SELECT a, b, c, + TUMBLE_START(rowtime, INTERVAL '15' MINUTE) as ping_start + FROM MyTable + GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE)) AS t1 +GROUP BY b + ]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalProject(EXPR$0=[$1], EXPR$1=[$2]) -+- LogicalAggregate(group=[{0}], EXPR$0=[SUM($1)], EXPR$1=[MAX($2)]) - +- LogicalProject($f0=[$TUMBLE(PROCTIME(), 1000:INTERVAL SECOND)], a=[$0], b=[$1]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [CollectionTableSource(a, b)]]]) +LogicalProject(EXPR$0=[$1]) ++- LogicalAggregate(group=[{0}], EXPR$0=[weightedAvg($1, $2)]) + +- LogicalProject(b=[$1], c=[$2], a=[$0]) + +- LogicalAggregate(group=[{0, 1, 2, 3}]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$TUMBLE($4, 900000:INTERVAL MINUTE)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f2, 1000)], select=[SUM(a) AS EXPR$0, MAX(b) AS EXPR$1]) -+- Exchange(distribution=[single]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [CollectionTableSource(a, b)]]], fields=[a, b]) +Calc(select=[EXPR$0]) ++- GroupAggregate(groupBy=[b], select=[b, weightedAvg(c, a) AS EXPR$0]) + +- Exchange(distribution=[hash[b]]) + +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow('w$, rowtime, 900000)], select=[a, b, c]) + +- Exchange(distribution=[hash[a, b, c]]) + +- Calc(select=[a, b, c, rowtime]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) ]]> </Resource> </TestCase> - <TestCase name="testTumbleFunAndRegularAggFunInGroupBy"> + <TestCase name="testMultiHopWindows"> <Resource name="sql"> <![CDATA[ -SELECT weightedAvg(c, a) FROM - (SELECT a, b, c, count(*) d, - TUMBLE_START(rowtime, INTERVAL '15' MINUTE) as ping_start - FROM MyTable - GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE)) AS t1 -GROUP BY b, d, ping_start +SELECT + HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' HOUR), + HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' HOUR), + count(*), + sum(c) +FROM MyTable +GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' HOUR) +UNION ALL +SELECT + HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' DAY), + HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' DAY), + count(*), + sum(c) +FROM MyTable +GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' DAY) ]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalProject(EXPR$0=[$3]) -+- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[weightedAvg($3, $4)]) - +- LogicalProject(b=[$1], d=[$4], ping_start=[TUMBLE_START($3)], c=[$2], a=[$0]) - +- LogicalAggregate(group=[{0, 1, 2, 3}], d=[COUNT()]) - +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$TUMBLE($4, 900000:INTERVAL MINUTE)]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +LogicalUnion(all=[true]) +:- LogicalProject(EXPR$0=[HOP_START($0)], EXPR$1=[HOP_END($0)], EXPR$2=[$1], EXPR$3=[$2]) +: +- LogicalAggregate(group=[{0}], EXPR$2=[COUNT()], EXPR$3=[SUM($1)]) +: +- LogicalProject($f0=[HOP($4, 60000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], c=[$2]) +: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ++- LogicalProject(EXPR$0=[HOP_START($0)], EXPR$1=[HOP_END($0)], EXPR$2=[$1], EXPR$3=[$2]) + +- LogicalAggregate(group=[{0}], EXPR$2=[COUNT()], EXPR$3=[SUM($1)]) + +- LogicalProject($f0=[HOP($4, 60000:INTERVAL MINUTE, 86400000:INTERVAL DAY)], c=[$2]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -Calc(select=[EXPR$0]) -+- GroupAggregate(groupBy=[b, d, ping_start], select=[b, d, ping_start, weightedAvg(c, a) AS EXPR$0]) - +- Exchange(distribution=[hash[b, d, ping_start]]) - +- Calc(select=[b, d, w$start AS ping_start, c, a]) - +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[a, b, c, COUNT(*) AS d, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) - +- Exchange(distribution=[hash[a, b, c]]) - +- Calc(select=[a, b, c, rowtime]) - +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) +Union(all=[true], union=[EXPR$0, EXPR$1, EXPR$2, EXPR$3]) +:- Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, EXPR$2, EXPR$3]) +: +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 3600000, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$2, SUM(c) AS EXPR$3, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) +: +- Exchange(distribution=[single], reuse_id=[1]) +: +- Calc(select=[rowtime, c]) +: +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) ++- Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, EXPR$2, EXPR$3]) + +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 86400000, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$2, SUM(c) AS EXPR$3, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) + +- Reused(reference_id=[1]) +]]> + </Resource> + </TestCase> + <TestCase name="testTumblingWindowWithProctime"> + <Resource name="sql"> + <![CDATA[select sum(a), max(b) from MyTable1 group by TUMBLE(c, INTERVAL '1' SECOND)]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(EXPR$0=[$1], EXPR$1=[$2]) ++- LogicalAggregate(group=[{0}], EXPR$0=[SUM($1)], EXPR$1=[MAX($2)]) + +- LogicalProject($f0=[$TUMBLE(PROCTIME(), 1000:INTERVAL SECOND)], a=[$0], b=[$1]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [CollectionTableSource(a, b)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f2, 1000)], select=[SUM(a) AS EXPR$0, MAX(b) AS EXPR$1]) ++- Exchange(distribution=[single]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [CollectionTableSource(a, b)]]], fields=[a, b]) ]]> </Resource> </TestCase> @@ -638,4 +664,30 @@ Union(all=[true], union=[EXPR$0]) ]]> </Resource> </TestCase> + <TestCase name="testWindowAggregateWithAllowLatenessOnly"> + <Resource name="sql"> + <![CDATA[ +SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt +FROM MyTable +GROUP BY TUMBLE(`rowtime`, INTERVAL '1' SECOND) +]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(EXPR$0=[TUMBLE_START($0)], cnt=[$1]) ++- LogicalAggregate(group=[{0}], cnt=[COUNT()]) + +- LogicalProject($f0=[$TUMBLE($4, 1000:INTERVAL SECOND)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[w$start AS EXPR$0, cnt], changelogMode=[I]) ++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 1000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], changelogMode=[I]) + +- Exchange(distribution=[single], changelogMode=[I]) + +- Calc(select=[rowtime], changelogMode=[I]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I]) +]]> + </Resource> + </TestCase> </Root> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala index 83d5544..fe66dd4 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala @@ -18,10 +18,12 @@ package org.apache.flink.table.planner.plan.stream.sql.agg +import org.apache.flink.api.common.time.Time import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge +import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_LATE_FIRE_DELAY, TABLE_EXEC_EMIT_LATE_FIRE_ENABLED} import org.apache.flink.table.planner.utils.TableTestBase import org.junit.Test @@ -414,4 +416,30 @@ class WindowAggregateTest extends TableTestBase { util.verifyPlan(sql) } + + @Test + def testWindowAggregateWithLateFire(): Unit = { + util.conf.getConfiguration.setBoolean(TABLE_EXEC_EMIT_LATE_FIRE_ENABLED, true) + util.conf.getConfiguration.setString(TABLE_EXEC_EMIT_LATE_FIRE_DELAY, "5s") + util.conf.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) + val sql = + """ + |SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt + |FROM MyTable + |GROUP BY TUMBLE(`rowtime`, INTERVAL '1' SECOND) + |""".stripMargin + util.verifyPlanWithTrait(sql) + } + + @Test + def testWindowAggregateWithAllowLatenessOnly(): Unit = { + util.conf.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) + val sql = + """ + |SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt + |FROM MyTable + |GROUP BY TUMBLE(`rowtime`, INTERVAL '1' SECOND) + |""".stripMargin + util.verifyPlanWithTrait(sql) + } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperatorBuilder.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperatorBuilder.java index 9962438..4433cbc 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperatorBuilder.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperatorBuilder.java @@ -155,8 +155,6 @@ public class WindowOperatorBuilder { checkArgument(!allowedLateness.isNegative()); if (allowedLateness.toMillis() > 0) { this.allowedLateness = allowedLateness.toMillis(); - // allow late element, which means this window will send retractions - this.sendRetraction = true; } return this; }