[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985209#comment-15985209
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user stefanobortoli closed the pull request at:

https://github.com/apache/flink/pull/3771


> Distinct procTime with Rows boundaries
> --
>
> Key: FLINK-6250
> URL: https://issues.apache.org/jira/browse/FLINK-6250
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: radu
>Assignee: Stefano Bortoli
>
> Support proctime with rows boundaries
> Q1.1. `SELECT SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS BETWEEN 2 
> PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.1. `SELECT COUNT(b), SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS 
> BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985208#comment-15985208
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3771
  
@fhueske I have created #3783 with just the code generation part. At least 
the GROUP BY distinct can move ahead. I will close this PR and wait for the 
merging of the Calcite fix.




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985015#comment-15985015
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3771
  
So, what do you want me to keep for this PR? Just the code generation and 
its test?




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985003#comment-15985003
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3771
  
If we merge this change together with #3764, we would check it with tests 
that use distinct grouped window aggregates. 
But you are right, it might in fact make sense to test the 
`GeneratedAggregations` class generated by the `CodeGenerator` individually in 
a unit test. I wouldn't make this part of this PR, though.




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984997#comment-15984997
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113483437
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -335,14 +371,28 @@ class CodeGenerator(
 j"""
 |  public final void accumulate(
 |org.apache.flink.types.Row accs,
-|org.apache.flink.types.Row input)""".stripMargin
+|org.apache.flink.types.Row input) throws Exception""".stripMargin
 
   val accumulate: String = {
 for (i <- aggs.indices) yield
-  j"""
+ if(distinctAggsFlags(i)){
+   j"""
+  |  Long distValCount$i = (Long) distStateList[$i].get(${parameters(i)});
+  |  if( distValCount$i == null){
--- End diff --

Ah, that makes sense. Scala `Long` is backed by Java `long`, i.e., it does 
not support `null`.
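
To illustrate the point about `null`, here is a minimal Java sketch. The class and method names are hypothetical, and a plain `HashMap` stands in for Flink's `MapState`; it is not the code from this PR. It shows why the generated Java code can compare the boxed `Long` returned by a lookup against `null`, which a primitive `long` could not express.

```java
import java.util.HashMap;
import java.util.Map;

public class BoxedLongLookup {

    // Returns the stored count for a key, or 0 if the key was never seen.
    // Map.get yields a boxed Long, so an absent key shows up as null.
    static long countOf(Map<Object, Long> counts, Object key) {
        Long boxed = counts.get(key);
        return boxed == null ? 0L : boxed;
    }

    public static void main(String[] args) {
        Map<Object, Long> counts = new HashMap<>();
        counts.put("a", 2L);
        System.out.println(countOf(counts, "a"));
        System.out.println(countOf(counts, "b"));
    }
}
```

Declaring the local variable as a primitive `long` instead would throw a `NullPointerException` on unboxing for an absent key, which is presumably why the generated code keeps the boxed type.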




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984939#comment-15984939
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113472166
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
 ---
@@ -72,7 +73,15 @@ class ProcTimeBoundedRowsOver(
   genAggregations.code)
 LOG.debug("Instantiating AggregateHelper.")
 function = clazz.newInstance()
-
+
+var initialized = false
+for(i <- distinctAggFlags.indices){
+  if(distinctAggFlags(i) && !initialized){
+function.initialize(getRuntimeContext())
--- End diff --

right!




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984933#comment-15984933
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113471605
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -335,14 +371,28 @@ class CodeGenerator(
 j"""
 |  public final void accumulate(
 |org.apache.flink.types.Row accs,
-|org.apache.flink.types.Row input)""".stripMargin
+|org.apache.flink.types.Row input) throws Exception""".stripMargin
 
   val accumulate: String = {
 for (i <- aggs.indices) yield
-  j"""
+ if(distinctAggsFlags(i)){
+   j"""
+  |  Long distValCount$i = (Long) distStateList[$i].get(${parameters(i)});
+  |  if( distValCount$i == null){
--- End diff --

In Scala it is 0L, in Java it is null. :-/




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984929#comment-15984929
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3771
  
@fhueske @haohui I have no problem removing the DIST() part; it is just not 
possible to test it without it. Shall I push just the code generation and 
aggregates util changes?




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984926#comment-15984926
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user huawei-flink closed the pull request at:

https://github.com/apache/flink/pull/3732




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984937#comment-15984937
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113472029
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -20,16 +20,22 @@ package org.apache.flink.table.runtime.aggregate
 
 import org.apache.flink.api.common.functions.Function
 import org.apache.flink.types.Row
+import org.apache.flink.api.common.functions.RuntimeContext
 
 /**
   * Base class for code-generated aggregations.
   */
 abstract class GeneratedAggregations extends Function {
+  
+  /**
+* Initialize the state for the distinct aggregation check
+*
+* @param ctx the runtime context to retrieve and initialize the distinct states
+*/
+  def initialize(ctx: RuntimeContext)
 
   /**
-* Sets the results of the aggregations (partial or final) to the output row.
--- End diff --

Probably an error in the merge. Sorry about that.




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984931#comment-15984931
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113471045
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -296,6 +297,41 @@ class CodeGenerator(
   fields.mkString(", ")
 }
 
+def genInitialize(existDistinct : Boolean): String = {
+  
+  val sig: String = 
+j"""
+   | org.apache.flink.api.common.state.MapState[] distStateList =
+   |   new org.apache.flink.api.common.state.MapState[ ${distinctAggsFlags.size} ];
+   | 
+   |  public void initialize(
+   |org.apache.flink.api.common.functions.RuntimeContext ctx
+   |  )""".stripMargin
+  if(existDistinct){
--- End diff --

you are right




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984794#comment-15984794
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3732
  
Since PR #3771 is the follow-up to this PR, could you close this one? Thanks.




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984783#comment-15984783
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113446823
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
 ---
@@ -72,7 +73,15 @@ class ProcTimeBoundedRowsOver(
   genAggregations.code)
 LOG.debug("Instantiating AggregateHelper.")
 function = clazz.newInstance()
-
+
+var initialized = false
+for(i <- distinctAggFlags.indices){
+  if(distinctAggFlags(i) && !initialized){
+function.initialize(getRuntimeContext())
--- End diff --

I would always call `function.initialize()`. If there are no distinct 
aggregates, nothing will happen
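
A minimal Java sketch of that suggestion follows. All names are hypothetical and plain `HashMap`s stand in for Flink state, so it only illustrates the idea: `initialize()` can be called unconditionally because it only creates state for positions flagged as distinct, which makes it a no-op when no flag is set.

```java
import java.util.HashMap;
import java.util.Map;

public class AlwaysInitialize {

    private final boolean[] distinctFlags;
    private final Map<Object, Long>[] distState;

    @SuppressWarnings("unchecked")
    AlwaysInitialize(boolean[] distinctFlags) {
        this.distinctFlags = distinctFlags;
        // One slot per aggregate; slots stay null for non-distinct aggregates.
        this.distState = (Map<Object, Long>[]) new Map[distinctFlags.length];
    }

    // Safe to call unconditionally: does nothing if no aggregate is distinct.
    void initialize() {
        for (int i = 0; i < distinctFlags.length; i++) {
            if (distinctFlags[i]) {
                distState[i] = new HashMap<>();
            }
        }
    }

    // Counts how many slots actually received state.
    int initializedSlots() {
        int n = 0;
        for (Map<Object, Long> m : distState) {
            if (m != null) {
                n++;
            }
        }
        return n;
    }
}
```

With this shape the caller needs neither the `initialized` variable nor the loop over the flags.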




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984782#comment-15984782
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113446166
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -290,6 +295,10 @@ object AggregateUtil {
 val aggMapping = aggregates.indices.toArray.map(_ + groupings.length)
 val outputArity = aggregates.length + groupings.length + 1
 
+// remove when distinct is supported
+val distinctAggregatesFlags = new Array[Boolean](aggregates.size)
--- End diff --

`DataSet` has its own way to deal with distinct aggregates. They could 
also not initialize the state. So it is safe to call `generateAggregations()` 
with an empty array, `Array[Boolean]()`, or we make the parameter an 
`Option[Array[Boolean]]` and pass `None`.




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984780#comment-15984780
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113442825
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -335,14 +371,28 @@ class CodeGenerator(
 j"""
 |  public final void accumulate(
 |org.apache.flink.types.Row accs,
-|org.apache.flink.types.Row input)""".stripMargin
+|org.apache.flink.types.Row input) throws Exception""".stripMargin
 
   val accumulate: String = {
 for (i <- aggs.indices) yield
-  j"""
+ if(distinctAggsFlags(i)){
+   j"""
+  |  Long distValCount$i = (Long) distStateList[$i].get(${parameters(i)});
+  |  if( distValCount$i == null){
+  |${aggs(i)}.accumulate(
+  |  ((${accTypes(i)}) accs.getField($i)),
+  |  ${parameters(i)});
+  |distValCount$i = 0L;
+  |  }
+  |  distValCount$i += 1;
+  |  distStateList[$i].put(${parameters(i)}, distValCount$i);
+   """.stripMargin
+ }else {
--- End diff --

+space `} else {`
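
The distinct branch in the hunk above can be sketched in plain Java as follows. This is an illustration under assumptions, not the generated code: the class name is hypothetical, a `HashMap` replaces the `MapState`, and the aggregate is fixed to a `SUM` over `long` values. A value is passed to the aggregate only the first time it is seen, and its occurrence count is incremented on every call.

```java
import java.util.HashMap;
import java.util.Map;

public class DistinctSumSketch {

    // Occurrence count per value; stands in for distStateList[i].
    private final Map<Long, Long> seenCounts = new HashMap<>();
    private long sum = 0L;

    void accumulate(long value) {
        Long count = seenCounts.get(value);
        if (count == null) {
            // First occurrence: only now does the value reach the aggregate.
            sum += value;
            count = 0L;
        }
        seenCounts.put(value, count + 1);
    }

    long getValue() {
        return sum;
    }
}
```

Keeping a count rather than a simple set of seen values presumably allows a later retraction to drop a value from the aggregate only when its last occurrence leaves the window; retraction is not part of the hunk shown and is omitted here.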




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984788#comment-15984788
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113442315
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -296,6 +297,41 @@ class CodeGenerator(
   fields.mkString(", ")
 }
 
+def genInitialize(existDistinct : Boolean): String = {
+  
+  val sig: String = 
+j"""
+   | org.apache.flink.api.common.state.MapState[] distStateList =
--- End diff --

Reusable fields should be added with `reusableMemberStatements.add()`.
(Same should be done for the Iterables in `genMergeList()`)




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984787#comment-15984787
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113446347
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -290,6 +295,10 @@ object AggregateUtil {
 val aggMapping = aggregates.indices.toArray.map(_ + groupings.length)
 val outputArity = aggregates.length + groupings.length + 1
 
+// remove when distinct is supported
+val distinctAggregatesFlags = new Array[Boolean](aggregates.size)
--- End diff --

The same applies to all DataSet aggregations.




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984778#comment-15984778
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113443027
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -296,6 +297,41 @@ class CodeGenerator(
   fields.mkString(", ")
 }
 
+def genInitialize(existDistinct : Boolean): String = {
+  
+  val sig: String = 
+j"""
+   | org.apache.flink.api.common.state.MapState[] distStateList =
+   |   new org.apache.flink.api.common.state.MapState[ ${distinctAggsFlags.size} ];
+   | 
+   |  public void initialize(
+   |org.apache.flink.api.common.functions.RuntimeContext ctx
+   |  )""".stripMargin
+  if(existDistinct){
+  val initDist: String = {
+  for(i <- distinctAggsFlags.indices) yield
+if( distinctAggsFlags(i)) {
+  j"""
+ | 
+ | org.apache.flink.api.common.state.MapStateDescriptor distDesc$i =
+ |   new org.apache.flink.api.common.state.MapStateDescriptor(
+ | "distinctValuesBufferMapState" + $i,
+ | Object.class, Long.class);
+ | distStateList[$i] = ctx.getMapState( distDesc$i );
+  """.stripMargin
+} else {
+  ""
+}
+  }.mkString("\n")
+  
+  j"""$sig {
+ |  $initDist
+ |  }""".stripMargin
+  }else {
--- End diff --

+space




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984789#comment-15984789
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113446851
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
 ---
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory
   */
 class ProcTimeBoundedRowsOver(
 genAggregations: GeneratedAggregationsFunction,
+distinctAggFlags: Array[Boolean],
--- End diff --

remove this parameter.




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984784#comment-15984784
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113446943
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
 ---
@@ -88,6 +88,12 @@ class BoundedProcessingOverRangeProcessFunctionTest {
 |"mluZyRMb25nJOda0iCPo2ukAgAAeHA");
 |  }
 |
+|  public void initialize(
--- End diff --

can you make this a one-line change?




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984786#comment-15984786
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113446584
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -20,16 +20,22 @@ package org.apache.flink.table.runtime.aggregate
 
 import org.apache.flink.api.common.functions.Function
 import org.apache.flink.types.Row
+import org.apache.flink.api.common.functions.RuntimeContext
 
 /**
   * Base class for code-generated aggregations.
   */
 abstract class GeneratedAggregations extends Function {
+  
+  /**
+* Initialize the state for the distinct aggregation check
+*
+* @param ctx the runtime context to retrieve and initialize the distinct states
+*/
+  def initialize(ctx: RuntimeContext)
 
   /**
-* Sets the results of the aggregations (partial or final) to the output row.
--- End diff --

Revert this change?




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984779#comment-15984779
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113442648
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -296,6 +297,41 @@ class CodeGenerator(
   fields.mkString(", ")
 }
 
+def genInitialize(existDistinct : Boolean): String = {
+  
+  val sig: String = 
+j"""
+   | org.apache.flink.api.common.state.MapState[] distStateList =
+   |   new org.apache.flink.api.common.state.MapState[ ${distinctAggsFlags.size} ];
+   | 
+   |  public void initialize(
+   |org.apache.flink.api.common.functions.RuntimeContext ctx
+   |  )""".stripMargin
+  if(existDistinct){
+  val initDist: String = {
+  for(i <- distinctAggsFlags.indices) yield
+if( distinctAggsFlags(i)) {
+  j"""
+ | 
+ | org.apache.flink.api.common.state.MapStateDescriptor distDesc$i =
--- End diff --

We should use the right type of the field instead of `Object`. With 
`Object`, we will use the GenericTypeSerializer instead of Flink's 
type-specific serializers.
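
As a loose illustration of why a type-specific serializer is preferable to a generic one, the sketch below compares the bytes needed to write a `long` with a fixed-width encoding against writing the same value as an opaque `Object`. It uses only the JDK (`DataOutputStream` and `ObjectOutputStream`) as stand-ins; it does not use Flink's serializers, and the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical demo class: JDK-only analogy, not Flink's serialization stack.
final class SerializedSizeDemo {

    // Type-specific path: the type is known up front, so only the 8 payload bytes are written.
    static int typedSize(long value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Generic path: the value is treated as an Object, so type metadata travels with the payload.
    static int genericSize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }
}
```

The typed path writes exactly eight bytes, while the generic path has to carry a class description as well. The same trade-off is presumably what motivates passing the field's actual type to the state descriptor instead of `Object`.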




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984781#comment-15984781
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113443910
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -335,14 +371,28 @@ class CodeGenerator(
 j"""
 |  public final void accumulate(
 |org.apache.flink.types.Row accs,
-|org.apache.flink.types.Row input)""".stripMargin
+|org.apache.flink.types.Row input) throws Exception""".stripMargin
 
   val accumulate: String = {
 for (i <- aggs.indices) yield
-  j"""
+ if(distinctAggsFlags(i)){
+   j"""
+  |  Long distValCount$i = (Long) distStateList[$i].get(${parameters(i)});
+  |  if( distValCount$i == null){
--- End diff --

Didn't you say that the return value is never `null` but `0` if not set?
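
To make the logic under discussion concrete, here is a minimal sketch of a distinct SUM over a `ROWS BETWEEN n PRECEDING AND CURRENT ROW` window, written in plain Java. It is not the generated code from the PR: `java.util.HashMap` stands in for the per-aggregate map state, and the class `DistinctSumOverRows` and its methods are hypothetical names. With `HashMap`, a missing key yields `null`, which is why the sketch checks for `null`; whether the real state backend returns `null` or `0` is exactly the question raised above and is not settled here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: distinct SUM over a row-count bounded window.
final class DistinctSumOverRows {
    private final int preceding;                             // number of preceding rows in the window
    private final Deque<Long> window = new ArrayDeque<>();   // rows currently inside the window
    private final Map<Long, Long> counts = new HashMap<>();  // value -> occurrences inside the window
    private long sum = 0L;

    DistinctSumOverRows(int preceding) {
        this.preceding = preceding;
    }

    // Processes one row and returns SUM(DISTINCT value) over the current window.
    long add(long value) {
        if (window.size() == preceding + 1) {
            retract(window.removeFirst());   // the oldest row leaves the window
        }
        window.addLast(value);
        accumulate(value);
        return sum;
    }

    private void accumulate(long value) {
        Long count = counts.get(value);
        if (count == null) {                 // first occurrence: contributes to the aggregate
            sum += value;
            counts.put(value, 1L);
        } else {                             // duplicate: only the counter changes
            counts.put(value, count + 1);
        }
    }

    private void retract(long value) {
        Long count = counts.get(value);
        if (count == null) {
            return;                          // nothing to retract
        }
        if (count == 1L) {                   // last occurrence leaves: remove its contribution
            sum -= value;
            counts.remove(value);
        } else {
            counts.put(value, count - 1);
        }
    }
}
```

Feeding the values 1, 1, 2, 2, 3 into `new DistinctSumOverRows(2)` yields 1, 1, 3, 3, 5: duplicates inside the window only bump the counter, and a value stops contributing once its last occurrence has been retracted.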




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984777#comment-15984777
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113442944
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -335,14 +371,28 @@ class CodeGenerator(
 j"""
 |  public final void accumulate(
 |org.apache.flink.types.Row accs,
-|org.apache.flink.types.Row input)""".stripMargin
+|org.apache.flink.types.Row input) throws Exception""".stripMargin
 
   val accumulate: String = {
 for (i <- aggs.indices) yield
-  j"""
+ if(distinctAggsFlags(i)){
--- End diff --

Please add spaces: `if (distinctAggsFlags(i)) {`




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984785#comment-15984785
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113443466
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -296,6 +297,41 @@ class CodeGenerator(
   fields.mkString(", ")
 }
 
+def genInitialize(existDistinct : Boolean): String = {
+  
+  val sig: String = 
+j"""
+   | org.apache.flink.api.common.state.MapState[] distStateList =
+   |   new org.apache.flink.api.common.state.MapState[ ${distinctAggsFlags.size} ];
+   | 
+   |  public void initialize(
+   |org.apache.flink.api.common.functions.RuntimeContext ctx
+   |  )""".stripMargin
+  if(existDistinct){
--- End diff --

I think this check and the `existDistinct` can be removed. If all fields 
are `false`, we won't generate anything.
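
The point can be sketched in plain Java: if the generator emits one snippet per flagged aggregate, an all-`false` flag array produces an empty body without any separate guard. The class `InitializeCodeGen`, its method, and the emitted statement are hypothetical and only mimic the shape of the generated code.

```java
// Hypothetical sketch: per-flag code emission makes an outer "exists distinct" check redundant.
final class InitializeCodeGen {

    // Emits one state-initialization line per distinct aggregate, and nothing for the others.
    static String genInitializeBody(boolean[] distinctFlags) {
        StringBuilder body = new StringBuilder();
        for (int i = 0; i < distinctFlags.length; i++) {
            if (distinctFlags[i]) {
                body.append("  distStateList[").append(i)
                    .append("] = ctx.getMapState(distDesc").append(i).append(");\n");
            }
        }
        return body.toString();   // empty when no flag is set
    }
}
```

Calling `genInitializeBody(new boolean[] {false, false})` returns an empty string, so the surrounding `initialize` method would simply have an empty body.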




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984727#comment-15984727
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3732
  
Thanks @stefanobortoli! I'll have a look at #3771




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984392#comment-15984392
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user stefanobortoli commented on the issue:

https://github.com/apache/flink/pull/3732
  
@fhueske @sunjincheng121 @shijinkui @hongyuhong I have created a PR, #3771, 
on top of the latest master with code-generated distinct. Please have a look. 
If it is fine, we can basically support distinct for all the window types.




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983254#comment-15983254
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user stefanobortoli commented on the issue:

https://github.com/apache/flink/pull/3771
  
@fhueske @rtudoran @shijinkui @sunjincheng121 I have created a new PR for 
distinct in the code generator. Please have a look and let me know. I have 
implemented and tested it only for the OverProcTimeRowBounded window, but if 
you like it I can quickly implement and test the others as well.




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983252#comment-15983252
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

GitHub user stefanobortoli opened a pull request:

https://github.com/apache/flink/pull/3771

[FLINK-6250] Distinct procTime with Rows boundaries

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/stefanobortoli/flink FLINK-6250b

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3771.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3771








[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982492#comment-15982492
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user rtudoran commented on the issue:

https://github.com/apache/flink/pull/3732
  
@fhueske @stefanobortoli I suggest we merge this temporary solution into 
Flink (using a special marker for distinct) until the Flink module is upgraded 
to the next Calcite release. I have fixed the issue in Calcite. However, the 
advantages of pushing this now are:
1) we can reuse the code
2) when we have the distinct marker, we can simply modify the check for 
distinct aggregates in DataStreamOver




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978996#comment-15978996
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112725482
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UnsupportedOperatorsIndicatorFunctions.scala
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions
+
+import java.nio.charset.Charset
+import java.util.List
+
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeFamily, SqlTypeName}
+import org.apache.calcite.sql.validate.SqlMonotonicity
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.LeafExpression
+import org.apache.calcite.sql.`type`.InferTypes
+import org.apache.calcite.sql.validate.SqlValidator
+import org.apache.calcite.sql.validate.SqlValidatorScope
+
+/**
+ * An SQL Function DISTINCT() used to mark the DISTINCT operator
+ * on aggregation input. This is temporary workaround waiting for 
+ * https://issues.apache.org/jira/browse/CALCITE-1740 being solved
+ */
+object DistinctAggregatorExtractor extends SqlFunction("DIST", SqlKind.OTHER_FUNCTION,
+  ReturnTypes.ARG0, InferTypes.RETURN_TYPE,
+  OperandTypes.NUMERIC, SqlFunctionCategory.NUMERIC) {
--- End diff --

One never stops learning. :-)





[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15979007#comment-15979007
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user stefanobortoli commented on the issue:

https://github.com/apache/flink/pull/3732
  
@fhueske I have just pushed a version that works with code generation 
(without modifying the code generation itself). Some refactoring will be needed 
in the AggregateUtil function, but if the overall concept is sound, I will fix 
things.

@hongyuhong, @shijinkui, you could also have a look if you have time.




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978999#comment-15978999
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112725820
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -91,6 +93,22 @@ class DataStreamOverAggregate(
 
 val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
 
+val distinctVarMap: Map[String,Boolean] = new HashMap[String, Boolean]
--- End diff --

This is a good point. The string trick is only a temporary workaround anyway.




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978943#comment-15978943
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3732
  
By the way, the code generation is not that complicated. The best way to 
learn it would also be to debug a simple batch GROUP BY query (once batch 
aggregations are code-generated).




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978936#comment-15978936
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3732
  
Sounds good to me. IMO, we can also add the runtime code to the code base 
even if there is no API support yet, as long as we cover it with test cases. 
Then we could quickly enable it once DISTINCT in OVER becomes available.




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978934#comment-15978934
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112716483
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.state.ListState
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueStateList: Array[MapState[Any, Long]] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new 
ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = 
getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("smallestTSState", 

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978933#comment-15978933
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user stefanobortoli commented on the issue:

https://github.com/apache/flink/pull/3732
  
@fhueske, I agree with you about the risk of temporarily ingesting DIST(). 
Perhaps we could meanwhile just work on the "ProcessFunction + code 
generation", keeping the DIST function for test purposes. My concern is 
that the code may change again and all the work would be wasted. To be 
honest, code generation is quite new to me, and I will have to learn to 
work with it. Meanwhile, I have almost completed a version that relies on the 
current code generation, nesting the distinct logic. As it is almost done, I 
will share this one as well and then, if necessary, move to the code generation. 
What do you think?




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978917#comment-15978917
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112713096
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.state.ListState
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueStateList: Array[MapState[Any, Long]] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("smallestTSState", 

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978916#comment-15978916
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112712717
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.state.ListState
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkArgument(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueStateList: Array[MapState[Any, Long]] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("smallestTSState", 

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978895#comment-15978895
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112702342
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,230 @@

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978891#comment-15978891
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112694911
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -152,7 +153,14 @@ object AggregateUtil {
 
 val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
 val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
-
+
+var hasDistinct = false
--- End diff --

can be simplified to 
```
val hasDistinct = distinctAggregatesFlags.exists(identity)
```

`exists` returns `true` if the function (here `identity`) returns `true` at least 
once.
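
A minimal sketch of this simplification, assuming a hypothetical `flags` array that stands in for `distinctAggregatesFlags`. The loop-based variant is only a guess at what follows the `var hasDistinct = false` line, since the rest of that hunk is not quoted. Note that the predicate has to be an actual function such as `identity`; a bare `_` as the whole argument would not work. `contains(true)` is an equivalent alternative for a Boolean array.

```scala
object HasDistinctDemo {
  // Assumed shape of the original code: a mutable flag set inside a loop.
  def hasDistinctLoop(flags: Array[Boolean]): Boolean = {
    var hasDistinct = false
    var i = 0
    while (i < flags.length) {
      if (flags(i)) hasDistinct = true
      i += 1
    }
    hasDistinct
  }

  // One-liner: true if at least one element is true, false for an empty array.
  def hasDistinct(flags: Array[Boolean]): Boolean = flags.exists(identity)

  def main(args: Array[String]): Unit = {
    val flags = Array(false, true, false)
    println(hasDistinct(flags))                 // same result as the loop version
    println(flags.contains(true))               // equivalent check for Booleans
    println(hasDistinct(Array.empty[Boolean]))  // no elements, so no distinct flag
  }
}
```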


> Distinct procTime with Rows boundaries
> --
>
> Key: FLINK-6250
> URL: https://issues.apache.org/jira/browse/FLINK-6250
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: radu
>Assignee: Stefano Bortoli
>
> Support proctime with rows boundaries
> Q1.1. `SELECT SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS BETWEEN 2 
> PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.1. `SELECT COUNT(b), SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS 
> BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
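
To illustrate what the first query asks for, here is a minimal sketch in plain Scala of `SUM(DISTINCT b)` over `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW`. It is not the Flink implementation from the pull request: the class name `DistinctSumOverRows`, the use of `Long` values, and the in-memory queue and map (instead of Flink keyed state) are all assumptions made for illustration. The idea is to keep a per-value occurrence count for the rows in the window, so that a value contributes to the sum only once and is retracted only when its last occurrence leaves the window.

```scala
import scala.collection.mutable

// Hypothetical helper, not Flink code: distinct sum over a sliding window
// of (preceding + 1) rows, processed in arrival order.
final class DistinctSumOverRows(preceding: Int) {
  private val window = mutable.Queue.empty[Long]    // rows currently in the window
  private val counts = mutable.Map.empty[Long, Int] // value -> occurrences in the window
  private var sum = 0L                              // sum of the distinct values

  def add(b: Long): Long = {
    // Evict the oldest row once the window already holds preceding + 1 rows.
    if (window.size > preceding) {
      val old = window.dequeue()
      val remaining = counts(old) - 1
      if (remaining == 0) {
        counts.remove(old)
        sum -= old // last occurrence left the window: retract the value
      } else {
        counts(old) = remaining
      }
    }
    window.enqueue(b)
    val seen = counts.getOrElse(b, 0)
    if (seen == 0) sum += b // first occurrence in the window: accumulate once
    counts(b) = seen + 1
    sum
  }
}

object DistinctSumOverRowsDemo {
  def main(args: Array[String]): Unit = {
    val agg = new DistinctSumOverRows(preceding = 2)
    // One output value per input row, as an OVER aggregate would emit.
    println(Seq(1L, 1L, 2L, 1L, 3L).map(agg.add))
  }
}
```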



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978894#comment-15978894
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112695257
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -186,11 +194,21 @@ object AggregateUtil {
   }
 } else {
   if (isRowsClause) {
-new ProcTimeBoundedRowsOver(
-  genFunction,
-  precedingOffset,
-  aggregationStateType,
-  inputRowType)
+if (hasDistinct){
+  new ProcTimeBoundedDistinctRowsOver(aggregates,
--- End diff --

`aggregates` in new line




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978902#comment-15978902
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112708098
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,230 @@

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978898#comment-15978898
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112706580
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,230 @@

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978904#comment-15978904
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112691071
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UnsupportedOperatorsIndicatorFunctions.scala
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions
+
+import java.nio.charset.Charset
+import java.util.List
+
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeFamily, SqlTypeName}
+import org.apache.calcite.sql.validate.SqlMonotonicity
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.LeafExpression
+import org.apache.calcite.sql.`type`.InferTypes
+import org.apache.calcite.sql.validate.SqlValidator
+import org.apache.calcite.sql.validate.SqlValidatorScope
+
+/**
+ * An SQL function, registered as DIST, used to mark the DISTINCT operator
+ * on aggregation input. This is a temporary workaround until
+ * https://issues.apache.org/jira/browse/CALCITE-1740 is resolved.
+ */
+object DistinctAggregatorExtractor extends SqlFunction("DIST", SqlKind.OTHER_FUNCTION,
+  ReturnTypes.ARG0, InferTypes.RETURN_TYPE,
+  OperandTypes.NUMERIC, SqlFunctionCategory.NUMERIC) {
--- End diff --

An aggregation function can also return non-numeric types such as 
MIN(String). So `SqlFunctionCategory.NUMERIC` might not be the right category. 
OTOH, I don't know how Calcite uses this category.




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978890#comment-15978890
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112690943
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UnsupportedOperatorsIndicatorFunctions.scala
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions
+
+import java.nio.charset.Charset
+import java.util.List
+
+import org.apache.calcite.rel.`type`._
--- End diff --

Many unused imports




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978903#comment-15978903
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112693953
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -91,6 +93,22 @@ class DataStreamOverAggregate(
 
 val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
 
+val distinctVarMap: Map[String,Boolean] = new HashMap[String, Boolean]
+if (input.isInstanceOf[DataStreamCalc]) {
+  val dsCalc = input.asInstanceOf[DataStreamCalc]
+  val iter = dsCalc
+ .selectionToString(dsCalc.getProgram, dsCalc.getExpressionString)
+ .split(",")
+ .iterator
+  while (iter.hasNext) {
+val exp = iter.next
+if(exp.contains("DIST")){
+  val varName = exp.substring(exp.indexOf("$"))
+   distinctVarMap.put(varName,true)
--- End diff --

+space
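
For illustration, the marker-extraction loop in this hunk could also be written with Scala collection operations. The following is only a sketch under assumptions: the object and method names are made up, the input string is an invented example (the actual format returned by `selectionToString` is not shown in this thread), and it returns an immutable `Map` instead of filling a `java.util.HashMap`. Unlike the loop in the diff, it also skips expressions without a `$` and trims surrounding whitespace.

```scala
object DistinctVarsDemo {
  // Hypothetical helper: collects the `$`-prefixed remainder of every
  // comma-separated expression that mentions the DIST marker.
  def distinctVars(selection: String): Map[String, Boolean] =
    selection
      .split(",")
      .filter(exp => exp.contains("DIST") && exp.contains("$"))
      .map(exp => exp.substring(exp.indexOf("$")).trim -> true)
      .toMap

  def main(args: Array[String]): Unit = {
    // Made-up input; the real selection string may look different.
    println(distinctVars("a, DIST $1, c, DIST $2"))
  }
}
```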




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978901#comment-15978901
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112694144
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -224,13 +244,21 @@ class DataStreamOverAggregate(
   def createBoundedAndCurrentRowOverWindow(
 generator: CodeGenerator,
 inputDS: DataStream[Row],
+distinctVarMap : Map[String,Boolean],
 isRowTimeType: Boolean,
 isRowsClause: Boolean): DataStream[Row] = {
 
 val overWindow: Group = logicWindow.groups.get(0)
 val partitionKeys: Array[Int] = overWindow.keys.toArray
 val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
 
+val aggregateCalls = overWindow.getAggregateCalls(logicWindow)
+val distinctAggFlags: Array[Boolean] =  new Array[Boolean](aggregateCalls.size)
+for (i <- 0 until aggregateCalls.size()){
--- End diff --

+space `.size()) {`




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978893#comment-15978893
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112700998
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.state.ListState
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkArgument(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueStateList: Array[MapState[Any, Long]] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("smallestTSState", 

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978900#comment-15978900
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112702163
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,230 @@

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978899#comment-15978899
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112701470
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,230 @@

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978892#comment-15978892
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112693618
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -91,6 +93,22 @@ class DataStreamOverAggregate(
 
 val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
 
+val distinctVarMap: Map[String,Boolean] = new HashMap[String, Boolean]
--- End diff --

I would do the extraction in the DataStreamOverAggregateRule. There we have 
proper access to the input `Calc` and the `RexProgram`. Extracting the function 
call as a String is quite fragile. The calc could, for instance, contain an 
attribute called "DISTRIBUTION".

The rule would unnest the expression from the `DIST()` RexNode and remove 
`DIST`. The distinct information would then need to be added to the 
DataStreamOverAggregate.
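
To illustrate the fragility concern, here is a minimal sketch, not the PR's code 
and not Calcite's API. `Expr`, `FieldRef`, `Call`, and `DistinctExtraction` are 
hypothetical stand-ins for a `RexNode`-like tree. It contrasts a substring check 
on the rendered expression with a structural match on the call itself:

```scala
// Hypothetical, simplified expression tree (an assumption for illustration;
// Calcite's real RexNode/RexCall classes look different).
sealed trait Expr
final case class FieldRef(name: String) extends Expr
final case class Call(operator: String, operands: List[Expr]) extends Expr

object DistinctExtraction {
  // Fragile: decides based on the textual form of the expression, so any
  // attribute whose name merely contains "DIST" is misclassified.
  def looksDistinctByString(rendered: String): Boolean =
    rendered.contains("DIST")

  // Structural: matches only a call whose operator is exactly DIST with a
  // single operand, and returns that operand with the DIST wrapper removed.
  def unnestDist(expr: Expr): Option[Expr] = expr match {
    case Call("DIST", operand :: Nil) => Some(operand)
    case _                            => None
  }
}
```

With this sketch, an attribute named "DISTRIBUTION" is flagged by the string 
check but correctly ignored by the structural match, while `DIST(b)` is 
unwrapped to `b`.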



[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978896#comment-15978896
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112706609
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,230 @@

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978897#comment-15978897
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112702313
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,230 @@

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974731#comment-15974731
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112210389
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkArgument(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+smallestTsState = 

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974726#comment-15974726
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112210097
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+    output = new Row(forwardedFieldCount + aggregates.length)
+    // We keep the received elements in a map state keyed
+    // by their ingestion time in the operator.
+    // We also keep a counter of processed elements
+    // and the timestamp of the oldest element.
+    val rowListTypeInfo: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+    val processedCountDescriptor : ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+    counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+    smallestTsState = 

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974485#comment-15974485
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112180091
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkArgument(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+    output = new Row(forwardedFieldCount + aggregates.length)
+    // We keep the received elements in a map state keyed
+    // by their ingestion time in the operator.
+    // We also keep a counter of processed elements
+    // and the timestamp of the oldest element.
+    val rowListTypeInfo: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+    val processedCountDescriptor : ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+    counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+    smallestTsState = 

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974357#comment-15974357
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112151646
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkArgument(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+    output = new Row(forwardedFieldCount + aggregates.length)
+    // We keep the received elements in a map state keyed
+    // by their ingestion time in the operator.
+    // We also keep a counter of processed elements
+    // and the timestamp of the oldest element.
+    val rowListTypeInfo: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+    val processedCountDescriptor : ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+    counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+    smallestTsState = 

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974352#comment-15974352
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112151222
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkArgument(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+    output = new Row(forwardedFieldCount + aggregates.length)
+    // We keep the received elements in a map state keyed
+    // by their ingestion time in the operator.
+    // We also keep a counter of processed elements
+    // and the timestamp of the oldest element.
+    val rowListTypeInfo: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+    val processedCountDescriptor : ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+    counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+    smallestTsState = 

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973954#comment-15973954
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112105763
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -1140,6 +1140,202 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testNonPartitionedProcTimeOverDistinctWindow(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(1)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = "SELECT a,  " +
+      "  SUM(DIST(e)) OVER (" +
+      " ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumE " +
--- End diff --

I think we can add some tests for multiple aggregations with distinct.
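
A test along those lines might use a query like the sketch below. It assumes that the `DIST()` marker function from this PR can wrap the argument of more than one aggregate over the same OVER window, which nothing in this thread confirms. The object name, the `overWindow` helper, the aliases `cntE` and `minE`, and the `FROM MyTable` clause (inferred from the registered table name) are made up for illustration.

```scala
object MultiDistinctQuerySketch {

  // Window clause copied from the existing test in the diff above.
  val overWindow: String =
    "OVER (ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW)"

  // Hypothetical query: three distinct aggregates sharing one window.
  // Whether the PR supports this combination is an assumption.
  val sqlQuery: String =
    "SELECT a, " +
      s"SUM(DIST(e)) $overWindow AS sumE, " +
      s"COUNT(DIST(e)) $overWindow AS cntE, " +
      s"MIN(DIST(e)) $overWindow AS minE " +
      "FROM MyTable"
}
```

The string would be passed to the table environment in the same way as the single-aggregate query in the existing test; the expected results would have to be worked out against the test data set.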


> Distinct procTime with Rows boundaries
> --
>
> Key: FLINK-6250
> URL: https://issues.apache.org/jira/browse/FLINK-6250
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: radu
>Assignee: Stefano Bortoli
>
> Support proctime with rows boundaries
> Q1.1. `SELECT SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS BETWEEN 2 
> PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.1. `SELECT COUNT(b), SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS 
> BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`





[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973955#comment-15973955
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112103318
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkArgument(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+    output = new Row(forwardedFieldCount + aggregates.length)
+    // We keep the received elements in a map state keyed
+    // by their ingestion time in the operator.
+    // We also keep a counter of processed elements
+    // and the timestamp of the oldest element.
+    val rowListTypeInfo: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+    val processedCountDescriptor : ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+    counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+    smallestTsState = 

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973956#comment-15973956
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112105507
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkArgument(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+    output = new Row(forwardedFieldCount + aggregates.length)
+    // We keep the received elements in a map state keyed
+    // by their ingestion time in the operator.
+    // We also keep a counter of processed elements
+    // and the timestamp of the oldest element.
+    val rowListTypeInfo: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+    val processedCountDescriptor : ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+    counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+    smallestTsState = 

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973953#comment-15973953
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112105029
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkArgument(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+    output = new Row(forwardedFieldCount + aggregates.length)
+    // We keep the received elements in a map state keyed
+    // by their ingestion time in the operator.
+    // We also keep a counter of processed elements
+    // and the timestamp of the oldest element.
+    val rowListTypeInfo: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+    val processedCountDescriptor : ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+    counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+    smallestTsState = 

[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972582#comment-15972582
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

GitHub user huawei-flink opened a pull request:

https://github.com/apache/flink/pull/3732

[FLINK-6250] Distinct procTime with Rows boundaries

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huawei-flink/flink FLINK-6250

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3732.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3732


commit 4e3da4c9baebf48bfc47ef192287e7e17ab69efd
Author: Stefano Bortoli 
Date:   2017-04-18T12:27:26Z

DIST() for aggregation on procTime row bounded windows




> Distinct procTime with Rows boundaries
> --
>
> Key: FLINK-6250
> URL: https://issues.apache.org/jira/browse/FLINK-6250
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: radu
>Assignee: Stefano Bortoli
>
> Support proctime with rows boundaries
> Q1.1. `SELECT SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS BETWEEN 2 
> PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.1. `SELECT COUNT(b), SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS 
> BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-18 Thread Stefano Bortoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972574#comment-15972574
 ] 

Stefano Bortoli commented on FLINK-6250:


Hi [~fhueske], 

we are working with Calcite to bring in support for DISTINCT. I think that 
[~rtudoran] implemented a first version and it is under code review. Still, 
it will take a long time to get that support through Calcite (release, plus 
inclusion in Flink). 

I agree with you that it is not optimal, but at least we are moving ahead. I 
will push the branch without the code generation. Then you can have a look at 
the implementation. We can keep it for when DISTINCT is officially supported.

The code generation aggregation interfaces do not include specific parameters 
for distinct aggregation and retraction, so those will have to be reworked 
anyway. Should I open a JIRA for those? 



[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-18 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972486#comment-15972486
 ] 

Fabian Hueske commented on FLINK-6250:
--

Sorry for the inconvenience [~stefano.bortoli]. Code generation is required to 
bring the interface of the user-defined aggregates into its final shape for the 
1.3 release (e.g., to allow multiple input values).

I'm not sure if we should work around the missing support for `DISTINCT` 
aggregates in Calcite by adding non-standard-compliant behavior that would 
need to be removed afterwards (breaking existing queries that use the feature). 
Calling a method `DIST()` on an aggregation function usually means computing 
the aggregate and applying the function to its result. 

Did you check at which level Calcite is missing support for DISTINCT? Is it 
already in the parser or later in the validator? 
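
To make the semantic concern concrete, here is a minimal sketch in plain Scala (no Flink or Calcite involved). The object name, the sample values, and the function `f` are purely illustrative assumptions; the sketch only contrasts "de-duplicate, then aggregate" with "aggregate, then apply a function to the result".

```scala
object DistinctVsFunctionOfAggregate {
  // Illustrative input column b; the values are made up for this sketch.
  val b: Seq[Long] = Seq(2L, 2L, 3L)

  // SUM(DISTINCT b): de-duplicate the inputs first, then aggregate.
  val sumDistinct: Long = b.distinct.sum

  // f(SUM(b)): aggregate all inputs, then apply f to the single result.
  // `f` stands in for any scalar function; identity is used for illustration.
  def f(x: Long): Long = x
  val fOfSum: Long = f(b.sum)
}
```

With these values the first expression yields 5 and the second yields 7, which is why a function wrapped around the aggregate reads differently from a DISTINCT qualifier inside it.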



[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-18 Thread Stefano Bortoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972387#comment-15972387
 ] 

Stefano Bortoli commented on FLINK-6250:


[~fhueske], I just saw that you merged FLINK-6240... I was about to push 
the FLINK-6250 branch, but the function has now been replaced with code 
generation. This makes adding the DISTINCT function even more complicated. Was 
it really necessary while we are still developing the functions? 



[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-13 Thread Stefano Bortoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967477#comment-15967477
 ] 

Stefano Bortoli commented on FLINK-6250:


OK, as a workaround for the missing distinct aggregation, I have implemented a 
new {{SqlFunction}} called {{DIST()}} that is intercepted in the 
{{DataStreamOverAggregate}} and used to create the {{distinctFlag}} for the 
aggregators. That part will be removed as soon as Calcite supports DISTINCT 
aggregations. I think that eventually this task should be the subject of a 
separate JIRA issue.



[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-11 Thread Stefano Bortoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964250#comment-15964250
 ] 

Stefano Bortoli commented on FLINK-6250:


According to the JIRA issue opened for Calcite, there is no clear plan to solve 
the DISTINCT aggregation problem. You mentioned that we could work around it in 
Flink and then remove the workaround when Calcite supports it. I am not sure 
where I should start. Would it be something like what is done for the 
LogicalAggregation?



[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-04 Thread Stefano Bortoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15955200#comment-15955200
 ] 

Stefano Bortoli commented on FLINK-6250:


I am also fine with implementing it by extending the existing function. In 
practice it would be the same work, just in a different file. Currently I am 
trying to understand where the DISTINCT marker gets lost, as it does not reach 
the OVER aggregation function. Probably it is intercepted in the early stages 
of query processing. 



[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-04 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15955117#comment-15955117
 ] 

Fabian Hueske commented on FLINK-6250:
--

I'm not sure about implementing this as a separate {{ProcessFunction}}. 
I think we should adapt the existing ones because we will need support for 
distinct and not distinct aggregates in the same {{ProcessFunction}}.
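
As a rough illustration of handling both kinds of aggregate in one function, the sketch below keeps a per-aggregate distinct flag next to a shared occurrence counter. It is plain Scala with ordinary collections standing in for Flink state; `Agg`, `SumAgg`, `CountAgg`, and `OverRowsOperator` are hypothetical names invented for this sketch and are not Flink's interfaces.

```scala
import scala.collection.mutable

// Hypothetical, simplified aggregate interface (not Flink's AggregateFunction).
trait Agg {
  def accumulate(v: Long): Unit
  def retract(v: Long): Unit
  def value: Long
}

final class SumAgg extends Agg {
  private var s = 0L
  def accumulate(v: Long): Unit = s += v
  def retract(v: Long): Unit = s -= v
  def value: Long = s
}

final class CountAgg extends Agg {
  private var c = 0L
  def accumulate(v: Long): Unit = c += 1
  def retract(v: Long): Unit = c -= 1
  def value: Long = c
}

// Evaluates all aggregates over ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW.
// distinctFlags(i) == true marks aggs(i) as a DISTINCT aggregate.
final class OverRowsOperator(preceding: Int, aggs: Array[Agg], distinctFlags: Array[Boolean]) {
  require(aggs.length == distinctFlags.length)
  private val window = mutable.Queue.empty[Long]        // rows in the window, oldest first
  private val occurrences = mutable.Map.empty[Long, Int] // value -> count inside the window

  def process(v: Long): Seq[Long] = {
    if (window.size > preceding) {
      val old = window.dequeue()
      val left = occurrences(old) - 1
      if (left == 0) occurrences.remove(old) else occurrences(old) = left
      // Non-distinct aggregates always retract; distinct ones only on the last occurrence.
      for (i <- aggs.indices)
        if (!distinctFlags(i) || left == 0) aggs(i).retract(old)
    }
    window.enqueue(v)
    val seen = occurrences.getOrElse(v, 0)
    occurrences(v) = seen + 1
    // Non-distinct aggregates always accumulate; distinct ones only on the first occurrence.
    for (i <- aggs.indices)
      if (!distinctFlags(i) || seen == 0) aggs(i).accumulate(v)
    aggs.map(_.value).toSeq
  }
}
```

For example, `new OverRowsOperator(2, Array[Agg](new CountAgg, new SumAgg), Array(false, true))` evaluates a plain count and a distinct sum over the same three-row window, so both flavors share one window buffer and one occurrence map.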



[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-04 Thread Stefano Bortoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15955041#comment-15955041
 ] 

Stefano Bortoli commented on FLINK-6250:


I will approach this by implementing a processing function, starting from the 
simple aggregation:
1 - add a "distinctValue" MapState counting the occurrences of each unique 
value in the window, 
2 - aggregate a value only when it has not been seen before,
3 - decrease the counter when the value goes out of the window boundaries,
4 - retract the value from the aggregator and remove it from the state when 
the counter reaches zero.
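
The four steps above can be sketched as follows for `SUM(DISTINCT b)` over `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW`. This is only a minimal sketch in plain Scala: a mutable map and queue stand in for Flink's MapState and the buffered rows, and the class name `DistinctSumOverRows` is an assumption made for illustration, not code from the pull request.

```scala
import scala.collection.mutable

// Hypothetical helper, not a Flink class: SUM(DISTINCT b) over
// ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW.
final class DistinctSumOverRows(preceding: Int) {
  // Rows currently inside the window, oldest first.
  private val window = mutable.Queue.empty[Long]
  // Step 1: how many times each value occurs inside the window.
  private val distinctCount = mutable.Map.empty[Long, Int]
  // Running aggregate over the distinct values.
  private var sum = 0L

  def add(value: Long): Long = {
    // Steps 3 and 4: evict the oldest row once the window is full.
    if (window.size > preceding) {
      val evicted = window.dequeue()
      val remaining = distinctCount(evicted) - 1
      if (remaining == 0) {
        distinctCount.remove(evicted)
        sum -= evicted // retract: the last occurrence left the window
      } else {
        distinctCount(evicted) = remaining
      }
    }
    // Step 2: accumulate only values not already present in the window.
    window.enqueue(value)
    val seen = distinctCount.getOrElse(value, 0)
    if (seen == 0) sum += value
    distinctCount(value) = seen + 1
    sum
  }
}
```

Feeding the values 1, 1, 2, 3, 1 into `new DistinctSumOverRows(2)` returns 1, 1, 3, 6, 6: the duplicate 1 is not added twice, and the sum drops a value only when its last copy leaves the window.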
