This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 670ef41ed59dbf7069e313e5b0f8b373bd4245b4
Author: Jark Wu <j...@apache.org>
AuthorDate: Tue Apr 21 00:04:12 2020 +0800

    [FLINK-17169][table-blink] Fix allowLateness shouldn't affect producing 
updates of emit strategy
    
    This closes #11797
---
 .../planner/plan/utils/WindowEmitStrategy.scala    |   2 +-
 .../plan/stream/sql/agg/WindowAggregateTest.xml    | 272 ++++++++++++---------
 .../plan/stream/sql/agg/WindowAggregateTest.scala  |  28 +++
 .../operators/window/WindowOperatorBuilder.java    |   2 -
 4 files changed, 191 insertions(+), 113 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala
index eff8a48..07b30f4 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala
@@ -63,7 +63,7 @@ class WindowEmitStrategy(
 
   def produceUpdates: Boolean = {
     if (isEventTime) {
-      allowLateness > 0 || earlyFireDelayEnabled || lateFireDelayEnabled
+      earlyFireDelayEnabled || lateFireDelayEnabled
     } else {
       earlyFireDelayEnabled
     }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index 13553e4..2459697 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -134,6 +134,32 @@ Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS 
EXPR$3])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testWindowAggregateWithLateFire">
+    <Resource name="sql">
+      <![CDATA[
+SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt
+FROM MyTable
+GROUP BY TUMBLE(`rowtime`, INTERVAL '1' SECOND)
+]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[TUMBLE_START($0)], cnt=[$1])
++- LogicalAggregate(group=[{0}], cnt=[COUNT()])
+   +- LogicalProject($f0=[$TUMBLE($4, 1000:INTERVAL SECOND)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[w$start AS EXPR$0, cnt], changelogMode=[I,UA])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 1000)], 
properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS cnt, 
start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, 
proctime('w$) AS w$proctime], emit=[late delay 5000 millisecond], 
changelogMode=[I,UA])
+   +- Exchange(distribution=[single], changelogMode=[I])
+      +- Calc(select=[rowtime], changelogMode=[I])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testHopWindowWithProctime">
     <Resource name="sql">
       <![CDATA[
@@ -179,86 +205,6 @@ GroupWindowAggregate(window=[TumblingGroupWindow('w$, 
proctime, 3024000000)], se
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testTumbleFunNotInGroupBy">
-    <Resource name="sql">
-      <![CDATA[
-SELECT weightedAvg(c, a) FROM
-    (SELECT a, b, c,
-        TUMBLE_START(rowtime, INTERVAL '15' MINUTE) as ping_start
-     FROM MyTable
-         GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE)) AS t1
-GROUP BY b
-      ]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalProject(EXPR$0=[$1])
-+- LogicalAggregate(group=[{0}], EXPR$0=[weightedAvg($1, $2)])
-   +- LogicalProject(b=[$1], c=[$2], a=[$0])
-      +- LogicalAggregate(group=[{0, 1, 2, 3}])
-         +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$TUMBLE($4, 
900000:INTERVAL MINUTE)])
-            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[EXPR$0])
-+- GroupAggregate(groupBy=[b], select=[b, weightedAvg(c, a) AS EXPR$0])
-   +- Exchange(distribution=[hash[b]])
-      +- GroupWindowAggregate(groupBy=[a, b, c], 
window=[TumblingGroupWindow('w$, rowtime, 900000)], select=[a, b, c])
-         +- Exchange(distribution=[hash[a, b, c]])
-            +- Calc(select=[a, b, c, rowtime])
-               +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testMultiHopWindows">
-    <Resource name="sql">
-      <![CDATA[
-SELECT
-   HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' HOUR),
-   HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' HOUR),
-   count(*),
-   sum(c)
-FROM MyTable
-GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)
-UNION ALL
-SELECT
-   HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' DAY),
-   HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' DAY),
-   count(*),
-   sum(c)
-FROM MyTable
-GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' DAY)
-      ]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalUnion(all=[true])
-:- LogicalProject(EXPR$0=[HOP_START($0)], EXPR$1=[HOP_END($0)], EXPR$2=[$1], 
EXPR$3=[$2])
-:  +- LogicalAggregate(group=[{0}], EXPR$2=[COUNT()], EXPR$3=[SUM($1)])
-:     +- LogicalProject($f0=[HOP($4, 60000:INTERVAL MINUTE, 3600000:INTERVAL 
HOUR)], c=[$2])
-:        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
-+- LogicalProject(EXPR$0=[HOP_START($0)], EXPR$1=[HOP_END($0)], EXPR$2=[$1], 
EXPR$3=[$2])
-   +- LogicalAggregate(group=[{0}], EXPR$2=[COUNT()], EXPR$3=[SUM($1)])
-      +- LogicalProject($f0=[HOP($4, 60000:INTERVAL MINUTE, 86400000:INTERVAL 
DAY)], c=[$2])
-         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Union(all=[true], union=[EXPR$0, EXPR$1, EXPR$2, EXPR$3])
-:- Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, EXPR$2, EXPR$3])
-:  +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 3600000, 
60000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) 
AS EXPR$2, SUM(c) AS EXPR$3, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
-:     +- Exchange(distribution=[single], reuse_id=[1])
-:        +- Calc(select=[rowtime, c])
-:           +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
-+- Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, EXPR$2, EXPR$3])
-   +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 86400000, 
60000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) 
AS EXPR$2, SUM(c) AS EXPR$3, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
-      +- Reused(reference_id=[1])
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testMultiHopWindowsJoin">
     <Resource name="sql">
       <![CDATA[
@@ -473,6 +419,40 @@ GroupWindowAggregate(window=[SessionGroupWindow('w$, $f2, 
60000)], select=[SUM(a
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testTumbleFunAndRegularAggFunInGroupBy">
+    <Resource name="sql">
+      <![CDATA[
+SELECT weightedAvg(c, a) FROM
+    (SELECT a, b, c, count(*) d,
+        TUMBLE_START(rowtime, INTERVAL '15' MINUTE) as ping_start
+     FROM MyTable
+         GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE)) AS t1
+GROUP BY b, d, ping_start
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[$3])
++- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[weightedAvg($3, $4)])
+   +- LogicalProject(b=[$1], d=[$4], ping_start=[TUMBLE_START($3)], c=[$2], 
a=[$0])
+      +- LogicalAggregate(group=[{0, 1, 2, 3}], d=[COUNT()])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$TUMBLE($4, 
900000:INTERVAL MINUTE)])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[b, d, ping_start], select=[b, d, ping_start, 
weightedAvg(c, a) AS EXPR$0])
+   +- Exchange(distribution=[hash[b, d, ping_start]])
+      +- Calc(select=[b, d, w$start AS ping_start, c, a])
+         +- GroupWindowAggregate(groupBy=[a, b, c], 
window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, 
w$rowtime, w$proctime], select=[a, b, c, COUNT(*) AS d, start('w$) AS w$start, 
end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+            +- Exchange(distribution=[hash[a, b, c]])
+               +- Calc(select=[a, b, c, rowtime])
+                  +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testTumbleFunction">
     <Resource name="sql">
       <![CDATA[
@@ -537,57 +517,103 @@ Calc(select=[EXPR$0])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testTumblingWindowWithProctime">
+  <TestCase name="testTumbleFunNotInGroupBy">
     <Resource name="sql">
-      <![CDATA[select sum(a), max(b) from MyTable1 group by TUMBLE(c, INTERVAL 
'1' SECOND)]]>
+      <![CDATA[
+SELECT weightedAvg(c, a) FROM
+    (SELECT a, b, c,
+        TUMBLE_START(rowtime, INTERVAL '15' MINUTE) as ping_start
+     FROM MyTable
+         GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE)) AS t1
+GROUP BY b
+      ]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(EXPR$0=[$1], EXPR$1=[$2])
-+- LogicalAggregate(group=[{0}], EXPR$0=[SUM($1)], EXPR$1=[MAX($2)])
-   +- LogicalProject($f0=[$TUMBLE(PROCTIME(), 1000:INTERVAL SECOND)], a=[$0], 
b=[$1])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [CollectionTableSource(a, b)]]])
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[weightedAvg($1, $2)])
+   +- LogicalProject(b=[$1], c=[$2], a=[$0])
+      +- LogicalAggregate(group=[{0, 1, 2, 3}])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$TUMBLE($4, 
900000:INTERVAL MINUTE)])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f2, 1000)], 
select=[SUM(a) AS EXPR$0, MAX(b) AS EXPR$1])
-+- Exchange(distribution=[single])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, 
source: [CollectionTableSource(a, b)]]], fields=[a, b])
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[b], select=[b, weightedAvg(c, a) AS EXPR$0])
+   +- Exchange(distribution=[hash[b]])
+      +- GroupWindowAggregate(groupBy=[a, b, c], 
window=[TumblingGroupWindow('w$, rowtime, 900000)], select=[a, b, c])
+         +- Exchange(distribution=[hash[a, b, c]])
+            +- Calc(select=[a, b, c, rowtime])
+               +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testTumbleFunAndRegularAggFunInGroupBy">
+  <TestCase name="testMultiHopWindows">
     <Resource name="sql">
       <![CDATA[
-SELECT weightedAvg(c, a) FROM
-    (SELECT a, b, c, count(*) d,
-        TUMBLE_START(rowtime, INTERVAL '15' MINUTE) as ping_start
-     FROM MyTable
-         GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE)) AS t1
-GROUP BY b, d, ping_start
+SELECT
+   HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' HOUR),
+   HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' HOUR),
+   count(*),
+   sum(c)
+FROM MyTable
+GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)
+UNION ALL
+SELECT
+   HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' DAY),
+   HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' DAY),
+   count(*),
+   sum(c)
+FROM MyTable
+GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' DAY)
       ]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(EXPR$0=[$3])
-+- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[weightedAvg($3, $4)])
-   +- LogicalProject(b=[$1], d=[$4], ping_start=[TUMBLE_START($3)], c=[$2], 
a=[$0])
-      +- LogicalAggregate(group=[{0, 1, 2, 3}], d=[COUNT()])
-         +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$TUMBLE($4, 
900000:INTERVAL MINUTE)])
-            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+LogicalUnion(all=[true])
+:- LogicalProject(EXPR$0=[HOP_START($0)], EXPR$1=[HOP_END($0)], EXPR$2=[$1], 
EXPR$3=[$2])
+:  +- LogicalAggregate(group=[{0}], EXPR$2=[COUNT()], EXPR$3=[SUM($1)])
+:     +- LogicalProject($f0=[HOP($4, 60000:INTERVAL MINUTE, 3600000:INTERVAL 
HOUR)], c=[$2])
+:        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
++- LogicalProject(EXPR$0=[HOP_START($0)], EXPR$1=[HOP_END($0)], EXPR$2=[$1], 
EXPR$3=[$2])
+   +- LogicalAggregate(group=[{0}], EXPR$2=[COUNT()], EXPR$3=[SUM($1)])
+      +- LogicalProject($f0=[HOP($4, 60000:INTERVAL MINUTE, 86400000:INTERVAL 
DAY)], c=[$2])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[EXPR$0])
-+- GroupAggregate(groupBy=[b, d, ping_start], select=[b, d, ping_start, 
weightedAvg(c, a) AS EXPR$0])
-   +- Exchange(distribution=[hash[b, d, ping_start]])
-      +- Calc(select=[b, d, w$start AS ping_start, c, a])
-         +- GroupWindowAggregate(groupBy=[a, b, c], 
window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, 
w$rowtime, w$proctime], select=[a, b, c, COUNT(*) AS d, start('w$) AS w$start, 
end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
-            +- Exchange(distribution=[hash[a, b, c]])
-               +- Calc(select=[a, b, c, rowtime])
-                  +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+Union(all=[true], union=[EXPR$0, EXPR$1, EXPR$2, EXPR$3])
+:- Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, EXPR$2, EXPR$3])
+:  +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 3600000, 
60000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) 
AS EXPR$2, SUM(c) AS EXPR$3, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+:     +- Exchange(distribution=[single], reuse_id=[1])
+:        +- Calc(select=[rowtime, c])
+:           +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
++- Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, EXPR$2, EXPR$3])
+   +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 86400000, 
60000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) 
AS EXPR$2, SUM(c) AS EXPR$3, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+      +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTumblingWindowWithProctime">
+    <Resource name="sql">
+      <![CDATA[select sum(a), max(b) from MyTable1 group by TUMBLE(c, INTERVAL 
'1' SECOND)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[$1], EXPR$1=[$2])
++- LogicalAggregate(group=[{0}], EXPR$0=[SUM($1)], EXPR$1=[MAX($2)])
+   +- LogicalProject($f0=[$TUMBLE(PROCTIME(), 1000:INTERVAL SECOND)], a=[$0], 
b=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [CollectionTableSource(a, b)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f2, 1000)], 
select=[SUM(a) AS EXPR$0, MAX(b) AS EXPR$1])
++- Exchange(distribution=[single])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, 
source: [CollectionTableSource(a, b)]]], fields=[a, b])
 ]]>
     </Resource>
   </TestCase>
@@ -638,4 +664,30 @@ Union(all=[true], union=[EXPR$0])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testWindowAggregateWithAllowLatenessOnly">
+    <Resource name="sql">
+      <![CDATA[
+SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt
+FROM MyTable
+GROUP BY TUMBLE(`rowtime`, INTERVAL '1' SECOND)
+]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[TUMBLE_START($0)], cnt=[$1])
++- LogicalAggregate(group=[{0}], cnt=[COUNT()])
+   +- LogicalProject($f0=[$TUMBLE($4, 1000:INTERVAL SECOND)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[w$start AS EXPR$0, cnt], changelogMode=[I])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 1000)], 
properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS cnt, 
start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, 
proctime('w$) AS w$proctime], changelogMode=[I])
+   +- Exchange(distribution=[single], changelogMode=[I])
+      +- Calc(select=[rowtime], changelogMode=[I])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
index 83d5544..fe66dd4 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
@@ -18,10 +18,12 @@
 
 package org.apache.flink.table.planner.plan.stream.sql.agg
 
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableException, ValidationException}
 import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge
+import 
org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_LATE_FIRE_DELAY,
 TABLE_EXEC_EMIT_LATE_FIRE_ENABLED}
 import org.apache.flink.table.planner.utils.TableTestBase
 
 import org.junit.Test
@@ -414,4 +416,30 @@ class WindowAggregateTest extends TableTestBase {
 
     util.verifyPlan(sql)
   }
+
+  @Test
+  def testWindowAggregateWithLateFire(): Unit = {
+    util.conf.getConfiguration.setBoolean(TABLE_EXEC_EMIT_LATE_FIRE_ENABLED, 
true)
+    util.conf.getConfiguration.setString(TABLE_EXEC_EMIT_LATE_FIRE_DELAY, "5s")
+    util.conf.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+    val sql =
+      """
+        |SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt
+        |FROM MyTable
+        |GROUP BY TUMBLE(`rowtime`, INTERVAL '1' SECOND)
+        |""".stripMargin
+    util.verifyPlanWithTrait(sql)
+  }
+
+  @Test
+  def testWindowAggregateWithAllowLatenessOnly(): Unit = {
+    util.conf.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+    val sql =
+      """
+        |SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt
+        |FROM MyTable
+        |GROUP BY TUMBLE(`rowtime`, INTERVAL '1' SECOND)
+        |""".stripMargin
+    util.verifyPlanWithTrait(sql)
+  }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperatorBuilder.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperatorBuilder.java
index 9962438..4433cbc 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperatorBuilder.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperatorBuilder.java
@@ -155,8 +155,6 @@ public class WindowOperatorBuilder {
                checkArgument(!allowedLateness.isNegative());
                if (allowedLateness.toMillis() > 0) {
                        this.allowedLateness = allowedLateness.toMillis();
-                       // allow late element, which means this window will 
send retractions
-                       this.sendRetraction = true;
                }
                return this;
        }

Reply via email to