[jira] [Commented] (FLINK-6368) Grouping keys in stream aggregations have wrong order
[ https://issues.apache.org/jira/browse/FLINK-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983234#comment-15983234 ]

ASF GitHub Bot commented on FLINK-6368:
---

Github user shaoxuan-wang commented on the issue:

    https://github.com/apache/flink/pull/3768

    I ran into the same problem today when adding the new test cases for UDAGG. Thanks for the fix, @xccui

> Grouping keys in stream aggregations have wrong order
> ---
>
>                 Key: FLINK-6368
>                 URL: https://issues.apache.org/jira/browse/FLINK-6368
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Xingcan Cui
>             Fix For: 1.3.0
>
>
> FLINK-5768 removed the `AggregateUtil.createPrepareMapFunction` stage. It
> seems that the order of grouping keys is sometimes messed up. The following
> test fails:
> {code}
>   @Test
>   def testEventTimeSlidingGroupWindow(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     StreamITCase.testResults = mutable.MutableList()
>     val stream = env
>       .fromCollection(data)
>       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
>       .map(t => (t._2, t._6))
>     val table = stream.toTable(tEnv, 'int, 'string)
>     val windowedTable = table
>       .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
>       .groupBy('w, 'string)
>       .select('string, 'int.count, 'w.start, 'w.end)
>     val results = windowedTable.toDataStream[Row]
>     results.addSink(new StreamITCase.StringSink)
>     env.execute()
>   }
> {code}
> Exception:
> {code}
> Caused by: java.lang.RuntimeException: Could not forward element to next operator
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:532)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:505)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:485)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:871)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:849)
>       at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>       at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:50)
>       at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:29)
>       at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:74)
>       at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:64)
>       at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:35)
>       at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:45)
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:598)
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:505)
>       at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:276)
>       at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:119)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:940)
>       at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>       ... 7 more
> Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
> {code}

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
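A minimal, Flink-free sketch of how a ClassCastException like the one in the trace above can arise, following the explanation given in pull request #3768: once the prepare-map stage is gone, grouping keys derived via `grouping.indices` no longer point at the grouping fields of the input record. The object name, `record`, and `readStringKey` are hypothetical and only model the idea; this is not the actual Flink code.

```scala
// Hypothetical model of the bug; none of these names come from Flink itself.
object GroupingKeyOrderSketch {
  // An input record shaped like the test's table: field 0 = 'int, field 1 = 'string.
  val record: Array[Any] = Array(42, "Hello")

  // The query groups by 'string, i.e. by field index 1 of the input record.
  val grouping: Array[Int] = Array(1)

  // Shifted keys, as quoted in the pull request. They are only valid if an
  // earlier stage has already moved the grouping fields to the first n positions.
  val shiftedKeys: Array[Int] = grouping.indices.toArray

  // Reads the single grouping key and treats it as a String, the way a
  // downstream consumer of the 'string column would.
  def readStringKey(row: Array[Any], keys: Array[Int]): String =
    row(keys.head).asInstanceOf[String]

  def main(args: Array[String]): Unit = {
    // The original indices select the String field.
    println(readStringKey(record, grouping))

    // The shifted indices select the Integer field, so the cast fails.
    try readStringKey(record, shiftedKeys)
    catch {
      case e: ClassCastException => println(s"ClassCastException: ${e.getMessage}")
    }
  }
}
```

With the original indices the String field is read as expected; with the shifted indices the Integer field is read and the cast fails, which matches the symptom reported in the issue.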
[jira] [Commented] (FLINK-6368) Grouping keys in stream aggregations have wrong order
[ https://issues.apache.org/jira/browse/FLINK-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982958#comment-15982958 ]

ASF GitHub Bot commented on FLINK-6368:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3768
[jira] [Commented] (FLINK-6368) Grouping keys in stream aggregations have wrong order
[ https://issues.apache.org/jira/browse/FLINK-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982943#comment-15982943 ]

ASF GitHub Bot commented on FLINK-6368:
---

Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/3768

    Merging...
[jira] [Commented] (FLINK-6368) Grouping keys in stream aggregations have wrong order
[ https://issues.apache.org/jira/browse/FLINK-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982555#comment-15982555 ]

ASF GitHub Bot commented on FLINK-6368:
---

Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/3768

    Thanks for fixing this so quickly @xccui! I will test this fix with my branch and merge this later today.
[jira] [Commented] (FLINK-6368) Grouping keys in stream aggregations have wrong order
[ https://issues.apache.org/jira/browse/FLINK-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982523#comment-15982523 ]

ASF GitHub Bot commented on FLINK-6368:
---

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3768

    Thanks for the fix @xccui!
    +1 to merge
[jira] [Commented] (FLINK-6368) Grouping keys in stream aggregations have wrong order
[ https://issues.apache.org/jira/browse/FLINK-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982497#comment-15982497 ]

ASF GitHub Bot commented on FLINK-6368:
---

GitHub user xccui opened a pull request:

    https://github.com/apache/flink/pull/3768

    [FLINK-6368][table] Grouping keys in stream aggregations have wrong order

    FLINK-5768 removed the `AggregateUtil.createPrepareMapFunction` stage, which mapped all grouping keys to the first n fields of a record. That is why older versions generated new, shifted grouping keys (`val groupingKeys = grouping.indices.toArray`) from the indices of the original keys. Now that the mapping has been removed, we should use the original grouping keys rather than the shifted keys.

    Also, the test method posted in https://issues.apache.org/jira/browse/FLINK-6368 is added to DataStreamAggregateITCase.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xccui/flink FLINK-6368

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3768.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3768

commit ed03570d9bfa52e634de5a13b3425a5fd21fe6c8
Author: xccui
Date:   2017-04-25T06:06:45Z

    [FLINK-6368] Fix the wrong ordered keys problem