[jira] [Commented] (FLINK-15494) time field index wrong in LogicalWindowAggregateRuleBase

2020-01-31 Thread Yu Li (Jira)


[ https://issues.apache.org/jira/browse/FLINK-15494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17027982#comment-17027982 ]

Yu Li commented on FLINK-15494:
---

Thanks for the confirmation [~lzljs3620320], good to know :-)

> time field index wrong in LogicalWindowAggregateRuleBase
> 
>
> Key: FLINK-15494
> URL: https://issues.apache.org/jira/browse/FLINK-15494
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.1, 1.10.0
>Reporter: Benchao Li
>Assignee: Benchao Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0, 1.9.3
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When we use two time windows in the blink planner, it will result in 
> calculating a wrong time field index.
>  
> This has two phenomena as far as I know:
>  # the wrong index may have a correct field name, pass the build, but 
> produce a wrong rowtime at runtime.
>  # the wrong index may have an incorrect field name and will not pass the build.
>  
> How to reproduce this problem:
> I added a unit test in `WindowAggregateITCase`:
> {code:java}
> @Test
> def testDoubleTumbleWindow(): Unit = {
>   val stream = failingDataSource(data)
> .assignTimestampsAndWatermarks(
>   new TimestampAndWatermarkWithOffset
> [(Long, Int, Double, Float, BigDecimal, String, String)](10L))
>   val table = stream.toTable(tEnv,
> 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'name)
>   tEnv.registerTable("T1", table)
>   val sql =
> """
>   |SELECT SUM(cnt)
>   |FROM (
>   |  SELECT COUNT(1) AS cnt, TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) AS ts
>   |  FROM T1
>   |  GROUP BY `int`, `string`, TUMBLE(rowtime, INTERVAL '10' SECOND)
>   |)
>   |GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)
>   |""".stripMargin
>   val sink = new TestingAppendSink
>   tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
>   env.execute()
>   val expected = Seq("9")
>   assertEquals(expected.sorted, sink.getAppendResults.sorted)
> }
> {code}
> which results in the following exception:
> {code:java}
> java.lang.RuntimeException: Error while applying rule 
> StreamExecGroupWindowAggregateRule(in:LOGICAL,out:STREAM_PHYSICAL), args 
> [rel#308:FlinkLogicalWindowAggregate.LOGICAL.any.None: 
> 0.false.UNKNOWN(input=RelSubset#307,group={},EXPR$0=SUM($1),window=TumblingGroupWindow('w$, int, 1),properties=)]
>  at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>  at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631)
>  at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>  at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:167)
>  at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:89)
>  at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>  at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>  at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>  at 
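The failure mode described in the issue boils down to a field index computed against one row type being resolved against another: the inner windowed aggregation produces an output row type laid out differently from its input, so reusing the input's time-field index goes wrong. Here is a hypothetical, self-contained sketch of that mismatch (plain Python with made-up field names, not Flink's actual planner code):

```python
# Hypothetical sketch of the failure mode (NOT Flink's actual code): the rule
# remembers the time attribute by its index in the window's INPUT row type,
# but resolves that index against the aggregate's OUTPUT row type.

input_fields = ["rowtime", "int", "string", "cnt"]     # before the inner GROUP BY
output_fields = ["int", "string", "cnt", "w$rowtime"]  # after the windowed aggregation

# Index of the time field, computed against the input schema.
time_index = input_fields.index("rowtime")             # 0

# Resolving the same index against the output schema picks the wrong field.
resolved = output_fields[time_index]
assert resolved == "int"  # not a time field at all -> fails "the build";
# if the field at that index happened to carry a time-like name instead,
# planning would succeed but the runtime rowtime would be wrong.

# The time attribute has to be located by position in the OUTPUT schema.
correct_index = output_fields.index("w$rowtime")       # 3
assert output_fields[correct_index] == "w$rowtime"
```

The actual fix is in the pull request linked from the issue; this sketch only illustrates why the two symptoms (wrong field name failing the build, or a correct-looking name with wrong rowtime at runtime) share one root cause.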

[jira] [Commented] (FLINK-15494) time field index wrong in LogicalWindowAggregateRuleBase

2020-01-31 Thread Jingsong Lee (Jira)


[ https://issues.apache.org/jira/browse/FLINK-15494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17027979#comment-17027979 ]

Jingsong Lee commented on FLINK-15494:
--

I think it should not be a blocker, but it would be nice to merge it in 1.10. 
[~jark] will take a look and try to merge it, but no guarantee.


[jira] [Commented] (FLINK-15494) time field index wrong in LogicalWindowAggregateRuleBase

2020-01-31 Thread Yu Li (Jira)


[ https://issues.apache.org/jira/browse/FLINK-15494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17027971#comment-17027971 ]

Yu Li commented on FLINK-15494:
---

Thanks for pointing this issue out in the RC1 voting, [~libenchao].

According to its current priority, this seems more like a nice-to-fix for 
1.10.0 than a blocker. Could you double check and let us know the reason 
if you think we should escalate the priority to Blocker, [~libenchao] [~jark]? 
Thanks.

If it's not a blocker but the PR can be merged through the proper process (no 
rush please), it will automatically be included in the next RC. However, we 
won't wait for it to be merged before producing the next RC or the release, FYI.


[jira] [Commented] (FLINK-15494) time field index wrong in LogicalWindowAggregateRuleBase

2020-01-06 Thread Benchao Li (Jira)


[ https://issues.apache.org/jira/browse/FLINK-15494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17009414#comment-17009414 ]

Benchao Li commented on FLINK-15494:


[~jark] Yes. I have fixed this internally and would like to contribute it to 
the community.


[jira] [Commented] (FLINK-15494) time field index wrong in LogicalWindowAggregateRuleBase

2020-01-06 Thread Jark Wu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-15494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17009410#comment-17009410 ]

Jark Wu commented on FLINK-15494:
-

Thanks for reporting this, [~libenchao]. Would you like to contribute the fix?
