[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16409603#comment-16409603
 ] 

ASF GitHub Bot commented on FLINK-8903:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5706


> Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in 
> Group Windows
> ---
>
> Key: FLINK-8903
> URL: https://issues.apache.org/jira/browse/FLINK-8903
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.2, 1.5.0, 1.4.2
> Reporter: lilizhao
> Assignee: Fabian Hueske
> Priority: Critical
> Fix For: 1.5.0
>
> Attachments: QQ图片20180312180143.jpg, TableAndSQLTest.java
>
>
> The built-in aggregation functions VAR_POP, VAR_SAMP, STDDEV_POP, STDDEV_SAMP
> are translated into regular AVG functions if they are applied in the context
> of a Group Window aggregation ({{GROUP BY TUMBLE/HOP/SESSION}}).
> The reason is that these functions are internally represented as
> {{SqlAvgAggFunction}} but with different {{SqlKind}}. When translating
> Calcite aggregation functions to Flink Table agg functions, we only look at
> the type of the class, not at the value of the {{kind}} field. We did not
> notice that before, because in all other cases (regular {{GROUP BY}} without
> windows or {{OVER}} windows), we have a translation rule
> {{AggregateReduceFunctionsRule}} that decomposes the more complex functions
> into expressions of {{COUNT}} and {{SUM}} functions such that we never
> execute an {{AVG}} Flink function. However, that rule can only be applied to
> {{LogicalAggregate}}, and we represent group windows as
> {{LogicalWindowAggregate}}, so the rule does not match.
> We should fix this by:
> 1. Restricting the translation to Flink AVG functions in {{AggregateUtil}} to
> {{SqlKind.AVG}}.
> 2. Implementing a rule (hopefully based on {{AggregateReduceFunctionsRule}})
> that decomposes the complex agg functions into {{SUM}} and {{COUNT}}.
> Step 1 is an easy and quick fix, but on its own we would get an "Unsupported
> Function" exception if {{VAR_POP}} is used in a {{GROUP BY}} window.
> Step 2 might be more involved, depending on how difficult it is to port the
> rule.
>  
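The root cause described in the issue (dispatching on the function's class rather than on its kind) can be illustrated with a small, self-contained Java sketch. Everything in it is a hypothetical stand-in invented for illustration: `Kind` mimics Calcite's SqlKind, `AvgFamilyFunction` mimics a single class that carries several kinds, and the two `translate...` methods are not the actual AggregateUtil code.

```java
final class KindDispatchSketch {
    // Hypothetical stand-in for Calcite's SqlKind (illustration only).
    enum Kind { SUM, COUNT, AVG, VAR_POP, VAR_SAMP, STDDEV_POP, STDDEV_SAMP }

    // Hypothetical stand-in for a Calcite aggregate function.
    static class AggFunction {
        final Kind kind;
        AggFunction(Kind kind) { this.kind = kind; }
    }

    // One class shared by AVG, VAR_* and STDDEV_*; only the kind differs.
    static final class AvgFamilyFunction extends AggFunction {
        AvgFamilyFunction(Kind kind) { super(kind); }
    }

    // Buggy variant: inspects only the class, so every member of the
    // AVG family is silently translated to a plain AVG.
    static String translateByClass(AggFunction f) {
        if (f instanceof AvgFamilyFunction) {
            return "AVG";
        }
        return f.kind.name();
    }

    // Variant following step 1 of the issue: only Kind.AVG is accepted
    // for this class; the other kinds are rejected instead of mistranslated.
    static String translateByKind(AggFunction f) {
        if (f instanceof AvgFamilyFunction && f.kind != Kind.AVG) {
            throw new UnsupportedOperationException("Unsupported Function: " + f.kind);
        }
        return f.kind.name();
    }

    public static void main(String[] args) {
        AggFunction varPop = new AvgFamilyFunction(Kind.VAR_POP);
        // The class-based dispatch turns VAR_POP into AVG without any error.
        System.out.println(translateByClass(varPop));
        try {
            translateByKind(varPop);
        } catch (UnsupportedOperationException e) {
            // The kind-based dispatch fails loudly instead.
            System.out.println(e.getMessage());
        }
    }
}
```

The sketch also shows why step 1 alone is not sufficient: the wrong result is replaced by an exception, so the decomposition rule from step 2 is still needed to make the functions usable.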



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16407123#comment-16407123
 ] 

ASF GitHub Bot commented on FLINK-8903:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5706
  
Thanks for the feedback everybody!
Will merge the PR tomorrow.




[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406559#comment-16406559
 ] 

ASF GitHub Bot commented on FLINK-8903:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175825522
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter
     FlinkConventions.LOGICAL,
     "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the WindowAggregateReduceFunctionsRule
+    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+      // we support AVG
+      case SqlKind.AVG => true
+      // but none of the other AVG agg functions
+      case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
+      case _ => true
+    }
+
+    !agg.containsDistinctCall() && supported
--- End diff --

Yes. This was kinda confusing to me, we should clean this up when adding 
DISTINCT support. Thanks for the update @fhueske 
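For readers without the Flink sources at hand, the predicate in the diff above can be approximated by the following self-contained Java sketch. It is an illustration under assumptions, not the converter's code: `Kind` is a hypothetical stand-in for SqlKind, and the `AVG_AGG_FUNCTIONS` set here is assumed to contain AVG plus the four variance and standard-deviation kinds.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

final class WindowAggMatchSketch {
    // Hypothetical stand-in for Calcite's SqlKind (illustration only).
    enum Kind { SUM, COUNT, MIN, MAX, AVG, VAR_POP, VAR_SAMP, STDDEV_POP, STDDEV_SAMP }

    // Assumed contents of the AVG family; note that AVG itself is a member.
    static final Set<Kind> AVG_AGG_FUNCTIONS = EnumSet.of(
        Kind.AVG, Kind.VAR_POP, Kind.VAR_SAMP, Kind.STDDEV_POP, Kind.STDDEV_SAMP);

    // True if every aggregate can be handled natively: AVG is allowed,
    // every other member of the AVG family has to be reduced first.
    static boolean supported(List<Kind> aggKinds) {
        return aggKinds.stream()
            .allMatch(k -> k == Kind.AVG || !AVG_AGG_FUNCTIONS.contains(k));
    }

    public static void main(String[] args) {
        // Only natively supported aggregates: the converter would match.
        System.out.println(supported(Arrays.asList(Kind.AVG, Kind.SUM)));
        // VAR_POP must be rewritten first: the converter would not match.
        System.out.println(supported(Arrays.asList(Kind.COUNT, Kind.VAR_POP)));
    }
}
```

Checking AVG before the set membership test mirrors the order of the `case` clauses in the diff; a plain `contains` check would also reject AVG.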




[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406091#comment-16406091
 ] 

ASF GitHub Bot commented on FLINK-8903:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5706
  
PR updated




[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406085#comment-16406085
 ] 

ASF GitHub Bot commented on FLINK-8903:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175716765
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter
     FlinkConventions.LOGICAL,
     "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the WindowAggregateReduceFunctionsRule
+    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+      // we support AVG
+      case SqlKind.AVG => true
+      // but none of the other AVG agg functions
+      case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
+      case _ => true
+    }
+
+    !agg.containsDistinctCall() && supported
--- End diff --

Hmm, that's a good point. 
In fact, there won't be any plan with a `DISTINCT` aggregation in a 
`LogicalWindowAggregate` because `LogicalWindowAggregateRule` prevents 
translation of (`Calc(TUMBLE) -> Aggregate()`) into (`WindowAggregate(TUMBLE)`) 
if there is a distinct aggregate. This prevents SQL window aggregates that 
contain a distinct aggregate from being translated into `WindowAggregate`. The 
Table API does not even have an API to define such queries.

So, I'd simply remove the `containsDistinctCall()` check for now. We should 
definitely clean this up when we add support for DISTINCT aggregates.

@walterddr, are you fine with this?





[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404070#comment-16404070
 ] 

ASF GitHub Bot commented on FLINK-8903:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175296744
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter
     FlinkConventions.LOGICAL,
     "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the WindowAggregateReduceFunctionsRule
+    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+      // we support AVG
+      case SqlKind.AVG => true
+      // but none of the other AVG agg functions
+      case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
+      case _ => true
+    }
+
+    !agg.containsDistinctCall() && supported
--- End diff --

Shouldn't the logical rule support distinct calls here? It seems that 
previously the errors were thrown in `DataSetWindowAggregateRule` and 
`DataStreamWindowAggregateRule`, respectively. Any chance we can add a unit test 
to further clarify this change?





[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16403095#comment-16403095
 ] 

ASF GitHub Bot commented on FLINK-8903:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5706
  
updated PR




[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16402691#comment-16402691
 ] 

ASF GitHub Bot commented on FLINK-8903:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175230116
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
     FlinkConventions.LOGICAL,
     "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the WindowAggregateReduceFunctionsRule
+    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
--- End diff --

sounds good to me. Will update the PR




[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16402345#comment-16402345
 ] 

ASF GitHub Bot commented on FLINK-8903:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175183811
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
     FlinkConventions.LOGICAL,
     "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the WindowAggregateReduceFunctionsRule
+    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
--- End diff --

How about `SqlKind.AVG_AGG_FUNCTIONS.contains(kind) && kind != SqlKind.SUM 
&& kind != SqlKind.AVG`?




[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16402335#comment-16402335
 ] 

ASF GitHub Bot commented on FLINK-8903:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175181100
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
     FlinkConventions.LOGICAL,
     "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the WindowAggregateReduceFunctionsRule
+    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
--- End diff --

Replacing the current code with `SqlKind.AVG_AGG_FUNCTIONS.contains()` led 
to several test failures. These tests expected an `AVG` aggregation function, 
which was now replaced by `SUM / COUNT`.
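
The `SUM / COUNT` rewrite mentioned above can be sketched numerically in Java. This is a minimal illustration that uses the standard sum-of-squares identities; the class and method names are hypothetical, and whether the ported rule emits exactly these expressions is an assumption rather than something stated in this thread.

```java
final class ReduceToSumCountSketch {
    // SUM(x)
    static double sum(double[] xs) {
        double s = 0.0;
        for (double x : xs) {
            s += x;
        }
        return s;
    }

    // SUM(x * x)
    static double sumOfSquares(double[] xs) {
        double s = 0.0;
        for (double x : xs) {
            s += x * x;
        }
        return s;
    }

    // AVG(x) = SUM(x) / COUNT(x)
    static double avg(double[] xs) {
        return sum(xs) / xs.length;
    }

    // VAR_POP(x) = (SUM(x*x) - SUM(x) * SUM(x) / COUNT(x)) / COUNT(x)
    static double varPop(double[] xs) {
        double n = xs.length;
        double s = sum(xs);
        return (sumOfSquares(xs) - s * s / n) / n;
    }

    // VAR_SAMP(x) = (SUM(x*x) - SUM(x) * SUM(x) / COUNT(x)) / (COUNT(x) - 1)
    static double varSamp(double[] xs) {
        double n = xs.length;
        double s = sum(xs);
        return (sumOfSquares(xs) - s * s / n) / (n - 1);
    }

    // STDDEV_POP and STDDEV_SAMP are the square roots of the variances.
    static double stddevPop(double[] xs) {
        return Math.sqrt(varPop(xs));
    }

    static double stddevSamp(double[] xs) {
        return Math.sqrt(varSamp(xs));
    }

    public static void main(String[] args) {
        // Eight values with SUM = 40 and SUM of squares = 232.
        double[] xs = {2, 4, 4, 4, 5, 5, 7, 9};
        System.out.println("AVG        = " + avg(xs));
        System.out.println("VAR_POP    = " + varPop(xs));
        System.out.println("STDDEV_POP = " + stddevPop(xs));
        System.out.println("VAR_SAMP   = " + varSamp(xs));
    }
}
```

Empty inputs (and a single value for the sample variants) yield NaN in this sketch, whereas a SQL engine would be expected to return NULL; that handling is deliberately left out.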




[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16402332#comment-16402332
 ] 

ASF GitHub Bot commented on FLINK-8903:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5706
  
Updated the PR with the Calcite issue. 




[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16401737#comment-16401737
 ] 

ASF GitHub Bot commented on FLINK-8903:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175055646
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java
 ---
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.CompositeList;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/*
+ * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT TO MAKE IT MORE EXTENSIBLE.
--- End diff --

Yes, I agree. It would be much better to have this code in Calcite. 
However, the changes are very Flink specific (we need to add a few fields 
to the projection).
OTOH, it's just moving some code into a protected function, so there is no 
change in functionality and only a few lines are touched.

I'll create a JIRA in Calcite and reference the issue. In case Calcite 
does not want the change, we can keep the class in Flink.



[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16401746#comment-16401746
 ] 

ASF GitHub Bot commented on FLINK-8903:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175056688
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
 ---
@@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
 FlinkConventions.LOGICAL,
 "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the WindowAggregateReduceFunctionsRule
+    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
--- End diff --

We have a built-in function for AVG (which we don't really need anymore) 
and SUM, so we could translate such plans.
But I agree, using SqlKind.AVG_AGG_FUNCTIONS.contains() is better.
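
The set-based check discussed here can be sketched in plain Java as follows. This is only an illustration under assumptions: `Kind` and `AVG_LIKE` are hypothetical stand-ins for Calcite's `SqlKind` and `SqlKind.AVG_AGG_FUNCTIONS` (which, as far as I recall, contains AVG, STDDEV_POP, STDDEV_SAMP, VAR_POP and VAR_SAMP), and `supported` is not the converter's actual code.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

final class ReducibleKindCheck {
    // Hypothetical stand-in for Calcite's SqlKind, reduced to what the sketch needs.
    enum Kind { AVG, SUM, COUNT, STDDEV_POP, STDDEV_SAMP, VAR_POP, VAR_SAMP }

    // Assumed to mirror the contents of SqlKind.AVG_AGG_FUNCTIONS.
    static final Set<Kind> AVG_LIKE =
        EnumSet.of(Kind.AVG, Kind.STDDEV_POP, Kind.STDDEV_SAMP, Kind.VAR_POP, Kind.VAR_SAMP);

    // A window aggregate is translated directly only if none of its
    // aggregate calls still has to be decomposed by the reduce rule.
    static boolean supported(List<Kind> aggKinds) {
        return aggKinds.stream().noneMatch(AVG_LIKE::contains);
    }

    public static void main(String[] args) {
        System.out.println(supported(Arrays.asList(Kind.SUM, Kind.COUNT)));   // true
        System.out.println(supported(Arrays.asList(Kind.SUM, Kind.VAR_POP))); // false
    }
}
```

Compared with an explicit `case` list, membership in one shared set means a kind added to the set later is picked up without touching the converter. Note that in this sketch AVG is rejected as well, so it would also go through the decomposition rule.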




[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16401281#comment-16401281
 ] 

ASF GitHub Bot commented on FLINK-8903:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r174962566
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java
 ---
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.CompositeList;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/*
+ * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT TO MAKE IT MORE EXTENSIBLE.
--- End diff --

I think we should create a Calcite JIRA to allow 
AggregateReduceFunctionsRule in Calcite to support this extension, and document 
the JIRA ticket here. We can then remove this overwrite once Calcite is 
upgraded. What do you think?



[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16401280#comment-16401280
 ] 

ASF GitHub Bot commented on FLINK-8903:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r174965171
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
 ---
@@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
 FlinkConventions.LOGICAL,
 "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the WindowAggregateReduceFunctionsRule
+    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
--- End diff --

How about AVG and SUM? They are also handled in AggregateReduceFunctionsRule. 
Also, I think it's better to use SqlKind.AVG_AGG_FUNCTIONS.contains() or 
AggregateReduceFunctionsRule.isReducible() (it's private now, though) here in 
the case statement, so that it stays consistent if Calcite changes.




[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16401160#comment-16401160
 ] 

ASF GitHub Bot commented on FLINK-8903:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/5706

[FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDEV_SAMP, STDEV_POP functions 
on GROUP BY windows.

## What is the purpose of the change

* Fixes the computation of `VAR_SAMP`, `VAR_POP`, `STDDEV_SAMP`, 
`STDDEV_POP` aggregations in the context of `GROUP BY` windows (`TUMBLE`, 
`HOP`, `SESSION`). Right now, these functions are computed as `AVG`.

## Brief change log

* copy Calcite's `AggregateReduceFunctionsRule` to Flink and improve its 
extensibility
* add a `WindowAggregateReduceFunctionsRule` based on the copied 
`AggregateReduceFunctionsRule` to decompose the faulty aggregation functions 
into `COUNT` and `SUM` functions.
* add restriction to `FlinkLogicalWindowAggregateConverter` to prevent 
translation of group window aggregates with failing aggregation functions
* prevent translation of `VAR_SAMP`, `VAR_POP`, `STDDEV_SAMP`, `STDDEV_POP` 
in `AggregateUtil`
* add unit tests (plan validation) for batch (SQL, Table API) and stream 
(SQL, Table API)
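
For readers unfamiliar with the decomposition mentioned in the change log, here is a minimal sketch of how variance and standard deviation can be expressed through `COUNT` and `SUM` alone. It is plain Java rather than Calcite rule code; the class and method names are hypothetical, and returning `null` for an empty input (or a single value in the `_SAMP` case) is an assumption meant to mimic SQL `NULL`.

```java
final class VarianceFromSumAndCount {
    // Returns {COUNT(x), SUM(x), SUM(x * x)}, the only aggregates a window needs to keep.
    static double[] countSumSumSq(double... xs) {
        double sum = 0.0;
        double sumSq = 0.0;
        for (double x : xs) {
            sum += x;
            sumSq += x * x;
        }
        return new double[] {xs.length, sum, sumSq};
    }

    // VAR_POP(x) = (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x)) / COUNT(x)
    static Double varPop(double... xs) {
        double[] a = countSumSumSq(xs);
        if (a[0] == 0) {
            return null; // stands in for SQL NULL on empty input
        }
        return (a[2] - a[1] * a[1] / a[0]) / a[0];
    }

    // VAR_SAMP(x) uses COUNT(x) - 1 as the divisor and is undefined for fewer than two values.
    static Double varSamp(double... xs) {
        double[] a = countSumSumSq(xs);
        if (a[0] <= 1) {
            return null;
        }
        return (a[2] - a[1] * a[1] / a[0]) / (a[0] - 1);
    }

    // The standard deviations are the square roots of the matching variances.
    static Double stddevPop(double... xs) {
        Double v = varPop(xs);
        return v == null ? null : Math.sqrt(v);
    }

    static Double stddevSamp(double... xs) {
        Double v = varSamp(xs);
        return v == null ? null : Math.sqrt(v);
    }

    public static void main(String[] args) {
        // AVG of this window would be 2.5, which is what the bug returned instead.
        System.out.println(varPop(1.0, 2.0, 3.0, 4.0)); // 1.25
    }
}
```

The sum-of-squares form is convenient for incremental aggregation, but it can lose precision when the values are large relative to their spread.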

## Verifying this change

* run the added plan tests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **n/a**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableVarStddevAggFix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5706.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5706


commit 517567348b0ec0c23ef0c1dcc05c54a91d5c5671
Author: Fabian Hueske 
Date:   2018-03-15T20:04:00Z

[FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDEV_SAMP, STDEV_POP functions 
on GROUP BY windows.






[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-15 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400990#comment-16400990
 ] 

Shuyi Chen commented on FLINK-8903:
---

Hi [~fhueske], please continue working on it if you are already in the middle 
of it. I am just interested as well and thought you might be busy. How the 
VolcanoPlanner picks the plan might be tricky, AFAIR. Let me know if I can help.

> Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in 
> Group Windows
> ---
>
> Key: FLINK-8903
> URL: https://issues.apache.org/jira/browse/FLINK-8903
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.2, 1.5.0, 1.4.2
> Reporter: lilizhao
> Assignee: Shuyi Chen
> Priority: Critical
> Fix For: 1.5.0
>
> Attachments: QQ图片20180312180143.jpg, TableAndSQLTest.java
>
>
> The built-in aggregation functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP 
> are translated into regular AVG functions if they are applied in the context 
> of a Group Window aggregation ({{GROUP BY TUMBLE/HOP/SESSION}}).
> The reason is that these functions are internally represented as 
> {{SqlAvgAggFunction}} but with different {{SqlKind}}. When translating 
> Calcite aggregation functions to Flink Table agg functions, we only look at 
> the type of the class, not at the value of the {{kind}} field. We did not 
> notice that before, because in all other cases (regular {{GROUP BY}} without 
> windows or {{OVER}} windows), we have a translation rule 
> {{AggregateReduceFunctionsRule}} that decomposes the more complex functions 
> into expressions of {{COUNT}} and {{SUM}} functions such that we never 
> execute an {{AVG}} Flink function. That rule can only be applied on 
> {{LogicalAggregate}}; however, we represent group windows as 
> {{LogicalWindowAggregate}}, so the rule does not match.
> We should fix this by:
> 1. restrict the translation to Flink avg functions in {{AggregateUtil}} to 
> {{SqlKind.AVG}}. 
> 2. implement a rule (hopefully based on {{AggregateReduceFunctionsRule}}) 
> that decomposes the complex agg functions into the {{SUM}} and {{COUNT}}.
> Step 1. is easy and a quick fix but we would get an exception "Unsupported 
> Function" if {{VAR_POP}} is used in a {{GROUP BY}} window.
> Step 2. might be more involved, depending on how difficult it is to port the 
> rule.
>  
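
The root cause described in the issue (one function class shared by several kinds, with the translation looking only at the class) can be illustrated with a small self-contained Java sketch. All names here are hypothetical stand-ins: `AvgLikeFunction` plays the role of `SqlAvgAggFunction`, `Kind` the role of `SqlKind`, and the two `translate` methods are not Flink's `AggregateUtil`.

```java
final class KindVersusClassDispatch {
    // Hypothetical stand-in for the relevant SqlKind values.
    enum Kind { AVG, VAR_POP, VAR_SAMP, STDDEV_POP, STDDEV_SAMP }

    // Hypothetical stand-in for SqlAvgAggFunction: one class, several kinds.
    static final class AvgLikeFunction {
        final Kind kind;

        AvgLikeFunction(Kind kind) {
            this.kind = kind;
        }
    }

    // Buggy variant: dispatches on the class only, so every kind becomes AVG.
    static String translateByClass(Object fn) {
        if (fn instanceof AvgLikeFunction) {
            return "AVG";
        }
        throw new UnsupportedOperationException("Unsupported Function");
    }

    // Step 1 of the proposed fix: only accept the function if its kind is AVG.
    static String translateByKind(AvgLikeFunction fn) {
        if (fn.kind == Kind.AVG) {
            return "AVG";
        }
        throw new UnsupportedOperationException("Unsupported Function: " + fn.kind);
    }

    public static void main(String[] args) {
        AvgLikeFunction varPop = new AvgLikeFunction(Kind.VAR_POP);
        System.out.println(translateByClass(varPop)); // AVG (silently wrong)
        try {
            translateByKind(varPop);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage()); // Unsupported Function: VAR_POP
        }
    }
}
```

This also shows why step 1 alone is only a quick fix: the wrong result turns into an exception, and a decomposition rule (step 2) is still needed to actually compute the value.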





[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-15 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400963#comment-16400963
 ] 

Fabian Hueske commented on FLINK-8903:
--

Thanks Shuyi!

In fact, I was curious how hard it would be to implement a rule and started 
working on this issue. It was quite easy and the rule seems to work; however, 
the correct plan is not used. I need to dig deeper into Calcite to figure out 
what's going on.

If you'd like to do that, I can also share my branch and you can finish what I 
started. Just let me know...



[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-15 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400925#comment-16400925
 ] 

Shuyi Chen commented on FLINK-8903:
---

[~fhueske], I can help take a look at this issue.
