[jira] [Commented] (FLINK-6368) Grouping keys in stream aggregations have wrong order

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983234#comment-15983234 ]

ASF GitHub Bot commented on FLINK-6368:
---

Github user shaoxuan-wang commented on the issue:

https://github.com/apache/flink/pull/3768
  
I ran into the same problem today when adding the new test cases for UDAGG. 
Thanks for the fix, @xccui 


> Grouping keys in stream aggregations have wrong order
> -
>
> Key: FLINK-6368
> URL: https://issues.apache.org/jira/browse/FLINK-6368
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Xingcan Cui
> Fix For: 1.3.0
>
>
> FLINK-5768 removed the `AggregateUtil.createPrepareMapFunction` stage. It
> seems that the order of grouping keys is sometimes messed up. The following
> test fails:
> {code}
>   @Test
>   def testEventTimeSlidingGroupWindow(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     StreamITCase.testResults = mutable.MutableList()
>     // `data` is a test fixture defined elsewhere in the test class (not shown)
>     val stream = env
>       .fromCollection(data)
>       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
>       .map(t => (t._2, t._6))
>     val table = stream.toTable(tEnv, 'int, 'string)
>     // sliding event-time window of 10 ms, evaluated every 5 ms
>     val windowedTable = table
>       .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
>       .groupBy('w, 'string)
>       .select('string, 'int.count, 'w.start, 'w.end)
>     val results = windowedTable.toDataStream[Row]
>     results.addSink(new StreamITCase.StringSink)
>     env.execute()
>   }
> {code}
> Exception:
> {code}
> Caused by: java.lang.RuntimeException: Could not forward element to next operator
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:532)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:505)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:485)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:871)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:849)
>   at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>   at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:50)
>   at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:29)
>   at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:74)
>   at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:64)
>   at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:35)
>   at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:45)
>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:598)
>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:505)
>   at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:276)
>   at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:119)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:940)
>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>   ... 7 more
> Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
> {code}
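The final `ClassCastException` fits a grouping key index that points at the wrong field. In the test, the table's input rows are laid out as ('int, 'string). A key index of 0 therefore reads the Integer where a String key for 'string is expected. The Scala sketch below models only that symptom, using plain arrays in place of Flink rows. `KeyIndexSymptom` and `stringKey` are hypothetical names, not Flink API.

```scala
// Simplified model of the failing window aggregation input; not Flink code.
// Each input row is modeled as an Array[Any] laid out as ('int, 'string),
// matching `stream.toTable(tEnv, 'int, 'string)` in the test above.
object KeyIndexSymptom {
  // Hypothetical helper: read the grouping key at `keyIndex` as a String,
  // as an operator grouping on 'string would expect.
  def stringKey(row: Array[Any], keyIndex: Int): String =
    row(keyIndex).asInstanceOf[String]

  def main(args: Array[String]): Unit = {
    val row: Array[Any] = Array(1, "Hello")

    // Index 1 is where 'string actually lives in the input row.
    println(stringKey(row, 1)) // prints Hello

    // A stale index 0 points at the 'int field, so the cast fails.
    try {
      stringKey(row, 0)
      println("no exception")
    } catch {
      case e: ClassCastException => println(s"ClassCastException: ${e.getMessage}")
    }
  }
}
```

The exact exception message depends on the JVM version, so the sketch prints whatever message the runtime produces.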



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6368) Grouping keys in stream aggregations have wrong order

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982958#comment-15982958 ]

ASF GitHub Bot commented on FLINK-6368:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3768




[jira] [Commented] (FLINK-6368) Grouping keys in stream aggregations have wrong order

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982943#comment-15982943 ]

ASF GitHub Bot commented on FLINK-6368:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/3768
  
Merging...




[jira] [Commented] (FLINK-6368) Grouping keys in stream aggregations have wrong order

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982555#comment-15982555 ]

ASF GitHub Bot commented on FLINK-6368:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/3768
  
Thanks for fixing this so quickly @xccui! I will test this fix with my 
branch and merge this later today.




[jira] [Commented] (FLINK-6368) Grouping keys in stream aggregations have wrong order

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982523#comment-15982523 ]

ASF GitHub Bot commented on FLINK-6368:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3768
  
Thanks for the fix @xccui!

+1 to merge




[jira] [Commented] (FLINK-6368) Grouping keys in stream aggregations have wrong order

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982497#comment-15982497 ]

ASF GitHub Bot commented on FLINK-6368:
---

GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/3768

[FLINK-6368][table] Grouping keys in stream aggregations have wrong order

FLINK-5768 removed the `AggregateUtil.createPrepareMapFunction` stage, which
mapped all grouping keys to the first n fields of a record. That is why older
versions generated new, shifted grouping keys
(`val groupingKeys = grouping.indices.toArray`) from the original keys'
indices. Now that this mapping has been removed, we should use the original
grouping keys rather than the shifted keys. Also, the test method posted in
https://issues.apache.org/jira/browse/FLINK-6368 is added to
DataStreamAggregateITCase.
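The index mismatch described above can be modeled in a few lines of Scala. This is a simplified sketch, not the code in `AggregateUtil`. Records are plain `Array[Any]` values, and `prepareMap` is a hypothetical stand-in for the removed prepare-map stage that moves the grouping fields to the front. `GroupingKeyIndices` and `keys` are likewise illustrative names.

```scala
// Simplified model of the grouping-key index mismatch; not the actual AggregateUtil code.
object GroupingKeyIndices {
  // Hypothetical stand-in for the removed prepare-map stage: move the grouping
  // fields to the front of the record, followed by the remaining fields.
  def prepareMap(record: Array[Any], grouping: Array[Int]): Array[Any] = {
    val reordered = grouping.toSeq ++ record.indices.filterNot(i => grouping.contains(i))
    reordered.map(i => record(i)).toArray
  }

  // Select the key values of `record` at the given field indices.
  def keys(record: Array[Any], keyIndices: Array[Int]): List[Any] =
    keyIndices.toList.map(i => record(i))

  def main(args: Array[String]): Unit = {
    val record: Array[Any] = Array(1, "Hello") // fields ('int, 'string)
    val grouping = Array(1)                    // original key: group by 'string
    val shifted = grouping.indices.toArray     // Array(0): valid only after prepareMap

    // Old pipeline: the prepare-map stage ran first, so shifted keys were correct.
    println(keys(prepareMap(record, grouping), shifted)) // prints List(Hello)

    // Prepare-map stage removed: shifted keys now select the wrong field.
    println(keys(record, shifted)) // prints List(1)

    // The fix: index the unmodified record with the original grouping keys.
    println(keys(record, grouping)) // prints List(Hello)
  }
}
```

Without the prepare-map stage, the shifted index 0 selects the 'int field. That is the wrong-field read behind the `ClassCastException` in the report. Indexing with `grouping` directly selects the intended field.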

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-6368

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3768.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3768


commit ed03570d9bfa52e634de5a13b3425a5fd21fe6c8
Author: xccui 
Date:   2017-04-25T06:06:45Z

[FLINK-6368] Fix the wrong ordered keys problem



